Skip to content

Commit

Permalink
Merge pull request #42 from tansu-io/41-message-headers
Browse files Browse the repository at this point in the history
message headers
  • Loading branch information
shortishly authored Sep 19, 2024
2 parents 4681329 + 1cf5290 commit 9d39400
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 34 deletions.
6 changes: 6 additions & 0 deletions tansu-kafka-sans-io/src/record/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ pub struct Header {
pub value: Option<Bytes>,
}

impl Header {
pub fn builder() -> Builder {
Builder::default()
}
}

impl From<Builder> for Header {
fn from(value: Builder) -> Self {
Self {
Expand Down
9 changes: 7 additions & 2 deletions tansu-server/src/broker/produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tansu_kafka_sans_io::{
Body, ErrorCode,
};
use tansu_storage::{Storage, Topition};
use tracing::debug;
use tracing::{debug, error};

#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ProduceRequest<S> {
Expand Down Expand Up @@ -59,7 +59,12 @@ where

let tp = Topition::new(name, partition.index);

if let Ok(base_offset) = self.storage.produce(&tp, batch).await {
if let Ok(base_offset) = self
.storage
.produce(&tp, batch)
.await
.inspect_err(|err| error!(?err))
{
PartitionProduceResponse {
index: partition.index,
error_code: ErrorCode::None.into(),
Expand Down
117 changes: 85 additions & 32 deletions tansu-storage/src/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tansu_kafka_sans_io::{
delete_records_response::{DeleteRecordsPartitionResult, DeleteRecordsTopicResult},
describe_cluster_response::DescribeClusterBroker,
metadata_response::{MetadataResponseBroker, MetadataResponsePartition, MetadataResponseTopic},
record::{deflated, inflated, Record},
record::{deflated, inflated, Header, Record},
to_system_time, to_timestamp, ErrorCode,
};
use tokio_postgres::{error::SqlState, Config, NoTls};
Expand Down Expand Up @@ -476,7 +476,7 @@ impl Storage for Postgres {

let tx = c.transaction().await?;

let prepared = tx
let insert_record = tx
.prepare(concat!(
"insert into record",
" (topic, partition, producer_id, sequence, timestamp, k, v)",
Expand All @@ -486,31 +486,61 @@ impl Storage for Postgres {
" where topic.name = $1",
" returning id"
))
.await?;
.await
.inspect_err(|err| error!(?err))?;

let insert_header = tx
.prepare(concat!(
"insert into header",
" (record, k, v)",
" values",
" ($1, $2, $3)",
))
.await
.inspect_err(|err| error!(?err))?;

let inflated = inflated::Batch::try_from(deflated)?;
let mut offsets = vec![];

for record in inflated.records {
debug!(?record);

let topic = topition.topic();
let partition = topition.partition();
let key = record.key.as_deref();
let value = record.value.as_deref();

let row = tx
.query_one(
&prepared,
&insert_record,
&[
&topition.topic(),
&topition.partition(),
&topic,
&partition,
&inflated.producer_id,
&inflated.base_sequence,
&(to_system_time(inflated.base_timestamp + record.timestamp_delta)?),
&record.key.as_deref(),
&record.value.as_deref(),
&key,
&value,
],
)
.await?;
.await
.inspect_err(|err| error!(?err, ?topic, ?partition, ?key, ?value))?;

let offset: i64 = row.get(0);
debug!(?offset);

for header in record.headers {
let key = header.key.as_deref();
let value = header.value.as_deref();

_ = tx
.execute(&insert_header, &[&offset, &key, &value])
.await
.inspect_err(|err| {
error!(?err, ?topic, ?partition, ?offset, ?key, ?value);
});
}

offsets.push(offset);
}

Expand All @@ -529,7 +559,7 @@ impl Storage for Postgres {
debug!(?topition, ?offset);
let c = self.connection().await?;

let prepared = c
let select_batch = c
.prepare(concat!(
"with sized as (",
" select",
Expand All @@ -550,11 +580,17 @@ impl Storage for Postgres {
") select * from sized",
" where bytes < $5",
))
.await?;
.await
.inspect_err(|err| error!(?err))?;

let rows = c
let select_headers = c
.prepare(concat!("select k, v from header where record = $1"))
.await
.inspect_err(|err| error!(?err))?;

let records = c
.query(
&prepared,
&select_batch,
&[
&self.cluster,
&topition.topic(),
Expand All @@ -565,7 +601,9 @@ impl Storage for Postgres {
)
.await?;

let builder = if let Some(first) = rows.first() {
let mut bytes = 0;

let batch = if let Some(first) = records.first() {
let base_offset: i64 = first.try_get(0)?;
debug!(?base_offset);

Expand All @@ -574,16 +612,13 @@ impl Storage for Postgres {
.map_err(Error::from)
.and_then(|system_time| to_timestamp(system_time).map_err(Into::into))?;

let mut builder = inflated::Batch::builder()
let mut batch_builder = inflated::Batch::builder()
.base_offset(base_offset)
.base_timestamp(base_timestamp);

for record in rows.iter() {
let offset_delta = record
.try_get::<_, i64>(0)
.map_err(Error::from)
.map(|offset| offset - base_offset)
.and_then(|delta| i32::try_from(delta).map_err(Into::into))?;
for record in records.iter() {
let offset = record.try_get::<_, i64>(0)?;
let offset_delta = i32::try_from(offset - base_offset)?;

let timestamp_delta = first
.try_get::<_, SystemTime>(1)
Expand All @@ -602,27 +637,45 @@ impl Storage for Postgres {
.try_get::<_, Option<&[u8]>>(3)
.map(|o| o.map(Bytes::copy_from_slice))?;

let bytes = record.try_get::<_, i64>(4)?;
bytes += record.try_get::<_, i64>(4)?;

let mut record_builder = Record::builder()
.offset_delta(offset_delta)
.timestamp_delta(timestamp_delta)
.key(k.into())
.value(v.into());

for header in c.query(&select_headers, &[&offset]).await? {
let mut header_builder = Header::builder();

if let Some(k) = header.try_get::<_, Option<&[u8]>>(0)? {
bytes += i64::try_from(k.len())?;

header_builder = header_builder.key(k.to_vec());
}

if let Some(v) = header.try_get::<_, Option<&[u8]>>(1)? {
bytes += i64::try_from(v.len())?;

header_builder = header_builder.value(v.to_vec());
}

record_builder = record_builder.header(header_builder);
}

builder = builder.record(
Record::builder()
.offset_delta(offset_delta)
.timestamp_delta(timestamp_delta)
.key(k.into())
.value(v.into()),
);
batch_builder = batch_builder.record(record_builder);

if bytes > (min_bytes as i64) {
break;
}
}

builder
batch_builder
} else {
inflated::Batch::builder()
};

builder
batch
.build()
.and_then(TryInto::try_into)
.map_err(Into::into)
Expand Down Expand Up @@ -830,7 +883,7 @@ impl Storage for Postgres {
),
};

let prepared = c.prepare(query).await?;
let prepared = c.prepare(query).await.inspect_err(|err| error!(?err))?;

let list_offset = match offset_type {
ListOffsetRequest::Earliest | ListOffsetRequest::Latest => {
Expand Down

0 comments on commit 9d39400

Please sign in to comment.