Skip to content

Commit

Permalink
Porpagate OpenTelemetry context in tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
rewritten committed Feb 8, 2023
1 parent ea3a78d commit 5965013
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 3 deletions.
26 changes: 26 additions & 0 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,29 @@ After a query is executed, you'll see something like:
}
}
```

## Opentelemetry

When using Opentelemetry, one usually wants to correlate spans that are created
in spawned tasks with the main trace. For example, you might have a trace started
in a Phoenix endpoint, and then have spans around database access.

One can correlate manually by attaching the OTel context the task function:

```elixir
ctx = OpenTelemetry.Ctx.get_current()

Task.async(fn ->
OpenTelemetry.Ctx.attach(ctx)

# do stuff that might create spans
end)
```

When using Dataloader, the tasks are spawned by the loader itself, so you can't
attach the context manually.

Instead, you can add the `:opentelemetry_process_propagator` package to your
dependencies, which has suitable wrappers that will attach the context
automatically. If the package is installed, Dataloader will use it in place
of the default `Task.async/1` and `Task.async_stream/3`.
20 changes: 18 additions & 2 deletions lib/dataloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ defmodule Dataloader do
# if the current process is linked to something, and then that something
# dies in the middle of us loading stuff.
task =
Task.async(fn ->
async(fn ->
# The purpose of `:trap_exit` here is so that we can ensure that any failures
# within the tasks do not kill the current process. We want to get results
# back no matter what.
Expand Down Expand Up @@ -359,7 +359,7 @@ defmodule Dataloader do
results =
if Keyword.get(opts, :async?, true) do
items
|> Task.async_stream(fun, task_opts)
|> async_stream(fun, task_opts)
|> Enum.map(fn
{:ok, result} -> {:ok, result}
{:exit, reason} -> {:error, reason}
Expand Down Expand Up @@ -390,4 +390,20 @@ defmodule Dataloader do
def pmap(items, fun, opts \\ []) do
async_safely(__MODULE__, :run_tasks, [items, fun, opts])
end

# Optionally use `async/1` and `async_stream/3` functions from
# `opentelemetry_process_propagator` if available
if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do
@spec async((() -> any)) :: Task.t()
defdelegate async(fun), to: OpentelemetryProcessPropagator.Task

@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
defdelegate async_stream(items, fun, opts), to: OpentelemetryProcessPropagator.Task
else
@spec async((() -> any)) :: Task.t()
defdelegate async(fun), to: Task

@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
defdelegate async_stream(items, fun, opts), to: Task
end
end
12 changes: 11 additions & 1 deletion lib/dataloader/ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ if Code.ensure_loaded?(Ecto) do
end

defp maybe_async_stream(batches, fun, options, true) do
Task.async_stream(batches, fun, options)
async_stream(batches, fun, options)
end

defp maybe_async_stream(batches, fun, _options, _) do
Expand Down Expand Up @@ -964,6 +964,16 @@ if Code.ensure_loaded?(Ecto) do
other
end
end

# Optionally use `async_stream/3` function from
# `opentelemetry_process_propagator` if available
if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do
@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
defdelegate async_stream(items, fun, opts), to: OpentelemetryProcessPropagator.Task
else
@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
defdelegate async_stream(items, fun, opts), to: Task
end
end
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ defmodule Dataloader.Mixfile do
[
{:telemetry, "~> 1.0 or ~> 0.4"},
{:ecto, ">= 3.4.3 and < 4.0.0", optional: true},
{:opentelemetry_process_propagator, "~> 0.2.1", optional: true},
{:ecto_sql, "~> 3.0", optional: true, only: :test},
{:postgrex, "~> 0.14", only: :test, runtime: false},
{:dialyxir, "~> 1.0.0", only: [:dev, :test], runtime: false},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.2.0", "454a35655b4c1924405ef1f3587f2c6f141bf73366b2c5e8a38dcc619b53eaa0", [:mix, :rebar3], [], "hexpm", "9e677c68243de0f70538798072e66e1fb1d4a2ca8888a6eb493c0a41e5480c35"},
"opentelemetry_process_propagator": {:hex, :opentelemetry_process_propagator, "0.2.1", "20ac37648faf7175cade16fda8d58e6f1ff1b7f2a50a8ef9d70a032c41aba315", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "f317237e39636d4f6140afa5d419e85ed3dc9e9a57072e7cd442df42af7b8aac"},
"postgrex": {:hex, :postgrex, "0.15.10", "2809dee1b1d76f7cbabe570b2a9285c2e7b41be60cf792f5f2804a54b838a067", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "1560ca427542f6b213f8e281633ae1a3b31cdbcd84ebd7f50628765b8f6132be"},
"telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"},
}
16 changes: 16 additions & 0 deletions test/dataloader/kv_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,18 @@ defmodule Dataloader.KVTest do
assert not_found_users == [nil]
end

test "propagates the OTel context", %{loader: loader} do
OpenTelemetry.Ctx.set_value("stored_value", "some_value")

context_value =
loader
|> Dataloader.load(Test, :otel_context, "stored_value")
|> Dataloader.run()
|> Dataloader.get(Test, :otel_context, "stored_value")

assert context_value == "some_value"
end

defp query(batch_key, ids, test_pid) do
send(test_pid, :querying)

Expand All @@ -216,6 +228,10 @@ defmodule Dataloader.KVTest do
defp query(_batch_key, "something_that_errors"),
do: raise("Failed when fetching key 'something_that_errors'")

defp query(:otel_context, key) do
{key, OpenTelemetry.Ctx.get_value(key, nil)}
end

defp query(batch_key, id) do
item =
@data[batch_key]
Expand Down

0 comments on commit 5965013

Please sign in to comment.