Skip to content

Commit

Permalink
Support ShowVariable Statement (#3455)
Browse files Browse the repository at this point in the history
* support "SHOW VARIABLE;"

* fix test case

* fix comment

* fix clippy

* rename settings -> df_settings
  • Loading branch information
waitingkuo authored Sep 14, 2022
1 parent 9fbee1a commit 0388682
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 83 deletions.
82 changes: 79 additions & 3 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use std::{
sync::{Arc, Weak},
};

use parking_lot::RwLock;

use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema},
Expand All @@ -39,26 +41,32 @@ use super::{
schema::SchemaProvider,
};

use crate::config::ConfigOptions;

const INFORMATION_SCHEMA: &str = "information_schema";
const TABLES: &str = "tables";
const VIEWS: &str = "views";
const COLUMNS: &str = "columns";
const DF_SETTINGS: &str = "df_settings";

/// Wraps another [`CatalogProvider`] and adds an "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
    /// Weak handle to the catalog list this provider introspects;
    /// held weakly, presumably to avoid a reference cycle with the
    /// list that also owns this catalog — confirm against the callers.
    catalog_list: Weak<dyn CatalogList>,
    /// Weak handle to the session's shared, lock-protected configuration
    /// options; upgraded in `schema()` so `information_schema.df_settings`
    /// can be materialized from the live settings.
    config_options: Weak<RwLock<ConfigOptions>>,
    /// wrapped provider that serves every schema other than
    /// `information_schema`
    inner: Arc<dyn CatalogProvider>,
}

impl CatalogWithInformationSchema {
pub(crate) fn new(
catalog_list: Weak<dyn CatalogList>,
config_options: Weak<RwLock<ConfigOptions>>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
catalog_list,
config_options,
inner,
}
}
Expand All @@ -79,9 +87,13 @@ impl CatalogProvider for CatalogWithInformationSchema {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
Weak::upgrade(&self.catalog_list).map(|catalog_list| {
Arc::new(InformationSchemaProvider { catalog_list })
as Arc<dyn SchemaProvider>
Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
Weak::upgrade(&self.config_options).map(|config_options| {
Arc::new(InformationSchemaProvider {
catalog_list,
config_options,
}) as Arc<dyn SchemaProvider>
})
})
} else {
self.inner.schema(name)
Expand All @@ -106,6 +118,7 @@ impl CatalogProvider for CatalogWithInformationSchema {
/// table is queried.
struct InformationSchemaProvider {
    /// All catalogs that are scanned when building the `tables`, `views`,
    /// and `columns` virtual tables.
    catalog_list: Arc<dyn CatalogList>,
    /// Shared session configuration; read (under the lock) to build the
    /// `df_settings` virtual table.
    config_options: Arc<RwLock<ConfigOptions>>,
}

impl InformationSchemaProvider {
Expand Down Expand Up @@ -141,6 +154,12 @@ impl InformationSchemaProvider {
COLUMNS,
TableType::View,
);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
DF_SETTINGS,
TableType::View,
);
}

let mem_table: MemTable = builder.into();
Expand Down Expand Up @@ -206,6 +225,19 @@ impl InformationSchemaProvider {

Arc::new(mem_table)
}

/// Builds the `information_schema.df_settings` virtual table by
/// snapshotting the current session configuration options.
fn make_df_settings(&self) -> Arc<dyn TableProvider> {
    let mut settings_builder = InformationSchemaDfSettingsBuilder::new();

    // Take a read lock once and emit one row per configuration option.
    let options = self.config_options.read();
    for (name, value) in options.options() {
        settings_builder.add_setting(name, value.to_string());
    }

    let table: MemTable = settings_builder.into();
    Arc::new(table)
}
}

impl SchemaProvider for InformationSchemaProvider {
Expand All @@ -224,6 +256,8 @@ impl SchemaProvider for InformationSchemaProvider {
Some(self.make_columns())
} else if name.eq_ignore_ascii_case("views") {
Some(self.make_views())
} else if name.eq_ignore_ascii_case("df_settings") {
Some(self.make_df_settings())
} else {
None
}
Expand Down Expand Up @@ -579,3 +613,45 @@ impl From<InformationSchemaColumnsBuilder> for MemTable {
MemTable::try_new(schema, vec![vec![batch]]).unwrap()
}
}

/// Incrementally builds the Arrow columns backing the
/// `information_schema.df_settings` table; one appended value per column
/// per setting, so both builders always hold the same number of rows.
struct InformationSchemaDfSettingsBuilder {
    // column: setting name (Utf8)
    names: StringBuilder,
    // column: setting value rendered as a string (Utf8)
    settings: StringBuilder,
}

