Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pr634 1.17 new filter (experiment no merging) #659

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
527145e
rebase with v1.17
lijunwangs Mar 17, 2024
db76d5d
Show staked vs nonstaked packets sent down
lijunwangs Mar 17, 2024
d5a5b7a
add metrics on throttled staked vs non-staked
lijunwangs Mar 17, 2024
f29d21d
fixed a clippy issue
lijunwangs Mar 17, 2024
e1725cc
added metrics on received streams
lijunwangs Mar 17, 2024
3ef19e3
changed to info level temporarily
lijunwangs Mar 17, 2024
8095673
Add per peer metrics logs
lijunwangs Mar 18, 2024
347fcb2
Give name to metrics for ease of read
lijunwangs Mar 18, 2024
0ed001e
fmt code
lijunwangs Mar 18, 2024
cb6bcca
use async mutex
lijunwangs Mar 18, 2024
8d4677f
clippy issues
lijunwangs Mar 18, 2024
ec3ecd2
fixed a merge issue
lijunwangs Mar 18, 2024
3216a3a
change max conn
lijunwangs Mar 21, 2024
203f182
test quinn dos fix
lijunwangs Mar 22, 2024
ea62de5
Try use the quinn filtering interface
lijunwangs Mar 25, 2024
4bd6b30
Fixed some deprecated function issues
lijunwangs Mar 25, 2024
3854b12
Try filtering connections from 127.0.0.1 to simulate filtering attackers
lijunwangs Mar 25, 2024
a9a684a
Adjust thread count for quic server
lijunwangs Mar 25, 2024
65e5ab7
Added metrics on incoming connections
lijunwangs Mar 25, 2024
73ea8df
Adjust stream timeout
lijunwangs Mar 25, 2024
05801ac
Use rate limiter to limit the connections per second
lijunwangs Mar 26, 2024
9ad8d40
connection rate limiter
lijunwangs Mar 26, 2024
0b7d1dd
missing Cargo.toml
lijunwangs Mar 27, 2024
d5a10a4
add metrics on throttled connections
lijunwangs Mar 27, 2024
720cffa
Add rust version to v1.74
lijunwangs Mar 27, 2024
0529cdd
Added process_sampled_packets_count; and reduce lock holding time in …
lijunwangs Mar 28, 2024
13cc79a
turn off rate limiting
lijunwangs Mar 28, 2024
81b3855
turn on rate limiting
lijunwangs Mar 28, 2024
b724e5c
change limit to 8 connections per second
lijunwangs Mar 31, 2024
fff4e27
Drop without calling reject
lijunwangs Mar 31, 2024
8543888
rebase with https://github.com/quinn-rs/quinn/pull/1752
lijunwangs Mar 31, 2024
c83d32a
ignore excessive connection and rename some connection related variab…
lijunwangs Mar 31, 2024
dac280c
changed CONNECTIONS_LIMIT_PER_SECOND to 16 to leave some cushion
lijunwangs Apr 1, 2024
34bc67f
Use debug for throttled stream
lijunwangs Apr 1, 2024
30bcb3b
Packet batch sleep to 25 ms
lijunwangs Apr 1, 2024
bfc624d
corrected max_streams_for_connection_in_100ms to not use hardcoded co…
lijunwangs Apr 1, 2024
68ffd15
Bump version to v1.17.30 (#500)
github-actions[bot] Mar 29, 2024
5f90762
v1.17: Bump h2 (backport of #570) (#608)
mergify[bot] Apr 5, 2024
6a25959
net-utils: support SO_REUSEPORT
t-nelson Apr 5, 2024
09f4626
tpu: use multiple quic endpoints
alessandrod Apr 2, 2024
418c3c3
cluster-info: manage port range by hand...
t-nelson Apr 6, 2024
5747978
local-cluster: keep udp tpu socket around for tests
Apr 7, 2024
f71d344
Fixed unit test failures
lijunwangs Apr 7, 2024
49dd2fb
use feature all to fix build
Apr 8, 2024
6db6f95
add filtering for multi-port quic
Apr 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,431 changes: 1,281 additions & 1,150 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ generic-array = { version = "0.14.7", default-features = false }
gethostname = "0.2.3"
getrandom = "0.2.10"
goauth = "0.13.1"
governor = "0.6.3"
hex = "0.4.3"
hidapi = { version = "2.4.1", default-features = false }
histogram = "0.6.9"
Expand Down Expand Up @@ -265,8 +266,10 @@ prost-types = "0.11.9"
protobuf-src = "1.1.0"
qstring = "0.7.2"
qualifier_attr = { version = "0.2.2", default-features = false }
quinn = "0.10.2"
quinn-proto = "0.10.5"
#quinn = "0.10.2"
#quinn-proto = "0.10.5"
quinn = {git = "https://github.com/lijunwangs/quinn.git", rev = "5980492919744a8366ce74a837b64e052aff9944"}
quinn-proto = {git = "https://github.com/lijunwangs/quinn.git", rev = "5980492919744a8366ce74a837b64e052aff9944"}
quote = "1.0"
rand = "0.8.5"
rand_chacha = "0.3.1"
Expand Down Expand Up @@ -296,7 +299,7 @@ sha3 = "0.10.4"
signal-hook = "0.3.17"
siphasher = "0.3.11"
smpl_jwt = "0.7.1"
socket2 = "0.5.4"
socket2 = { version= "0.5.4", features=["all"]}
soketto = "0.7"
solana_rbpf = "=0.8.0"
solana-account-decoder = { path = "account-decoder", version = "=1.17.30" }
Expand Down Expand Up @@ -365,6 +368,7 @@ solana-test-validator = { path = "test-validator", version = "=1.17.30" }
solana-thin-client = { path = "thin-client", version = "=1.17.30" }
solana-tpu-client = { path = "tpu-client", version = "=1.17.30", default-features = false }
solana-transaction-status = { path = "transaction-status", version = "=1.17.30" }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=1.17.30" }
solana-turbine = { path = "turbine", version = "=1.17.30" }
solana-udp-client = { path = "udp-client", version = "=1.17.30" }
solana-version = { path = "version", version = "=1.17.30" }
Expand Down
8 changes: 3 additions & 5 deletions cli-output/src/display.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::cli_output::CliSignatureVerificationStatus,
base64::{prelude::BASE64_STANDARD, Engine},
chrono::{Local, NaiveDateTime, SecondsFormat, TimeZone, Utc},
chrono::{DateTime, Local, SecondsFormat, TimeZone, Utc},
console::style,
indicatif::{ProgressBar, ProgressStyle},
solana_cli_config::SettingType,
Expand Down Expand Up @@ -715,10 +715,8 @@ pub fn new_spinner_progress_bar() -> ProgressBar {
}

pub fn unix_timestamp_to_string(unix_timestamp: UnixTimestamp) -> String {
match NaiveDateTime::from_timestamp_opt(unix_timestamp, 0) {
Some(ndt) => Utc
.from_utc_datetime(&ndt)
.to_rfc3339_opts(SecondsFormat::Secs, true),
match DateTime::from_timestamp(unix_timestamp, 0) {
Some(ndt) => ndt.to_rfc3339_opts(SecondsFormat::Secs, true),
None => format!("UnixTimestamp {unix_timestamp}"),
}
}
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ solana-sdk = { workspace = true }
solana-send-transaction-service = { workspace = true }
solana-streamer = { workspace = true }
solana-tpu-client = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
solana-transaction-status = { workspace = true }
solana-turbine = { workspace = true }
solana-version = { workspace = true }
Expand Down
26 changes: 26 additions & 0 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,32 @@ impl Consumer {
.slot_metrics_tracker
.increment_retryable_packets_count(retryable_transaction_indexes.len() as u64);

// Track the processing time of the perf-tracked packets that are not listed in retryable_transaction_indexes.
// This assumes retryable_transaction_indexes is already sorted in ascending order.
let mut retryable_idx = 0;
for (index, packet) in packets_to_process.iter().enumerate() {
if packet.original_packet().meta().is_perf_track_packet() {
if let Some(start_time) = packet.start_time() {
if retryable_idx >= retryable_transaction_indexes.len()
|| retryable_transaction_indexes[retryable_idx] != index
{
let duration = Instant::now().duration_since(*start_time);

debug!(
"Banking stage processing took {duration:?} for transaction {:?}",
packet.transaction().get_signatures().first()
);
payload
.slot_metrics_tracker
.increment_process_sampled_packets_us(duration.as_micros() as u64);
} else {
// This packet will be retried; advance the retry index, since the next packet's index is
// certainly greater than this one.
retryable_idx += 1;
}
}
}
}
Some(retryable_transaction_indexes)
}

Expand Down
13 changes: 12 additions & 1 deletion core/src/banking_stage/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use {
VersionedTransaction,
},
},
std::{cmp::Ordering, mem::size_of, sync::Arc},
std::{cmp::Ordering, mem::size_of, sync::Arc, time::Instant},
thiserror::Error,
};

Expand Down Expand Up @@ -45,10 +45,16 @@ pub struct ImmutableDeserializedPacket {
message_hash: Hash,
is_simple_vote: bool,
priority_details: TransactionPriorityDetails,
banking_stage_start_time: Option<Instant>,
}

impl ImmutableDeserializedPacket {
pub fn new(packet: Packet) -> Result<Self, DeserializedPacketError> {
let banking_stage_start_time = packet
.meta()
.is_perf_track_packet()
.then_some(Instant::now());

let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
Expand All @@ -71,6 +77,7 @@ impl ImmutableDeserializedPacket {
message_hash,
is_simple_vote,
priority_details,
banking_stage_start_time,
})
}

Expand Down Expand Up @@ -114,6 +121,10 @@ impl ImmutableDeserializedPacket {
self.compute_unit_limit() >= static_builtin_cost_sum
}

/// Returns a reference to the banking-stage start time stored for this packet.
///
/// The value is `Some` only when `new` found the packet flagged by
/// `is_perf_track_packet()`; for every other packet it is `None`.
pub fn start_time(&self) -> &Option<Instant> {
&self.banking_stage_start_time
}

// This function deserializes packets into transactions, computes the blake3 hash of transaction
// messages, and verifies secp256k1 instructions.
pub fn build_sanitized_transaction(
Expand Down
11 changes: 11 additions & 0 deletions core/src/banking_stage/leader_slot_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,17 @@ impl LeaderSlotMetricsTracker {
);
}
}

