Skip to content

Commit

Permalink
finish
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed May 28, 2024
1 parent 760f643 commit 95d8509
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 12 deletions.
15 changes: 15 additions & 0 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,18 @@ impl Database {
self.catalog = catalog.into();
}

pub fn catalog(&self) -> &String {
&self.catalog
}

pub fn set_schema(&mut self, schema: impl Into<String>) {
self.schema = schema.into();
}

pub fn schema(&self) -> &String {
&self.schema
}

pub fn set_timezone(&mut self, timezone: impl Into<String>) {
self.timezone = timezone.into();
}
Expand Down Expand Up @@ -155,6 +163,13 @@ impl Database {
.await
}

pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
self.do_get(Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
}))
.await
}

pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ mod tests {
use clap::Parser;

use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::logging::LoggingOptions;

use crate::{App, cli, standalone};
Expand Down
48 changes: 38 additions & 10 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;

use cache::{
build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME,
TABLE_ROUTE_CACHE_NAME,
};
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager,
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
};
use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_config::Mode;
use common_error::ext::ErrorExt;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
Expand All @@ -38,12 +42,13 @@ use query::QueryEngine;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use session::context::QueryContext;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

use crate::cli::cmd::ReplCommand;
use crate::cli::helper::RustylineHelper;
use crate::cli::AttachCommand;
use crate::error;
use crate::error::{
CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu,
ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu,
Expand Down Expand Up @@ -257,19 +262,42 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {

let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
cached_meta_backend.clone(),
]));
let catalog_list = KvBackendCatalogManager::new(
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?
.build(),
);

let table_cache = layered_cache_registry
.get()
.context(error::CacheRequiredSnafu {
name: TABLE_CACHE_NAME,
})?;
let table_route_cache = layered_cache_registry
.get()
.context(error::CacheRequiredSnafu {
name: TABLE_ROUTE_CACHE_NAME,
})?;
let catalog_manager = KvBackendCatalogManager::new(
Mode::Distributed,
Some(meta_client.clone()),
cached_meta_backend.clone(),
multi_cache_invalidator,
table_cache,
table_route_cache,
)
.await;
let plugins: Plugins = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,
catalog_manager,
None,
None,
None,
Expand Down
4 changes: 3 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,10 @@ pub enum Error {
#[snafu(display("Failed to request database, sql: {sql}"))]
RequestDatabase {
sql: String,
location: Location,
#[snafu(source)]
source: client::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to collect RecordBatches"))]
Expand Down

0 comments on commit 95d8509

Please sign in to comment.