impl InformationSchemaDfSettingsBuilder {
fn new() -> Self {
Self {
names: StringBuilder::new(),
settings: StringBuilder::new(),
}
}

fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) {
self.names.append_value(name.as_ref());
self.settings.append_value(setting.as_ref());
}
}

impl From<InformationSchemaDfSettingsBuilder> for MemTable {
    /// Finalizes the builder into a single-batch in-memory table with two
    /// non-nullable Utf8 columns, `name` and `setting`.
    ///
    /// Panics if the batch or table cannot be created, which cannot happen
    /// here: the builders are appended in lockstep, so column lengths and
    /// the schema always agree.
    fn from(value: InformationSchemaDfSettingsBuilder) -> MemTable {
        let InformationSchemaDfSettingsBuilder {
            mut names,
            mut settings,
        } = value;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("setting", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(names.finish()), Arc::new(settings.finish())],
        )
        .unwrap();

        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
    }
}
30 changes: 17 additions & 13 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ impl SessionContext {
let catalog = if information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&state.catalog_list),
Arc::downgrade(&state.config.config_options),
catalog,
))
} else {
Expand Down Expand Up @@ -1130,7 +1131,7 @@ pub struct SessionConfig {
/// Should DataFusion parquet reader using the predicate to prune data
pub parquet_pruning: bool,
/// Configuration options
pub config_options: ConfigOptions,
pub config_options: Arc<RwLock<ConfigOptions>>,
/// Opaque extensions.
extensions: AnyMap,
}
Expand All @@ -1147,7 +1148,7 @@ impl Default for SessionConfig {
repartition_aggregations: true,
repartition_windows: true,
parquet_pruning: true,
config_options: ConfigOptions::new(),
config_options: Arc::new(RwLock::new(ConfigOptions::new())),
// Assume no extensions by default.
extensions: HashMap::with_capacity_and_hasher(
0,
Expand All @@ -1166,14 +1167,14 @@ impl SessionConfig {
/// Create an execution config with config options read from the environment
pub fn from_env() -> Self {
Self {
config_options: ConfigOptions::from_env(),
config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
..Default::default()
}
}

/// Set a configuration option
pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
self.config_options.set(key, value);
pub fn set(self, key: &str, value: ScalarValue) -> Self {
self.config_options.write().set(key, value);
self
}

Expand Down Expand Up @@ -1252,22 +1253,18 @@ impl SessionConfig {
/// Get the currently configured batch size
pub fn batch_size(&self) -> usize {
self.config_options
.read()
.get_u64(OPT_BATCH_SIZE)
.try_into()
.unwrap()
}

/// Get the current configuration options
pub fn config_options(&self) -> &ConfigOptions {
&self.config_options
}

/// Convert configuration options to name-value pairs with values converted to strings. Note
/// that this method will eventually be deprecated and replaced by [config_options].
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
// copy configs from config_options
for (k, v) in self.config_options.options() {
for (k, v) in self.config_options.read().options() {
map.insert(k.to_string(), format!("{}", v));
}
map.insert(
Expand Down Expand Up @@ -1420,6 +1417,7 @@ impl SessionState {
let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&catalog_list),
Arc::downgrade(&config.config_options),
Arc::new(default_catalog),
))
} else {
Expand All @@ -1444,7 +1442,11 @@ impl SessionState {
Arc::new(ProjectionPushDown::new()),
Arc::new(RewriteDisjunctivePredicate::new()),
];
if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
if config
.config_options
.read()
.get_bool(OPT_FILTER_NULL_JOIN_KEYS)
{
rules.push(Arc::new(FilterNullJoinKeys::default()));
}
rules.push(Arc::new(ReduceOuterJoin::new()));
Expand All @@ -1457,10 +1459,11 @@ impl SessionState {
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
];
if config.config_options.get_bool(OPT_COALESCE_BATCHES) {
if config.config_options.read().get_bool(OPT_COALESCE_BATCHES) {
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config
.config_options
.read()
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
.try_into()
.unwrap(),
Expand Down Expand Up @@ -1564,6 +1567,7 @@ impl SessionState {
let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules(
self.config
.config_options
.read()
.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES),
);
optimizer_config.query_execution_start_time =
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,7 @@ impl DefaultPhysicalPlanner {
if !session_state
.config
.config_options
.read()
.get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY)
{
stringified_plans = e.stringified_plans.clone();
Expand All @@ -1583,6 +1584,7 @@ impl DefaultPhysicalPlanner {
if !session_state
.config
.config_options
.read()
.get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY)
{
let input = self
Expand Down
Loading

0 comments on commit 0388682

Please sign in to comment.