Skip to content

Commit

Permalink
add more logs to Manifest.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Jun 20, 2023
1 parent 55b0f26 commit d5a0a53
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ pub(crate) trait TableMetaSet: fmt::Debug + Send + Sync {
// `SnapshotReoverer`.
#[derive(Debug, Clone)]
struct SnapshotRecoverer<LogStore, SnapshotStore> {
table_id: TableId,
space_id: SpaceId,
log_store: LogStore,
snapshot_store: SnapshotStore,
}
Expand All @@ -217,6 +219,11 @@ where
}

async fn create_latest_snapshot_with_prev(&self, prev_snapshot: Snapshot) -> Result<Snapshot> {
debug!(
"Manifest recover with prev snapshot, snapshot:{:?}, table_id:{}, space_id:{}",
prev_snapshot, self.table_id, self.space_id
);

let log_start_boundary = ReadBoundary::Excluded(prev_snapshot.end_seq);
let mut reader = self.log_store.scan(log_start_boundary).await?;

Expand All @@ -239,6 +246,11 @@ where
}

async fn create_latest_snapshot_without_prev(&self) -> Result<Option<Snapshot>> {
debug!(
"Manifest recover without prev snapshot, table_id:{}, space_id:{}",
self.table_id, self.space_id
);

let mut reader = self.log_store.scan(ReadBoundary::Min).await?;

let mut latest_seq = SequenceNumber::MIN;
Expand All @@ -253,11 +265,19 @@ where
}

if has_logs {
debug!(
"Manifest recover with only logs, table_id:{}, space_id:{}",
self.table_id, self.space_id
);
Ok(Some(Snapshot {
end_seq: latest_seq,
data: manifest_data_builder.build(),
}))
} else {
debug!(
"Manifest recover nothing, table_id:{}, space_id:{}",
self.table_id, self.space_id
);
Ok(None)
}
}
Expand Down Expand Up @@ -474,7 +494,7 @@ impl Manifest for ManifestImpl {
}

async fn recover(&self, load_req: &LoadRequest) -> GenericResult<()> {
info!("Manifest recover, request:{:?}", load_req);
info!("Manifest recover begin, request:{:?}", load_req);

// Load table meta snapshot from storage.
let location = WalLocation::new(load_req.shard_id as u64, load_req.table_id.as_u64());
Expand All @@ -490,6 +510,8 @@ impl Manifest for ManifestImpl {
self.store.clone(),
);
let reoverer = SnapshotRecoverer {
table_id: load_req.table_id,
space_id: load_req.space_id,
log_store,
snapshot_store,
};
Expand All @@ -505,6 +527,8 @@ impl Manifest for ManifestImpl {
self.table_meta_set.apply_edit_to_table(request)?;
}

info!("Manifest recover finish, request:{:?}", load_req);

Ok(())
}

Expand Down Expand Up @@ -1386,7 +1410,8 @@ mod tests {
assert_eq!(snapshot.data, expect_table_manifest_data);
assert_eq!(snapshot.end_seq, log_store.next_seq() - 1);

let recovered_snapshot = recover_snapshot(&log_store, &snapshot_store).await;
let recovered_snapshot =
recover_snapshot(table_id, 0, &log_store, &snapshot_store).await;
assert_eq!(snapshot, recovered_snapshot.unwrap());
}
// The logs in the log store should be cleared after snapshot.
Expand Down Expand Up @@ -1418,7 +1443,8 @@ mod tests {
assert_eq!(snapshot.data, expect_table_manifest_data);
assert_eq!(snapshot.end_seq, log_store.next_seq() - 1);

let recovered_snapshot = recover_snapshot(&log_store, &snapshot_store).await;
let recovered_snapshot =
recover_snapshot(table_id, 0, &log_store, &snapshot_store).await;
assert_eq!(snapshot, recovered_snapshot.unwrap());
}
// The logs in the log store should be cleared after snapshot.
Expand Down Expand Up @@ -1446,10 +1472,14 @@ mod tests {
}

async fn recover_snapshot(
table_id: TableId,
space_id: SpaceId,
log_store: &MemLogStore,
snapshot_store: &MemSnapshotStore,
) -> Option<Snapshot> {
let recoverer = SnapshotRecoverer {
table_id,
space_id,
log_store: log_store.clone(),
snapshot_store: snapshot_store.clone(),
};
Expand Down

0 comments on commit d5a0a53

Please sign in to comment.