feat: add basic kafka tracking support with data streams #3
Conversation
@@ -0,0 +1,75 @@

```elixir
defmodule Datadog.DataStreams.Transport do
```
I went with using Finch and manually doing the JSON decoding + gzip. Could probably be simplified with Tesla, but I wanted to keep deps low.
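A minimal sketch of the approach described above, kept separate from the actual `Datadog.DataStreams.Transport` code: Finch for the HTTP request, with gzip handled via Erlang's `:zlib` and JSON via Jason. The module name, URL, and response handling here are illustrative assumptions, not the real implementation.

```elixir
defmodule TransportSketch do
  # Decompress the body when the response is gzip-encoded, then JSON decode.
  # Doing this by hand is what avoids pulling in Tesla and its middleware.
  def decode_body(body, headers) do
    body =
      case List.keyfind(headers, "content-encoding", 0) do
        {"content-encoding", "gzip"} -> :zlib.gunzip(body)
        _ -> body
      end

    Jason.decode(body)
  end

  # Hypothetical request helper; `finch_name` is the Finch pool started in
  # the application's supervision tree.
  def request(finch_name, url) do
    with {:ok, %Finch.Response{status: 200, body: body, headers: headers}} <-
           Finch.request(Finch.build(:get, url), finch_name) do
      decode_body(body, headers)
    end
  end
end
```

The trade-off is a few extra lines of plumbing in exchange for two fewer dependencies.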
```elixir
defp decode_time_binary(<<>>, acc), do: acc

defdecoderp decode_time_binary(acc) do
```
Macro from the Protobuf library needed for performance and decoding how integers are sent.
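For context on "how integers are sent": the Protobuf wire format encodes integers as varints, where each byte carries 7 payload bits and the high bit flags whether more bytes follow. This standalone sketch (not the library's macro, just an illustration of the encoding) decodes one:

```elixir
defmodule VarintSketch do
  # Decode a single Protobuf varint from the front of a binary, returning
  # {value, rest}. Least-significant group of 7 bits comes first.
  def decode(bin), do: decode(bin, 0, 0)

  # High bit set: accumulate 7 bits and keep consuming bytes.
  defp decode(<<1::1, chunk::7, rest::binary>>, shift, acc) do
    decode(rest, shift + 7, acc + Bitwise.bsl(chunk, shift))
  end

  # High bit clear: this is the final byte of the varint.
  defp decode(<<0::1, chunk::7, rest::binary>>, shift, acc) do
    {acc + Bitwise.bsl(chunk, shift), rest}
  end
end

# Example: 300 encodes as <<0xAC, 0x02>> on the wire.
VarintSketch.decode(<<0xAC, 0x02>>)
# => {300, ""}
```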
```elixir
@spec encode_header(headers, Pathway.t()) :: headers
      when headers: list({binary(), binary()}) | %{required(binary()) => binary()} | nil
```
One of the weirdest types I've ever written, but it matches the function heads.
```elixir
defmodule Datadog.DataStreams.Payload.Backlog do
  @moduledoc false

  defstruct tags: [],
            value: 0

  @type t() :: %__MODULE__{
          tags: [String.t()],
          value: non_neg_integer()
        }
end

defimpl Msgpax.Packer, for: Datadog.DataStreams.Payload.Backlog do
  def pack(_data) do
    []
  end
end
```
This file will be used later on. It's in here right now just so I don't break the spec with a half implementation.
```elixir
# Splits the `map` into two maps according to the given function `fun`.
# This function was taken from Elixir 1.15 for backwards support with older
# versions.
defp split_with(map, fun) when is_map(map) and is_function(fun, 1) do
  iter = :maps.iterator(map)
  next = :maps.next(iter)

  do_split_with(next, [], [], fun)
end

defp do_split_with(:none, while_true, while_false, _fun) do
  {:maps.from_list(while_true), :maps.from_list(while_false)}
end

defp do_split_with({key, value, iter}, while_true, while_false, fun) do
  if fun.({key, value}) do
    do_split_with(:maps.next(iter), [{key, value} | while_true], while_false, fun)
  else
    do_split_with(:maps.next(iter), while_true, [{key, value} | while_false], fun)
  end
end
```
Yet another Elixir 1.15 function that I want now!
I can probably remove this if we use a list instead of a map for storing buckets. I'll need to do some speed comparisons to figure out which is faster and whether it's worth the effort.
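For reference, the vendored helper above is a backport of `Map.split_with/2`, which landed in Elixir 1.15. Once the minimum supported version moves up, the private functions could be replaced with the standard library call:

```elixir
# Partition a map into two maps by a predicate on {key, value} pairs.
# Requires Elixir >= 1.15; behaves the same as the vendored split_with/2.
{high, low} = Map.split_with(%{a: 1, b: 2, c: 3}, fn {_k, v} -> v > 1 end)

high
# => %{b: 2, c: 3}
low
# => %{a: 1}
```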
```elixir
unless Payload.stats_count(payload) == 0 do
  Task.async(fn ->
    with {:ok, encoded_payload} <- Payload.encode(payload),
         :ok <- Transport.send_pipeline_stats(encoded_payload) do
      {:ok, Payload.stats_count(payload)}
    else
      {:error, reason} -> {:error, reason}
      something -> {:error, something}
    end
  end)
end
```
I love Elixir. I thought I'd have to spin up a whole new GenServer to make the HTTP requests non-blocking. Nope! Just `Task.async/1`, and it will send the results back to the GenServer as a message. 🤯
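A self-contained sketch of that pattern, with illustrative module and function names (not the actual aggregator code): `Task.async/1` called from inside a GenServer delivers the task's result to the server as a `{ref, result}` message, so the callback that kicked off the HTTP request never blocks.

```elixir
defmodule Example.Sender do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts), do: {:ok, %{sent: 0}}

  @impl true
  def handle_cast({:flush, payload}, state) do
    # Fire and forget: the result arrives later via handle_info/2.
    Task.async(fn -> send_payload(payload) end)
    {:noreply, state}
  end

  @impl true
  def handle_info({ref, {:ok, count}}, state) when is_reference(ref) do
    # Flush the task's :DOWN message so it doesn't hit the catch-all below.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | sent: state.sent + count}}
  end

  def handle_info({ref, {:error, _reason}}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    {:noreply, state}
  end

  # A crashed task surfaces here instead of returning a result.
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state), do: {:noreply, state}

  # Stand-in for the real HTTP send; pretends one payload was delivered.
  defp send_payload(_payload), do: {:ok, 1}
end
```

One caveat worth knowing: `Task.async/1` links the task to the GenServer, so a crashing task takes the server down unless it traps exits or uses a supervised `Task.Supervisor.async_nolink/2`.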
Worth mentioning that the metrics and messages-per-second don't match the number of messages I'm actually sending. I haven't been able to track down why, but I have confirmed it matches the Golang implementation, so I'm going to keep it the way it is until something upstream changes.
Per @msutkowski's request, I added more tests around the aggregator.
You're getting to work on some cool stuff lately @btkostner!
I did read through this, but there is a ton. I'd like to follow up on how we can add some basic coverage for at least the aggregator (edit: I had a draft, and you added them in the meantime!). Ideally with a port like this, we'd be able to leverage the same tests as the other lib, but I realize that's probably a lot.
Since this is additive and shouldn't break anything in a critical path, I'm for it.
🤖 I have created a release *beep* *boop*

---

## 1.0.0 (2023-04-06)

### ⚠ BREAKING CHANGES

* Package and application configuration is now under `data_streams` instead of `dd_data_streams`

### Features

* add basic implementation of ddsketch ([#1](#1)) ([125b5ed](125b5ed))
* add basic kafka tracking support with data streams ([#3](#3)) ([bfc6a0b](bfc6a0b))
* add LICENSE file ([#11](#11)) ([6c5668f](6c5668f))
* link open telemetry span to current pathway context ([#5](#5)) ([e0ed9b2](e0ed9b2))
* rename dd_data_streams to data_streams ([#9](#9)) ([a0d1742](a0d1742))

### Bug Fixes

* add case for error http status ([#8](#8)) ([ef4a95d](ef4a95d))
* **ci:** update PR title regex check ([64ef99f](64ef99f))
* dialyzer warnings for kafka integration map ([18bf936](18bf936))
* filter out nil values from kafka integration tags ([b33926f](b33926f))
* update kafka integration to not set context on produce ([#7](#7)) ([6807b6d](6807b6d))
* update otel resource service configuration ([adb9890](adb9890))
* update tag logic to be more consistant ([#4](#4)) ([48d13df](48d13df))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
This adds the basic data streams code and Kafka integration functions. It tracks pathways between services. It does not yet have the functions for tracking Kafka offsets.
TODO: