diff --git a/Cargo.lock b/Cargo.lock
index 24230bde83..c8e331b0f7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -939,6 +939,7 @@ dependencies = [
  "snafu 0.6.10",
  "table_engine",
  "table_kv",
+ "tempfile",
  "time_ext",
  "tokio",
  "toml_ext",
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index 843e2fc737..031d072396 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -54,6 +54,7 @@ size_ext = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
 table_kv = { workspace = true }
+tempfile = { workspace = true }
 time_ext = { workspace = true }
 tokio = { workspace = true }
 toml_ext = { workspace = true }
@@ -63,6 +64,7 @@ zstd = { workspace = true }
 
 [dev-dependencies]
 criterion = { workspace = true }
+tempfile = { workspace = true }
 
 [[bench]]
 name = "bench"
diff --git a/src/benchmarks/bench.toml b/src/benchmarks/bench.toml
index 9e9e033115..b76f779f9b 100644
--- a/src/benchmarks/bench.toml
+++ b/src/benchmarks/bench.toml
@@ -64,3 +64,8 @@ bench_measurement_time = "60s"
 bench_sample_size = 60
 batch_size = 512
 value_size = 1024
+
+[replay_bench]
+bench_measurement_time = "3s"
+bench_sample_size = 10
+batch_size = 10000
\ No newline at end of file
diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs
index 7f5f534758..cb5d76ed99 100644
--- a/src/benchmarks/benches/bench.rs
+++ b/src/benchmarks/benches/bench.rs
@@ -17,13 +17,14 @@
 
 //! Benchmarks
 
-use std::sync::Once;
+use std::{cell::RefCell, sync::Once};
 
 use benchmarks::{
     config::{self, BenchConfig},
     merge_memtable_bench::MergeMemTableBench,
     merge_sst_bench::MergeSstBench,
     parquet_bench::ParquetBench,
+    replay_bench::ReplayBench,
     scan_memtable_bench::ScanMemTableBench,
     sst_bench::SstBench,
     wal_write_bench::WalWriteBench,
@@ -208,6 +209,24 @@ fn bench_wal_write(c: &mut Criterion) {
     group.finish();
 }
 
+fn bench_replay_iter(b: &mut Bencher<'_>, bench: &RefCell<ReplayBench>) {
+    let mut bench = bench.borrow_mut();
+    b.iter(|| bench.run_bench())
+}
+
+fn bench_replay(c: &mut Criterion) {
+    let config = init_bench();
+
+    let mut group = c.benchmark_group("replay");
+
+    group.measurement_time(config.replay_bench.bench_measurement_time.0);
+    group.sample_size(config.replay_bench.bench_sample_size);
+
+    let bench = RefCell::new(ReplayBench::new(config.replay_bench));
+    group.bench_with_input(BenchmarkId::new("replay", 0), &bench, bench_replay_iter);
+    group.finish();
+}
+
 criterion_group!(
     name = benches;
     config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
@@ -217,6 +236,7 @@
     bench_scan_memtable,
     bench_merge_memtable,
     bench_wal_write,
+    bench_replay,
 );
 
 criterion_main!(benches);
diff --git a/src/benchmarks/config/bench.toml b/src/benchmarks/config/bench.toml
index 41a08af770..ad66fdeaa1 100644
--- a/src/benchmarks/config/bench.toml
+++ b/src/benchmarks/config/bench.toml
@@ -71,3 +71,8 @@ bench_measurement_time = "60s"
 bench_sample_size = 60
 batch_size = 512
 value_size = 1024
+
+[replay_bench]
+bench_measurement_time = "3s"
+bench_sample_size = 10
+batch_size = 10000
diff --git a/src/benchmarks/src/config.rs b/src/benchmarks/src/config.rs
index b90a7cb57f..493eb7c644 100644
--- a/src/benchmarks/src/config.rs
+++ b/src/benchmarks/src/config.rs
@@ -38,6 +38,7 @@ pub struct BenchConfig {
     pub scan_memtable_bench: ScanMemTableBenchConfig,
     pub merge_memtable_bench: MergeMemTableBenchConfig,
     pub wal_write_bench: WalWriteBenchConfig,
+    pub replay_bench: ReplayConfig,
 }
 
 // TODO(yingwen): Maybe we can use layze static to load config first.
@@ -147,3 +148,10 @@ pub struct WalWriteBenchConfig {
     pub batch_size: usize,
     pub value_size: usize,
 }
+
+#[derive(Deserialize)]
+pub struct ReplayConfig {
+    pub bench_measurement_time: ReadableDuration,
+    pub bench_sample_size: usize,
+    pub batch_size: usize,
+}
diff --git a/src/benchmarks/src/lib.rs b/src/benchmarks/src/lib.rs
index 21142dd25f..ffc098c549 100644
--- a/src/benchmarks/src/lib.rs
+++ b/src/benchmarks/src/lib.rs
@@ -23,9 +23,11 @@
 pub mod config;
 pub mod merge_memtable_bench;
 pub mod merge_sst_bench;
 pub mod parquet_bench;
+pub mod replay_bench;
 pub mod scan_memtable_bench;
 pub mod sst_bench;
 pub mod sst_tools;
+pub mod table;
 pub mod util;
 pub mod wal_write_bench;
diff --git a/src/benchmarks/src/replay_bench.rs b/src/benchmarks/src/replay_bench.rs
new file mode 100644
index 0000000000..bf2c9a8810
--- /dev/null
+++ b/src/benchmarks/src/replay_bench.rs
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Replay bench.
+
+use std::sync::Arc;
+
+use analytic_engine::RecoverMode;
+use runtime::Runtime;
+use util::{OpenTablesMethod, RocksDBEngineBuildContext, TestContext, TestEnv};
+use wal::rocksdb_impl::manager::RocksDBWalsOpener;
+
+use crate::{config::ReplayConfig, table::FixedSchemaTable, util};
+
+pub struct ReplayBench {
+    runtime: Arc<Runtime>,
+    test_ctx: TestContext<RocksDBWalsOpener>,
+    table: FixedSchemaTable,
+    batch_size: usize,
+}
+
+impl ReplayBench {
+    pub fn new(config: ReplayConfig) -> Self {
+        let runtime = util::new_runtime(1);
+        let engine_context = RocksDBEngineBuildContext::new(
+            RecoverMode::TableBased,
+            OpenTablesMethod::WithOpenShard,
+        );
+        let env: TestEnv = TestEnv::builder().build();
+
+        let (test_ctx, fixed_schema_table) = env.block_on(async {
+            let mut test_ctx = env.new_context(&engine_context);
+            test_ctx.open().await;
+
+            let fixed_schema_table = test_ctx
+                .create_fixed_schema_table("test_replay_table1")
+                .await;
+            let _ = test_ctx
+                .create_fixed_schema_table("test_replay_table2")
+                .await;
+            let _ = test_ctx
+                .create_fixed_schema_table("test_replay_table3")
+                .await;
+
+            (test_ctx, fixed_schema_table)
+        });
+
+        ReplayBench {
+            runtime: Arc::new(runtime),
+            test_ctx,
+            table: fixed_schema_table,
+            batch_size: config.batch_size,
+        }
+    }
+
+    pub fn run_bench(&mut self) {
+        self.runtime.block_on(async {
+            self.table.prepare_write_requests(self.batch_size);
+            let rows = self.table.row_tuples();
+
+            // Write data to table.
+            let mut table_names = Vec::new();
+            for (table_name, _) in self.test_ctx.name_to_tables().iter() {
+                let row_group = self.table.rows_to_row_group(&rows);
+                self.test_ctx
+                    .write_to_table(table_name.as_str(), row_group)
+                    .await;
+                table_names.push(table_name.clone());
+            }
+
+            // Reopen db.
+            self.test_ctx
+                .reopen_with_tables(
+                    table_names
+                        .iter()
+                        .map(|s| s.as_str())
+                        .collect::<Vec<_>>()
+                        .as_slice(),
+                )
+                .await;
+        });
+    }
+}
diff --git a/src/benchmarks/src/table.rs b/src/benchmarks/src/table.rs
new file mode 100644
index 0000000000..31df4234f9
--- /dev/null
+++ b/src/benchmarks/src/table.rs
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utils to create table.
+
+use std::collections::HashMap;
+
+use common_types::{
+    column_schema,
+    datum::{Datum, DatumKind},
+    row::{Row, RowGroup},
+    schema::{self, Schema},
+    table::DEFAULT_SHARD_ID,
+    time::Timestamp,
+};
+use table_engine::{
+    self,
+    engine::{CreateTableParams, CreateTableRequest, TableState},
+    table::{SchemaId, TableId, TableSeq},
+};
+use time_ext::ReadableDuration;
+
+use crate::util::start_ms;
+
+pub fn new_row_6<C0, C1, C2, C3, C4, C5>(data: (C0, C1, C2, C3, C4, C5)) -> Row
+where
+    C0: Into<Datum>,
+    C1: Into<Datum>,
+    C2: Into<Datum>,
+    C3: Into<Datum>,
+    C4: Into<Datum>,
+    C5: Into<Datum>,
+{
+    let cols = vec![
+        data.0.into(),
+        data.1.into(),
+        data.2.into(),
+        data.3.into(),
+        data.4.into(),
+        data.5.into(),
+    ];
+
+    Row::from_datums(cols)
+}
+
+pub type WriteRequestTuple = (String, Timestamp, String, f64, f64, String);
+pub type RowTuple<'a> = (&'a str, Timestamp, &'a str, f64, f64, &'a str);
+
+pub fn new_table_id(schema_id: u16, table_seq: u32) -> TableId {
+    TableId::with_seq(SchemaId::from(schema_id), TableSeq::from(table_seq)).unwrap()
+}
+
+pub struct RowTupleGenerator {}
+
+pub struct FixedSchemaTable {
+    create_request: CreateTableRequest,
+    write_requests: Vec<WriteRequestTuple>,
+}
+
+impl FixedSchemaTable {
+    pub fn builder() -> Builder {
+        Builder::default()
+    }
+
+    fn default_schema() -> Schema {
+        Self::default_schema_builder().build().unwrap()
+    }
+
+    pub fn default_schema_builder() -> schema::Builder {
+        create_schema_builder(
+            // Key columns
+            &[("key", DatumKind::String), ("ts", DatumKind::Timestamp)],
+            // Normal columns
+            &[
+                ("string_tag", DatumKind::String),
+                ("double_field1", DatumKind::Double),
+                ("double_field2", DatumKind::Double),
+                ("string_field2", DatumKind::String),
+            ],
+        )
+    }
+
+    #[inline]
+    pub fn table_id(&self) -> TableId {
+        self.create_request.table_id
+    }
+
+    #[inline]
+    pub fn create_request(&self) -> &CreateTableRequest {
+        &self.create_request
+    }
+
+    fn new_row(data: RowTuple) -> Row {
+        new_row_6(data)
+    }
+
+    pub fn rows_to_row_group(&self, data: &[RowTuple]) -> RowGroup {
+        let rows = data
+            .iter()
+            .copied()
+            .map(FixedSchemaTable::new_row)
+            .collect();
+
+        self.new_row_group(rows)
+    }
+
+    fn new_row_group(&self, rows: Vec<Row>) -> RowGroup {
+        RowGroup::try_new(self.create_request.params.table_schema.clone(), rows).unwrap()
+    }
+
+    pub fn prepare_write_requests(&mut self, batch_size: usize) {
+        let start_ms = start_ms();
+        self.write_requests.clear();
+
+        (0..batch_size).for_each(|idx| {
+            self.write_requests.push((
+                format!("key_{idx}"),
+                Timestamp::new(start_ms + idx as i64),
+                format!("tag1_{idx}"),
+                11.0,
+                110.0,
+                format!("tag2_{idx}"),
+            ))
+        });
+    }
+
+    pub fn row_tuples(&self) -> Vec<RowTuple> {
+        self.write_requests
+            .iter()
+            .map(|x| (x.0.as_str(), x.1, x.2.as_str(), x.3, x.4, x.5.as_str()))
+            .collect()
+    }
+}
+
+#[must_use]
+pub struct Builder {
+    create_request: CreateTableRequest,
+}
+
+impl Builder {
+    pub fn schema_id(mut self, schema_id: SchemaId) -> Self {
+        self.create_request.schema_id = schema_id;
+        self
+    }
+
+    pub fn table_name(mut self, table_name: String) -> Self {
+        self.create_request.params.table_name = table_name;
+        self
+    }
+
+    pub fn table_id(mut self, table_id: TableId) -> Self {
+        self.create_request.table_id = table_id;
+        self
+    }
+
+    pub fn enable_ttl(mut self, enable_ttl: bool) -> Self {
+        self.create_request.params.table_options.insert(
+            common_types::OPTION_KEY_ENABLE_TTL.to_string(),
+            enable_ttl.to_string(),
+        );
+        self
+    }
+
+    pub fn ttl(mut self, duration: ReadableDuration) -> Self {
+        self.create_request
+            .params
+            .table_options
+            .insert(common_types::TTL.to_string(), duration.to_string());
+        self
+    }
+
+    pub fn build_fixed(self) -> FixedSchemaTable {
+        FixedSchemaTable {
+            create_request: self.create_request,
+            write_requests: Vec::new(),
+        }
+    }
+}
+
+impl Default for Builder {
+    fn default() -> Self {
+        let params = CreateTableParams {
+            catalog_name: "horaedb".to_string(),
+            schema_name: "public".to_string(),
+            table_name: "test_table".to_string(),
+            table_schema: FixedSchemaTable::default_schema(),
+            partition_info: None,
+            engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(),
+            table_options: HashMap::new(),
+        };
+
+        Self {
+            create_request: CreateTableRequest {
+                params,
+                schema_id: SchemaId::from_u32(2),
+                table_id: new_table_id(2, 1),
+                state: TableState::Stable,
+                shard_id: DEFAULT_SHARD_ID,
+            },
+        }
+    }
+}
+
+// Format of input slice: &[ ( column name, column type ) ]
+pub fn create_schema_builder(
+    key_tuples: &[(&str, DatumKind)],
+    normal_tuples: &[(&str, DatumKind)],
+) -> schema::Builder {
+    assert!(!key_tuples.is_empty());
+
+    let mut schema_builder = schema::Builder::with_capacity(key_tuples.len() + normal_tuples.len())
+        .auto_increment_column_id(true)
+        .primary_key_indexes((0..key_tuples.len()).collect());
+
+    for tuple in key_tuples {
+        // Key column is not nullable.
+        let column_schema = column_schema::Builder::new(tuple.0.to_string(), tuple.1)
+            .is_nullable(false)
+            .build()
+            .expect("Should succeed to build key column schema");
+        schema_builder = schema_builder.add_key_column(column_schema).unwrap();
+    }
+
+    for tuple in normal_tuples {
+        let column_schema = column_schema::Builder::new(tuple.0.to_string(), tuple.1)
+            .is_nullable(true)
+            .build()
+            .expect("Should succeed to build normal column schema");
+        schema_builder = schema_builder.add_normal_column(column_schema).unwrap();
+    }
+
+    schema_builder
+}
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index 188081d8a1..cb6d8de967 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -17,10 +17,11 @@
 
 //! Utilities.
 
-use std::sync::Arc;
+use std::{collections::HashMap, future::Future, sync::Arc};
 
 use analytic_engine::{
     memtable::{key::KeySequence, MemTableRef, PutContext},
+    setup::{EngineBuilder, TableEngineContext},
     space::SpaceId,
     sst::{
         factory::{
@@ -34,19 +35,42 @@
     },
     table::sst_util,
     table_options::StorageFormat,
+    Config, RecoverMode,
 };
 use bytes_ext::{BufMut, SafeBufMut};
 use common_types::{
     projected_schema::{ProjectedSchema, RowProjectorBuilder},
+    record_batch::RecordBatch,
+    row::RowGroup,
     schema::{IndexInWriterSchema, Schema},
+    table::{ShardId, DEFAULT_SHARD_ID},
+    time::Timestamp,
 };
+use futures::stream::StreamExt;
 use macros::define_result;
-use object_store::{ObjectStoreRef, Path};
+use object_store::{
+    config::{LocalOptions, ObjectStoreOptions, StorageOptions},
+    ObjectStoreRef, Path,
+};
 use parquet::file::footer;
-use runtime::Runtime;
+use runtime::{PriorityRuntime, Runtime};
+use size_ext::ReadableSize;
 use snafu::{ResultExt, Snafu};
-use table_engine::{predicate::Predicate, table::TableId};
-use wal::log_batch::Payload;
+use table_engine::{
+    engine::{CreateTableRequest, EngineRuntimes, OpenShardRequest, TableDef, TableEngineRef},
+    predicate::Predicate,
+    table::{ReadRequest, SchemaId, TableId, TableRef, WriteRequest},
+};
+use tempfile::TempDir;
+use time_ext::ReadableDuration;
+use wal::{
+    config::{Config as WalConfig, StorageConfig},
+    log_batch::Payload,
+    manager::{OpenedWals, WalRuntimes, WalsOpener},
+    rocksdb_impl::{config::RocksDBStorageConfig, manager::RocksDBWalsOpener},
+};
+
+use crate::{table, table::FixedSchemaTable};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -248,3 +272,392 @@ impl<'a> From<&'a Vec<u8>> for WritePayload<'a> {
         Self(data)
     }
 }
+
+const DAY_MS: i64 = 24 * 60 * 60 * 1000;
+/// 3 days ago.
+pub fn start_ms() -> i64 {
+    Timestamp::now().as_i64() - 3 * DAY_MS
+}
+#[derive(Clone, Copy, Debug)]
+pub enum OpenTablesMethod {
+    WithOpenTable,
+    WithOpenShard,
+}
+
+pub struct TestEnv {
+    _dir: TempDir,
+    pub config: Config,
+    pub runtimes: Arc<EngineRuntimes>,
+}
+
+pub struct Builder {
+    num_workers: usize,
+}
+
+impl Builder {
+    pub fn build(self) -> TestEnv {
+        let dir = tempfile::tempdir().unwrap();
+
+        let config = Config {
+            storage: StorageOptions {
+                mem_cache_capacity: ReadableSize::mb(0),
+                mem_cache_partition_bits: 0,
+                disk_cache_dir: "".to_string(),
+                disk_cache_capacity: ReadableSize::mb(0),
+                disk_cache_page_size: ReadableSize::mb(0),
+                disk_cache_partition_bits: 0,
+                object_store: ObjectStoreOptions::Local(LocalOptions {
+                    data_dir: dir.path().to_str().unwrap().to_string(),
+                }),
+            },
+            wal: WalConfig {
+                storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
+                    data_dir: dir.path().to_str().unwrap().to_string(),
+                    ..Default::default()
+                })),
+                disable_data: false,
+            },
+            ..Default::default()
+        };
+
+        let runtime = Arc::new(
+            runtime::Builder::default()
+                .worker_threads(self.num_workers)
+                .enable_all()
+                .build()
+                .unwrap(),
+        );
+
+        TestEnv {
+            _dir: dir,
+            config,
+            runtimes: Arc::new(EngineRuntimes {
+                read_runtime: PriorityRuntime::new(runtime.clone(), runtime.clone()),
+                write_runtime: runtime.clone(),
+                meta_runtime: runtime.clone(),
+                compact_runtime: runtime.clone(),
+                default_runtime: runtime.clone(),
+                io_runtime: runtime,
+            }),
+        }
+    }
+}
+
+impl Default for Builder {
+    fn default() -> Self {
+        Self { num_workers: 2 }
+    }
+}
+
+pub trait EngineBuildContext: Clone + Default {
+    type WalsOpener: WalsOpener;
+
+    fn wals_opener(&self) -> Self::WalsOpener;
+    fn config(&self) -> Config;
+    fn open_method(&self) -> OpenTablesMethod;
+}
+
+pub struct RocksDBEngineBuildContext {
+    config: Config,
+    open_method: OpenTablesMethod,
+}
+
+impl RocksDBEngineBuildContext {
+    pub fn new(mode: RecoverMode, open_method: OpenTablesMethod) -> Self {
+        let mut context = Self::default();
+        context.config.recover_mode = mode;
+        context.open_method = open_method;
+
+        context
+    }
+}
+
+impl Default for RocksDBEngineBuildContext {
+    fn default() -> Self {
+        let dir = tempfile::tempdir().unwrap();
+
+        let config = Config {
+            storage: StorageOptions {
+                mem_cache_capacity: ReadableSize::mb(0),
+                mem_cache_partition_bits: 0,
+                disk_cache_dir: "".to_string(),
+                disk_cache_capacity: ReadableSize::mb(0),
+                disk_cache_page_size: ReadableSize::mb(0),
+                disk_cache_partition_bits: 0,
+                object_store: ObjectStoreOptions::Local(LocalOptions {
+                    data_dir: dir.path().to_str().unwrap().to_string(),
+                }),
+            },
+            wal: WalConfig {
+                storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
+                    data_dir: dir.path().to_str().unwrap().to_string(),
+                    ..Default::default()
+                })),
+                disable_data: false,
+            },
+            ..Default::default()
+        };
+
+        Self {
+            config,
+            open_method: OpenTablesMethod::WithOpenTable,
+        }
+    }
+}
+
+impl Clone for RocksDBEngineBuildContext {
+    fn clone(&self) -> Self {
+        let mut config = self.config.clone();
+
+        let dir = tempfile::tempdir().unwrap();
+        let storage = StorageOptions {
+            mem_cache_capacity: ReadableSize::mb(0),
+            mem_cache_partition_bits: 0,
+            disk_cache_dir: "".to_string(),
+            disk_cache_capacity: ReadableSize::mb(0),
+            disk_cache_page_size: ReadableSize::mb(0),
+            disk_cache_partition_bits: 0,
+            object_store: ObjectStoreOptions::Local(LocalOptions {
+                data_dir: dir.path().to_str().unwrap().to_string(),
+            }),
+        };
+
+        config.storage = storage;
+        config.wal = WalConfig {
+            storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
+                data_dir: dir.path().to_str().unwrap().to_string(),
+                ..Default::default()
+            })),
+            disable_data: false,
+        };
+        Self {
+            config,
+            open_method: self.open_method,
+        }
+    }
+}
+
+impl EngineBuildContext for RocksDBEngineBuildContext {
+    type WalsOpener = RocksDBWalsOpener;
+
+    fn wals_opener(&self) -> Self::WalsOpener {
+        RocksDBWalsOpener
+    }
+
+    fn config(&self) -> Config {
+        self.config.clone()
+    }
+
+    fn open_method(&self) -> OpenTablesMethod {
+        self.open_method
+    }
+}
+
+pub struct TestContext<T: WalsOpener> {
+    config: Config,
+    wals_opener: T,
+    runtimes: Arc<EngineRuntimes>,
+    engine: Option<TableEngineRef>,
+    opened_wals: Option<OpenedWals>,
+    schema_id: SchemaId,
+    last_table_seq: u32,
+
+    name_to_tables: HashMap<String, TableRef>,
+}
+
+impl TestEnv {
+    pub fn builder() -> Builder {
+        Builder::default()
+    }
+
+    pub fn new_context<T: EngineBuildContext>(
+        &self,
+        build_context: &T,
+    ) -> TestContext<T::WalsOpener> {
+        let config = build_context.config();
+        let wals_opener = build_context.wals_opener();
+
+        TestContext {
+            config,
+            wals_opener,
+            runtimes: self.runtimes.clone(),
+            engine: None,
+            opened_wals: None,
+            schema_id: SchemaId::from_u32(100),
+            last_table_seq: 1,
+            name_to_tables: HashMap::new(),
+        }
+    }
+
+    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
+        self.runtimes.default_runtime.block_on(future)
+    }
+}
+
+impl<T: WalsOpener> TestContext<T> {
+    pub async fn open(&mut self) {
+        let opened_wals = if let Some(opened_wals) = self.opened_wals.take() {
+            opened_wals
+        } else {
+            self.wals_opener
+                .open_wals(
+                    &self.config.wal,
+                    WalRuntimes {
+                        read_runtime: self.runtimes.read_runtime.high().clone(),
+                        write_runtime: self.runtimes.write_runtime.clone(),
+                        default_runtime: self.runtimes.default_runtime.clone(),
+                    },
+                )
+                .await
+                .unwrap()
+        };
+
+        let engine_builder = EngineBuilder {
+            config: &self.config,
+            engine_runtimes: self.runtimes.clone(),
+            opened_wals: opened_wals.clone(),
+        };
+        self.opened_wals = Some(opened_wals);
+
+        let TableEngineContext { table_engine, .. } = engine_builder.build().await.unwrap();
+        self.engine = Some(table_engine);
+    }
+
+    pub async fn create_fixed_schema_table(&mut self, table_name: &str) -> FixedSchemaTable {
+        let fixed_schema_table = FixedSchemaTable::builder()
+            .schema_id(self.schema_id)
+            .table_name(table_name.to_string())
+            .table_id(self.next_table_id())
+            .ttl("7d".parse::<ReadableDuration>().unwrap())
+            .build_fixed();
+
+        self.create_table(fixed_schema_table.create_request().clone())
+            .await;
+
+        fixed_schema_table
+    }
+
+    fn next_table_id(&mut self) -> TableId {
+        self.last_table_seq += 1;
+        table::new_table_id(2, self.last_table_seq)
+    }
+
+    async fn create_table(&mut self, create_request: CreateTableRequest) {
+        let table_name = create_request.params.table_name.clone();
+        let table = self.engine().create_table(create_request).await.unwrap();
+
+        self.name_to_tables.insert(table_name.to_string(), table);
+    }
+
+    #[inline]
+    pub fn engine(&self) -> &TableEngineRef {
+        self.engine.as_ref().unwrap()
+    }
+
+    pub async fn write_to_table(&self, table_name: &str, row_group: RowGroup) {
+        let table = self.table(table_name);
+
+        table.write(WriteRequest { row_group }).await.unwrap();
+    }
+
+    pub fn table(&self, table_name: &str) -> TableRef {
+        self.name_to_tables.get(table_name).cloned().unwrap()
+    }
+
+    pub async fn read_table(
+        &self,
+        table_name: &str,
+        read_request: ReadRequest,
+    ) -> Vec<RecordBatch> {
+        let table = self.table(table_name);
+
+        let mut stream = table.read(read_request).await.unwrap();
+        let mut record_batches = Vec::new();
+        while let Some(batch) = stream.next().await {
+            let batch = batch.unwrap();
+
+            record_batches.push(batch);
+        }
+
+        record_batches
+    }
+
+    pub async fn partitioned_read_table(
+        &self,
+        table_name: &str,
+        read_request: ReadRequest,
+    ) -> Vec<RecordBatch> {
+        let table = self.table(table_name);
+
+        let streams = table.partitioned_read(read_request).await.unwrap();
+        let mut record_batches = Vec::new();
+
+        for mut stream in streams.streams {
+            while let Some(batch) = stream.next().await {
+                let batch = batch.unwrap();
+
+                record_batches.push(batch);
+            }
+        }
+
+        record_batches
+    }
+
+    pub async fn reopen_with_tables(&mut self, tables: &[&str]) {
+        let table_infos: Vec<_> = tables
+            .iter()
+            .map(|name| {
+                let table_id = self.name_to_tables.get(*name).unwrap().id();
+                (table_id, *name)
+            })
+            .collect();
+        {
+            // Close all tables.
+            self.name_to_tables.clear();
+
+            // Close engine.
+            let engine = self.engine.take().unwrap();
+            engine.close().await.unwrap();
+        }
+
+        self.open().await;
+
+        self.open_tables_of_shard(table_infos, DEFAULT_SHARD_ID)
+            .await;
+    }
+
+    async fn open_tables_of_shard(&mut self, table_infos: Vec<(TableId, &str)>, shard_id: ShardId) {
+        let table_defs = table_infos
+            .into_iter()
+            .map(|table| TableDef {
+                catalog_name: "horaedb".to_string(),
+                schema_name: "public".to_string(),
+                schema_id: self.schema_id,
+                id: table.0,
+                name: table.1.to_string(),
+            })
+            .collect();
+
+        let open_shard_request = OpenShardRequest {
+            shard_id,
+            table_defs,
+            engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(),
+        };
+
+        let tables = self
+            .engine()
+            .open_shard(open_shard_request)
+            .await
+            .unwrap()
+            .into_values()
+            .map(|result| result.unwrap().unwrap());
+
+        for table in tables {
+            self.name_to_tables.insert(table.name().to_string(), table);
+        }
+    }
+
+    pub fn name_to_tables(&self) -> &HashMap<String, TableRef> {
+        &self.name_to_tables
+    }
+}
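
Reviewer note (not part of the patch): the diff wires `ReplayBench` into Criterion through `bench_replay`, but the write-then-reopen cycle it measures can also be driven directly. A minimal sketch under the crate layout added above; the standalone `main` driver and the hand-built `ReplayConfig` values here are illustrative only, not code from this change:

```rust
// Hypothetical driver (not in this patch): build a ReplayConfig by hand
// instead of deserializing bench.toml, then run one write/reopen/replay cycle.
use benchmarks::{config::ReplayConfig, replay_bench::ReplayBench};

fn main() {
    let config = ReplayConfig {
        // Criterion-facing knobs; ReplayBench itself only reads batch_size.
        bench_measurement_time: "3s".parse().unwrap(),
        bench_sample_size: 10,
        batch_size: 10_000,
    };

    // Builds a RocksDB-backed engine in a temp dir with
    // RecoverMode::TableBased and creates three fixed-schema tables.
    let mut bench = ReplayBench::new(config);

    // Writes `batch_size` rows to every table, closes the engine, then
    // reopens the shard so recovery replays the WAL entries just written.
    bench.run_bench();
}
```

In the harness itself the same path runs under `cargo bench`, with the `[replay_bench]` section of bench.toml supplying the measurement time, sample size, and batch size.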