Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ma2bd committed Sep 5, 2024
1 parent f6fa238 commit 0e04672
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 72 deletions.
20 changes: 6 additions & 14 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use linera_base::{
},
};
use linera_execution::{
system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRequest,
ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, Operation,
OperationContext, Query, QueryContext, RawExecutionOutcome, RawOutgoingMessage,
ResourceController, ResourceTracker, Response, ServiceRuntimeRequest, TransactionTracker,
system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRuntimeContext,
ExecutionStateView, Message, MessageContext, Operation, OperationContext, Query, QueryContext,
RawExecutionOutcome, RawOutgoingMessage, ResourceController, ResourceTracker, Response,
ServiceRuntimeEndpoint, TransactionTracker,
};
use linera_views::{
context::Context,
Expand Down Expand Up @@ -375,10 +375,7 @@ where
&mut self,
local_time: Timestamp,
query: Query,
incoming_execution_requests: Option<
&mut futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
>,
runtime_request_sender: Option<&mut std::sync::mpsc::Sender<ServiceRuntimeRequest>>,
service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
) -> Result<Response, ChainError> {
let context = QueryContext {
chain_id: self.chain_id(),
Expand All @@ -387,12 +384,7 @@ where
};
let response = self
.execution_state
.query_application(
context,
query,
incoming_execution_requests,
runtime_request_sender,
)
.query_application(context, query, service_runtime_endpoint)
.await
.map_err(|error| ChainError::ExecutionError(error, ChainExecutionContext::Query))?;
Ok(response)
Expand Down
24 changes: 15 additions & 9 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use linera_chain::{
ChainStateView,
};
use linera_execution::{
committee::Epoch, ExecutionRequest, Query, QueryContext, Response, ServiceRuntimeRequest,
ServiceSyncRuntime,
committee::Epoch, ExecutionRequest, Query, QueryContext, Response, ServiceRuntimeEndpoint,
ServiceRuntimeRequest, ServiceSyncRuntime,
};
use linera_storage::Storage;
use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
Expand Down Expand Up @@ -151,12 +151,19 @@ where
tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
chain_id: ChainId,
) -> Result<Self, WorkerError> {
let (service_runtime_thread, execution_state_receiver, runtime_request_sender) = {
let (service_runtime_endpoint, service_runtime_thread) = {
if config.long_lived_services {
let (thread, receiver, sender) = Self::spawn_service_runtime_actor(chain_id);
(Some(thread), Some(receiver), Some(sender))
let (thread, incoming_execution_requests, runtime_request_sender) =
Self::spawn_service_runtime_actor(chain_id);
(
Some(ServiceRuntimeEndpoint {
incoming_execution_requests,
runtime_request_sender,
}),
Some(thread),
)
} else {
(None, None, None)
(None, None)
}
};

Expand All @@ -167,8 +174,7 @@ where
blob_cache,
tracked_chains,
chain_id,
execution_state_receiver,
runtime_request_sender,
service_runtime_endpoint,
)
.await?;

Expand Down Expand Up @@ -317,8 +323,8 @@ where
}
}

drop(self.worker);
if let Some(thread) = self.service_runtime_thread {
drop(self.worker);
thread
.await
.expect("Service runtime thread should not panic");
Expand Down
14 changes: 4 additions & 10 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use linera_chain::{
ChainError, ChainStateView,
};
use linera_execution::{
committee::Epoch, ExecutionRequest, Message, Query, QueryContext, Response,
ServiceRuntimeRequest, SystemMessage,
committee::Epoch, Message, Query, QueryContext, Response, ServiceRuntimeEndpoint, SystemMessage,
};
use linera_storage::{Clock as _, Storage};
use linera_views::views::{ClonableView, ViewError};
Expand Down Expand Up @@ -55,8 +54,7 @@ where
storage: StorageClient,
chain: ChainStateView<StorageClient::Context>,
shared_chain_view: Option<Arc<RwLock<ChainStateView<StorageClient::Context>>>>,
execution_state_receiver: Option<futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>>,
runtime_request_sender: Option<std::sync::mpsc::Sender<ServiceRuntimeRequest>>,
service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
recent_hashed_certificate_values: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
recent_blobs: Arc<ValueCache<BlobId, Blob>>,
tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
Expand All @@ -76,10 +74,7 @@ where
blob_cache: Arc<ValueCache<BlobId, Blob>>,
tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
chain_id: ChainId,
execution_state_receiver: Option<
futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
>,
runtime_request_sender: Option<std::sync::mpsc::Sender<ServiceRuntimeRequest>>,
service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
) -> Result<Self, WorkerError> {
let chain = storage.load_chain(chain_id).await?;

Expand All @@ -88,8 +83,7 @@ where
storage,
chain,
shared_chain_view: None,
execution_state_receiver,
runtime_request_sender,
service_runtime_endpoint,
recent_hashed_certificate_values: certificate_value_cache,
recent_blobs: blob_cache,
tracked_chains,
Expand Down
7 changes: 1 addition & 6 deletions linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,7 @@ where
let response = self
.0
.chain
.query_application(
local_time,
query,
self.0.execution_state_receiver.as_mut(),
self.0.runtime_request_sender.as_mut(),
)
.query_application(local_time, query, self.0.service_runtime_endpoint.as_mut())
.await?;
Ok(response)
}
Expand Down
24 changes: 14 additions & 10 deletions linera-execution/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ pub struct ExecutionStateView<C> {
pub users: HashedReentrantCollectionView<C, UserApplicationId, KeyValueStoreView<C>>,
}

