Skip to content

Commit

Permalink
fix(client): change to AtomicU64 to AtomicUsize (#1293)
Browse files Browse the repository at this point in the history
* fix(client): change to `AtomicU64` to `AtomicUsize`

Some targets may not support AtomicU64.
This PR moves to `AtomicUsize` instead to support more targets.

* Update core/src/client/mod.rs

* fix benches
  • Loading branch information
niklasad1 authored Feb 12, 2024
1 parent d9c4a5c commit aa257c0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 9 deletions.
6 changes: 3 additions & 3 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_htt
/// Run jsonrpc WebSocket server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_ws_server::Server) {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};

use jsonrpc_pubsub::{PubSubHandler, Session, Subscriber, SubscriptionId};
use jsonrpc_ws_server::jsonrpc_core::*;
use jsonrpc_ws_server::*;

static ID: AtomicU64 = AtomicU64::new(0);
static ID: AtomicUsize = AtomicUsize::new(0);

let handle2 = handle.clone();

Expand All @@ -78,7 +78,7 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_ws_se
SUB_METHOD_NAME,
(SUB_METHOD_NAME, move |_params: Params, _, subscriber: Subscriber| {
handle2.spawn(async move {
let id = ID.fetch_add(1, Ordering::Relaxed);
let id = ID.fetch_add(1, Ordering::Relaxed).try_into().unwrap();
let sink = subscriber.assign_id(SubscriptionId::Number(id)).unwrap();
// NOTE(niklasad1): the way jsonrpc works this is the only way to get this working
// -> jsonrpc responds to the request in background so not possible to know when the request
Expand Down
28 changes: 22 additions & 6 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use error::Error;
use std::fmt;
use std::ops::Range;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task;

Expand Down Expand Up @@ -396,15 +396,15 @@ pub struct RequestIdManager {
/// Max concurrent pending requests allowed.
max_concurrent_requests: usize,
/// Get the next request ID.
current_id: AtomicU64,
current_id: CurrentId,
/// Request ID type.
id_kind: IdKind,
}

impl RequestIdManager {
/// Create a new `RequestIdGuard` with the provided concurrency limit.
pub fn new(limit: usize, id_kind: IdKind) -> Self {
Self { current_pending: Arc::new(()), max_concurrent_requests: limit, current_id: AtomicU64::new(0), id_kind }
Self { current_pending: Arc::new(()), max_concurrent_requests: limit, current_id: CurrentId::new(), id_kind }
}

fn get_slot(&self) -> Result<Arc<()>, Error> {
Expand All @@ -421,7 +421,7 @@ impl RequestIdManager {
/// Fails if request limit has been exceeded.
pub fn next_request_id(&self) -> Result<RequestIdGuard<Id<'static>>, Error> {
let rc = self.get_slot()?;
let id = self.id_kind.into_id(self.current_id.fetch_add(1, Ordering::SeqCst));
let id = self.id_kind.into_id(self.current_id.next());

Ok(RequestIdGuard { _rc: rc, id })
}
Expand All @@ -432,8 +432,8 @@ impl RequestIdManager {
/// Fails if request limit has been exceeded.
pub fn next_request_two_ids(&self) -> Result<RequestIdGuard<(Id<'static>, Id<'static>)>, Error> {
let rc = self.get_slot()?;
let id1 = self.id_kind.into_id(self.current_id.fetch_add(1, Ordering::SeqCst));
let id2 = self.id_kind.into_id(self.current_id.fetch_add(1, Ordering::SeqCst));
let id1 = self.id_kind.into_id(self.current_id.next());
let id2 = self.id_kind.into_id(self.current_id.next());
Ok(RequestIdGuard { _rc: rc, id: (id1, id2) })
}

Expand Down Expand Up @@ -487,6 +487,22 @@ impl IdKind {
}
}

#[derive(Debug)]
struct CurrentId(AtomicUsize);

impl CurrentId {
fn new() -> Self {
CurrentId(AtomicUsize::new(0))
}

fn next(&self) -> u64 {
self.0
.fetch_add(1, Ordering::Relaxed)
.try_into()
.expect("usize -> u64 infallible, there are no CPUs > 64 bits; qed")
}
}

/// Generate a range of IDs to be used in a batch request.
pub fn generate_batch_id_range(guard: &RequestIdGuard<Id>, len: u64) -> Result<Range<u64>, Error> {
let id_start = guard.inner().try_parse_inner_as_number()?;
Expand Down

0 comments on commit aa257c0

Please sign in to comment.