Skip to content

Commit

Permalink
Make service runtime short-lived by default (#2453)
Browse files Browse the repository at this point in the history
* Make long lived services optional (and not the default)

* fix README of LLM

* fix help

* fix worker tests

* address comments

* fix execution tests

* update CLI.md
  • Loading branch information
ma2bd authored Sep 5, 2024
1 parent 429a15c commit e3d0ab1
Show file tree
Hide file tree
Showing 21 changed files with 197 additions and 121 deletions.
1 change: 1 addition & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp

Default value: `10`
* `--wait-for-outgoing-messages` — Whether to wait until a quorum of validators has confirmed that all sent cross-chain messages have been delivered
* `--long-lived-services` — (EXPERIMENTAL) Whether application services can persist in some cases between queries
* `--tokio-threads <TOKIO_THREADS>` — The number of Tokio worker threads to use
* `--blanket-message-policy <BLANKET_MESSAGE_POLICY>` — The policy for handling incoming messages

Expand Down
13 changes: 5 additions & 8 deletions examples/llm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ The model used by Linera Stories is a 40M parameter TinyLlama by A. Karpathy. Fi

CAVEAT:

* We currently use a local HTTP service to provide model data to the wallet
implementation (aka "node service"). In the future, model data will be stored on-chain
([#1981](https://github.com/linera-io/linera-protocol/issues/1981)) or in an external
decentralized storage.

* Running larger LLMs with acceptable performance will likely require hardware acceleration ([#1931](https://github.com/linera-io/linera-protocol/issues/1931)).

* The service currently is restarted when the wallet receives a new block for the chain where the
Expand Down Expand Up @@ -59,8 +54,11 @@ Then, a node service for the current wallet has to be started:

```bash
PORT=8080
linera service --port $PORT &
```
linera --long-lived-services service --port $PORT &
```

The experimental option `--long-lived-services` is used for performance, to avoid
reloading the model between queries.

Next, navigate to `llm/web-frontend` and install the requisite `npm`
dependencies:
Expand All @@ -75,6 +73,5 @@ Finally, navigate to `localhost:3000` to interact with the Linera ChatBot.
```bash
echo "http://localhost:3000/$CHAIN?app=$APP_ID&port=$PORT"
```


<!-- cargo-rdme end -->
14 changes: 6 additions & 8 deletions examples/llm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ The model used by Linera Stories is a 40M parameter TinyLlama by A. Karpathy. Fi
CAVEAT:
* We currently use a local HTTP service to provide model data to the wallet
implementation (aka "node service"). In the future, model data will be stored on-chain
([#1981](https://github.com/linera-io/linera-protocol/issues/1981)) or in an external
decentralized storage.
* Running larger LLMs with acceptable performance will likely require hardware acceleration ([#1931](https://github.com/linera-io/linera-protocol/issues/1931)).
* The service currently is restarted when the wallet receives a new block for the chain where the
Expand Down Expand Up @@ -61,8 +56,11 @@ Then, a node service for the current wallet has to be started:
```bash
PORT=8080
linera service --port $PORT &
```
linera --long-lived-services service --port $PORT &
```
The experimental option `--long-lived-services` is used for performance, to avoid
reloading the model between queries.
Next, navigate to `llm/web-frontend` and install the requisite `npm`
dependencies:
Expand All @@ -77,7 +75,7 @@ Finally, navigate to `localhost:3000` to interact with the Linera ChatBot.
```bash
echo "http://localhost:3000/$CHAIN?app=$APP_ID&port=$PORT"
```
*/
*/

use async_graphql::{Request, Response};
use linera_sdk::base::{ContractAbi, ServiceAbi};
Expand Down
20 changes: 6 additions & 14 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use linera_base::{
},
};
use linera_execution::{
system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRequest,
ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, Operation,
OperationContext, Query, QueryContext, RawExecutionOutcome, RawOutgoingMessage,
ResourceController, ResourceTracker, Response, ServiceRuntimeRequest, TransactionTracker,
system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRuntimeContext,
ExecutionStateView, Message, MessageContext, Operation, OperationContext, Query, QueryContext,
RawExecutionOutcome, RawOutgoingMessage, ResourceController, ResourceTracker, Response,
ServiceRuntimeEndpoint, TransactionTracker,
};
use linera_views::{
context::Context,
Expand Down Expand Up @@ -375,10 +375,7 @@ where
&mut self,
local_time: Timestamp,
query: Query,
incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
ExecutionRequest,
>,
runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
) -> Result<Response, ChainError> {
let context = QueryContext {
chain_id: self.chain_id(),
Expand All @@ -387,12 +384,7 @@ where
};
let response = self
.execution_state
.query_application(
context,
query,
incoming_execution_requests,
runtime_request_sender,
)
.query_application(context, query, service_runtime_endpoint)
.await
.map_err(|error| ChainError::ExecutionError(error, ChainExecutionContext::Query))?;
Ok(response)
Expand Down
1 change: 1 addition & 0 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ where
storage,
options.max_pending_message_bundles,
delivery,
options.long_lived_services,
wallet.chain_ids(),
"Client node",
);
Expand Down
4 changes: 4 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ pub struct ClientOptions {
#[arg(long)]
pub wait_for_outgoing_messages: bool,

/// (EXPERIMENTAL) Whether application services can persist in some cases between queries.
#[arg(long)]
pub long_lived_services: bool,

/// The number of Tokio worker threads to use.
#[arg(long, env = "LINERA_CLIENT_TOKIO_THREADS")]
pub tokio_threads: Option<usize>,
Expand Down
1 change: 1 addition & 0 deletions linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ async fn test_chain_listener() -> anyhow::Result<()> {
storage.clone(),
10,
delivery,
false,
[chain_id0],
"Client node",
)),
Expand Down
41 changes: 23 additions & 18 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use linera_chain::{
ChainStateView,
};
use linera_execution::{
committee::Epoch, ExecutionRequest, Query, QueryContext, Response, ServiceRuntimeRequest,
ServiceSyncRuntime,
committee::Epoch, Query, QueryContext, Response, ServiceRuntimeEndpoint, ServiceSyncRuntime,
};
use linera_storage::Storage;
use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
Expand Down Expand Up @@ -134,7 +133,7 @@ where
StorageClient: Storage + Clone + Send + Sync + 'static,
{
worker: ChainWorkerState<StorageClient>,
service_runtime_thread: linera_base::task::BlockingFuture<()>,
service_runtime_thread: Option<linera_base::task::BlockingFuture<()>>,
}

impl<StorageClient> ChainWorkerActor<StorageClient>
Expand All @@ -151,8 +150,14 @@ where
tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
chain_id: ChainId,
) -> Result<Self, WorkerError> {
let (service_runtime_thread, execution_state_receiver, runtime_request_sender) =
Self::spawn_service_runtime_actor(chain_id);
let (service_runtime_thread, service_runtime_endpoint) = {
if config.long_lived_services {
let (thread, endpoint) = Self::spawn_service_runtime_actor(chain_id);
(Some(thread), Some(endpoint))
} else {
(None, None)
}
};

let worker = ChainWorkerState::load(
config,
Expand All @@ -161,8 +166,7 @@ where
blob_cache,
tracked_chains,
chain_id,
execution_state_receiver,
runtime_request_sender,
service_runtime_endpoint,
)
.await?;

Expand All @@ -179,28 +183,27 @@ where
chain_id: ChainId,
) -> (
linera_base::task::BlockingFuture<()>,
futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
std::sync::mpsc::Sender<ServiceRuntimeRequest>,
ServiceRuntimeEndpoint,
) {
let context = QueryContext {
chain_id,
next_block_height: BlockHeight(0),
local_time: Timestamp::from(0),
};

let (execution_state_sender, execution_state_receiver) =
let (execution_state_sender, incoming_execution_requests) =
futures::channel::mpsc::unbounded();
let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel();

let service_runtime_thread = linera_base::task::spawn_blocking(move || {
ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver)
});

(
service_runtime_thread,
execution_state_receiver,
let endpoint = ServiceRuntimeEndpoint {
incoming_execution_requests,
runtime_request_sender,
)
};
(service_runtime_thread, endpoint)
}

/// Runs the worker until there are no more incoming requests.
Expand Down Expand Up @@ -311,10 +314,12 @@ where
}
}

