Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_opentelemetry: attempt to fix tag_from_uri #8881

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ static int send_response(struct http_conn *conn, int http_status, char *message)

static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand All @@ -143,7 +144,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co
cfl_list_foreach(iterator, &decoded_contexts) {
context = cfl_list_entry(iterator, struct cmt, _head);

result = flb_input_metrics_append(ctx->ins, NULL, 0, context);
result = flb_input_metrics_append(ctx->ins, tag, tag_len, context);

if (result != 0) {
flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result);
Expand All @@ -158,6 +159,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co

static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand All @@ -171,7 +173,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht
request->data.len,
&offset);
if (result == 0) {
result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context);
result = flb_input_trace_append(ctx->ins, tag, tag_len, decoded_context);
ctr_decode_opentelemetry_destroy(decoded_context);
}

Expand All @@ -180,6 +182,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht

static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand Down Expand Up @@ -217,24 +220,25 @@ static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http
flb_free(out_buf);
}

flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size);
flb_input_log_append(ctx->ins, tag, tag_len, mp_sbuf.data, mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);

return 0;
}

static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
int result;

if (ctx->raw_traces) {
result = process_payload_raw_traces(ctx, conn, tag, session, request);
result = process_payload_raw_traces(ctx, conn, tag, tag_len, session, request);
}
else {
result = process_payload_traces_proto(ctx, conn, tag, session, request);
result = process_payload_traces_proto(ctx, conn, tag, tag_len, session, request);
}

return result;
Expand Down Expand Up @@ -1459,6 +1463,7 @@ static int json_payload_to_msgpack(struct flb_opentelemetry *ctx,

static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand Down Expand Up @@ -1494,7 +1499,7 @@ static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn
if (ret == 0) {
ret = flb_input_log_append(ctx->ins,
tag,
flb_sds_len(tag),
tag_len,
encoder->output_buffer,
encoder->output_length);
}
Expand Down Expand Up @@ -1677,6 +1682,7 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
size_t original_data_size;
char *uncompressed_data;
size_t uncompressed_data_size;
size_t tag_len;

if (request->uri.data[0] != '/') {
send_response(conn, 400, "error: invalid request\n");
Expand Down Expand Up @@ -1736,6 +1742,8 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
}
}

tag_len = flb_sds_len(tag);

/* Check if we have a Host header: Hostname ; port */
mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);

Expand Down Expand Up @@ -1785,13 +1793,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
}

if (strcmp(uri, "/v1/metrics") == 0) {
ret = process_payload_metrics(ctx, conn, tag, session, request);
ret = process_payload_metrics(ctx, conn, tag, tag_len, session, request);
}
else if (strcmp(uri, "/v1/traces") == 0) {
ret = process_payload_traces(ctx, conn, tag, session, request);
ret = process_payload_traces(ctx, conn, tag, tag_len, session, request);
}
else if (strcmp(uri, "/v1/logs") == 0) {
ret = process_payload_logs(ctx, conn, tag, session, request);
ret = process_payload_logs(ctx, conn, tag, tag_len, session, request);
}

if (uncompressed_data != NULL) {
Expand Down
Loading