feat: update physical table's schema on creating logical table (#3570)
* feat: update physical table's schema on creating logical table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove debug code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak ut const

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* invalidate physical table cache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
waynexia authored Mar 25, 2024
1 parent 2ad0b24 commit 992c7ec
Showing 9 changed files with 179 additions and 22 deletions.
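
In outline: creating logical tables on the metric engine now makes each datanode report the physical table's resulting column metadata through a response extension; the procedure checks that all datanodes report the same schema, rebuilds and persists the physical table's info, and invalidates the stale cache entry. Below is a minimal, self-contained sketch of that control flow; the types and helpers are hypothetical simplifications, not the real procedure API.

// Hypothetical, simplified model of the new flow; the real implementation
// lives in src/common/meta/src/ddl/create_logical_tables.rs.
#[derive(Clone, PartialEq, Debug)]
struct ColumnMetadata {
    name: String,
}

fn on_datanode_responses(
    datanode_schemas: Vec<Option<Vec<ColumnMetadata>>>,
) -> Result<(), String> {
    // 1. All datanodes must report the same physical schema.
    let first = datanode_schemas
        .first()
        .ok_or_else(|| "no datanode responses".to_string())?;
    if !datanode_schemas.iter().all(|s| s == first) {
        return Err("raw schemas from datanodes are not the same".to_string());
    }

    // 2. Persist the rebuilt physical table info and drop the cached entry,
    //    or leave the schema untouched when nothing was reported.
    match first {
        Some(columns) => {
            update_physical_table_info(columns); // metadata update
            invalidate_table_cache(); // broadcast CacheIdent::TableId
        }
        None => eprintln!("no physical columns reported, schema unchanged"),
    }
    Ok(())
}

// Stand-ins for the metadata manager and the cache invalidator.
fn update_physical_table_info(_columns: &[ColumnMetadata]) {}
fn invalidate_table_cache() {}

fn main() {
    let schema = vec![ColumnMetadata { name: "host".to_string() }];
    on_datanode_responses(vec![Some(schema.clone()), Some(schema)]).unwrap();
}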
134 changes: 126 additions & 8 deletions src/common/meta/src/ddl/create_logical_tables.rs
@@ -12,30 +12,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;

use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader};
use api::v1::CreateTableExpr;
use api::v1::{CreateTableExpr, SemanticType};
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use futures_util::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};

use crate::cache_invalidator::Context;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{Result, TableAlreadyExistsSnafu};
use crate::error::{
DecodeJsonSnafu, MetadataCorruptionSnafu, Result, TableAlreadyExistsSnafu,
TableInfoNotFoundSnafu,
};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
@@ -169,11 +179,12 @@ impl CreateLogicalTablesProcedure {
self.create_regions(region_routes).await
}

/// Creates table metadata
/// Creates table metadata for logical tables and updates the corresponding
/// physical table's metadata.
///
/// Abort(not-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(&self) -> Result<Status> {
pub async fn on_create_metadata(&mut self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
let physical_table_id = self.creator.data.physical_table_id();
let remaining_tasks = self.creator.data.remaining_tasks();
@@ -208,6 +219,42 @@
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();

if !self.creator.data.physical_columns.is_empty() {
// fetch old physical table's info
let physical_table_info = self
.context
.table_metadata_manager
.get_full_table_info(self.creator.data.physical_table_id)
.await?
.0
.context(TableInfoNotFoundSnafu {
table_name: format!("table id - {}", self.creator.data.physical_table_id),
})?;

// generate new table info
let new_table_info = self
.creator
.data
.build_new_physical_table_info(&physical_table_info);

// update physical table's metadata
self.context
.table_metadata_manager
.update_table_info(physical_table_info, new_table_info)
.await?;

// invalidate table cache
self.context
.cache_invalidator
.invalidate(
&Context::default(),
vec![CacheIdent::TableId(self.creator.data.physical_table_id)],
)
.await?;
} else {
warn!("No physical columns found, leaving the physical table's schema unchanged");
}

info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");

Ok(Status::done_with_output(table_ids))
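
The update hands the previously fetched table info, raw bytes included, back to the metadata manager together with the new value. A plausible reading is that update_table_info performs a compare-and-swap against the stored bytes, so a concurrent modification fails the procedure instead of being silently overwritten; the sketch below models that assumption with hypothetical stand-in types.

// Sketch of the assumed compare-and-swap behind update_table_info: the old
// value's raw bytes travel along so the store can reject concurrent edits.
struct Stored {
    bytes: Vec<u8>, // bytes exactly as read from the metadata store
}

fn update_table_info(
    store: &mut Vec<u8>, // stand-in for the kv-backend entry
    old: &Stored,
    new_bytes: Vec<u8>,
) -> Result<(), String> {
    if *store != old.bytes {
        // someone else updated the table info since we read it
        return Err("table info changed concurrently, retry".into());
    }
    *store = new_bytes;
    Ok(())
}

fn main() {
    let mut store = b"v1".to_vec();
    let old = Stored { bytes: b"v1".to_vec() };
    update_table_info(&mut store, &old, b"v2".to_vec()).unwrap();
    assert_eq!(store, b"v2".to_vec());
}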
@@ -275,11 +322,39 @@ impl CreateLogicalTablesProcedure {
});
}

join_all(create_region_tasks)
// collect responses from datanodes
let raw_schemas = join_all(create_region_tasks)
.await
.into_iter()
.map(|response| {
response.map(|mut response| response.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY))
})
.collect::<Result<Vec<_>>>()?;

if raw_schemas.is_empty() {
self.creator.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(false));
}

// verify all datanodes return the same raw schemas
// Safety: previous check ensures this vector is not empty.
let first = raw_schemas.first().unwrap();
ensure!(
raw_schemas.iter().all(|x| x == first),
MetadataCorruptionSnafu {
err_msg: "Raw schemas from datanodes are not the same"
}
);

// decode the raw schema and store it
if let Some(raw_schema) = first {
let physical_columns =
ColumnMetadata::decode_list(raw_schema).context(DecodeJsonSnafu)?;
self.creator.data.physical_columns = physical_columns;
} else {
warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}

self.creator.data.state = CreateTablesState::CreateMetadata;

// Ensures the procedures after the crash start from the `DatanodeCreateRegions` stage.
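
The agreement check above boils down to: all extension payloads are byte-identical, then the first one is decoded. A self-contained sketch of the same shape, with serde_json standing in for ColumnMetadata::decode_list:

// Minimal sketch: verify all datanodes returned identical raw schemas, then
// decode the first one. serde_json (assumed in Cargo.toml) stands in for
// the real ColumnMetadata::decode_list.
fn collect_physical_schema(
    raw_schemas: Vec<Option<Vec<u8>>>,
) -> Result<Option<Vec<String>>, String> {
    let Some(first) = raw_schemas.first() else {
        return Ok(None); // no regions created, nothing to verify
    };
    if !raw_schemas.iter().all(|x| x == first) {
        return Err("raw schemas from datanodes are not the same".into());
    }
    match first {
        Some(bytes) => {
            let decoded: Vec<String> =
                serde_json::from_slice(bytes).map_err(|e| e.to_string())?;
            Ok(Some(decoded))
        }
        None => Ok(None), // extension key missing, schema left unchanged
    }
}

fn main() {
    let payload = serde_json::to_vec(&vec!["host".to_string()]).unwrap();
    let schemas = vec![Some(payload.clone()), Some(payload)];
    assert_eq!(
        collect_physical_schema(schemas).unwrap(),
        Some(vec!["host".to_string()])
    );
}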
@@ -357,6 +432,7 @@ impl TablesCreator {
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
physical_columns: vec![],
},
}
}
Expand All @@ -370,6 +446,7 @@ pub struct CreateTablesData {
table_ids_already_exists: Vec<Option<TableId>>,
physical_table_id: TableId,
physical_region_numbers: Vec<RegionNumber>,
physical_columns: Vec<ColumnMetadata>,
}

impl CreateTablesData {
@@ -420,6 +497,47 @@ impl CreateTablesData {
})
.collect::<Vec<_>>()
}

/// Generates the new physical table info.
///
/// This method consumes the physical columns.
fn build_new_physical_table_info(
&mut self,
old_table_info: &DeserializedValueWithBytes<TableInfoValue>,
) -> RawTableInfo {
let mut raw_table_info = old_table_info.deref().table_info.clone();

let existing_primary_key = raw_table_info
.meta
.schema
.column_schemas
.iter()
.map(|col| col.name.clone())
.collect::<HashSet<_>>();
let primary_key_indices = &mut raw_table_info.meta.primary_key_indices;
let value_indices = &mut raw_table_info.meta.value_indices;
value_indices.clear();
let time_index = &mut raw_table_info.meta.schema.timestamp_index;
let columns = &mut raw_table_info.meta.schema.column_schemas;
columns.clear();

for (idx, col) in self.physical_columns.drain(..).enumerate() {
match col.semantic_type {
SemanticType::Tag => {
// push new primary key to the end.
if !existing_primary_key.contains(&col.column_schema.name) {
primary_key_indices.push(idx);
}
}
SemanticType::Field => value_indices.push(idx),
SemanticType::Timestamp => *time_index = Some(idx),
}

columns.push(col.column_schema);
}

raw_table_info
}
}
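
Concretely, the rebuild walks the new column list once and reassigns the index vectors by semantic type: fields repopulate value_indices, the timestamp column sets the time index, and tags not already in the primary key are appended to primary_key_indices (the existing entries are kept, which assumes existing columns retain their positions and new columns are appended at the end). A runnable worked example with simplified stand-in types:

// Simplified model of the index rebuild in build_new_physical_table_info.
enum SemanticType { Tag, Field, Timestamp }

struct Column { name: &'static str, semantic_type: SemanticType }

fn main() {
    // old physical schema: ts, val, __table_id, __tsid, host
    let existing_primary_key = ["__table_id", "__tsid", "host"];
    let mut primary_key_indices = vec![2, 3, 4]; // kept, not cleared

    // new physical schema reported by the datanodes: one new tag, `job`
    let new_columns = [
        Column { name: "ts", semantic_type: SemanticType::Timestamp },
        Column { name: "val", semantic_type: SemanticType::Field },
        Column { name: "__table_id", semantic_type: SemanticType::Tag },
        Column { name: "__tsid", semantic_type: SemanticType::Tag },
        Column { name: "host", semantic_type: SemanticType::Tag },
        Column { name: "job", semantic_type: SemanticType::Tag },
    ];

    let mut value_indices = vec![];
    let mut time_index = None;
    for (idx, col) in new_columns.iter().enumerate() {
        match col.semantic_type {
            // only genuinely new tags are appended; old ones keep their slot
            SemanticType::Tag if !existing_primary_key.contains(&col.name) => {
                primary_key_indices.push(idx)
            }
            SemanticType::Tag => {}
            SemanticType::Field => value_indices.push(idx),
            SemanticType::Timestamp => time_index = Some(idx),
        }
    }

    assert_eq!(primary_key_indices, vec![2, 3, 4, 5]); // `job` appended
    assert_eq!(value_indices, vec![1]);
    assert_eq!(time_index, Some(0));
}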

#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
4 changes: 3 additions & 1 deletion src/datanode/src/region_server.rs
@@ -270,8 +270,10 @@ impl RegionServerHandler for RegionServer {

// merge results by sum up affected rows and merge extensions.
let mut affected_rows = 0;
let mut extension = HashMap::new();
for result in results {
affected_rows += result.affected_rows;
extension.extend(result.extension);
}

Ok(RegionResponse {
@@ -282,7 +284,7 @@
}),
}),
affected_rows: affected_rows as _,
extension: Default::default(),
extension,
})
}
}
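
Note that HashMap::extend is last-write-wins on key collisions, so when several regions on one datanode return ALTER_PHYSICAL_EXTENSION_KEY, only one payload survives the merge; that appears safe here because the procedure later verifies the schemas collected from all datanodes are identical. A small demonstration of the merge semantics:

// Demonstrates the last-write-wins behavior of HashMap::extend used when
// merging per-region response extensions into one map.
use std::collections::HashMap;

fn main() {
    // Two region responses carrying the same extension key.
    let responses: Vec<HashMap<String, Vec<u8>>> = vec![
        HashMap::from([("schema".to_string(), vec![1, 2])]),
        HashMap::from([("schema".to_string(), vec![1, 2])]),
    ];

    let mut extension = HashMap::new();
    for result in responses {
        // later entries overwrite earlier ones on key collision
        extension.extend(result);
    }
    assert_eq!(extension["schema"], vec![1, 2]);
}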
2 changes: 1 addition & 1 deletion src/meta-srv/src/procedure/tests.rs
@@ -252,7 +252,7 @@ async fn test_on_datanode_create_logical_regions() {
let region_routes = test_data::new_region_routes();
let datanode_manager = new_datanode_manager(&region_server, &region_routes).await;
let physical_table_route = TableRouteValue::physical(region_routes);
let physical_table_id = 111;
let physical_table_id = 1;

let task1 = create_table_task(Some("my_table1"));
let task2 = create_table_task(Some("my_table2"));
1 change: 0 additions & 1 deletion src/metric-engine/src/engine.rs
@@ -148,7 +148,6 @@ impl RegionEngine for MetricEngine {
RegionRequest::Catchup(_) => Ok(0),
};

// TODO: pass extension
result
.map_err(BoxedError::new)
.map(|rows| RegionHandleResult {
27 changes: 18 additions & 9 deletions tests/cases/standalone/common/create/create_metric_table.result
@@ -19,19 +19,28 @@ CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) en

Affected Rows: 0

SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name;
SELECT table_catalog, table_schema, table_name, table_type, engine FROM information_schema.tables WHERE engine = 'metric' order by table_name;

Error: 3000(PlanQuery), Failed to plan SQL: No field named metric. Valid fields are information_schema.tables.table_catalog, information_schema.tables.table_schema, information_schema.tables.table_name, information_schema.tables.table_type, information_schema.tables.table_id, information_schema.tables.engine.
+---------------+--------------+------------+------------+--------+
| table_catalog | table_schema | table_name | table_type | engine |
+---------------+--------------+------------+------------+--------+
| greptime | public | phy | BASE TABLE | metric |
| greptime | public | t1 | BASE TABLE | metric |
| greptime | public | t2 | BASE TABLE | metric |
+---------------+--------------+------------+------------+--------+

-- We currently don't maintains physical table's schema.
DESC TABLE phy;

+--------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| val | Float64 | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+
+------------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+------------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | | NO | | FIELD |
| val | Float64 | | YES | | FIELD |
| __table_id | UInt32 | PRI | NO | | TAG |
| __tsid | UInt64 | PRI | NO | | TAG |
| host | String | PRI | YES | | TAG |
| job | String | PRI | YES | | TAG |
+------------+----------------------+-----+------+---------+---------------+

DESC TABLE t1;

3 changes: 1 addition & 2 deletions tests/cases/standalone/common/create/create_metric_table.sql
@@ -6,9 +6,8 @@ CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) e

CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");

SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name;
SELECT table_catalog, table_schema, table_name, table_type, engine FROM information_schema.tables WHERE engine = 'metric' order by table_name;

-- We currently don't maintains physical table's schema.
DESC TABLE phy;

DESC TABLE t1;
24 changes: 24 additions & 0 deletions tests/cases/standalone/common/insert/logical_metric_table.result
@@ -49,6 +49,30 @@ DROP TABLE t2;

Affected Rows: 0

DESC TABLE phy;

+------------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+------------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | | NO | | FIELD |
| val | Float64 | | YES | | FIELD |
| __table_id | UInt32 | PRI | NO | | TAG |
| __tsid | UInt64 | PRI | NO | | TAG |
| host | String | PRI | YES | | TAG |
| job | String | PRI | YES | | TAG |
+------------+----------------------+-----+------+---------+---------------+

SELECT ts, val, __tsid, host, job FROM phy;

+-------------------------+-----+----------------------+-------+------+
| ts | val | __tsid | host | job |
+-------------------------+-----+----------------------+-------+------+
| 1970-01-01T00:00:00.001 | 1.0 | 1128149335081630826 | host2 | |
| 1970-01-01T00:00:00 | 0.0 | 18067404594631612786 | host1 | |
| 1970-01-01T00:00:00.001 | 1.0 | 2176048834144407834 | | job2 |
| 1970-01-01T00:00:00 | 0.0 | 15980333303142110493 | | job1 |
+-------------------------+-----+----------------------+-------+------+

DROP TABLE phy;

Affected Rows: 0
4 changes: 4 additions & 0 deletions tests/cases/standalone/common/insert/logical_metric_table.sql
@@ -18,4 +18,8 @@ DROP TABLE t1;

DROP TABLE t2;

DESC TABLE phy;

SELECT ts, val, __tsid, host, job FROM phy;

DROP TABLE phy;
2 changes: 2 additions & 0 deletions tests/runner/src/env.rs
@@ -199,6 +199,8 @@ impl Env {
};
let log_file_name = self.data_home.join(log_file_name).display().to_string();

println!("{subcommand} log file at {log_file_name}");

let log_file = OpenOptions::new()
.create(true)
.write(true)
