Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove destination address allocation #1042

Merged
merged 6 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[target.'cfg(all())']
rustflags = [
"-Ctarget-feature=+aes,+avx2",
]
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 17 additions & 8 deletions benches/token_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,31 @@ fn token_router(b: Bencher, token_kind: &str) {
use rand::seq::SliceRandom as _;
let tok = tokens.choose(&mut rand).unwrap();

let mut rc = quilkin::filters::ReadContext::new(
cm.clone(),
quilkin::net::EndpointAddress::LOCALHOST,
pool.clone().alloc(),
);
rc.metadata.insert(
let mut metadata = quilkin::net::endpoint::DynamicMetadata::default();
metadata.insert(
quilkin::net::endpoint::metadata::Key::from_static(
quilkin::filters::capture::CAPTURED_BYTES,
),
quilkin::net::endpoint::metadata::Value::Bytes((*tok).clone().into()),
);

rc
(
cm.clone(),
pool.clone().alloc(),
Vec::with_capacity(1),
metadata,
)
})
.counter(divan::counter::BytesCount::new(total_token_size))
.bench_local_values(|mut rc| {
.bench_local_values(|(cm, buffer, mut dest, metadata)| {
let mut rc = quilkin::filters::ReadContext {
endpoints: cm,
destinations: &mut dest,
source: quilkin::net::EndpointAddress::LOCALHOST,
contents: buffer,
metadata,
};

let _ = divan::black_box(filter.sync_read(&mut rc));
})
}
Expand Down
9 changes: 8 additions & 1 deletion src/components/proxy/io_uring_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ pub enum PacketProcessorCtx {
sessions: Arc<crate::components::proxy::SessionPool>,
error_acc: super::error::ErrorAccumulator,
worker_id: usize,
destinations: Vec<crate::net::EndpointAddress>,
},
SessionPool {
pool: Arc<crate::components::proxy::SessionPool>,
Expand Down Expand Up @@ -326,6 +327,7 @@ fn process_packet(
sessions,
worker_id,
error_acc,
destinations,
} => {
let received_at = UtcTimestamp::now();
if let Some(last_received_at) = last_received_at {
Expand All @@ -340,7 +342,12 @@ fn process_packet(
};

crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task(
ds_packet, *worker_id, config, sessions, error_acc,
ds_packet,
*worker_id,
config,
sessions,
error_acc,
destinations,
);

packet_processed_event.write(1);
Expand Down
13 changes: 6 additions & 7 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl DownstreamReceiveWorkerConfig {
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
error_acc: &mut super::error::ErrorAccumulator,
destinations: &mut Vec<crate::net::EndpointAddress>,
) {
tracing::trace!(
id = worker_id,
Expand All @@ -70,7 +71,7 @@ impl DownstreamReceiveWorkerConfig {
);

let timer = metrics::processing_time(metrics::READ).start_timer();
match Self::process_downstream_received_packet(packet, config, sessions) {
match Self::process_downstream_received_packet(packet, config, sessions, destinations) {
Ok(()) => {
error_acc.maybe_send();
}
Expand All @@ -92,6 +93,7 @@ impl DownstreamReceiveWorkerConfig {
packet: DownstreamPacket,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
destinations: &mut Vec<crate::net::EndpointAddress>,
) -> Result<(), PipelineError> {
if !config.clusters.read().has_endpoints() {
tracing::trace!("no upstream endpoints");
Expand All @@ -103,21 +105,18 @@ impl DownstreamReceiveWorkerConfig {
config.clusters.clone_value(),
packet.source.into(),
packet.contents,
destinations,
);
filters.read(&mut context).map_err(PipelineError::Filter)?;

let ReadContext {
destinations,
contents,
..
} = context;
let ReadContext { contents, .. } = context;

// Similar to bytes::BytesMut::freeze, we turn the mutable pool buffer
// into an immutable one with its own internal arc so it can be cloned
// cheaply and returned to the pool once all references are dropped
let contents = contents.freeze();

for epa in destinations {
for epa in destinations.drain(0..) {
let session_key = SessionKey {
source: packet.source,
dest: epa.to_socket_addr()?,
Expand Down
1 change: 1 addition & 0 deletions src/components/proxy/packet_router/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl super::DownstreamReceiveWorkerConfig {
sessions,
error_acc: super::super::error::ErrorAccumulator::new(error_sender),
worker_id,
destinations: Vec::with_capacity(1),
},
io_uring_shared::PacketReceiver::Router(upstream_receiver),
buffer_pool,
Expand Down
10 changes: 9 additions & 1 deletion src/components/proxy/packet_router/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl super::DownstreamReceiveWorkerConfig {

let mut error_acc =
crate::components::proxy::error::ErrorAccumulator::new(error_sender);
let mut destinations = Vec::with_capacity(1);

loop {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
Expand All @@ -131,7 +132,14 @@ impl super::DownstreamReceiveWorkerConfig {
}
last_received_at = Some(received_at);

Self::process_task(packet, worker_id, &config, &sessions, &mut error_acc);
Self::process_task(
packet,
worker_id,
&config,
&sessions,
&mut error_acc,
&mut destinations,
);
}
Err(error) => {
tracing::error!(%error, "error receiving packet");
Expand Down
2 changes: 1 addition & 1 deletion src/config/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl<T: JsonSchema + Default> JsonSchema for Slot<T> {
}

impl<T: crate::filters::Filter + Default> crate::filters::Filter for Slot<T> {
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
self.load().read(ctx)
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub trait Filter: Send + Sync {
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
/// instead. By default, the context passes through unchanged.
fn read(&self, _: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, _: &mut ReadContext<'_>) -> Result<(), FilterError> {
Ok(())
}

Expand Down
6 changes: 5 additions & 1 deletion src/filters/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Capture {

impl Filter for Capture {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
let capture = self.capture.capture(&mut ctx.contents);
ctx.metadata.insert(
self.is_present_key,
Expand Down Expand Up @@ -160,11 +160,13 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
assert!(filter
.read(&mut ReadContext::new(
endpoints.into(),
(std::net::Ipv4Addr::LOCALHOST, 80).into(),
alloc_buffer(b"abc"),
&mut dest,
))
.is_err());
}
Expand Down Expand Up @@ -237,10 +239,12 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
let mut context = ReadContext::new(
endpoints.into(),
"127.0.0.1:80".parse().unwrap(),
alloc_buffer(b"helloabc"),
&mut dest,
);

filter.read(&mut context).unwrap();
Expand Down
40 changes: 20 additions & 20 deletions src/filters/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl schemars::JsonSchema for FilterChain {
}

impl Filter for FilterChain {
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
for ((id, instance), histogram) in self
.filters
.iter()
Expand All @@ -296,12 +296,8 @@ impl Filter for FilterChain {
// has rejected, and the destinations is empty, we passthrough to all.
// Which mimics the old behaviour while avoid clones in most cases.
if ctx.destinations.is_empty() {
ctx.destinations = ctx
.endpoints
.endpoints()
.into_iter()
.map(|ep| ep.address)
.collect();
ctx.destinations
.extend(ctx.endpoints.endpoints().into_iter().map(|ep| ep.address));
}

Ok(())
Expand Down Expand Up @@ -382,10 +378,12 @@ mod tests {
crate::test::load_test_filters();
let config = TestConfig::new();
let endpoints_fixture = endpoints();
let mut dest = Vec::new();
let mut context = ReadContext::new(
endpoints_fixture.clone(),
"127.0.0.1:70".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
);

config.filters.read(&mut context).unwrap();
Expand Down Expand Up @@ -435,22 +433,24 @@ mod tests {
.unwrap();

let endpoints_fixture = endpoints();
let mut context = ReadContext::new(
endpoints_fixture.clone(),
"127.0.0.1:70".parse().unwrap(),
alloc_buffer(b"hello"),
);

chain.read(&mut context).unwrap();
let mut dest = Vec::new();

let (contents, metadata) = {
let mut context = ReadContext::new(
endpoints_fixture.clone(),
"127.0.0.1:70".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
);
chain.read(&mut context).unwrap();
(context.contents, context.metadata)
};
let expected = endpoints_fixture.clone();
assert_eq!(expected.endpoints(), context.destinations);
assert_eq!(
b"hello:odr:127.0.0.1:70:odr:127.0.0.1:70",
&*context.contents
);
assert_eq!(expected.endpoints(), dest);
assert_eq!(b"hello:odr:127.0.0.1:70:odr:127.0.0.1:70", &*contents);
assert_eq!(
"receive:receive",
context.metadata[&"downstream".into()].as_string().unwrap()
metadata[&"downstream".into()].as_string().unwrap()
);

let mut context = WriteContext::new(
Expand Down
10 changes: 9 additions & 1 deletion src/filters/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Compress {

impl Filter for Compress {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
let original_size = ctx.contents.len();

match self.on_read {
Expand Down Expand Up @@ -296,10 +296,12 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
let mut read_context = ReadContext::new(
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
alloc_buffer(&expected),
&mut dest,
);
compress.read(&mut read_context).expect("should compress");

Expand Down Expand Up @@ -356,11 +358,13 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
assert!(compression
.read(&mut ReadContext::new(
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
))
.is_err());
}
Expand All @@ -379,10 +383,12 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
let mut read_context = ReadContext::new(
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
);
compression.read(&mut read_context).unwrap();
assert_eq!(b"hello", &*read_context.contents);
Expand Down Expand Up @@ -474,10 +480,12 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
let mut read_context = ReadContext::new(
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
write_context.contents,
&mut dest,
);

filter.read(&mut read_context).expect("should decompress");
Expand Down
Loading
Loading