Integrate replication manager with networking stack (#387)
* Use SyncMessage in replication network behaviour

* Use target set in sync request

* Convert integer to Mode

* Add replication to main behaviour struct

* Add SyncManager to replication behaviour

* Add schema provider to behaviour

* Move manager out of network behaviour again, add replication service

* Introduce event loop to handle swarm and channel events

* Add new service message types to enum

* Better method name and structure for event loop

* Send and receive service messages on new or closed connections and replication messages

* Have peer id on network config struct

* Introduce connection manager in replication service

* Prepare methods for finished or failing sessions

* Add and remove peers in connection manager

* Count failed and successful sessions

* Initiate replication with peers

* Add some basic logging

* Do not override with default when building config in cli

* Fix checking only for certain messages in async loop

* Clippy happy, developer happy

* Make Domain error in IngestError transparent

* Add logging for replication entry exchange

* Sort system schema to the front of TargetSet

* Refactor log height diff logic

* Don't diff over schema sub-range of target set

* Introduce DuplicateSessionRequestError

* More logging and use new error type

* Logging for dropping and re-initiating duplicate session requests

* Log when re-initiating session with peer

* Fix issue when calculating local log heights

* More logging in manager

* Improve logging message

* Fix diff test

* Correct expect error message

* Ignore duplicate inbound sync requests

* Add messaging diagram to lifetime test

* Logging in behaviour

* Remove re-initiating dropped duplicate sessions if they had a different target set

* Diagram for sync lifetime test

* Test for concurrent sync request handling

* Remove duplicate diagram

* Make random target set include more

* Small logging and improved comments

* Elegantly handle concurrent session requests with duplicate target set

* Correct validation of TargetSet

* Better naming in TargetSet fixture

* Update tests

* Order log heights in Have message

* Implement Human on Message and SyncMessage

* Some work on logging

* Fix remote log height logging

* fmt

* Remove all sessions for a peer on replication error

* Add error logging to handler

* Add ConnectionId to peer identifier in replication service

* Doc string for PeerConnectionIdentifier

* Add comment to PeerConnectionId defaults

* Add (very) basic replication scheduler

* Refactor replication behaviour event triggering

* Temp fix for UNIQUE

* Send SyncMessages to one handler by ConnectionId

* Maintain list of peers and all their connections on ConnectionManager

* Remove connection from ConnectionManager when swarm issues ConnectionClosed event

* Refactor ConnectionEstablished messaging in replication behaviour

* Improve error handling and logging

* Update api in behaviour network tests

* Error logging in replication connection handler

* Cargo clippy

* fmt

* More tests for TargetSet validation

* Only identify peers by their PeerId (not ConnectionId) in replication logic

* Rename ConnectionEstablished to PeerConnected etc.

* Poll ticking stream for scheduling replication

* Dynamically retrieve target set when starting replication

* Add some more doc strings

* Fix formatting

* Fix missing peer id in e2e test

* Remove unnecessary type casting in entry SQL

* Give error logging more context

* Fix SQL query by making seq_num IN values a string

* Try different string literal

* Use IntervalStream from tokio for scheduler

* Add doc strings

* Fix filtering active sessions logic

* Update comments

* Remove repeating debug log

* Re-initiate dropped session if it concerns a different target set

* Allow max 3 sessions per peer and max one for the same target set

* Update test and fix bug in re-initiating session logic

* Correct diagram

* Inform connection handler about replication errors, introduce timeout

* Close all connection handlers on critical errors

* Fix import style

* Fix import style

* Remove no longer relevant log message

* Stop dialing peer after one address dialed successfully

* Only accept one inbound and one outbound connection per peer

* fmt x clippy

* Use libp2p from git main

* Add network info logging on incoming connection errors

* Revert

* Make clippy happy

* Never actively close connections

* Remove dead code

* Check more often when using ping and mDNS discovery

* Close replication session on all errors

* Better error logging

* Fix issue where outbound streams could not be re-established after error

* Add behaviour logic which always uses latest healthy connection

* Rename to peers behaviour

* Make clippy happy

* Add entry to CHANGELOG.md

* Use connection ids to identify peers

* Clean up logging a little bit

* A little bit less verbose logging

* Fix tests

* Add a test for connection manager

* Write some more doc-strings

* Add more docs

* Disconnect from all peers before shutdown

* Dial peers by multiaddr on mdns discovery

* Rename Naive -> LogHeight strategy

* Naming improvement

* Doc strings

* fmt

---------

Co-authored-by: Sam Andreae <contact@samandreae.com>
adzialocha and sandreae authored Jun 19, 2023
1 parent b304217 commit c45b96a
Showing 40 changed files with 2,721 additions and 775 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Replication protocol session manager [#363](https://github.com/p2panda/aquadoggo/pull/363)
- Replication message de- / serialization [#375](https://github.com/p2panda/aquadoggo/pull/375)
- Naive protocol replication [#380](https://github.com/p2panda/aquadoggo/pull/380)
- Integrate replication manager with networking stack [#387](https://github.com/p2panda/aquadoggo/pull/387) 🥞

### Changed

2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions aquadoggo/Cargo.toml
@@ -70,6 +70,7 @@ tokio = { version = "1.25.0", features = [
"sync",
"time",
] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tower-http = { version = "0.3.4", default-features = false, features = [
"cors",
] }
17 changes: 17 additions & 0 deletions aquadoggo/src/bus.rs
@@ -3,6 +3,8 @@
use p2panda_rs::operation::OperationId;

use crate::manager::Sender;
use crate::network::Peer;
use crate::replication::SyncMessage;

/// Sender for cross-service communication bus.
pub type ServiceSender = Sender<ServiceMessage>;
@@ -12,4 +14,19 @@ pub type ServiceSender = Sender<ServiceMessage>;
pub enum ServiceMessage {
/// A new operation arrived at the node.
NewOperation(OperationId),

/// Node established a bi-directional connection to another node.
PeerConnected(Peer),

/// Node closed a connection to another node.
PeerDisconnected(Peer),

/// Node sent a message to a remote node for replication.
SentReplicationMessage(Peer, SyncMessage),

/// Node received a message from a remote node for replication.
ReceivedReplicationMessage(Peer, SyncMessage),

/// Replication protocol failed with a critical error.
ReplicationFailed(Peer),
}
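
For orientation, here is a hypothetical sketch of how a service could use these new message variants, assuming the bus behaves like a `tokio::sync::broadcast` channel and that `ServiceMessage` implements `Clone` (the actual `crate::manager::Sender` wrapper may differ):

use tokio::sync::broadcast;

use crate::bus::ServiceMessage;
use crate::network::Peer;

// Hypothetical sketch only: announce a new connection on the bus and react to
// replication-related messages arriving from other services.
async fn handle_peer(tx: broadcast::Sender<ServiceMessage>, peer: Peer) {
    // Subscribe before sending so no messages are missed
    let mut rx = tx.subscribe();

    // Tell all other services that a bi-directional connection was established
    let _ = tx.send(ServiceMessage::PeerConnected(peer));

    while let Ok(message) = rx.recv().await {
        match message {
            ServiceMessage::ReceivedReplicationMessage(_peer, _sync_message) => {
                // Hand the message over to the replication manager for this peer
            }
            ServiceMessage::ReplicationFailed(_peer) => {
                // Drop all replication sessions for this peer
            }
            _ => (),
        }
    }
}

The replication service introduced in this PR follows the same shape: it listens for `PeerConnected` and `PeerDisconnected` events and feeds incoming replication messages into its connection manager.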
23 changes: 23 additions & 0 deletions aquadoggo/src/config.rs
@@ -98,6 +98,29 @@ impl Configuration {
}
};

// Derive peer id from key pair
// @TODO: This needs refactoring: https://github.com/p2panda/aquadoggo/issues/388
let key_pair = NetworkConfiguration::load_or_generate_key_pair(config.base_path.clone())?;
config.network.set_peer_id(&key_pair.public());

Ok(config)
}
}

#[cfg(test)]
impl Configuration {
/// Returns a new configuration object for a node which stores all data temporarily in memory.
pub fn new_ephemeral() -> Self {
let mut config = Configuration {
database_url: Some("sqlite::memory:".to_string()),
..Default::default()
};

// Generate a random key pair and just keep it in memory
// @TODO: This needs refactoring: https://github.com/p2panda/aquadoggo/issues/388
let key_pair: libp2p::identity::Keypair = crate::network::identity::Identity::new();
config.network.set_peer_id(&key_pair.public());

config
}
}
1 change: 1 addition & 0 deletions aquadoggo/src/db/stores/document.rs
@@ -459,6 +459,7 @@ async fn insert_document_view(
)
VALUES
($1, $2, $3)
ON CONFLICT(document_view_id) DO NOTHING -- @TODO: temp fix for double document view insertions: https://github.com/p2panda/aquadoggo/issues/398
",
)
.bind(document_view.id().to_string())
10 changes: 6 additions & 4 deletions aquadoggo/src/db/stores/entry.rs
@@ -277,14 +277,14 @@ impl EntryStore for SqlStore {
log_id: &LogId,
initial_seq_num: &SeqNum,
) -> Result<Vec<StorageEntry>, EntryStorageError> {
// Formatting query string in this way as `sqlx` currently doesn't support binding list
// arguments for IN queries.
let cert_pool_seq_nums = get_lipmaa_links_back_to(initial_seq_num.as_u64(), 1)
.iter()
.map(|seq_num| seq_num.to_string())
.map(|seq_num| format!("'{seq_num}'"))
.collect::<Vec<String>>()
.join(",");

// Formatting query string in this way as `sqlx` currently
// doesn't support binding list arguments for IN queries.
let sql_str = format!(
"SELECT
public_key,
Expand All @@ -299,7 +299,7 @@ impl EntryStore for SqlStore {
WHERE
public_key = $1
AND log_id = $2
AND CAST(seq_num AS NUMERIC) IN ({})
AND seq_num IN ({})
ORDER BY
CAST(seq_num AS NUMERIC) DESC
",
@@ -337,6 +337,8 @@ impl SqlStore {
logs.schema = $1
GROUP BY
entries.public_key, entries.log_id
ORDER BY
entries.public_key, CAST(entries.log_id AS NUMERIC)
",
)
.bind(schema_id.to_string())
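
For context on the quoting change above: the `seq_num` column appears to be stored as text, and `sqlx` currently has no way to bind a list argument for an `IN (...)` clause, so the values are formatted into the query string by hand. A simplified, hypothetical sketch of that pattern (table and column names are illustrative):

use sqlx::any::AnyPool;

// Illustrative sketch: since `sqlx` cannot bind `IN (...)` lists, the values
// are quoted and joined into the query string directly.
async fn entries_by_seq_nums(pool: &AnyPool, seq_nums: &[u64]) -> Result<(), sqlx::Error> {
    let seq_nums_str = seq_nums
        .iter()
        .map(|seq_num| format!("'{seq_num}'"))
        .collect::<Vec<String>>()
        .join(",");

    let sql = format!(
        "SELECT entry_bytes FROM entries WHERE seq_num IN ({seq_nums_str})
         ORDER BY CAST(seq_num AS NUMERIC) DESC"
    );

    sqlx::query(&sql).fetch_all(pool).await?;

    Ok(())
}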
3 changes: 2 additions & 1 deletion aquadoggo/src/graphql/scalars/document_view_id_scalar.rs
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::{fmt::Display, str::FromStr};
use std::fmt::Display;
use std::str::FromStr;

use dynamic_graphql::{Error, Result, Scalar, ScalarValue, Value};
use p2panda_rs::document::DocumentViewId;
2 changes: 1 addition & 1 deletion aquadoggo/src/graphql/schema.rs
@@ -194,7 +194,7 @@ impl GraphQLSchemaManager {
let shared = self.shared.clone();
let schemas = self.schemas.clone();

info!("Subscribing GraphQL manager to schema provider");
debug!("Subscribing GraphQL manager to schema provider");
let mut on_schema_added = shared.schema_provider.on_schema_added();

// Create the new GraphQL based on the current state of known p2panda application schemas
46 changes: 24 additions & 22 deletions aquadoggo/src/materializer/service.rs
@@ -88,28 +88,30 @@ pub async fn materializer_service(

// Listen to incoming new entries and operations and move them into task queue
let handle = task::spawn(async move {
while let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
match context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retreiving document for operation_id {}",
operation_id
)
}) {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)));
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
loop {
if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
match context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retreiving document for operation_id {}",
operation_id
)
}) {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)));
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
}
}
}
}
49 changes: 45 additions & 4 deletions aquadoggo/src/network/behaviour.rs
@@ -1,5 +1,7 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::time::Duration;

use anyhow::Result;
use libp2p::identity::Keypair;
use libp2p::swarm::behaviour::toggle::Toggle;
@@ -8,9 +10,31 @@ use libp2p::{autonat, connection_limits, identify, mdns, ping, relay, rendezvous
use log::debug;

use crate::network::config::NODE_NAMESPACE;
use crate::network::peers;
use crate::network::NetworkConfiguration;

/// How often do we broadcast mDNS queries into the network.
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(5);

/// How often do we ping other peers to check for a healthy connection.
const PING_INTERVAL: Duration = Duration::from_secs(5);

/// How long do we wait for an answer from the other peer before we consider the connection as
/// stale.
const PING_TIMEOUT: Duration = Duration::from_secs(3);

/// Network behaviour for the aquadoggo node.
///
/// In libp2p all different behaviours are "merged" into one "main behaviour" with the help of
/// the `NetworkBehaviour` derive macro.
///
/// All behaviours share the same connections with each other. Together they form something we
/// could call our "custom" networking behaviour.
///
/// It is possible for a peer to not support all behaviours; internally libp2p negotiates the
/// capabilities of each peer for us and upgrades the protocol accordingly. For example, two peers
/// can exchange p2panda messages with each other (using the `peers` behaviour) but do not
/// necessarily need to support the `relay` behaviour.
#[derive(NetworkBehaviour)]
pub struct Behaviour {
/// Determine NAT status by requesting remote peers to dial the public address of the
@@ -42,9 +66,12 @@ pub struct Behaviour {
/// Register with a rendezvous server and query remote peer addresses.
pub rendezvous_client: Toggle<rendezvous::client::Behaviour>,

/// Serve as a rendezvous point for remote peers to register their external addresses
/// and query the addresses of other peers.
/// Serve as a rendezvous point for remote peers to register their external addresses and query
/// the addresses of other peers.
pub rendezvous_server: Toggle<rendezvous::server::Behaviour>,

/// Register peer connections and handle p2panda messaging with them.
pub peers: peers::Behaviour,
}

impl Behaviour {
@@ -86,15 +113,25 @@ impl Behaviour {
// Create an mDNS behaviour with default configuration if the mDNS flag is set
let mdns = if network_config.mdns {
debug!("mDNS network behaviour enabled");
Some(mdns::Behaviour::new(Default::default(), peer_id)?)
Some(mdns::Behaviour::new(
mdns::Config {
query_interval: MDNS_QUERY_INTERVAL,
..mdns::Config::default()
},
peer_id,
)?)
} else {
None
};

// Create a ping behaviour with default configuration if the ping flag is set
let ping = if network_config.ping {
debug!("Ping network behaviour enabled");
Some(ping::Behaviour::default())
Some(ping::Behaviour::new(
ping::Config::new()
.with_interval(PING_INTERVAL)
.with_timeout(PING_TIMEOUT),
))
} else {
None
};
@@ -132,6 +169,9 @@ impl Behaviour {
None
};

// Create behaviour to manage peer connections and handle p2panda messaging
let peers = peers::Behaviour::new();

Ok(Self {
autonat: autonat.into(),
identify: identify.into(),
Expand All @@ -142,6 +182,7 @@ impl Behaviour {
rendezvous_server: rendezvous_server.into(),
relay_client: relay_client.into(),
relay_server: relay_server.into(),
peers,
})
}
}
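
To show how the merged behaviour is typically consumed, here is a rough, hypothetical sketch of a swarm event loop; the `NetworkBehaviour` derive macro generates a `BehaviourEvent` enum with one variant per struct field (module paths and the exact event handling in this PR's network service will differ):

use futures::StreamExt;
use libp2p::swarm::SwarmEvent;
use libp2p::Swarm;

use crate::network::behaviour::{Behaviour, BehaviourEvent};

// Sketch only: drive the swarm and dispatch events emitted by the individual
// behaviours that were merged by the `NetworkBehaviour` derive macro.
async fn event_loop(mut swarm: Swarm<Behaviour>) {
    loop {
        match swarm.select_next_some().await {
            SwarmEvent::Behaviour(BehaviourEvent::Mdns(_event)) => {
                // Peers discovered (or expired) on the local network
            }
            SwarmEvent::Behaviour(BehaviourEvent::Peers(_event)) => {
                // p2panda replication messages and peer connection events
            }
            SwarmEvent::ConnectionEstablished { .. } => {
                // A transport-level connection to another peer was negotiated
            }
            _ => (),
        }
    }
}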
19 changes: 14 additions & 5 deletions aquadoggo/src/network/config.rs
@@ -4,7 +4,7 @@ use std::path::PathBuf;

use anyhow::Result;
use libp2p::connection_limits::ConnectionLimits;
use libp2p::identity::Keypair;
use libp2p::identity::{Keypair, PublicKey};
use libp2p::{Multiaddr, PeerId};
use log::info;
use serde::{Deserialize, Serialize};
@@ -72,8 +72,8 @@ pub struct NetworkConfiguration {

/// Ping behaviour enabled.
///
/// Send outbound pings to connected peers every 15 seconds and respond to inbound pings.
/// Every sent ping must yield a response within 20 seconds in order to be successful.
/// Send outbound pings to connected peers every 15 seconds and respond to inbound pings. Every
/// sent ping must yield a response within 20 seconds in order to be successful.
pub ping: bool,

/// QUIC transport port.
Expand Down Expand Up @@ -103,6 +103,9 @@ pub struct NetworkConfiguration {
///
/// Serve as a rendezvous point for peer discovery, allowing peer registration and queries.
pub rendezvous_server_enabled: bool,

/// Our local peer id.
pub peer_id: Option<PeerId>,
}

impl Default for NetworkConfiguration {
Expand All @@ -127,11 +130,17 @@ impl Default for NetworkConfiguration {
rendezvous_address: None,
rendezvous_peer_id: None,
rendezvous_server_enabled: false,
peer_id: None,
}
}
}

impl NetworkConfiguration {
/// Derive peer id from a given public key.
pub fn set_peer_id(&mut self, public_key: &PublicKey) {
self.peer_id = Some(PeerId::from_public_key(public_key));
}

/// Define the connection limits of the swarm.
pub fn connection_limits(&self) -> ConnectionLimits {
ConnectionLimits::default()
Expand All @@ -144,8 +153,8 @@ impl NetworkConfiguration {

/// Load the key pair from the file at the specified path.
///
/// If the file does not exist, a random key pair is generated and saved.
/// If no path is specified, a random key pair is generated.
/// If the file does not exist, a random key pair is generated and saved. If no path is
/// specified, a random key pair is generated.
pub fn load_or_generate_key_pair(path: Option<PathBuf>) -> Result<Keypair> {
let key_pair = match path {
Some(mut path) => {
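
As a usage sketch, this is roughly how the new `peer_id` field gets populated, mirroring what `Configuration::new()` and `new_ephemeral()` above do with `set_peer_id` (key generation uses standard `libp2p` identity calls):

use libp2p::identity::Keypair;
use libp2p::PeerId;

use crate::network::NetworkConfiguration;

fn main() {
    // Generate an ed25519 key pair in memory and derive the matching peer id
    let key_pair = Keypair::generate_ed25519();
    let peer_id = PeerId::from_public_key(&key_pair.public());

    // Store the derived peer id on the network configuration
    let mut config = NetworkConfiguration::default();
    config.set_peer_id(&key_pair.public());
    assert_eq!(config.peer_id, Some(peer_id));
}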