/// How to interact with a long-lived service runtime.
pub struct ServiceRuntimeEndpoint {
/// How to receive requests.
pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
/// How to query the runtime.
pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}

#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
Expand Down Expand Up @@ -443,10 +451,7 @@ where
&mut self,
context: QueryContext,
query: Query,
incoming_execution_requests: Option<
&mut futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
>,
runtime_request_sender: Option<&mut std::sync::mpsc::Sender<ServiceRuntimeRequest>>,
endpoint: Option<&mut ServiceRuntimeEndpoint>,
) -> Result<Response, ExecutionError> {
assert_eq!(context.chain_id, self.context().extra().chain_id());
match query {
Expand All @@ -459,22 +464,21 @@ where
bytes,
} => {
let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
let response = match (incoming_execution_requests, runtime_request_sender) {
(Some(requests), Some(sender)) => {
let response = match endpoint {
Some(endpoint) => {
self.query_user_application_with_long_lived_service(
application_id,
context,
bytes,
requests,
sender,
&mut endpoint.incoming_execution_requests,
&mut endpoint.runtime_request_sender,
)
.await?
}
(None, None) => {
None => {
self.query_user_application(application_id, context, bytes)
.await?
}
_ => unreachable!("invalid parameters"),
};
Ok(Response::User(response))
}
Expand Down
2 changes: 1 addition & 1 deletion linera-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub use crate::wasm::{
};
pub use crate::{
applications::ApplicationRegistryView,
execution::ExecutionStateView,
execution::{ExecutionStateView, ServiceRuntimeEndpoint},
execution_state_actor::ExecutionRequest,
policy::ResourceControlPolicy,
resources::{ResourceController, ResourceTracker},
Expand Down
22 changes: 10 additions & 12 deletions linera-execution/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ pub use self::{
};
use crate::{
ApplicationRegistryView, ExecutionRequest, ExecutionRuntimeContext, ExecutionStateView,
QueryContext, ServiceRuntimeRequest, ServiceSyncRuntime, TestExecutionRuntimeContext,
UserApplicationDescription, UserApplicationId,
QueryContext, ServiceRuntimeEndpoint, ServiceRuntimeRequest, ServiceSyncRuntime,
TestExecutionRuntimeContext, UserApplicationDescription, UserApplicationId,
};

pub fn create_dummy_user_application_description(index: u64) -> UserApplicationDescription {
Expand Down Expand Up @@ -109,20 +109,18 @@ impl QueryContext {
/// Spawns a thread running the [`ServiceSyncRuntime`] actor.
///
/// Returns the endpoints to communicate with the actor.
pub fn spawn_service_runtime_actor(
self,
) -> (
futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
std::sync::mpsc::Sender<ServiceRuntimeRequest>,
) {
let (execution_state_sender, execution_state_receiver) =
pub fn spawn_service_runtime_actor(self) -> ServiceRuntimeEndpoint {
let (execution_state_sender, incoming_execution_requests) =
futures::channel::mpsc::unbounded();
let (request_sender, request_receiver) = std::sync::mpsc::channel();
let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel();

thread::spawn(move || {
ServiceSyncRuntime::new(execution_state_sender, self).run(request_receiver)
ServiceSyncRuntime::new(execution_state_sender, self).run(runtime_request_receiver)
});

(execution_state_receiver, request_sender)
ServiceRuntimeEndpoint {
incoming_execution_requests,
runtime_request_sender,
}
}
}
7 changes: 2 additions & 5 deletions linera-execution/tests/test_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,15 @@ async fn test_simple_user_operation() -> anyhow::Result<()> {
next_block_height: BlockHeight(0),
local_time: Timestamp::from(0),
};
let (mut execution_request_receiver, mut runtime_request_sender) =
context.spawn_service_runtime_actor();
let mut service_runtime_endpoint = context.spawn_service_runtime_actor();
assert_eq!(
view.query_application(
context,
Query::User {
application_id: caller_id,
bytes: vec![]
},
Some(&mut execution_request_receiver),
Some(&mut runtime_request_sender),
Some(&mut service_runtime_endpoint),
)
.await
.unwrap(),
Expand All @@ -232,7 +230,6 @@ async fn test_simple_user_operation() -> anyhow::Result<()> {
bytes: vec![]
},
None,
None,
)
.await
.unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion linera-execution/tests/test_system_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn test_simple_system_query() -> anyhow::Result<()> {
local_time: Timestamp::from(0),
};
let response = view
.query_application(context, Query::System(SystemQuery), None, None)
.query_application(context, Query::System(SystemQuery), None)
.await
.unwrap();
assert_eq!(
Expand Down
6 changes: 2 additions & 4 deletions linera-execution/tests/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ async fn test_fuel_for_counter_wasm_application(
next_block_height: BlockHeight(0),
local_time: Timestamp::from(0),
};
let (mut execution_request_receiver, mut runtime_request_sender) =
context.spawn_service_runtime_actor();
let mut service_runtime_endpoint = context.spawn_service_runtime_actor();
let expected_value = async_graphql::Response::new(
async_graphql::Value::from_json(json!({"value" : increments.into_iter().sum::<u64>()}))
.unwrap(),
Expand All @@ -136,8 +135,7 @@ async fn test_fuel_for_counter_wasm_application(
.query_application(
context,
Query::user(app_id, &request).unwrap(),
Some(&mut execution_request_receiver),
Some(&mut runtime_request_sender),
Some(&mut service_runtime_endpoint),
)
.await?
else {
Expand Down

0 comments on commit 0e04672

Please sign in to comment.