[authority] Authority follower interface #636
Conversation
sui_core/src/authority.rs
Outdated
/// The sender to notify of new transactions
/// and create batches for this authority.
/// Keep as None if there is no need for this.
batch_sender: Option<BatchSender>,
batch_channels: Option<(BatchSender, BroadcastPair)>,
Do you actually need the initial receiver (i.e. the second element of the pair) created for you?
Thanks for the super early review! I was under the mistaken impression that dropping all receivers closes the channel, but actually the channel closes when we drop all senders. So I will remove the receiver end. Nice! Thanks!
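For the record, a minimal sketch of those close semantics, assuming the channel behind BroadcastPair is a tokio::sync::broadcast (which channel type is actually used here is my assumption):

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u64>(16);

    // Dropping a receiver does not close the channel: the sender keeps working
    // and new receivers can still be created via subscribe().
    drop(tx.subscribe());
    tx.send(1).unwrap();
    assert_eq!(rx.recv().await.unwrap(), 1);

    // Dropping the last sender is what closes the channel: once buffered items
    // are drained, receivers observe RecvError::Closed.
    drop(tx);
    assert!(matches!(
        rx.recv().await,
        Err(broadcast::error::RecvError::Closed)
    ));
}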
pub async fn read_data(&mut self) -> Option<Result<Vec<u8>, std::io::Error>> {
    let result = self.framed.next().await;
    // .ok_or(std::io::Error::new(std::io::ErrorKind::ConnectionReset, ""))?;
    result.map(|v| v.map(|w| w.to_vec()))
Isn't this last 'to_vec' making a copy of the data?
Sadly yes, hence the TODO above:
// TODO: Eliminate vecs and use Byte, ByteBuf
I did a first pass and left a bunch of questions.
Nit: You could associate this PR with #631.
network_utils/src/network.rs
Outdated
match stream.read_data().await {
    Some(Ok(vec)) => Ok(Some(vec)),
    Some(Err(err)) => Err(err),
    None => Ok(None),
}
stream.read_data().await.transpose()
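For illustration, a self-contained sketch of the equivalence (read_data's return type is taken from the snippet earlier in this thread):

fn main() {
    // Stand-in for `stream.read_data().await`, which returns
    // Option<Result<Vec<u8>, std::io::Error>>.
    let read: Option<Result<Vec<u8>, std::io::Error>> = Some(Ok(vec![1, 2, 3]));

    // transpose() flips Option<Result<T, E>> into Result<Option<T>, E>,
    // which is what the explicit match above computes arm by arm.
    let flipped: Result<Option<Vec<u8>>, std::io::Error> = read.transpose();
    assert_eq!(flipped.unwrap(), Some(vec![1, 2, 3]));
}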
Some(Err(err)) => {
    // We expect some EOF or disconnect error at the end.
    error!("Error while reading TCP stream: {}", err);
    break;
}
It's fine to be quite a bit more conservative in tests, where the emission of an error is well-controlled, and blow up here if the error isn't precisely what we expect.
Moreover, it would be great if this tested the specific impl MessageHandler<TcpDataStream> we are likely to use everywhere, and in particular tested its unhappy path.
Here's one way to probe for log contents: https://docs.rs/tracing-test/latest/tracing_test/ (logs_contain for single-line probes, logs_assert for multiline).
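For instance, a minimal sketch of such a probe, assuming tracing-test and tokio as dev-dependencies (the handler under test is elided and the error line is a stand-in for the real code path):

use tracing::error;
use tracing_test::traced_test;

#[traced_test]
#[tokio::test]
async fn unhappy_path_is_logged() {
    // ... drive the MessageHandler<TcpDataStream> over a stream that yields
    // an unexpected io::Error; here we emit the log line directly instead.
    error!("Error while reading TCP stream: broken pipe");

    // logs_contain is injected into the test body by #[traced_test].
    assert!(logs_contain("Error while reading TCP stream"));
}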
Same as below, I just moved this code, it's not actually new:
https://github.com/MystenLabs/fastnft/blob/b1c7b8e001906d477107539fa417ee2bfa618c06/network_utils/src/transport.rs#L222-L228
Some(Err(err)) => {
    // We expect some EOF or disconnect error at the end.
    error!("Error while reading TCP stream: {}", err);
    break;
}
We should not just swallow every type of error in production: I believe the Framed might still expose the io::ErrorKind variants we expect and we should explicitly probe for the ones that are OK to swallow.
I just refactored and moved this from here:
https://github.com/MystenLabs/fastnft/blob/b1c7b8e001906d477107539fa417ee2bfa618c06/network_utils/src/transport.rs#L222-L228
writer: SinkSenderErr,
}

#[allow(clippy::type_complexity)] // appease clippy, in the tests!
There's a parameter in clippy.toml you can tune for that. I am OK with type-complexity-threshold = 999999.
Our CI flagged this for me.
sui_core/src/authority.rs
Outdated
while !dq_batches.is_empty() {
    // Get head of batches
    // NOTE: safe to unwrap because of check above.
    let current_batch = dq_batches.pop_front().unwrap();
while let Some(current_batch) = dq_batches.pop_front() {
Very good point, fixed!
sui_core/src/authority.rs
Outdated
/// Handles a request for a batch info. It returns a sequence of
/// [batches, transactions, batches, transactions] as UpdateItems, and a flag
/// that is true if the request implies subscribing to live updates.
pub async fn handle_batch_info_request(
"a flag that is true if the request implies subscribing to live updates."
As a caller, I'd like more advice on what to do if this returned boolean is true
. An Example would do wonders here.
Made comment more matter of fact, and hopefully less confusing:
/// Handles a request for a batch info. It returns a sequence of
/// [batches, transactions, batches, transactions] as UpdateItems, and a flag
/// that if true indicates the request goes beyond the last batch in the
/// database.
    request: BatchInfoRequest,
) -> Result<(VecDeque<UpdateItem>, bool), SuiError> {
    // Ensure the range contains some elements and end > start
    if request.end <= request.start {
Nit: Is the equality case really invalid?
Did not spend too much time thinking about it -- but probably it would work and simply return the batch that contains the start number? Not sure.
Ah, after having a second look at this, I think this approach is legit. However, it would be good to document here and in batches_and_transactions that the end is non-inclusive.
@@ -617,6 +615,76 @@ impl AuthorityStore {
        write_batch = write_batch.insert_batch(&self.schedule, schedule_to_write)?;
        write_batch.write().map_err(SuiError::from)
    }

    /// This is the function that retrieves batches including transactions within a range.
Suggested change:
- /// This is the function that retrieves batches including transactions within a range.
+ /// Retrieves batches including transactions within a range.
sui_core/src/authority_server.rs
Outdated
// There was an error from the subscription service.
// Note: this may be due to gaps due to the client being too
// slow to consume the items.
return Err(SuiError::SubscriptionServiceError);
- We have an error at hand here, though the use of a one-sided if let Ok(...) pattern doesn't let us see it. We could use it in the returned error!
- If this is indeed channel buffer space exhaustion, as the comment hints, I'd love to see it in a unit test.
Very good idea, now I provide granular errors to guide clients.
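A minimal sketch of what surfacing the concrete error can look like, assuming the subscription side is a tokio broadcast receiver; the error strings here are illustrative, not the exact variants added in this PR:

use tokio::sync::broadcast::error::RecvError;

fn map_subscription_error(err: RecvError) -> String {
    match err {
        // The client consumed items too slowly and the channel dropped some.
        RecvError::Lagged(skipped) => {
            format!("subscription lagged, {} items were dropped", skipped)
        }
        // The sending side went away entirely.
        RecvError::Closed => "subscription channel closed".to_string(),
    }
}

fn main() {
    assert_eq!(
        map_subscription_error(RecvError::Lagged(3)),
        "subscription lagged, 3 items were dropped"
    );
}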
pub trait RwChannel<'a> {
    type R: 'a + Stream<Item = Result<BytesMut, std::io::Error>> + Unpin + Send;
    type W: 'a + Sink<Bytes, Error = std::io::Error> + Unpin + Send;

    fn sink(&mut self) -> &mut Self::W;
    fn stream(&mut self) -> &mut Self::R;
}
I'm not sure I got this fully. Are you trying to do more or less the same as RwStreamSink? A comment on this one would help review.
I added the following comment:
/*
The RwChannel connects the low-level networking code here, that handles
TCP streams, ports, accept/connect, and sockets that provide AsyncRead /
AsyncWrite on byte streams, with the higher level logic in AuthorityServer
that handles sequences of Bytes / BytesMut, as framed messages, through
exposing a standard Stream and Sink trait.
This separation allows us to change the details of the network, transport
and framing, without changing the authority code. It also allows us to test
the authority without using a real network.
*/
Thanks for the comment, it definitely helps.
I note that you're managing with the Stream and Sink APIs explicitly. That's fine. If that was to become a bit constraining, I note there's a cottage industry of tooling that transforms from that to AsyncRead and AsyncWrite. I pointed at one, but they're all morally extensions of https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.into_async_read
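For completeness, a self-contained sketch of that adapter: TryStreamExt::into_async_read turns a Stream of io::Result<impl AsRef<[u8]>> into an AsyncRead (requires the futures crate; the input stream here is a toy stand-in for the framed TCP stream):

use futures::executor::block_on;
use futures::io::AsyncReadExt;
use futures::stream::{self, TryStreamExt};

fn main() -> std::io::Result<()> {
    block_on(async {
        // A toy stream of byte chunks, as a framed transport might produce.
        let framed = stream::iter(vec![
            Ok::<Vec<u8>, std::io::Error>(vec![1, 2, 3]),
            Ok(vec![4, 5]),
        ]);

        // Adapt the Stream into an AsyncRead and consume it as a byte reader.
        let mut reader = framed.into_async_read();
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await?;
        assert_eq!(buf, vec![1, 2, 3, 4, 5]);
        Ok(())
    })
}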
Many thanks for the review @huitseeker -- I addressed quite a few of the comments relating to errors and docs. Some of the comments relate to code I inherited and moved around (such as error handling around client errors), and are very valid, so we should keep them as separate issues to fix. I also note the call for more tests of the TCP transport; I added more tests here, and we should probably open an issue to write more about unhappy-case handling.
Force-pushed from 2ae5858 to 4034673.
let mut last_batch_next_seq = 0;

// Send full historical data as [Batch - Transactions - Batch - Transactions - Batch].
while let Some(current_batch) = dq_batches.pop_front() {
Might be more efficient to loop through two into_iter()s.
Let's refactor this later, my iter-foo is not strong enough for this :)
let batches: Vec<SignedBatch> = self
    .batches
    .iter()
    .skip_prior_to(&start)?
Having trouble wrapping my head around this one, so maybe adding a doc comment on the definition of batches explaining how it's indexed would help. And some comment on what batches_and_transactions is expected to return would help too.
My understanding is that batches is indexed by the "next sequence number after the last transaction in that batch". skip_prior_to with start would get us to the last batch whose end is the same as (or even before) start.
This means the result would always include a batch that ends before start. Is that intended?
I added the following doc, which hopefully also answers some of the questions above:
/// Retrieves batches including transactions within a range.
///
/// This function returns all signed batches that enclose the requested transactions,
/// including the batch preceding the first requested transaction, the batch including
/// the last requested transaction (if there is one) and all batches in between.
///
/// Transactions returned include all transactions within the batch that includes the
/// first requested transaction, all the way to at least all the transactions that are
/// included in the last batch returned. If the last requested transaction is outside a
/// batch (one has not yet been generated) the function returns all transactions at the
/// end of the sequence that are in TxSequenceOrder (and ignores any that are out of
/// order.)
Thanks! What is the reason to include the batch preceding the first requested transaction?
// Get transactions in the retrieved batches. The first batch is included
// without transactions, so get transactions of all subsequent batches, or
// until the end of the sequence if the last batch does not contain the
// requested end sequence number.
I don't really understand this comment
I added an example in case it helps:
Get transactions in the retrieved batches. The first batch is included
without transactions, so get transactions of all subsequent batches, or
until the end of the sequence if the last batch does not contain the
requested end sequence number.
So for example if we got a request for start: 3 end: 9 and we have:
B0 T0 T1 B2 T2 T3 B3 T3 T4 T5 B6 T6 T8 T9
The code below will return T2 .. T6
Note: T8 is out of order so the sequence returned ends at T6.
// Before the end of the last batch we want everything.
if *seq < *in_sequence_ptr {
    return true;
};

// After the end of the last batch we only take items in sequence.
if *seq < last_seq && *seq == *in_sequence_ptr {
    *in_sequence_ptr += 1;
    return true;
}

// If too large or out of sequence after the last batch
// we stop taking items.
false
I don't understand this loop.
I guess it goes back to that it's unclear what batches_and_transactions is expected to return. I would have expected it to:
- Return all batches that contain transactions with sequence number falling in the range [start, end).
- Return all transactions with sequence number falling in the range [start, end).
But I guess it's not?
It is 99% what you think, with a small additional twist: after the transactions included in the last batch returned (i.e. transactions at the tail of the sequence that may fall outside a batch), it only returns the transactions that are in order.
This is to avoid sending back to the client some transactions with gaps, only to then get the transactions in the gap and send them out of order, which would confuse clients.
impl MessageHandler for AuthorityServer {
fn handle_message<'a>(
async fn handle_batch_streaming<'a, 'b, A>(
When would a client/user want to make a handle_batch_streaming request that actually requires live subscribing?
I imagine that the user can already set up a broadcast channel with the BatchManager and has started receiving updates since a certain sequence number. To backfill, they just need to request the range before what they just heard. Having to subscribe feels a bit strange to me.
The client can record what the last fully received batch was, and send a request starting at the next_sequence_number of that batch up to a higher sequence number (up to the limit we set). This will result in batches - transactions - batches - transactions being sent back to the client.
The operation to "backfill" and listen for new batches is unified here: the call will both read from the DB (see code in handle_batch_info_request) and then, if the last requested transaction falls beyond what is in the database, will automatically register to send back live updates from the batch_manager in handle_batch_streaming.
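A sketch of that client-side flow, with illustrative stand-in types (the real BatchInfoRequest only shares the start/end fields used in handle_batch_info_request above; MAX_ITEMS is hypothetical):

// Illustrative stand-ins, not the real Sui types.
struct BatchInfoRequest {
    start: u64, // first transaction sequence number requested
    end: u64,   // non-inclusive; may point past the last batch in the DB
}

struct ReceivedBatch {
    next_sequence_number: u64,
}

const MAX_ITEMS: u64 = 1000; // hypothetical per-request limit

fn follow_up_request(last_batch: &ReceivedBatch) -> BatchInfoRequest {
    // Resume from the sequence number right after the last fully received batch.
    // If `end` reaches beyond what is already in the database, the server first
    // backfills from storage and then streams live updates from the batch manager.
    BatchInfoRequest {
        start: last_batch.next_sequence_number,
        end: last_batch.next_sequence_number + MAX_ITEMS,
    }
}

fn main() {
    let last_batch = ReceivedBatch { next_sequence_number: 42 };
    let req = follow_up_request(&last_batch);
    assert_eq!((req.start, req.end), (42, 42 + MAX_ITEMS));
}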
Thanks!
Force-pushed from 4034673 to c160f8a.
This PR should complete the Authority side of the follower (Priority A in #194). Specifically in this PR:
Next PRs: