Skip to content

Commit

Permalink
custom_calyptia: added interval handling and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorge Niedbalski authored and edsiper committed Nov 27, 2024
1 parent 7cafc50 commit ff85301
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 94 deletions.
159 changes: 65 additions & 94 deletions plugins/custom_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,7 @@

#include <fluent-bit/flb_hash.h>

struct calyptia {
/* config map options */
flb_sds_t api_key;
flb_sds_t store_path;
flb_sds_t cloud_host;
flb_sds_t cloud_port;
flb_sds_t machine_id;
int machine_id_auto_configured;

/* used for reporting chunk trace records. */
#ifdef FLB_HAVE_CHUNK_TRACE
flb_sds_t pipeline_id;
#endif /* FLB_HAVE_CHUNK_TRACE */

int cloud_tls;
int cloud_tls_verify;

/* config reader for 'add_label' */
struct mk_list *add_labels;

/* instances */
struct flb_input_instance *i;
struct flb_output_instance *o;
struct flb_input_instance *fleet;
struct flb_custom_instance *ins;

/* Fleet configuration */
flb_sds_t fleet_id; /* fleet-id */
flb_sds_t fleet_name;
flb_sds_t fleet_config_dir; /* fleet configuration directory */
flb_sds_t fleet_max_http_buffer_size;
int fleet_interval_sec;
int fleet_interval_nsec;
};
#include "calyptia.h"

/*
* Check if the key belongs to a sensitive data field, if so report it. We never
Expand Down Expand Up @@ -232,6 +199,53 @@ flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx)
return buf;
}

int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet)
{
if (!fleet) {
flb_plg_error(ctx->ins, "invalid fleet input instance");
return -1;
}

if (ctx->fleet_name) {
flb_input_set_property(fleet, "fleet_name", ctx->fleet_name);
}

if (ctx->fleet_id) {
flb_input_set_property(fleet, "fleet_id", ctx->fleet_id);
}

flb_input_set_property(fleet, "api_key", ctx->api_key);
flb_input_set_property(fleet, "host", ctx->cloud_host);
flb_input_set_property(fleet, "port", ctx->cloud_port);

/* Set TLS properties */
flb_input_set_property(fleet, "tls", ctx->cloud_tls == 1 ? "on" : "off");
flb_input_set_property(fleet, "tls.verify", ctx->cloud_tls_verify == 1 ? "on" : "off");

/* Optional configurations */
if (ctx->fleet_config_dir) {
flb_input_set_property(fleet, "config_dir", ctx->fleet_config_dir);
}

if (ctx->fleet_max_http_buffer_size) {
flb_input_set_property(fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size);
}

if (ctx->machine_id) {
flb_input_set_property(fleet, "machine_id", ctx->machine_id);
}

if (ctx->fleet_interval_sec) {
flb_input_set_property(fleet, "interval_sec", ctx->fleet_interval_sec);
}

if (ctx->fleet_interval_nsec) {
flb_input_set_property(fleet, "interval_nsec", ctx->fleet_interval_nsec);
}

return 0;
}

static struct flb_output_instance *setup_cloud_output(struct flb_config *config, struct calyptia *ctx)
{
int ret;
Expand Down Expand Up @@ -387,16 +401,14 @@ static flb_sds_t get_machine_id(struct calyptia *ctx)
}

