Skip to content

Commit

Permalink
chore(wren-ai-service): add correlation ID to langfuse
Browse files Browse the repository at this point in the history
  • Loading branch information
grieve54706 committed Oct 30, 2024
1 parent 1860cf8 commit 2b5e265
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
10 changes: 6 additions & 4 deletions wren-ai-service/src/pipelines/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ async def _check_if_sql_executable(
project_id: str | None = None,
):
async with aiohttp.ClientSession() as session:
status, _, error = await self._engine.execute_sql(
status, _, addition = await self._engine.execute_sql(
sql,
session,
project_id=project_id,
)

if not status:
logger.exception(f"SQL is not executable: {error}")
logger.exception(f"SQL is not executable: {addition["error_message"]}")

return status

Expand Down Expand Up @@ -151,22 +151,24 @@ async def _task(result: Dict[str, str]):
quoted_sql, no_error = add_quotes(result["sql"])

if no_error:
status, _, error = await self._engine.execute_sql(
status, _, addition = await self._engine.execute_sql(
quoted_sql, session, project_id=project_id
)

if status:
valid_generation_results.append(
{
"sql": quoted_sql,
"correlation_id": addition["correlation_id"],
}
)
else:
invalid_generation_results.append(
{
"sql": quoted_sql,
"type": "DRY_RUN",
"error": error,
"error": addition["error_message"],
"correlation_id": addition["correlation_id"],
}
)
else:
Expand Down
9 changes: 7 additions & 2 deletions wren-ai-service/src/providers/engine/wren.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@ async def execute_sql(
) as response:
res = await response.json()
if data := res.get("data"):
return True, data, None
return True, data, {
"correlation_id": res.get("correlationId"),
}
return (
False,
None,
res.get("errors", [{}])[0].get("message", "Unknown error"),
{
"error_message": res.get("errors", [{}])[0].get("message", "Unknown error"),
"correlation_id": res.get("extensions", {}).get("other", {}).get("correlationId"),
}
)


Expand Down
3 changes: 1 addition & 2 deletions wren-ui/src/apollo/server/resolvers/modelResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -933,14 +933,13 @@ export class ModelResolver {
? await ctx.projectService.getProjectById(projectId)
: await ctx.projectService.getCurrentProject();
const { manifest } = await ctx.deployService.getLastDeployment(project.id);
const previewRes = await ctx.queryService.preview(sql, {
return await ctx.queryService.preview(sql, {
project,
limit: limit,
modelingOnly: false,
manifest,
dryRun,
});
return dryRun ? { dryRun: 'success' } : previewRes;
}

public async getNativeSql(
Expand Down
18 changes: 12 additions & 6 deletions wren-ui/src/apollo/server/services/queryService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ export interface ColumnMetadata {
export interface PreviewDataResponse {
correlationId?: string;
processTime?: string;
columns: ColumnMetadata[];
data: any[][];
columns?: ColumnMetadata[];
data?: any[][];
}

export interface DescribeStatementResponse {
Expand Down Expand Up @@ -117,8 +117,7 @@ export class QueryService implements IQueryService {
this.checkDataSourceIsSupported(dataSource);
logger.debug('Use ibis adaptor to preview');
if (dryRun) {
await this.ibisDryRun(sql, dataSource, connectionInfo, mdl);
return true;
return await this.ibisDryRun(sql, dataSource, connectionInfo, mdl);
} else {
return await this.ibisQuery(sql, dataSource, connectionInfo, mdl, limit);
}
Expand Down Expand Up @@ -178,7 +177,7 @@ export class QueryService implements IQueryService {
dataSource: DataSourceName,
connectionInfo: any,
mdl: Manifest
) {
): Promise<PreviewDataResponse> {
const event = TelemetryEvent.IBIS_DRY_RUN;
try {
const res = await this.ibisAdaptor.dryRun(sql, {
Expand All @@ -187,6 +186,9 @@ export class QueryService implements IQueryService {
mdl,
});
this.sendIbisEvent(event, res, { dataSource, sql });
return {
correlationId: res.correlationId,
}
} catch (err: any) {
this.sendIbisFailedEvent(event, err, { dataSource, sql });
throw err;
Expand All @@ -209,7 +211,11 @@ export class QueryService implements IQueryService {
limit,
});
this.sendIbisEvent(event, res, { dataSource, sql });
return this.transformDataType(res);
const data = this.transformDataType(res);
return {
correlationId: res.correlationId,
...data
};
} catch (err: any) {
this.sendIbisFailedEvent(event, err, { dataSource, sql });
throw err;
Expand Down

0 comments on commit 2b5e265

Please sign in to comment.