drop(self.worker);
self.service_runtime_thread
.await
.expect("Service runtime thread should not panic");
if let Some(thread) = self.service_runtime_thread {
drop(self.worker);
thread
.await
.expect("Service runtime thread should not panic");
}

trace!("`ChainWorkerActor` finished");
}
Expand Down
2 changes: 2 additions & 0 deletions linera-core/src/chain_worker/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub struct ChainWorkerConfig {
pub allow_inactive_chains: bool,
/// Whether new messages from deprecated epochs are allowed.
pub allow_messages_from_deprecated_epochs: bool,
/// Whether the user application services should be long-lived.
pub long_lived_services: bool,
/// Blocks with a timestamp this far in the future will still be accepted, but the validator
/// will wait until that timestamp before voting.
pub grace_period: Duration,
Expand Down
12 changes: 4 additions & 8 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use linera_chain::{
ChainError, ChainStateView,
};
use linera_execution::{
committee::Epoch, ExecutionRequest, Message, Query, QueryContext, Response,
ServiceRuntimeRequest, SystemMessage,
committee::Epoch, Message, Query, QueryContext, Response, ServiceRuntimeEndpoint, SystemMessage,
};
use linera_storage::{Clock as _, Storage};
use linera_views::views::{ClonableView, ViewError};
Expand Down Expand Up @@ -55,8 +54,7 @@ where
storage: StorageClient,
chain: ChainStateView<StorageClient::Context>,
shared_chain_view: Option<Arc<RwLock<ChainStateView<StorageClient::Context>>>>,
execution_state_receiver: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
recent_hashed_certificate_values: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
recent_blobs: Arc<ValueCache<BlobId, Blob>>,
tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
Expand All @@ -76,8 +74,7 @@ where
blob_cache: Arc<ValueCache<BlobId, Blob>>,
tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
chain_id: ChainId,
execution_state_receiver: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
) -> Result<Self, WorkerError> {
let chain = storage.load_chain(chain_id).await?;

Expand All @@ -86,8 +83,7 @@ where
storage,
chain,
shared_chain_view: None,
execution_state_receiver,
runtime_request_sender,
service_runtime_endpoint,
recent_hashed_certificate_values: certificate_value_cache,
recent_blobs: blob_cache,
tracked_chains,
Expand Down
7 changes: 1 addition & 6 deletions linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,7 @@ where
let response = self
.0
.chain
.query_application(
local_time,
query,
&mut self.0.execution_state_receiver,
&mut self.0.runtime_request_sender,
)
.query_application(local_time, query, self.0.service_runtime_endpoint.as_mut())
.await?;
Ok(response)
}
Expand Down
2 changes: 2 additions & 0 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl<P, S: Storage + Clone> Client<P, S> {
storage: S,
max_pending_message_bundles: usize,
cross_chain_message_delivery: CrossChainMessageDelivery,
long_lived_services: bool,
tracked_chains: impl IntoIterator<Item = ChainId>,
name: impl Into<String>,
) -> Self {
Expand All @@ -123,6 +124,7 @@ impl<P, S: Storage + Clone> Client<P, S> {
tracked_chains.clone(),
NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
)
.with_long_lived_services(long_lived_services)
.with_allow_inactive_chains(true)
.with_allow_messages_from_deprecated_epochs(true);
let local_node = LocalNodeClient::new(state);
Expand Down
1 change: 1 addition & 0 deletions linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ where
storage,
10,
CrossChainMessageDelivery::NonBlocking,
false,
[chain_id],
format!("Client node for {:.8}", chain_id),
));
Expand Down
Loading

0 comments on commit e3d0ab1

Please sign in to comment.