-
Notifications
You must be signed in to change notification settings - Fork 0
Flow to relay and advertise transactions
- When a new tx is relayed by a peer it is advertised with a 'inv' message which comes to a zebra node.
- The zebra node processes the Message::Inv in either Handler::process_message() or Handler::handle_message_as_request depending on which one from two futures completes first. (It looks like when komodod relays a newly sent tx, it normally comes to handle_message_as_request() first - for Either::Left pattern)
- When this is the handle_message_as_request() then Request::AdvertiseTransactionIds is created in this func and svc (inbound_service) is called in drive_peer_request().
- The Request::AdvertiseTransactionIds is processed in Inbound::call (It seems name 'AdvertiseTransactionIds' used here is misleading because there exists a different path with the same AdvertiseTransactionIds name for real advertising - see below). Here a mempool::Request::Queue request is created for this txid (note that on startup the mempool may be in ActiveState::Disabled state and the Queue request will not be processed).
- If the mempool is in Enabled state the txid is checked for dups in should_download_or_verify() and then a future is created to download this tx in tx_downloads.download_if_needed_and_verify().
- when the tx is received it is added to the mempool by tx::Request::Mempool request.
If a txid from 'inv' was not downloaded inside handle_message_as_request() (because the mempool was in Disabled state) then it will be received later by crawler which periodically initiates 'inv' requests to remote peers and then all remote peer mempool txns will be received:
- crawl_transactions() runs in a loop, in which Request::MempoolTransactionIds requests to peer_set are created
- a selected peer object from peer_set in handle_client_request() processes Request::MempoolTransactionIds and sends Message::Mempool ("mempool") p2p message to a remote peer
- received txids from the Response::TransactionIds responses are passed to handle_response() function where queue_transactions() is called for each txid, which calls mempool::Request::Queue that works as above starting transaction downloads.
In Peer Connection<>::run() function a state machine is run to process remote messages and correlate them to internal requests. So it handles both internal requests that are sent as messages to remote nodes and incoming messages that may be responses to some requests or not. The self.state changes according to internal requests and processed messages. There are three states:
- if state is State::AwaitingRequest - there is no internal request and incoming messages are processed just as themselves in handle_message_as_request() where for an incoming message a corresponding internal request is determined to process the message and forwarded to the Inbound service inside drive_peer_request(),
- if state is State::AwaitingResponse and handler is a 'request' (not Handler::Finished) - both the handler's state and received message are processed in process_messages() and if they matched handler is switched to Handler::Finished and received message is stored as the response to the request.
- if state is State::AwaitingResponse and handler is Handler::Finished - the response received from a remote node's is sent with tx.send() to the oneshot channel listened in client.rs call (also see crawler sample below). Then the state changes to the initial State::AwaitingRequest.
Some thread with a loop, f.e. crawler periodically requests remote nodes' mempool:
- crawler creates a request like Request::MempoolTransactionIds and a future with peer_set.call() which is defined in set.rs.
- In set.rs call() the request is routed over route_broadcast() or route_p2c(). In those functions a peer is selected and the request is passed to peer's call() defined in client.rs (see svc.call() in route_p2c()).
- In client.rs call() the request is sent over the ClientRequest channel (or stream). The receiver part of this channel listens in connection.rs run() function - see self.client_rx.next() call in AwaitingRequest match branch. Also a oneshot tx/rx channel for the response is created and the tx part is passed in the ClientRequest call.
- Then the request is sent to handle_client_request() function where a corresponding p2p message (Message::Mempool in this case) is sent to a remote node and a new handler is created with the corresponding response type (in this case Handler::MempoolTransactionIds). The state is changed to AwaitingResponse.
- the response is sent with tx.send() in the branch with Handler::Finished.
- the response is received in client.rs call() in the rx part of the oneshot channel
- crawler's awaits the request future and passes the response into handle_response() where received txids are queued.
- A list of tx_downloads futures in the mempool's ActiveState::Enabled struct is periodically polled and when one is ready (a tx is received) then the transaction_sender.send() channel part is called to notify mempool_transaction_receiver (receiver) channel part which is running in a loop in gossip_mempool_transaction_id()
- when the channel's receiver.changed() it creates a zn::Request::AdvertiseTransactionIds request to broadcast txid over peers and passed to the broadcast_network (which is the peer_set wrapped as Timeout)
- The Request::AdvertiseTransactionIds request is processed in PeerSet<>::call and sent with route_broadcast() to peer objects
- Per object's Peer::handle_client_request() processes that Request::AdvertiseTransactionIds and sends 'Message::Inv' with txid to a remote peer.
When I started a zebra node, it connects to a komodod peer and I sent a new tx on the komodod node but it does not appear at once in the zebra mempool. Why? This is because zebra mempool may be in ActiveState::Disabled and 'inv' relayed messages are not processed yet. But in a few minutes crawler would request mempool state from remote nodes and the local mempool is filled in. But if a zebra node already runs some time after start it gets a tx into mempool at the same moment when the tx is sent on a remote node.
2023-01-12T16:02:52.125171Z INFO {zebrad="d638791" net="Test"}: zebrad::components::mempool::crawler: in crawler.rs crawl_transactions() crawler sends Request::MempoolTransactionIds requests
2023-01-12T16:02:52.125751Z INFO {zebrad="d638791" net="Test"}: zebra_network::peer_set::set: entered set.rs call() with req=MempoolTransactionIds
2023-01-12T16:02:52.125811Z INFO {zebrad="d638791" net="Test"}: zebra_network::peer::client: entered client.rs call(), will server_tx.try_send() ClientRequest with request=MempoolTransactionIds
2023-01-12T16:02:52.126071Z INFO {zebrad="d638791" net="Test"}:{peer=Out("127.0.0.1:17770")}: zebra_network::peer::connection: in connection.rs run() got req from client_rx, will enter handle_client_request() req=InProgressClientRequest { request: MempoolTransactionIds, tx: MustUseClientResponseSender { tx: Some(Sender { complete: false }), missing_inv: None }, span: Span { name: "", level: Level(Error), target: "zebrad::application", id: Id(1), module_path: "zebrad::application", line: 397, file: "zebrad/src/application.rs" } }
2023-01-12T16:02:52.127444Z INFO {zebrad="d638791" net="Test"}: zebra_network::peer::connection: in process_message() got Message::Inv with items=[Tx(transaction::Hash("75979857f0eb0da0351e510d8edb7f1df1ca1f8f1820b727fec4e21e252e8d74"))]
2023-01-12T16:02:52.127564Z INFO {zebrad="d638791" net="Test"}:{peer=Out("127.0.0.1:17770")}: zebra_network::peer::connection: in connections.rs run() within match's branch for State::AwaitingResponse and Handler::Finished tx.send() sends response=Ok(TransactionIds([Legacy(transaction::Hash("75979857f0eb0da0351e510d8edb7f1df1ca1f8f1820b727fec4e21e252e8d74"))]))
2023-01-12T16:02:52.127618Z INFO {zebrad="d638791" net="Test"}:{peer=Out("127.0.0.1:17770")}: zebra_network::peer::client: in client.rs MustUseClientResponseSender::send() sends response over oneshot channel response=Ok(TransactionIds([Legacy(transaction::Hash("75979857f0eb0da0351e510d8edb7f1df1ca1f8f1820b727fec4e21e252e8d74"))]))
2023-01-12T16:02:52.127935Z INFO {zebrad="d638791" net="Test"}: zebra_network::peer::client: received response in client.rs call() oneshot_recv_result=Ok(Ok(TransactionIds([Legacy(transaction::Hash("75979857f0eb0da0351e510d8edb7f1df1ca1f8f1820b727fec4e21e252e8d74"))])))
2023-01-12T16:02:52.128025Z INFO {zebrad="d638791" net="Test"}: zebrad::components::mempool::crawler: crawl_transactions() crawler received result for Request::MempoolTransactionIds result=Ok(TransactionIds([Legacy(transaction::Hash("75979857f0eb0da0351e510d8edb7f1df1ca1f8f1820b727fec4e21e252e8d74"))]))