Commit

feat: add asset proxy

stringhandler committed Dec 14, 2021
1 parent 9d55eb6 commit b818429
Showing 45 changed files with 4,111 additions and 521 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 7 additions & 0 deletions applications/tari_app_grpc/proto/validator_node.proto
@@ -78,6 +78,13 @@ message InvokeReadMethodRequest{

message InvokeReadMethodResponse {
optional bytes result = 1;
Authority authority = 2;
}

message Authority {
bytes node_public_key = 1;
bytes signature = 2;
optional bytes proxied_by = 3;
}

message InvokeMethodRequest{
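
The new Authority message appears to act as a receipt on a read response: the public key of the node that answered, its signature over the result, and, when the call was relayed, the node that proxied it. In this commit the gRPC handler still fills it with empty placeholders (see the TODO "Populate authority" further down), so the sketch below uses a hand-written stand-in for the prost-generated type rather than tari_app_grpc, purely to illustrate how a caller might read the fields.

// Stand-in for the prost-generated `Authority` type; field names mirror the
// proto message above, but this struct is illustrative only.
#[derive(Debug, Default)]
struct Authority {
    node_public_key: Vec<u8>,
    signature: Vec<u8>,
    proxied_by: Option<Vec<u8>>,
}

impl Authority {
    /// A response is proxied when another node relayed it on behalf of the
    /// committee member that produced it.
    fn is_proxied(&self) -> bool {
        self.proxied_by.is_some()
    }

    /// Placeholder check: this commit leaves the key and signature empty.
    fn is_signed(&self) -> bool {
        !self.node_public_key.is_empty() && !self.signature.is_empty()
    }
}

fn main() {
    let authority = Authority { proxied_by: Some(vec![]), ..Default::default() };
    println!("proxied: {}, signed: {}", authority.is_proxied(), authority.is_signed());
}
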
2,907 changes: 2,907 additions & 0 deletions applications/tari_explorer/package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions applications/tari_validator_node/Cargo.toml
@@ -23,6 +23,7 @@ tari_storage = { path = "../../infrastructure/storage" }
tari_core = {path = "../../base_layer/core"}
tari_dan_core = {path = "../../dan_layer/core"}
tari_dan_storage_sqlite = {path = "../../dan_layer/storage_sqlite"}
tari_common_types = {path = "../../base_layer/common_types"}

anyhow = "1.0.32"
async-trait = "0.1.50"
@@ -19,6 +19,7 @@ message HotStuffMessage {
HotStuffTreeNode node= 4;
Signature partial_sig = 5;
optional bytes node_hash = 6;
bytes asset_public_key = 7;
}

message QuorumCertificate {
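
HotStuff messages are now tagged with the asset_public_key of the committee they belong to. A validator node may serve several assets at once (each gets its own worker task in dan_node.rs below), so the receiver has to route each message to the right instance; the inbound connection service later in this diff is constructed with the asset's public key for that reason. The sketch below shows only the routing idea, with plain std types standing in for the generated message and the Tari services.

use std::collections::HashMap;

// Illustrative stand-ins: the real types are the prost-generated HotStuffMessage
// and the per-asset inbound services wired up in dan_node.rs.
type AssetPublicKey = Vec<u8>;

struct HotStuffMessage {
    asset_public_key: AssetPublicKey,
    // ... message_type, justify, node, partial_sig, node_hash ...
}

struct AssetRouter<Handler> {
    // One handler (e.g. a channel sender) per asset this node serves.
    handlers: HashMap<AssetPublicKey, Handler>,
}

impl<Handler> AssetRouter<Handler> {
    /// Pick the worker responsible for the message's asset; `None` means this
    /// node does not serve the asset and the message must be dropped or proxied.
    fn route(&self, msg: &HotStuffMessage) -> Option<&Handler> {
        self.handlers.get(&msg.asset_public_key)
    }
}

fn main() {
    let mut handlers = HashMap::new();
    handlers.insert(b"asset-a".to_vec(), "worker-a");
    let router = AssetRouter { handlers };
    let msg = HotStuffMessage { asset_public_key: b"asset-a".to_vec() };
    println!("routed to: {:?}", router.route(&msg));
}
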
110 changes: 77 additions & 33 deletions applications/tari_validator_node/src/dan_node.rs
@@ -20,8 +20,16 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fs, fs::File, io::BufReader, path::Path, sync::Arc, time::Duration};
use std::{
fs,
fs::File,
io::BufReader,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};

use futures::future::try_join_all;
use log::*;
use tari_app_utilities::{
identity_management,
@@ -67,7 +75,7 @@ use tari_p2p::{
};
use tari_service_framework::{ServiceHandles, StackBuilder};
use tari_shutdown::ShutdownSignal;
use tokio::task;
use tokio::{task, try_join};

use crate::{
grpc::services::base_node_client::GrpcBaseNodeClient,
@@ -124,23 +132,50 @@ impl DanNode {

let asset_definitions = self.read_asset_definitions(&dan_config.asset_config_directory)?;
if asset_definitions.is_empty() {
warn!(target: LOG_TARGET, "No assets to process. Exiting");
warn!(
target: LOG_TARGET,
"No assets to process. Add assets by putting definitions in the `assets` folder with a `.asset` \
extension."
);
}
let db_factory = SqliteDbFactory::new(&self.config);
let mut tasks = vec![];
for asset in asset_definitions {
// TODO: spawn into multiple processes. This requires some routing as well.
self.start_asset_worker(
asset,
node_identity.as_ref().clone(),
mempool_service.clone(),
handles.clone(),
subscription_factory.clone(),
shutdown.clone(),
dan_config,
db_factory.clone(),
)
.await?;
let data_dir = self.config.data_dir.clone();
let node_identity = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
let subscription_factory = subscription_factory.clone();
let shutdown = shutdown.clone();
let dan_config = dan_config.clone();
let db_factory = db_factory.clone();

tasks.push(task::spawn(async move {
DanNode::start_asset_worker(
data_dir,
asset,
node_identity,
mempool,
handles,
subscription_factory,
shutdown,
dan_config,
db_factory,
)
.await
}));
}

if tasks.is_empty() {
// If there are no assets to process, work in proxy mode
tasks.push(task::spawn(DanNode::wait_for_exit()));
}
try_join_all(tasks)
.await
.map_err(|err| ExitCodes::UnknownError(format!("Join error occurred. {}", err)))?
.into_iter()
.collect::<Result<_, _>>()?;

Ok(())
}
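
The loop above spawns one Tokio task per asset definition and then waits on all of them. try_join_all fails fast if any task panics or is cancelled (a JoinError), and the inner collect surfaces the first error returned by a worker itself. A self-contained sketch of the same pattern with a stub worker follows; it assumes the tokio (rt and macros features) and futures crates, and ExitError stands in for ExitCodes.

// Sketch of the spawn-and-join pattern used in `run` above, with a stub worker
// in place of DanNode::start_asset_worker.
use futures::future::try_join_all;
use tokio::task;

#[derive(Debug)]
struct ExitError(String);

async fn asset_worker(id: usize) -> Result<(), ExitError> {
    println!("asset worker {} running", id);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), ExitError> {
    let mut tasks = vec![];
    for id in 0..3 {
        tasks.push(task::spawn(async move { asset_worker(id).await }));
    }

    // Outer Result: a task panicked or was cancelled (JoinError).
    // Inner Result: a worker itself returned an error.
    try_join_all(tasks)
        .await
        .map_err(|err| ExitError(format!("Join error occurred. {}", err)))?
        .into_iter()
        .collect::<Result<Vec<()>, _>>()?;

    Ok(())
}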

@@ -153,7 +188,6 @@ impl DanNode {
let mut result = vec![];
for path in paths {
let path = path.expect("Not a valid file").path();
dbg!(&path.extension());

if !path.is_dir() && path.extension().unwrap_or_default() == "asset" {
let file = File::open(path).expect("could not open file");
@@ -166,22 +200,38 @@
Ok(result)
}
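
read_asset_definitions picks up every regular file with a .asset extension from the configured directory, as the new warning message tells operators. Below is a stand-alone sketch of that scan; the deserialisation into tari_dan_core's AssetDefinition is omitted because the on-disk format is not shown in this diff, and the "assets" directory in the example is taken from the warning text rather than the real asset_config_directory setting.

// Stand-alone sketch of the `.asset` directory scan, without the
// deserialisation into AssetDefinition that the real code performs.
use std::{fs, path::{Path, PathBuf}};

fn list_asset_files(dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let mut result = vec![];
    if !dir.exists() {
        return Ok(result);
    }
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        // Only regular files ending in `.asset` are considered.
        if !path.is_dir() && path.extension().unwrap_or_default() == "asset" {
            result.push(path);
        }
    }
    Ok(result)
}

fn main() -> std::io::Result<()> {
    for path in list_asset_files(Path::new("assets"))? {
        println!("found asset definition: {}", path.display());
    }
    Ok(())
}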

async fn wait_for_exit() -> Result<(), ExitCodes> {
println!("Type `exit` to exit");
loop {
let mut line = String::new();
let _ = std::io::stdin().read_line(&mut line).expect("Failed to read line");
if line.to_lowercase().trim() == "exit" {
return Err(ExitCodes::UnknownError("User cancelled".to_string()));
} else {
println!("Type `exit` to exit");
}
}
}

// async fn start_asset_proxy(&self) -> Result<(), ExitCodes> {
// todo!()
// }

async fn start_asset_worker<
TMempoolService: MempoolService + Clone,
TDbFactory: DbFactory + Clone + Send + Sync,
>(
&self,
data_dir: PathBuf,
asset_definition: AssetDefinition,
node_identity: NodeIdentity,
mempool_service: TMempoolService,
handles: ServiceHandles,
subscription_factory: SubscriptionFactory,
shutdown: ShutdownSignal,
config: &ValidatorNodeConfig,
config: ValidatorNodeConfig,
db_factory: TDbFactory,
) -> Result<(), ExitCodes> {
let timeout = Duration::from_secs(asset_definition.phase_timeout);
// TODO: read the asset definition from the base layer
let committee = asset_definition
.initial_committee
.iter()
@@ -191,15 +241,6 @@ impl DanNode {
})
.collect::<Result<Vec<_>, _>>()?;

// let committee: Vec<CommsPublicKey> = dan_config
// .committee
// .iter()
// .map(|s| {
// CommsPublicKey::from_hex(s)
// .map_err(|e| ExitCodes::ConfigError(format!("could not convert to hex:{}", e)))
// })
// .collect::<Result<Vec<_>, _>>()?;
//
let committee = Committee::new(committee);
let committee_service = ConcreteCommitteeManager::new(committee);

@@ -208,26 +249,29 @@
let events_publisher = LoggingEventsPublisher::default();
let signing_service = NodeIdentitySigningService::new(node_identity.clone());

let _backend = LmdbAssetStore::initialize(self.config.data_dir.join("asset_data"), Default::default())
.map_err(|err| ExitCodes::DatabaseError(err.to_string()))?;
// let _backend = LmdbAssetStore::initialize(data_dir.join("asset_data"), Default::default())
// .map_err(|err| ExitCodes::DatabaseError(err.to_string()))?;
// let data_store = AssetDataStore::new(backend);
let asset_processor = ConcreteAssetProcessor::default();

let payload_processor = TariDanPayloadProcessor::new(asset_processor, mempool_service.clone());
let mut inbound = TariCommsInboundConnectionService::new();
let mut inbound = TariCommsInboundConnectionService::new(asset_definition.public_key.clone());
let receiver = inbound.take_receiver().unwrap();

let loopback = inbound.clone_sender();
let shutdown_2 = shutdown.clone();
task::spawn(async move {
let topic_subscription =
subscription_factory.get_subscription(TariMessageType::DanConsensusMessage, "HotStufMessages");
subscription_factory.get_subscription(TariMessageType::DanConsensusMessage, "HotStuffMessages");
inbound.run(shutdown_2, topic_subscription).await
});
let dht = handles.expect_handle::<Dht>();
let outbound = TariCommsOutboundService::new(dht.outbound_requester(), loopback);
let outbound =
TariCommsOutboundService::new(dht.outbound_requester(), loopback, asset_definition.public_key.clone());
let base_node_client = GrpcBaseNodeClient::new(config.base_node_grpc_address);
let chain_storage = SqliteStorageService {};
dbg!(&asset_definition);
dbg!("About to start consensus worker");
let mut consensus_worker = ConsensusWorker::new(
receiver,
outbound,
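
The inbound service above hands out a single receiver plus clonable senders, and the outbound service is given one of those senders as a loopback together with the asset public key. The likely intent, though this diff does not spell it out, is that messages addressed to the local node skip the DHT and are pushed straight back into the inbound queue. A minimal sketch of that wiring with tokio channels follows, using plain stand-ins rather than TariCommsInboundConnectionService and TariCommsOutboundService.

use tokio::sync::mpsc;

type Message = String;

struct Inbound {
    sender: mpsc::Sender<Message>,
    receiver: Option<mpsc::Receiver<Message>>,
}

impl Inbound {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel(100);
        Self { sender, receiver: Some(receiver) }
    }

    fn take_receiver(&mut self) -> Option<mpsc::Receiver<Message>> {
        self.receiver.take()
    }

    fn clone_sender(&self) -> mpsc::Sender<Message> {
        self.sender.clone()
    }
}

struct Outbound {
    loopback: mpsc::Sender<Message>,
}

impl Outbound {
    async fn send(&self, to_local_node: bool, msg: Message) {
        if to_local_node {
            // Local delivery: skip the network and push straight into the inbound queue.
            let _ = self.loopback.send(msg).await;
        } else {
            // The real service would hand the message to the DHT outbound requester here.
        }
    }
}

#[tokio::main]
async fn main() {
    let mut inbound = Inbound::new();
    let mut receiver = inbound.take_receiver().unwrap();
    let outbound = Outbound { loopback: inbound.clone_sender() };

    outbound.send(true, "hello, local node".to_string()).await;
    if let Some(msg) = receiver.recv().await {
        println!("inbound received: {}", msg);
    }
}
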
@@ -23,14 +23,15 @@ use std::{convert::TryInto, net::SocketAddr};

use async_trait::async_trait;
use tari_app_grpc::tari_rpc as grpc;
use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::{BaseLayerMetadata, BaseLayerOutput},
services::BaseNodeClient,
types::PublicKey,
DigitalAssetError,
};

#[derive(Clone)]
pub struct GrpcBaseNodeClient {
endpoint: SocketAddr,
inner: Option<grpc::base_node_client::BaseNodeClient<tonic::transport::Channel>>,
@@ -20,44 +20,57 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use tari_app_grpc::tari_rpc as rpc;
use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::{Instruction, TemplateId},
services::{AssetProcessor, MempoolService},
services::{AssetProcessor, AssetProxy, MempoolService},
storage::DbFactory,
types::PublicKey,
};
use tonic::{Request, Response, Status};

pub struct ValidatorNodeGrpcServer<
TMempoolService: MempoolService,
TDbFactory: DbFactory,
TAssetProcessor: AssetProcessor,
TAssetProxy: AssetProxy,
> {
mempool: TMempoolService,
db_factory: TDbFactory,
asset_processor: TAssetProcessor,
asset_proxy: TAssetProxy,
}

impl<TMempoolService: MempoolService, TDbFactory: DbFactory, TAssetProcessor: AssetProcessor>
ValidatorNodeGrpcServer<TMempoolService, TDbFactory, TAssetProcessor>
impl<
TMempoolService: MempoolService,
TDbFactory: DbFactory,
TAssetProcessor: AssetProcessor,
TAssetProxy: AssetProxy,
> ValidatorNodeGrpcServer<TMempoolService, TDbFactory, TAssetProcessor, TAssetProxy>
{
pub fn new(mempool: TMempoolService, db_factory: TDbFactory, asset_processor: TAssetProcessor) -> Self {
pub fn new(
mempool: TMempoolService,
db_factory: TDbFactory,
asset_processor: TAssetProcessor,
asset_proxy: TAssetProxy,
) -> Self {
Self {
mempool,
db_factory,
asset_processor,
asset_proxy,
}
}
}

#[tonic::async_trait]
impl<TMempoolService, TDbFactory, TAssetProcessor> rpc::validator_node_server::ValidatorNode
for ValidatorNodeGrpcServer<TMempoolService, TDbFactory, TAssetProcessor>
impl<TMempoolService, TDbFactory, TAssetProcessor, TAssetProxy> rpc::validator_node_server::ValidatorNode
for ValidatorNodeGrpcServer<TMempoolService, TDbFactory, TAssetProcessor, TAssetProxy>
where
TMempoolService: MempoolService + Clone + Sync + Send + 'static,
TDbFactory: DbFactory + Sync + Send + 'static,
TAssetProcessor: AssetProcessor + Sync + Send + 'static,
TAssetProxy: AssetProxy + Sync + Send + 'static,
{
async fn get_token_data(
&self,
Expand Down Expand Up @@ -123,21 +136,53 @@ where
request: Request<rpc::InvokeReadMethodRequest>,
) -> Result<Response<rpc::InvokeReadMethodResponse>, Status> {
dbg!(&request);
let state = self
.db_factory
.create_state_db()
.map_err(|e| Status::internal(format!("Could not create state db: {}", e)))?;
let request = request.into_inner();
let mut unit_of_work = state.new_unit_of_work();
let response_bytes = self
.asset_processor
.invoke_read_method(
TemplateId::from(request.template_id),
request.method,
&request.args,
&mut unit_of_work,
)
.map_err(|e| Status::internal(format!("Could not invoke read method: {}", e)))?;
Ok(Response::new(rpc::InvokeReadMethodResponse { result: response_bytes }))
let asset_public_key = PublicKey::from_bytes(&request.asset_public_key)
.map_err(|err| Status::invalid_argument(format!("Asset public key was not a valid public key:{}", err)))?;
if let Some(state) = self
.db_factory
.get_state_db(&asset_public_key)
.map_err(|e| Status::internal(format!("Could not create state db: {}", e)))?
{
let mut unit_of_work = state.new_unit_of_work();
let response_bytes = self
.asset_processor
.invoke_read_method(
TemplateId::from(request.template_id),
request.method,
&request.args,
&mut unit_of_work,
)
.map_err(|e| Status::internal(format!("Could not invoke read method: {}", e)))?;
Ok(Response::new(rpc::InvokeReadMethodResponse {
result: response_bytes,
authority: Some(rpc::Authority {
node_public_key: vec![],
signature: vec![],
proxied_by: None,
}),
}))
} else {
// Forward to proxy
let response_bytes = self
.asset_proxy
.invoke_read_method(
&asset_public_key,
TemplateId::from(request.template_id),
request.method,
request.args,
)
.await
.map_err(|err| Status::internal(format!("Error calling proxied method:{}", err)))?;
// TODO: Populate authority
Ok(Response::new(rpc::InvokeReadMethodResponse {
result: Some(response_bytes),
authority: Some(rpc::Authority {
node_public_key: vec![],
signature: vec![],
proxied_by: Some(vec![]),
}),
}))
}
}
}
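
The read handler now tries the local state database first and, when this node holds no state for the requested asset, forwards the call through the new AssetProxy service. The trait below is inferred from that call site, not copied from tari_dan_core::services, and the key, template and error types are simplified stand-ins; the real signature may differ. The example method name in main is arbitrary.

// Inferred shape of an asset proxy, based on how `asset_proxy.invoke_read_method`
// is called above. Assumes the async-trait and tokio crates.
use async_trait::async_trait;

type PublicKey = Vec<u8>; // stand-in for tari_common_types::types::PublicKey
type TemplateId = u32;    // stand-in for tari_dan_core::models::TemplateId

#[derive(Debug)]
struct ProxyError(String);

#[async_trait]
trait AssetProxy: Send + Sync {
    /// Forward a read-only method call to a validator node that serves the asset.
    async fn invoke_read_method(
        &self,
        asset_public_key: &PublicKey,
        template_id: TemplateId,
        method: String,
        args: Vec<u8>,
    ) -> Result<Vec<u8>, ProxyError>;
}

/// Stub implementation that always fails, just to show the shape of an impl.
struct NoopProxy;

#[async_trait]
impl AssetProxy for NoopProxy {
    async fn invoke_read_method(
        &self,
        _asset_public_key: &PublicKey,
        _template_id: TemplateId,
        _method: String,
        _args: Vec<u8>,
    ) -> Result<Vec<u8>, ProxyError> {
        Err(ProxyError("no upstream validator node configured".to_string()))
    }
}

#[tokio::main]
async fn main() {
    let proxy = NoopProxy;
    // "get_balance" is an arbitrary example method name.
    let result = proxy
        .invoke_read_method(&b"asset".to_vec(), 0, "get_balance".to_string(), vec![])
        .await;
    println!("{:?}", result);
}
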
(The remaining changed files are not rendered here.)
