diff --git a/src/dialect/sqlite/sqlite-dialect-config.ts b/src/dialect/sqlite/sqlite-dialect-config.ts index 3b7ac0511..f9c6d90d7 100644 --- a/src/dialect/sqlite/sqlite-dialect-config.ts +++ b/src/dialect/sqlite/sqlite-dialect-config.ts @@ -41,4 +41,5 @@ export interface SqliteStatement { changes: number | bigint lastInsertRowid: number | bigint } + iterate(parameters: ReadonlyArray): IterableIterator } diff --git a/src/dialect/sqlite/sqlite-driver.ts b/src/dialect/sqlite/sqlite-driver.ts index 510848a1c..f39873913 100644 --- a/src/dialect/sqlite/sqlite-driver.ts +++ b/src/dialect/sqlite/sqlite-driver.ts @@ -3,6 +3,7 @@ import { QueryResult, } from '../../driver/database-connection.js' import { Driver } from '../../driver/driver.js' +import { SelectQueryNode } from '../../operation-node/select-query-node.js' import { CompiledQuery } from '../../query-compiler/compiled-query.js' import { freeze, isFunction } from '../../util/object-utils.js' import { SqliteDatabase, SqliteDialectConfig } from './sqlite-dialect-config.js' @@ -92,8 +93,22 @@ class SqliteConnection implements DatabaseConnection { } } - async *streamQuery(): AsyncIterableIterator> { - throw new Error("Sqlite driver doesn't support streaming") + async *streamQuery( + compiledQuery: CompiledQuery, + _chunkSize: number + ): AsyncIterableIterator> { + const { sql, parameters, query } = compiledQuery + const stmt = this.#db.prepare(sql) + if (SelectQueryNode.is(query)) { + const iter = stmt.iterate(parameters) as IterableIterator + for (const row of iter) { + yield { + rows: [row], + } + } + } else { + throw new Error('Sqlite driver only supports streaming of select queries') + } } } diff --git a/test/node/src/select.test.ts b/test/node/src/select.test.ts index b0442af12..71d4fc69e 100644 --- a/test/node/src/select.test.ts +++ b/test/node/src/select.test.ts @@ -879,8 +879,37 @@ for (const dialect of DIALECTS) { }) } - if (dialect === 'postgres' || dialect === 'mysql' || dialect === 'mssql') { - it('should stream results', async () => { + it('should stream results', async () => { + const males: unknown[] = [] + + const stream = ctx.db + .selectFrom('person') + .select(['first_name', 'last_name', 'gender']) + .where('gender', '=', 'male') + .orderBy('first_name') + .stream() + + for await (const male of stream) { + males.push(male) + } + + expect(males).to.have.length(2) + expect(males).to.eql([ + { + first_name: 'Arnold', + last_name: 'Schwarzenegger', + gender: 'male', + }, + { + first_name: 'Sylvester', + last_name: 'Stallone', + gender: 'male', + }, + ]) + }) + + if (dialect === 'postgres' || dialect === 'mssql') { + it('should stream results with a specific chunk size', async () => { const males: unknown[] = [] const stream = ctx.db @@ -888,7 +917,7 @@ for (const dialect of DIALECTS) { .select(['first_name', 'last_name', 'gender']) .where('gender', '=', 'male') .orderBy('first_name') - .stream() + .stream(1) for await (const male of stream) { males.push(male) @@ -908,82 +937,51 @@ for (const dialect of DIALECTS) { }, ]) }) + } - if (dialect === 'postgres' || dialect === 'mssql') { - it('should stream results with a specific chunk size', async () => { - const males: unknown[] = [] - - const stream = ctx.db - .selectFrom('person') - .select(['first_name', 'last_name', 'gender']) - .where('gender', '=', 'male') - .orderBy('first_name') - .stream(1) - - for await (const male of stream) { - males.push(male) - } - - expect(males).to.have.length(2) - expect(males).to.eql([ - { - first_name: 'Arnold', - last_name: 'Schwarzenegger', - gender: 'male', - }, - { - first_name: 'Sylvester', - last_name: 'Stallone', - gender: 'male', - }, - ]) - }) - } - - it('should release connection on premature async iterator stop', async () => { - for (let i = 0; i <= POOL_SIZE + 1; i++) { - const stream = ctx.db.selectFrom('person').selectAll().stream() + it('should release connection on premature async iterator stop', async () => { + for (let i = 0; i <= POOL_SIZE + 1; i++) { + const stream = ctx.db.selectFrom('person').selectAll().stream() - for await (const _ of stream) { - break - } + for await (const _ of stream) { + break } - }) + } + }) - it('should release connection on premature async iterator stop when using a specific chunk size', async () => { - for (let i = 0; i <= POOL_SIZE + 1; i++) { - const stream = ctx.db.selectFrom('person').selectAll().stream(1) + it('should release connection on premature async iterator stop when using a specific chunk size', async () => { + for (let i = 0; i <= POOL_SIZE + 1; i++) { + const stream = ctx.db.selectFrom('person').selectAll().stream(1) - for await (const _ of stream) { - break - } + for await (const _ of stream) { + break } - }) + } + }) - if (dialect === 'postgres') { - it('should throw an error if the cursor implementation is not provided for the postgres dialect', async () => { - const db = new Kysely({ - dialect: new PostgresDialect({ - pool: async () => new Pool(DIALECT_CONFIGS.postgres), - }), - plugins: PLUGINS, - }) + if (dialect === 'postgres') { + it('should throw an error if the cursor implementation is not provided for the postgres dialect', async () => { + const db = new Kysely({ + dialect: new PostgresDialect({ + pool: async () => new Pool(DIALECT_CONFIGS.postgres), + }), + plugins: PLUGINS, + }) - await expect( - (async () => { - for await (const _ of db - .selectFrom('person') - .selectAll() - .stream()) { - } - })() - ).to.be.rejectedWith( - "'cursor' is not present in your postgres dialect config. It's required to make streaming work in postgres." - ) + await expect( + (async () => { + for await (const _ of db + .selectFrom('person') + .selectAll() + .stream()) { + } + })() + ).to.be.rejectedWith( + "'cursor' is not present in your postgres dialect config. It's required to make streaming work in postgres." + ) - await db.destroy() - }) - } + await db.destroy() + }) } if (dialect !== 'mssql') {