Skip to content

Commit

Permalink
refactor: use row writers for otlp
Browse files Browse the repository at this point in the history
  • Loading branch information
sunng87 committed Sep 22, 2023
1 parent d9a5a33 commit 9010120
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 53 deletions.
121 changes: 68 additions & 53 deletions src/servers/src/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn to_grpc_insert_requests(
for scope in &resource.scope_metrics {
let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes);
for metric in &scope.metrics {
encode_metrics(&mut table_writer, &metric, resource_attrs, scope_attrs)?;
encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?;
}
}
}
Expand Down Expand Up @@ -101,10 +101,11 @@ fn write_attributes(
if let Some(attrs) = attrs {
let table_tags = attrs.iter().filter_map(|attr| {
if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
let key = normalize_otlp_name(&attr.key);
match val {
any_value::Value::StringValue(s) => Some((attr.key.to_string(), s.to_string())),
any_value::Value::IntValue(v) => Some((attr.key.to_string(), v.to_string())),
any_value::Value::DoubleValue(v) => Some((attr.key.to_string(), v.to_string())),
any_value::Value::StringValue(s) => Some((key, s.to_string())),
any_value::Value::IntValue(v) => Some((key, v.to_string())),
any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
_ => None, // TODO(sunng87): allow different type of values
}
} else {
Expand Down Expand Up @@ -174,7 +175,7 @@ fn encode_gauge(
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let mut table = table_writer.get_or_default_table_data(
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
gauge.data_points.len(),
Expand All @@ -183,15 +184,16 @@ fn encode_gauge(
for data_point in &gauge.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
&mut table,
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;

write_data_point_value(&mut table, &mut row, GREPTIME_VALUE, &data_point.value)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}

Ok(())
Expand All @@ -207,7 +209,7 @@ fn encode_sum(
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let mut table = table_writer.get_or_default_table_data(
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
sum.data_points.len(),
Expand All @@ -216,14 +218,15 @@ fn encode_sum(
for data_point in &sum.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
&mut table,
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
write_data_point_value(&mut table, &mut row, GREPTIME_VALUE, &data_point.value)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}

Ok(())
Expand Down Expand Up @@ -262,9 +265,9 @@ fn encode_histogram(
let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);

for data_point in &hist.data_points {
let mut bucket_row = bucket_table.alloc_one_row();
let mut accumulated_count = 0;
for (idx, count) in data_point.bucket_counts.iter().enumerate() {
let mut bucket_row = bucket_table.alloc_one_row();
write_tags_and_timestamp(
&mut bucket_table,
&mut bucket_row,
Expand Down Expand Up @@ -298,6 +301,8 @@ fn encode_histogram(
accumulated_count as f64,
&mut bucket_row,
)?;

bucket_table.add_row(bucket_row);
}

if let Some(sum) = data_point.sum {
Expand All @@ -312,6 +317,7 @@ fn encode_histogram(
)?;

row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
sum_table.add_row(sum_row);
}

let mut count_row = count_table.alloc_one_row();
Expand All @@ -330,6 +336,7 @@ fn encode_histogram(
data_point.count as f64,
&mut count_row,
)?;
count_table.add_row(count_row);
}

table_writer.add_table_data(bucket_table_name, bucket_table);
Expand All @@ -352,7 +359,7 @@ fn encode_summary(
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let mut table = table_writer.get_or_default_table_data(
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
summary.data_points.len(),
Expand All @@ -378,12 +385,8 @@ fn encode_summary(
)?;
}

row_writer::write_f64(
&mut table,
GREPTIME_COUNT,
data_point.count as f64,
&mut row,
)?;
row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
table.add_row(row);
}

Ok(())
Expand Down Expand Up @@ -419,6 +422,8 @@ mod tests {

#[test]
fn test_encode_gauge() {
let mut tables = MultiTableData::default();

let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testsevrer")],
Expand All @@ -434,20 +439,21 @@ mod tests {
},
];
let gauge = Gauge { data_points };
let inserts = encode_gauge(
encode_gauge(
&mut tables,
"datamon",
&gauge,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();

assert_eq!(inserts.table_name, "datamon");
assert_eq!(inserts.row_count, 2);
assert_eq!(inserts.columns.len(), 5);
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
inserts
.columns
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
Expand All @@ -463,6 +469,8 @@ mod tests {

#[test]
fn test_encode_sum() {
let mut tables = MultiTableData::default();

let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
Expand All @@ -481,20 +489,21 @@ mod tests {
data_points,
..Default::default()
};
let inserts = encode_sum(
encode_sum(
&mut tables,
"datamon",
&sum,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();

assert_eq!(inserts.table_name, "datamon");
assert_eq!(inserts.row_count, 2);
assert_eq!(inserts.columns.len(), 5);
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
inserts
.columns
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
Expand All @@ -510,6 +519,8 @@ mod tests {

#[test]
fn test_encode_summary() {
let mut tables = MultiTableData::default();

let data_points = vec![SummaryDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
Expand All @@ -528,20 +539,21 @@ mod tests {
..Default::default()
}];
let summary = Summary { data_points };
let inserts = encode_summary(
encode_summary(
&mut tables,
"datamon",
&summary,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();

assert_eq!(inserts.table_name, "datamon");
assert_eq!(inserts.row_count, 1);
assert_eq!(inserts.columns.len(), 7);
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 1);
assert_eq!(table.num_columns(), 7);
assert_eq!(
inserts
.columns
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
Expand All @@ -559,6 +571,8 @@ mod tests {

#[test]
fn test_encode_histogram() {
let mut tables = MultiTableData::default();

let data_points = vec![HistogramDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
Expand All @@ -576,23 +590,24 @@ mod tests {
data_points,
aggregation_temporality: AggregationTemporality::Delta.into(),
};
let inserts = encode_histogram(
encode_histogram(
&mut tables,
"histo",
&histogram,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();

assert_eq!(3, inserts.len());
assert_eq!(3, tables.num_tables());

// bucket table
assert_eq!(inserts[0].table_name, "histo_bucket");
assert_eq!(inserts[0].row_count, 5);
assert_eq!(inserts[0].columns.len(), 6);
let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
assert_eq!(bucket_table.num_rows(), 5);
assert_eq!(bucket_table.num_columns(), 6);
assert_eq!(
inserts[0]
.columns
bucket_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
Expand All @@ -606,12 +621,12 @@ mod tests {
]
);

assert_eq!(inserts[1].table_name, "histo_sum");
assert_eq!(inserts[1].row_count, 1);
assert_eq!(inserts[1].columns.len(), 5);
let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
assert_eq!(sum_table.num_rows(), 1);
assert_eq!(sum_table.num_columns(), 5);
assert_eq!(
inserts[1]
.columns
sum_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
Expand All @@ -624,12 +639,12 @@ mod tests {
]
);

assert_eq!(inserts[2].table_name, "histo_count");
assert_eq!(inserts[2].row_count, 1);
assert_eq!(inserts[2].columns.len(), 5);
let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
assert_eq!(count_table.num_rows(), 1);
assert_eq!(count_table.num_columns(), 5);
assert_eq!(
inserts[2]
.columns
count_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
Expand Down
10 changes: 10 additions & 0 deletions src/servers/src/row_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ impl TableData {
self.rows.push(Row { values })
}

#[allow(dead_code)]
pub fn columns(&self) -> &Vec<ColumnSchema> {
&self.schema
}

pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
(self.schema, self.rows)
}
Expand Down Expand Up @@ -100,6 +105,11 @@ impl MultiTableData {
.insert(table_name.to_string(), table_data);
}

#[allow(dead_code)]
pub fn num_tables(&self) -> usize {
self.table_data_map.len()
}

/// Returns the request and number of rows in it.
pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
let mut total_rows = 0;
Expand Down

0 comments on commit 9010120

Please sign in to comment.