Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a query planner for OxQL #7152

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ tar = "0.4"
tempfile = "3.10"
term = "0.7"
termios = "0.3"
termtree = "0.5.1"
textwrap = "0.16.1"
test-strategy = "0.3.1"
thiserror = "1.0"
Expand Down
14 changes: 11 additions & 3 deletions nexus/tests/integration_tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ async fn test_instance_watcher_metrics(
const STATE_STARTING: &str = "starting";
const STATE_RUNNING: &str = "running";
const STATE_STOPPING: &str = "stopping";
const OXQL_QUERY: &str = "get virtual_machine:check";
const OXQL_QUERY: &str = "get virtual_machine:check | \
filter timestamp > @2000-01-01";

let client = &cptestctx.external_client;
let internal_client = &cptestctx.internal_client;
Expand Down Expand Up @@ -707,7 +708,13 @@ async fn test_project_timeseries_query(
// fields are. This is helpful generally, but here it would be better if
// we could say something more like "you can't query this timeseries from
// this endpoint"
assert_eq!(result.message, "The filter expression contains identifiers that are not valid for its input timeseries. Invalid identifiers: [\"project_id\", \"silo_id\"], timeseries fields: {\"datum\", \"metric_name\", \"target_name\", \"timestamp\"}");
const EXPECTED_ERROR_MESSAGE: &str = "\
The filter expression refers to \
identifiers that are not valid for its input \
table \"integration_target:integration_metric\". \
Invalid identifiers: [\"silo_id\", \"project_id\"], \
valid identifiers: [\"datum\", \"metric_name\", \"target_name\", \"timestamp\"]";
assert!(result.message.ends_with(EXPECTED_ERROR_MESSAGE));

// nonexistent project
let url = "/v1/timeseries/query?project=nonexistent";
Expand Down Expand Up @@ -872,7 +879,8 @@ async fn test_mgs_metrics(
return;
}

let query = format!("get {metric_name}");
let query =
format!("get {metric_name} | filter timestamp > @2000-01-01");

// MGS polls SP sensor data once every second. It's possible that, when
// we triggered Oximeter to collect samples from MGS, it may not have
Expand Down
1 change: 1 addition & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ slog-async.workspace = true
slog-dtrace.workspace = true
slog-term.workspace = true
strum.workspace = true
termtree.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
usdt.workspace = true
Expand Down
54 changes: 47 additions & 7 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::oxql::ast::table_ops::filter;
use crate::oxql::ast::table_ops::filter::Filter;
use crate::oxql::ast::table_ops::limit::Limit;
use crate::oxql::ast::table_ops::limit::LimitKind;
use crate::oxql::Query;
use crate::query::field_table_name;
use crate::Error;
use crate::Metric;
Expand Down Expand Up @@ -113,6 +114,34 @@ struct ConsistentKeyGroup {
}

impl Client {
/// Build a query plan for the OxQL query.
pub async fn plan_oxql_query(
&self,
query: impl AsRef<str>,
) -> Result<oxql::plan::Plan, Error> {
let query = query.as_ref();
let parsed_query = oxql::Query::new(query)?;
self.build_query_plan(&parsed_query).await
}

/// Build a query plan for the OxQL query.
async fn build_query_plan(
&self,
query: &Query,
) -> Result<oxql::plan::Plan, Error> {
let referenced_timeseries = query.all_timeseries_names();
let mut schema = BTreeMap::new();
for name in referenced_timeseries.into_iter() {
let Some(sch) = self.schema_for_timeseries(name).await? else {
return Err(Error::TimeseriesNotFound(name.to_string()));
};
schema.insert(name.clone(), sch);
}
let plan =
oxql::plan::Plan::new(query.parsed_query().clone(), &schema)?;
Ok(plan)
}

/// Run a OxQL query.
pub async fn oxql_query(
&self,
Expand All @@ -132,6 +161,15 @@ impl Client {
// See https://github.com/oxidecomputer/omicron/issues/5298.
let query = query.as_ref();
let parsed_query = oxql::Query::new(query)?;
let plan = self.build_query_plan(&parsed_query).await?;
if plan.requires_full_table_scan() {
return Err(Error::Oxql(anyhow::anyhow!(
"This query requires at least one full table scan. \
Please rewrite the query to filter either the fields \
or timestamps, in order to reduce the amount of data \
fetched from the database."
)));
}
let query_id = Uuid::new_v4();
let query_log =
self.log.new(slog::o!("query_id" => query_id.to_string()));
Expand Down Expand Up @@ -837,12 +875,12 @@ impl Client {
// return.
//
// This is used to ensure that we never go above the limit in
// `MAX_RESULT_SIZE`. That restricts the _total_ number of rows we want
// to retch from the database. So we set our limit to be one more than
// the remainder on our allotment. If we get exactly as many as we set
// in the limit, then we fail the query because there are more rows that
// _would_ be returned. We don't know how many more, but there is at
// least 1 that pushes us over the limit. This prevents tricky
// `MAX_DATABASE_ROWS`. That restricts the _total_ number of rows we
// want to retch from the database. So we set our limit to be one more
// than the remainder on our allotment. If we get exactly as many as we
// set in the limit, then we fail the query because there are more row
// that _would_ be returned. We don't know how many more, but there is
// at least 1 that pushes us over the limit. This prevents tricky
// TOCTOU-like bugs where we need to check the limit twice, and improves
// performance, since we don't return much more than we could possibly
// handle.
Expand Down Expand Up @@ -1293,7 +1331,9 @@ mod tests {
#[tokio::test]
async fn test_get_entire_table() {
let ctx = setup_oxql_test("test_get_entire_table").await;
let query = "get some_target:some_metric";
// We need _some_ filter here to avoid a provable full-table scan.
let query =
"get some_target:some_metric | filter timestamp > @2020-01-01";
let result = ctx
.client
.oxql_query(query)
Expand Down
5 changes: 5 additions & 0 deletions oximeter/db/src/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ pub enum Error {

#[error("Expected an empty data block")]
ExpectedEmptyDataBlock,

#[error(
"A query unexpectedly resulted in an empty data block; query: {query}"
)]
UnexpectedEmptyBlock { query: String },
}

impl Error {
Expand Down
31 changes: 31 additions & 0 deletions oximeter/db/src/oxql/ast/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use chrono::DateTime;
use chrono::Utc;
use oximeter::FieldType;
use oximeter::FieldValue;
use oxql_types::point::DataType;
use regex::Regex;
use std::borrow::Borrow;
use std::fmt;
Expand All @@ -35,6 +36,20 @@ pub enum Literal {
}

impl Literal {
// Return the name of this literal's type as a string.
pub(crate) fn type_name(&self) -> &'static str {
match self {
Literal::Integer(_) => "Integer",
Literal::Double(_) => "Double",
Literal::String(_) => "String",
Literal::Boolean(_) => "Boolean",
Literal::Uuid(_) => "Uuid",
Literal::Duration(_) => "Duration",
Literal::Timestamp(_) => "Timestamp",
Literal::IpAddr(_) => "IpAddr",
}
}

// Format the literal as a safe, typed string for ClickHouse.
pub(crate) fn as_db_safe_string(&self) -> String {
match self {
Expand Down Expand Up @@ -93,6 +108,22 @@ impl Literal {
}
}

// Return true if this literal can be compared to a datum of the provided
// type.
pub(crate) fn is_compatible_with_datum(&self, data_type: DataType) -> bool {
match (self, data_type) {
(Literal::Integer(_), DataType::Integer)
| (Literal::Double(_), DataType::Double)
| (Literal::String(_), DataType::String)
| (Literal::Boolean(_), DataType::Boolean)
| (Literal::Duration(_), DataType::Integer)
| (Literal::Duration(_), DataType::Double)
| (Literal::Timestamp(_), DataType::Integer)
| (Literal::Timestamp(_), DataType::Double) => true,
(_, _) => false,
}
}

/// Apply the comparison op between self and the provided field.
///
/// Return None if the comparison cannot be applied, either because the type
Expand Down
49 changes: 49 additions & 0 deletions oximeter/db/src/oxql/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

// Copyright 2024 Oxide Computer Company

use std::collections::BTreeSet;
use std::fmt;

use chrono::DateTime;
use chrono::Utc;
use oximeter::TimeseriesName;
Expand All @@ -26,12 +29,32 @@ pub struct Query {
ops: Vec<TableOp>,
}

impl fmt::Display for Query {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let n_ops = self.ops.len();
for (i, op) in self.ops.iter().enumerate() {
write!(f, "{op}")?;
if i < n_ops - 1 {
write!(f, " | ")?;
}
}
Ok(())
}
}

impl Query {
// Return the first operation in the query, which is always a form of `get`.
fn first_op(&self) -> &TableOp {
self.ops.first().expect("Should have parsed at least 1 operation")
}

/// Iterate over the table operations.
pub(crate) fn table_ops(
&self,
) -> impl ExactSizeIterator<Item = &'_ TableOp> + '_ {
self.ops.iter()
}

pub(crate) fn timeseries_name(&self) -> &TimeseriesName {
match self.first_op() {
TableOp::Basic(BasicTableOp::Get(n)) => n,
Expand All @@ -42,6 +65,32 @@ impl Query {
}
}

/// Return _all_ timeseries names referred to by get table operations.
pub(crate) fn all_timeseries_names(&self) -> BTreeSet<&TimeseriesName> {
let mut set = BTreeSet::new();
self.all_timeseries_names_impl(&mut set);
set
}

fn all_timeseries_names_impl<'a>(
&'a self,
set: &mut BTreeSet<&'a TimeseriesName>,
) {
for op in self.ops.iter() {
match op {
TableOp::Basic(BasicTableOp::Get(name)) => {
set.insert(name);
}
TableOp::Basic(_) => {}
TableOp::Grouped(GroupedTableOp { ops }) => {
for query in ops.iter() {
query.all_timeseries_names_impl(set);
}
}
}
}
}

// Check that this query (and any subqueries) start with a get table op, and
// that there are no following get operations. I.e., we have:
//
Expand Down
24 changes: 22 additions & 2 deletions oximeter/db/src/oxql/ast/table_ops/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use oxql_types::point::Values;
use oxql_types::Alignment;
use oxql_types::Table;
use oxql_types::Timeseries;
use std::fmt;
use std::time::Duration;

// The maximum factor by which an alignment operation may upsample data.
Expand Down Expand Up @@ -68,7 +69,7 @@ fn verify_max_upsampling_ratio(
///
/// Alignment is used to produce data at the defined timestamps, so that samples
/// from multiple timeseries may be combined or correlated in meaningful ways.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Align {
/// The alignment method, used to describe how data over the input period
/// is used to generate an output sample.
Expand All @@ -87,6 +88,16 @@ pub struct Align {
pub period: Duration,
}

impl std::fmt::Display for Align {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let name = match self.method {
AlignmentMethod::Interpolate => "interpolate",
AlignmentMethod::MeanWithin => "mean_within",
};
write!(f, "{}({:?})", name, self.period)
}
}

impl Align {
// Apply the alignment function to the set of tables.
pub(crate) fn apply(
Expand All @@ -108,7 +119,7 @@ impl Align {
}

/// An alignment method.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AlignmentMethod {
/// Alignment is done by interpolating the output data at the specified
/// period.
Expand All @@ -118,6 +129,15 @@ pub enum AlignmentMethod {
MeanWithin,
}

impl fmt::Display for AlignmentMethod {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AlignmentMethod::Interpolate => write!(f, "interpolate"),
AlignmentMethod::MeanWithin => write!(f, "mean_within"),
}
}
}

// Align the timeseries in a table by computing the average within each output
// period.
fn align_mean_within(
Expand Down
Loading
Loading