Skip to content

Commit

Permalink
Merge pull request #51 from xelis-project/dev
Browse files Browse the repository at this point in the history
Hotfix 1.9.5
  • Loading branch information
Slixe authored Apr 28, 2024
2 parents 3724018 + 2115b52 commit 393fc02
Show file tree
Hide file tree
Showing 23 changed files with 224 additions and 120 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions xelis_common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "xelis_common"
version = "1.9.4"
version = "1.9.5"
edition = "2021"
authors = ["Slixe <slixeprivate@gmail.com>"]
build = "build.rs"
Expand All @@ -27,7 +27,7 @@ anyhow = "1.0.81"
log = "0.4"
fern = { version = "0.6", features = ["colored", "date-based"] }
chrono = "0.4.35"
tokio = { version = "1.36", features = ["macros", "signal", "time", "sync"], optional = true }
tokio = { version = "1.36", features = ["macros", "signal", "time", "sync", "tracing"], optional = true }
reqwest = { version = "0.11.25", default-features = false, features = ["json"], optional = true }
clap = { version = "4.5.2", features = ["derive"], optional = true }
crossterm = "0.27.0"
Expand Down
46 changes: 27 additions & 19 deletions xelis_common/src/json_rpc/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,34 @@ use std::{
collections::HashMap,
hash::Hash,
marker::PhantomData,
borrow::Cow, time::Duration
borrow::Cow,
time::Duration
};

use anyhow::Error;
use futures_util::{StreamExt, stream::{SplitSink, SplitStream}, SinkExt};
use futures_util::{
StreamExt,
stream::{SplitSink, SplitStream},
SinkExt
};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::{Value, json};
use tokio::{net::TcpStream, sync::{broadcast, oneshot, Mutex}, task::JoinHandle, time::sleep};
use tokio_tungstenite::{WebSocketStream, MaybeTlsStream, connect_async, tungstenite::Message};
use tokio::{
net::TcpStream,
sync::{broadcast, oneshot, Mutex},
task::JoinHandle,
time::sleep
};
use tokio_tungstenite::{
WebSocketStream,
MaybeTlsStream,
connect_async,
tungstenite::Message
};
use log::{debug, error, trace, warn};

use crate::api::SubscribeParams;
use crate::{
api::SubscribeParams,
utils::{sanitize_daemon_address, spawn_task}
};

use super::{JSON_RPC_VERSION, JsonRPCError, JsonRPCResponse, JsonRPCResult};

Expand Down Expand Up @@ -101,15 +117,7 @@ impl<E: Serialize + Hash + Eq + Send + Sync + Clone + 'static> WebSocketJsonRPCC
}

