Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DuckDB Connection Pool Initialization Issue #319

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions packages/extension-driver-duckdb/src/lib/duckdbDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,13 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
// create new connection for each query
const parameters = Array.from(bindParams.values());
this.logRequest(firstDataSQL, parameters, options);
const connection = db.connect();
await this.loadExtensions(connection, configurationParameters);
await this.setExecConfig(connection);
if (restDataSQL) this.logRequest(restDataSQL, parameters, options);
const [firstData, restDataStream] = await this.acquireData(
firstDataSQL,
restDataSQL,
parameters,
db
db,
configurationParameters
);
const readable = this.createReadableStream(firstData, restDataStream);
return {
Expand Down Expand Up @@ -168,15 +166,24 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
firstDataSql: string,
restDataSql: string | undefined,
parameters: any[],
db: duckdb.Database
db: duckdb.Database,
configurationParameters: ConfigurationParameters
) {
// conn.all() is faster then stream.checkChunk().
// For the small size data we use conn.all() to get the data at once
// To limit memory use and prevent server crashes, we will use conn.all() to acquire the initial chunk of data, then conn.stream() to receive the remainder of the data.
const c1 = db.connect();
const c2 = db.connect();
await Promise.all([
await this.loadExtensions(c1, configurationParameters),
await this.setExecConfig(c1),
await this.loadExtensions(c2, configurationParameters),
await this.setExecConfig(c2),
]);

return await Promise.all([
new Promise<duckdb.TableData>((resolve, reject) => {
const c = db.connect();
c.all(
c1.all(
firstDataSql,
...parameters,
(err: duckdb.DuckDbError | null, res: duckdb.TableData) => {
Expand All @@ -190,8 +197,7 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
new Promise<duckdb.QueryResult | undefined>((resolve, reject) => {
if (!restDataSql) resolve(undefined);
try {
const c = db.connect();
const result = c.stream(restDataSql, ...parameters);
const result = c2.stream(restDataSql, ...parameters);
resolve(result);
} catch (err: any) {
reject(err);
Expand Down
61 changes: 38 additions & 23 deletions packages/extension-driver-duckdb/src/lib/duckdbExtensionLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,36 +37,51 @@ export class DuckDBExtensionLoader {
return;
}
try {
conn.run(`LOAD ${extensionName}`);
await new Promise<void>((resolve, reject) => {
conn.run(`LOAD ${extensionName}`, (err: any) => {
if (err) reject(err);
this.logger.debug('Extension loaded');
resolve();
});
});
} catch (error) {
this.logger.debug(`Error when loading extension:${extensionName}`);
throw error;
}
await Promise.all(
Object.entries(extensionConfigurations).map(
async ([dbParameterName, configurationKey]) => {
const configurationValue =
this.configurations[
configurationKey as keyof ConfigurationParameters
];
// if configuration is not undefined
if (configurationValue !== undefined) {
return await new Promise<void>((resolve, reject) => {
conn.run(
`SET ${dbParameterName}='${configurationValue}'`,
(err: any) => {
if (err) {
this.logger.debug(
`Configuration error "${dbParameterName}": ${err}`
);
reject(err);
}

Object.entries(extensionConfigurations).forEach(
([dbParameterName, configurationKey]) => {
const configurationValue =
this.configurations[
configurationKey as keyof ConfigurationParameters
];
// if configuration is not undefined
if (configurationValue !== undefined) {
conn.run(
`SET ${dbParameterName}='${configurationValue}'`,
(err: any) => {
if (err) throw err;
this.logger.debug(
`Configuration error "${dbParameterName}": ${err}`
this.logger.debug(
`Configuration parameter "${dbParameterName}" set`
);
resolve();
}
);
}
);
this.logger.debug(`Configuration parameter "${dbParameterName}" set`);
} else {
this.logger.debug(
`Configuration "${dbParameterName}" has not been set`
);
});
} else {
this.logger.debug(
`Configuration "${dbParameterName}" has not been set`
);
}
}
}
)
);
}
}
11 changes: 8 additions & 3 deletions packages/extension-driver-duckdb/tests/duckdbDataSource.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,9 @@ it('Should print queries without binding when log-queries = true', async () => {
profileName: 'mocked-profile',
});
// Assert
expect(/select \$1::INTEGER as test/.test(logs.slice(-1)[0][0])).toBe(true);
expect(
logs.flat(2).find((log) => /select \$1::INTEGER as test/.test(log))
).not.toBe(undefined);
});

it('Should print queries with binding when log-queries = true and log-parameters = true', async () => {
Expand Down Expand Up @@ -357,8 +359,11 @@ it('Should print queries with binding when log-queries = true and log-parameters
profileName: 'mocked-profile',
});
// Assert
expect(/select \$1::INTEGER as test/.test(logs.slice(-1)[0][0])).toBe(true);
expect(logs.slice(-1)[0][1]).toEqual([1234]);
expect(
logs.flat(2).find((log) => /select \$1::INTEGER as test/.test(log))
).not.toBe(undefined);

expect(logs.flat(2)).toContain(1234);
});

it('Should share db instances for same path besides in-memory only db', async () => {
Expand Down