Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consensus Integration #1467

Merged
merged 23 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ jobs:
isRust: ${{ steps.diff.outputs.isRust }}
steps:
- uses: actions/checkout@v2
- name: Configure Git
env:
TOKEN: ${{ secrets.NARWHAL_ACCESS_TOKEN }}
run: git config --global url."https://${TOKEN}:x-oauth-basic@github.com/".insteadOf "https://github.com/"
huitseeker marked this conversation as resolved.
Show resolved Hide resolved
- name: Detect Changes
uses: dorny/paths-filter@v2.10.2
id: diff
Expand Down
3 changes: 3 additions & 0 deletions sui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ move-bytecode-verifier = { git = "https://github.com/move-language/move", rev =
move-binary-format = { git = "https://github.com/move-language/move", rev = "2e7c37edada44436805e047dd26724a26c07635a" }
move-bytecode-utils = { git = "https://github.com/move-language/move", rev = "2e7c37edada44436805e047dd26724a26c07635a" }
move-unit-test = { git = "https://github.com/move-language/move", rev = "2e7c37edada44436805e047dd26724a26c07635a" }
narwhal-node = { git = "https://github.com/MystenLabs/narwhal", rev = "3b8545531175241a803945af254ce78f3b1b3a6d", package = "node" }
narwhal-config = { git = "https://github.com/MystenLabs/narwhal", rev = "3b8545531175241a803945af254ce78f3b1b3a6d", package = "config" }
narwhal-crypto = { git = "https://github.com/MystenLabs/narwhal", rev = "3b8545531175241a803945af254ce78f3b1b3a6d", package = "crypto" }

once_cell = "1.9.0"
reqwest = { version = "0.11.10", features=["json","serde_json", "blocking"]}
Expand Down
8 changes: 7 additions & 1 deletion sui/src/benchmark/load_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,17 @@ pub async fn spawn_authority_server(
network_server: NetworkServer,
state: AuthorityState,
) -> transport::SpawnedServer<AuthorityServer> {
// The following two fields are only needed for shared objects (not by this bench).
let consensus_address = "127.0.0.1:0".parse().unwrap();
let (tx_consensus_listener, _rx_consensus_listener) = tokio::sync::mpsc::channel(1);

let server = AuthorityServer::new(
network_server.base_address,
network_server.base_port,
network_server.buffer_size,
state,
Arc::new(state),
consensus_address,
tx_consensus_listener,
);
server.spawn().await.unwrap()
}
Expand Down
90 changes: 78 additions & 12 deletions sui/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,36 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::gateway::GatewayType;
use crate::keystore::KeystoreType;
use narwhal_config::Committee as ConsensusCommittee;
use narwhal_config::{Authority, PrimaryAddresses, Stake, WorkerAddresses};
use narwhal_crypto::ed25519::Ed25519PublicKey;
use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_with::hex::Hex;
use serde_with::serde_as;
use std::fmt::Write;
use std::fmt::{Display, Formatter};
use std::fs::{self, File};
use std::io::BufReader;
use std::net::SocketAddr;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::Mutex;

use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_with::hex::Hex;
use serde_with::serde_as;
use tracing::log::trace;

use sui_framework::DEFAULT_FRAMEWORK_PATH;
use sui_network::network::PortAllocator;
use sui_types::base_types::*;
use sui_types::crypto::{get_key_pair, KeyPair};

use crate::gateway::GatewayType;
use crate::keystore::KeystoreType;
use tracing::log::trace;

const DEFAULT_WEIGHT: usize = 1;
const DEFAULT_GAS_AMOUNT: u64 = 100000;
pub const AUTHORITIES_DB_NAME: &str = "authorities_db";
pub const DEFAULT_STARTING_PORT: u16 = 10000;
pub const CONSENSUS_DB_NAME: &str = "consensus_db";

static PORT_ALLOCATOR: Lazy<Mutex<PortAllocator>> =
Lazy::new(|| Mutex::new(PortAllocator::new(DEFAULT_STARTING_PORT)));
Expand All @@ -49,6 +51,7 @@ pub struct AuthorityPrivateInfo {
pub port: u16,
pub db_path: PathBuf,
pub stake: usize,
pub consensus_address: SocketAddr,
}

// Custom deserializer with optional default fields
Expand Down Expand Up @@ -91,13 +94,24 @@ impl<'de> Deserialize<'de> for AuthorityPrivateInfo {
} else {
DEFAULT_WEIGHT
};
let consensus_address = if let Some(val) = json.get("consensus_address") {
SocketAddr::deserialize(val).map_err(serde::de::Error::custom)?
} else {
let port = PORT_ALLOCATOR
.lock()
.map_err(serde::de::Error::custom)?
.next_port()
.ok_or_else(|| serde::de::Error::custom("No available port."))?;
format!("127.0.0.1:{port}").parse().unwrap()
};

Ok(AuthorityPrivateInfo {
key_pair,
host,
port,
db_path,
stake,
consensus_address,
})
}
}
Expand Down Expand Up @@ -302,3 +316,55 @@ impl<C> DerefMut for PersistedConfig<C> {
&mut self.inner
}
}

