Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flow): flush_flow function #4416

Merged
merged 23 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5c801650435d464891114502539b701c77a1b914" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7ca323090b3ae8faf2c15036b7f41b7c5225cf5f" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
None,
None,
None,
None,
false,
plugins.clone(),
));
Expand Down
1 change: 1 addition & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
sql.workspace = true
statrs = "0.16"
store-api.workspace = true
table.workspace = true
Expand Down
164 changes: 164 additions & 0 deletions src/common/function/src/flush_flow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::parser::ParserContext;
use store_api::storage::ConcreteDataType;

use crate::handlers::FlowServiceHandlerRef;

fn flush_signature() -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}

#[admin_fn(
name = FlushFlowFunction,
display_name = flush_flow,
sig_fn = flush_signature,
ret = uint64
)]
pub(crate) async fn flush_flow(
flow_service_handler: &FlowServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;

let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;

Ok(Value::from(affected_rows))
}

fn parse_flush_flow(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);

let ValueRef::String(flow_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "flush_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;

let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use datatypes::scalars::ScalarVector;
use datatypes::vectors::StringVector;
use session::context::QueryContext;

use super::*;
use crate::function::{Function, FunctionContext};

#[test]
fn test_flush_flow_metadata() {
let f = FlushFlowFunction;
assert_eq!("flush_flow", f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert_eq!(
f.signature(),
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
);
}

#[test]
fn test_missing_flow_service() {
let f = FlushFlowFunction;

let args = vec!["flow_name"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing FlowServiceHandler, not expected",
result.to_string()
);
}

#[test]
fn test_parse_flow_args() {
let testcases = [
("flow_name", ("greptime", "flow_name")),
("catalog.flow_name", ("catalog", "flow_name")),
];
for (input, expected) in testcases.iter() {
let args = vec![*input];
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();

let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
}
}
}
discord9 marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 13 additions & 0 deletions src/common/function/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ pub trait ProcedureServiceHandler: Send + Sync {
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
}

/// This flow service handler is only use for flush flow for now.
#[async_trait]
pub trait FlowServiceHandler: Send + Sync {
async fn flush(
&self,
catalog: &str,
flow: &str,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
}

pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;

pub type FlowServiceHandlerRef = Arc<dyn FlowServiceHandler>;
1 change: 1 addition & 0 deletions src/common/function/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![feature(let_chains)]
#![feature(try_blocks)]

mod flush_flow;
mod macros;
pub mod scalars;
mod system;
Expand Down
20 changes: 18 additions & 2 deletions src/common/function/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};

/// Shared state for SQL functions.
/// The handlers in state may be `None` in cli command-line or test cases.
Expand All @@ -22,6 +22,8 @@ pub struct FunctionState {
pub table_mutation_handler: Option<TableMutationHandlerRef>,
// The procedure service handler
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
// The flownode handler
pub flow_service_handler: Option<FlowServiceHandlerRef>,
}

impl FunctionState {
Expand All @@ -42,9 +44,10 @@ impl FunctionState {
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};

use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
struct MockProcedureServiceHandler;
struct MockTableMutationHandler;
struct MockFlowServiceHandler;
const ROWS: usize = 42;

#[async_trait]
Expand Down Expand Up @@ -116,9 +119,22 @@ impl FunctionState {
}
}

#[async_trait]
impl FlowServiceHandler for MockFlowServiceHandler {
async fn flush(
&self,
_catalog: &str,
_flow: &str,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
}

Self {
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
}
}
}
10 changes: 2 additions & 8 deletions src/common/function/src/system/procedure_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use api::v1::meta::ProcedureStatus;
use common_macro::admin_fn;
use common_meta::rpc::procedure::ProcedureStateResponse;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use serde::Serialize;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use snafu::ensure;

use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::ProcedureServiceHandlerRef;

#[derive(Serialize)]
Expand Down Expand Up @@ -103,6 +96,7 @@ mod tests {
use datatypes::vectors::StringVector;

use super::*;
use crate::function::{Function, FunctionContext};

#[test]
fn test_procedure_state_misc() {
Expand Down
2 changes: 2 additions & 0 deletions src/common/function/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;

use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;

/// Table functions
Expand All @@ -35,5 +36,6 @@ impl TableFunction {
registry.register(Arc::new(CompactRegionFunction));
registry.register(Arc::new(FlushTableFunction));
registry.register(Arc::new(CompactTableFunction));
registry.register(Arc::new(FlushFlowFunction));
}
}
10 changes: 2 additions & 8 deletions src/common/function/src/table/flush_compact_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use snafu::ensure;
use store_api::storage::RegionId;

use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
use crate::helper::cast_u64;

Expand Down Expand Up @@ -84,6 +77,7 @@ mod tests {
use datatypes::vectors::UInt64Vector;

use super::*;
use crate::function::{Function, FunctionContext};

macro_rules! define_region_function_test {
($name: ident, $func: ident) => {
Expand Down
Loading