From ab3ef77a70479b0266301a02767116d9c8ded3ca Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Sat, 7 Feb 2026 12:40:43 +0000 Subject: [PATCH 1/6] feat(pg-native): pipeline mode --- packages/pg-native/index.js | 325 ++++++++++++++++++++++- packages/pg-native/package.json | 2 +- packages/pg-native/test/pipeline-mode.js | 233 ++++++++++++++++ 3 files changed, 556 insertions(+), 4 deletions(-) create mode 100644 packages/pg-native/test/pipeline-mode.js diff --git a/packages/pg-native/index.js b/packages/pg-native/index.js index 8c83406bb..792738526 100644 --- a/packages/pg-native/index.js +++ b/packages/pg-native/index.js @@ -17,6 +17,7 @@ const Client = (module.exports = function (config) { this.pq = new Libpq() this._reading = false this._read = this._read.bind(this) + this._readPipeline = this._readPipeline.bind(this) // allow custom type conversion to be passed in this._types = config.types || types @@ -28,6 +29,14 @@ const Client = (module.exports = function (config) { this._rows = undefined this._results = undefined + // Pipeline mode configuration + this._pipelineMode = config.pipelineMode || false + this._pipelineMaxQueries = config.pipelineMaxQueries || 1000 + this._pipelineEnabled = false + this._pipelineQueue = [] // Queue of pending queries with callbacks + this._pipelinePendingCount = 0 // Count of sent but not yet resolved queries + this._pipelineCallbacks = [] // Callbacks for sent queries awaiting results + // lazy start the reader if notifications are listened for // this way if you only run sync queries you wont block // the event loop artificially @@ -43,20 +52,46 @@ const Client = (module.exports = function (config) { util.inherits(Client, EventEmitter) Client.prototype.connect = function (params, cb) { - this.pq.connect(params, cb) + if (typeof params === 'function') { + cb = params + params = undefined + } + + const self = this + this.pq.connect(params, function (err) { + if (err) return cb(err) + + // enter pipeline mode if enabled and supported + if (self._pipelineMode && self._pipelineModeSupported()) { + self._enterPipelineMode() + self._startPipelineReading() + } + + cb() + }) } Client.prototype.connectSync = function (params) { this.pq.connectSync(params) + + // enter pipeline mode if enabled and supported + if (this._pipelineMode && this._pipelineModeSupported()) { + this._enterPipelineMode() + this._startPipelineReading() + } } Client.prototype.query = function (text, values, cb) { - let queryFn - if (typeof values === 'function') { cb = values + values = undefined + } + + if (this._pipelineMode) { + return this._pipelineQuery(text, values, cb) } + let queryFn if (Array.isArray(values)) { queryFn = () => { return this.pq.sendQueryParams(text, values) @@ -329,3 +364,287 @@ Client.prototype._onReadyForQuery = function () { cb(err, rows || [], results) } } + +// Check if pipeline mode is supported +Client.prototype._pipelineModeSupported = function () { + return this.pq.pipelineModeSupported() +} + +// Enter pipeline mode - allows sending multiple queries without waiting for results +Client.prototype._enterPipelineMode = function () { + if (!this._pipelineModeSupported()) { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+') + } + const result = this.pq.enterPipelineMode() + if (result) { + this._pipelineEnabled = true + this.pq.setNonBlocking(true) + } + return result +} + +// Exit pipeline mode +Client.prototype._exitPipelineMode = function () { + if (!this._pipelineEnabled) { + return true + } + const result = this.pq.exitPipelineMode() + if (result) { + this._pipelineEnabled = false + this._pipelineQueue = [] + this._pipelinePendingCount = 0 + this._pipelineCallbacks = [] + } + return result +} + +// Get current pipeline status (0=off, 1=on, 2=aborted) +Client.prototype._pipelineStatus = function () { + return this.pq.pipelineStatus() +} + +// Send a sync point in the pipeline to trigger result delivery +Client.prototype._pipelineSync = function () { + return this.pq.pipelineSync() +} + +// Execute a query in pipeline mode +// Called by query() when pipelineMode is enabled +Client.prototype._pipelineQuery = function (text, values, cb) { + // if pipeline mode is not enabled but was configured, auto-enter + if (this._pipelineMode && !this._pipelineEnabled) { + this._enterPipelineMode() + this._startPipelineReading() + } + + if (!this._pipelineEnabled) { + const err = new Error('Pipeline mode is not enabled. Set pipelineMode: true in config.') + if (cb) return setImmediate(() => cb(err)) + return Promise.reject(err) + } + + // Check if we need to apply backpressure + if (this._pipelinePendingCount >= this._pipelineMaxQueries) { + // Queue the query for later execution + const queued = { text, values, cb } + this._pipelineQueue.push(queued) + + if (!cb) { + return new Promise((resolve, reject) => { + queued.cb = (err, rows, result) => { + if (err) reject(err) + else resolve(rows) + } + }) + } + return + } + + // Send the query - always use sendQueryParams in pipeline mode + // (sendQuery is not allowed in pipeline mode) + const params = Array.isArray(values) ? values : [] + const sent = this.pq.sendQueryParams(text, params) + + if (!sent) { + const err = new Error(this.pq.errorMessage() || 'Failed to send query in pipeline mode') + if (cb) return setImmediate(() => cb(err)) + return Promise.reject(err) + } + + this._pipelinePendingCount++ + + // Send sync to mark this query and trigger result delivery + this._pipelineSync() + + // Flush to ensure the query and sync are sent + this.pq.flush() + + if (cb) { + this._pipelineCallbacks.push({ cb, resultCount: 0, rows: undefined, results: undefined, error: undefined }) + return + } + + return new Promise((resolve, reject) => { + this._pipelineCallbacks.push({ + cb: (err, rows, result) => { + if (err) reject(err) + else resolve(rows) + }, + resultCount: 0, + rows: undefined, + results: undefined, + error: undefined, + }) + }) +} + +// Flush and wait for all pending pipeline queries to complete +Client.prototype._pipelineFlush = function (cb) { + if (!this._pipelineEnabled) { + if (cb) return setImmediate(cb) + return Promise.resolve() + } + + // Send sync to mark end of current batch + this._pipelineSync() + this.pq.flush() + + // Add a sync callback marker + if (cb) { + this._pipelineCallbacks.push({ cb, isSync: true }) + return + } + + return new Promise((resolve, reject) => { + this._pipelineCallbacks.push({ + cb: (err) => { + if (err) reject(err) + else resolve() + }, + isSync: true, + }) + }) +} + +// Start reading for pipeline mode +Client.prototype._startPipelineReading = function () { + if (this._reading) return + this._reading = true + this.pq.on('readable', this._readPipeline) + this.pq.startReader() +} + +// Stop reading for pipeline mode +Client.prototype._stopPipelineReading = function () { + if (!this._reading) return + this._reading = false + this.pq.stopReader() + this.pq.removeListener('readable', this._readPipeline) +} + +// Read handler specificaly for pipeline mode +Client.prototype._readPipeline = function () { + const pq = this.pq + + // Read waiting data from the socket + if (!pq.consumeInput()) { + // Read error and notify all pending callbacks + const err = new Error(pq.errorMessage()) + this._pipelineCallbacks.forEach((pending) => { + if (pending.cb) pending.cb(err) + }) + this._pipelineCallbacks = [] + this._pipelinePendingCount = 0 + return + } + + // Process all available results + // In pipeline mode, getResult() returns false for NULL markers between results + // We should only break when isBusy() becomes true + let loopCount = 0 + const maxLoops = 1000 // Safety limit + while (!pq.isBusy() && loopCount < maxLoops) { + loopCount++ + const hasResult = pq.getResult() + if (!hasResult) { + // NULL result - this is normal between query results and sync markers + // Check if there are more pending callbacks, if so try again + if (this._pipelineCallbacks.length === 0) { + break + } + // Try one more time in case there's another result + continue + } + + const status = pq.resultStatus() + + // Handle pipeline sync point + if (status === 'PGRES_PIPELINE_SYNC') { + // Find and call the sync callback + const idx = this._pipelineCallbacks.findIndex((p) => p.isSync) + if (idx !== -1) { + const syncCb = this._pipelineCallbacks.splice(idx, 1)[0] + if (syncCb.cb) syncCb.cb() + } + continue + } + + // Get the next pending callbacks + if (this._pipelineCallbacks.length === 0) { + continue + } + + const pending = this._pipelineCallbacks[0] + if (pending.isSync) { + continue + } + + if (status === 'PGRES_FATAL_ERROR') { + pending.error = new Error(pq.resultErrorMessage()) + } else if (status === 'PGRES_TUPLES_OK' || status === 'PGRES_COMMAND_OK' || status === 'PGRES_EMPTY_QUERY') { + const result = this._consumeQueryResults(pq) + if (pending.resultCount === 0) { + pending.rows = result.rows + pending.results = result + } else if (pending.resultCount === 1) { + pending.rows = [pending.rows, result.rows] + pending.results = [pending.results, result] + } else { + pending.rows.push(result.rows) + pending.results.push(result) + } + pending.resultCount++ + } + + // Check if we need to get more results for this query (null result marks end) + // For pipeline mode, each query gets exactly one result set typically + // We complete the callback and move to the next one + this._pipelineCallbacks.shift() + this._pipelinePendingCount-- + + if (pending.cb) { + pending.cb(pending.error, pending.rows || [], pending.results) + } + + // Process queued queries now that we have room + this._processQueuedPipelineQueries() + } + + // Check for notifications + let notice = pq.notifies() + while (notice) { + this.emit('notification', notice) + notice = pq.notifies() + } +} + +// Process queued pipeline queries when there's room +Client.prototype._processQueuedPipelineQueries = function () { + while (this._pipelineQueue.length > 0 && this._pipelinePendingCount < this._pipelineMaxQueries) { + const queued = this._pipelineQueue.shift() + + // Always use sendQueryParams in pipeline mode + const params = Array.isArray(queued.values) ? queued.values : [] + const sent = this.pq.sendQueryParams(queued.text, params) + + if (!sent) { + const err = new Error(this.pq.errorMessage() || 'Failed to send queued query in pipeline mode') + if (queued.cb) queued.cb(err) + continue + } + + this._pipelinePendingCount++ + + // Send sync to trigger result delivery + this._pipelineSync() + this.pq.flush() + + this._pipelineCallbacks.push({ + cb: queued.cb, + resultCount: 0, + rows: undefined, + results: undefined, + error: undefined, + }) + } +} diff --git a/packages/pg-native/package.json b/packages/pg-native/package.json index 92bf5cac2..1c8a2f8b7 100644 --- a/packages/pg-native/package.json +++ b/packages/pg-native/package.json @@ -34,7 +34,7 @@ }, "homepage": "https://github.com/brianc/node-postgres/tree/master/packages/pg-native", "dependencies": { - "libpq": "^1.8.15", + "libpq": "^1.9.0", "pg-types": "2.2.0" }, "devDependencies": { diff --git a/packages/pg-native/test/pipeline-mode.js b/packages/pg-native/test/pipeline-mode.js new file mode 100644 index 000000000..82990d437 --- /dev/null +++ b/packages/pg-native/test/pipeline-mode.js @@ -0,0 +1,233 @@ +const Client = require('../') +const assert = require('assert') + +describe('pipeline mode', function () { + this.timeout(10000) + + describe('pipeline configuration options', function () { + it('accepts pipelineMode option', function () { + const client = new Client({ pipelineMode: true }) + assert.strictEqual(client._pipelineMode, true) + }) + + it('accepts pipelineMaxQueries option', function () { + const client = new Client({ pipelineMaxQueries: 100 }) + assert.strictEqual(client._pipelineMaxQueries, 100) + }) + + it('defaults pipelineMaxQueries to 1000', function () { + const client = new Client() + assert.strictEqual(client._pipelineMaxQueries, 1000) + }) + + it('defaults pipelineMode to false', function () { + const client = new Client() + assert.strictEqual(client._pipelineMode, false) + }) + }) + + describe('pipeline operations', function () { + let client + + before(function (done) { + client = new Client({ pipelineMode: true }) + client.connectSync() + + // Check if pipeline mode was successfully enabled + if (!client._pipelineEnabled) { + console.log('Pipeline mode not supported. Skipping pipeline tests.') + client.end() + this.skip() + return done() + } + + done() + }) + + after(function (done) { + if (client && client.pq.connected) { + client.end(done) + } else { + done() + } + }) + + describe('query with callbacks', function () { + it('can execute a single query in pipeline mode', function (done) { + client.query('SELECT 1 as num', function (err, rows) { + if (err) return done(err) + assert.strictEqual(rows[0].num, 1) + done() + }) + }) + + it('can execute a parameterized query in pipeline mode', function (done) { + client.query('SELECT $1::int as num', [42], function (err, rows) { + if (err) return done(err) + assert.strictEqual(rows[0].num, 42) + done() + }) + }) + + it('can execute multiple queries in pipeline mode', function (done) { + let completed = 0 + const results = [] + + const checkComplete = function () { + completed++ + if (completed === 3) { + assert.deepStrictEqual(results, [1, 2, 3]) + done() + } + } + + client.query('SELECT 1 as num', function (err, rows) { + if (err) return done(err) + results.push(rows[0].num) + checkComplete() + }) + + client.query('SELECT 2 as num', function (err, rows) { + if (err) return done(err) + results.push(rows[0].num) + checkComplete() + }) + + client.query('SELECT 3 as num', function (err, rows) { + if (err) return done(err) + results.push(rows[0].num) + checkComplete() + }) + }) + + it('handles query errors in pipeline mode', function (done) { + client.query('SELECT invalid_column FROM nonexistent_table', function (err) { + assert(err instanceof Error, 'Should return an error') + done() + }) + }) + }) + + describe('query with promises', function () { + it('returns a promise when no callback provided', async function () { + const rows = await client.query('SELECT 1 as num') + assert.strictEqual(rows[0].num, 1) + }) + + it('can execute parameterized queries with promises', async function () { + const rows = await client.query('SELECT $1::text as name', ['test']) + assert.strictEqual(rows[0].name, 'test') + }) + + it('rejects promise on query error', async function () { + try { + await client.query('SELECT invalid') + assert.fail('Should have thrown an error') + } catch (err) { + assert(err instanceof Error) + } + }) + }) + }) + + describe('backpressure', function () { + let client + + before(function (done) { + client = new Client({ pipelineMode: true, pipelineMaxQueries: 5 }) + client.connectSync() + + if (!client._pipelineEnabled) { + client.end() + this.skip() + return done() + } + + done() + }) + + after(function (done) { + if (client && client.pq.connected) { + client.end(done) + } else { + done() + } + }) + + it('queues queries when exceeding pipelineMaxQueries', async function () { + // With pipelineMaxQueries=5, sending 10 queries should queue 5 + const promises = [] + for (let i = 1; i <= 10; i++) { + promises.push(client.query('SELECT $1::int as num', [i])) + } + + const results = await Promise.all(promises) + + assert.strictEqual(results.length, 10) + for (let i = 0; i < 10; i++) { + assert.strictEqual(results[i][0].num, i + 1) + } + }) + + it('correctly tracks pending query count', async function () { + // Start fresh - exit and re-enter pipeline mode + client._exitPipelineMode() + client._enterPipelineMode() + client._startPipelineReading() + + assert.strictEqual(client._pipelinePendingCount, 0) + assert.strictEqual(client._pipelineQueue.length, 0) + + // Send 3 queries (under the limit of 5) + const p1 = client.query('SELECT 1') + const p2 = client.query('SELECT 2') + const p3 = client.query('SELECT 3') + + // All should be sent, none queued + assert.strictEqual(client._pipelinePendingCount, 3) + assert.strictEqual(client._pipelineQueue.length, 0) + + await Promise.all([p1, p2, p3]) + }) + }) + + describe('comparison with non-pipeline mode', function () { + it('standard client works without pipeline mode', function (done) { + const standardClient = new Client() + standardClient.connectSync() + + standardClient.query('SELECT 1 as num', function (err, rows) { + if (err) { + standardClient.end() + return done(err) + } + assert.strictEqual(rows[0].num, 1) + standardClient.end(done) + }) + }) + + it('pipeline mode client can execute many queries faster', async function () { + this.timeout(20000) + + const pipelineClient = new Client({ pipelineMode: true }) + pipelineClient.connectSync() + + if (!pipelineClient._pipelineEnabled) { + pipelineClient.end() + this.skip() + return + } + + const count = 100 + const promises = [] + for (let i = 0; i < count; i++) { + promises.push(pipelineClient.query('SELECT $1::int as num', [i])) + } + + const results = await Promise.all(promises) + assert.strictEqual(results.length, count) + + pipelineClient.end() + }) + }) +}) From 5c2b229d37372d743a8a6f12f9ce7725d0dd60d5 Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Sat, 7 Feb 2026 12:59:44 +0000 Subject: [PATCH 2/6] perf: add benchmark --- packages/pg-native/bench/index.js | 184 ++++++++++++++++++++++++------ 1 file changed, 148 insertions(+), 36 deletions(-) diff --git a/packages/pg-native/bench/index.js b/packages/pg-native/bench/index.js index 6f6641ccc..8247bd12b 100644 --- a/packages/pg-native/bench/index.js +++ b/packages/pg-native/bench/index.js @@ -1,52 +1,164 @@ const pg = require('pg').native const Native = require('../') -const warmup = function (fn, cb) { +const queryText = 'SELECT generate_series(0, 1000) as X, generate_series(0, 1000) as Y, generate_series(0, 1000) as Z' +const simpleQuery = 'SELECT 1' + +const promisify = (client, text) => { + return new Promise((resolve, reject) => { + client.query(text, (err, result) => { + if (err) reject(err) + else resolve(result) + }) + }) +} + +const bench = async (name, fn, durationMs) => { + const start = performance.now() let count = 0 - const max = 10 - const run = function (err) { - if (err) return cb(err) + while (performance.now() - start < durationMs) { + await fn() + count++ + } + const elapsed = performance.now() - start + const qps = Math.round((count / elapsed) * 1000) + return { name, count, elapsed, qps } +} + +const run = async () => { + // Setup clients + const pureClient = new pg.Client() + await new Promise((resolve, reject) => { + pureClient.connect((err) => (err ? reject(err) : resolve())) + }) + + const native = Native() + native.connectSync() + + const nativePipeline = Native({ pipelineMode: true }) + nativePipeline.connectSync() + const pipelineSupported = nativePipeline._pipelineEnabled + + console.log('='.repeat(60)) + console.log('pg-native Benchmark') + console.log('='.repeat(60)) + console.log(`Pipeline mode supported: ${pipelineSupported}`) + console.log('') + + const results = { + simple: {}, + complex: {}, + } + + const warmupMs = 1000 + const benchMs = 5000 + const iterations = 3 + + // Warmup + console.log('Warming up...') + await bench('warmup', () => promisify(pureClient, simpleQuery), warmupMs) + await bench('warmup', () => promisify(native, simpleQuery), warmupMs) + if (pipelineSupported) { + await bench('warmup', () => promisify(nativePipeline, simpleQuery), warmupMs) + } + console.log('Warmup complete.\n') + + // Run benchmarks + for (let i = 0; i < iterations; i++) { + console.log(`--- Iteration ${i + 1}/${iterations} ---`) + + // Simple query benchmarks + console.log('\nSimple query (SELECT 1):') - if (max >= count++) { - return fn(run) + let result = await bench('pg.native', () => promisify(pureClient, simpleQuery), benchMs) + console.log(` pg.native: ${result.qps} qps (${result.count} queries in ${Math.round(result.elapsed)}ms)`) + results.simple['pg.native'] = results.simple['pg.native'] || [] + results.simple['pg.native'].push(result.qps) + + result = await bench('Native', () => promisify(native, simpleQuery), benchMs) + console.log(` Native: ${result.qps} qps (${result.count} queries in ${Math.round(result.elapsed)}ms)`) + results.simple['Native'] = results.simple['Native'] || [] + results.simple['Native'].push(result.qps) + + if (pipelineSupported) { + result = await bench('Native+Pipeline', () => promisify(nativePipeline, simpleQuery), benchMs) + console.log(` Native+Pipeline: ${result.qps} qps (${result.count} queries in ${Math.round(result.elapsed)}ms)`) + results.simple['Native+Pipeline'] = results.simple['Native+Pipeline'] || [] + results.simple['Native+Pipeline'].push(result.qps) } - cb() + // Complex query benchmarks + console.log('\nComplex query (generate_series):') + + result = await bench('pg.native', () => promisify(pureClient, queryText), benchMs) + console.log(` pg.native: ${result.qps} qps (${result.count} queries in ${Math.round(result.elapsed)}ms)`) + results.complex['pg.native'] = results.complex['pg.native'] || [] + results.complex['pg.native'].push(result.qps) + + result = await bench('Native', () => promisify(native, queryText), benchMs) + console.log(` Native: ${result.qps} qps (${result.count} queries in ${Math.round(result.elapsed)}ms)`) + results.complex['Native'] = results.complex['Native'] || [] + results.complex['Native'].push(result.qps) + + if (pipelineSupported) { + result = await bench('Native+Pipeline', () => promisify(nativePipeline, queryText), benchMs) + console.log(` Native+Pipeline: ${result.qps} qps (${result.count} queries in ${Math.round(result.elapsed)}ms)`) + results.complex['Native+Pipeline'] = results.complex['Native+Pipeline'] || [] + results.complex['Native+Pipeline'].push(result.qps) + } + + console.log('') } - run() -} -const native = Native() -native.connectSync() + // Summary + console.log('='.repeat(60)) + console.log('SUMMARY (average QPS over', iterations, 'iterations)') + console.log('='.repeat(60)) -const queryText = 'SELECT generate_series(0, 1000) as X, generate_series(0, 1000) as Y, generate_series(0, 1000) as Z' -const client = new pg.Client() -client.connect(function () { - const pure = function (cb) { - client.query(queryText, function (err) { - if (err) throw err - cb(err) - }) + const avg = (arr) => Math.round(arr.reduce((a, b) => a + b, 0) / arr.length) + + console.log('\nSimple query (SELECT 1):') + for (const [name, qps] of Object.entries(results.simple)) { + const avgQps = avg(qps) + const improvement = + name !== 'pg.native' + ? ` (${((avgQps / avg(results.simple['pg.native']) - 1) * 100).toFixed(1)}% vs pg.native)` + : '' + console.log(` ${name.padEnd(18)} ${avgQps} qps${improvement}`) } - const nativeQuery = function (cb) { - native.query(queryText, function (err) { - if (err) throw err - cb(err) - }) + + console.log('\nComplex query (generate_series):') + for (const [name, qps] of Object.entries(results.complex)) { + const avgQps = avg(qps) + const improvement = + name !== 'pg.native' + ? ` (${((avgQps / avg(results.complex['pg.native']) - 1) * 100).toFixed(1)}% vs pg.native)` + : '' + console.log(` ${name.padEnd(18)} ${avgQps} qps${improvement}`) } - const run = function () { - console.time('pure') - warmup(pure, function () { - console.timeEnd('pure') - console.time('native') - warmup(nativeQuery, function () { - console.timeEnd('native') - }) - }) + if (pipelineSupported) { + const pipelineVsNativeSimple = ( + (avg(results.simple['Native+Pipeline']) / avg(results.simple['Native']) - 1) * + 100 + ).toFixed(1) + const pipelineVsNativeComplex = ( + (avg(results.complex['Native+Pipeline']) / avg(results.complex['Native']) - 1) * + 100 + ).toFixed(1) + console.log('\nPipeline mode impact:') + console.log(` Simple query: ${pipelineVsNativeSimple}% vs Native without pipeline`) + console.log(` Complex query: ${pipelineVsNativeComplex}% vs Native without pipeline`) } - setInterval(function () { - run() - }, 500) + console.log('\n' + '='.repeat(60)) + + // Cleanup + await new Promise((resolve) => pureClient.end(resolve)) + process.exit(0) +} + +run().catch((e) => { + console.error(e) + process.exit(1) }) From 99bb22b5ad92d4e1d16e99f87676cfc1156b3fb4 Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Sat, 7 Feb 2026 13:05:32 +0000 Subject: [PATCH 3/6] feat(bench): add concurrent query benchmarks for pg.native and Native+Pipeline --- packages/pg-native/bench/index.js | 58 +++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/packages/pg-native/bench/index.js b/packages/pg-native/bench/index.js index 8247bd12b..ecddbd4be 100644 --- a/packages/pg-native/bench/index.js +++ b/packages/pg-native/bench/index.js @@ -48,11 +48,13 @@ const run = async () => { const results = { simple: {}, complex: {}, + concurrent: {}, } const warmupMs = 1000 const benchMs = 5000 const iterations = 3 + const concurrentBatchSize = 10 // Warmup console.log('Warming up...') @@ -107,6 +109,40 @@ const run = async () => { results.complex['Native+Pipeline'].push(result.qps) } + // Concurrent query benchmarks (where pipeline mode shines) + console.log(`\nConcurrent queries (${concurrentBatchSize} queries in parallel):`) + + const concurrentBench = async (name, client, durationMs) => { + const start = performance.now() + let count = 0 + while (performance.now() - start < durationMs) { + const promises = [] + for (let j = 0; j < concurrentBatchSize; j++) { + promises.push(promisify(client, simpleQuery)) + } + await Promise.all(promises) + count += concurrentBatchSize + } + const elapsed = performance.now() - start + const qps = Math.round((count / elapsed) * 1000) + return { name, count, elapsed, qps } + } + + result = await concurrentBench('pg.native', pureClient, benchMs) + console.log(` pg.native: ${result.qps} qps (${result.count} queries in ${Math.round(result.elapsed)}ms)`) + results.concurrent['pg.native'] = results.concurrent['pg.native'] || [] + results.concurrent['pg.native'].push(result.qps) + + // Native without pipeline doesn't support concurrent queries on same connection + console.log(` Native: N/A (concurrent queries not supported without pipeline)`) + + if (pipelineSupported) { + result = await concurrentBench('Native+Pipeline', nativePipeline, benchMs) + console.log(` Native+Pipeline: ${result.qps} qps (${result.count} queries in ${Math.round(result.elapsed)}ms)`) + results.concurrent['Native+Pipeline'] = results.concurrent['Native+Pipeline'] || [] + results.concurrent['Native+Pipeline'].push(result.qps) + } + console.log('') } @@ -137,6 +173,17 @@ const run = async () => { console.log(` ${name.padEnd(18)} ${avgQps} qps${improvement}`) } + console.log(`\nConcurrent queries (${concurrentBatchSize} in parallel):`) + for (const [name, qps] of Object.entries(results.concurrent)) { + const avgQps = avg(qps) + const improvement = + name !== 'pg.native' + ? ` (${((avgQps / avg(results.concurrent['pg.native']) - 1) * 100).toFixed(1)}% vs pg.native)` + : '' + console.log(` ${name.padEnd(18)} ${avgQps} qps${improvement}`) + } + console.log(` ${'Native'.padEnd(18)} N/A (not supported)`) + if (pipelineSupported) { const pipelineVsNativeSimple = ( (avg(results.simple['Native+Pipeline']) / avg(results.simple['Native']) - 1) * @@ -146,9 +193,14 @@ const run = async () => { (avg(results.complex['Native+Pipeline']) / avg(results.complex['Native']) - 1) * 100 ).toFixed(1) - console.log('\nPipeline mode impact:') - console.log(` Simple query: ${pipelineVsNativeSimple}% vs Native without pipeline`) - console.log(` Complex query: ${pipelineVsNativeComplex}% vs Native without pipeline`) + const pipelineVsPgNativeConcurrent = ( + (avg(results.concurrent['Native+Pipeline']) / avg(results.concurrent['pg.native']) - 1) * + 100 + ).toFixed(1) + console.log('\nPipeline mode impact (vs Native without pipeline):') + console.log(` Simple query: ${pipelineVsNativeSimple}%`) + console.log(` Complex query: ${pipelineVsNativeComplex}%`) + console.log(` Concurrent queries: ${pipelineVsPgNativeConcurrent}% (vs pg.native, Native N/A)`) } console.log('\n' + '='.repeat(60)) From 8afffbbcf0d2d3b30225985d0e57602ea5291327 Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Sat, 7 Feb 2026 14:00:13 +0000 Subject: [PATCH 4/6] fix: potential Memory Leak in Error Handling --- packages/pg-native/index.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/pg-native/index.js b/packages/pg-native/index.js index 792738526..5e4111115 100644 --- a/packages/pg-native/index.js +++ b/packages/pg-native/index.js @@ -533,8 +533,13 @@ Client.prototype._readPipeline = function () { this._pipelineCallbacks.forEach((pending) => { if (pending.cb) pending.cb(err) }) + // Also notify queued queries + this._pipelineQueue.forEach((queued) => { + if (queued.cb) queued.cb(err) + }) this._pipelineCallbacks = [] this._pipelinePendingCount = 0 + this._pipelineQueue = [] return } From 531b2d9caa1555e731b5abcb2c0d9c61ffaa8c92 Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Sat, 7 Feb 2026 14:03:15 +0000 Subject: [PATCH 5/6] chore: read loop exceeded error --- packages/pg-native/index.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/pg-native/index.js b/packages/pg-native/index.js index 5e4111115..89358cfae 100644 --- a/packages/pg-native/index.js +++ b/packages/pg-native/index.js @@ -615,6 +615,10 @@ Client.prototype._readPipeline = function () { this._processQueuedPipelineQueries() } + if (loopCount >= maxLoops) { + this.emit('error', new Error('Pipeline read loop exceeded max iterations - possible infinite loop detected')) + } + // Check for notifications let notice = pq.notifies() while (notice) { From a4b0cc8a0721dd6a460051fd2fbe3b173fb401ac Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Sat, 7 Feb 2026 14:06:06 +0000 Subject: [PATCH 6/6] chore: notify all pending callbacks with an error before clearing --- packages/pg-native/index.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/pg-native/index.js b/packages/pg-native/index.js index 89358cfae..bed2fadaf 100644 --- a/packages/pg-native/index.js +++ b/packages/pg-native/index.js @@ -390,6 +390,11 @@ Client.prototype._exitPipelineMode = function () { } const result = this.pq.exitPipelineMode() if (result) { + // Notify all pending callbacks with an error before clearing + const err = new Error('Pipeline mode exited with pending queries') + this._pipelineCallbacks.forEach((p) => p.cb && p.cb(err)) + this._pipelineQueue.forEach((q) => q.cb && q.cb(err)) + this._pipelineEnabled = false this._pipelineQueue = [] this._pipelinePendingCount = 0