From 4e44b00898cbad0ff037401b6828874940f70e5c Mon Sep 17 00:00:00 2001 From: Peter Morgan Date: Fri, 23 Aug 2024 10:26:37 +0100 Subject: [PATCH] kafka protocol proxy --- justfile | 11 ++- tansu-proxy/Cargo.toml | 15 ++++ tansu-proxy/src/lib.rs | 155 ++++++++++++++++++++++++++++++++++++++++ tansu-proxy/src/main.rs | 66 +++++++++++++++++ 4 files changed, 245 insertions(+), 2 deletions(-) create mode 100644 tansu-proxy/Cargo.toml create mode 100644 tansu-proxy/src/lib.rs create mode 100644 tansu-proxy/src/main.rs diff --git a/justfile b/justfile index 5f6670b..70141ab 100644 --- a/justfile +++ b/justfile @@ -17,10 +17,10 @@ test-topic-describe: kafka-topics --bootstrap-server 127.0.0.1:9092 --describe --topic test test-topic-create: - kafka-topics --bootstrap-server 127.0.0.1:9092 --config cleanup.policy=compact --partitions=1 --replication-factor=1 --create --topic test + kafka-topics --bootstrap-server 127.0.0.1:9092 --config cleanup.policy=compact --partitions=3 --replication-factor=1 --create --topic test test-topic-produce: - echo "h1:pqr,h2:jkl,h3:uio qwerty poiuy" | kafka-console-producer --bootstrap-server localhost:9092 --topic test --property parse.headers=true --property parse.key=true + echo "h1:pqr,h2:jkl,h3:uio qwerty poiuy\nh1:def,h2:lmn,h3:xyz asdfgh lkj\nh1:stu,h2:fgh,h3:ijk zxcvbn mnbvc" | kafka-console-producer --bootstrap-server localhost:9092 --topic test --property parse.headers=true --property parse.key=true test-topic-consume: kafka-console-consumer --consumer.config /usr/local/etc/kafka/consumer.properties --bootstrap-server localhost:9092 --topic test --from-beginning --property print.timestamp=true --property print.key=true --property print.offset=true --property print.partition=true --property print.headers=true --property print.value=true @@ -35,4 +35,11 @@ tansu-1: --raft-peer-url tcp://localhost:4569/ \ --work-dir work-dir/tansu-1 +kafka-proxy: + docker run -d -p 19092:9092 apache/kafka:3.8.0 + +proxy: + ./target/debug/tansu-proxy + + all: test miri diff --git a/tansu-proxy/Cargo.toml b/tansu-proxy/Cargo.toml new file mode 100644 index 0000000..d25d4f7 --- /dev/null +++ b/tansu-proxy/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tansu-proxy" +edition.workspace = true +version.workspace = true + +[dependencies] +bytes.workspace = true +clap.workspace = true +tansu-kafka-model = { path = "../tansu-kafka-model" } +tansu-kafka-sans-io = { path = "../tansu-kafka-sans-io" } +thiserror.workspace = true +tokio.workspace = true +tracing-subscriber.workspace = true +tracing.workspace = true +url.workspace = true diff --git a/tansu-proxy/src/lib.rs b/tansu-proxy/src/lib.rs new file mode 100644 index 0000000..61345f0 --- /dev/null +++ b/tansu-proxy/src/lib.rs @@ -0,0 +1,155 @@ +// Copyright ⓒ 2024 Peter Morgan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::{ + fmt, + io::{self, ErrorKind}, + result, + sync::Arc, +}; +use tansu_kafka_sans_io::{Frame, Header}; +use thiserror::Error; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, +}; +use tracing::{debug, error, info}; +use url::Url; + +pub type Result = result::Result; + +#[derive(Error, Debug)] +pub enum Error { + Io(Arc), + Protocol(#[from] tansu_kafka_sans_io::Error), +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::Io(Arc::new(value)) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +#[derive(Clone, Debug)] +pub struct Proxy { + listener: Url, + origin: Url, +} + +impl Proxy { + pub fn new(listener: Url, origin: Url) -> Self { + Self { listener, origin } + } + + pub async fn listen(&self) -> Result<()> { + debug!("listener: {}", self.listener.as_str()); + + let listener = TcpListener::bind(format!( + "{}:{}", + self.listener.host_str().unwrap(), + self.listener.port().unwrap() + )) + .await?; + + loop { + let (stream, addr) = listener.accept().await?; + info!(?addr); + + let mut connection = Connection::open(&self.origin, stream).await?; + + _ = tokio::spawn(async move { + match connection.stream_handler().await { + Err(ref error @ Error::Io(ref io)) if io.kind() == ErrorKind::UnexpectedEof => { + info!(?error); + } + + Err(error) => { + error!(?error); + } + + Ok(_) => {} + } + }); + } + } +} + +struct Connection { + proxy: TcpStream, + origin: TcpStream, +} + +impl Connection { + async fn open(origin: &Url, proxy: TcpStream) -> Result { + TcpStream::connect(format!( + "{}:{}", + origin.host_str().unwrap(), + origin.port().unwrap() + )) + .await + .map(|origin| Self { proxy, origin }) + .map_err(Into::into) + } + + async fn stream_handler(&mut self) -> Result<()> { + let mut size = [0u8; 4]; + + loop { + _ = self.proxy.read_exact(&mut size).await?; + + let mut buffer: Vec = vec![0u8; i32::from_be_bytes(size) as usize + size.len()]; + buffer[0..4].copy_from_slice(&size[..]); + _ = self.proxy.read_exact(&mut buffer[4..]).await?; + + let request = Frame::request_from_bytes(&buffer)?; + debug!(?request); + + match request { + Frame { + header: + Header::Request { + api_key, + api_version, + .. + }, + .. + } => { + self.origin.write_all(&buffer).await?; + + _ = self.origin.read_exact(&mut size).await?; + + let mut buffer: Vec = + vec![0u8; i32::from_be_bytes(size) as usize + size.len()]; + buffer[0..4].copy_from_slice(&size[..]); + _ = self.origin.read_exact(&mut buffer[4..]).await?; + + let response = Frame::response_from_bytes(&buffer, api_key, api_version)?; + + debug!(?response); + + self.proxy.write_all(&buffer).await?; + } + + _ => unreachable!(), + }; + } + } +} diff --git a/tansu-proxy/src/main.rs b/tansu-proxy/src/main.rs new file mode 100644 index 0000000..1ec1ac1 --- /dev/null +++ b/tansu-proxy/src/main.rs @@ -0,0 +1,66 @@ +// Copyright ⓒ 2024 Peter Morgan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use clap::Parser; +use tansu_proxy::Result; +use tokio::task::JoinSet; +use tracing::{debug, Level}; +use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, prelude::*}; +use url::Url; + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Cli { + #[arg(long, default_value = "tcp://localhost:9092")] + listener_url: Url, + + #[arg(long, default_value = "tcp://localhost:19092")] + origin_url: Url, +} + +#[tokio::main] +async fn main() -> Result<()> { + let filter = Targets::new().with_target("tansu_proxy", Level::DEBUG); + + tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::layer() + .pretty() + .with_line_number(true) + .with_span_events(FmtSpan::ACTIVE) + .with_thread_ids(true), + ) + .with(filter) + .init(); + + let args = Cli::parse(); + + let mut set = JoinSet::new(); + + { + let proxy = tansu_proxy::Proxy::new(args.listener_url, args.origin_url); + debug!(?proxy); + + set.spawn(async move { proxy.listen().await.unwrap() }); + } + + loop { + if set.join_next().await.is_none() { + break; + } + } + + Ok(()) +}