Skip to content

Commit

Permalink
[indexer alt] rename a few structs to make their units more clear (#2…
Browse files Browse the repository at this point in the history
…0419)

## Description 

While reading/testing this code I found that it can be tough to reason
about limits of channels because each channel may have different units:
checkpoints, batches, rows. So this PR made some struct names more
explicit according to my understanding and added some comments to make
channel limits more clear.

## Test plan 

Existing tests. 
---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
emmazzz authored Dec 11, 2024
1 parent 481390c commit a6a8084
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ use tracing::{debug, info};

use crate::{
metrics::IndexerMetrics,
pipeline::{CommitterConfig, Indexed, WatermarkPart},
pipeline::{CommitterConfig, IndexedCheckpoint, WatermarkPart},
};

use super::{Batched, Handler};
use super::{BatchedRows, Handler};

/// Processed values that are waiting to be written to the database. This is an internal type used
/// by the concurrent collector to hold data it is waiting to send to the committer.
struct Pending<H: Handler> {
struct PendingCheckpoint<H: Handler> {
/// Values to be inserted into the database from this checkpoint
values: Vec<H::Value>,
/// The watermark associated with this checkpoint and the part of it that is left to commit
watermark: WatermarkPart,
}

impl<H: Handler> Pending<H> {
impl<H: Handler> PendingCheckpoint<H> {
/// Whether there are values left to commit from this indexed checkpoint.
fn is_empty(&self) -> bool {
let empty = self.values.is_empty();
Expand All @@ -39,7 +39,7 @@ impl<H: Handler> Pending<H> {

/// Adds data from this indexed checkpoint to the `batch`, honoring the handler's bounds on
/// chunk size.
fn batch_into(&mut self, batch: &mut Batched<H>) {
fn batch_into(&mut self, batch: &mut BatchedRows<H>) {
let max_chunk_rows = super::max_chunk_rows::<H>();
if batch.values.len() + self.values.len() > max_chunk_rows {
let mut for_batch = self.values.split_off(max_chunk_rows - batch.values.len());
Expand All @@ -54,8 +54,8 @@ impl<H: Handler> Pending<H> {
}
}

impl<H: Handler> From<Indexed<H>> for Pending<H> {
fn from(indexed: Indexed<H>) -> Self {
impl<H: Handler> From<IndexedCheckpoint<H>> for PendingCheckpoint<H> {
fn from(indexed: IndexedCheckpoint<H>) -> Self {
Self {
watermark: WatermarkPart {
watermark: indexed.watermark,
Expand Down Expand Up @@ -84,8 +84,8 @@ impl<H: Handler> From<Indexed<H>> for Pending<H> {
pub(super) fn collector<H: Handler + 'static>(
config: CommitterConfig,
checkpoint_lag: Option<u64>,
mut rx: mpsc::Receiver<Indexed<H>>,
tx: mpsc::Sender<Batched<H>>,
mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
tx: mpsc::Sender<BatchedRows<H>>,
metrics: Arc<IndexerMetrics>,
cancel: CancellationToken,
) -> JoinHandle<()> {
Expand All @@ -96,11 +96,11 @@ pub(super) fn collector<H: Handler + 'static>(
poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

// Data for checkpoints that have been received but not yet ready to be sent to committer due to lag constraint.
let mut received: BTreeMap<u64, Indexed<H>> = BTreeMap::new();
let mut received: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
let checkpoint_lag = checkpoint_lag.unwrap_or_default();

// Data for checkpoints that are ready to be sent but haven't been written yet.
let mut pending: BTreeMap<u64, Pending<H>> = BTreeMap::new();
let mut pending: BTreeMap<u64, PendingCheckpoint<H>> = BTreeMap::new();
let mut pending_rows = 0;

info!(pipeline = H::NAME, "Starting collector");
Expand All @@ -119,7 +119,7 @@ pub(super) fn collector<H: Handler + 'static>(
.with_label_values(&[H::NAME])
.start_timer();

let mut batch = Batched::new();
let mut batch = BatchedRows::new();
while !batch.is_full() {
let Some(mut entry) = pending.first_entry() else {
break;
Expand Down Expand Up @@ -193,8 +193,8 @@ pub(super) fn collector<H: Handler + 'static>(
/// Move all checkpoints from `received` that are within the lag range into `pending`.
/// Returns the number of rows moved.
fn move_ready_checkpoints<H: Handler>(
received: &mut BTreeMap<u64, Indexed<H>>,
pending: &mut BTreeMap<u64, Pending<H>>,
received: &mut BTreeMap<u64, IndexedCheckpoint<H>>,
pending: &mut BTreeMap<u64, PendingCheckpoint<H>>,
checkpoint_lag: u64,
) -> usize {
let tip = match (received.last_key_value(), pending.last_key_value()) {
Expand Down Expand Up @@ -280,7 +280,10 @@ mod tests {

// Add checkpoints 1-5 to received
for i in 1..=5 {
received.insert(i, Indexed::new(0, i, 0, 0, vec![Entry, Entry, Entry]));
received.insert(
i,
IndexedCheckpoint::new(0, i, 0, 0, vec![Entry, Entry, Entry]),
);
}

// With lag of 2 and tip at 5, only checkpoints 1-3 should move
Expand All @@ -300,11 +303,14 @@ mod tests {
let mut pending = BTreeMap::new();

// Add checkpoint 10 to pending to establish tip
pending.insert(10, Pending::from(Indexed::new(0, 10, 0, 0, vec![Entry])));
pending.insert(
10,
PendingCheckpoint::from(IndexedCheckpoint::new(0, 10, 0, 0, vec![Entry])),
);

// Add checkpoints 1-5 to received
for i in 1..=5 {
received.insert(i, Indexed::new(0, i, 0, 0, vec![Entry]));
received.insert(i, IndexedCheckpoint::new(0, i, 0, 0, vec![Entry]));
}

// With lag of 3 and tip at 10, checkpoints 1-7 can move
Expand All @@ -322,7 +328,7 @@ mod tests {

// Add checkpoints 8-10 to received
for i in 8..=10 {
received.insert(i, Indexed::new(0, i, 0, 0, vec![Entry]));
received.insert(i, IndexedCheckpoint::new(0, i, 0, 0, vec![Entry]));
}

// With lag of 5 and tip at 10, no checkpoints can move
Expand Down Expand Up @@ -355,9 +361,9 @@ mod tests {

// Send test data
let test_data = vec![
Indexed::new(0, 1, 10, 1000, vec![Entry; part1_length]),
Indexed::new(0, 2, 20, 2000, vec![Entry; part2_length]),
Indexed::new(0, 3, 30, 3000, vec![Entry, Entry]),
IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; part1_length]),
IndexedCheckpoint::new(0, 2, 20, 2000, vec![Entry; part2_length]),
IndexedCheckpoint::new(0, 3, 30, 3000, vec![Entry, Entry]),
];

for data in test_data {
Expand Down Expand Up @@ -393,7 +399,7 @@ mod tests {
);

processor_tx
.send(Indexed::new(0, 1, 10, 1000, vec![Entry, Entry]))
.send(IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry, Entry]))
.await
.unwrap();

Expand Down Expand Up @@ -434,7 +440,7 @@ mod tests {
);

// Send more data than MAX_PENDING_ROWS plus collector channel buffer
let data = Indexed::new(
let data = IndexedCheckpoint::new(
0,
1,
10,
Expand All @@ -452,12 +458,12 @@ mod tests {

// Now fill up the processor channel with minimum data to trigger send blocking
for _ in 0..processor_channel_size {
let more_data = Indexed::new(0, 2, 11, 1000, vec![Entry]);
let more_data = IndexedCheckpoint::new(0, 2, 11, 1000, vec![Entry]);
processor_tx.send(more_data).await.unwrap();
}

// Now sending even more data should block because of MAX_PENDING_ROWS limit.
let even_more_data = Indexed::new(0, 3, 12, 1000, vec![Entry]);
let even_more_data = IndexedCheckpoint::new(0, 3, 12, 1000, vec![Entry]);

let send_result = processor_tx.try_send(even_more_data);
assert!(matches!(
Expand Down
Loading

0 comments on commit a6a8084

Please sign in to comment.