diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index dba6a0f8892f..97da27852030 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -55,7 +55,7 @@ GreptimeDB uses the [Apache 2.0 license](https://github.com/GreptimeTeam/greptim
- To ensure that the community is free and confident in its ability to use your contributions, please sign the Contributor License Agreement (CLA), which will be incorporated into the pull request process.
- Make sure all files have proper license header (running `docker run --rm -v $(pwd):/github/workspace ghcr.io/korandoru/hawkeye-native:v3 format` from the project root).
- Make sure all your code is formatted and follows the [coding style](https://pingcap.github.io/style-guide/rust/) and [style guide](docs/style-guide.md).
-- Make sure all unit tests are passed (using `cargo test --workspace` or [nextest](https://nexte.st/index.html) `cargo nextest run`).
+- Make sure all unit tests pass (run them with [nextest](https://nexte.st/index.html): `cargo nextest run`).
- Make sure all clippy warnings are fixed (you can check it locally by running `cargo clippy --workspace --all-targets -- -D warnings`).
#### `pre-commit` Hooks
diff --git a/Makefile b/Makefile
index 4c36f5c86b0a..a01fb72344b1 100644
--- a/Makefile
+++ b/Makefile
@@ -205,10 +205,14 @@ run-it-in-container: start-etcd ## Run integration tests in dev-builder.
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:latest \
make test sqlness-test BUILD_JOBS=${BUILD_JOBS}
-.PHONY: run-cluster-with-etcd
-run-cluster-with-etcd: ## Run greptime cluster with etcd in docker-compose.
+.PHONY: start-cluster
+start-cluster: ## Start the greptimedb cluster with etcd using docker compose.
docker compose -f ./docker/docker-compose/cluster-with-etcd.yaml up
+.PHONY: stop-cluster
+stop-cluster: ## Stop the greptimedb cluster created by docker compose.
+ docker compose -f ./docker/docker-compose/cluster-with-etcd.yaml stop
+
##@ Docs
config-docs: ## Generate configuration documentation from toml files.
docker run --rm \
diff --git a/config/config.md b/config/config.md
index 88380597872b..cb90eac70f48 100644
--- a/config/config.md
+++ b/config/config.md
@@ -139,6 +139,7 @@
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
+| `region_engine.file` | -- | -- | Enable the file engine. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
@@ -428,6 +429,7 @@
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
+| `region_engine.file` | -- | -- | Enable the file engine. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 1f06271000e1..ea8c95cf7005 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -478,6 +478,10 @@ data_freeze_threshold = 32768
## Only available for `partition_tree` memtable.
fork_dictionary_bytes = "1GiB"
+[[region_engine]]
+## Enable the file engine.
+[region_engine.file]
+
## The logging options.
[logging]
## The directory to store the log files.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 2e8831951125..0944d5985e80 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -501,6 +501,10 @@ data_freeze_threshold = 32768
## Only available for `partition_tree` memtable.
fork_dictionary_bytes = "1GiB"
+[[region_engine]]
+## Enable the file engine.
+[region_engine.file]
+
## The logging options.
[logging]
## The directory to store the log files.
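The two `[[region_engine]]` entries above deserialize into a list of engine configs. Below is a minimal, self-contained sketch of that mapping; the serde layout and field set are assumptions for illustration, while the real `RegionEngineConfig` lives in GreptimeDB's `datanode::config`:

```rust
use serde::Deserialize;

// Hypothetical mirror of the real types, trimmed to what this diff shows.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum RegionEngineConfig {
    Mito { num_workers: usize },
    File {},
}

#[derive(Debug, Deserialize)]
struct DatanodeOptions {
    region_engine: Vec<RegionEngineConfig>,
}

fn main() {
    // Each `[[region_engine]]` table holds exactly one engine variant.
    let toml_str = r#"
        [[region_engine]]
        [region_engine.mito]
        num_workers = 8

        [[region_engine]]
        [region_engine.file]
    "#;
    let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
    assert_eq!(opts.region_engine.len(), 2);
}
```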
diff --git a/docker/docker-compose/cluster-with-etcd.yaml b/docker/docker-compose/cluster-with-etcd.yaml
index 833d16883e40..6491bdeac1fd 100644
--- a/docker/docker-compose/cluster-with-etcd.yaml
+++ b/docker/docker-compose/cluster-with-etcd.yaml
@@ -1,12 +1,13 @@
x-custom:
- initial_cluster_token: &initial_cluster_token "--initial-cluster-token=etcd-cluster"
- common_settings: &common_settings
+ etcd_initial_cluster_token: &etcd_initial_cluster_token "--initial-cluster-token=etcd-cluster"
+ etcd_common_settings: &etcd_common_settings
image: quay.io/coreos/etcd:v3.5.10
entrypoint: /usr/local/bin/etcd
+ greptimedb_image: &greptimedb_image docker.io/greptimedb/greptimedb:latest
services:
etcd0:
- <<: *common_settings
+ <<: *etcd_common_settings
container_name: etcd0
ports:
- 2379:2379
@@ -22,7 +23,7 @@ services:
- --election-timeout=1250
- --initial-cluster=etcd0=http://etcd0:2380
- --initial-cluster-state=new
- - *initial_cluster_token
+ - *etcd_initial_cluster_token
volumes:
- /tmp/greptimedb-cluster-docker-compose/etcd0:/var/lib/etcd
healthcheck:
@@ -34,7 +35,7 @@ services:
- greptimedb
metasrv:
- image: docker.io/greptime/greptimedb:latest
+ image: *greptimedb_image
container_name: metasrv
ports:
- 3002:3002
@@ -56,10 +57,11 @@ services:
- greptimedb
datanode0:
- image: docker.io/greptime/greptimedb:latest
+ image: *greptimedb_image
container_name: datanode0
ports:
- 3001:3001
+ - 5000:5000
command:
- datanode
- start
@@ -67,8 +69,14 @@ services:
- --rpc-addr=0.0.0.0:3001
- --rpc-hostname=datanode0:3001
- --metasrv-addrs=metasrv:3002
+ - --http-addr=0.0.0.0:5000
volumes:
- /tmp/greptimedb-cluster-docker-compose/datanode0:/tmp/greptimedb
+ healthcheck:
+ test: [ "CMD", "curl", "-f", "http://datanode0:5000/health" ]
+ interval: 5s
+ timeout: 3s
+ retries: 5
depends_on:
metasrv:
condition: service_healthy
@@ -76,7 +84,7 @@ services:
- greptimedb
frontend0:
- image: docker.io/greptime/greptimedb:latest
+ image: *greptimedb_image
container_name: frontend0
ports:
- 4000:4000
@@ -91,8 +99,31 @@ services:
- --rpc-addr=0.0.0.0:4001
- --mysql-addr=0.0.0.0:4002
- --postgres-addr=0.0.0.0:4003
+ healthcheck:
+ test: [ "CMD", "curl", "-f", "http://frontend0:4000/health" ]
+ interval: 5s
+ timeout: 3s
+ retries: 5
depends_on:
- metasrv:
+ datanode0:
+ condition: service_healthy
+ networks:
+ - greptimedb
+
+ flownode0:
+ image: *greptimedb_image
+ container_name: flownode0
+ ports:
+ - 4004:4004
+ command:
+ - flownode
+ - start
+ - --node-id=0
+ - --metasrv-addrs=metasrv:3002
+ - --rpc-addr=0.0.0.0:4004
+ - --rpc-hostname=flownode0:4004
+ depends_on:
+ frontend0:
condition: service_healthy
networks:
- greptimedb
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index 3f25c3fdc7f5..cf1cf6bc5048 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -26,6 +26,7 @@ use common_telemetry::logging::LoggingOptions;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig};
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
+use file_engine::config::EngineConfig;
use frontend::frontend::FrontendOptions;
use frontend::service_config::datanode::DatanodeClientOptions;
use meta_client::MetaClientOptions;
@@ -71,18 +72,21 @@ fn test_load_datanode_example_config() {
data_home: "/tmp/greptimedb/".to_string(),
..Default::default()
},
- region_engine: vec![RegionEngineConfig::Mito(MitoConfig {
- num_workers: 8,
- auto_flush_interval: Duration::from_secs(3600),
- scan_parallelism: 0,
- global_write_buffer_size: ReadableSize::gb(1),
- global_write_buffer_reject_size: ReadableSize::gb(2),
- sst_meta_cache_size: ReadableSize::mb(128),
- vector_cache_size: ReadableSize::mb(512),
- page_cache_size: ReadableSize::mb(512),
- max_background_jobs: 4,
- ..Default::default()
- })],
+ region_engine: vec![
+ RegionEngineConfig::Mito(MitoConfig {
+ num_workers: 8,
+ auto_flush_interval: Duration::from_secs(3600),
+ scan_parallelism: 0,
+ global_write_buffer_size: ReadableSize::gb(1),
+ global_write_buffer_reject_size: ReadableSize::gb(2),
+ sst_meta_cache_size: ReadableSize::mb(128),
+ vector_cache_size: ReadableSize::mb(512),
+ page_cache_size: ReadableSize::mb(512),
+ max_background_jobs: 4,
+ ..Default::default()
+ }),
+ RegionEngineConfig::File(EngineConfig {}),
+ ],
logging: LoggingOptions {
level: Some("info".to_string()),
otlp_endpoint: Some("".to_string()),
@@ -207,18 +211,21 @@ fn test_load_standalone_example_config() {
sync_period: Some(Duration::from_secs(10)),
..Default::default()
}),
- region_engine: vec![RegionEngineConfig::Mito(MitoConfig {
- num_workers: 8,
- auto_flush_interval: Duration::from_secs(3600),
- scan_parallelism: 0,
- global_write_buffer_size: ReadableSize::gb(1),
- global_write_buffer_reject_size: ReadableSize::gb(2),
- sst_meta_cache_size: ReadableSize::mb(128),
- vector_cache_size: ReadableSize::mb(512),
- page_cache_size: ReadableSize::mb(512),
- max_background_jobs: 4,
- ..Default::default()
- })],
+ region_engine: vec![
+ RegionEngineConfig::Mito(MitoConfig {
+ num_workers: 8,
+ auto_flush_interval: Duration::from_secs(3600),
+ scan_parallelism: 0,
+ global_write_buffer_size: ReadableSize::gb(1),
+ global_write_buffer_reject_size: ReadableSize::gb(2),
+ sst_meta_cache_size: ReadableSize::mb(128),
+ vector_cache_size: ReadableSize::mb(512),
+ page_cache_size: ReadableSize::mb(512),
+ max_background_jobs: 4,
+ ..Default::default()
+ }),
+ RegionEngineConfig::File(EngineConfig {}),
+ ],
storage: StorageConfig {
data_home: "/tmp/greptimedb/".to_string(),
..Default::default()
diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs
index 9988a311f51c..88f21ce9dbf8 100644
--- a/src/common/datasource/src/file_format/parquet.rs
+++ b/src/common/datasource/src/file_format/parquet.rs
@@ -27,12 +27,14 @@ use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
+use datatypes::schema::SchemaRef;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
-use parquet::basic::{Compression, ZstdLevel};
-use parquet::file::properties::WriterProperties;
+use parquet::basic::{Compression, Encoding, ZstdLevel};
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
+use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
@@ -184,14 +186,16 @@ impl ArrowWriterCloser for ArrowWriter {
/// Returns number of rows written.
pub async fn stream_to_parquet(
mut stream: SendableRecordBatchStream,
+ schema: datatypes::schema::SchemaRef,
store: ObjectStore,
path: &str,
concurrency: usize,
) -> Result<usize> {
- let write_props = WriterProperties::builder()
- .set_compression(Compression::ZSTD(ZstdLevel::default()))
- .build();
- let schema = stream.schema();
+ let write_props = column_wise_config(
+ WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
+ schema,
+ )
+ .build();
let inner_writer = store
.writer_with(path)
.concurrent(concurrency)
@@ -200,7 +204,7 @@ pub async fn stream_to_parquet(
.map(|w| w.into_futures_async_write().compat_write())
.context(WriteObjectSnafu { path })?;
- let mut writer = AsyncArrowWriter::try_new(inner_writer, schema, Some(write_props))
+ let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
.context(WriteParquetSnafu { path })?;
let mut rows_written = 0;
@@ -216,6 +220,24 @@ pub async fn stream_to_parquet(
Ok(rows_written)
}
+/// Customizes per-column properties.
+fn column_wise_config(
+ mut props: WriterPropertiesBuilder,
+ schema: SchemaRef,
+) -> WriterPropertiesBuilder {
+ // Disable dictionary encoding for timestamp columns: timestamps are typically
+ // monotonically increasing, so their dictionary pages would grow larger than the data pages.
+ for col in schema.column_schemas() {
+ if col.data_type.is_timestamp() {
+ let path = ColumnPath::new(vec![col.name.clone()]);
+ props = props
+ .set_column_dictionary_enabled(path.clone(), false)
+ .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED)
+ }
+ }
+ props
+}
+
#[cfg(test)]
mod tests {
use common_test_util::find_workspace_path;
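To see the effect of `column_wise_config` in isolation, here is a hedged, standalone sketch against the `parquet` crate alone; the `"ts"` column name is a placeholder, not something this PR defines:

```rust
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn main() {
    let ts = ColumnPath::new(vec!["ts".to_string()]);
    let props = WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::default()))
        // Monotonically increasing timestamps dictionary-encode poorly,
        // so fall back to delta binary packing for that column only.
        .set_column_dictionary_enabled(ts.clone(), false)
        .set_column_encoding(ts.clone(), Encoding::DELTA_BINARY_PACKED)
        .build();
    assert!(!props.dictionary_enabled(&ts));
}
```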
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index c83c14183405..0b47c32ee21d 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -100,7 +100,6 @@ impl From<StandaloneWalConfig> for DatanodeWalConfig {
StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
- compression: config.compression,
max_batch_bytes: config.max_batch_bytes,
consumer_wait_timeout: config.consumer_wait_timeout,
backoff: config.backoff,
@@ -114,7 +113,6 @@ mod tests {
use std::time::Duration;
use common_base::readable_size::ReadableSize;
- use rskafka::client::partition::Compression;
use super::*;
use crate::config::kafka::common::BackoffConfig;
@@ -207,7 +205,6 @@ mod tests {
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
let expected = DatanodeKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
- compression: Compression::NoCompression,
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
@@ -229,7 +226,6 @@ mod tests {
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
- compression: Compression::NoCompression,
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
index b50ac685537d..ae97c1017cf5 100644
--- a/src/common/wal/src/config/kafka/datanode.rs
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -15,7 +15,6 @@
use std::time::Duration;
use common_base::readable_size::ReadableSize;
-use rskafka::client::partition::Compression;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
@@ -27,9 +26,6 @@ use crate::BROKER_ENDPOINT;
pub struct DatanodeKafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
- /// The compression algorithm used to compress kafka records.
- #[serde(skip)]
- pub compression: Compression,
/// TODO(weny): Remove the alias once we release v0.9.
/// The max size of a single producer batch.
#[serde(alias = "max_batch_size")]
@@ -46,7 +42,6 @@ impl Default for DatanodeKafkaConfig {
fn default() -> Self {
Self {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
- compression: Compression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
diff --git a/src/common/wal/src/config/kafka/standalone.rs b/src/common/wal/src/config/kafka/standalone.rs
index ddee160bf642..5a5e445f819c 100644
--- a/src/common/wal/src/config/kafka/standalone.rs
+++ b/src/common/wal/src/config/kafka/standalone.rs
@@ -15,7 +15,6 @@
use std::time::Duration;
use common_base::readable_size::ReadableSize;
-use rskafka::client::partition::Compression;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
@@ -40,9 +39,6 @@ pub struct StandaloneKafkaConfig {
/// The timeout of topic creation.
#[serde(with = "humantime_serde")]
pub create_topic_timeout: Duration,
- /// The compression algorithm used to compress kafka records.
- #[serde(skip)]
- pub compression: Compression,
/// TODO(weny): Remove the alias once we release v0.9.
/// The max size of a single producer batch.
#[serde(alias = "max_batch_size")]
@@ -67,7 +63,6 @@ impl Default for StandaloneKafkaConfig {
num_partitions: 1,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
- compression: Compression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 5f686e6ace5e..089f05f008c4 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -98,7 +98,7 @@ impl ClientManager {
producer_channel_size: REQUEST_BATCH_SIZE * 2,
producer_request_batch_size: REQUEST_BATCH_SIZE,
flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
- compression: config.compression,
+ compression: Compression::Lz4,
})
}
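With the `compression` field gone from the WAL config, the client manager pins the codec at the call site. A minimal sketch of the same constant, assuming the `rskafka` crate with its LZ4 feature enabled:

```rust
use rskafka::client::partition::Compression;

/// Compression is no longer user-configurable; the Kafka log store always
/// compresses record batches with LZ4.
fn wal_compression() -> Compression {
    Compression::Lz4
}
```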
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 7919aeb4ca5e..1533694bc9de 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -371,9 +371,6 @@ pub struct InvertedIndexConfig {
/// Memory threshold for performing an external sort during index creation.
pub mem_threshold_on_create: MemoryThreshold,
- /// Whether to compress the index data.
- pub compress: bool,
-
#[deprecated = "use [IndexConfig::aux_path] instead"]
#[serde(skip_serializing)]
pub intermediate_path: String,
@@ -396,7 +393,6 @@ impl Default for InvertedIndexConfig {
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
- compress: true,
write_buffer_size: ReadableSize::mb(8),
intermediate_path: String::new(),
metadata_cache_size: ReadableSize::mb(32),
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index 1d598efcb40e..9179d8a07411 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -580,7 +580,7 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;
let region_stat = region.region_usage();
- assert_eq!(region_stat.sst_usage, 3026);
+ assert_eq!(region_stat.sst_usage, 3010);
// region total usage
// Some memtables may share items.
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index f71b665a252f..9de4a0ddf572 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -112,7 +112,11 @@ async fn test_compaction_region() {
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
- let request = CreateRequestBuilder::new().build();
+ let request = CreateRequestBuilder::new()
+ .insert_option("compaction.type", "twcs")
+ .insert_option("compaction.twcs.max_active_window_runs", "1")
+ .insert_option("compaction.twcs.max_inactive_window_runs", "1")
+ .build();
let column_schemas = request
.column_metadatas
diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs
index 1adf51d12f41..0f0be6b8f12b 100644
--- a/src/mito2/src/engine/merge_mode_test.rs
+++ b/src/mito2/src/engine/merge_mode_test.rs
@@ -101,8 +101,8 @@ async fn test_merge_mode_compaction() {
let request = CreateRequestBuilder::new()
.field_num(2)
.insert_option("compaction.type", "twcs")
- .insert_option("compaction.twcs.max_active_window_files", "2")
- .insert_option("compaction.twcs.max_inactive_window_files", "2")
+ .insert_option("compaction.twcs.max_active_window_runs", "1")
+ .insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("merge_mode", "last_non_null")
.build();
let region_dir = request.region_dir.clone();
diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs
index 9e740cff86b0..7a28cee977d6 100644
--- a/src/mito2/src/region/options.rs
+++ b/src/mito2/src/region/options.rs
@@ -216,7 +216,7 @@ impl TwcsOptions {
impl Default for TwcsOptions {
fn default() -> Self {
Self {
- max_active_window_runs: 1,
+ max_active_window_runs: 4,
max_inactive_window_runs: 1,
time_window: None,
remote_compaction: false,
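This default change relaxes compaction in the active time window, which is also why the two tests earlier in this diff pin both run limits to 1: that forces compaction to trigger deterministically under the relaxed default. A simplified sketch of the new defaults (field types trimmed from the real `TwcsOptions` for illustration):

```rust
#[derive(Debug, PartialEq)]
struct TwcsOptions {
    max_active_window_runs: usize,
    max_inactive_window_runs: usize,
}

impl Default for TwcsOptions {
    fn default() -> Self {
        // Tolerate up to 4 sorted runs in the active window before compacting;
        // inactive windows still compact down to a single run.
        Self {
            max_active_window_runs: 4,
            max_inactive_window_runs: 1,
        }
    }
}

fn main() {
    assert_eq!(TwcsOptions::default().max_active_window_runs, 4);
}
```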
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index 2407a974c107..909bf481b484 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -220,7 +220,6 @@ impl<'a> IndexerBuilder<'a> {
self.intermediate_manager.clone(),
self.inverted_index_config.mem_threshold_on_create(),
segment_row_count,
- self.inverted_index_config.compress,
&self.index_options.inverted_index.ignore_column_ids,
);
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs
index 380661d60db9..c8bed65bd8f5 100644
--- a/src/mito2/src/sst/index/inverted_index/creator.rs
+++ b/src/mito2/src/sst/index/inverted_index/creator.rs
@@ -24,7 +24,6 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
-use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -71,9 +70,6 @@ pub struct SstIndexCreator {
/// The memory usage of the index creator.
memory_usage: Arc<AtomicUsize>,
- /// Whether to compress the index data.
- compress: bool,
-
/// Ids of indexed columns.
column_ids: HashSet<ColumnId>,
}
@@ -87,7 +83,6 @@ impl SstIndexCreator {
intermediate_manager: IntermediateManager,
memory_usage_threshold: Option<usize>,
segment_row_count: NonZeroUsize,
- compress: bool,
ignore_column_ids: &[ColumnId],
) -> Self {
let temp_file_provider = Arc::new(TempFileProvider::new(
@@ -122,7 +117,6 @@ impl SstIndexCreator {
stats: Statistics::default(),
aborted: false,
memory_usage,
- compress,
column_ids,
}
}
@@ -242,12 +236,9 @@ impl SstIndexCreator {
let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
- let put_options = PutOptions {
- compression: self.compress.then_some(CompressionCodec::Zstd),
- };
let (index_finish, puffin_add_blob) = futures::join!(
self.index_creator.finish(&mut index_writer),
- puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), put_options)
+ puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default())
);
match (
@@ -398,7 +389,6 @@ mod tests {
intm_mgr,
memory_threshold,
NonZeroUsize::new(segment_row_count).unwrap(),
- false,
&[],
);
diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs
index 7a9d24695173..98a240547d09 100644
--- a/src/mito2/src/sst/index/puffin_manager.rs
+++ b/src/mito2/src/sst/index/puffin_manager.rs
@@ -21,8 +21,8 @@ use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore};
use puffin::error::{self as puffin_error, Result as PuffinResult};
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
-use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
-use puffin::puffin_manager::{BlobGuard, PuffinManager};
+use puffin::puffin_manager::stager::BoundedStager;
+use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use crate::error::{PuffinInitStagerSnafu, Result};
@@ -35,10 +35,11 @@ use crate::sst::index::store::{self, InstrumentedStore};
type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncReader>;
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
-pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
-pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;
+pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
+pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
+pub(crate) type BlobReader = <<SstPuffinReader as PuffinReader>::Blob as BlobGuard>::Reader;
const STAGING_DIR: &str = "staging";
diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs
index 0fb6f1137e9c..8a90d1095569 100644
--- a/src/operator/src/statement/copy_table_to.rs
+++ b/src/operator/src/statement/copy_table_to.rs
@@ -75,14 +75,18 @@ impl StatementExecutor {
)
.await
.context(error::WriteStreamToFileSnafu { path }),
- Format::Parquet(_) => stream_to_parquet(
- Box::pin(DfRecordBatchStreamAdapter::new(stream)),
- object_store,
- path,
- WRITE_CONCURRENCY,
- )
- .await
- .context(error::WriteStreamToFileSnafu { path }),
+ Format::Parquet(_) => {
+ let schema = stream.schema();
+ stream_to_parquet(
+ Box::pin(DfRecordBatchStreamAdapter::new(stream)),
+ schema,
+ object_store,
+ path,
+ WRITE_CONCURRENCY,
+ )
+ .await
+ .context(error::WriteStreamToFileSnafu { path })
+ }
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}
diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs
index f1435bd0e474..9a87d70592c2 100644
--- a/src/puffin/src/file_format/reader/file.rs
+++ b/src/puffin/src/file_format/reader/file.rs
@@ -61,6 +61,15 @@ impl PuffinFileReader {
);
Ok(())
}
+
+ /// Converts the reader into an owned blob reader.
+ pub fn into_blob_reader(self, blob_metadata: &BlobMetadata) -> PartialReader<R> {
+ PartialReader::new(
+ self.source,
+ blob_metadata.offset as _,
+ blob_metadata.length as _,
+ )
+ }
}
impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs
index 339b266c7476..f77b79c007ba 100644
--- a/src/puffin/src/puffin_manager.rs
+++ b/src/puffin/src/puffin_manager.rs
@@ -22,7 +22,6 @@ mod tests;
use std::path::PathBuf;
use async_trait::async_trait;
-use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek};
use crate::blob_metadata::CompressionCodec;
@@ -92,10 +91,11 @@ pub trait PuffinReader {
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
+#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait BlobGuard {
type Reader: AsyncRead + AsyncSeek + Unpin;
- fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
+ async fn reader(&self) -> Result<Self::Reader>;
}
/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
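The trait now uses `#[async_trait]` instead of a hand-written `BoxFuture`. A self-contained sketch of the new shape; the `Result` alias is simplified to `std::io::Error` here, whereas the real one is the puffin crate's error type:

```rust
use async_trait::async_trait;
use futures::io::Cursor;
use futures::{AsyncRead, AsyncSeek};

type Result<T> = std::result::Result<T, std::io::Error>;

#[async_trait]
trait BlobGuard {
    type Reader: AsyncRead + AsyncSeek + Unpin;
    // `async fn` in a trait, desugared by `#[async_trait]` into a boxed future.
    async fn reader(&self) -> Result<Self::Reader>;
}

struct InMemoryBlob(Vec<u8>);

#[async_trait]
impl BlobGuard for InMemoryBlob {
    type Reader = Cursor<Vec<u8>>;
    async fn reader(&self) -> Result<Self::Reader> {
        Ok(Cursor::new(self.0.clone()))
    }
}
```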
diff --git a/src/puffin/src/puffin_manager/file_accessor.rs b/src/puffin/src/puffin_manager/file_accessor.rs
index 46deb198cc09..89ef8cc45192 100644
--- a/src/puffin/src/puffin_manager/file_accessor.rs
+++ b/src/puffin/src/puffin_manager/file_accessor.rs
@@ -21,7 +21,7 @@ use crate::error::Result;
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait PuffinFileAccessor: Send + Sync + 'static {
- type Reader: AsyncRead + AsyncSeek + Unpin + Send;
+ type Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync;
type Writer: AsyncWrite + Unpin + Send;
/// Opens a reader for the given puffin file.
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
index 7c95532dc604..01b367a78291 100644
--- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
@@ -46,7 +46,7 @@ impl<S, F> FsPuffinManager<S, F> {
#[async_trait]
impl<S, F> PuffinManager for FsPuffinManager<S, F>
where
- S: Stager + Clone,
+ S: Stager + Clone + 'static,
F: PuffinFileAccessor + Clone,
{
type Reader = FsPuffinReader<S, F>;
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
index a1b8d3a8ea0c..ad0eccabe46a 100644
--- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
@@ -12,23 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
use async_compression::futures::bufread::ZstdDecoder;
use async_trait::async_trait;
-use futures::future::BoxFuture;
use futures::io::BufReader;
-use futures::{AsyncRead, AsyncReadExt, AsyncWrite};
+use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite};
use snafu::{ensure, OptionExt, ResultExt};
-use crate::blob_metadata::CompressionCodec;
+use crate::blob_metadata::{BlobMetadata, CompressionCodec};
use crate::error::{
BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu,
ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
};
use crate::file_format::reader::{AsyncReader, PuffinFileReader};
+use crate::partial_reader::PartialReader;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
-use crate::puffin_manager::PuffinReader;
+use crate::puffin_manager::{BlobGuard, PuffinReader};
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F> {
@@ -55,25 +58,51 @@ impl<S, F> FsPuffinReader<S, F> {
#[async_trait]
impl<S, F> PuffinReader for FsPuffinReader<S, F>
where
- S: Stager,
+ S: Stager + 'static,
F: PuffinFileAccessor + Clone,
{
- type Blob = S::Blob;
+ type Blob = Either<RandomReadBlob<F>, S::Blob>;
type Dir = S::Dir;
async fn blob(&self, key: &str) -> Result<Self::Blob> {
- self.stager
- .get_blob(
- self.puffin_file_name.as_str(),
- key,
- Box::new(move |writer| {
- let accessor = self.puffin_file_accessor.clone();
- let puffin_file_name = self.puffin_file_name.clone();
- let key = key.to_string();
- Self::init_blob_to_cache(puffin_file_name, key, writer, accessor)
- }),
- )
- .await
+ let reader = self
+ .puffin_file_accessor
+ .reader(&self.puffin_file_name)
+ .await?;
+ let mut file = PuffinFileReader::new(reader);
+
+ // TODO(zhongzc): cache the metadata.
+ let metadata = file.metadata().await?;
+ let blob_metadata = metadata
+ .blobs
+ .into_iter()
+ .find(|m| m.blob_type == key)
+ .context(BlobNotFoundSnafu { blob: key })?;
+
+ let blob = if blob_metadata.compression_codec.is_none() {
+ // If the blob is not compressed, we can directly read it from the puffin file.
+ Either::L(RandomReadBlob {
+ file_name: self.puffin_file_name.clone(),
+ accessor: self.puffin_file_accessor.clone(),
+ blob_metadata,
+ })
+ } else {
+ // If the blob is compressed, we need to decompress it into staging space before reading.
+ let staged_blob = self
+ .stager
+ .get_blob(
+ self.puffin_file_name.as_str(),
+ key,
+ Box::new(|writer| {
+ Box::pin(Self::init_blob_to_stager(file, blob_metadata, writer))
+ }),
+ )
+ .await?;
+
+ Either::R(staged_blob)
+ };
+
+ Ok(blob)
}
async fn dir(&self, key: &str) -> Result<Self::Dir> {
@@ -85,7 +114,12 @@ where
let accessor = self.puffin_file_accessor.clone();
let puffin_file_name = self.puffin_file_name.clone();
let key = key.to_string();
- Self::init_dir_to_cache(puffin_file_name, key, writer_provider, accessor)
+ Box::pin(Self::init_dir_to_stager(
+ puffin_file_name,
+ key,
+ writer_provider,
+ accessor,
+ ))
}),
)
.await
@@ -97,79 +131,63 @@ where
S: Stager,
F: PuffinFileAccessor,
{
- fn init_blob_to_cache(
- puffin_file_name: String,
- key: String,
+ async fn init_blob_to_stager(
+ mut reader: PuffinFileReader<F::Reader>,
+ blob_metadata: BlobMetadata,
mut writer: BoxWriter,
- accessor: F,
- ) -> BoxFuture<'static, Result<u64>> {
- Box::pin(async move {
- let reader = accessor.reader(&puffin_file_name).await?;
- let mut file = PuffinFileReader::new(reader);
-
- let metadata = file.metadata().await?;
- let blob_metadata = metadata
- .blobs
- .iter()
- .find(|m| m.blob_type == key.as_str())
- .context(BlobNotFoundSnafu { blob: key })?;
- let reader = file.blob_reader(blob_metadata)?;
-
- let compression = blob_metadata.compression_codec;
- let size = Self::handle_decompress(reader, &mut writer, compression).await?;
-
- Ok(size)
- })
+ ) -> Result<u64> {
+ let reader = reader.blob_reader(&blob_metadata)?;
+ let compression = blob_metadata.compression_codec;
+ let size = Self::handle_decompress(reader, &mut writer, compression).await?;
+ Ok(size)
}
- fn init_dir_to_cache(
+ async fn init_dir_to_stager(
puffin_file_name: String,
key: String,
writer_provider: DirWriterProviderRef,
accessor: F,
- ) -> BoxFuture<'static, Result<u64>> {
- Box::pin(async move {
- let reader = accessor.reader(&puffin_file_name).await?;
- let mut file = PuffinFileReader::new(reader);
-
- let puffin_metadata = file.metadata().await?;
- let blob_metadata = puffin_metadata
- .blobs
- .iter()
- .find(|m| m.blob_type == key.as_str())
- .context(BlobNotFoundSnafu { blob: key })?;
-
- let mut reader = file.blob_reader(blob_metadata)?;
- let mut buf = vec![];
- reader.read_to_end(&mut buf).await.context(ReadSnafu)?;
- let dir_meta: DirMetadata =
- serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?;
-
- let mut size = 0;
- for file_meta in dir_meta.files {
- let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context(
- BlobIndexOutOfBoundSnafu {
- index: file_meta.blob_index,
- max_index: puffin_metadata.blobs.len(),
- },
- )?;
- ensure!(
- blob_meta.blob_type == file_meta.key,
- FileKeyNotMatchSnafu {
- expected: file_meta.key,
- actual: &blob_meta.blob_type,
- }
- );
-
- let reader = file.blob_reader(blob_meta)?;
- let writer = writer_provider.writer(&file_meta.relative_path).await?;
-
- let compression = blob_meta.compression_codec;
- size += Self::handle_decompress(reader, writer, compression).await?;
- }
+ ) -> Result<u64> {
+ let reader = accessor.reader(&puffin_file_name).await?;
+ let mut file = PuffinFileReader::new(reader);
+
+ let puffin_metadata = file.metadata().await?;
+ let blob_metadata = puffin_metadata
+ .blobs
+ .iter()
+ .find(|m| m.blob_type == key.as_str())
+ .context(BlobNotFoundSnafu { blob: key })?;
- Ok(size)
- })
+ let mut reader = file.blob_reader(blob_metadata)?;
+ let mut buf = vec![];
+ reader.read_to_end(&mut buf).await.context(ReadSnafu)?;
+ let dir_meta: DirMetadata =
+ serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?;
+
+ let mut size = 0;
+ for file_meta in dir_meta.files {
+ let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context(
+ BlobIndexOutOfBoundSnafu {
+ index: file_meta.blob_index,
+ max_index: puffin_metadata.blobs.len(),
+ },
+ )?;
+ ensure!(
+ blob_meta.blob_type == file_meta.key,
+ FileKeyNotMatchSnafu {
+ expected: file_meta.key,
+ actual: &blob_meta.blob_type,
+ }
+ );
+
+ let reader = file.blob_reader(blob_meta)?;
+ let writer = writer_provider.writer(&file_meta.relative_path).await?;
+
+ let compression = blob_meta.compression_codec;
+ size += Self::handle_decompress(reader, writer, compression).await?;
+ }
+
+ Ok(size)
}
/// Handles the decompression of the reader and writes the decompressed data to the writer.
@@ -196,3 +214,87 @@ where
}
}
}
+
+/// `RandomReadBlob` is a `BlobGuard` that directly reads the blob from the puffin file.
+pub struct RandomReadBlob<F> {
+ file_name: String,
+ accessor: F,
+ blob_metadata: BlobMetadata,
+}
+
+#[async_trait]
+impl<F: PuffinFileAccessor> BlobGuard for RandomReadBlob<F> {
+ type Reader = PartialReader<F::Reader>;
+
+ async fn reader(&self) -> Result<Self::Reader> {
+ ensure!(
+ self.blob_metadata.compression_codec.is_none(),
+ UnsupportedDecompressionSnafu {
+ decompression: self.blob_metadata.compression_codec.unwrap().to_string()
+ }
+ );
+
+ let reader = self.accessor.reader(&self.file_name).await?;
+ let blob_reader = PuffinFileReader::new(reader).into_blob_reader(&self.blob_metadata);
+ Ok(blob_reader)
+ }
+}
+
+/// `Either` is a type that represents either `A` or `B`.
+///
+/// Used to:
+/// impl `AsyncRead + AsyncSeek` for `Either<A, B>`,
+/// impl `BlobGuard` for `Either<A, B>`.
+pub enum Either<A, B> {
+ L(A),
+ R(B),
+}
+
+impl<A, B> AsyncRead for Either<A, B>
+where
+ A: AsyncRead + Unpin,
+ B: AsyncRead + Unpin,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<std::io::Result<usize>> {
+ match self.get_mut() {
+ Either::L(a) => Pin::new(a).poll_read(cx, buf),
+ Either::R(b) => Pin::new(b).poll_read(cx, buf),
+ }
+ }
+}
+
+impl<A, B> AsyncSeek for Either<A, B>
+where
+ A: AsyncSeek + Unpin,
+ B: AsyncSeek + Unpin,
+{
+ fn poll_seek(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: std::io::SeekFrom,
+ ) -> Poll<std::io::Result<u64>> {
+ match self.get_mut() {
+ Either::L(a) => Pin::new(a).poll_seek(cx, pos),
+ Either::R(b) => Pin::new(b).poll_seek(cx, pos),
+ }
+ }
+}
+
+#[async_trait]
+impl<A, B> BlobGuard for Either<A, B>
+where
+ A: BlobGuard + Sync,
+ B: BlobGuard + Sync,
+{
+ type Reader = Either<A::Reader, B::Reader>;
+ async fn reader(&self) -> Result<Self::Reader> {
+ match self {
+ Either::L(a) => Ok(Either::L(a.reader().await?)),
+ Either::R(b) => Ok(Either::R(b.reader().await?)),
+ }
+ }
+}
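A quick usage sketch for the `Either` reader above (it assumes the `Either` type from this diff is in scope): reads are forwarded to whichever variant is held.

```rust
use futures::io::{AsyncReadExt, Cursor};

async fn read_all(
    mut blob: Either<Cursor<Vec<u8>>, Cursor<Vec<u8>>>,
) -> std::io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    // AsyncReadExt::read_to_end works on both variants through the
    // poll_read forwarding implemented above.
    blob.read_to_end(&mut buf).await?;
    Ok(buf)
}
```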
diff --git a/src/puffin/src/puffin_manager/stager.rs b/src/puffin/src/puffin_manager/stager.rs
index 396dd69ba222..6e1581cddbb5 100644
--- a/src/puffin/src/puffin_manager/stager.rs
+++ b/src/puffin/src/puffin_manager/stager.rs
@@ -42,19 +42,19 @@ pub type DirWriterProviderRef = Box;
///
/// `Stager` will provide a `BoxWriter` that the caller of `get_blob`
/// can use to write the blob into the staging area.
-pub trait InitBlobFn = Fn(BoxWriter) -> WriteResult;
+pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
/// Function that initializes a directory.
///
/// `Stager` will provide a `DirWriterProvider` that the caller of `get_dir`
/// can use to write files inside the directory into the staging area.
-pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;
+pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
/// `Stager` manages the staging area for the puffin files.
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait Stager: Send + Sync {
- type Blob: BlobGuard;
+ type Blob: BlobGuard + Sync;
type Dir: DirGuard;
/// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
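The switch from `Fn` to `FnOnce` matters because the blob initializer now moves the opened puffin file into the future it returns. A minimal sketch of the pattern, with `BoxWriter` stood in by `Vec<u8>`; the names here are illustrative, not the crate's:

```rust
use futures::future::BoxFuture;

type WriteResult = BoxFuture<'static, std::io::Result<u64>>;

// An initializer that consumes captured state can only be `FnOnce`.
fn make_init(file: std::fs::File) -> Box<dyn FnOnce(Vec<u8>) -> WriteResult + Send> {
    Box::new(move |_writer| {
        Box::pin(async move {
            // `file` is moved into the future; calling the closure twice
            // would be impossible, which is exactly what `FnOnce` encodes.
            Ok(file.metadata()?.len())
        })
    })
}
```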
diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs
index 1f1aaa2c01de..9294497a062a 100644
--- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs
+++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs
@@ -22,7 +22,6 @@ use async_walkdir::{Filtering, WalkDir};
use base64::prelude::BASE64_URL_SAFE;
use base64::Engine;
use common_telemetry::{info, warn};
-use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use moka::future::Cache;
use sha2::{Digest, Sha256};
@@ -128,7 +127,7 @@ impl Stager for BoundedStager {
let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
let path = self.base_dir.join(&file_name);
- let size = Self::write_blob(&path, &init_fn).await?;
+ let size = Self::write_blob(&path, init_fn).await?;
let guard = Arc::new(FsBlobGuard {
path,
@@ -163,7 +162,7 @@ impl Stager for BoundedStager {
let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
let path = self.base_dir.join(&dir_name);
- let size = Self::write_dir(&path, &init_fn).await?;
+ let size = Self::write_dir(&path, init_fn).await?;
let guard = Arc::new(FsDirGuard {
path,
@@ -225,7 +224,7 @@ impl BoundedStager {
async fn write_blob(
target_path: &PathBuf,
- init_fn: &(dyn InitBlobFn + Send + Sync + '_),
+ init_fn: Box<dyn InitBlobFn + Send + Sync + '_>,
) -> Result {
// To guarantee the atomicity of writing the file, we need to write
// the file to a temporary file first...
@@ -247,7 +246,7 @@ impl BoundedStager {
async fn write_dir(
target_path: &PathBuf,
- init_fn: &(dyn InitDirFn + Send + Sync + '_),
+ init_fn: Box<dyn InitDirFn + Send + Sync + '_>,
) -> Result {
// To guarantee the atomicity of writing the directory, we need to write
// the directory to a temporary directory first...
@@ -425,16 +424,13 @@ pub struct FsBlobGuard {
delete_queue: Sender,
}
+#[async_trait]
impl BlobGuard for FsBlobGuard {
type Reader = Compat<fs::File>;
- fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>> {
- let path = self.path.clone();
- async move {
- let file = fs::File::open(&path).await.context(OpenSnafu)?;
- Ok(file.compat())
- }
- .boxed()
+ async fn reader(&self) -> Result<Self::Reader> {
+ let file = fs::File::open(&self.path).await.context(OpenSnafu)?;
+ Ok(file.compat())
}
}
diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs
index dc106d746182..02073522bece 100644
--- a/src/puffin/src/puffin_manager/tests.rs
+++ b/src/puffin/src/puffin_manager/tests.rs
@@ -62,14 +62,30 @@ async fn test_put_get_file() {
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
- check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+ check_blob(
+ puffin_file_name,
+ key,
+ raw_data,
+ &stager,
+ &reader,
+ compression_codec.is_some(),
+ )
+ .await;
// renew cache manager
let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
- check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+ check_blob(
+ puffin_file_name,
+ key,
+ raw_data,
+ &stager,
+ &reader,
+ compression_codec.is_some(),
+ )
+ .await;
}
}
}
@@ -106,7 +122,15 @@ async fn test_put_get_files() {
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
- check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+ check_blob(
+ puffin_file_name,
+ key,
+ raw_data,
+ &stager,
+ &reader,
+ compression_codec.is_some(),
+ )
+ .await;
}
// renew cache manager
@@ -114,7 +138,15 @@ async fn test_put_get_files() {
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
- check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+ check_blob(
+ puffin_file_name,
+ key,
+ raw_data,
+ &stager,
+ &reader,
+ compression_codec.is_some(),
+ )
+ .await;
}
}
}
@@ -205,7 +237,15 @@ async fn test_put_get_mix_file_dir() {
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
- check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+ check_blob(
+ puffin_file_name,
+ key,
+ raw_data,
+ &stager,
+ &reader,
+ compression_codec.is_some(),
+ )
+ .await;
}
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
@@ -216,7 +256,15 @@ async fn test_put_get_mix_file_dir() {
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
- check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+ check_blob(
+ puffin_file_name,
+ key,
+ raw_data,
+ &stager,
+ &reader,
+ compression_codec.is_some(),
+ )
+ .await;
}
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
}
@@ -241,24 +289,28 @@ async fn put_blob(
.unwrap();
}
-async fn check_blob<R>(
+async fn check_blob(
puffin_file_name: &str,
key: &str,
raw_data: &[u8],
stager: &BoundedStager,
- puffin_reader: &R,
-) where
- R: PuffinReader,
-{
+ puffin_reader: &impl PuffinReader,
+ compressed: bool,
+) {
let blob = puffin_reader.blob(key).await.unwrap();
let mut reader = blob.reader().await.unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, raw_data);
- let mut cached_file = stager.must_get_file(puffin_file_name, key).await;
+ if !compressed {
+ // If the blob is not compressed, it won't exist in the stager.
+ return;
+ }
+
+ let mut staged_file = stager.must_get_file(puffin_file_name, key).await;
let mut buf = Vec::new();
- cached_file.read_to_end(&mut buf).await.unwrap();
+ staged_file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, raw_data);
}
@@ -291,15 +343,13 @@ async fn put_dir(
.unwrap();
}
-async fn check_dir<R>(
+async fn check_dir(
puffin_file_name: &str,
key: &str,
files_in_dir: &[(&str, &[u8])],
stager: &BoundedStager,
- puffin_reader: &R,
-) where
- R: PuffinReader,
-{
+ puffin_reader: &impl PuffinReader,
+) {
let res_dir = puffin_reader.dir(key).await.unwrap();
for (file_name, raw_data) in files_in_dir {
let file_path = if cfg!(windows) {
@@ -311,12 +361,12 @@ async fn check_dir(
assert_eq!(buf, *raw_data);
}
- let cached_dir = stager.must_get_dir(puffin_file_name, key).await;
+ let staged_dir = stager.must_get_dir(puffin_file_name, key).await;
for (file_name, raw_data) in files_in_dir {
let file_path = if cfg!(windows) {
- cached_dir.as_path().join(file_name.replace('/', "\\"))
+ staged_dir.as_path().join(file_name.replace('/', "\\"))
} else {
- cached_dir.as_path().join(file_name)
+ staged_dir.as_path().join(file_name)
};
let buf = std::fs::read(file_path).unwrap();
assert_eq!(buf, *raw_data);
diff --git a/src/servers/src/http/error_result.rs b/src/servers/src/http/error_result.rs
index 40f8cc80a33e..4eb2f476d243 100644
--- a/src/servers/src/http/error_result.rs
+++ b/src/servers/src/http/error_result.rs
@@ -100,15 +100,14 @@ impl IntoResponse for ErrorResponse {
| StatusCode::FlowNotFound
| StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST,
- StatusCode::PermissionDenied
- | StatusCode::AuthHeaderNotFound
+ StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader
| StatusCode::UserNotFound
| StatusCode::UnsupportedPasswordType
| StatusCode::UserPasswordMismatch
| StatusCode::RegionReadonly => HttpStatusCode::UNAUTHORIZED,
- StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN,
+ StatusCode::PermissionDenied | StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN,
StatusCode::RateLimited => HttpStatusCode::TOO_MANY_REQUESTS,
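The regrouping distinguishes authentication (401) from authorization (403). Here is a hedged sketch of the resulting mapping using the `http` crate's status codes; the string matching is illustrative only, since the real code matches on GreptimeDB's `StatusCode` enum:

```rust
use http::StatusCode as HttpStatusCode;

fn http_status(code: &str) -> HttpStatusCode {
    match code {
        // Identity could not be established: respond 401.
        "AuthHeaderNotFound" | "InvalidAuthHeader" | "UserNotFound"
        | "UserPasswordMismatch" => HttpStatusCode::UNAUTHORIZED,
        // Identity is known but not allowed to act: respond 403.
        "PermissionDenied" | "AccessDenied" => HttpStatusCode::FORBIDDEN,
        _ => HttpStatusCode::INTERNAL_SERVER_ERROR,
    }
}
```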
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 96d9316f5549..9a7d98279034 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -838,7 +838,6 @@ create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
-compress = true
metadata_cache_size = "32MiB"
content_cache_size = "32MiB"
diff --git a/tests/cases/distributed/information_schema/cluster_info.result b/tests/cases/distributed/information_schema/cluster_info.result
index db05b043ac8f..c2c29a6f2d76 100644
--- a/tests/cases/distributed/information_schema/cluster_info.result
+++ b/tests/cases/distributed/information_schema/cluster_info.result
@@ -20,7 +20,7 @@ DESC TABLE CLUSTER_INFO;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
@@ -30,7 +30,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
@@ -40,7 +40,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
@@ -50,7 +50,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
@@ -60,7 +60,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type;
diff --git a/tests/cases/distributed/information_schema/cluster_info.sql b/tests/cases/distributed/information_schema/cluster_info.sql
index 7ac47e3d6309..fcdd4eb5106a 100644
--- a/tests/cases/distributed/information_schema/cluster_info.sql
+++ b/tests/cases/distributed/information_schema/cluster_info.sql
@@ -5,7 +5,7 @@ DESC TABLE CLUSTER_INFO;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
@@ -13,7 +13,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
@@ -21,7 +21,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
@@ -29,7 +29,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
@@ -37,7 +37,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[0-9T:\.]{17}|\s[\-0-9T:\.]{23}) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type;
diff --git a/tests/cases/standalone/information_schema/cluster_info.result b/tests/cases/standalone/information_schema/cluster_info.result
index 60ad81bdacc7..df549b652fba 100644
--- a/tests/cases/standalone/information_schema/cluster_info.result
+++ b/tests/cases/standalone/information_schema/cluster_info.result
@@ -20,22 +20,22 @@ DESC TABLE CLUSTER_INFO;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[\-0-9T:\.]+) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO;
-+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|Start_time|STANDALONE||Version|Hash|Start_time|Start_timems||+++++++++
++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|STANDALONE||Version|Hash|Start_time|Duration||+++++++++
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[\-0-9T:\.]+) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'STANDALONE';
-+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|Start_time|STANDALONE||Version|Hash|Start_time|Start_timems||+++++++++
++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|STANDALONE||Version|Hash|Start_time|Duration||+++++++++
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE';
@@ -45,7 +45,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE';
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[\-0-9T:\.]+) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_ID = 0;
diff --git a/tests/cases/standalone/information_schema/cluster_info.sql b/tests/cases/standalone/information_schema/cluster_info.sql
index 2fbd620da80e..4905f6d1fd23 100644
--- a/tests/cases/standalone/information_schema/cluster_info.sql
+++ b/tests/cases/standalone/information_schema/cluster_info.sql
@@ -5,7 +5,7 @@ DESC TABLE CLUSTER_INFO;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[\-0-9T:\.]+) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO;
@@ -13,7 +13,7 @@ SELECT * FROM CLUSTER_INFO;
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[\-0-9T:\.]+) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'STANDALONE';
@@ -23,7 +23,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE';
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
--- SQLNESS REPLACE (\s[\-0-9T:\.]+) Start_time
+-- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_ID = 0;
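The widened pattern accepts start times of any sub-second precision rather than the two fixed widths matched before. A quick check with the `regex` crate; the sample literals are illustrative:

```rust
use regex::Regex;

fn main() {
    let re = Regex::new(r"(\s[\-0-9T:\.]{15,})").unwrap();
    // Seconds-only and millisecond-precision start times both match now.
    assert!(re.is_match(" 2024-06-05T08:32:11"));
    assert!(re.is_match(" 2024-06-05T08:32:11.123"));
}
```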