/// Make a default Narwhal-compatible committee.
pub fn make_default_narwhal_committee(
authorities: &[AuthorityPrivateInfo],
) -> Result<ConsensusCommittee<Ed25519PublicKey>, anyhow::Error> {
let mut ports = Vec::new();
for _ in authorities {
let mut authority_ports = Vec::new();
for _ in 0..4 {
let port = PORT_ALLOCATOR
.lock()
.map_err(|e| anyhow::anyhow!("{e}"))?
.next_port()
.ok_or_else(|| anyhow::anyhow!("No available ports"))?;
authority_ports.push(port);
}
ports.push(authority_ports);
}

Ok(ConsensusCommittee {
authorities: authorities
.iter()
.enumerate()
.map(|(i, x)| {
let name = x.key_pair.make_narwhal_keypair().name;

let primary = PrimaryAddresses {
primary_to_primary: format!("127.0.0.1:{}", ports[i][0]).parse().unwrap(),
worker_to_primary: format!("127.0.0.1:{}", ports[i][1]).parse().unwrap(),
};
let workers = [(
/* worker_id */ 0,
WorkerAddresses {
primary_to_worker: format!("127.0.0.1:{}", ports[i][2]).parse().unwrap(),
transactions: x.consensus_address,
worker_to_worker: format!("127.0.0.1:{}", ports[i][3]).parse().unwrap(),
},
)]
.iter()
.cloned()
.collect();

let authority = Authority {
stake: x.stake as Stake,
primary,
workers,
};
(name, authority)
})
.collect(),
})
}
124 changes: 103 additions & 21 deletions sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,39 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use crate::config::{make_default_narwhal_committee, CONSENSUS_DB_NAME};
use crate::config::{
AuthorityPrivateInfo, Config, GenesisConfig, NetworkConfig, PersistedConfig, WalletConfig,
};
use crate::gateway::{GatewayConfig, GatewayType};
use crate::keystore::{Keystore, KeystoreType, SuiKeystore};
use crate::{sui_config_dir, SUI_GATEWAY_CONFIG, SUI_NETWORK_CONFIG, SUI_WALLET_CONFIG};
use anyhow::{anyhow, bail};
use clap::*;
use futures::future::join_all;
use move_binary_format::CompiledModule;
use move_package::BuildConfig;
use tracing::{error, info};

use narwhal_config::{Committee as ConsensusCommittee, Parameters as ConsensusParameters};
use narwhal_crypto::ed25519::Ed25519PublicKey;
use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use sui_adapter::adapter::generate_package_id;
use sui_adapter::genesis;
use sui_core::authority::{AuthorityState, AuthorityStore};
use sui_core::authority_server::AuthorityServer;
use sui_core::consensus_adapter::ConsensusListener;
use sui_network::transport::SpawnedServer;
use sui_network::transport::DEFAULT_MAX_DATAGRAM_SIZE;
use sui_types::base_types::decode_bytes_hex;
use sui_types::base_types::encode_bytes_hex;
use sui_types::base_types::{SequenceNumber, SuiAddress, TxContext};
use sui_types::committee::Committee;
use sui_types::error::SuiResult;
use sui_types::object::Object;

use crate::config::{
AuthorityPrivateInfo, Config, GenesisConfig, NetworkConfig, PersistedConfig, WalletConfig,
};
use crate::gateway::{GatewayConfig, GatewayType};
use crate::keystore::{Keystore, KeystoreType, SuiKeystore};
use crate::{sui_config_dir, SUI_GATEWAY_CONFIG, SUI_NETWORK_CONFIG, SUI_WALLET_CONFIG};
use tokio::sync::mpsc::channel;
use tracing::{error, info};

