feat(katana): retain transactions in pool until mined #2630
base: main
Conversation
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (5)
crates/katana/pipeline/src/stage/sequencing.rs (1)
58-60: Consider adding error handling for pool operations, sensei!
While the implementation is functionally correct, it might be good to add explicit error handling for pool operations to ensure graceful handling of potential issues during block production.
Consider wrapping the pool operations in a `try` block:

- let service = BlockProductionTask::new(pool, miner, block_producer);
- self.task_spawner.build_task().name("Block production").spawn(service)
+ let service = match BlockProductionTask::new(pool, miner, block_producer) {
+     Ok(service) => service,
+     Err(e) => {
+         error!(target: "pipeline", error = ?e, "Failed to create block production task");
+         return self.task_spawner.build_task().spawn(future::err(BlockProductionError::ServiceError));
+     }
+ };
+ self.task_spawner.build_task().name("Block production").spawn(service)

crates/katana/pool/src/ordering.rs (1)
Line range hint 148-180: Consider documenting the behavioral differences, sensei! 📚
While the change from `take_transactions()` to `pending_transactions()` is well implemented, it would be valuable to document the semantic difference between these methods, particularly how the new approach affects transaction retention in the pool. This documentation would help future contributors understand the design evolution.

crates/katana/pool/src/pool.rs (3)
250-254: Optimize `remove_transactions` with `HashSet` for Better Performance
Ohayo, sensei! In `remove_transactions`, using `hashes.contains()` inside a loop leads to O(n*m) time complexity. Converting `hashes` to a `HashSet` reduces each lookup to O(1), improving performance when removing multiple transactions.
Suggested change:

  use std::collections::HashSet;

  fn remove_transactions(&self, hashes: &[TxHash]) {
+     let hash_set: HashSet<TxHash> = hashes.iter().cloned().collect();
      let mut txs = self.inner.transactions.write();
-     txs.retain(|t| !hashes.contains(&t.tx.hash()))
+     txs.retain(|t| !hash_set.contains(&t.tx.hash()))
  }
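For context, the retain-with-`HashSet` pattern can be sketched as a self-contained, std-only example. The `PendingTx` struct and `u64` hashes below are simplified stand-ins for the pool's actual types, not Katana's API:

```rust
use std::collections::HashSet;

// Simplified stand-in for the pool's transaction type; `u64` plays the
// role of `TxHash`. Names here are illustrative only.
#[derive(Debug, PartialEq)]
struct PendingTx {
    hash: u64,
}

// Mirrors the suggested fix: build a HashSet once, then `retain` with O(1)
// membership checks instead of scanning the `hashes` slice per transaction.
fn remove_transactions(txs: &mut Vec<PendingTx>, hashes: &[u64]) {
    let hash_set: HashSet<u64> = hashes.iter().copied().collect();
    txs.retain(|t| !hash_set.contains(&t.hash));
}

fn main() {
    let mut txs = vec![PendingTx { hash: 1 }, PendingTx { hash: 2 }, PendingTx { hash: 3 }];
    remove_transactions(&mut txs, &[1, 3]);
    assert_eq!(txs, vec![PendingTx { hash: 2 }]);
    println!("{txs:?}");
}
```

The asymptotic win only matters when both the pool and the removal batch are large; for a handful of hashes the slice scan is fine.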
Line range hint 445-459: Incorrect Assumptions in Test After `pending_transactions` Call
Ohayo, sensei! The test assumes that calling `pending_transactions()` consumes and removes transactions from the pool. However, the new implementation does not remove them. This may cause the assertions expecting an empty pool to fail. Consider updating the test to reflect the new behavior, or modify `pending_transactions()` to consume transactions if that is intended.
Options:
- Update Test Assertions — adjust the test to check that the pool still contains the transactions after calling `pending_transactions()`.
- Modify `PendingTransactions` to Consume Transactions — if consuming transactions is desired, modify `PendingTransactions` to remove transactions as they are iterated over.
Example modification:

  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      let this = self.get_mut();
      if let Some(tx) = this.all.next() {
+         // Remove transaction from the pool
+         this.pool.inner.transactions.write().remove(&tx);
          Poll::Ready(Some(tx))
      } else {
          this.subscription.poll_next_unpin(cx)
      }
  }

Please choose the option that aligns with the intended behavior.
278-282: Implement `FusedStream` for `PendingTransactions`
Ohayo, sensei! Implementing the `FusedStream` trait for `PendingTransactions` can enhance its usability by signaling when the stream has terminated. This is useful for avoiding unnecessary polling and improving efficiency.
Example implementation:

use futures::stream::FusedStream;

impl<T, O> FusedStream for PendingTransactions<T, O>
where
    T: PoolTransaction,
    O: PoolOrd<Transaction = T>,
{
    fn is_terminated(&self) -> bool {
        self.all.len() == 0 && self.subscription.is_terminated()
    }
}

Ensure `SubscriptionBox` also implements `FusedStream`.
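The fused contract has a synchronous analogue in std that may make the intent clearer without pulling in the futures crate: `Iterator::fuse` guarantees that once `next()` returns `None`, it keeps returning `None` forever — the same "safe to poll after exhaustion" property `is_terminated` is meant to signal for streams. A std-only sketch:

```rust
fn main() {
    // `fuse` is the synchronous analogue of FusedStream: once the iterator
    // returns None, it is guaranteed to keep returning None forever.
    let mut it = vec![1, 2].into_iter().fuse();
    assert_eq!(it.next(), Some(1));
    assert_eq!(it.next(), Some(2));
    assert_eq!(it.next(), None);
    assert_eq!(it.next(), None); // stays exhausted: safe to call again
    println!("fused iterator stays terminated");
}
```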
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (6)
- crates/katana/core/src/service/mod.rs (6 hunks)
- crates/katana/pipeline/src/stage/sequencing.rs (1 hunks)
- crates/katana/pool/Cargo.toml (1 hunks)
- crates/katana/pool/src/lib.rs (3 hunks)
- crates/katana/pool/src/ordering.rs (2 hunks)
- crates/katana/pool/src/pool.rs (12 hunks)
🔇 Additional comments (10)
crates/katana/pool/Cargo.toml (1)
16-16: Ohayo! The tokio dependency looks good, sensei!
The addition of tokio with the "sync" feature aligns well with the async transaction pool management requirements. The workspace inheritance ensures consistent versioning.
Let's verify if we need additional tokio features:
✅ Verification successful
Ohayo! The tokio "sync" feature is perfectly sufficient, sensei!
The codebase only uses `tokio::sync::Notify` and has one async function, both of which are covered by the "sync" feature. No additional tokio features are needed at this time.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for tokio usage patterns that might require additional features
rg -g '*.rs' 'tokio::(spawn|time|fs|io|net)' crates/katana/pool/
Length of output: 66
Script:
#!/bin/bash
# Let's check all tokio imports and usages in the pool crate
rg -g '*.rs' '^use tokio|tokio::' crates/katana/pool/
# Also check for async functions that might need tokio runtime
rg -g '*.rs' 'async fn' crates/katana/pool/
Length of output: 268
crates/katana/pool/src/lib.rs (2)
13-14: LGTM! Import changes align with the new transaction handling.
Ohayo! The import changes correctly reflect the transition from individual pending transactions to the new `PendingTransactions` type.
47-47: Breaking change: Verify all consumers are updated.
The change from returning an iterator to returning `PendingTransactions` directly is a breaking change that requires updates in all consuming code.
Let's verify the impact:
crates/katana/pipeline/src/stage/sequencing.rs (1)
57-57: Ohayo! The TransactionMiner initialization looks good, sensei!
The updated initialization using both `pending_transactions()` and `add_listener()` aligns well with the goal of retaining transactions in the pool until they are mined. This ensures better transaction management by maintaining both the current state and future updates.
Let's verify the transaction retention behavior:
✅ Verification successful
Let me gather more information about the TransactionMiner implementation since it wasn't found in the first search.
Ohayo! The transaction retention implementation is solid, sensei!
The code shows that transactions are properly managed:
- Transactions are only removed from the pool after successful mining (in core/src/service/mod.rs)
- The TransactionMiner correctly uses both pending transactions and listener for updates
- The pool's `remove_transactions` is only called with successfully mined transaction hashes
The implementation aligns perfectly with the goal of retaining transactions until they are mined. No premature removals or race conditions were found in the transaction handling logic.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the TransactionMiner implementation and its transaction retention logic
# Check for any potential race conditions or transaction dropping in the miner implementation
ast-grep --pattern 'impl$$$TransactionMiner$$$'
# Look for transaction removal conditions to ensure they only happen after mining
rg -A 5 "remove.*transaction"
Length of output: 2452
Script:
#!/bin/bash
# Search for TransactionMiner struct and implementation
ast-grep --pattern 'struct TransactionMiner'
# Look for transaction mining logic
rg -A 10 "impl.*TransactionMiner"
# Check how transactions are handled during mining
rg -A 10 "fn.*mine"
Length of output: 9324
crates/katana/pool/src/ordering.rs (2)
148-148: Ohayo! The FIFO test update looks good, sensei! 🎋
The change from `take_transactions()` to `pending_transactions()` maintains the test's effectiveness while aligning with the new pool retention mechanism.
180-180: The tip-based ordering test is looking sharp, sensei! ⚔️
The update to `pending_transactions()` maintains the thorough verification of tip-based priority ordering while embracing the new transaction retention approach.
crates/katana/core/src/service/mod.rs (4)
35-39: Great Enhancement with Generic Parameter O
Ohayo, sensei! The introduction of the generic parameter `O` implementing `PoolOrd<Transaction = ExecutableTxWithHash>` in `BlockProductionTask` enhances the flexibility and modularity of the codebase. This change allows for customizable transaction ordering strategies, which is a valuable improvement.
Also applies to: 50-54, 64-68
88-88: Efficient Removal of Mined Transactions
Ohayo, sensei! Adding `this.pool.remove_transactions(&outcome.txs);` ensures that mined transactions are properly removed from the pool. This prevents duplicate processing and maintains the integrity of the transaction pool.
125-133: Refactored TransactionMiner Improves Transaction Handling
Ohayo, sensei! Updating `TransactionMiner` to accept `PendingTransactions<ExecutableTxWithHash, O>` streamlines transaction retrieval for mining. This refactoring enhances the efficiency of transaction handling by leveraging the pending transactions directly.
150-153: Verify Complete Processing of Pending Transactions
Ohayo, sensei! With the miner now collecting transactions from `self.pending_txs`, please verify that all pending transactions are being processed without any omissions. Ensuring that `pending_txs` correctly streams all transactions is crucial for consistent and reliable transaction processing.
fn remove_transactions(&self, hashes: &[TxHash]);
🛠️ Refactor suggestion
Documentation and error handling needed for remove_transactions.
Ohayo sensei! The new method needs attention in these areas:
- Missing documentation explaining the method's purpose and behavior
- No error handling for invalid transaction hashes
- No indication of successful/failed removals
Consider this enhancement:
+ /// Removes transactions from the pool by their hashes.
+ ///
+ /// # Arguments
+ /// * `hashes` - A slice of transaction hashes to remove
+ ///
+ /// # Returns
+ /// A Result containing the number of transactions successfully removed,
+ /// or an error if any of the hashes are invalid.
- fn remove_transactions(&self, hashes: &[TxHash]);
+ fn remove_transactions(&self, hashes: &[TxHash]) -> PoolResult<usize>;
crates/katana/pool/src/pool.rs
fn broadcast(&self, tx: PendingTx<T, O>) {
    self.notify.notify_waiters();
    self.txs.write().insert(tx);
}
Potential Race Condition in `broadcast` Method
Ohayo, sensei! In the `broadcast` method, notifying waiters before inserting the transaction into `txs` could lead to a race condition: subscribers might wake up and find no transaction available. Consider inserting the transaction before notifying waiters to ensure they can retrieve it.
Here’s the suggested change:
fn broadcast(&self, tx: PendingTx<T, O>) {
+ self.txs.write().insert(tx);
self.notify.notify_waiters();
- self.txs.write().insert(tx);
}
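The same ordering discipline can be sketched with std primitives only (no tokio). Note that `std::sync::Condvar` waiters re-check a predicate while holding the lock, which masks lost wakeups; `tokio::sync::Notify` has no built-in predicate, which is exactly why insert-then-notify ordering matters more there. All names below are illustrative, not Katana's API:

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical minimal pool: u64 tx ids ordered in a BTreeSet.
struct Pool {
    txs: Mutex<BTreeSet<u64>>,
    cv: Condvar,
}

impl Pool {
    // Correct ordering: make the tx visible first, then wake waiters.
    fn broadcast(&self, tx: u64) {
        self.txs.lock().unwrap().insert(tx);
        self.cv.notify_all();
    }

    // Waiters re-check the set under the lock, so a wakeup always finds the tx.
    fn next(&self) -> u64 {
        let mut txs = self.txs.lock().unwrap();
        loop {
            if let Some(first) = txs.pop_first() {
                return first;
            }
            txs = self.cv.wait(txs).unwrap();
        }
    }
}

fn main() {
    let pool = Arc::new(Pool { txs: Mutex::new(BTreeSet::new()), cv: Condvar::new() });
    let p = Arc::clone(&pool);
    let consumer = thread::spawn(move || p.next());
    pool.broadcast(42);
    assert_eq!(consumer.join().unwrap(), 42);
    println!("consumer received tx 42");
}
```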
  fn pending_transactions(&self) -> PendingTransactions<Self::Transaction, Self::Ordering> {
      // take all the transactions
      PendingTransactions {
-         all: std::mem::take(&mut *self.inner.transactions.write()).into_iter(),
          subscription: self.subscribe(),
+         all: self.inner.transactions.read().clone().into_iter(),
🛠️ Refactor suggestion
Potential Performance Issue in `pending_transactions` Method
Ohayo, sensei! Cloning all transactions in `pending_transactions` with `self.inner.transactions.read().clone()` could be inefficient for large pools. This may lead to high memory usage and slow performance. Consider iterating over references or using a more efficient data structure to avoid cloning the entire set.
Alternative approach:
fn pending_transactions(&self) -> PendingTransactions<Self::Transaction, Self::Ordering> {
// iterate over references to avoid cloning
PendingTransactions {
subscription: self.subscribe(),
- all: self.inner.transactions.read().clone().into_iter(),
+ all: self.inner.transactions.read().iter().cloned(),
}
}
fn notify_subscribers(&self, tx: PendingTx<T, O>) {
    let subscribers = self.inner.subscribers.read();
    for subscriber in subscribers.iter() {
        subscriber.broadcast(tx.clone());
    }
}
🛠️ Refactor suggestion
Inefficient Cloning in `notify_subscribers` Method
Ohayo, sensei! Cloning the transaction for each subscriber in `notify_subscribers` may introduce performance overhead, especially with a large number of subscribers. To optimize, consider wrapping `PendingTx<T, O>` in an `Arc` to share ownership without excessive cloning.
Suggested modification:
fn notify_subscribers(&self, tx: PendingTx<T, O>) {
+ let tx = Arc::new(tx);
let subscribers = self.inner.subscribers.read();
for subscriber in subscribers.iter() {
- subscriber.broadcast(tx.clone());
+ subscriber.broadcast(Arc::clone(&tx));
}
}
And update the `broadcast` method signature:
- fn broadcast(&self, tx: PendingTx<T, O>) {
+ fn broadcast(&self, tx: Arc<PendingTx<T, O>>) {
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (9)
crates/katana/pool/src/lib.rs (1)
50-50: Ohayo! Improved method naming and return type.
The rename from `take_transactions` to `pending_transactions` better reflects that transactions are being viewed rather than removed. The structured `PendingTransactions` return type is a good improvement over a raw iterator. This change supports better transaction retention by making the non-destructive nature of the operation explicit in the API.
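The semantic difference can be sketched std-only, using a plain `Vec<u64>` as a stand-in for the pool (names are illustrative): the old `take_transactions`-style call drains the container via `std::mem::take`, while a `pending_transactions`-style view iterates a snapshot and leaves the pool intact:

```rust
fn main() {
    let mut pool: Vec<u64> = vec![1, 2, 3];

    // pending_transactions-style: iterate a cloned snapshot; the pool keeps
    // its contents, so the txs stay queryable until explicitly removed.
    let pending: Vec<u64> = pool.iter().copied().collect();
    assert_eq!(pending, vec![1, 2, 3]);
    assert_eq!(pool.len(), 3); // still retained

    // take_transactions-style: std::mem::take swaps in an empty Vec,
    // draining the pool immediately.
    let taken = std::mem::take(&mut pool);
    assert_eq!(taken, vec![1, 2, 3]);
    assert!(pool.is_empty()); // gone before the txs are mined
    println!("retained, then drained");
}
```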
crates/katana/core/src/service/mod.rs (3)
86-88: Consider adding error handling for transaction removal, sensei!
While the timing of transaction removal is correct (after successful mining), consider adding error handling for the `remove_transactions` call. Failed removals could lead to transaction duplication in future blocks.

- this.pool.remove_transactions(&outcome.txs);
+ if let Err(e) = this.pool.remove_transactions(&outcome.txs) {
+     error!(target: LOG_TARGET, %e, "Failed to remove mined transactions from pool");
+     // Consider if this should affect block production
+ }
156-157: Remove commented code, sensei!
The commented code for `pool.take_transactions()` should be removed, as it has been replaced by the new implementation.
150-153: Consider pre-allocating the transactions vector, sensei!
For better performance, consider pre-allocating the transactions vector with an estimated capacity based on the pending transactions count.

- let mut transactions = Vec::new();
+ let mut transactions = Vec::with_capacity(self.pending_txs.size_hint().0);

crates/katana/pool/src/pool.rs (3)
37-39: Consider using a capacity hint for subscribers Vec.
Ohayo, sensei! The `subscribers` field is initialized with `Default::default()`, which creates an empty Vec. For better performance, consider initializing it with an estimated capacity if you can predict the typical number of subscribers.

- subscribers: RwLock<Vec<PoolSubscription<T, O>>>,
+ /// subscribers for incoming txs with pre-allocated capacity
+ subscribers: RwLock<Vec<PoolSubscription<T, O>>>,
202-206: Optimize transaction removal for large sets.
Ohayo, sensei! The current implementation checks each hash in the input slice for every transaction in the pool. For large sets, this could be O(n*m), where n is the pool size and m is the number of hashes.
Consider using a HashSet for better performance:

  fn remove_transactions(&self, hashes: &[TxHash]) {
+     // Convert hashes to HashSet for O(1) lookups
+     let hash_set: std::collections::HashSet<_> = hashes.iter().collect();
      let mut txs = self.inner.transactions.write();
-     txs.retain(|t| !hashes.contains(&t.tx.hash()))
+     txs.retain(|t| !hash_set.contains(&t.tx.hash()))
  }
145-146: Track TODO for rejected transaction cache.
Ohayo, sensei! There's a TODO comment about creating a cache for rejected transactions. This is important for RPC spec compliance and should be tracked.
Would you like me to help create a GitHub issue to track this enhancement? I can provide a detailed implementation plan for the rejected transaction cache.

crates/katana/pool/src/pending.rs (1)
crates/katana/pool/src/pending.rs (1)
11-12
: Fix Typographical Error in Documentation CommentOhayo, sensei! There's a repeated word in the comment. Please remove the extra 'by' to improve clarity.
Apply this diff to correct the typo:
/// an iterator that yields transactions from the pool that can be included in a block, sorted by -/// by its priority. +/// its priority.crates/katana/pool/src/subscription.rs (1)
15-18: Ohayo, sensei! You can derive `Clone` instead of implementing it manually.
Since all fields within `PoolSubscription` implement `Clone`, you can simplify the code by deriving `Clone` automatically. This reduces boilerplate and improves maintainability.
Apply this change:

 #[derive(Debug)]
+#[derive(Clone)]
 pub struct PoolSubscription<T, O: PoolOrd> {
     pub(crate) txs: Arc<RwLock<BTreeSet<PendingTx<T, O>>>>,
     pub(crate) notify: Arc<Notify>,
 }
-
-impl<T, O: PoolOrd> Clone for PoolSubscription<T, O> {
-    fn clone(&self) -> Self {
-        Self { txs: self.txs.clone(), notify: self.notify.clone() }
-    }
-}

Note that `#[derive(Clone)]` adds `T: Clone` and `O: Clone` bounds; since both fields are `Arc`s, the manual impl avoids those bounds, which may be why it was written by hand.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (6)
- crates/katana/core/src/service/mod.rs (6 hunks)
- crates/katana/pool/src/lib.rs (4 hunks)
- crates/katana/pool/src/pending.rs (1 hunks)
- crates/katana/pool/src/pool.rs (10 hunks)
- crates/katana/pool/src/subscription.rs (1 hunks)
- crates/katana/rpc/rpc/src/starknet/mod.rs (4 hunks)
🔇 Additional comments (11)
crates/katana/pool/src/lib.rs (2)
4-4: Ohayo sensei! Clean module organization and imports.
The new modules and imports align well with the PR's objective of improving transaction retention. The structure maintains good separation of concerns.
Also applies to: 6-6, 15-15
60-61: Documentation and error handling still needed.
The concerns raised in the previous review about documentation and error handling remain valid.
crates/katana/core/src/service/mod.rs (3)
12-13: Ohayo! Clean generic implementation for transaction ordering, sensei!
The addition of the `PoolOrd` trait and `PendingTransactions` type, along with the generic parameter `O` in `BlockProductionTask`, provides a more flexible and type-safe way to handle transaction ordering in the pool.
Also applies to: 35-39
50-59: Clean implementation of generic constraints, sensei!
The implementation properly constrains the generic parameter `O` with the `PoolOrd` trait and correctly handles the `ExecutableTxWithHash` transaction type. The constructor signature is well-aligned with the new generic structure.
Line range hint 64-106: Verify transaction visibility during mining process.
The implementation successfully addresses the PR objective of fixing transaction visibility issues. Let's verify the behavior:
✅ Verification successful
Transaction visibility implementation is correctly handled
Ohayo sensei! After reviewing the implementation, I can confirm that transaction visibility during the mining process is properly handled:
- Transactions are only removed from the pool via `remove_transactions` after successful mining (as seen in `BlockProductionTask`)
- The pool implementation uses a thread-safe `RwLock` for transaction storage
- The test suite confirms proper transaction lifecycle management
- Transactions remain visible in the pool until explicitly removed after mining
The implementation ensures transactions stay visible throughout the mining process until they are successfully included in a block. Keep up the excellent work, sensei! ✨
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any other transaction removal points that might affect visibility
rg -g '!target' -g '!*.md' "remove_transactions|take_transactions" .
# Look for any transaction pool tests that verify this behavior
rg -g '!target' "mod tests.*transaction.*pool" -A 10
Length of output: 542
Script:
#!/bin/bash
# Let's check the pool implementation to understand transaction removal behavior
ast-grep --pattern 'impl$_ Pool$_ {
$$$
fn remove_transactions($$$) {
$$$
}
$$$
}'
# Check for any transaction pool tests
fd "pool" --type f --exec rg -l "mod\s+tests" {}
# Look for transaction visibility related test cases
rg -g '!target' "test.*transaction.*pool|test.*pool.*transaction" -A 10
Length of output: 254
Script:
#!/bin/bash
# Let's examine the pool.rs file content for implementation details
cat ./crates/katana/pool/src/pool.rs
# Check for any transaction-related test modules
rg -g '!target' "mod tests" -A 20 ./crates/katana/pool/src/pool.rs
# Look for transaction removal implementation in the pool
rg -g '!target' -B 5 -A 10 "fn remove_transactions" ./crates/katana/pool/src/pool.rs
Length of output: 15068
crates/katana/rpc/rpc/src/starknet/mod.rs (3)
15-15: LGTM! Import looks good, sensei!
The addition of the `TransactionPool` import is correctly placed and properly used in the implementation.
568-569: Transaction status handling is perfect, sensei!
The implementation correctly returns `TransactionStatus::Received` for transactions found in the pool, which accurately represents their state in the transaction lifecycle.
434-436: Ohayo! Transaction pool fallback looks solid!
The implementation correctly checks the transaction pool as a last resort before returning a not-found error, which aligns with the PR's objective to retain transactions in the pool until mined.
Let's verify the transaction conversion implementation:
✅ Verification successful
Ohayo! The transaction conversion chain is properly implemented, sensei!
The code shows a complete and safe conversion chain:
- The pool returns `Arc<ExecutableTxWithHash>`
- `TxWithHash::from` correctly handles both owned and reference conversions from `ExecutableTxWithHash`
- `Tx::from(TxWithHash)` implements comprehensive matching for all transaction types (Invoke V1/V3, Declare V1/V2/V3, L1Handler, DeployAccount V1/V3)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify TxWithHash implementation for pool transactions
# Expect: The From trait implementation handles all transaction types
ast-grep --pattern 'impl From<$_> for TxWithHash {
$$$
}'
Length of output: 881
Script:
#!/bin/bash
# Let's verify the complete chain of conversions from pool transaction to TxWithHash
# First, check what type the pool returns
ast-grep --pattern 'struct Pool {
$$$
}'
# Then check the From implementation for Tx
ast-grep --pattern 'impl From<TxWithHash> for Tx {
$$$
}'
# Also check what tx_ref returns
ast-grep --pattern 'fn tx_ref'
# And check Transaction enum definition
ast-grep --pattern 'enum Transaction {
$$$
}'
Length of output: 13885
Script:
#!/bin/bash
# Let's check the Pool implementation to verify what it returns
rg "impl Pool" -A 5
# And check the get method specifically
ast-grep --pattern 'fn get(&self, $$$) -> $$$'
Length of output: 1211
crates/katana/pool/src/pending.rs (3)
14-17: Struct Definition Looks Good
Ohayo, sensei! The definition of `PendingTransactions` is clear and follows Rust best practices.
26-35: Implementation of the `Stream` Trait is Correct
Ohayo, sensei! The `poll_next` function correctly handles both the `all` iterator and the `subscription` stream.
37-47: Verify Consistency Between `Iterator` and `Stream` Implementations
Ohayo, sensei! The `Iterator` implementation only consumes from `self.all.next()` and doesn't consider `subscription`, while the `Stream` implementation handles both `all` and `subscription`. Please verify whether this discrepancy is intentional.
fn subscribe(&self) -> PoolSubscription<T, O> {
    let notify = Arc::new(Notify::new());
    let subscription = PoolSubscription { notify, txs: Default::default() };
    self.inner.subscribers.write().push(subscription.clone());
    subscription
}
🛠️ Refactor suggestion
Add subscriber cleanup mechanism.
Ohayo, sensei! The subscription mechanism currently lacks a way to remove inactive subscribers. This could lead to memory leaks if subscribers are abandoned without proper cleanup.
Consider adding a cleanup mechanism:
+impl<T, O: PoolOrd> Drop for PoolSubscription<T, O> {
+ fn drop(&mut self) {
+ // Implement cleanup logic here
+ }
+}
fn subscribe(&self) -> PoolSubscription<T, O> {
let notify = Arc::new(Notify::new());
let subscription = PoolSubscription { notify, txs: Default::default() };
+ // Consider using weak references or a mechanism to detect and remove dead subscriptions
self.inner.subscribers.write().push(subscription.clone());
subscription
}
pub(crate) fn broadcast(&self, tx: PendingTx<T, O>) {
    self.notify.notify_waiters();
    self.txs.write().insert(tx);
}
Ensure transactions are inserted before notifying to prevent race conditions.
Currently, `broadcast` calls `notify_waiters` before inserting the transaction into `txs`. This could lead to a race condition where a waiting task is notified but doesn't find the new transaction. By inserting the transaction before notifying, you ensure that any tasks woken up can immediately retrieve it.
Apply this fix:
pub(crate) fn broadcast(&self, tx: PendingTx<T, O>) {
- self.notify.notify_waiters();
self.txs.write().insert(tx);
+ self.notify.notify_waiters();
}
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    let this = self.get_mut();

    loop {
        if let Some(tx) = this.txs.write().pop_first() {
            return Poll::Ready(Some(tx));
        }

        if pin!(this.notify.notified()).poll(cx).is_pending() {
            break;
        }
    }

    Poll::Pending
}
🛠️ Refactor suggestion
Review the `poll_next` implementation to handle notifications correctly.
In `poll_next`, creating a new future with `this.notify.notified()` inside the loop may cause missed notifications, leading to tasks not waking up when transactions are available. To prevent this, store the notification future across poll calls.
Consider modifying the implementation as follows:
use futures::FutureExt;
pub struct PoolSubscription<T, O: PoolOrd> {
pub(crate) txs: Arc<RwLock<BTreeSet<PendingTx<T, O>>>>,
pub(crate) notify: Arc<Notify>,
+ notified_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
impl<T, O> Stream for PoolSubscription<T, O>
where
T: PoolTransaction,
O: PoolOrd<Transaction = T>,
{
type Item = PendingTx<T, O>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if let Some(tx) = this.txs.write().pop_first() {
return Poll::Ready(Some(tx));
}
+ if this.notified_fut.is_none() {
+ this.notified_fut = Some(Box::pin(this.notify.notified()));
+ }
+ match this.notified_fut.as_mut().unwrap().poll_unpin(cx) {
+ Poll::Ready(_) => {
+ this.notified_fut = None;
+ continue; // New notification received, check for transactions again
+ }
+ Poll::Pending => return Poll::Pending,
+ }
}
- Poll::Pending
}
}
This change ensures that the notification future persists across polls, preventing missed notifications and ensuring that tasks are awoken when new transactions arrive.
// TODO: create a small cache for rejected transactions to respect the rpc spec
// `getTransactionStatus`
Don't we store the rejected transactions anyway? Or the cache is just for improvement?
no we dont store them yet,
dojo/crates/katana/pool/src/pool.rs, lines 147 to 150 (at 942f6f1):

ValidationOutcome::Invalid { error, .. } => {
    warn!(hash = format!("{hash:#x}"), "Invalid transaction.");
    Err(PoolError::InvalidTransaction(Box::new(error)))
}
they are dropped instantly here
meaning, the `getTransactionStatus` endpoint can never return a tx status that is REJECTED
every block interval, the node would take transactions from the pool - removing it directly from the pool. this creates a small window (depending on the machine) that the transaction appears nonexistent. this is due to how the tx flows from the pool and executor. this applies for both instant and interval block production mode.
for instant mining, the window is between tx being picked up from the pool and the tx being committed to db. while for interval, tx being picked up from the pool and the tx being inserted into the pending block.
when a tx is being queried thru the rpc, the node will first check if it exists in the db, else find it in the pending block (if interval mode). this pr adds a new (last) step, which is to try finding the tx in the pool if it doesn't exist anywhere else.
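The lookup order described above can be sketched std-only. Everything here (`Node`, `find_tx`, `u64` hashes) is a hypothetical illustration of the db → pending block → pool chain, not Katana's actual API:

```rust
// Hypothetical sketch of the RPC lookup order; names are illustrative.
#[derive(Clone, Debug, PartialEq)]
struct Tx {
    hash: u64,
}

struct Node {
    db: Vec<Tx>,            // committed txs
    pending_block: Vec<Tx>, // interval mode only
    pool: Vec<Tx>,          // not-yet-mined txs (new last step in this PR)
}

impl Node {
    fn find_tx(&self, hash: u64) -> Option<&Tx> {
        self.db
            .iter()
            .chain(self.pending_block.iter())
            // Checking the pool last closes the visibility window where a tx
            // was picked up for mining but not yet committed anywhere.
            .chain(self.pool.iter())
            .find(|t| t.hash == hash)
    }
}

fn main() {
    let node = Node {
        db: vec![Tx { hash: 1 }],
        pending_block: vec![],
        pool: vec![Tx { hash: 2 }],
    };
    // tx 2 is only in the pool, but is still visible to the RPC lookup.
    assert_eq!(node.find_tx(2), Some(&Tx { hash: 2 }));
    assert_eq!(node.find_tx(3), None);
    println!("pool fallback works");
}
```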
Summary by CodeRabbit

New Features
- Introduced `PendingTransactions` and `PoolSubscription` structures for better handling of pending transactions.

Bug Fixes

Documentation
- Added `tokio` for asynchronous capabilities.

Refactor