Skip to content

Commit

Permalink
style: refine the naming of acronyms (#5347)
Browse files Browse the repository at this point in the history
* refine a error message

Signed-off-by: TennyZhuang <zty0826@gmail.com>

* rename MySQL to MySql

Signed-off-by: TennyZhuang <zty0826@gmail.com>

* rename TTL to Ttl

Signed-off-by: TennyZhuang <zty0826@gmail.com>

* minnor fixes

Signed-off-by: TennyZhuang <zty0826@gmail.com>

* remove a deprecated item

Signed-off-by: TennyZhuang <zty0826@gmail.com>

* refine some comments

Signed-off-by: TennyZhuang <zty0826@gmail.com>

* fix clippy

Signed-off-by: TennyZhuang <zty0826@gmail.com>

Signed-off-by: TennyZhuang <zty0826@gmail.com>
  • Loading branch information
TennyZhuang authored Sep 14, 2022
1 parent 7f363a4 commit f7eb24f
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 82 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/monitor/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl BatchTaskMetrics {

/// Following functions are used to custom executor level metrics.
// Each task execution has its own label:
// QueryID, StageId, TaskId
// QueryId, StageId, TaskId
pub fn task_labels(&self) -> HashMap<String, String> {
self.labels.clone()
}
Expand Down
6 changes: 4 additions & 2 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::catalog::Field;
use crate::error::ErrorCode;
use crate::types::DataType;

/// Column ID is the unique identifier of a column in a table. Different from table ID,
/// column ID is not globally unique.
/// Column ID is the unique identifier of a column in a table. Different from table ID, column ID is
/// not globally unique.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct ColumnId(i32);

Expand All @@ -47,11 +47,13 @@ impl From<i32> for ColumnId {
Self::new(column_id)
}
}

impl From<ColumnId> for i32 {
fn from(id: ColumnId) -> i32 {
id.0
}
}

impl std::fmt::Display for ColumnId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
Expand Down
28 changes: 14 additions & 14 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use thiserror::Error;
pub use tracing;

use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
pub use crate::sink::mysql::{MySQLConfig, MySQLSink, MYSQL_SINK};
pub use crate::sink::mysql::{MySqlConfig, MySqlSink, MYSQL_SINK};
use crate::sink::redis::{RedisConfig, RedisSink};

#[async_trait]
Expand All @@ -49,7 +49,7 @@ pub trait Sink {

#[derive(Clone, Debug, EnumAsInner)]
pub enum SinkConfig {
Mysql(MySQLConfig),
Mysql(MySqlConfig),
Redis(RedisConfig),
Kafka(KafkaConfig),
}
Expand All @@ -69,7 +69,7 @@ impl SinkConfig {
.ok_or_else(|| SinkError::Config(format!("missing config: {}", SINK_TYPE_KEY)))?;
match sink_type.to_lowercase().as_str() {
KAFKA_SINK => Ok(SinkConfig::Kafka(KafkaConfig::from_hashmap(properties)?)),
MYSQL_SINK => Ok(SinkConfig::Mysql(MySQLConfig::from_hashmap(properties)?)),
MYSQL_SINK => Ok(SinkConfig::Mysql(MySqlConfig::from_hashmap(properties)?)),
_ => unimplemented!(),
}
}
Expand All @@ -85,31 +85,31 @@ impl SinkConfig {

#[derive(Debug)]
pub enum SinkImpl {
MySQL(Box<MySQLSink>),
MySql(Box<MySqlSink>),
Redis(Box<RedisSink>),
Kafka(Box<KafkaSink>),
}

impl SinkImpl {
pub async fn new(cfg: SinkConfig) -> Result<Self> {
Ok(match cfg {
SinkConfig::Mysql(cfg) => SinkImpl::MySQL(Box::new(MySQLSink::new(cfg).await?)),
SinkConfig::Mysql(cfg) => SinkImpl::MySql(Box::new(MySqlSink::new(cfg).await?)),
SinkConfig::Redis(cfg) => SinkImpl::Redis(Box::new(RedisSink::new(cfg)?)),
SinkConfig::Kafka(cfg) => SinkImpl::Kafka(Box::new(KafkaSink::new(cfg)?)),
})
}

pub fn needs_preparation(&self) -> bool {
match self {
SinkImpl::MySQL(_) => true,
SinkImpl::MySql(_) => true,
SinkImpl::Redis(_) => false,
SinkImpl::Kafka(_) => false,
}
}

pub async fn prepare(&mut self, schema: &Schema) -> Result<()> {
match self {
SinkImpl::MySQL(sink) => sink.prepare(schema).await,
SinkImpl::MySql(sink) => sink.prepare(schema).await,
_ => unreachable!(),
}
}
Expand All @@ -119,31 +119,31 @@ impl SinkImpl {
impl Sink for SinkImpl {
async fn write_batch(&mut self, chunk: StreamChunk, schema: &Schema) -> Result<()> {
match self {
SinkImpl::MySQL(sink) => sink.write_batch(chunk, schema).await,
SinkImpl::MySql(sink) => sink.write_batch(chunk, schema).await,
SinkImpl::Redis(sink) => sink.write_batch(chunk, schema).await,
SinkImpl::Kafka(sink) => sink.write_batch(chunk, schema).await,
}
}

async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
match self {
SinkImpl::MySQL(sink) => sink.begin_epoch(epoch).await,
SinkImpl::MySql(sink) => sink.begin_epoch(epoch).await,
SinkImpl::Redis(sink) => sink.begin_epoch(epoch).await,
SinkImpl::Kafka(sink) => sink.begin_epoch(epoch).await,
}
}

async fn commit(&mut self) -> Result<()> {
match self {
SinkImpl::MySQL(sink) => sink.commit().await,
SinkImpl::MySql(sink) => sink.commit().await,
SinkImpl::Redis(sink) => sink.commit().await,
SinkImpl::Kafka(sink) => sink.commit().await,
}
}

async fn abort(&mut self) -> Result<()> {
match self {
SinkImpl::MySQL(sink) => sink.abort().await,
SinkImpl::MySql(sink) => sink.abort().await,
SinkImpl::Redis(sink) => sink.abort().await,
SinkImpl::Kafka(sink) => sink.abort().await,
}
Expand All @@ -154,9 +154,9 @@ pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
#[error("MySQL error: {0}")]
MySQL(String),
#[error("MySQL inner error: {0}")]
#[error("MySql error: {0}")]
MySql(String),
#[error("MySql inner error: {0}")]
MySQLInner(#[from] mysql_async::Error),
#[error("Kafka error: {0}")]
Kafka(#[from] rdkafka::error::KafkaError),
Expand Down
Loading

0 comments on commit f7eb24f

Please sign in to comment.