diff --git a/CHANGELOG.md b/CHANGELOG.md index ab01bf2..8e5b634 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# Release 0.7.1 # + +* fixed a bug when all snapshots were going to first thread, making this thread more busy than others bottlenecking aggregation + + # Release 0.7.0 # ## Incompatible changes ## diff --git a/Cargo.toml b/Cargo.toml index e399047..702d146 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bioyino" -version = "0.7.0" +version = "0.7.1" authors = ["Sergey Noskov aka Albibek "] description = "StatsD-compatible, high-performance, fault-tolerant metric aggregator" edition = "2018" diff --git a/src/peer.rs b/src/peer.rs index 8fb5014..3fbffb6 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -96,6 +96,8 @@ impl NativeProtocolServer { let mut listener = TcpListener::bind(&listen).await?; let mut incoming = listener.incoming(); + let chlen = chans.len(); + let mut next_chan = chlen - 1; while let Some(conn) = incoming.next().await { let conn = conn?; let peer_addr = conn.peer_addr().map(|addr| addr.to_string()).unwrap_or_else(|_| "[UNCONNECTED]".into()); @@ -105,9 +107,8 @@ impl NativeProtocolServer { // debug!(log, "got new connection"); let elog = log.clone(); - let chlen = chans.len(); let mut chans = chans.clone(); - let mut next_chan = chlen - 1; + next_chan = if next_chan >= (chlen - 1) { 0 } else { next_chan + 1 }; let receiver = async move { let elog = log.clone(); @@ -409,11 +410,13 @@ mod test { //use bytes::BytesMut; use capnp::message::Builder; + use capnp_futures::serialize::write_message; use futures3::channel::mpsc::{self, Receiver}; use slog::{debug, Logger}; use tokio2::runtime::{Builder as RBuilder, Runtime}; use tokio2::time::delay_for; + use tokio_util::compat::Tokio02AsyncWriteCompatExt; use bioyino_metric::name::{MetricName, TagFormat}; use bioyino_metric::{Metric, MetricType};