feat: allow tracking kafka produce and consume offsets #18

Merged: 4 commits, Jul 10, 2023

21 changes: 20 additions & 1 deletion lib/datadog/data_streams/aggregator.ex
@@ -57,13 +57,19 @@ defmodule Datadog.DataStreams.Aggregator do

iex> :ok = Aggregator.add(%Aggregator.Point{})

iex> :ok = Aggregator.add(%Aggregator.Offset{})

"""
@spec add(Aggregator.Point.t()) :: :ok
@spec add(Aggregator.Point.t() | Aggregator.Offset.t()) :: :ok
def add(%Aggregator.Point{} = point) do
:telemetry.execute([:datadog, :datastreams, :aggregator, :payloads_in], %{count: 1})
GenServer.cast(__MODULE__, {:add, point})
end

def add(%Aggregator.Offset{} = offset) do
GenServer.cast(__MODULE__, {:add, offset})
end

@doc """
Sends all stored data to the Datadog agent.

@@ -122,6 +128,19 @@ defmodule Datadog.DataStreams.Aggregator do
}}
end

def handle_cast({:add, %Aggregator.Offset{} = offset}, state) do
new_ts_type_current_buckets =
Aggregator.Bucket.upsert(state.ts_type_current_buckets, offset.timestamp, fn bucket ->
type_key =
if offset.type == :commit, do: :latest_commit_offsets, else: :latest_produce_offsets

new_offsets = bucket |> Map.get(type_key, []) |> Aggregator.Offset.upsert(offset)
Map.put(bucket, type_key, new_offsets)
end)

{:noreply, %{state | ts_type_current_buckets: new_ts_type_current_buckets}}
end

@doc false
def handle_info(:send, state) do
Process.cancel_timer(state.send_timer)
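For context, a minimal sketch of what the new `handle_cast/2` clause does once the aggregator is running (topic, partition, and offset values are illustrative):

```elixir
alias Datadog.DataStreams.Aggregator

offset = %Aggregator.Offset{
  offset: 42,
  timestamp: :erlang.system_time(:nanosecond),
  type: :produce,
  tags: %{"partition" => 0, "topic" => "orders", "type" => "kafka_produce"}
}

# The cast upserts the offset into the current bucket's
# latest_produce_offsets list (latest_commit_offsets for :commit).
:ok = Aggregator.add(offset)

# A later offset with the same tags replaces the earlier entry, so each
# (topic, partition) keeps only its most recent offset per bucket.
:ok = Aggregator.add(%{offset | offset: 43})
```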
13 changes: 4 additions & 9 deletions lib/datadog/data_streams/aggregator/bucket.ex
@@ -7,24 +7,19 @@ defmodule Datadog.DataStreams.Aggregator.Bucket do
@bucket_duration 10 * 1_000 * 1_000 * 1_000

defstruct groups: %{},
latest_commit_offsets: %{},
latest_produce_offsets: %{},
latest_commit_offsets: [],
latest_produce_offsets: [],
start: 0,
duration: @bucket_duration

@type t :: %__MODULE__{
groups: %{non_neg_integer() => Aggregator.Group.t()},
latest_commit_offsets: %{non_neg_integer() => non_neg_integer()},
latest_produce_offsets: %{partition_key() => non_neg_integer()},
latest_commit_offsets: [Aggregator.Offset.t()],
latest_produce_offsets: [Aggregator.Offset.t()],
start: non_neg_integer(),
duration: non_neg_integer()
}

@type partition_key :: %{
partition: non_neg_integer(),
topic: String.t()
}

@spec align_timestamp(non_neg_integer()) :: non_neg_integer()
defp align_timestamp(timestamp) do
timestamp - rem(timestamp, @bucket_duration)
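The bucket start comes from `align_timestamp/1` above; a quick sketch of the alignment arithmetic (the sample timestamp matches the one used in the new aggregator test below):

```elixir
# 10 seconds expressed in nanoseconds, mirroring @bucket_duration.
bucket_duration = 10 * 1_000 * 1_000 * 1_000

# align_timestamp/1 truncates a nanosecond timestamp to its bucket start.
align = fn timestamp -> timestamp - rem(timestamp, bucket_duration) end

align.(1_687_986_447_538_450_340)
#=> 1_687_986_440_000_000_000
```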
48 changes: 48 additions & 0 deletions lib/datadog/data_streams/aggregator/offset.ex
@@ -0,0 +1,48 @@
defmodule Datadog.DataStreams.Aggregator.Offset do
@moduledoc false

defstruct offset: 0,
timestamp: 0,
type: :commit,
tags: %{}

@type type :: :commit | :produce

@type t :: %__MODULE__{
offset: integer(),
timestamp: non_neg_integer(),
type: type(),
tags: %{String.t() => any()}
}

@doc """
Creates a new offset struct with the given type, offset, timestamp, and tag options.
"""
@spec new(type(), integer(), non_neg_integer(), Keyword.t()) :: t()
def new(type, offset, timestamp, opts \\ []) do
%__MODULE__{
offset: offset,
timestamp: timestamp,
type: type,
tags: Map.new(opts)
}
end

@doc """
Updates an existing `#{__MODULE__}` in the list whose `tags` match the
given offset. If no match is found, the new offset is appended.
"""
@spec upsert([t()], t()) :: [t()]
def upsert(offsets, %{tags: upsert_tags} = upsert_offset) do
matching_index =
Enum.find_index(offsets, fn %{tags: tags} ->
match?(^tags, upsert_tags)
end)

if is_nil(matching_index) do
offsets ++ [upsert_offset]
else
List.replace_at(offsets, matching_index, upsert_offset)
end
end
end
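A short usage sketch for `new/4` and `upsert/2` (tag values are illustrative; `new/4` builds atom-keyed tags from the keyword options, while the Kafka integration below passes string-keyed tags directly):

```elixir
alias Datadog.DataStreams.Aggregator.Offset

first = Offset.new(:commit, 10, 1_687_986_447_538_450_340, partition: 0, topic: "events")
second = Offset.new(:commit, 11, 1_687_986_448_000_000_000, partition: 0, topic: "events")
other = Offset.new(:commit, 5, 1_687_986_448_000_000_000, partition: 1, topic: "events")

[]
|> Offset.upsert(first)   # appends to the empty list
|> Offset.upsert(second)  # replaces `first`, because the tags match
|> Offset.upsert(other)   # appends, because partition 1 has different tags
#=> [second, other]
```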
41 changes: 40 additions & 1 deletion lib/datadog/data_streams/integrations/kafka.ex
@@ -39,7 +39,7 @@ defmodule Datadog.DataStreams.Integrations.Kafka do

require OpenTelemetry.Tracer, as: Tracer

alias Datadog.DataStreams.{Context, Pathway, Propagator, Tags}
alias Datadog.DataStreams.{Aggregator, Context, Pathway, Propagator, Tags}

@otel_attribute "pathway.hash"

@@ -88,6 +88,25 @@ defmodule Datadog.DataStreams.Integrations.Kafka do
|> Map.merge(%{type: "kafka", direction: "out"})
end

@doc """
Tracks Kafka produce events via their offset. Datadog uses this to
calculate consumer lag without requiring the consumer to be running and
reading trace headers.
"""
@spec track_produce(String.t(), non_neg_integer(), integer()) :: :ok
def track_produce(topic, partition, offset) do
Aggregator.add(%Aggregator.Offset{
offset: offset,
timestamp: :erlang.system_time(:nanosecond),
type: :produce,
tags: %{
"partition" => partition,
"topic" => topic,
"type" => "kafka_produce"
}
})
end

@doc """
Traces a Kafka message being consumed. Requires the current Kafka
consumer group. Uses the pathway in the current
@@ -130,4 +149,24 @@ defmodule Datadog.DataStreams.Integrations.Kafka do
|> Map.take([:topic, :partition])
|> Map.merge(%{type: "kafka", direction: "in", group: consumer_group})
end

@doc """
Tracks Kafka consume events via their committed offset. Datadog combines
this with the tracked produce offsets to calculate consumer lag without
relying on trace headers.
"""
@spec track_consume(String.t(), String.t(), non_neg_integer(), integer()) :: :ok
def track_consume(group, topic, partition, offset) do
Aggregator.add(%Aggregator.Offset{
offset: offset,
timestamp: :erlang.system_time(:nanosecond),
type: :commit,
tags: %{
"consumer_group" => group,
"partition" => partition,
"topic" => topic,
"type" => "kafka_commit"
}
})
end
end
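A hedged usage sketch for the two new tracking functions; `MyApp.KafkaTelemetry` and its callbacks are hypothetical stand-ins for whatever hooks your Kafka client library exposes:

```elixir
defmodule MyApp.KafkaTelemetry do
  alias Datadog.DataStreams.Integrations.Kafka, as: DataStreamsKafka

  # Hypothetical hook: called after the broker acknowledges a produced message.
  def handle_produce_ack(topic, partition, offset) do
    DataStreamsKafka.track_produce(topic, partition, offset)
  end

  # Hypothetical hook: called after the consumer group commits an offset.
  def handle_offset_commit(group, topic, partition, offset) do
    DataStreamsKafka.track_consume(group, topic, partition, offset)
  end
end
```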
24 changes: 21 additions & 3 deletions lib/datadog/data_streams/payload/backlog.ex
@@ -1,7 +1,7 @@
defmodule Datadog.DataStreams.Payload.Backlog do
@moduledoc false

alias Datadog.DataStreams.Tags
alias Datadog.DataStreams.{Aggregator, Tags}

defstruct tags: [],
value: 0
@@ -10,10 +10,28 @@ defmodule Datadog.DataStreams.Payload.Backlog do
tags: Tags.encoded(),
value: non_neg_integer()
}

@doc """
Creates a new backlog struct from an aggregator offset.
"""
@spec new(Aggregator.Offset.t()) :: t()
def new(%Aggregator.Offset{offset: offset, tags: tags}) do
%__MODULE__{
tags: tags |> Tags.parse() |> Tags.encode(),
value: offset
}
end
end

defimpl Msgpax.Packer, for: Datadog.DataStreams.Payload.Backlog do
def pack(_data) do
[]
def pack(data) do
[
# Tags
[0x82, 0xA4, 0x54, 0x61, 0x67, 0x73],
Msgpax.Packer.pack(data.tags),
# Value
[0xA5, 0x56, 0x61, 0x6C, 0x75, 0x65],
Msgpax.Packer.pack(data.value)
]
end
end
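For reference, the hard-coded prefixes in `pack/1` are standard MessagePack framing for a two-entry map with fixstr keys; a sketch of the correspondence:

```elixir
# 0x82 -> map with 2 entries
# 0xA4 followed by "Tags"  (0x54 0x61 0x67 0x73)       -> 4-byte fixstr key
# 0xA5 followed by "Value" (0x56 0x61 0x6C 0x75 0x65)  -> 5-byte fixstr key
<<0x82, 0xA4, "Tags", 0xA5, "Value">> ==
  <<0x82, 0xA4, 0x54, 0x61, 0x67, 0x73, 0xA5, 0x56, 0x61, 0x6C, 0x75, 0x65>>
#=> true

# So the packer emits the equivalent of %{"Tags" => tags, "Value" => value},
# writing the map header and keys by hand and delegating the values to Msgpax.
```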
5 changes: 4 additions & 1 deletion lib/datadog/data_streams/payload/bucket.ex
@@ -26,7 +26,10 @@ defmodule Datadog.DataStreams.Payload.Bucket do
%__MODULE__{
start: bucket.start,
duration: bucket.duration,
stats: bucket.groups |> Map.values() |> Enum.map(&Payload.Point.new(&1, timestamp_type))
stats: bucket.groups |> Map.values() |> Enum.map(&Payload.Point.new(&1, timestamp_type)),
backlogs:
Enum.map(bucket.latest_produce_offsets, &Payload.Backlog.new/1) ++
Enum.map(bucket.latest_commit_offsets, &Payload.Backlog.new/1)
}
end
end
61 changes: 51 additions & 10 deletions test/datadog/data_streams/aggregator_test.exs
@@ -10,17 +10,29 @@ defmodule Datadog.DataStreams.AggregatorTest do
end

describe "add/1" do
test "sends AggregatorPoint to module when not started" do
test "sends Aggregator.Point to module when not started" do
refute Process.whereis(Aggregator)
assert :ok = Aggregator.add(%Aggregator.Point{})
end

@tag :capture_log
test "sends AggregatorPoint to module when registered" do
test "sends Aggregator.Point to module when registered" do
Application.put_env(:data_streams, :agent, enabled?: true)
start_supervised!(Aggregator)
assert :ok = Aggregator.add(%Aggregator.Point{})
end

test "sends Aggregator.Offset to module when not started" do
refute Process.whereis(Aggregator)
assert :ok = Aggregator.add(%Aggregator.Offset{})
end

@tag :capture_log
test "sends Aggregator.Offset to module when registered" do
Application.put_env(:data_streams, :agent, enabled?: true)
start_supervised!(Aggregator)
assert :ok = Aggregator.add(%Aggregator.Offset{})
end
end

describe "init/1" do
@@ -52,41 +64,70 @@
)

assert %{
1_678_471_420_000_000_000 => %Datadog.DataStreams.Aggregator.Bucket{
1_678_471_420_000_000_000 => %Aggregator.Bucket{
groups: %{
9_808_874_869_469_701_221 => %Datadog.DataStreams.Aggregator.Group{
9_808_874_869_469_701_221 => %Aggregator.Group{
edge_tags: ["type:test"],
hash: 9_808_874_869_469_701_221,
parent_hash: 17_210_443_572_488_294_574,
pathway_latency: _,
edge_latency: _
}
},
latest_commit_offsets: %{},
latest_produce_offsets: %{},
latest_commit_offsets: [],
latest_produce_offsets: [],
start: 1_678_471_420_000_000_000,
duration: 10_000_000_000
}
} = new_state.ts_type_current_buckets

assert %{
1_678_471_410_000_000_000 => %Datadog.DataStreams.Aggregator.Bucket{
1_678_471_410_000_000_000 => %Aggregator.Bucket{
groups: %{
9_808_874_869_469_701_221 => %Datadog.DataStreams.Aggregator.Group{
9_808_874_869_469_701_221 => %Aggregator.Group{
edge_tags: ["type:test"],
hash: 9_808_874_869_469_701_221,
parent_hash: 17_210_443_572_488_294_574,
pathway_latency: _,
edge_latency: _
}
},
latest_commit_offsets: %{},
latest_produce_offsets: %{},
latest_commit_offsets: [],
latest_produce_offsets: [],
start: 1_678_471_410_000_000_000,
duration: 10_000_000_000
}
} = new_state.ts_type_origin_buckets
end

test "adds aggregator offset to bucket", %{state: state} do
offset = %Aggregator.Offset{
offset: 13,
timestamp: 1_687_986_447_538_450_340,
type: :commit,
tags: %{
"consumer_group" => "test-group",
"partition" => 0,
"topic" => "test-topic",
"type" => "kafka_commit"
}
}

assert {:noreply, new_state} = Aggregator.handle_cast({:add, offset}, state)

assert %{
1_687_986_440_000_000_000 => %Aggregator.Bucket{
groups: %{},
latest_commit_offsets: [^offset],
latest_produce_offsets: [],
start: 1_687_986_440_000_000_000,
duration: 10_000_000_000
}
} = new_state.ts_type_current_buckets

# It should find and update the existing data, not add to the bucket.
assert {:noreply, ^new_state} = Aggregator.handle_cast({:add, offset}, new_state)
end
end

describe "handle_info/2 {task_ref, {:ok, count}}" do