From 95d8509fb9bc2436e57ab89dfea99268285e39df Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 28 May 2024 09:25:20 +0800 Subject: [PATCH] finish Signed-off-by: tison --- src/client/src/database.rs | 15 ++++++++++++ src/cmd/src/cli/export.rs | 2 +- src/cmd/src/cli/repl.rs | 48 ++++++++++++++++++++++++++++++-------- src/cmd/src/error.rs | 4 +++- 4 files changed, 57 insertions(+), 12 deletions(-) diff --git a/src/client/src/database.rs b/src/client/src/database.rs index b6e5b64a9394..e310a73e584d 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -104,10 +104,18 @@ impl Database { self.catalog = catalog.into(); } + pub fn catalog(&self) -> &String { + &self.catalog + } + pub fn set_schema(&mut self, schema: impl Into) { self.schema = schema.into(); } + pub fn schema(&self) -> &String { + &self.schema + } + pub fn set_timezone(&mut self, timezone: impl Into) { self.timezone = timezone.into(); } @@ -155,6 +163,13 @@ impl Database { .await } + pub async fn logical_plan(&self, logical_plan: Vec) -> Result { + self.do_get(Request::Query(QueryRequest { + query: Some(Query::LogicalPlan(logical_plan)), + })) + .await + } + pub async fn create(&self, expr: CreateTableExpr) -> Result { self.do_get(Request::Ddl(DdlRequest { expr: Some(DdlExpr::CreateTable(expr)), diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 60c1039c9356..f3781c6844dc 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -441,7 +441,7 @@ mod tests { use clap::Parser; use client::{Client, Database}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_telemetry::logging::LoggingOptions; use crate::{App, cli, standalone}; diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 6759a923fc43..a9e2e21967f9 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -16,14 +16,18 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; +use cache::{ + build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME, + TABLE_ROUTE_CACHE_NAME, +}; use catalog::kvbackend::{ - CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, + CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend, }; -use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_config::Mode; use common_error::ext::ErrorExt; -use common_meta::cache_invalidator::MultiCacheInvalidator; +use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::debug; @@ -38,12 +42,13 @@ use query::QueryEngine; use rustyline::error::ReadlineError; use rustyline::Editor; use session::context::QueryContext; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use crate::cli::cmd::ReplCommand; use crate::cli::helper::RustylineHelper; use crate::cli::AttachCommand; +use crate::error; use crate::error::{ CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu, ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu, @@ -257,19 +262,42 @@ async fn create_query_engine(meta_addr: &str) -> Result { let cached_meta_backend = Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); - let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![ - cached_meta_backend.clone(), - ])); - let catalog_list = KvBackendCatalogManager::new( + let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry( + CacheRegistryBuilder::default() + .add_cache(cached_meta_backend.clone()) + .build(), + ); + let fundamental_cache_registry = + build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone()))); + let layered_cache_registry = Arc::new( + with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .context(error::BuildCacheRegistrySnafu)? + .build(), + ); + + let table_cache = layered_cache_registry + .get() + .context(error::CacheRequiredSnafu { + name: TABLE_CACHE_NAME, + })?; + let table_route_cache = layered_cache_registry + .get() + .context(error::CacheRequiredSnafu { + name: TABLE_ROUTE_CACHE_NAME, + })?; + let catalog_manager = KvBackendCatalogManager::new( Mode::Distributed, Some(meta_client.clone()), cached_meta_backend.clone(), - multi_cache_invalidator, + table_cache, + table_route_cache, ) .await; let plugins: Plugins = Default::default(); let state = Arc::new(QueryEngineState::new( - catalog_list, + catalog_manager, None, None, None, diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index ea842d655ec8..a2a880fa6c1d 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -166,8 +166,10 @@ pub enum Error { #[snafu(display("Failed to request database, sql: {sql}"))] RequestDatabase { sql: String, - location: Location, + #[snafu(source)] source: client::Error, + #[snafu(implicit)] + location: Location, }, #[snafu(display("Failed to collect RecordBatches"))]