Skip to content

Commit

Permalink
feat(codecs): Add full codec support to AWS S3 source/sink (vectordot…
Browse files Browse the repository at this point in the history
  • Loading branch information
fuchsnj authored Apr 28, 2023
1 parent d286d16 commit d648c86
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 101 deletions.
22 changes: 14 additions & 8 deletions src/codecs/decoding/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ use crate::{
/// messages.
#[derive(Clone)]
pub struct Decoder {
framer: Framer,
deserializer: Deserializer,
log_namespace: LogNamespace,
/// The framer being used.
pub framer: Framer,
/// The deserializer being used.
pub deserializer: Deserializer,
/// The `log_namespace` being used.
pub log_namespace: LogNamespace,
}

impl Default for Decoder {
Expand Down Expand Up @@ -61,16 +64,19 @@ impl Decoder {
Error::FramingError(error)
})?;

let frame = match frame {
Some(frame) => frame,
_ => return Ok(None),
};
frame
.map(|frame| self.deserializer_parse(frame))
.transpose()
}

/// Parses a frame using the included deserializer, and handles any errors by logging.
pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> {
let byte_size = frame.len();

// Parse structured events from the byte frame.
self.deserializer
.parse(frame, self.log_namespace)
.map(|events| Some((events, byte_size)))
.map(|events| (events, byte_size))
.map_err(|error| {
emit!(DecoderDeserializeError { error: &error });
Error::ParsingError(error)
Expand Down
7 changes: 2 additions & 5 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use super::sink::S3RequestOptions;
use crate::{
aws::{AwsAuthentication, RegionOrEndpoint},
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
config::{
AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
SinkContext,
},
config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
sinks::{
s3_common::{
self,
Expand Down Expand Up @@ -177,7 +174,7 @@ impl SinkConfig for S3SinkConfig {
}

fn input(&self) -> Input {
Input::new(self.encoding.config().1.input_type() & DataType::Log)
Input::new(self.encoding.config().1.input_type())
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
Expand Down
34 changes: 30 additions & 4 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,27 @@ use std::{convert::TryInto, io::ErrorKind};

use async_compression::tokio::bufread;
use aws_sdk_s3::types::ByteStream;
use codecs::decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions};
use codecs::BytesDeserializerConfig;
use futures::{stream, stream::StreamExt, TryStreamExt};
use lookup::owned_value_path;
use snafu::Snafu;
use tokio_util::io::StreamReader;
use value::{kind::Collection, Kind};
use vector_config::configurable_component;
use vector_core::config::{DataType, LegacyKey, LogNamespace};
use vector_core::config::{LegacyKey, LogNamespace};

use super::util::MultilineConfig;
use crate::codecs::DecodingConfig;
use crate::config::DataType;
use crate::{
aws::{auth::AwsAuthentication, create_client, RegionOrEndpoint},
common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
config::{
ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
},
line_agg,
serde::bool_or_struct,
serde::{bool_or_struct, default_decoding},
tls::TlsConfig,
};

Expand Down Expand Up @@ -71,7 +74,8 @@ enum Strategy {
//
// Maybe showing defaults at all, when there are required properties, doesn't actually make sense? :thinkies:
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
#[serde(flatten)]
Expand Down Expand Up @@ -115,6 +119,23 @@ pub struct AwsS3Config {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

#[configurable(derived)]
#[serde(default = "default_framing")]
#[derivative(Default(value = "default_framing()"))]
pub framing: FramingConfig,

#[configurable(derived)]
#[serde(default = "default_decoding")]
#[derivative(Default(value = "default_decoding()"))]
pub decoding: DeserializerConfig,
}

/// Builds the framing configuration used when the `framing` option is not specified.
///
/// Newline-delimited framing with `max_length` left unset (`None`). This is kept as the
/// default for backwards compatibility: it used to be the only (hardcoded) option.
const fn default_framing() -> FramingConfig {
    let newline_delimited = NewlineDelimitedDecoderOptions { max_length: None };
    FramingConfig::NewlineDelimited { newline_delimited }
}

impl_generate_config_from_default!(AwsS3Config);
Expand All @@ -133,7 +154,7 @@ impl SourceConfig for AwsS3Config {

match self.strategy {
Strategy::Sqs => Ok(Box::pin(
self.create_sqs_ingestor(multiline_config, &cx.proxy)
self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
.await?
.run(cx, self.acknowledgements, log_namespace),
)),
Expand Down Expand Up @@ -200,6 +221,7 @@ impl AwsS3Config {
&self,
multiline: Option<line_agg::Config>,
proxy: &ProxyConfig,
log_namespace: LogNamespace,
) -> crate::Result<sqs::Ingestor> {
let region = self
.region
Expand All @@ -221,6 +243,9 @@ impl AwsS3Config {
)
.await?;

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();

match self.sqs {
Some(ref sqs) => {
let sqs_client = create_client::<SqsClientBuilder>(
Expand All @@ -240,6 +265,7 @@ impl AwsS3Config {
sqs.clone(),
self.compression,
multiline,
decoder,
)
.await?;

Expand Down
Loading

0 comments on commit d648c86

Please sign in to comment.