From 61d97c48c2312cf9ab1676c08bc645bc1e53310b Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Fri, 12 Apr 2024 16:23:56 -0400 Subject: [PATCH] in_http: return a 400 error when unable to receive a message. return a 400 error when the incoming message is invalid, unable to be processed or cannot be appended as a log entry. at the moment only the logs themselves can distinguish these different error cases but at least clients will be alerted to problems. Signed-off-by: Phillip Whelan --- plugins/in_http/http_prot.c | 293 +++++++++++++++--------------------- 1 file changed, 120 insertions(+), 173 deletions(-) diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index be861385e78..188591e4d0d 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -127,7 +127,7 @@ static int send_response(struct http_conn *conn, int http_status, char *message) } else if (http_status == 400) { flb_sds_printf(&out, - "HTTP/1.1 400 Forbidden\r\n" + "HTTP/1.1 400 Bad Request\r\n" "Server: Fluent Bit v%s\r\n" "Content-Length: %i\r\n\r\n%s", FLB_VERSION_STR, @@ -211,6 +211,56 @@ static flb_sds_t tag_key(struct flb_http *ctx, msgpack_object *map) return NULL; } +static int process_pack_record(struct flb_http *ctx, struct flb_time *tm, + flb_sds_t tag, + msgpack_object *record) +{ + int ret; + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_set_timestamp(&ctx->log_encoder, tm); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_set_body_from_msgpack_object( + &ctx->log_encoder, + record); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + if (tag) { + ret = flb_input_log_append(ctx->ins, + tag, + flb_sds_len(tag), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else { + /* use default plugin Tag (it internal name, e.g: http.0 */ + ret = flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + return 0; +} + int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) { int ret; @@ -233,48 +283,19 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) tag_from_record = tag_key(ctx, obj); } - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &tm); + if (tag_from_record) { + ret = process_pack_record(ctx, &tm, tag_from_record, obj); + flb_sds_destroy(tag_from_record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - &result.data); + else if (tag) { + ret = process_pack_record(ctx, &tm, tag, obj); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + else { + ret = process_pack_record(ctx, &tm, NULL, obj); } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (tag_from_record) { - flb_input_log_append(ctx->ins, - tag_from_record, - flb_sds_len(tag_from_record), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - flb_sds_destroy(tag_from_record); - } - else if (tag) { - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - else { - /* use default plugin Tag (it internal name, e.g: http.0 */ - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + if (ret != 0) { + goto log_event_error; } flb_log_event_encoder_reset(&ctx->log_encoder); @@ -289,48 +310,19 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) tag_from_record = tag_key(ctx, &record); } - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &tm); + if (tag_from_record) { + ret = process_pack_record(ctx, &tm, tag_from_record, &record); + flb_sds_destroy(tag_from_record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - &record); + else if (tag) { + ret = process_pack_record(ctx, &tm, tag, &record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + else { + ret = process_pack_record(ctx, &tm, NULL, &record); } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (tag_from_record) { - flb_input_log_append(ctx->ins, - tag_from_record, - flb_sds_len(tag_from_record), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - flb_sds_destroy(tag_from_record); - } - else if (tag) { - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - else { - /* use default plugin Tag (it internal name, e.g: http.0 */ - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + goto log_event_error; } /* TODO : Optimize this @@ -350,7 +342,6 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) result.data.type); msgpack_unpacked_destroy(&result); - return -1; } } @@ -358,6 +349,11 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) msgpack_unpacked_destroy(&result); return 0; + +log_event_error: + msgpack_unpacked_destroy(&result); + flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + return ret; } static ssize_t parse_payload_json(struct flb_http *ctx, flb_sds_t tag, @@ -390,10 +386,10 @@ static ssize_t parse_payload_json(struct flb_http *ctx, flb_sds_t tag, } /* Process the packaged JSON and return the last byte used */ - process_pack(ctx, tag, pack, out_size); + ret = process_pack(ctx, tag, pack, out_size); flb_free(pack); - return 0; + return ret; } static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag, @@ -548,14 +544,15 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, } if (type == HTTP_CONTENT_JSON) { - parse_payload_json(ctx, tag, request->data.data, request->data.len); + ret = parse_payload_json(ctx, tag, request->data.data, request->data.len); } else if (type == HTTP_CONTENT_URLENCODED) { ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len); - if (ret != 0) { - send_response(conn, 400, "error: invalid payload\n"); - return -1; - } + } + + if (ret != 0) { + send_response(conn, 400, "error: invalid payload\n"); + return -1; } return 0; @@ -685,6 +682,9 @@ int http_prot_handle(struct flb_http *ctx, struct http_conn *conn, if (ret == 0) { send_response(conn, ctx->successful_response_code, NULL); } + else { + send_response(conn, 400, "unable to process records\n"); + } return ret; } @@ -779,53 +779,25 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { if (result.data.type == MSGPACK_OBJECT_MAP) { tag_from_record = NULL; + obj = &result.data; + if (ctx->tag_key) { - obj = &result.data; tag_from_record = tag_key(ctx, obj); } - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &tm); + if (tag_from_record) { + ret = process_pack_record(ctx, &tm, tag_from_record, obj); + flb_sds_destroy(tag_from_record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - &result.data); + else if (tag) { + ret = process_pack_record(ctx, &tm, tag, obj); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + else { + ret = process_pack_record(ctx, &tm, NULL, obj); } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (tag_from_record) { - flb_input_log_append(ctx->ins, - tag_from_record, - flb_sds_len(tag_from_record), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - flb_sds_destroy(tag_from_record); - } - else if (tag) { - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - else { - /* use default plugin Tag (it internal name, e.g: http.0 */ - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + if (ret != 0) { + goto log_event_error; } flb_log_event_encoder_reset(&ctx->log_encoder); @@ -836,53 +808,19 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ { record = obj->via.array.ptr[i]; - tag_from_record = NULL; - if (ctx->tag_key) { - tag_from_record = tag_key(ctx, &record); + if (tag_from_record) { + ret = process_pack_record(ctx, &tm, tag_from_record, &record); + flb_sds_destroy(tag_from_record); } - - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &tm); + else if (tag) { + ret = process_pack_record(ctx, &tm, tag, &record); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - &record); + else { + ret = process_pack_record(ctx, &tm, NULL, &record); } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (tag_from_record) { - flb_input_log_append(ctx->ins, - tag_from_record, - flb_sds_len(tag_from_record), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - flb_sds_destroy(tag_from_record); - } - else if (tag) { - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - else { - /* use default plugin Tag (it internal name, e.g: http.0 */ - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + if (ret != 0) { + goto log_event_error; } /* TODO : Optimize this @@ -908,8 +846,12 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ } msgpack_unpacked_destroy(&result); - return 0; + +log_event_error: + flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + msgpack_unpacked_destroy(&result); + return -1; } static ssize_t parse_payload_json_ng(flb_sds_t tag, @@ -949,10 +891,10 @@ static ssize_t parse_payload_json_ng(flb_sds_t tag, } /* Process the packaged JSON and return the last byte used */ - process_pack_ng(ctx, tag, pack, out_size); + ret = process_pack_ng(ctx, tag, pack, out_size); flb_free(pack); - return 0; + return ret; } static int process_payload_ng(flb_sds_t tag, @@ -988,13 +930,13 @@ static int process_payload_ng(flb_sds_t tag, } if (type == HTTP_CONTENT_JSON) { - parse_payload_json_ng(tag, request); + return parse_payload_json_ng(tag, request); } else if (type == HTTP_CONTENT_URLENCODED) { ctx = (struct flb_http *) request->stream->user_data; payload = (char *) request->body; if (payload) { - parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload)); + return parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload)); } } @@ -1056,7 +998,12 @@ int http_prot_handle_ng(struct flb_http_request *request, ret = process_payload_ng(tag, request, response); flb_sds_destroy(tag); - send_response_ng(response, ctx->successful_response_code, NULL); + if (ret == 0) { + send_response_ng(response, ctx->successful_response_code, NULL); + } + else { + send_response_ng(response, 400, "error: unable to process records\n"); + } return ret; }