static int cb_calyptia_init(struct flb_custom_instance *ins,
struct flb_config *config,
void *data)
struct flb_config *config,
void *data)
{
int ret;
struct calyptia *ctx;
int is_fleet_mode;
(void) data;

ctx = flb_calloc(1, sizeof(struct calyptia));

if (!ctx) {
flb_errno();
return -1;
Expand All @@ -405,7 +417,6 @@ static int cb_calyptia_init(struct flb_custom_instance *ins,

/* Load the config map */
ret = flb_custom_config_map_set(ins, (void *) ctx);

if (ret == -1) {
flb_free(ctx);
return -1;
Expand All @@ -416,21 +427,17 @@ static int cb_calyptia_init(struct flb_custom_instance *ins,

/* If no machine_id has been provided via a configuration option get it from the local machine-id. */
if (!ctx->machine_id) {
/* machine id */
ctx->machine_id = get_machine_id(ctx);

if (ctx->machine_id == NULL) {
flb_plg_error(ctx->ins, "unable to retrieve machine_id");
flb_free(ctx);
return -1;
}

ctx->machine_id_auto_configured = 1;
}

/* input collector */
ctx->i = flb_input_new(config, "fluentbit_metrics", NULL, FLB_TRUE);

if (!ctx->i) {
flb_plg_error(ctx->ins, "could not load metrics collector");
flb_free(ctx);
Expand All @@ -439,76 +446,40 @@ static int cb_calyptia_init(struct flb_custom_instance *ins,

flb_input_set_property(ctx->i, "tag", "_calyptia_cloud");
flb_input_set_property(ctx->i, "scrape_on_start", "true");
// This scrape interval should be configurable.
flb_input_set_property(ctx->i, "scrape_interval", "30");

if (ctx->fleet_name || ctx->fleet_id) {
is_fleet_mode = FLB_TRUE;
}
else {
is_fleet_mode = FLB_FALSE;
}

/* output cloud connector */
if ((is_fleet_mode == FLB_TRUE && ctx->fleet_id != NULL) ||
(is_fleet_mode == FLB_FALSE)) {
/* Setup cloud output if needed */
if (ctx->fleet_id != NULL || !ctx->fleet_name) {
ctx->o = setup_cloud_output(config, ctx);

if (ctx->o == NULL) {
flb_free(ctx);
return -1;
}
/* Set fleet_id for output if present */
if (ctx->fleet_id) {
flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id);
}
}

/* Setup fleet input if needed */
if (ctx->fleet_id || ctx->fleet_name) {
ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE);

ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE);
if (!ctx->fleet) {
flb_plg_error(ctx->ins, "could not load Calyptia Fleet plugin");
return -1;
}

if (ctx->fleet_name) {
flb_input_set_property(ctx->fleet, "fleet_name", ctx->fleet_name);
}

if (ctx->fleet_id) {
flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id);
flb_input_set_property(ctx->fleet, "fleet_id", ctx->fleet_id);
}

flb_input_set_property(ctx->fleet, "api_key", ctx->api_key);
flb_input_set_property(ctx->fleet, "host", ctx->cloud_host);
flb_input_set_property(ctx->fleet, "port", ctx->cloud_port);

if (ctx->cloud_tls == 1) {
flb_input_set_property(ctx->fleet, "tls", "on");
}
else {
flb_input_set_property(ctx->fleet, "tls", "off");
}

if (ctx->cloud_tls_verify == 1) {
flb_input_set_property(ctx->fleet, "tls.verify", "on");
}
else {
flb_input_set_property(ctx->fleet, "tls.verify", "off");
}

if (ctx->fleet_config_dir) {
flb_input_set_property(ctx->fleet, "config_dir", ctx->fleet_config_dir);
}

if (ctx->fleet_max_http_buffer_size) {
flb_input_set_property(ctx->fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size);
}
if (ctx->machine_id) {
flb_input_set_property(ctx->fleet, "machine_id", ctx->machine_id);
ret = set_fleet_input_properties(ctx, ctx->fleet);
if (ret == -1) {
return -1;
}
}

if (ctx->o) {
flb_router_connect(ctx->i, ctx->o);
}

flb_plg_info(ins, "custom initialized!");
return 0;
}
Expand Down Expand Up @@ -587,12 +558,12 @@ static struct flb_config_map config_map[] = {
"Base path for the configuration directory."
},
{
FLB_CONFIG_MAP_INT, "fleet.interval_sec", "-1",
FLB_CONFIG_MAP_STR, "fleet.interval_sec", "-1",
0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_sec),
"Set the collector interval"
},
{
FLB_CONFIG_MAP_INT, "fleet.interval_nsec", "-1",
FLB_CONFIG_MAP_STR, "fleet.interval_nsec", "-1",
0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_nsec),
"Set the collector interval (nanoseconds)"
},
Expand Down
59 changes: 59 additions & 0 deletions plugins/custom_calyptia/calyptia.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_CALYPTIA_H
#define FLB_CALYPTIA_H

struct calyptia {
/* config map options */
flb_sds_t api_key;
flb_sds_t store_path;
flb_sds_t cloud_host;
flb_sds_t cloud_port;
flb_sds_t machine_id;
int machine_id_auto_configured;

/* used for reporting chunk trace records. */
#ifdef FLB_HAVE_CHUNK_TRACE
flb_sds_t pipeline_id;
#endif /* FLB_HAVE_CHUNK_TRACE */

int cloud_tls;
int cloud_tls_verify;

/* config reader for 'add_label' */
struct mk_list *add_labels;

/* instances */
struct flb_input_instance *i;
struct flb_output_instance *o;
struct flb_input_instance *fleet;
struct flb_custom_instance *ins;

/* Fleet configuration */
flb_sds_t fleet_id; /* fleet-id */
flb_sds_t fleet_name;
flb_sds_t fleet_config_dir; /* fleet configuration directory */
flb_sds_t fleet_max_http_buffer_size;
flb_sds_t fleet_interval_sec;
flb_sds_t fleet_interval_nsec;
};

int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet);
#endif /* FLB_CALYPTIA_H */
26 changes: 26 additions & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,32 @@ if(FLB_OUT_LIB)
FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c")
endif()

if (FLB_CUSTOM_CALYPTIA)
# Define common variables for calyptia tests
set(CALYPTIA_TEST_LINK_LIBS
fluent-bit-static
${CMAKE_THREAD_LIBS_INIT}
)

# Add calyptia input properties test
set(TEST_TARGET "flb-rt-calyptia_input_properties")
add_executable(${TEST_TARGET}
"custom_calyptia_input_test.c"
"../../plugins/custom_calyptia/calyptia.c"
)

target_link_libraries(${TEST_TARGET}
${CALYPTIA_TEST_LINK_LIBS}
)

add_test(NAME ${TEST_TARGET}
COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET}
WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build)

set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime")
add_dependencies(${TEST_TARGET} fluent-bit-static)
endif()

if(FLB_IN_EBPF)
# Define common variables
set(EBPF_TEST_INCLUDE_DIRS
Expand Down
Loading

0 comments on commit ff85301

Please sign in to comment.