diff --git a/scripts/integration/clickhouse/compose.yaml b/scripts/integration/clickhouse/compose.yaml index fe11611e9265d..62f8a90a543c2 100644 --- a/scripts/integration/clickhouse/compose.yaml +++ b/scripts/integration/clickhouse/compose.yaml @@ -2,4 +2,4 @@ version: '3' services: clickhouse: - image: docker.io/yandex/clickhouse-server:${CONFIG_VERSION} + image: docker.io/clickhouse/clickhouse-server:${CONFIG_VERSION} diff --git a/scripts/integration/clickhouse/test.yaml b/scripts/integration/clickhouse/test.yaml index 7da786d257251..7b106b914d4dd 100644 --- a/scripts/integration/clickhouse/test.yaml +++ b/scripts/integration/clickhouse/test.yaml @@ -7,7 +7,7 @@ env: CLICKHOUSE_ADDRESS: http://clickhouse:8123 matrix: - version: ['19'] + version: ['23'] # changes to these files/paths will invoke the integration test in CI # expressions are evaluated using https://github.com/micromatch/picomatch diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs index 08299eb541099..0efffb97ef3ce 100644 --- a/src/sinks/clickhouse/config.rs +++ b/src/sinks/clickhouse/config.rs @@ -1,21 +1,18 @@ -use vector_config::configurable_component; +use http::{Request, StatusCode, Uri}; +use hyper::Body; +use super::{ + service::{ClickhouseRetryLogic, ClickhouseService}, + sink::ClickhouseSink, +}; use crate::{ - codecs::Transformer, - config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext}, - http::Auth, + http::{get_http_scheme_from_uri, Auth, HttpClient, MaybeAuth}, sinks::{ - util::{ - BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, - UriSerde, - }, - Healthcheck, VectorSink, + prelude::*, + util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde}, }, - tls::TlsConfig, }; -use super::http_sink::build_http_sink; - /// Configuration for the `clickhouse` sink. #[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))] #[derive(Clone, Debug, Default)] @@ -82,9 +79,41 @@ impl_generate_config_from_default!(ClickhouseConfig); #[typetag::serde(name = "clickhouse")] impl SinkConfig for ClickhouseConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - // later we can build different sink(http, native) here - // according to the clickhouseConfig - build_http_sink(self, cx).await + let endpoint = self.endpoint.with_default_parts().uri; + let protocol = get_http_scheme_from_uri(&endpoint); + + let auth = self.auth.choose_one(&self.endpoint.auth)?; + + let tls_settings = TlsSettings::from_options(&self.tls)?; + let client = HttpClient::new(tls_settings, &cx.proxy)?; + + let service = ClickhouseService::new( + client.clone(), + auth.clone(), + &endpoint, + self.database.as_deref(), + self.table.as_str(), + self.skip_unknown_fields, + self.date_time_best_effort, + )?; + + let request_limits = self.request.unwrap_with(&Default::default()); + let service = ServiceBuilder::new() + .settings(request_limits, ClickhouseRetryLogic::default()) + .service(service); + + let batch_settings = self.batch.into_batcher_settings()?; + let sink = ClickhouseSink::new( + batch_settings, + self.compression, + self.encoding.clone(), + service, + protocol, + ); + + let healthcheck = Box::pin(healthcheck(client, endpoint, auth)); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) } fn input(&self) -> Input { @@ -95,3 +124,30 @@ impl SinkConfig for ClickhouseConfig { &self.acknowledgements } } + +async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option) -> crate::Result<()> { + // TODO: check if table exists? + let uri = format!("{}/?query=SELECT%201", endpoint); + let mut request = Request::get(uri).body(Body::empty()).unwrap(); + + if let Some(auth) = auth { + auth.apply(&mut request); + } + + let response = client.send(request).await?; + + match response.status() { + StatusCode::OK => Ok(()), + status => Err(HealthcheckError::UnexpectedStatus { status }.into()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } +} diff --git a/src/sinks/clickhouse/http_sink.rs b/src/sinks/clickhouse/http_sink.rs deleted file mode 100644 index 91c197d126723..0000000000000 --- a/src/sinks/clickhouse/http_sink.rs +++ /dev/null @@ -1,250 +0,0 @@ -use bytes::{BufMut, Bytes, BytesMut}; -use futures::{FutureExt, SinkExt}; -use http::{Request, StatusCode, Uri}; -use hyper::Body; -use snafu::ResultExt; - -use super::ClickhouseConfig; -use crate::{ - codecs::Transformer, - config::SinkContext, - event::Event, - http::{HttpClient, HttpError, MaybeAuth}, - sinks::{ - util::{ - http::{BatchedHttpSink, HttpEventEncoder, HttpRetryLogic, HttpSink}, - retries::{RetryAction, RetryLogic}, - Buffer, TowerRequestConfig, - }, - Healthcheck, HealthcheckError, UriParseSnafu, VectorSink, - }, - tls::TlsSettings, -}; - -pub(crate) async fn build_http_sink( - cfg: &ClickhouseConfig, - cx: SinkContext, -) -> crate::Result<(VectorSink, Healthcheck)> { - let batch = cfg.batch.into_batch_settings()?; - let request = cfg.request.unwrap_with(&TowerRequestConfig::default()); - let tls_settings = TlsSettings::from_options(&cfg.tls)?; - let client = HttpClient::new(tls_settings, &cx.proxy)?; - - let config = ClickhouseConfig { - auth: cfg.auth.choose_one(&cfg.endpoint.auth)?, - ..cfg.clone() - }; - - let sink = BatchedHttpSink::with_logic( - config.clone(), - Buffer::new(batch.size, cfg.compression), - ClickhouseRetryLogic::default(), - request, - batch.timeout, - client.clone(), - ) - .sink_map_err(|error| error!(message = "Fatal clickhouse sink error.", %error)); - - let healthcheck = healthcheck(client, config).boxed(); - - #[allow(deprecated)] - Ok((VectorSink::from_event_sink(sink), healthcheck)) -} - -pub struct ClickhouseEventEncoder { - transformer: Transformer, -} - -impl HttpEventEncoder for ClickhouseEventEncoder { - fn encode_event(&mut self, mut event: Event) -> Option { - self.transformer.transform(&mut event); - let log = event.into_log(); - - let mut body = crate::serde::json::to_bytes(&log).expect("Events should be valid json!"); - body.put_u8(b'\n'); - - Some(body) - } -} - -#[async_trait::async_trait] -impl HttpSink for ClickhouseConfig { - type Input = BytesMut; - type Output = BytesMut; - type Encoder = ClickhouseEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - ClickhouseEventEncoder { - transformer: self.encoding.clone(), - } - } - - async fn build_request(&self, events: Self::Output) -> crate::Result> { - let database = if let Some(database) = &self.database { - database.as_str() - } else { - "default" - }; - - let uri = set_uri_query( - &self.endpoint.with_default_parts().uri, - database, - &self.table, - self.skip_unknown_fields, - self.date_time_best_effort, - ) - .expect("Unable to encode uri"); - - let mut builder = Request::post(&uri).header("Content-Type", "application/x-ndjson"); - - if let Some(ce) = self.compression.content_encoding() { - builder = builder.header("Content-Encoding", ce); - } - - let mut request = builder.body(events.freeze()).unwrap(); - - if let Some(auth) = &self.auth { - auth.apply(&mut request); - } - - Ok(request) - } -} - -async fn healthcheck(client: HttpClient, config: ClickhouseConfig) -> crate::Result<()> { - // TODO: check if table exists? - let uri = format!("{}/?query=SELECT%201", config.endpoint.with_default_parts()); - let mut request = Request::get(uri).body(Body::empty()).unwrap(); - - if let Some(auth) = &config.auth { - auth.apply(&mut request); - } - - let response = client.send(request).await?; - - match response.status() { - StatusCode::OK => Ok(()), - status => Err(HealthcheckError::UnexpectedStatus { status }.into()), - } -} - -fn set_uri_query( - uri: &Uri, - database: &str, - table: &str, - skip_unknown: bool, - date_time_best_effort: bool, -) -> crate::Result { - let query = url::form_urlencoded::Serializer::new(String::new()) - .append_pair( - "query", - format!( - "INSERT INTO \"{}\".\"{}\" FORMAT JSONEachRow", - database, - table.replace('\"', "\\\"") - ) - .as_str(), - ) - .finish(); - - let mut uri = uri.to_string(); - if !uri.ends_with('/') { - uri.push('/'); - } - uri.push_str("?input_format_import_nested_json=1&"); - if skip_unknown { - uri.push_str("input_format_skip_unknown_fields=1&"); - } - if date_time_best_effort { - uri.push_str("date_time_input_format=best_effort&") - } - uri.push_str(query.as_str()); - - uri.parse::() - .context(UriParseSnafu) - .map_err(Into::into) -} - -#[derive(Debug, Default, Clone)] -struct ClickhouseRetryLogic { - inner: HttpRetryLogic, -} - -impl RetryLogic for ClickhouseRetryLogic { - type Error = HttpError; - type Response = http::Response; - - fn is_retriable_error(&self, error: &Self::Error) -> bool { - self.inner.is_retriable_error(error) - } - - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { - match response.status() { - StatusCode::INTERNAL_SERVER_ERROR => { - let body = response.body(); - - // Currently, ClickHouse returns 500's incorrect data and type mismatch errors. - // This attempts to check if the body starts with `Code: {code_num}` and to not - // retry those errors. - // - // Reference: https://github.com/vectordotdev/vector/pull/693#issuecomment-517332654 - // Error code definitions: https://github.com/ClickHouse/ClickHouse/blob/master/dbms/src/Common/ErrorCodes.cpp - // - // Fix already merged: https://github.com/ClickHouse/ClickHouse/pull/6271 - if body.starts_with(b"Code: 117") { - RetryAction::DontRetry("incorrect data".into()) - } else if body.starts_with(b"Code: 53") { - RetryAction::DontRetry("type mismatch".into()) - } else { - RetryAction::Retry(String::from_utf8_lossy(body).to_string().into()) - } - } - _ => self.inner.should_retry_response(response), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[test] - fn encode_valid() { - let uri = set_uri_query( - &"http://localhost:80".parse().unwrap(), - "my_database", - "my_table", - false, - true, - ) - .unwrap(); - assert_eq!(uri.to_string(), "http://localhost:80/?input_format_import_nested_json=1&date_time_input_format=best_effort&query=INSERT+INTO+%22my_database%22.%22my_table%22+FORMAT+JSONEachRow"); - - let uri = set_uri_query( - &"http://localhost:80".parse().unwrap(), - "my_database", - "my_\"table\"", - false, - false, - ) - .unwrap(); - assert_eq!(uri.to_string(), "http://localhost:80/?input_format_import_nested_json=1&query=INSERT+INTO+%22my_database%22.%22my_%5C%22table%5C%22%22+FORMAT+JSONEachRow"); - } - - #[test] - fn encode_invalid() { - set_uri_query( - &"localhost:80".parse().unwrap(), - "my_database", - "my_table", - false, - false, - ) - .unwrap_err(); - } -} diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs index 21acee5aecc15..f16b19ffcc7ed 100644 --- a/src/sinks/clickhouse/integration_tests.rs +++ b/src/sinks/clickhouse/integration_tests.rs @@ -24,7 +24,7 @@ use crate::{ config::{log_schema, SinkConfig, SinkContext}, sinks::util::{BatchConfig, Compression, TowerRequestConfig}, test_util::{ - components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + components::{run_and_assert_sink_compliance, SINK_TAGS}, random_string, trace_init, }, }; @@ -70,12 +70,8 @@ async fn insert_events() { .as_mut_log() .insert("items", vec!["item1", "item2"]); - run_and_assert_sink_compliance( - sink, - stream::once(ready(input_event.clone())), - &HTTP_SINK_TAGS, - ) - .await; + run_and_assert_sink_compliance(sink, stream::once(ready(input_event.clone())), &SINK_TAGS) + .await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -119,12 +115,8 @@ async fn skip_unknown_fields() { let (mut input_event, mut receiver) = make_event(); input_event.as_mut_log().insert("unknown", "mysteries"); - run_and_assert_sink_compliance( - sink, - stream::once(ready(input_event.clone())), - &HTTP_SINK_TAGS, - ) - .await; + run_and_assert_sink_compliance(sink, stream::once(ready(input_event.clone())), &SINK_TAGS) + .await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -171,12 +163,8 @@ async fn insert_events_unix_timestamps() { let (mut input_event, _receiver) = make_event(); - run_and_assert_sink_compliance( - sink, - stream::once(ready(input_event.clone())), - &HTTP_SINK_TAGS, - ) - .await; + run_and_assert_sink_compliance(sink, stream::once(ready(input_event.clone())), &SINK_TAGS) + .await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -239,12 +227,8 @@ timestamp_format = "unix""#, let (mut input_event, _receiver) = make_event(); - run_and_assert_sink_compliance( - sink, - stream::once(ready(input_event.clone())), - &HTTP_SINK_TAGS, - ) - .await; + run_and_assert_sink_compliance(sink, stream::once(ready(input_event.clone())), &SINK_TAGS) + .await; let output = client.select_all(&table).await; assert_eq!(1, output.rows); @@ -292,10 +276,10 @@ async fn no_retry_on_incorrect_data() { }; let client = ClickhouseClient::new(host); - // the event contains a message field, but its being omitted to - // fail the request. + // The event contains a message field, but it's of type String, which will cause + // the request to fail. client - .create_table(&table, "host String, timestamp String") + .create_table(&table, "host String, timestamp String, message Int32") .await; let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); @@ -351,7 +335,7 @@ async fn no_retry_on_incorrect_data_warp() { .unwrap() .unwrap(); - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Errored)); + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected)); } fn make_event() -> (Event, BatchStatusReceiver) { @@ -378,7 +362,6 @@ impl ClickhouseClient { let response = self .client .post(&self.host) - // .body(format!( "CREATE TABLE {} ({}) diff --git a/src/sinks/clickhouse/mod.rs b/src/sinks/clickhouse/mod.rs index 2f4e5af1870fc..488df9181a8bf 100644 --- a/src/sinks/clickhouse/mod.rs +++ b/src/sinks/clickhouse/mod.rs @@ -1,5 +1,17 @@ +//! The Clickhouse [`vector_core::sink::VectorSink`] +//! +//! This module contains the [`vector_core::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_core::event::Event`] instances and forwarding them to Clickhouse. +//! +//! Events are sent to Clickhouse using the HTTP interface with a query of the following structure: +//! `INSERT INTO my_db.my_table FORMAT JSONEachRow`. The event payload is encoded as new-line +//! delimited JSON. +//! +//! This sink only supports logs for now but could support metrics and traces as well in the future. + mod config; -mod http_sink; #[cfg(all(test, feature = "clickhouse-integration-tests"))] mod integration_tests; +mod service; +mod sink; pub use self::config::ClickhouseConfig; diff --git a/src/sinks/clickhouse/service.rs b/src/sinks/clickhouse/service.rs new file mode 100644 index 0000000000000..ecce62e537290 --- /dev/null +++ b/src/sinks/clickhouse/service.rs @@ -0,0 +1,257 @@ +use bytes::Bytes; +use http::{ + header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE}, + Request, Response, StatusCode, Uri, +}; +use hyper::{body, Body}; +use snafu::ResultExt; +use std::task::{Context, Poll}; +use tracing::Instrument; + +use crate::{ + http::{Auth, HttpClient, HttpError}, + sinks::{ + prelude::*, + util::{http::HttpRetryLogic, retries::RetryAction}, + UriParseSnafu, + }, +}; + +#[derive(Debug, Clone)] +pub struct ClickhouseRequest { + pub body: Bytes, + pub compression: Compression, + pub finalizers: EventFinalizers, + pub metadata: RequestMetadata, +} + +impl MetaDescriptive for ClickhouseRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl Finalizable for ClickhouseRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +pub struct ClickhouseResponse { + http_response: Response, + events_byte_size: GroupedCountByteSize, + raw_byte_size: usize, +} + +impl DriverResponse for ClickhouseResponse { + fn event_status(&self) -> EventStatus { + match self.http_response.status().is_success() { + true => EventStatus::Delivered, + false => EventStatus::Rejected, + } + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.raw_byte_size) + } +} + +#[derive(Debug, Default, Clone)] +pub struct ClickhouseRetryLogic { + inner: HttpRetryLogic, +} + +impl RetryLogic for ClickhouseRetryLogic { + type Error = HttpError; + type Response = ClickhouseResponse; + + fn is_retriable_error(&self, error: &Self::Error) -> bool { + self.inner.is_retriable_error(error) + } + + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + match response.http_response.status() { + StatusCode::INTERNAL_SERVER_ERROR => { + let body = response.http_response.body(); + + // Currently, ClickHouse returns 500's incorrect data and type mismatch errors. + // This attempts to check if the body starts with `Code: {code_num}` and to not + // retry those errors. + // + // Reference: https://github.com/vectordotdev/vector/pull/693#issuecomment-517332654 + // Error code definitions: https://github.com/ClickHouse/ClickHouse/blob/master/dbms/src/Common/ErrorCodes.cpp + // + // Fix already merged: https://github.com/ClickHouse/ClickHouse/pull/6271 + if body.starts_with(b"Code: 117") { + RetryAction::DontRetry("incorrect data".into()) + } else if body.starts_with(b"Code: 53") { + RetryAction::DontRetry("type mismatch".into()) + } else { + RetryAction::Retry(String::from_utf8_lossy(body).to_string().into()) + } + } + _ => self.inner.should_retry_response(&response.http_response), + } + } +} + +/// `ClickhouseService` is a `Tower` service used to send logs to Clickhouse. +#[derive(Debug, Clone)] +pub struct ClickhouseService { + client: HttpClient, + uri: Uri, + auth: Option, +} + +impl ClickhouseService { + /// Creates a new `ClickhouseService`. + pub fn new( + client: HttpClient, + auth: Option, + endpoint: &Uri, + database: Option<&str>, + table: &str, + skip_unknown_fields: bool, + date_time_best_effort: bool, + ) -> crate::Result { + // Set the URI query once during initialization, as it won't change throughout the lifecycle + // of the service. + let uri = set_uri_query( + endpoint, + database.unwrap_or("default"), + table, + skip_unknown_fields, + date_time_best_effort, + )?; + Ok(Self { client, auth, uri }) + } +} + +impl Service for ClickhouseService { + type Response = ClickhouseResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + // Emission of Error internal event is handled upstream by the caller. + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + // Emission of Error internal event is handled upstream by the caller. + fn call(&mut self, request: ClickhouseRequest) -> Self::Future { + let mut client = self.client.clone(); + + let mut builder = Request::post(&self.uri) + .header(CONTENT_TYPE, "application/x-ndjson") + .header(CONTENT_LENGTH, request.body.len()); + if let Some(ce) = request.compression.content_encoding() { + builder = builder.header(CONTENT_ENCODING, ce); + } + if let Some(auth) = &self.auth { + builder = auth.apply_builder(builder); + } + + let http_request = builder + .body(Body::from(request.body)) + .expect("building HTTP request failed unexpectedly"); + + Box::pin(async move { + let response = client.call(http_request).in_current_span().await?; + let (parts, body) = response.into_parts(); + let body = body::to_bytes(body).await?; + Ok(ClickhouseResponse { + http_response: hyper::Response::from_parts(parts, body), + raw_byte_size: request.metadata.request_encoded_size(), + events_byte_size: request + .metadata + .into_events_estimated_json_encoded_byte_size(), + }) + }) + } +} + +fn set_uri_query( + uri: &Uri, + database: &str, + table: &str, + skip_unknown: bool, + date_time_best_effort: bool, +) -> crate::Result { + let query = url::form_urlencoded::Serializer::new(String::new()) + .append_pair( + "query", + format!( + "INSERT INTO \"{}\".\"{}\" FORMAT JSONEachRow", + database, + table.replace('\"', "\\\"") + ) + .as_str(), + ) + .finish(); + + let mut uri = uri.to_string(); + if !uri.ends_with('/') { + uri.push('/'); + } + + uri.push_str("?input_format_import_nested_json=1&"); + if skip_unknown { + uri.push_str("input_format_skip_unknown_fields=1&"); + } + if date_time_best_effort { + uri.push_str("date_time_input_format=best_effort&") + } + uri.push_str(query.as_str()); + + uri.parse::() + .context(UriParseSnafu) + .map_err(Into::into) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn encode_valid() { + let uri = set_uri_query( + &"http://localhost:80".parse().unwrap(), + "my_database", + "my_table", + false, + true, + ) + .unwrap(); + assert_eq!(uri.to_string(), "http://localhost:80/?input_format_import_nested_json=1&date_time_input_format=best_effort&query=INSERT+INTO+%22my_database%22.%22my_table%22+FORMAT+JSONEachRow"); + + let uri = set_uri_query( + &"http://localhost:80".parse().unwrap(), + "my_database", + "my_\"table\"", + false, + false, + ) + .unwrap(); + assert_eq!(uri.to_string(), "http://localhost:80/?input_format_import_nested_json=1&query=INSERT+INTO+%22my_database%22.%22my_%5C%22table%5C%22%22+FORMAT+JSONEachRow"); + } + + #[test] + fn encode_invalid() { + set_uri_query( + &"localhost:80".parse().unwrap(), + "my_database", + "my_table", + false, + false, + ) + .unwrap_err(); + } +} diff --git a/src/sinks/clickhouse/sink.rs b/src/sinks/clickhouse/sink.rs new file mode 100644 index 0000000000000..805cc50bcf4bc --- /dev/null +++ b/src/sinks/clickhouse/sink.rs @@ -0,0 +1,117 @@ +use bytes::Bytes; +use codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig}; + +use super::service::{ClickhouseRequest, ClickhouseRetryLogic, ClickhouseService}; +use crate::{internal_events::SinkRequestBuildError, sinks::prelude::*}; + +pub struct ClickhouseSink { + batch_settings: BatcherSettings, + compression: Compression, + encoding: (Transformer, Encoder), + service: Svc, + protocol: &'static str, +} + +impl ClickhouseSink { + pub fn new( + batch_settings: BatcherSettings, + compression: Compression, + transformer: Transformer, + service: Svc, + protocol: &'static str, + ) -> Self { + Self { + batch_settings, + compression, + encoding: ( + transformer, + Encoder::::new( + NewlineDelimitedEncoderConfig::default().build().into(), + JsonSerializerConfig::default().build().into(), + ), + ), + service, + protocol, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .batched(self.batch_settings.into_byte_size_config()) + .request_builder( + None, + ClickhouseRequestBuilder { + compression: self.compression, + encoding: self.encoding, + }, + ) + .filter_map(|request| async { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(self.service) + .protocol(self.protocol) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for ClickhouseSink { + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} + +struct ClickhouseRequestBuilder { + compression: Compression, + encoding: (Transformer, Encoder), +} + +impl RequestBuilder> for ClickhouseRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = (Transformer, Encoder); + type Payload = Bytes; + type Request = ClickhouseRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + self.compression + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoding + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + ClickhouseRequest { + body: payload.into_payload(), + compression: self.compression, + finalizers: metadata, + metadata: request_metadata, + } + } +} diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs index ffef449c78df1..775b530c41002 100644 --- a/src/sinks/prelude.rs +++ b/src/sinks/prelude.rs @@ -17,7 +17,7 @@ pub use crate::{ BatchConfig, Compression, NoDefaultsBatchSettings, RequestBuilder, SinkBatchSettings, TowerRequestConfig, }, - Healthcheck, + Healthcheck, HealthcheckError, }, template::{Template, TemplateParseError}, tls::TlsConfig,