/// Records one sampled-packet processing duration, in microseconds, in the
/// `process_sampled_packets_us_hist` histogram of the current leader slot.
///
/// Does nothing when no leader slot metrics are currently present.
///
/// # Panics
///
/// Panics if the histogram's `increment` call returns an error for `us`.
pub(crate) fn increment_process_sampled_packets_us(&mut self, us: u64) {
    // Guard clause: without active leader slot metrics there is nothing to update.
    let Some(slot_metrics) = self.leader_slot_metrics.as_mut() else {
        return;
    };

    let sampled_hist = &mut slot_metrics
        .timing_metrics
        .process_packets_timings
        .process_sampled_packets_us_hist;
    sampled_hist.increment(us).unwrap();
}
}

#[cfg(test)]
Expand Down
25 changes: 25 additions & 0 deletions core/src/banking_stage/leader_slot_timing_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ pub(crate) struct ProcessPacketsTimings {
// Time spent running the cost model in processing transactions before executing
// transactions
pub cost_model_us: u64,

// banking stage processing time histogram for sampled packets
pub process_sampled_packets_us_hist: histogram::Histogram,
}

impl ProcessPacketsTimings {
Expand All @@ -264,6 +267,28 @@ impl ProcessPacketsTimings {
i64
),
("cost_model_us", self.cost_model_us, i64),
(
"process_sampled_packets_us_90pct",
self.process_sampled_packets_us_hist
.percentile(90.0)
.unwrap_or(0),
i64
),
(
"process_sampled_packets_us_min",
self.process_sampled_packets_us_hist.minimum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_max",
self.process_sampled_packets_us_hist.maximum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_mean",
self.process_sampled_packets_us_hist.mean().unwrap_or(0),
i64
),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,7 @@ impl ThreadLocalUnprocessedPackets {
.iter()
.map(|p| (*p).clone())
.collect_vec();

let retryable_packets = if let Some(retryable_transaction_indexes) =
processing_function(&packets_to_process, payload)
{
Expand Down
56 changes: 38 additions & 18 deletions core/src/repair/quic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ fn new_server_config(cert: Certificate, key: PrivateKey) -> Result<ServerConfig,
let mut config = ServerConfig::with_crypto(Arc::new(config));
config
.transport_config(Arc::new(new_transport_config()))
.use_retry(true)
.migration(false);
Ok(config)
}
Expand Down Expand Up @@ -224,17 +223,29 @@ async fn run_server(
let stats = Arc::<RepairQuicStats>::default();
let report_metrics_task =
tokio::task::spawn(report_metrics_task("repair_quic_server", stats.clone()));
while let Some(connecting) = endpoint.accept().await {
tokio::task::spawn(handle_connecting_task(
endpoint.clone(),
connecting,
remote_request_sender.clone(),
bank_forks.clone(),
prune_cache_pending.clone(),
router.clone(),
cache.clone(),
stats.clone(),
));
while let Some(incoming) = endpoint.accept().await {
let remote_addr: SocketAddr = incoming.remote_address();
let connecting = incoming.accept();
match connecting {
Ok(connecting) => {
tokio::task::spawn(handle_connecting_task(
endpoint.clone(),
connecting,
remote_request_sender.clone(),
bank_forks.clone(),
prune_cache_pending.clone(),
router.clone(),
cache.clone(),
stats.clone(),
));
}
Err(error) => {
debug!(
"Error while accepting incoming connection: {error:?} from {}",
remote_addr
);
}
}
}
report_metrics_task.abort();
}
Expand Down Expand Up @@ -775,14 +786,15 @@ impl<T> From<crossbeam_channel::SendError<T>> for Error {
struct RepairQuicStats {
connect_error_invalid_remote_address: AtomicU64,
connect_error_other: AtomicU64,
connect_error_too_many_connections: AtomicU64,
connect_error_cids_exhausted: AtomicU64,
connection_error_application_closed: AtomicU64,
connection_error_connection_closed: AtomicU64,
connection_error_locally_closed: AtomicU64,
connection_error_reset: AtomicU64,
connection_error_timed_out: AtomicU64,
connection_error_transport_error: AtomicU64,
connection_error_version_mismatch: AtomicU64,
connection_error_cids_exhausted: AtomicU64,
invalid_identity: AtomicU64,
no_response_received: AtomicU64,
read_to_end_error_connection_lost: AtomicU64,
Expand Down Expand Up @@ -814,9 +826,6 @@ fn record_error(err: &Error, stats: &RepairQuicStats) {
Error::ConnectError(ConnectError::EndpointStopping) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::TooManyConnections) => {
add_metric!(stats.connect_error_too_many_connections)
}
Error::ConnectError(ConnectError::InvalidDnsName(_)) => {
add_metric!(stats.connect_error_other)
}
Expand All @@ -829,6 +838,12 @@ fn record_error(err: &Error, stats: &RepairQuicStats) {
Error::ConnectError(ConnectError::UnsupportedVersion) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::CidsExhausted) => {
add_metric!(stats.connect_error_cids_exhausted)
}
Error::ConnectionError(ConnectionError::CidsExhausted) => {
add_metric!(stats.connection_error_cids_exhausted)
}
Error::ConnectionError(ConnectionError::VersionMismatch) => {
add_metric!(stats.connection_error_version_mismatch)
}
Expand Down Expand Up @@ -903,8 +918,8 @@ fn report_metrics(name: &'static str, stats: &RepairQuicStats) {
i64
),
(
"connect_error_too_many_connections",
reset_metric!(stats.connect_error_too_many_connections),
"connect_error_cids_exhausted",
reset_metric!(stats.connect_error_cids_exhausted),
i64
),
(
Expand Down Expand Up @@ -942,6 +957,11 @@ fn report_metrics(name: &'static str, stats: &RepairQuicStats) {
reset_metric!(stats.connection_error_version_mismatch),
i64
),
(
"connection_error_cids_exhausted",
reset_metric!(stats.connection_error_cids_exhausted),
i64
),
(
"invalid_identity",
reset_metric!(stats.invalid_identity),
Expand Down
Loading
Loading