Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add stream() support for sqlite dialect #754

Merged
merged 5 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dialect/sqlite/sqlite-dialect-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ export interface SqliteStatement {
changes: number | bigint
lastInsertRowid: number | bigint
}
iterate(parameters: ReadonlyArray<unknown>): IterableIterator<unknown>
}
19 changes: 17 additions & 2 deletions src/dialect/sqlite/sqlite-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
QueryResult,
} from '../../driver/database-connection.js'
import { Driver } from '../../driver/driver.js'
import { SelectQueryNode } from '../../operation-node/select-query-node.js'
import { CompiledQuery } from '../../query-compiler/compiled-query.js'
import { freeze, isFunction } from '../../util/object-utils.js'
import { SqliteDatabase, SqliteDialectConfig } from './sqlite-dialect-config.js'
Expand Down Expand Up @@ -92,8 +93,22 @@ class SqliteConnection implements DatabaseConnection {
}
}

async *streamQuery<R>(): AsyncIterableIterator<QueryResult<R>> {
throw new Error("Sqlite driver doesn't support streaming")
async *streamQuery<R>(
compiledQuery: CompiledQuery,
_chunkSize: number
igalklebanov marked this conversation as resolved.
Show resolved Hide resolved
): AsyncIterableIterator<QueryResult<R>> {
const { sql, parameters, query } = compiledQuery
const stmt = this.#db.prepare(sql)
if (SelectQueryNode.is(query)) {
const iter = stmt.iterate(parameters) as IterableIterator<R>
for (const row of iter) {
yield {
rows: [row],
}
}
} else {
throw new Error('Sqlite driver only supports streaming of select queries')
}
}
}

Expand Down
138 changes: 68 additions & 70 deletions test/node/src/select.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -879,16 +879,45 @@ for (const dialect of DIALECTS) {
})
}

if (dialect === 'postgres' || dialect === 'mysql' || dialect === 'mssql') {
igalklebanov marked this conversation as resolved.
Show resolved Hide resolved
it('should stream results', async () => {
it('should stream results', async () => {
const males: unknown[] = []

const stream = ctx.db
.selectFrom('person')
.select(['first_name', 'last_name', 'gender'])
.where('gender', '=', 'male')
.orderBy('first_name')
.stream()

for await (const male of stream) {
males.push(male)
}

expect(males).to.have.length(2)
expect(males).to.eql([
{
first_name: 'Arnold',
last_name: 'Schwarzenegger',
gender: 'male',
},
{
first_name: 'Sylvester',
last_name: 'Stallone',
gender: 'male',
},
])
})

if (dialect === 'postgres' || dialect === 'mssql') {
it('should stream results with a specific chunk size', async () => {
const males: unknown[] = []

const stream = ctx.db
.selectFrom('person')
.select(['first_name', 'last_name', 'gender'])
.where('gender', '=', 'male')
.orderBy('first_name')
.stream()
.stream(1)

for await (const male of stream) {
males.push(male)
Expand All @@ -908,82 +937,51 @@ for (const dialect of DIALECTS) {
},
])
})
}

if (dialect === 'postgres' || dialect === 'mssql') {
it('should stream results with a specific chunk size', async () => {
const males: unknown[] = []

const stream = ctx.db
.selectFrom('person')
.select(['first_name', 'last_name', 'gender'])
.where('gender', '=', 'male')
.orderBy('first_name')
.stream(1)

for await (const male of stream) {
males.push(male)
}

expect(males).to.have.length(2)
expect(males).to.eql([
{
first_name: 'Arnold',
last_name: 'Schwarzenegger',
gender: 'male',
},
{
first_name: 'Sylvester',
last_name: 'Stallone',
gender: 'male',
},
])
})
}

it('should release connection on premature async iterator stop', async () => {
for (let i = 0; i <= POOL_SIZE + 1; i++) {
const stream = ctx.db.selectFrom('person').selectAll().stream()
it('should release connection on premature async iterator stop', async () => {
for (let i = 0; i <= POOL_SIZE + 1; i++) {
const stream = ctx.db.selectFrom('person').selectAll().stream()

for await (const _ of stream) {
break
}
for await (const _ of stream) {
break
}
})
}
})

it('should release connection on premature async iterator stop when using a specific chunk size', async () => {
for (let i = 0; i <= POOL_SIZE + 1; i++) {
const stream = ctx.db.selectFrom('person').selectAll().stream(1)
it('should release connection on premature async iterator stop when using a specific chunk size', async () => {
for (let i = 0; i <= POOL_SIZE + 1; i++) {
const stream = ctx.db.selectFrom('person').selectAll().stream(1)

for await (const _ of stream) {
break
}
for await (const _ of stream) {
break
}
})
}
})

if (dialect === 'postgres') {
it('should throw an error if the cursor implementation is not provided for the postgres dialect', async () => {
const db = new Kysely<Database>({
dialect: new PostgresDialect({
pool: async () => new Pool(DIALECT_CONFIGS.postgres),
}),
plugins: PLUGINS,
})
if (dialect === 'postgres') {
it('should throw an error if the cursor implementation is not provided for the postgres dialect', async () => {
const db = new Kysely<Database>({
dialect: new PostgresDialect({
pool: async () => new Pool(DIALECT_CONFIGS.postgres),
}),
plugins: PLUGINS,
})

await expect(
(async () => {
for await (const _ of db
.selectFrom('person')
.selectAll()
.stream()) {
}
})()
).to.be.rejectedWith(
"'cursor' is not present in your postgres dialect config. It's required to make streaming work in postgres."
)
await expect(
(async () => {
for await (const _ of db
.selectFrom('person')
.selectAll()
.stream()) {
}
})()
).to.be.rejectedWith(
"'cursor' is not present in your postgres dialect config. It's required to make streaming work in postgres."
)

await db.destroy()
})
}
await db.destroy()
})
}

if (dialect !== 'mssql') {
Expand Down
Loading