Commit 4837535

fix: deserialize to vec for catalog

mgattozzi committed Oct 3, 2024
1 parent 37b079b commit 4837535

Showing 14 changed files with 170 additions and 95 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

99 changes: 71 additions & 28 deletions influxdb3_catalog/src/catalog.rs
@@ -355,7 +355,7 @@ impl Catalog {
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
 pub struct InnerCatalog {
     /// The catalog is a map of databases with their table schemas
-    #[serde_as(as = "serde_with::MapPreventDuplicates<_, _>")]
+    #[serde_as(as = "DatabasesAsArray")]
     databases: HashMap<DbId, Arc<DatabaseSchema>>,
     sequence: SequenceNumber,
     /// The host_id is the prefix that is passed in when starting up (`host_identifier_prefix`)
@@ -367,6 +367,48 @@ pub struct InnerCatalog {
     updated: bool,
 }
 
+serde_with::serde_conv!(
+    DatabasesAsArray,
+    HashMap<DbId, Arc<DatabaseSchema>>,
+    |map: &HashMap<DbId, Arc<DatabaseSchema>>| {
+        map.values().fold(Vec::new(), |mut acc, db| {
+            acc.push(DatabasesSerialized {
+                id: db.id,
+                name: Arc::clone(&db.name),
+                tables: db.tables.values().cloned().collect(),
+            });
+            acc
+        })
+    },
+    |vec: Vec<DatabasesSerialized>| -> Result<_, String> {
+        vec.into_iter().fold(Ok(HashMap::new()), |acc, db| {
+            let mut acc = acc?;
+            if let Some(_) = acc.insert(db.id, Arc::new(DatabaseSchema {
+                id: db.id,
+                name: Arc::clone(&db.name),
+                tables: db.tables.into_iter().fold(Ok(BTreeMap::new()), |acc, table| {
+                    let mut acc = acc?;
+                    let table_name = Arc::clone(&table.table_name);
+                    if let Some(_) = acc.insert(table.table_id, table) {
+                        return Err(format!("found duplicate table: {}", table_name));
+                    }
+                    Ok(acc)
+                })?
+            })) {
+                return Err(format!("found duplicate db: {}", db.name));
+            }
+            Ok(acc)
+        })
+    }
+);
+
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
+struct DatabasesSerialized {
+    pub id: DbId,
+    pub name: Arc<str>,
+    pub tables: Vec<TableDefinition>,
+}
+
 impl InnerCatalog {
     pub(crate) fn new(host_id: Arc<str>, instance_id: Arc<str>) -> Self {
         Self {
@@ -443,7 +485,6 @@ pub struct DatabaseSchema {
     pub id: DbId,
     pub name: Arc<str>,
     /// The database is a map of tables
-    #[serde_as(as = "serde_with::MapPreventDuplicates<_, _>")]
     pub tables: BTreeMap<TableId, TableDefinition>,
 }
 
Expand Down Expand Up @@ -939,55 +980,57 @@ mod tests {
// Duplicate databases
{
let json = r#"{
"databases": {
"0": {
"id": "0",
"databases": [
{
"id": 0,
"name": "db1",
"tables": {}
"tables": []
},
"0": {
"id": "0",
{
"id": 0,
"name": "db1",
"tables": {}
"tables": []
}
}
]
}"#;
let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
assert_contains!(err.to_string(), "found duplicate key");
assert_contains!(err.to_string(), "found duplicate db: db1");
}
// Duplicate tables
{
let json = r#"{
"databases": {
"0": {
"databases": [
{
"id": 0,
"name": "db1",
"tables": {
"0": {
"table_id": "0",
"tables": [
{
"table_id": 0,
"table_name": "tbl1",
"cols": {}
},
"0": {
"table_id": "0",
{
"table_id": 0,
"table_name": "tbl1",
"cols": {}
}
}
]
}
}
]
}"#;
let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
assert_contains!(err.to_string(), "found duplicate key");
assert_contains!(err.to_string(), "found duplicate table: tbl1");
}
// Duplicate columns
{
let json = r#"{
"databases": {
"0": {
"databases": [
{
"id": 0,
"name": "db1",
"tables": {
"0": {
"table_id": "0",
"tables": [
{
"table_id": 0,
"table_name": "tbl1",
"cols": {
"col1": {
Expand All @@ -1002,9 +1045,9 @@ mod tests {
}
}
}
}
]
}
}
]
}"#;
let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
assert_contains!(err.to_string(), "found duplicate key");
Expand Down
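The core of this change is the `serde_conv!` adapter above: `databases` stays a `HashMap<DbId, Arc<DatabaseSchema>>` in memory, but it serializes as an array of `DatabasesSerialized` records and is rebuilt, with duplicate checks, on deserialize. Below is a minimal standalone sketch of the same pattern using simplified stand-in types (`Db`, `Catalog`, `DbsAsArray` are illustrative names, not the influxdb3 definitions); the custom error strings surface through `serde_json`'s error, which is what the new tests assert.

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Db {
    id: u32,
    name: String,
}

// Serialize the map as a Vec of records; on deserialize, rebuild the map
// and reject duplicate ids with a descriptive error.
serde_with::serde_conv!(
    DbsAsArray,
    HashMap<u32, Db>,
    |map: &HashMap<u32, Db>| map.values().cloned().collect::<Vec<Db>>(),
    |vec: Vec<Db>| -> Result<_, String> {
        let mut map = HashMap::new();
        for db in vec {
            if map.insert(db.id, db.clone()).is_some() {
                return Err(format!("found duplicate db: {}", db.name));
            }
        }
        Ok(map)
    }
);

#[serde_with::serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct Catalog {
    #[serde_as(as = "DbsAsArray")]
    databases: HashMap<u32, Db>,
}

fn main() {
    // A duplicate id must fail, mirroring the test cases in this commit.
    let dup = r#"{"databases":[{"id":0,"name":"db1"},{"id":0,"name":"db1"}]}"#;
    let err = serde_json::from_str::<Catalog>(dup).unwrap_err();
    assert!(err.to_string().contains("found duplicate db: db1"));

    // A well-formed array round-trips through the map representation.
    let ok = r#"{"databases":[{"id":0,"name":"db1"},{"id":1,"name":"db2"}]}"#;
    let catalog: Catalog = serde_json::from_str(ok).unwrap();
    assert_eq!(catalog.databases.len(), 2);
}
```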
@@ -3,13 +3,13 @@ source: influxdb3_catalog/src/catalog.rs
 expression: catalog
 ---
 {
-  "databases": {
-    "0": {
-      "id": "0",
+  "databases": [
+    {
+      "id": 0,
       "name": "test_db",
-      "tables": {
-        "1": {
-          "table_id": "1",
+      "tables": [
+        {
+          "table_id": 1,
           "table_name": "test_table_1",
           "cols": {
             "bool_field": {
@@ -79,8 +79,8 @@ expression: catalog
             }
           }
         },
-        "2": {
-          "table_id": "2",
+        {
+          "table_id": 2,
           "table_name": "test_table_2",
           "cols": {
             "bool_field": {
@@ -150,9 +150,9 @@ expression: catalog
             }
           }
         }
-      }
+      ]
     }
-  },
+  ],
   "sequence": 0,
   "host_id": "dummy-host-id",
   "instance_id": "instance-id"
@@ -3,13 +3,13 @@ source: influxdb3_catalog/src/catalog.rs
 expression: catalog
 ---
 {
-  "databases": {
-    "0": {
-      "id": "0",
+  "databases": [
+    {
+      "id": 0,
       "name": "test_db",
-      "tables": {
-        "0": {
-          "table_id": "0",
+      "tables": [
+        {
+          "table_id": 0,
           "table_name": "test",
           "cols": {
             "field": {
@@ -60,7 +60,7 @@ expression: catalog
       },
       "last_caches": [
         {
-          "table_id": "0",
+          "table_id": 0,
           "table": "test",
           "name": "test_table_last_cache",
           "keys": [
@@ -75,9 +75,9 @@ expression: catalog
             }
           ]
         }
-      }
+      ]
     }
-  },
+  ],
   "sequence": 0,
   "host_id": "dummy-host-id",
   "instance_id": "instance-id"
@@ -3,13 +3,13 @@ source: influxdb3_catalog/src/catalog.rs
 expression: catalog
 ---
 {
-  "databases": {
-    "0": {
-      "id": "0",
+  "databases": [
+    {
+      "id": 0,
       "name": "test_db",
-      "tables": {
-        "1": {
-          "table_id": "1",
+      "tables": [
+        {
+          "table_id": 1,
           "table_name": "test_table_1",
           "key": [
             "tag_1",
@@ -64,9 +64,9 @@ expression: catalog
             }
           }
         }
-      }
+      ]
     }
-  },
+  ],
   "sequence": 0,
   "host_id": "dummy-host-id",
   "instance_id": "instance-id"
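The three files above are insta snapshots (note the `source:`/`expression:` headers), regenerated because the catalog now serializes databases and tables as arrays with integer ids. A hedged sketch of how such a snapshot assertion is typically written; the actual test bodies in catalog.rs are not shown in this diff, so the test name and value here are hypothetical:

```rust
// Hypothetical snapshot test: insta serializes the value, compares it to the
// stored .snap file, and fails on any drift, which is why these snapshots
// change whenever the serialized layout changes.
#[test]
fn catalog_serialization() {
    let catalog = serde_json::json!({ "databases": [], "sequence": 0 });
    insta::assert_json_snapshot!(catalog);
}
```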
1 change: 0 additions & 1 deletion influxdb3_id/Cargo.toml
@@ -7,7 +7,6 @@ license.workspace = true
 
 [dependencies]
 serde.workspace = true
-serde_with.workspace = true
 
 [lints]
 workspace = true
11 changes: 3 additions & 8 deletions influxdb3_id/src/lib.rs
@@ -1,14 +1,11 @@
 use serde::Deserialize;
 use serde::Serialize;
-use serde_with::serde_as;
-use serde_with::DisplayFromStr;
 use std::fmt::Display;
 use std::sync::atomic::AtomicU32;
 use std::sync::atomic::Ordering;
 
-#[serde_as]
 #[derive(Debug, Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Serialize, Deserialize, Hash)]
-pub struct DbId(#[serde_as(as = "DisplayFromStr")] u32);
+pub struct DbId(u32);
 
 static NEXT_DB_ID: AtomicU32 = AtomicU32::new(0);
 
@@ -47,9 +44,8 @@ impl Display for DbId {
     }
 }
 
-#[serde_as]
 #[derive(Debug, Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Serialize, Deserialize, Hash)]
-pub struct TableId(#[serde_as(as = "DisplayFromStr")] u32);
+pub struct TableId(u32);
 
 static NEXT_TABLE_ID: AtomicU32 = AtomicU32::new(0);
 
@@ -89,9 +85,8 @@ impl Display for TableId {
     }
 }
 
-#[serde_as]
 #[derive(Debug, Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Serialize, Deserialize, Hash)]
-pub struct ColumnId(#[serde_as(as = "DisplayFromStr")] u16);
+pub struct ColumnId(u16);
 
 impl ColumnId {
     pub fn new(id: u16) -> Self {
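With `DisplayFromStr` removed, the id newtypes serialize as plain JSON numbers instead of strings, which is exactly what the updated snapshots show. A minimal sketch with a hypothetical `NewId` newtype standing in for `DbId`/`TableId`/`ColumnId`:

```rust
use serde::{Deserialize, Serialize};

// Plain derive, no serde_as adapter: the inner u32 maps directly to a
// JSON number in both directions.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)]
struct NewId(u32);

fn main() {
    // Serializes to the bare number 0 (previously "0" under DisplayFromStr).
    assert_eq!(serde_json::to_string(&NewId(0)).unwrap(), "0");
    let id: NewId = serde_json::from_str("42").unwrap();
    assert_eq!(id, NewId(42));
}
```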
1 change: 1 addition & 0 deletions influxdb3_wal/Cargo.toml
@@ -27,6 +27,7 @@ object_store.workspace = true
 parking_lot.workspace = true
 serde.workspace = true
 serde_json.workspace = true
+serde_with.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
 
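influxdb3_wal picks up the `serde_with` dependency that influxdb3_id dropped, presumably because the column maps serialized there still use the `MapPreventDuplicates` adapter: the duplicate-column test above still expects the generic "found duplicate key" error. A hedged sketch of that adapter with a hypothetical `Table` type (not the influxdb3_wal definition):

```rust
use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};

#[serde_with::serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct Table {
    // Rejects JSON objects that repeat a key instead of silently
    // keeping one of the entries.
    #[serde_as(as = "serde_with::MapPreventDuplicates<_, _>")]
    cols: BTreeMap<String, String>,
}

fn main() {
    let dup = r#"{"cols":{"col1":"i64","col1":"u64"}}"#;
    let err = serde_json::from_str::<Table>(dup).unwrap_err();
    assert!(err.to_string().contains("found duplicate key"));
}
```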