Skip to content

Commit

Permalink
chore(wren-ai-service): add correlation ID to langfuse (#846)
Browse files Browse the repository at this point in the history
Co-authored-by: Chih-Yu Yeh <chihyu.jimmy.yeh@gmail.com>
  • Loading branch information
grieve54706 and cyyeh authored Nov 1, 2024
1 parent 57e9a0d commit a3b2c38
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 17 deletions.
10 changes: 6 additions & 4 deletions wren-ai-service/src/pipelines/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ async def _check_if_sql_executable(
project_id: str | None = None,
):
async with aiohttp.ClientSession() as session:
status, _, error = await self._engine.execute_sql(
status, _, addition = await self._engine.execute_sql(
sql,
session,
project_id=project_id,
)

if not status:
logger.exception(f"SQL is not executable: {error}")
logger.exception(f"SQL is not executable: {addition["error_message"]}")

return status

Expand Down Expand Up @@ -165,22 +165,24 @@ async def _task(result: Dict[str, str]):
quoted_sql, no_error = add_quotes(result["sql"])

if no_error:
status, _, error = await self._engine.execute_sql(
status, _, addition = await self._engine.execute_sql(
quoted_sql, session, project_id=project_id
)

if status:
valid_generation_results.append(
{
"sql": quoted_sql,
"correlation_id": addition["correlation_id"],
}
)
else:
invalid_generation_results.append(
{
"sql": quoted_sql,
"type": "DRY_RUN",
"error": error,
"error": addition["error_message"],
"correlation_id": addition["correlation_id"],
}
)
else:
Expand Down
9 changes: 7 additions & 2 deletions wren-ai-service/src/providers/engine/wren.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ async def execute_sql(
) as response:
res = await response.json()
if data := res.get("data"):
return True, data, None
return True, data, {
"correlation_id": res.get("correlationId"),
}
return (
False,
None,
res.get("errors", [{}])[0].get("message", "Unknown error"),
{
"error_message": res.get("errors", [{}])[0].get("message", "Unknown error"),
"correlation_id": res.get("extensions", {}).get("other", {}).get("correlationId"),
}
)
except asyncio.TimeoutError:
return False, None, f"Request timed out: {timeout} seconds"
Expand Down
3 changes: 1 addition & 2 deletions wren-ui/src/apollo/server/resolvers/modelResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -933,14 +933,13 @@ export class ModelResolver {
? await ctx.projectService.getProjectById(projectId)
: await ctx.projectService.getCurrentProject();
const { manifest } = await ctx.deployService.getLastDeployment(project.id);
const previewRes = await ctx.queryService.preview(sql, {
return await ctx.queryService.preview(sql, {
project,
limit: limit,
modelingOnly: false,
manifest,
dryRun,
});
return dryRun ? { dryRun: 'success' } : previewRes;
}

public async getNativeSql(
Expand Down
22 changes: 13 additions & 9 deletions wren-ui/src/apollo/server/services/queryService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ export interface ColumnMetadata {
type: string;
}

export interface PreviewDataResponse {
correlationId?: string;
processTime?: string;
export interface PreviewDataResponse extends IbisResponse {
columns: ColumnMetadata[];
data: any[][];
}
Expand Down Expand Up @@ -60,7 +58,7 @@ export interface IQueryService {
preview(
sql: string,
options: PreviewOptions,
): Promise<PreviewDataResponse | boolean>;
): Promise<IbisResponse| PreviewDataResponse | boolean>;

describeStatement(
sql: string,
Expand Down Expand Up @@ -97,7 +95,7 @@ export class QueryService implements IQueryService {
public async preview(
sql: string,
options: PreviewOptions,
): Promise<PreviewDataResponse | boolean> {
): Promise<IbisResponse | PreviewDataResponse | boolean> {
const { project, manifest: mdl, limit, dryRun } = options;
const { type: dataSource, connectionInfo } = project;
if (this.useEngine(dataSource)) {
Expand All @@ -117,8 +115,7 @@ export class QueryService implements IQueryService {
this.checkDataSourceIsSupported(dataSource);
logger.debug('Use ibis adaptor to preview');
if (dryRun) {
await this.ibisDryRun(sql, dataSource, connectionInfo, mdl);
return true;
return await this.ibisDryRun(sql, dataSource, connectionInfo, mdl);
} else {
return await this.ibisQuery(sql, dataSource, connectionInfo, mdl, limit);
}
Expand Down Expand Up @@ -178,7 +175,7 @@ export class QueryService implements IQueryService {
dataSource: DataSourceName,
connectionInfo: any,
mdl: Manifest
) {
): Promise<IbisResponse> {
const event = TelemetryEvent.IBIS_DRY_RUN;
try {
const res = await this.ibisAdaptor.dryRun(sql, {
Expand All @@ -187,6 +184,9 @@ export class QueryService implements IQueryService {
mdl,
});
this.sendIbisEvent(event, res, { dataSource, sql });
return {
correlationId: res.correlationId,
}
} catch (err: any) {
this.sendIbisFailedEvent(event, err, { dataSource, sql });
throw err;
Expand All @@ -209,7 +209,11 @@ export class QueryService implements IQueryService {
limit,
});
this.sendIbisEvent(event, res, { dataSource, sql });
return this.transformDataType(res);
const data = this.transformDataType(res);
return {
correlationId: res.correlationId,
...data
};
} catch (err: any) {
this.sendIbisFailedEvent(event, err, { dataSource, sql });
throw err;
Expand Down

0 comments on commit a3b2c38

Please sign in to comment.