From 1cf52900a64b462c78b8d53fa23ba62846ea6f2e Mon Sep 17 00:00:00 2001 From: Peter Morgan Date: Thu, 19 Sep 2024 11:22:47 +0100 Subject: [PATCH] message headers --- tansu-kafka-sans-io/src/record/header.rs | 6 ++ tansu-server/src/broker/produce.rs | 9 +- tansu-storage/src/pg.rs | 117 ++++++++++++++++------- 3 files changed, 98 insertions(+), 34 deletions(-) diff --git a/tansu-kafka-sans-io/src/record/header.rs b/tansu-kafka-sans-io/src/record/header.rs index 824eb5c..9f1a81f 100644 --- a/tansu-kafka-sans-io/src/record/header.rs +++ b/tansu-kafka-sans-io/src/record/header.rs @@ -28,6 +28,12 @@ pub struct Header { pub value: Option, } +impl Header { + pub fn builder() -> Builder { + Builder::default() + } +} + impl From for Header { fn from(value: Builder) -> Self { Self { diff --git a/tansu-server/src/broker/produce.rs b/tansu-server/src/broker/produce.rs index e73fbd0..7423bbd 100644 --- a/tansu-server/src/broker/produce.rs +++ b/tansu-server/src/broker/produce.rs @@ -20,7 +20,7 @@ use tansu_kafka_sans_io::{ Body, ErrorCode, }; use tansu_storage::{Storage, Topition}; -use tracing::debug; +use tracing::{debug, error}; #[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct ProduceRequest { @@ -59,7 +59,12 @@ where let tp = Topition::new(name, partition.index); - if let Ok(base_offset) = self.storage.produce(&tp, batch).await { + if let Ok(base_offset) = self + .storage + .produce(&tp, batch) + .await + .inspect_err(|err| error!(?err)) + { PartitionProduceResponse { index: partition.index, error_code: ErrorCode::None.into(), diff --git a/tansu-storage/src/pg.rs b/tansu-storage/src/pg.rs index fa61fd5..a85bbab 100644 --- a/tansu-storage/src/pg.rs +++ b/tansu-storage/src/pg.rs @@ -30,7 +30,7 @@ use tansu_kafka_sans_io::{ delete_records_response::{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}, describe_cluster_response::DescribeClusterBroker, metadata_response::{MetadataResponseBroker, MetadataResponsePartition, MetadataResponseTopic}, - record::{deflated, inflated, Record}, + record::{deflated, inflated, Header, Record}, to_system_time, to_timestamp, ErrorCode, }; use tokio_postgres::{error::SqlState, Config, NoTls}; @@ -476,7 +476,7 @@ impl Storage for Postgres { let tx = c.transaction().await?; - let prepared = tx + let insert_record = tx .prepare(concat!( "insert into record", " (topic, partition, producer_id, sequence, timestamp, k, v)", @@ -486,7 +486,18 @@ impl Storage for Postgres { " where topic.name = $1", " returning id" )) - .await?; + .await + .inspect_err(|err| error!(?err))?; + + let insert_header = tx + .prepare(concat!( + "insert into header", + " (record, k, v)", + " values", + " ($1, $2, $3)", + )) + .await + .inspect_err(|err| error!(?err))?; let inflated = inflated::Batch::try_from(deflated)?; let mut offsets = vec![]; @@ -494,23 +505,42 @@ impl Storage for Postgres { for record in inflated.records { debug!(?record); + let topic = topition.topic(); + let partition = topition.partition(); + let key = record.key.as_deref(); + let value = record.value.as_deref(); + let row = tx .query_one( - &prepared, + &insert_record, &[ - &topition.topic(), - &topition.partition(), + &topic, + &partition, &inflated.producer_id, &inflated.base_sequence, &(to_system_time(inflated.base_timestamp + record.timestamp_delta)?), - &record.key.as_deref(), - &record.value.as_deref(), + &key, + &value, ], ) - .await?; + .await + .inspect_err(|err| error!(?err, ?topic, ?partition, ?key, ?value))?; let offset: i64 = row.get(0); debug!(?offset); + + for header in record.headers { + let key = header.key.as_deref(); + let value = header.value.as_deref(); + + _ = tx + .execute(&insert_header, &[&offset, &key, &value]) + .await + .inspect_err(|err| { + error!(?err, ?topic, ?partition, ?offset, ?key, ?value); + }); + } + offsets.push(offset); } @@ -529,7 +559,7 @@ impl Storage for Postgres { debug!(?topition, ?offset); let c = self.connection().await?; - let prepared = c + let select_batch = c .prepare(concat!( "with sized as (", " select", @@ -550,11 +580,17 @@ impl Storage for Postgres { ") select * from sized", " where bytes < $5", )) - .await?; + .await + .inspect_err(|err| error!(?err))?; - let rows = c + let select_headers = c + .prepare(concat!("select k, v from header where record = $1")) + .await + .inspect_err(|err| error!(?err))?; + + let records = c .query( - &prepared, + &select_batch, &[ &self.cluster, &topition.topic(), @@ -565,7 +601,9 @@ impl Storage for Postgres { ) .await?; - let builder = if let Some(first) = rows.first() { + let mut bytes = 0; + + let batch = if let Some(first) = records.first() { let base_offset: i64 = first.try_get(0)?; debug!(?base_offset); @@ -574,16 +612,13 @@ impl Storage for Postgres { .map_err(Error::from) .and_then(|system_time| to_timestamp(system_time).map_err(Into::into))?; - let mut builder = inflated::Batch::builder() + let mut batch_builder = inflated::Batch::builder() .base_offset(base_offset) .base_timestamp(base_timestamp); - for record in rows.iter() { - let offset_delta = record - .try_get::<_, i64>(0) - .map_err(Error::from) - .map(|offset| offset - base_offset) - .and_then(|delta| i32::try_from(delta).map_err(Into::into))?; + for record in records.iter() { + let offset = record.try_get::<_, i64>(0)?; + let offset_delta = i32::try_from(offset - base_offset)?; let timestamp_delta = first .try_get::<_, SystemTime>(1) @@ -602,27 +637,45 @@ impl Storage for Postgres { .try_get::<_, Option<&[u8]>>(3) .map(|o| o.map(Bytes::copy_from_slice))?; - let bytes = record.try_get::<_, i64>(4)?; + bytes += record.try_get::<_, i64>(4)?; + + let mut record_builder = Record::builder() + .offset_delta(offset_delta) + .timestamp_delta(timestamp_delta) + .key(k.into()) + .value(v.into()); + + for header in c.query(&select_headers, &[&offset]).await? { + let mut header_builder = Header::builder(); + + if let Some(k) = header.try_get::<_, Option<&[u8]>>(0)? { + bytes += i64::try_from(k.len())?; + + header_builder = header_builder.key(k.to_vec()); + } + + if let Some(v) = header.try_get::<_, Option<&[u8]>>(1)? { + bytes += i64::try_from(v.len())?; + + header_builder = header_builder.value(v.to_vec()); + } + + record_builder = record_builder.header(header_builder); + } - builder = builder.record( - Record::builder() - .offset_delta(offset_delta) - .timestamp_delta(timestamp_delta) - .key(k.into()) - .value(v.into()), - ); + batch_builder = batch_builder.record(record_builder); if bytes > (min_bytes as i64) { break; } } - builder + batch_builder } else { inflated::Batch::builder() }; - builder + batch .build() .and_then(TryInto::try_into) .map_err(Into::into) @@ -830,7 +883,7 @@ impl Storage for Postgres { ), }; - let prepared = c.prepare(query).await?; + let prepared = c.prepare(query).await.inspect_err(|err| error!(?err))?; let list_offset = match offset_type { ListOffsetRequest::Earliest | ListOffsetRequest::Latest => {