Skip to content

Commit

Permalink
fix: don't silently drop errors encountered in table scan file planni…
Browse files Browse the repository at this point in the history
…ng (apache#535)
  • Loading branch information
sdd authored and shaeqahmed committed Dec 9, 2024
1 parent d092304 commit b74fec5
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,18 +308,26 @@ impl TableScan {
.plan_context
.build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?;

let mut channel_for_manifest_error = file_scan_task_tx.clone();

// Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
spawn(async move {
futures::stream::iter(manifest_file_contexts)
let result = futures::stream::iter(manifest_file_contexts)
.try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
ctx.fetch_manifest_and_stream_manifest_entries().await
})
.await
.await;

if let Err(error) = result {
let _ = channel_for_manifest_error.send(Err(error)).await;
}
});

let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();

// Process the [`ManifestEntry`] stream in parallel
spawn(async move {
manifest_entry_ctx_rx
let result = manifest_entry_ctx_rx
.map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
.try_for_each_concurrent(
concurrency_limit_manifest_entries,
Expand All @@ -330,7 +338,11 @@ impl TableScan {
.await
},
)
.await
.await;

if let Err(error) = result {
let _ = channel_for_manifest_entry_error.send(Err(error)).await;
}
});

return Ok(file_scan_task_rx.boxed());
Expand Down

0 comments on commit b74fec5

Please sign in to comment.