pub async fn new(mut target: String) -> Result<WebSocketJsonRPCClient<E>, JsonRPCError> {
if target.starts_with("https://") {
target.replace_range(..8, "wss://");
}
else if target.starts_with("http://") {
target.replace_range(..7, "ws://");
}
else if !target.starts_with("ws://") && !target.starts_with("wss://") {
target.insert_str(0, "ws://");
}
target = sanitize_daemon_address(target.as_str());

let ws = Self::connect_to(&target).await?;

Expand All @@ -130,7 +138,7 @@ impl<E: Serialize + Hash + Eq + Send + Sync + Clone + 'static> WebSocketJsonRPCC

{
let zelf = client.clone();
let handle = tokio::spawn(async move {
let handle = spawn_task("ws-subscribe-events", async move {
if let Err(e) = zelf.read(read).await {
error!("Error in the WebSocket client ioloop: {:?}", e);
};
Expand Down Expand Up @@ -270,7 +278,7 @@ impl<E: Serialize + Hash + Eq + Send + Sync + Clone + 'static> WebSocketJsonRPCC
}

let zelf = self.clone();
let handle = tokio::spawn(async move {
let handle = spawn_task("WS ioloop", async move {
if let Err(e) = zelf.read(read).await {
error!("Error in the WebSocket client ioloop: {:?}", e);
};
Expand Down Expand Up @@ -324,7 +332,7 @@ impl<E: Serialize + Hash + Eq + Send + Sync + Clone + 'static> WebSocketJsonRPCC
// Register all events again
{
let client = self.clone();
tokio::spawn(async move {
spawn_task("ws-subscribe-events", async move {
if let Err(e) = client.resubscribe_events().await {
error!("Error while resubscribing to events: {:?}", e);
}
Expand Down
15 changes: 13 additions & 2 deletions xelis_common/src/rpc_server/websocket/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,18 @@ use log::debug;
use serde_json::{Value, json};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::Mutex;
use crate::{rpc_server::{RPCHandler, RpcResponseError, InternalRpcError, RpcRequest, RpcResponse}, api::{SubscribeParams, EventResult}, context::Context};
use crate::{
api::{EventResult, SubscribeParams},
context::Context,
rpc_server::{
InternalRpcError,
RPCHandler,
RpcRequest,
RpcResponse,
RpcResponseError
},
utils::spawn_task
};
use super::{WebSocketSessionShared, WebSocketHandler};

// generic websocket handler supporting event subscriptions
Expand Down Expand Up @@ -46,7 +57,7 @@ where
if let Some(id) = subscriptions.get(event) {
let response = json!(RpcResponse::new(Cow::Borrowed(&id), Cow::Borrowed(&value)));
let session = session.clone();
tokio::spawn(async move {
spawn_task(format!("notify-ws-{}", session.id), async move {
if let Err(e) = session.send_text(response.to_string()).await {
debug!("Error occured while notifying a new event: {}", e);
};
Expand Down
6 changes: 4 additions & 2 deletions xelis_common/src/rpc_server/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use futures_util::StreamExt;
use log::{debug, error, trace};
use tokio::{sync::Mutex, select};

use crate::utils::spawn_task;

pub use self::{
handler::EventWebSocketHandler,
http_request::HttpRequest
Expand Down Expand Up @@ -220,7 +222,7 @@ impl<H> WebSocketServer<H> where H: WebSocketHandler + 'static {
// call on_close
let zelf = Arc::clone(self);
let session = session.clone();
tokio::spawn(async move {
spawn_task(format!("ws-on-close-{}", session.id), async move {
if let Err(e) = zelf.handler.on_close(&session).await {
debug!("Error while calling on_close: {}", e);
}
Expand Down Expand Up @@ -284,7 +286,7 @@ impl<H> WebSocketServer<H> where H: WebSocketHandler + 'static {
debug!("Received text message for session #{}: {}", session.id, text);
let zelf = Arc::clone(&self);
let session = session.clone();
tokio::spawn(async move {
spawn_task(format!("ws-on-msg-{}", session.id), async move {
if let Err(e) = zelf.handler.on_message(session, text.into_bytes()).await {
debug!("Error while calling on_message: {}", e);
}
Expand Down
17 changes: 16 additions & 1 deletion xelis_common/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::net::SocketAddr;
use tokio::task::{Builder, JoinHandle};
use std::{
net::SocketAddr,
future::Future,
};

use crate::{
config::{
Expand Down Expand Up @@ -127,6 +131,17 @@ pub fn sanitize_daemon_address(target: &str) -> String {
target
}

// Spawn a new task with a name
pub fn spawn_task<Fut, S: Into<String>>(name: S, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let name_str = name.into();
let name = name_str.as_str();
Builder::new().name(name).spawn(future).expect(name)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion xelis_daemon/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "xelis_daemon"
version = "1.9.4"
version = "1.9.5"
edition = "2021"
authors = ["Slixe <slixeprivate@gmail.com>"]

Expand Down
12 changes: 6 additions & 6 deletions xelis_daemon/src/core/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use xelis_common::{
TimestampMillis
},
transaction::{verify::BlockchainVerificationState, Transaction, TransactionType},
utils::{calculate_tx_fee, format_xelis},
utils::{calculate_tx_fee, format_xelis, spawn_task},
varuint::VarUint
};
use crate::{
Expand Down Expand Up @@ -359,7 +359,7 @@ impl<S: Storage> Blockchain<S> {
if let Some(simulator) = arc.simulator {
warn!("Simulator {} mode enabled!", simulator);
let blockchain = Arc::clone(&arc);
tokio::spawn(async move {
spawn_task("simulator", async move {
simulator.start(blockchain).await;
});
}
Expand Down Expand Up @@ -2157,7 +2157,7 @@ impl<S: Storage> Blockchain<S> {
let pruned_topoheight = storage.get_pruned_topoheight().await?;
let block = block.clone();
let block_hash = block_hash.clone();
tokio::spawn(async move {
spawn_task("broadcast-block", async move {
p2p.broadcast_block(&block, cumulative_difficulty, current_topoheight, current_height, pruned_topoheight, &block_hash, mining).await;
});
}
Expand All @@ -2169,7 +2169,7 @@ impl<S: Storage> Blockchain<S> {
if broadcast {
if let Some(getwork) = rpc.getwork_server() {
let getwork = getwork.clone();
tokio::spawn(async move {
spawn_task("notify-new-job", async move {
if let Err(e) = getwork.notify_new_job().await {
debug!("Error while notifying new job to miners: {}", e);
}
Expand All @@ -2192,7 +2192,7 @@ impl<S: Storage> Blockchain<S> {

let rpc = rpc.clone();
// don't block mutex/lock more than necessary, we move it in another task
tokio::spawn(async move {
spawn_task("rpc-notify-events", async move {
for (event, values) in events {
for value in values {
if let Err(e) = rpc.notify_clients(&event, value).await {
Expand Down Expand Up @@ -2389,7 +2389,7 @@ impl<S: Storage> Blockchain<S> {
if stable_height != previous_stable_height {
if rpc.is_event_tracked(&NotifyEvent::StableHeightChanged).await {
let rpc = rpc.clone();
tokio::spawn(async move {
spawn_task("rpc-notify-stable-height", async move {
let event = json!(StableHeightChangedEvent {
previous_stable_height,
new_stable_height: stable_height
Expand Down
18 changes: 9 additions & 9 deletions xelis_daemon/src/core/state/chain_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,15 +456,15 @@ impl<'a, S: Storage> BlockchainVerificationState<'a, BlockchainError> for ChainS

let reference = tx.get_reference();
// Verify that the block he is built upon exists
if !self.storage.has_block_with_hash(&reference.hash).await? {
debug!("TX {} reference {} was not found in storage, checking if its below pruned topoheight", tx.hash(), reference.hash);
let pruned_topoheight = self.storage.get_pruned_topoheight().await?
.unwrap_or(0);
if pruned_topoheight < reference.topoheight {
debug!("Invalid reference for tx {}: block {} not found", tx.hash(), reference.hash);
return Err(BlockchainError::InvalidReferenceHash);
}
}
// if !self.storage.has_block_with_hash(&reference.hash).await? || !self.storage.is_block_topological_ordered(&reference.hash).await {
// debug!("TX {} reference {} was not found in storage, checking if its below pruned topoheight", tx.hash(), reference.hash);
// let pruned_topoheight = self.storage.get_pruned_topoheight().await?
// .unwrap_or(0);
// if pruned_topoheight < reference.topoheight {
// debug!("Invalid reference for tx {}: block {} not found", tx.hash(), reference.hash);
// return Err(BlockchainError::InvalidReferenceHash);
// }
// }

// Verify that it is not a fake topoheight
if self.topoheight < reference.topoheight {
Expand Down
8 changes: 4 additions & 4 deletions xelis_daemon/src/core/state/mempool_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,10 @@ impl<'a, S: Storage> BlockchainVerificationState<'a, BlockchainError> for Mempoo

let reference = tx.get_reference();
// Verify that the block he is built upon exists
if !self.storage.has_block_with_hash(&reference.hash).await? {
debug!("Invalid reference: block {} not found", reference.hash);
return Err(BlockchainError::InvalidReferenceHash);
}
// if !self.storage.has_block_with_hash(&reference.hash).await? || !self.storage.is_block_topological_ordered(&reference.hash).await {
// debug!("Invalid reference: block {} not found", reference.hash);
// return Err(BlockchainError::InvalidReferenceHash);
// }

// Verify that it is not a fake topoheight
if self.topoheight < reference.topoheight {
Expand Down
Loading

0 comments on commit 393fc02

Please sign in to comment.