Skip to content

Commit

Permalink
add no_upload flag
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jun 7, 2023
1 parent e9d8fcb commit 21466a0
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 8 deletions.
11 changes: 8 additions & 3 deletions src/storage/src/hummock/store/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub struct LocalHummockStorage {
/// Read handle.
read_version: Arc<RwLock<HummockReadVersion>>,

no_upload: bool,

/// Event sender.
event_sender: mpsc::UnboundedSender<HummockEvent>,

Expand Down Expand Up @@ -404,9 +406,11 @@ impl LocalHummockStorage {
self.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));

// insert imm to uploader
self.event_sender
.send(HummockEvent::ImmToUploader(imm))
.unwrap();
if self.no_upload {
self.event_sender
.send(HummockEvent::ImmToUploader(imm))
.unwrap();
}

timer.observe_duration();

Expand Down Expand Up @@ -437,6 +441,7 @@ impl LocalHummockStorage {
table_id: option.table_id,
is_consistent_op: option.is_consistent_op,
table_option: option.table_option,
no_upload: option.no_upload,
instance_guard,
read_version,
event_sender,
Expand Down
26 changes: 26 additions & 0 deletions src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,16 +375,42 @@ pub struct NewLocalOptions {
/// `update` and `delete` should match the original stored value.
pub is_consistent_op: bool,
pub table_option: TableOption,

/// Used to indicate that we should not upload the data.
pub no_upload: bool,
}

impl NewLocalOptions {
pub fn new(table_id: TableId, is_consistent_op: bool, table_option: TableOption) -> Self {
NewLocalOptions {
table_id,
is_consistent_op,
table_option,
no_upload: true,
}
}

pub fn new_temporary(
table_id: TableId,
is_consistent_op: bool,
table_option: TableOption,
) -> Self {
NewLocalOptions {
table_id,
is_consistent_op,
table_option,
no_upload: false,
}
}

pub fn for_test(table_id: TableId) -> Self {
Self {
table_id,
is_consistent_op: false,
table_option: TableOption {
retention_seconds: None,
},
no_upload: false,
}
}
}
10 changes: 5 additions & 5 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@ where

let table_option = TableOption::build_table_option(table_catalog.get_properties());
let local_state_store = store
.new_local(NewLocalOptions {
.new_local(NewLocalOptions::new(
table_id,
is_consistent_op,
table_option,
})
))
.await;

let pk_data_types = pk_indices
Expand Down Expand Up @@ -384,11 +384,11 @@ where
is_consistent_op: bool,
) -> Self {
let local_state_store = store
.new_local(NewLocalOptions {
.new_local(NewLocalOptions::new(
table_id,
is_consistent_op,
table_option: TableOption::default(),
})
TableOption::default(),
))
.await;

let pk_data_types = pk_indices
Expand Down

0 comments on commit 21466a0

Please sign in to comment.