Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_kubernetes_events: Efficiently stream kubernetes events via watch #8351

Merged
merged 3 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
459 changes: 330 additions & 129 deletions plugins/in_kubernetes_events/kubernetes_events.c

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions plugins/in_kubernetes_events/kubernetes_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ struct k8s_events {

struct flb_log_event_encoder *encoder;

/* timestamp key - deprecated, to be removed in v3.0 */
flb_sds_t timestamp_key;

/* record accessor */
struct flb_record_accessor *ra_resource_version;

Expand All @@ -85,6 +82,9 @@ struct k8s_events {
struct flb_upstream *upstream;
struct flb_input_instance *ins;

struct flb_connection *current_connection;
struct flb_http_client *streaming_client;

/* limit for event queries */
int limit_request;
/* last highest seen resource_version */
Expand All @@ -105,4 +105,4 @@ struct k8s_events {
pthread_mutex_t lock;
};

#endif
#endif
10 changes: 10 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ static int network_init(struct k8s_events *ctx, struct flb_config *config)
int io_type = FLB_IO_TCP;

ctx->upstream = NULL;
ctx->current_connection = NULL;
ctx->streaming_client = NULL;

if (ctx->api_https == FLB_TRUE) {
if (!ctx->tls_ca_path && !ctx->tls_ca_file) {
Expand Down Expand Up @@ -280,6 +282,14 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)
flb_ra_destroy(ctx->ra_resource_version);
}

if(ctx->streaming_client) {
flb_http_client_destroy(ctx->streaming_client);
}

if(ctx->current_connection) {
flb_upstream_conn_release(ctx->current_connection);
}

if (ctx->upstream) {
flb_upstream_destroy(ctx->upstream);
}
Expand Down
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ if(FLB_OUT_LIB)
FLB_RT_TEST(FLB_IN_TCP "in_tcp.c")
FLB_RT_TEST(FLB_IN_FORWARD "in_forward.c")
FLB_RT_TEST(FLB_IN_FLUENTBIT_METRICS "in_fluentbit_metrics.c")
FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c")
endif()

# Filter Plugins
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"kind": "EventList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "177157"
},
"items": [
{
"metadata": {
"name": "fluent-bit-78945dccd8-2g7qg.17a3c80ba0453aee",
"namespace": "default",
"uid": "6e3013d5-a79b-4dc4-b6c0-6b652302672e",
"resourceVersion": "176761",
"creationTimestamp": "2023-12-24T13:37:16Z",
"managedFields": [
{
"manager": "kube-scheduler",
"operation": "Update",
"apiVersion": "events.k8s.io/v1",
"time": "2023-12-24T13:37:16Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:action": {},
"f:eventTime": {},
"f:note": {},
"f:reason": {},
"f:regarding": {},
"f:reportingController": {},
"f:reportingInstance": {},
"f:type": {}
}
}
]
},
"involvedObject": {
"kind": "Pod",
"namespace": "default",
"name": "fluent-bit-78945dccd8-2g7qg",
"uid": "ed7de8ff-61fb-40bb-9ecb-55a801a4cd89",
"apiVersion": "v1",
"resourceVersion": "176749"
},
"reason": "FailedScheduling",
"message": "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..",
"source": {},
"firstTimestamp": null,
"lastTimestamp": null,
"type": "Warning",
"eventTime": "2023-12-24T13:37:16.335172Z",
"action": "Scheduling",
"reportingComponent": "default-scheduler",
"reportingInstance": "default-scheduler-minikube"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[1703425036.000000,{"metadata":{"name":"fluent-bit-78945dccd8-2g7qg.17a3c80ba0453aee","namespace":"default","uid":"6e3013d5-a79b-4dc4-b6c0-6b652302672e","resourceVersion":"176761","creationTimestamp":"2023-12-24T13:37:16Z","managedFields":[{"manager":"kube-scheduler","operation":"Update","apiVersion":"events.k8s.io/v1","time":"2023-12-24T13:37:16Z","fieldsType":"FieldsV1","fieldsV1":{"f:action":{},"f:eventTime":{},"f:note":{},"f:reason":{},"f:regarding":{},"f:reportingController":{},"f:reportingInstance":{},"f:type":{}}}]},"involvedObject":{"kind":"Pod","namespace":"default","name":"fluent-bit-78945dccd8-2g7qg","uid":"ed7de8ff-61fb-40bb-9ecb-55a801a4cd89","apiVersion":"v1","resourceVersion":"176749"},"reason":"FailedScheduling","message":"0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..","source":{},"firstTimestamp":null,"lastTimestamp":null,"type":"Warning","eventTime":"2023-12-24T13:37:16.335172Z","action":"Scheduling","reportingComponent":"default-scheduler","reportingInstance":"default-scheduler-minikube"}]
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"kind": "EventList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "177157"
},
"items": [
{
"metadata": {
"name": ".17a3ba8b4aa36c81",
"namespace": "default",
"uid": "ec5546b7-f1b9-4e61-a90c-a1f3b611edbc",
"resourceVersion": "174688",
"creationTimestamp": "2023-12-24T09:30:07Z",
"managedFields": [
{
"manager": "storage-provisioner",
"operation": "Update",
"apiVersion": "v1",
"time": "2023-12-24T09:30:07Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:count": {},
"f:firstTimestamp": {},
"f:involvedObject": {},
"f:lastTimestamp": {},
"f:message": {},
"f:reason": {},
"f:source": {
"f:component": {}
},
"f:type": {}
}
}
]
},
"involvedObject": {
"kind": "Endpoints",
"apiVersion": "v1"
},
"reason": "LeaderElection",
"message": "minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f stopped leading",
"source": {
"component": "k8s.io/minikube-hostpath_minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f"
},
"firstTimestamp": "2023-12-24T09:29:51Z",
"lastTimestamp": "2023-12-24T09:29:51Z",
"count": 1,
"type": "Normal",
"eventTime": null,
"reportingComponent": "",
"reportingInstance": ""
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[1703410191.000000,{"metadata":{"name":".17a3ba8b4aa36c81","namespace":"default","uid":"ec5546b7-f1b9-4e61-a90c-a1f3b611edbc","resourceVersion":"174688","creationTimestamp":"2023-12-24T09:30:07Z","managedFields":[{"manager":"storage-provisioner","operation":"Update","apiVersion":"v1","time":"2023-12-24T09:30:07Z","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:source":{"f:component":{}},"f:type":{}}}]},"involvedObject":{"kind":"Endpoints","apiVersion":"v1"},"reason":"LeaderElection","message":"minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f stopped leading","source":{"component":"k8s.io/minikube-hostpath_minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f"},"firstTimestamp":"2023-12-24T09:29:51Z","lastTimestamp":"2023-12-24T09:29:51Z","count":1,"type":"Normal","eventTime":null,"reportingComponent":"","reportingInstance":""}]
1 change: 1 addition & 0 deletions tests/runtime/data/in_kubernetes_events/token
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fakeTokenFile
Loading
Loading