Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(validator-node): initial state sync implementation (partial) #3826

Merged
merged 5 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
889 changes: 511 additions & 378 deletions Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ pub fn copy_config_file<S: AsRef<Path>>(
file: &str,
) -> Result<(), LauncherError> {
let path = Path::new("assets").join(file);
let config_path = resolve_path(config, package_info, &path, Some(BaseDirectory::Resource))?;
let config_path = resolve_path(
config,
package_info,
&Default::default(),
&path,
Some(BaseDirectory::Resource),
)?;
let cfg = std::fs::read_to_string(&config_path).expect("The config assets were not bundled with the App");
info!("Log Configuration file ({}) loaded", file);
debug!("{}", cfg);
Expand Down
14 changes: 7 additions & 7 deletions applications/launchpad/backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ use tari_launchpad::{
commands::*,
docker::{DockerWrapper, Workspaces},
};
use tauri::{api::cli::get_matches, async_runtime::block_on, utils::config::CliConfig, Event, Manager};
use tauri::{api::cli::get_matches, async_runtime::block_on, utils::config::CliConfig, Manager, PackageInfo, RunEvent};

fn main() {
env_logger::init();
let context = tauri::generate_context!();
let cli_config = context.config().tauri.cli.clone().unwrap();

// We're going to attach this to the AppState because Tauri does not expose it for some reason
let package_info = context.package_info().clone();
// Handle --help and --version. Exits after printing
handle_cli_options(&cli_config);
handle_cli_options(&cli_config, &package_info);

let docker = match DockerWrapper::new() {
Ok(docker) => docker,
Expand All @@ -34,8 +36,6 @@ fn main() {

// TODO - Load workspace definitions from persistent storage here
let workspaces = Workspaces::default();
// We're going to attach this to the AppState because Tauri does not expose it for some reason
let package_info = context.package_info().clone();
info!("Using Docker version: {}", docker.version());

let app = tauri::Builder::default()
Expand All @@ -55,7 +55,7 @@ fn main() {
.expect("error while running Launchpad");

app.run(|app, event| {
if let Event::Exit = event {
if let RunEvent::Exit = event {
info!("Received Exit event");
block_on(async move {
let state = app.state();
Expand All @@ -65,8 +65,8 @@ fn main() {
});
}

fn handle_cli_options(cli_config: &CliConfig) {
match get_matches(cli_config) {
fn handle_cli_options(cli_config: &CliConfig, pkg_info: &PackageInfo) {
match get_matches(cli_config, pkg_info) {
Ok(matches) => {
if let Some(arg_data) = matches.args.get("help") {
debug!("{}", arg_data.value.as_str().unwrap_or("No help available"));
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,8 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
let response = tari_rpc::ListAssetRegistrationsResponse {
asset_public_key: output
.features
.mint_non_fungible
.map(|mint| mint.asset_public_key.to_vec())
.asset
.map(|asset| asset.public_key.to_vec())
.unwrap_or_default(),
unique_id: output.features.unique_id.unwrap_or_default(),
owner_commitment: output.commitment.to_vec(),
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ async fn run_grpc(
.serve_with_shutdown(grpc_address, interrupt_signal.map(|_| ()))
.await
.map_err(|err| {
error!(target: LOG_TARGET, "GRPC encountered an error:{}", err);
error!(target: LOG_TARGET, "GRPC encountered an error: {}", err);
err
})?;

Expand Down
3 changes: 1 addition & 2 deletions applications/tari_collectibles/src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ license = "MIT"
repository = "https://github.com/tari-project/tari"
default-run = "tari_collectibles"
edition = "2018"
build = "src/build.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html


[build-dependencies]
tauri-build = { version = "1.0.0-beta.4" }
tauri-build = { version = "1.0.0-beta.4", features = [] }

[dependencies]
tari_app_grpc = { path = "../../tari_app_grpc" }
Expand Down
43 changes: 42 additions & 1 deletion applications/tari_validator_node/proto/dan/validator_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,48 @@ message GetSidechainBlocksRequest {
bytes end_hash = 3;
}


message GetSidechainBlocksResponse {
tari.dan.common.SideChainBlock block = 1;
}

message GetSidechainStateRequest {
bytes asset_public_key = 1;
}

message GetSidechainStateResponse {
oneof state {
string schema = 1;
KeyValue key_value = 2;
}
}

message KeyValue {
bytes key = 1;
bytes value = 2;
}

message GetStateOpLogsRequest {
bytes asset_public_key = 1;
uint64 height = 2;
}

message GetStateOpLogsResponse {
repeated StateOpLog op_logs = 1;
}

message StateOpLog {
uint64 height = 1;
string operation = 2;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps an enum here rather?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah probably, I'm using this "weirdly" as a single char enum - S = Set, D = Delete etc. but an enum is more standard :P

string schema = 3;
bytes key = 4;
bytes value = 5;
bytes merkle_root = 6;
}

message GetTipNodeRequest{
bytes asset_public_key = 1;
}

message GetTipNodeResponse {
tari.dan.common.Node tip_node = 1;
}
24 changes: 21 additions & 3 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use std::{collections::HashMap, sync::Arc, time::Duration};

use log::info;
use log::*;
use tari_common::{
configuration::ValidatorNodeConfig,
exit_codes::{ExitCode, ExitError},
Expand Down Expand Up @@ -59,6 +59,7 @@ use crate::{
inbound_connection_service::TariCommsInboundConnectionService,
outbound_connection_service::TariCommsOutboundService,
},
TariCommsValidatorNodeClientFactory,
};

const LOG_TARGET: &str = "tari::validator_node::app";
Expand Down Expand Up @@ -108,16 +109,29 @@ impl DanNode {
.get_assets_for_dan_node(node_identity.public_key().clone())
.await
.unwrap();
info!(
target: LOG_TARGET,
"Base node returned {} asset(s) to process",
assets.len()
);
for asset in assets {
if tasks.contains_key(&asset.public_key) {
debug!(
target: LOG_TARGET,
"Asset task already running for asset '{}'", asset.public_key
);
continue;
}
if let Some(allow_list) = &dan_config.assets_allow_list {
if !allow_list.contains(&asset.public_key.to_hex()) {
debug!(
target: LOG_TARGET,
"Asset '{}' is not whitelisted for processing ", asset.public_key
);
continue;
}
}
info!(target: LOG_TARGET, "Adding asset {:?}", asset.public_key);
info!(target: LOG_TARGET, "Adding asset '{}'", asset.public_key);
let node_identity = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
Expand All @@ -128,7 +142,7 @@ impl DanNode {
tasks.insert(
asset.public_key.clone(),
task::spawn(DanNode::start_asset_worker(
asset.clone(),
asset,
node_identity,
mempool,
handles,
Expand Down Expand Up @@ -199,6 +213,9 @@ impl DanNode {
let chain_storage = SqliteStorageService {};
let wallet_client = GrpcWalletClient::new(config.wallet_grpc_address);
let checkpoint_manager = ConcreteCheckpointManager::new(asset_definition.clone(), wallet_client);
let connectivity = handles.expect_handle();
let validator_node_client_factory =
TariCommsValidatorNodeClientFactory::new(connectivity, dht.discovery_service_requester());
let mut consensus_worker = ConsensusWorker::<DefaultServiceSpecification>::new(
receiver,
outbound,
Expand All @@ -214,6 +231,7 @@ impl DanNode {
db_factory,
chain_storage,
checkpoint_manager,
validator_node_client_factory,
);

consensus_worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use std::{convert::TryInto, net::SocketAddr};

use async_trait::async_trait;
use log::*;
use tari_app_grpc::tari_rpc as grpc;
use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
Expand All @@ -32,6 +33,8 @@ use tari_dan_core::{
DigitalAssetError,
};

const LOG_TARGET: &str = "tari::validator_node::app";

#[derive(Clone)]
pub struct GrpcBaseNodeClient {
endpoint: SocketAddr,
Expand Down Expand Up @@ -112,18 +115,23 @@ impl BaseNodeClient for GrpcBaseNodeClient {
self.inner.as_mut().unwrap()
},
};
let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 0 };
// TODO: probably should use output mmr indexes here
let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 100 };
let mut result = inner.list_asset_registrations(request).await.unwrap().into_inner();
let mut assets: Vec<AssetDefinition> = vec![];
let tip = self.get_tip_info().await?;
while let Some(r) = result.message().await.unwrap() {
if let Ok(asset_public_key) = PublicKey::from_bytes(r.unique_id.as_bytes()) {
if let Ok(asset_public_key) = PublicKey::from_bytes(r.asset_public_key.as_bytes()) {
if let Some(checkpoint) = self
.get_current_checkpoint(tip.height_of_longest_chain, asset_public_key.clone(), vec![3u8; 32])
.await?
{
if let Some(committee) = checkpoint.get_side_chain_committee() {
if committee.contains(&dan_node_public_key) {
debug!(
target: LOG_TARGET,
"Node is on committee for asset : {}", asset_public_key
);
assets.push(AssetDefinition {
public_key: asset_public_key,
template_parameters: r
Expand All @@ -144,4 +152,32 @@ impl BaseNodeClient for GrpcBaseNodeClient {
}
Ok(assets)
}

async fn get_asset_registration(
&mut self,
asset_public_key: PublicKey,
) -> Result<Option<BaseLayerOutput>, DigitalAssetError> {
let conn = match self.inner.as_mut() {
Some(i) => i,
None => {
self.connect().await?;
self.inner.as_mut().unwrap()
},
};

let req = grpc::GetAssetMetadataRequest {
asset_public_key: asset_public_key.to_vec(),
};
let output = conn.get_asset_metadata(req).await.unwrap().into_inner();

let output = output
.features
.map(|features| match features.try_into() {
Ok(f) => Ok(BaseLayerOutput { features: f }),
Err(e) => Err(DigitalAssetError::ConversionError(e)),
})
.transpose()?;

Ok(output)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_
.get_state_db(&asset_public_key)
.map_err(|e| Status::internal(format!("Could not create state db: {}", e)))?
{
let mut unit_of_work = state.new_unit_of_work();
let mut state_db_reader = state.reader();
let response_bytes = self
.asset_processor
.invoke_read_method(template_id, request.method, &request.args, &mut unit_of_work)
.invoke_read_method(template_id, request.method, &request.args, &mut state_db_reader)
.map_err(|e| Status::internal(format!("Could not invoke read method: {}", e)))?;
Ok(Response::new(rpc::InvokeReadMethodResponse {
result: response_bytes.unwrap_or_default(),
Expand Down
8 changes: 6 additions & 2 deletions applications/tari_validator_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ fn main_inner() -> Result<(), ExitError> {

async fn run_node(config: GlobalConfig, create_id: bool) -> Result<(), ExitError> {
let shutdown = Shutdown::new();
let validator_node_config = config
.validator_node
.as_ref()
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "validator_node configuration not found"))?;

fs::create_dir_all(&config.peer_db_path).map_err(|err| ExitError::new(ExitCode::ConfigError, err))?;
let node_identity = setup_node_identity(
Expand Down Expand Up @@ -125,7 +129,7 @@ async fn run_node(config: GlobalConfig, create_id: bool) -> Result<(), ExitError
handles.expect_handle::<Dht>().discovery_service_requester(),
);
let asset_proxy: ConcreteAssetProxy<DefaultServiceSpecification> = ConcreteAssetProxy::new(
GrpcBaseNodeClient::new(config.validator_node.clone().unwrap().base_node_grpc_address),
GrpcBaseNodeClient::new(validator_node_config.base_node_grpc_address),
validator_node_client_factory,
5,
mempool_service.clone(),
Expand Down Expand Up @@ -197,7 +201,7 @@ async fn run_grpc<TServiceSpecification: ServiceSpecification + 'static>(
.serve_with_shutdown(grpc_address, shutdown_signal.map(|_| ()))
.await
.map_err(|err| {
error!(target: LOG_TARGET, "GRPC encountered an error:{}", err);
error!(target: LOG_TARGET, "GRPC encountered an error: {}", err);
err
})?;

Expand Down
Loading