Skip to content

Commit

Permalink
feat!: Set merge mode while creating table in influx handler (#4299)
Browse files Browse the repository at this point in the history
* feat: influxdb write auto set merge mode

* chore: update logs

* chore: address PR comments
  • Loading branch information
evenyag authored Jul 8, 2024
1 parent bb32230 commit 59afa70
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 83 deletions.
12 changes: 12 additions & 0 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,18 @@ impl Instance {
.context(TableOperationSnafu)
}

/// Handles row inserts that originate from the InfluxDB line protocol handler.
///
/// Forwards the requests to the inserter's `handle_last_non_null_inserts`, so
/// tables created on demand use the `last_non_null` merge mode. Any failure is
/// wrapped with `TableOperationSnafu`.
#[tracing::instrument(skip_all)]
pub async fn handle_influx_row_inserts(
    &self,
    requests: RowInsertRequests,
    ctx: QueryContextRef,
) -> Result<Output> {
    let executor = self.statement_executor.as_ref();
    let outcome = self
        .inserter
        .handle_last_non_null_inserts(requests, ctx, executor)
        .await;
    outcome.context(TableOperationSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn handle_metric_row_inserts(
&self,
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/instance/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ impl InfluxdbLineProtocolHandler for Instance {
interceptor_ref.pre_execute(&request.lines, ctx.clone())?;

let requests = request.try_into()?;

let requests = interceptor_ref
.post_lines_conversion(requests, ctx.clone())
.await?;

self.handle_row_inserts(requests, ctx)
self.handle_influx_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)
Expand Down
172 changes: 93 additions & 79 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::requests::InsertRequest as TableInsertRequest;
use table::table_reference::TableReference;
Expand All @@ -66,10 +67,17 @@ pub struct Inserter {

pub type InserterRef = Arc<Inserter>;

/// Hint for the table type to create automatically.
///
/// Selects which kind of table is created on demand when an insert targets a
/// table that does not exist yet.
#[derive(Clone)]
enum AutoCreateTableType {
/// A logical table; the `String` carries the physical table name.
Logical(String),
/// A physical table, created without extra table options.
Physical,
/// A log table which is append-only (created with `append_mode = "true"`).
Log,
/// A table that merges rows by `last_non_null` strategy
/// (created with `merge_mode = "last_non_null"`).
LastNonNull,
}

impl AutoCreateTableType {
Expand All @@ -78,6 +86,7 @@ impl AutoCreateTableType {
AutoCreateTableType::Logical(_) => "logical",
AutoCreateTableType::Physical => "physical",
AutoCreateTableType::Log => "log",
AutoCreateTableType::LastNonNull => "last_non_null",
}
}
}
Expand Down Expand Up @@ -108,41 +117,61 @@ impl Inserter {
.await
}

/// Handles row inserts request and creates a physical table on demand.
pub async fn handle_row_inserts(
&self,
mut requests: RowInsertRequests,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
req.rows
.as_ref()
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_column_count_match(&requests)?;
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Physical,
)
.await
}

let table_name_to_ids = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
AutoCreateTableType::Physical,
statement_executor,
)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
.await?;
/// Handles a row inserts request, creating missing tables as log tables.
///
/// Thin wrapper over `handle_row_inserts_with_create_type` that fixes the
/// create type to [AutoCreateTableType::Log].
pub async fn handle_log_inserts(
    &self,
    requests: RowInsertRequests,
    ctx: QueryContextRef,
    statement_executor: &StatementExecutor,
) -> Result<Output> {
    let create_type = AutoCreateTableType::Log;
    self.handle_row_inserts_with_create_type(requests, ctx, statement_executor, create_type)
        .await
}

self.do_request(inserts, &ctx).await
/// Handles a row inserts request, creating missing tables with the
/// `last_non_null` merge mode.
///
/// Thin wrapper over `handle_row_inserts_with_create_type` that fixes the
/// create type to [AutoCreateTableType::LastNonNull].
pub async fn handle_last_non_null_inserts(
    &self,
    requests: RowInsertRequests,
    ctx: QueryContextRef,
    statement_executor: &StatementExecutor,
) -> Result<Output> {
    let create_type = AutoCreateTableType::LastNonNull;
    self.handle_row_inserts_with_create_type(requests, ctx, statement_executor, create_type)
        .await
}

pub async fn handle_log_inserts(
/// Handles row inserts request with specified [AutoCreateTableType].
async fn handle_row_inserts_with_create_type(
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
Expand All @@ -154,12 +183,7 @@ impl Inserter {
validate_column_count_match(&requests)?;

let table_name_to_ids = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
AutoCreateTableType::Log,
statement_executor,
)
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
Expand All @@ -168,7 +192,7 @@ impl Inserter {
self.do_request(inserts, &ctx).await
}

/// Handle row inserts request with metric engine.
/// Handles row inserts request with metric engine.
pub async fn handle_metric_row_inserts(
&self,
mut requests: RowInsertRequests,
Expand Down Expand Up @@ -486,21 +510,18 @@ impl Inserter {
.await?;
}
}
AutoCreateTableType::Physical => {
AutoCreateTableType::Physical
| AutoCreateTableType::Log
| AutoCreateTableType::LastNonNull => {
for req in create_tables {
let table = self.create_table(req, ctx, statement_executor).await?;
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
.alter_table_inner(alter_expr, ctx.clone())
let table = self
.create_non_logical_table(
req,
ctx,
statement_executor,
auto_create_table_type.clone(),
)
.await?;
}
}
AutoCreateTableType::Log => {
for req in create_tables {
let table = self.create_log_table(req, ctx, statement_executor).await?;
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
Expand Down Expand Up @@ -612,69 +633,62 @@ impl Inserter {
}))
}

/// Create a table with schema from insert request.
///
/// To create a metric engine logical table, specify the `on_physical_table` parameter.
async fn create_table(
/// Creates a non-logical table by create type.
/// # Panics
/// Panics if `create_type` is `AutoCreateTableType::Logical`.
async fn create_non_logical_table(
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
) -> Result<TableRef> {
let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);

let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;

info!("Table `{table_ref}` does not exist, try creating table");

// TODO(weny): multiple regions table.
let res = statement_executor
.create_table_inner(create_table_expr, None, ctx.clone())
.await;

match res {
Ok(table) => {
info!("Successfully created table {}", table_ref,);
Ok(table)
}
Err(err) => {
error!(err; "Failed to create table {}", table_ref);
Err(err)
}
}
let options: &[(&str, &str)] = match create_type {
AutoCreateTableType::Logical(_) => unreachable!(),
AutoCreateTableType::Physical => &[],
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
AutoCreateTableType::Log => &[(APPEND_MODE_KEY, "true")],
AutoCreateTableType::LastNonNull => &[(MERGE_MODE_KEY, "last_non_null")],
};
self.create_table_with_options(req, ctx, statement_executor, options)
.await
}

async fn create_log_table(
/// Creates a table with options.
async fn create_table_with_options(
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
options: &[(&str, &str)],
) -> Result<TableRef> {
let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_log_inserts`.
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;

info!("Table `{table_ref}` does not exist, try creating the log table");
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
create_table_expr
.table_options
.insert("append_mode".to_string(), "true".to_string());
info!("Table `{table_ref}` does not exist, try creating table");
for (k, v) in options {
create_table_expr
.table_options
.insert(k.to_string(), v.to_string());
}
let res = statement_executor
.create_table_inner(create_table_expr, None, ctx.clone())
.await;

match res {
Ok(table) => {
info!("Successfully created a log table {}", table_ref);
info!(
"Successfully created table {} with options: {:?}",
table_ref, options
);
Ok(table)
}
Err(err) => {
error!(err; "Failed to create a log table {}", table_ref);
error!(err; "Failed to create table {}", table_ref);
Err(err)
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/store-api/src/mito_engine_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

use common_wal::options::WAL_OPTIONS_KEY;

/// Option key for append mode.
///
/// Set to `"true"` by the inserter when it auto-creates a log table.
pub const APPEND_MODE_KEY: &str = "append_mode";
/// Option key for merge mode.
///
/// Set to `"last_non_null"` by the inserter when it auto-creates a table for
/// `last_non_null` inserts.
pub const MERGE_MODE_KEY: &str = "merge_mode";

/// Returns true if the `key` is a valid option key for the mito engine.
pub fn is_mito_engine_option_key(key: &str) -> bool {
[
Expand All @@ -34,8 +39,8 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
"memtable.partition_tree.index_max_keys_per_shard",
"memtable.partition_tree.data_freeze_threshold",
"memtable.partition_tree.fork_dictionary_bytes",
"append_mode",
"merge_mode",
APPEND_MODE_KEY,
MERGE_MODE_KEY,
]
.contains(&key)
}
Expand Down
31 changes: 31 additions & 0 deletions tests-integration/src/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,37 @@ monitor1,host=host2 memory=1027 1663840496400340001";
+-------------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
| 2022-09-22T09:54:56.400340001 | host2 | | 1027.0 |
+-------------------------------+-------+------+--------+"
);

// Put the cpu column for host2.
let lines = r"
monitor1,host=host2 cpu=32 1663840496400340001";
let request = InfluxdbRequest {
precision: None,
lines: lines.to_string(),
};
instance.exec(request, QueryContext::arc()).await.unwrap();

let mut output = instance
.do_query(
"SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
QueryContext::arc(),
)
.await;
let output = output.remove(0).unwrap();
let OutputData::Stream(stream) = output.data else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
recordbatches.pretty_print().unwrap(),
"\
+-------------------------------+-------+------+--------+
| ts | host | cpu | memory |
+-------------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
| 2022-09-22T09:54:56.400340001 | host2 | 32.0 | 1027.0 |
+-------------------------------+-------+------+--------+"
);
}
Expand Down

0 comments on commit 59afa70

Please sign in to comment.