Skip to content

Commit

Permalink
.withQueryId
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Oct 16, 2024
1 parent a02941a commit e48be4b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
3 changes: 2 additions & 1 deletion packages/sql-clickhouse/examples/basic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ Effect.gen(function*() {
)

yield* sql`SELECT * FROM clickhouse_js_example_cloud_table ORDER BY id`.stream.pipe(
Stream.runForEach(Effect.log)
Stream.runForEach(Effect.log),
sql.withQueryId("select")
)
}).pipe(
Effect.provide(ClickhouseLive),
Expand Down
57 changes: 37 additions & 20 deletions packages/sql-clickhouse/src/ClickhouseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import type { ConfigError } from "effect/ConfigError"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as FiberRef from "effect/FiberRef"
import { identity } from "effect/Function"
import { dual, identity } from "effect/Function"
import { globalValue } from "effect/GlobalValue"
import * as Layer from "effect/Layer"
import type * as Scope from "effect/Scope"
Expand Down Expand Up @@ -49,6 +49,10 @@ export interface ClickhouseClient extends Client.SqlClient {
readonly values: Clickhouse.InsertValues<Readable, T>
readonly format?: Clickhouse.DataFormat
}) => Effect.Effect<Clickhouse.InsertResult, SqlError>
readonly withQueryId: {
(queryId: string): <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
<A, E, R>(effect: Effect.Effect<A, E, R>, queryId: string): Effect.Effect<A, E, R>
}
}

/**
Expand Down Expand Up @@ -101,7 +105,7 @@ export const make = (
return Effect.withFiberRuntime<Clickhouse.ResultSet<"JSON"> | Clickhouse.CommandResult, SqlError>((fiber) => {
const method = fiber.getFiberRef(currentClientMethod)
return Effect.async<Clickhouse.ResultSet<"JSON"> | Clickhouse.CommandResult, SqlError>((resume) => {
const queryId = Crypto.randomUUID()
const queryId = fiber.getFiberRef(currentQueryId) ?? Crypto.randomUUID()
const controller = new AbortController()
if (method === "command") {
this.conn.command({
Expand Down Expand Up @@ -227,24 +231,28 @@ export const make = (
readonly values: Clickhouse.InsertValues<Readable, T>
readonly format?: Clickhouse.DataFormat
}) {
return Effect.async<Clickhouse.InsertResult, SqlError>((resume) => {
const queryId = Crypto.randomUUID()
const controller = new AbortController()
client.insert({
format: "JSONEachRow",
...options,
abort_signal: controller.signal,
query_id: queryId
}).then(
(result) => resume(Effect.succeed(result)),
(cause) => resume(Effect.fail(new SqlError({ cause, message: "Failed to insert data" })))
)
return Effect.suspend(() => {
controller.abort()
return Effect.promise(() => client.command({ query: `KILL QUERY WHERE query_id = '${queryId}'` }))
})
})
}
return FiberRef.getWith(currentQueryId, (queryId_) =>
Effect.async<Clickhouse.InsertResult, SqlError>((resume) => {
const queryId = queryId_ ?? Crypto.randomUUID()
const controller = new AbortController()
client.insert({
format: "JSONEachRow",
...options,
abort_signal: controller.signal,
query_id: queryId
}).then(
(result) =>
resume(Effect.succeed(result)),
(cause) => resume(Effect.fail(new SqlError({ cause, message: "Failed to insert data" })))
)
return Effect.suspend(() => {
controller.abort()
return Effect.promise(() => client.command({ query: `KILL QUERY WHERE query_id = '${queryId}'` }))
})
}))
},
withQueryId: dual(2, <A, E, R>(effect: Effect.Effect<A, E, R>, queryId: string) =>
Effect.locally(effect, currentQueryId, queryId))
}
)
})
Expand All @@ -258,6 +266,15 @@ export const currentClientMethod: FiberRef.FiberRef<"query" | "command" | "inser
() => FiberRef.unsafeMake<"query" | "command" | "insert">("query")
)

/**
* @category fiber refs
* @since 1.0.0
*/
export const currentQueryId: FiberRef.FiberRef<string | undefined> = globalValue(
"@effect/sql-clickhouse/ClickhouseClient/currentQueryId",
() => FiberRef.unsafeMake<string | undefined>(undefined)
)

/**
* @category constructor
* @since 1.0.0
Expand Down

0 comments on commit e48be4b

Please sign in to comment.