#[derive(Parser)]
#[clap(rename_all = "kebab-case")]
Expand Down Expand Up @@ -63,6 +66,7 @@ impl SuiCommand {
pub async fn execute(&self) -> Result<(), anyhow::Error> {
match self {
SuiCommand::Start { config } => {
// Load the config of the Sui authority.
let config_path = config
.clone()
.unwrap_or(sui_config_dir()?.join(SUI_NETWORK_CONFIG));
Expand All @@ -72,6 +76,8 @@ impl SuiCommand {
config_path
))
})?;

// Start a sui validator (including its consensus node).
SuiNetwork::start(&config)
.await?
.wait_for_completion()
Expand Down Expand Up @@ -223,9 +229,24 @@ impl SuiNetwork {
.collect(),
);

let consensus_committee = make_default_narwhal_committee(&config.authorities)?;
let consensus_parameters = ConsensusParameters::default();

let mut spawned_authorities = Vec::new();
for authority in &config.authorities {
let server = make_server(authority, &committee, config.buffer_size).await?;
let consensus_store_path = sui_config_dir()?
.join(CONSENSUS_DB_NAME)
.join(encode_bytes_hex(authority.key_pair.public_key_bytes()));

let server = make_server(
authority,
&committee,
config.buffer_size,
&consensus_committee,
&consensus_store_path,
&consensus_parameters,
)
.await?;
spawned_authorities.push(server.spawn().await?);
}
info!("Started {} authorities", spawned_authorities.len());
Expand Down Expand Up @@ -271,7 +292,6 @@ pub async fn genesis(
loaded_move_packages: vec![],
};
let mut voting_right = BTreeMap::new();

for authority in genesis_conf.authorities {
voting_right.insert(*authority.key_pair.public_key_bytes(), authority.stake);
network_config.authorities.push(authority);
Expand Down Expand Up @@ -366,6 +386,9 @@ pub async fn make_server(
authority: &AuthorityPrivateInfo,
committee: &Committee,
buffer_size: usize,
consensus_committee: &ConsensusCommittee<Ed25519PublicKey>,
consensus_store_path: &PathBuf,
asonnino marked this conversation as resolved.
Show resolved Hide resolved
consensus_parameters: &ConsensusParameters,
) -> SuiResult<AuthorityServer> {
let store = Arc::new(AuthorityStore::open(&authority.db_path, None));
let name = *authority.key_pair.public_key_bytes();
Expand All @@ -376,12 +399,16 @@ pub async fn make_server(
store,
)
.await;
Ok(AuthorityServer::new(
authority.host.clone(),
authority.port,

make_authority(
authority,
buffer_size,
state,
))
consensus_committee,
consensus_store_path,
consensus_parameters,
)
.await
}

async fn make_server_with_genesis_ctx(
Expand Down Expand Up @@ -409,10 +436,65 @@ async fn make_server_with_genesis_ctx(
state.insert_genesis_object(object.clone()).await;
}

let (tx_sui_to_consensus, _rx_sui_to_consensus) = channel(1);
Ok(AuthorityServer::new(
authority.host.clone(),
authority.port,
buffer_size,
state,
Arc::new(state),
authority.consensus_address,
/* tx_consensus_listener */ tx_sui_to_consensus,
))
}

/// Spawn all the subsystems run by a Sui authority: a consensus node, a sui authority server,
/// and a consensus listener bridging the consensus node and the sui authority.
async fn make_authority(
authority: &AuthorityPrivateInfo,
buffer_size: usize,
state: AuthorityState,
consensus_committee: &ConsensusCommittee<Ed25519PublicKey>,
consensus_store_path: &PathBuf,
asonnino marked this conversation as resolved.
Show resolved Hide resolved
consensus_parameters: &ConsensusParameters,
) -> SuiResult<AuthorityServer> {
let (tx_consensus_to_sui, rx_consensus_to_sui) = channel(1_000);
let (tx_sui_to_consensus, rx_sui_to_consensus) = channel(1_000);

let authority_state = Arc::new(state);

// Spawn the consensus node of this authority.
let consensus_keypair = authority.key_pair.make_narwhal_keypair();
let consensus_name = consensus_keypair.name.clone();
let consensus_store = narwhal_node::NodeStorage::reopen(consensus_store_path);
narwhal_node::Node::spawn_primary(
consensus_keypair,
consensus_committee.clone(),
&consensus_store,
consensus_parameters.clone(),
/* consensus */ true, // Indicate that we want to run consensus.
/* execution_state */ authority_state.clone(),
/* tx_confirmation */ tx_consensus_to_sui,
)
.await?;
narwhal_node::Node::spawn_workers(
consensus_name,
/* ids */ vec![0], // We run a single worker with id '0'.
consensus_committee.clone(),
&consensus_store,
consensus_parameters.clone(),
);

// Spawn a consensus listener. It listen for consensus outputs and notifies the
// authority server when a sequenced transaction is ready for execution.
ConsensusListener::spawn(rx_sui_to_consensus, rx_consensus_to_sui);

// Return new authority server. It listen to users transactions and send back replies.
Ok(AuthorityServer::new(
authority.host.clone(),
authority.port,
buffer_size,
authority_state,
authority.consensus_address,
/* tx_consensus_listener */ tx_sui_to_consensus,
))
}
1 change: 1 addition & 0 deletions sui/src/unit_tests/sui_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub async fn start_test_network(
port: 0,
db_path: info.db_path.clone(),
stake: info.stake,
consensus_address: info.consensus_address,
})
.collect();
genesis_config.authorities = authorities;
Expand Down
Loading