Skip to content

Commit

Permalink
Maintain sorted operation indexes (#438)
Browse files Browse the repository at this point in the history
* Introduce sorted_index column to operations_v1 table

* Iterate over sorted operations in reduce task

* Update operation sorted index when reducing documents

* Order operations by their index when topological sorted in SQL queries

* Test for reduce task updating operation sorted index

* Remove unused import

* Make clippy happy

* Doc strings

* Add insert_operation_with_index method to store

* Operation sorted index starts from 0

* Make sorted_index an INT

* Include missed change in utils

* Add sorted_index to StorageOperation

* TaskInput is now an enum

* fmt

* Make insert_operation_with_index private

* Update doc strings

* Update CHANGELOG

* fmt

* Simplify doc-string

* Update doc-strings

---------

Co-authored-by: adz <x12@adz.garden>
  • Loading branch information
sandreae and adzialocha authored Jul 7, 2023
1 parent 11a884c commit 17a1096
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 148 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Naive protocol replication [#380](https://github.com/p2panda/aquadoggo/pull/380)
- Integrate replication manager with networking stack [#387](https://github.com/p2panda/aquadoggo/pull/387) 🥞
- Reverse lookup for pinned relations in dependency task [#434](https://github.com/p2panda/aquadoggo/pull/434)
- Persist and maintain index of operation's position in document [#438](https://github.com/p2panda/aquadoggo/pull/438)

### Changed

Expand Down
3 changes: 3 additions & 0 deletions aquadoggo/migrations/20230523135826_alter-operations.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- SPDX-License-Identifier: AGPL-3.0-or-later

ALTER TABLE operations_v1 ADD COLUMN sorted_index INT;
12 changes: 12 additions & 0 deletions aquadoggo/src/db/models/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ pub struct OperationRow {
/// The previous operations of this operation concatenated into string format with `_`
/// separator.
pub previous: Option<String>,

/// Index of this operation once topological sorting of the operation graph has been performed.
///
/// If this value is `None` we can assume the operation has not been processed yet and we are
/// waiting for the `reduce` task to complete materialization.
pub sorted_index: Option<i32>,
}

/// A struct representing a single operation field row as it is inserted in the database.
Expand Down Expand Up @@ -99,4 +105,10 @@ pub struct OperationFieldsJoinedRow {
/// This numeric value is a simple list index to represent multiple values within one operation
/// field.
pub list_index: Option<i32>,

/// Index of this operation once topological sorting of the operation graph has been performed.
///
/// If this value is `None` we can assume the operation has not been processed yet and we are
/// waiting for the `reduce` task to complete materialization.
pub sorted_index: Option<i32>,
}
15 changes: 15 additions & 0 deletions aquadoggo/src/db/models/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub fn parse_operation_rows(
let public_key = PublicKey::new(&first_row.public_key).unwrap();
let operation_id = first_row.operation_id.parse().unwrap();
let document_id = first_row.document_id.parse().unwrap();
let sorted_index = first_row.sorted_index;

let mut relation_lists: BTreeMap<String, Vec<DocumentId>> = BTreeMap::new();
let mut pinned_relation_lists: BTreeMap<String, Vec<DocumentViewId>> = BTreeMap::new();
Expand Down Expand Up @@ -188,6 +189,7 @@ pub fn parse_operation_rows(
previous: operation.previous(),
fields: operation.fields(),
public_key,
sorted_index,
};

Some(operation)
Expand Down Expand Up @@ -431,6 +433,7 @@ mod tests {
field_type: Some("int".to_string()),
value: Some("28".to_string()),
list_index: Some(0),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -449,6 +452,7 @@ mod tests {
field_type: Some("float".to_string()),
value: Some("3.5".to_string()),
list_index: Some(0),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -467,6 +471,7 @@ mod tests {
field_type: Some("bool".to_string()),
value: Some("false".to_string()),
list_index: Some(0),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -488,6 +493,7 @@ mod tests {
.to_string(),
),
list_index: Some(0),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -509,6 +515,7 @@ mod tests {
.to_string(),
),
list_index: Some(1),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -530,6 +537,7 @@ mod tests {
.to_string(),
),
list_index: Some(0),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -551,6 +559,7 @@ mod tests {
.to_string(),
),
list_index: Some(1),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -572,6 +581,7 @@ mod tests {
.to_string(),
),
list_index: Some(0),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -593,6 +603,7 @@ mod tests {
.to_string(),
),
list_index: Some(1),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -614,6 +625,7 @@ mod tests {
.to_string(),
),
list_index: Some(0),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -635,6 +647,7 @@ mod tests {
.to_string(),
),
list_index: Some(0),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -653,6 +666,7 @@ mod tests {
field_type: Some("str".to_string()),
value: Some("bubu".to_string()),
list_index: Some(0),
sorted_index: None,
},
OperationFieldsJoinedRow {
public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96"
Expand All @@ -671,6 +685,7 @@ mod tests {
field_type: Some("pinned_relation_list".to_string()),
value: None,
list_index: Some(0),
sorted_index: None,
},
];

Expand Down
Loading

0 comments on commit 17a1096

Please sign in to comment.