Skip to content

Commit

Permalink
chore(socket source): Remove deprecated max_length setting from `tc…
Browse files Browse the repository at this point in the history
…p` and `unix` modes. (vectordotdev#17162)
  • Loading branch information
neuronull authored Apr 19, 2023
1 parent 854d71e commit 9ecfc8c
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 97 deletions.
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/framing/newline_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct NewlineDelimitedDecoderOptions {
/// consider setting the maximum length to a reasonably large value as a safety net. This
/// ensures that processing is not actually unbounded.
#[serde(skip_serializing_if = "vector_core::serde::skip_serializing_if_default")]
max_length: Option<usize>,
pub max_length: Option<usize>,
}

impl NewlineDelimitedDecoderOptions {
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/framing/octet_counting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl OctetCountingDecoderConfig {
pub struct OctetCountingDecoderOptions {
/// The maximum length of the byte buffer.
#[serde(skip_serializing_if = "vector_core::serde::skip_serializing_if_default")]
max_length: Option<usize>,
pub max_length: Option<usize>,
}

/// Codec using the `Octet Counting` format as specified in
Expand Down
72 changes: 25 additions & 47 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod udp;
#[cfg(unix)]
mod unix;

use codecs::{decoding::DeserializerConfig, NewlineDelimitedDecoderConfig};
use codecs::decoding::DeserializerConfig;
use lookup::{lookup_v2::OptionalValuePath, owned_value_path};
use value::{kind::Collection, Kind};
use vector_config::configurable_component;
Expand Down Expand Up @@ -113,26 +113,18 @@ impl SourceConfig for SocketConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
match self.mode.clone() {
Mode::Tcp(config) => {
let decoding = config.decoding().clone();
// TODO: in v0.30.0 , remove the `max_length` setting from
// the UnixConfig, and all of the below mess and replace
// it with the configured framing /
// decoding.default_stream_framing().
let framing = match (config.framing().clone(), config.max_length()) {
(Some(framing), Some(_)) => {
warn!(message = "DEPRECATION: The `max_length` setting is deprecated and will be removed in an upcoming release. Since a `framing` setting was provided, the `max_length` setting has no effect.");
framing
}
(Some(framing), None) => framing,
(None, Some(max_length)) => {
warn!(message = "DEPRECATION: The `max_length` setting is deprecated and will be removed in an upcoming release. Please configure the `max_length` from the `framing` setting instead.");
NewlineDelimitedDecoderConfig::new_with_max_length(max_length).into()
}
(None, None) => decoding.default_stream_framing(),
};

let log_namespace = cx.log_namespace(config.log_namespace);
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build();

let decoding = config.decoding().clone();
let decoder = DecodingConfig::new(
config
.framing
.clone()
.unwrap_or_else(|| decoding.default_stream_framing()),
decoding,
log_namespace,
)
.build();

let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace);
let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
Expand Down Expand Up @@ -190,27 +182,18 @@ impl SourceConfig for SocketConfig {
}
#[cfg(unix)]
Mode::UnixStream(config) => {
let decoding = config.decoding.clone();

// TODO: in v0.30.0 , remove the `max_length` setting from
// the UnixConfig, and all of the below mess and replace
// it with the configured framing /
// decoding.default_stream_framing().
let framing = match (config.framing.clone(), config.max_length) {
(Some(framing), Some(_)) => {
warn!(message = "DEPRECATION: The `max_length` setting is deprecated and will be removed in an upcoming release. Since a `framing` setting was provided, the `max_length` setting has no effect.");
framing
}
(Some(framing), None) => framing,
(None, Some(max_length)) => {
warn!(message = "DEPRECATION: The `max_length` setting is deprecated and will be removed in an upcoming release. Please configure the `max_length` from the `framing` setting instead.");
NewlineDelimitedDecoderConfig::new_with_max_length(max_length).into()
}
(None, None) => decoding.default_stream_framing(),
};

let log_namespace = cx.log_namespace(config.log_namespace);
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build();

let decoding = config.decoding().clone();
let decoder = DecodingConfig::new(
config
.framing
.clone()
.unwrap_or_else(|| decoding.default_stream_framing()),
decoding,
log_namespace,
)
.build();

unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace)
}
Expand Down Expand Up @@ -333,10 +316,6 @@ pub(crate) fn default_host_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!(log_schema().host_key()))
}

fn default_max_length() -> Option<usize> {
Some(crate::serde::default_max_length())
}

#[cfg(test)]
mod test {
use approx::assert_relative_eq;
Expand Down Expand Up @@ -527,7 +506,6 @@ mod test {
let addr = next_addr();

let mut config = TcpConfig::from_address(addr.into());
config.set_max_length(None);
config.set_framing(Some(
NewlineDelimitedDecoderConfig::new_with_max_length(10).into(),
));
Expand Down Expand Up @@ -1024,7 +1002,7 @@ mod test {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
config.max_length = Some(11);
config.max_length = 11;
let address = init_udp_with_config(tx, config).await;

send_lines_udp(
Expand Down Expand Up @@ -1060,7 +1038,7 @@ mod test {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
config.max_length = Some(10);
config.max_length = 10;
config.framing = CharacterDelimitedDecoderConfig {
character_delimited: CharacterDelimitedDecoderOptions::new(b',', None),
}
Expand Down
26 changes: 3 additions & 23 deletions src/sources/socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
tls::TlsSourceConfig,
};

use super::{default_host_key, default_max_length, SocketConfig};
use super::{default_host_key, SocketConfig};

/// TCP configuration for the `socket` source.
#[serde_as]
Expand All @@ -30,16 +30,6 @@ pub struct TcpConfig {
#[configurable(derived)]
keepalive: Option<TcpKeepaliveConfig>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
// TODO: communicated as deprecated in v0.29.0, can be removed in v0.30.0
#[configurable(
deprecated = "This option has been deprecated. Configure `max_length` on the framing config instead."
)]
#[configurable(metadata(docs::type_unit = "bytes"))]
max_length: Option<usize>,

/// The timeout before a connection is forcefully closed during shutdown.
#[serde(default = "default_shutdown_timeout_secs")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
Expand Down Expand Up @@ -85,11 +75,11 @@ pub struct TcpConfig {
pub connection_limit: Option<u32>,

#[configurable(derived)]
framing: Option<FramingConfig>,
pub(super) framing: Option<FramingConfig>,

#[configurable(derived)]
#[serde(default = "default_decoding")]
decoding: DeserializerConfig,
pub(super) decoding: DeserializerConfig,

/// The namespace to use for logs. This overrides the global setting.
#[serde(default)]
Expand All @@ -110,7 +100,6 @@ impl TcpConfig {
Self {
address,
keepalive: None,
max_length: default_max_length(),
shutdown_timeout_secs: default_shutdown_timeout_secs(),
host_key: default_host_key(),
port_key: default_port_key(),
Expand Down Expand Up @@ -152,10 +141,6 @@ impl TcpConfig {
self.keepalive
}

pub const fn max_length(&self) -> Option<usize> {
self.max_length
}

pub const fn shutdown_timeout_secs(&self) -> Duration {
self.shutdown_timeout_secs
}
Expand All @@ -173,11 +158,6 @@ impl TcpConfig {
self
}

pub fn set_max_length(&mut self, val: Option<usize>) -> &mut Self {
self.max_length = val;
self
}

pub fn set_shutdown_timeout_secs(&mut self, val: u64) -> &mut Self {
self.shutdown_timeout_secs = Duration::from_secs(val);
self
Expand Down
10 changes: 4 additions & 6 deletions src/sources/socket/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct UdpConfig {
/// Messages larger than this are truncated.
#[serde(default = "default_max_length")]
#[configurable(metadata(docs::type_unit = "bytes"))]
pub(super) max_length: Option<usize>,
pub(super) max_length: usize,

/// Overrides the name of the log field used to add the peer host to each event.
///
Expand Down Expand Up @@ -95,8 +95,8 @@ fn default_port_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("port"))
}

fn default_max_length() -> Option<usize> {
Some(crate::serde::default_max_length())
fn default_max_length() -> usize {
crate::serde::default_max_length()
}

impl UdpConfig {
Expand Down Expand Up @@ -163,9 +163,7 @@ pub(super) fn udp(
}
}

let mut max_length = config
.max_length
.unwrap_or_else(|| default_max_length().unwrap());
let mut max_length = config.max_length;

if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
max_length = std::cmp::min(max_length, receive_buffer_bytes);
Expand Down
29 changes: 14 additions & 15 deletions src/sources/socket/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
SourceSender,
};

use super::{default_host_key, default_max_length, SocketConfig};
use super::{default_host_key, SocketConfig};

/// Unix domain socket configuration for the `socket` source.
#[configurable_component]
Expand All @@ -41,16 +41,6 @@ pub struct UnixConfig {
#[configurable(metadata(docs::examples = 508))]
pub socket_file_mode: Option<u32>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
// TODO: communicated as deprecated in v0.29.0, can be removed in v0.30.0
#[configurable(
deprecated = "This option has been deprecated. Configure `max_length` on the framing config instead."
)]
#[configurable(metadata(docs::type_unit = "bytes"))]
pub max_length: Option<usize>,

/// Overrides the name of the log field used to add the peer host to each event.
///
/// The value will be the peer host's address, including the port i.e. `1.2.3.4:9000`.
Expand Down Expand Up @@ -82,7 +72,6 @@ impl UnixConfig {
Self {
path,
socket_file_mode: None,
max_length: default_max_length(),
host_key: default_host_key(),
framing: None,
decoding: default_decoding(),
Expand Down Expand Up @@ -135,12 +124,22 @@ pub(super) fn unix_datagram(
out: SourceSender,
log_namespace: LogNamespace,
) -> crate::Result<Source> {
let max_length = config
.framing
.and_then(|framing| match framing {
FramingConfig::CharacterDelimited {
character_delimited,
} => character_delimited.max_length,
FramingConfig::NewlineDelimited { newline_delimited } => newline_delimited.max_length,
FramingConfig::OctetCounting { octet_counting } => octet_counting.max_length,
_ => None,
})
.unwrap_or_else(crate::serde::default_max_length);

build_unix_datagram_source(
config.path,
config.socket_file_mode,
config
.max_length
.unwrap_or_else(crate::serde::default_max_length),
max_length,
decoder,
move |events, received_from| {
handle_events(events, &config.host_key, received_from, log_namespace)
Expand Down
32 changes: 32 additions & 0 deletions website/content/en/highlights/2023-05-23-0-30-0-upgrade-guide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
date: "2023-05-23"
title: "0.30 Upgrade Guide"
description: "An upgrade guide that addresses breaking changes in 0.30.0"
authors: ["neuronull"]
release: "0.30.0"
hide_on_release_notes: false
badges:
type: breaking change
---

Vector's 0.30.0 release includes **breaking changes**:

1. [Removal of the `socket` source's `tcp` and `unix` mode `max_length` setting](#socket-source-max-length)

We cover them below to help you upgrade quickly:

## Upgrade guide

### Breaking changes

#### Removal of the `socket` source's `tcp` and `unix` mode `max_length` setting {#socket-source-max-length}

In v0.29.0 the `max_length` setting, used by the `tcp` and `unix` modes
of the `socket` source, was marked as deprecated.

That setting was replaced by the `max_length` setting within the `framing`
setting.

Any explicit usages of `max_length` for those modes of the `socket`
source will no longer work and configurations will need to instead use
a `framing` setting in order to set a maximum length.
10 changes: 6 additions & 4 deletions website/cue/reference/components/sources/base/socket.cue
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,17 @@ base: components: sources: socket: configuration: {
type: uint: unit: "seconds"
}
max_length: {
deprecated: true
deprecated_message: "This option has been deprecated. Configure `max_length` on the framing config instead."
description: """
The maximum buffer size of incoming messages.
Messages larger than this are truncated.
"""
required: false
type: uint: unit: "bytes"
relevant_when: "mode = \"udp\""
required: false
type: uint: {
default: 102400
unit: "bytes"
}
}
mode: {
description: "The type of socket to use."
Expand Down

0 comments on commit 9ecfc8c

Please sign in to comment.