feat: refine scan metrics logging (#4296)
* fix: collect scan cost in row group reader

* feat: remove log after scan

* feat: collect prepare scan cost before fetching readers

* print first poll elapsed

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: print more first poll

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
evenyag and waynexia authored Jul 5, 2024
1 parent 4f0984c commit b1219fa
Showing 4 changed files with 21 additions and 19 deletions.
6 changes: 1 addition & 5 deletions src/mito2/src/read/scan_region.rs
@@ -16,7 +16,7 @@
 
 use std::fmt;
 use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Instant;
 
 use common_error::ext::BoxedError;
 use common_recordbatch::SendableRecordBatchStream;
@@ -782,21 +782,17 @@ pub(crate) struct StreamContext {
     // Metrics:
     /// The start time of the query.
     pub(crate) query_start: Instant,
-    /// Time elapsed before creating the scanner.
-    pub(crate) prepare_scan_cost: Duration,
 }
 
 impl StreamContext {
     /// Creates a new [StreamContext].
     pub(crate) fn new(input: ScanInput) -> Self {
         let query_start = input.query_start.unwrap_or_else(Instant::now);
-        let prepare_scan_cost = query_start.elapsed();
 
         Self {
             input,
             parts: Mutex::new(ScanPartList::default()),
             query_start,
-            prepare_scan_cost,
         }
     }
 
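To make the change above concrete, here is a minimal, self-contained sketch of the same idea, with the ScanInput handling reduced to an Option<Instant> and only the relevant field kept: the context stores just the query start, and the prepare-scan cost is derived on demand instead of being cached at construction time. This is an illustration, not the crate's actual StreamContext.

use std::time::{Duration, Instant};

/// Illustrative stand-in for the stream context: only the query start is kept.
struct StreamContext {
    query_start: Instant,
}

impl StreamContext {
    /// `query_start` may be carried over from the planner; fall back to now.
    fn new(query_start: Option<Instant>) -> Self {
        Self {
            query_start: query_start.unwrap_or_else(Instant::now),
        }
    }

    /// The prepare cost is measured when the scanner actually starts,
    /// not when the context is constructed.
    fn prepare_scan_cost(&self) -> Duration {
        self.query_start.elapsed()
    }
}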
17 changes: 11 additions & 6 deletions src/mito2/src/read/seq_scan.rs
@@ -90,7 +90,7 @@ impl SeqScan {
     /// Builds a [BoxedBatchReader] from sequential scan for compaction.
     pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
         let mut metrics = ScannerMetrics {
-            prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
+            prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
             ..Default::default()
         };
         let maybe_reader =
@@ -247,13 +247,15 @@ impl SeqScan {
         }
 
         let mut metrics = ScannerMetrics {
-            prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
+            prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
             ..Default::default()
         };
         let stream_ctx = self.stream_ctx.clone();
         let semaphore = self.semaphore.clone();
         let partition_ranges = self.properties.partitions[partition].clone();
         let stream = try_stream! {
+            let first_poll = stream_ctx.query_start.elapsed();
+
             for partition_range in partition_ranges {
                 let maybe_reader =
                     Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics)
@@ -287,10 +289,11 @@ impl SeqScan {
                 metrics.observe_metrics_on_finish();
 
                 debug!(
-                    "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}",
+                    "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}",
                     stream_ctx.input.mapper.metadata().region_id,
                     partition,
                     metrics,
+                    first_poll,
                 );
             }
         };
@@ -321,14 +324,16 @@ impl SeqScan {
             ));
         }
         let mut metrics = ScannerMetrics {
-            prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
+            prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
             ..Default::default()
         };
         let stream_ctx = self.stream_ctx.clone();
         let semaphore = self.semaphore.clone();
 
         // build stream
         let stream = try_stream! {
+            let first_poll = stream_ctx.query_start.elapsed();
+
             // init parts
             let parts_len = {
                 let mut parts = stream_ctx.parts.lock().await;
@@ -375,10 +380,11 @@ impl SeqScan {
                 metrics.observe_metrics_on_finish();
 
                 debug!(
-                    "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}",
+                    "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}",
                     stream_ctx.input.mapper.metadata().region_id,
                     partition,
                     metrics,
+                    first_poll
                 );
             }
         };
@@ -444,7 +450,6 @@ impl fmt::Debug for SeqScan {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("SeqScan")
             .field("parts", &self.stream_ctx.parts)
-            .field("prepare_scan_cost", &self.stream_ctx.prepare_scan_cost)
             .finish()
     }
 }
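The first_poll value works because the body of an async-stream generator does not run until the stream is first polled. Below is a minimal sketch of that pattern, assuming tokio, futures, and async-stream as dependencies; it uses the plain stream! macro and dummy u64 batches instead of the crate's try_stream! and record batches.

use std::time::Instant;

use async_stream::stream;
use futures::{pin_mut, StreamExt};

#[tokio::main]
async fn main() {
    let query_start = Instant::now();

    // ... planning and scanner construction would happen here ...

    let scan = stream! {
        // Runs on the first poll of the stream, not when the stream is
        // built, so it captures the planning-to-first-poll gap.
        let first_poll = query_start.elapsed();

        for batch in 0..3u64 {
            yield batch;
        }

        println!("scan finished, first_poll: {:?}", first_poll);
    };
    pin_mut!(scan);

    while let Some(batch) = scan.next().await {
        println!("got batch {batch}");
    }
}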
9 changes: 5 additions & 4 deletions src/mito2/src/read/unordered_scan.rs
@@ -141,11 +141,13 @@ impl RegionScanner for UnorderedScan {
 
     fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
         let mut metrics = ScannerMetrics {
-            prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
+            prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
             ..Default::default()
         };
         let stream_ctx = self.stream_ctx.clone();
         let stream = try_stream! {
+            let first_poll = stream_ctx.query_start.elapsed();
+
             let mut parts = stream_ctx.parts.lock().await;
             maybe_init_parts(&mut parts, &stream_ctx.input, &mut metrics)
                 .await
@@ -196,8 +198,8 @@ impl RegionScanner for UnorderedScan {
             metrics.total_cost = query_start.elapsed();
             metrics.observe_metrics_on_finish();
             debug!(
-                "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}",
-                partition, mapper.metadata().region_id, metrics, reader_metrics
+                "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}",
+                partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll,
             );
         };
         let stream = Box::pin(RecordBatchStreamWrapper::new(
@@ -220,7 +222,6 @@ impl fmt::Debug for UnorderedScan {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("UnorderedScan")
             .field("parts", &self.stream_ctx.parts)
-            .field("prepare_scan_cost", &self.stream_ctx.prepare_scan_cost)
             .finish()
     }
 }
8 changes: 4 additions & 4 deletions src/mito2/src/sst/parquet/reader.rs
@@ -729,10 +729,8 @@ impl BatchReader for ParquetReader {
             return Ok(None);
         };
 
-        let start = Instant::now();
         // We don't collect the elapsed time if the reader returns an error.
         if let Some(batch) = reader.next_batch().await? {
-            reader.metrics.scan_cost += start.elapsed();
             return Ok(Some(batch));
         }
 
@@ -746,13 +744,11 @@
             // Resets the parquet reader.
             reader.reset_reader(parquet_reader);
             if let Some(batch) = reader.next_batch().await? {
-                reader.metrics.scan_cost += start.elapsed();
                 return Ok(Some(batch));
             }
         }
 
         // The reader is exhausted.
-        reader.metrics.scan_cost += start.elapsed();
         self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics));
         Ok(None)
     }
@@ -874,14 +870,17 @@ impl RowGroupReader {
 
     /// Tries to fetch next [Batch] from the reader.
    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        let scan_start = Instant::now();
         if let Some(batch) = self.batches.pop_front() {
             self.metrics.num_rows += batch.num_rows();
+            self.metrics.scan_cost += scan_start.elapsed();
             return Ok(Some(batch));
         }
 
         // We need to fetch next record batch and convert it to batches.
         while self.batches.is_empty() {
             let Some(record_batch) = self.fetch_next_record_batch()? else {
+                self.metrics.scan_cost += scan_start.elapsed();
                 return Ok(None);
             };
             self.metrics.num_record_batches += 1;
@@ -894,6 +893,7 @@
         }
         let batch = self.batches.pop_front();
         self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
+        self.metrics.scan_cost += scan_start.elapsed();
         Ok(batch)
     }
 
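The row-group reader now charges the time spent in next_batch to scan_cost itself, on every return path. A simplified, self-contained stand-in for that accumulation pattern is sketched below; the real reader decodes parquet record batches, while here a batch is just a row count.

use std::collections::VecDeque;
use std::time::{Duration, Instant};

#[derive(Default, Debug)]
struct Metrics {
    num_rows: usize,
    scan_cost: Duration,
}

struct RowGroupReader {
    /// Decoded batches, represented only by their row counts in this sketch.
    batches: VecDeque<usize>,
    metrics: Metrics,
}

impl RowGroupReader {
    /// Pops the next batch and adds the time spent in this call to
    /// `scan_cost` before every return, so the cost is collected inside
    /// the reader rather than by its caller.
    fn next_batch(&mut self) -> Option<usize> {
        let scan_start = Instant::now();
        let batch = self.batches.pop_front();
        if let Some(rows) = batch {
            self.metrics.num_rows += rows;
        }
        self.metrics.scan_cost += scan_start.elapsed();
        batch
    }
}

fn main() {
    let mut reader = RowGroupReader {
        batches: VecDeque::from([100, 200]),
        metrics: Metrics::default(),
    };
    while reader.next_batch().is_some() {}
    println!("{:?}", reader.metrics);
}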
