diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 82003386d6e..7dfa4bf17c5 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -48,11 +48,11 @@ struct flb_config { int is_running; /* service running ? */ double flush; /* Flush timeout */ - /* - * Maximum grace time on shutdown. If set to -1, the engine will + /* + * Maximum grace time on shutdown. If set to -1, the engine will * shutdown when all remaining tasks are flushed */ - int grace; + int grace; int grace_count; /* Count of grace shutdown tries */ flb_pipefd_t flush_fd; /* Timer FD associated to flush */ int convert_nan_to_null; /* convert null to nan ? */ @@ -227,6 +227,7 @@ struct flb_config { char *storage_bl_mem_limit; /* storage backlog memory limit */ struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */ int storage_trim_files; /* enable/disable file trimming */ + size_t storage_chunk_max_size; /* The max chunk size */ /* Embedded SQL Database support (SQLite3) */ #ifdef FLB_HAVE_SQLDB @@ -354,15 +355,16 @@ enum conf_type { #define FLB_CONF_DNS_PREFER_IPV6 "dns.prefer_ipv6" /* Storage / Chunk I/O */ -#define FLB_CONF_STORAGE_PATH "storage.path" -#define FLB_CONF_STORAGE_SYNC "storage.sync" -#define FLB_CONF_STORAGE_METRICS "storage.metrics" -#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum" -#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" -#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" +#define FLB_CONF_STORAGE_PATH "storage.path" +#define FLB_CONF_STORAGE_SYNC "storage.sync" +#define FLB_CONF_STORAGE_METRICS "storage.metrics" +#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum" +#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" +#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" #define FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS \ - "storage.delete_irrecoverable_chunks" -#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files" + "storage.delete_irrecoverable_chunks" +#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files" +#define FLB_CONF_STORAGE_CHUNK_MAX_SIZE "storage.chunk_max_size" /* Coroutines */ #define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size" diff --git a/include/fluent-bit/flb_input_log.h b/include/fluent-bit/flb_input_log.h index 91839981d27..b62d3f65e60 100644 --- a/include/fluent-bit/flb_input_log.h +++ b/include/fluent-bit/flb_input_log.h @@ -25,17 +25,17 @@ int flb_input_log_append(struct flb_input_instance *ins, const char *tag, size_t tag_len, - const void *buf, size_t buf_size); + void *buf, size_t buf_size); int flb_input_log_append_records(struct flb_input_instance *ins, size_t records, const char *tag, size_t tag_len, - const void *buf, size_t buf_size); + void *buf, size_t buf_size); int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins, size_t processor_starting_stage, const char *tag, size_t tag_len, - const void *buf, + void *buf, size_t buf_size); #endif diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index c20594c7f1c..ef40e4045ce 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -615,8 +615,9 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) file->tag_len, file->sl_log_event_encoder->output_buffer, file->sl_log_event_encoder->output_length); - flb_log_event_encoder_reset(file->sl_log_event_encoder); + flb_free(file->sl_log_event_encoder->output_buffer); + flb_log_event_encoder_claim_internal_buffer_ownership(file->sl_log_event_encoder); } } else if (file->skip_next) { diff --git a/src/flb_config.c b/src/flb_config.c index 747d855cf08..89184201a1b 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -44,6 +44,7 @@ #include #include #include +#include const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL"; @@ -154,6 +155,9 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STORAGE_TRIM_FILES, FLB_CONF_TYPE_BOOL, offsetof(struct flb_config, storage_trim_files)}, + {FLB_CONF_STORAGE_CHUNK_MAX_SIZE, + FLB_CONF_TYPE_INT, + offsetof(struct flb_config, storage_chunk_max_size)}, /* Coroutines */ {FLB_CONF_STR_CORO_STACK_SIZE, @@ -278,6 +282,7 @@ struct flb_config *flb_config_init() config->storage_path = NULL; config->storage_input_plugin = NULL; config->storage_metrics = FLB_TRUE; + config->storage_chunk_max_size = FLB_INPUT_CHUNK_FS_MAX_SIZE; config->sched_cap = FLB_SCHED_CAP; config->sched_base = FLB_SCHED_BASE; diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index ca02e6fca68..01bdc4f9fda 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -1124,7 +1125,9 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, } if (id >= 0) { - if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) { + if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk) || + (flb_input_chunk_get_real_size(ic) + chunk_size) > + FLB_INPUT_CHUNK_FS_MAX_SIZE) { ic = NULL; } else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) { diff --git a/src/flb_input_log.c b/src/flb_input_log.c index ac4b5cdfa67..f33ede87aeb 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -23,17 +23,138 @@ #include #include #include +#include +#include + +#include + +struct buffer_entry { + char *buf; + size_t buf_size; + struct mk_list _head; +}; + +static struct buffer_entry *new_buffer_entry(void *buf, size_t buf_size) +{ + struct buffer_entry *new_entry = flb_malloc(sizeof(struct buffer_entry)); + new_entry->buf_size = buf_size; + new_entry->buf = buf; + return new_entry; +} + +static void buffer_entry_destroy(struct buffer_entry *entry) { + if (!entry) { + return; + } + if (entry->buf) { + flb_free(entry->buf); + } + mk_list_del(&entry->_head); + flb_free(entry); +} + +static int split_buffer_entry(struct buffer_entry *entry, + struct mk_list *entries, + int buf_entry_max_size) +{ + int ret; + int encoder_result; + void *tmp_encoder_buf; + size_t tmp_encoder_buf_size; + struct flb_log_event_encoder log_encoder; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + int entries_processed; + struct buffer_entry *new_buffer; + + ret = flb_log_event_decoder_init(&log_decoder, entry->buf, entry->buf_size); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_error("Log event decoder initialization error : %d", ret); + + return FLB_FALSE; + } + + ret = flb_log_event_encoder_init(&log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("Log event encoder initialization error : %d", ret); + + flb_log_event_decoder_destroy(&log_decoder); + + return FLB_FALSE; + } + + entries_processed = 0; + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + encoder_result = flb_log_event_encoder_begin_record(&log_encoder); + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = flb_log_event_encoder_set_timestamp( + &log_encoder, &log_event.timestamp); + } + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = \ + flb_log_event_encoder_set_metadata_from_msgpack_object( + &log_encoder, log_event.metadata); + } + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = \ + flb_log_event_encoder_set_body_from_msgpack_object( + &log_encoder, log_event.body); + } + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = flb_log_event_encoder_commit_record(&log_encoder); + } + + if (encoder_result != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("log event encoder error : %d", encoder_result); + continue; + } + + if (log_encoder.output_length >= buf_entry_max_size) { + tmp_encoder_buf_size = log_encoder.output_length; + tmp_encoder_buf = log_encoder.output_buffer; + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + new_buffer = new_buffer_entry(tmp_encoder_buf, tmp_encoder_buf_size); + mk_list_add(&new_buffer->_head, entries); + } + + entries_processed++; + } + + if (log_encoder.output_length >= 0) { + tmp_encoder_buf_size = log_encoder.output_length; + tmp_encoder_buf = flb_malloc(tmp_encoder_buf_size); + memcpy(tmp_encoder_buf, log_encoder.output_buffer, tmp_encoder_buf_size); + new_buffer = new_buffer_entry(tmp_encoder_buf, tmp_encoder_buf_size); + mk_list_add(&new_buffer->_head, entries); + } + + flb_log_event_encoder_destroy(&log_encoder); + flb_log_event_decoder_destroy(&log_decoder); + return FLB_TRUE; +} + static int input_log_append(struct flb_input_instance *ins, size_t processor_starting_stage, size_t records, const char *tag, size_t tag_len, - const void *buf, size_t buf_size) + void *buf, size_t buf_size) { int ret; int processor_is_active; void *out_buf = (void *) buf; size_t out_size = buf_size; + struct mk_list buffers; + struct mk_list *head; + struct mk_list *tmp; + struct buffer_entry *start_buffer; + struct buffer_entry *iter_buffer; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -68,9 +189,23 @@ static int input_log_append(struct flb_input_instance *ins, } } - ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, - tag, tag_len, out_buf, out_size); - + if (buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { + mk_list_init(&buffers); + start_buffer = new_buffer_entry(buf, buf_size); + split_buffer_entry(start_buffer, &buffers, ins->config->storage_chunk_max_size); + flb_free(start_buffer); + mk_list_foreach_safe(head, tmp, &buffers) { + iter_buffer = mk_list_entry(head, struct buffer_entry, _head); + records = flb_mp_count(iter_buffer->buf, iter_buffer->buf_size); + ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, + tag, tag_len, + iter_buffer->buf, iter_buffer->buf_size); + buffer_entry_destroy(iter_buffer); + } + } else { + ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, + tag, tag_len, buf, buf_size); + } if (processor_is_active && buf != out_buf) { flb_free(out_buf); @@ -81,7 +216,7 @@ static int input_log_append(struct flb_input_instance *ins, /* Take a msgpack serialized record and enqueue it as a chunk */ int flb_input_log_append(struct flb_input_instance *ins, const char *tag, size_t tag_len, - const void *buf, size_t buf_size) + void *buf, size_t buf_size) { int ret; size_t records; @@ -96,7 +231,7 @@ int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins, size_t processor_starting_stage, const char *tag, size_t tag_len, - const void *buf, + void *buf, size_t buf_size) { return input_log_append(ins, @@ -112,7 +247,7 @@ int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins, int flb_input_log_append_records(struct flb_input_instance *ins, size_t records, const char *tag, size_t tag_len, - const void *buf, size_t buf_size) + void *buf, size_t buf_size) { int ret;