Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: alter table add column id alloc mismatch #4972

Merged
merged 8 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info, warn};
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
Expand Down Expand Up @@ -150,6 +149,7 @@ impl DataRegion {
})
.collect::<Result<_>>()?;

debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
// assemble alter request
let alter_request = RegionRequest::Alter(RegionAlterRequest {
schema_version: version,
Expand Down
40 changes: 26 additions & 14 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,29 @@ impl MetricEngineInner {

let metadata_region_id = to_metadata_region_id(physical_region_id);
let mut columns_to_add = vec![];
// columns that already exist in physical region
let mut existing_columns = vec![];

let pre_existing_physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;

let pre_exist_cols = pre_existing_physical_columns
.iter()
.map(|col| (col.column_schema.name.as_str(), col))
.collect::<HashMap<_, _>>();

// check pre-existing physical columns so if any columns to add is already exist,
// we can skip it in physical alter operation
// (but still need to update them in logical alter operation)
for col in &columns {
if self
.metadata_region
.column_semantic_type(
metadata_region_id,
logical_region_id,
&col.column_metadata.column_schema.name,
)
.await?
.is_none()
if let Some(exist_column) =
pre_exist_cols.get(&col.column_metadata.column_schema.name.as_str())
{
// push the correct column schema with correct column id
existing_columns.push(*exist_column);
} else {
columns_to_add.push(col.column_metadata.clone());
}
}
Expand All @@ -111,16 +123,16 @@ impl MetricEngineInner {
let data_region_id = to_data_region_id(physical_region_id);
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_region_id,
columns_to_add,
&mut columns_to_add,
)
.await?;

// register columns to logical region
for col in columns {
// note here we don't use `columns` directly but concat `existing_columns` with `columns_to_add` to get correct metadata
// about already existing columns
for metadata in existing_columns.into_iter().chain(columns_to_add.iter()) {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, &col.column_metadata)
.add_column(metadata_region_id, logical_region_id, metadata)
.await?;
}

Expand Down
61 changes: 46 additions & 15 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::info;
use common_telemetry::{info, warn};
use common_time::Timestamp;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
Expand Down Expand Up @@ -212,11 +212,17 @@ impl MetricEngineInner {

self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_region_id,
new_columns,
&mut new_columns,
)
.await?;

// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}
}

// register logical region to metadata region
Expand Down Expand Up @@ -260,27 +266,24 @@ impl MetricEngineInner {
Ok(data_region_id)
}

/// Execute corresponding alter requests to mito region. New added columns' [ColumnMetadata] will be
/// cloned into `added_columns`.
/// Execute corresponding alter requests to mito region. After calling this, `new_columns` will be assign a new column id
/// which should be correct if the following requirements are met:
///
/// # NOTE
///
/// `new_columns` MUST NOT pre-exist in the physical region. Or the results will be wrong column id for the new columns.
///
pub(crate) async fn add_columns_to_physical_data_region(
&self,
data_region_id: RegionId,
metadata_region_id: RegionId,
logical_region_id: RegionId,
mut new_columns: Vec<ColumnMetadata>,
new_columns: &mut [ColumnMetadata],
) -> Result<()> {
// alter data region
self.data_region
.add_columns(data_region_id, &mut new_columns)
.add_columns(data_region_id, new_columns)
.await?;

// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}

// safety: previous step has checked this
self.state.write().unwrap().add_physical_columns(
data_region_id,
Expand All @@ -291,6 +294,34 @@ impl MetricEngineInner {
info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}");
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);

// correct the column id
let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?;
let after_alter_physical_schema_map = after_alter_physical_schema
.iter()
.map(|metadata| (metadata.column_schema.name.as_str(), metadata))
.collect::<HashMap<_, _>>();

// double check to make sure column ids are not mismatched
// shouldn't be a expensive operation, given it only query for physical columns
for col in new_columns.iter_mut() {
let column_metadata = after_alter_physical_schema_map
.get(&col.column_schema.name.as_str())
.with_context(|| ColumnNotFoundSnafu {
name: &col.column_schema.name,
region_id: data_region_id,
})?;
if col != *column_metadata {
warn!(
"Add already existing columns with different column metadata to physical region({:?}): new column={:?}, old column={:?}",
data_region_id,
col,
column_metadata
);
// update to correct metadata
*col = (*column_metadata).clone();
}
}

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions src/metric-engine/src/metadata_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ impl MetadataRegion {
}

/// Check if the given column exists. Return the semantic type if exists.
#[cfg(test)]
pub async fn column_semantic_type(
&self,
physical_region_id: RegionId,
Expand Down Expand Up @@ -373,6 +374,7 @@ impl MetadataRegion {

/// Retrieves the value associated with the given key in the specified region.
/// Returns `Ok(None)` if the key is not found.
#[cfg(test)]
pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
Expand Down
99 changes: 99 additions & 0 deletions tests/cases/standalone/common/alter/alter_table.result
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,102 @@ DROP TABLE test_alt_table;

Affected Rows: 0

-- to test if same name column can be added
CREATE TABLE phy (ts timestamp time index, val double) engine = metric with ("physical_metric_table" = "");

Affected Rows: 0

CREATE TABLE t1 (
ts timestamp time index,
val double,
host string primary key
) engine = metric with ("on_physical_table" = "phy");

Affected Rows: 0

INSERT INTO
t1
VALUES
('host1', 0, 1),
('host2', 1, 0,);

Affected Rows: 2

SELECT
*
FROM
t1;

+-------+-------------------------+-----+
| host | ts | val |
+-------+-------------------------+-----+
| host2 | 1970-01-01T00:00:00.001 | 0.0 |
| host1 | 1970-01-01T00:00:00 | 1.0 |
+-------+-------------------------+-----+

CREATE TABLE t2 (
ts timestamp time index,
job string primary key,
val double
) engine = metric with ("on_physical_table" = "phy");

Affected Rows: 0

ALTER TABLE
t1
ADD
COLUMN `at` STRING;

Affected Rows: 0

ALTER TABLE
t2
ADD
COLUMN at3 STRING;

Affected Rows: 0

ALTER TABLE
t2
ADD
COLUMN `at` STRING;

Affected Rows: 0

ALTER TABLE
t2
ADD
COLUMN at2 STRING;

Affected Rows: 0

INSERT INTO
t2
VALUES
("loc_1", "loc_2", "loc_3", 'job1', 0, 1);

Affected Rows: 1

SELECT
*
FROM
t2;

+-------+-------+-------+------+---------------------+-----+
| at | at2 | at3 | job | ts | val |
+-------+-------+-------+------+---------------------+-----+
| loc_1 | loc_2 | loc_3 | job1 | 1970-01-01T00:00:00 | 1.0 |
+-------+-------+-------+------+---------------------+-----+

DROP TABLE t1;

Affected Rows: 0

DROP TABLE t2;

Affected Rows: 0

DROP TABLE phy;

Affected Rows: 0

62 changes: 62 additions & 0 deletions tests/cases/standalone/common/alter/alter_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,65 @@ ALTER TABLE test_alt_table ADD COLUMN m INTEGER;
DESC TABLE test_alt_table;

DROP TABLE test_alt_table;

-- to test if same name column can be added
CREATE TABLE phy (ts timestamp time index, val double) engine = metric with ("physical_metric_table" = "");

CREATE TABLE t1 (
ts timestamp time index,
val double,
host string primary key
) engine = metric with ("on_physical_table" = "phy");

INSERT INTO
t1
VALUES
('host1', 0, 1),
('host2', 1, 0,);

SELECT
*
FROM
t1;

CREATE TABLE t2 (
ts timestamp time index,
job string primary key,
val double
) engine = metric with ("on_physical_table" = "phy");

ALTER TABLE
t1
ADD
COLUMN `at` STRING;

ALTER TABLE
t2
ADD
COLUMN at3 STRING;

ALTER TABLE
t2
ADD
COLUMN `at` STRING;

ALTER TABLE
t2
ADD
COLUMN at2 STRING;

INSERT INTO
t2
VALUES
("loc_1", "loc_2", "loc_3", 'job1', 0, 1);

SELECT
*
FROM
t2;

DROP TABLE t1;

DROP TABLE t2;

DROP TABLE phy;