Skip to content

Commit

Permalink
feat: track channels with query context and w/rcu (#4448)
Browse files Browse the repository at this point in the history
* feat: add source channel to meter recorders

* feat: provide channel for query context

* fix: testing and extension get for query context

* chore: revert cargo toml structure changes

* fix: querycontext modification for prometheus and pipeline

* chore: switch git dependency to main branches

* chore: remove TODO

* refactor: rename other to unknown

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
  • Loading branch information
sunng87 and shuiyisong authored Jul 31, 2024
1 parent dd23d47 commit b741a71
Show file tree
Hide file tree
Showing 21 changed files with 193 additions and 96 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,12 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7ca323090b3ae8faf2c15036b7f41b7c5225cf5f" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c437b55725b7f5224fe9d46db21072b4a682ee4b" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80b72716dcde47ec4161478416a5c6c21343364d" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "049171eb16cb4249d8099751a0c46750d1fe88e7" }
mockall = "0.11.4"
moka = "0.12"
notify = "6.1"
Expand Down Expand Up @@ -238,7 +238,7 @@ table = { path = "src/table" }

[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "80b72716dcde47ec4161478416a5c6c21343364d"
rev = "049171eb16cb4249d8099751a0c46750d1fe88e7"

[profile.release]
debug = 1
Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@ pub struct QueryContext {
current_schema: String,
timezone: String,
extensions: HashMap<String, String>,
channel: u8,
}

impl From<QueryContextRef> for QueryContext {
Expand All @@ -1088,6 +1089,7 @@ impl From<QueryContextRef> for QueryContext {
current_schema: query_context.current_schema().to_string(),
timezone: query_context.timezone().to_string(),
extensions: query_context.extensions(),
channel: query_context.channel() as u8,
}
}
}
Expand All @@ -1099,13 +1101,15 @@ impl From<QueryContext> for PbQueryContext {
current_schema,
timezone,
extensions,
channel,
}: QueryContext,
) -> Self {
PbQueryContext {
current_catalog,
current_schema,
timezone,
extensions,
channel: channel as u32,
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,12 @@ impl Inserter {
requests: RegionInsertRequests,
ctx: &QueryContextRef,
) -> Result<Output> {
let write_cost = write_meter!(ctx.current_catalog(), ctx.current_schema(), requests);
let write_cost = write_meter!(
ctx.current_catalog(),
ctx.current_schema(),
requests,
ctx.channel() as u8
);
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
dbname: ctx.get_db_string(),
Expand Down
5 changes: 4 additions & 1 deletion src/query/src/dist_plan/merge_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ impl MergeScanExec {
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let current_catalog = self.query_ctx.current_catalog().to_string();
let current_schema = self.query_ctx.current_schema().to_string();
let current_channel = self.query_ctx.channel();
let timezone = self.query_ctx.timezone().to_string();
let extensions = self.query_ctx.extensions();
let target_partition = self.target_partition;
Expand Down Expand Up @@ -221,6 +222,7 @@ impl MergeScanExec {
current_schema: current_schema.clone(),
timezone: timezone.clone(),
extensions: extensions.clone(),
channel: current_channel as u32,
}),
}),
region_id,
Expand Down Expand Up @@ -271,7 +273,8 @@ impl MergeScanExec {
ReadItem {
cpu_time: metrics.elapsed_compute as u64,
table_scan: metrics.memory_usage as u64
}
},
current_channel as u8
);
metric.record_greptime_exec_cost(value as usize);

Expand Down
9 changes: 4 additions & 5 deletions src/servers/src/grpc/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

use std::pin::Pin;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::task::{Context, Poll};

use auth::UserProviderRef;
use hyper::Body;
use session::context::QueryContext;
use session::context::{Channel, QueryContext};
use tonic::body::BoxBody;
use tonic::server::NamedService;
use tower::{Layer, Service};
Expand Down Expand Up @@ -105,7 +104,7 @@ async fn do_auth<T>(
) -> Result<(), tonic::Status> {
let (catalog, schema) = extract_catalog_and_schema(req);

let query_ctx = Arc::new(QueryContext::with(&catalog, &schema));
let query_ctx = QueryContext::with_channel(&catalog, &schema, Channel::Grpc);

let Some(user_provider) = user_provider else {
query_ctx.set_current_user(auth::userinfo_by_name(None));
Expand Down Expand Up @@ -139,7 +138,7 @@ mod tests {
use base64::Engine;
use headers::Header;
use hyper::{Body, Request};
use session::context::QueryContextRef;
use session::context::QueryContext;

use crate::grpc::authorize::do_auth;
use crate::http::header::GreptimeDbName;
Expand Down Expand Up @@ -197,7 +196,7 @@ mod tests {
expected_schema: &str,
expected_user_name: &str,
) {
let ctx = req.extensions().get::<QueryContextRef>().unwrap();
let ctx = req.extensions().get::<QueryContext>().unwrap();
assert_eq!(expected_catalog, ctx.current_catalog());
assert_eq!(expected_schema, ctx.current_schema());

Expand Down
15 changes: 10 additions & 5 deletions src/servers/src/grpc/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::result::Result as StdResult;
use std::sync::Arc;

use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsService;
use opentelemetry_proto::tonic::collector::metrics::v1::{
Expand All @@ -22,7 +23,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::Trac
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use session::context::QueryContextRef;
use session::context::{Channel, QueryContext};
use snafu::OptionExt;
use tonic::{Request, Response, Status};

Expand All @@ -47,10 +48,12 @@ impl TraceService for OtlpService {
) -> StdResult<Response<ExportTraceServiceResponse>, Status> {
let (_headers, extensions, req) = request.into_parts();

let ctx = extensions
.get::<QueryContextRef>()
let mut ctx = extensions
.get::<QueryContext>()
.cloned()
.context(error::MissingQueryContextSnafu)?;
ctx.set_channel(Channel::Otlp);
let ctx = Arc::new(ctx);

let _ = self.handler.traces(req, ctx).await?;

Expand All @@ -68,10 +71,12 @@ impl MetricsService for OtlpService {
) -> StdResult<Response<ExportMetricsServiceResponse>, Status> {
let (_headers, extensions, req) = request.into_parts();

let ctx = extensions
.get::<QueryContextRef>()
let mut ctx = extensions
.get::<QueryContext>()
.cloned()
.context(error::MissingQueryContextSnafu)?;
ctx.set_channel(Channel::Otlp);
let ctx = Arc::new(ctx);

let _ = self.handler.metrics(req, ctx).await?;

Expand Down
4 changes: 1 addition & 3 deletions src/servers/src/http/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use ::auth::UserProviderRef;
use axum::extract::State;
use axum::http::{self, Request, StatusCode};
Expand Down Expand Up @@ -68,7 +66,7 @@ pub async fn inner_auth<B>(
.current_schema(schema.clone())
.timezone(timezone);

let query_ctx = Arc::new(query_ctx_builder.build());
let query_ctx = query_ctx_builder.build();
let need_auth = need_auth(&req);

// 2. check if auth is needed
Expand Down
17 changes: 13 additions & 4 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use pipeline::{PipelineVersion, Value as PipelineValue};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Value};
use session::context::QueryContextRef;
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
Expand Down Expand Up @@ -107,7 +107,7 @@ where
pub async fn add_pipeline(
State(state): State<LogState>,
Path(pipeline_name): Path<String>,
Extension(query_ctx): Extension<QueryContextRef>,
Extension(mut query_ctx): Extension<QueryContext>,
PipelineContent(payload): PipelineContent,
) -> Result<GreptimedbManageResponse> {
let start = Instant::now();
Expand All @@ -126,6 +126,9 @@ pub async fn add_pipeline(
.build());
}

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

let content_type = "yaml";
let result = handler
.insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
Expand All @@ -148,7 +151,7 @@ pub async fn add_pipeline(
#[axum_macros::debug_handler]
pub async fn delete_pipeline(
State(state): State<LogState>,
Extension(query_ctx): Extension<QueryContextRef>,
Extension(mut query_ctx): Extension<QueryContext>,
Query(query_params): Query<LogIngesterQueryParams>,
Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
Expand All @@ -167,6 +170,9 @@ pub async fn delete_pipeline(

let version = to_pipeline_version(Some(version_str.clone())).context(PipelineSnafu)?;

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

handler
.delete_pipeline(&pipeline_name, version, query_ctx)
.await
Expand Down Expand Up @@ -231,7 +237,7 @@ fn transform_ndjson_array_factory(
pub async fn log_ingester(
State(log_state): State<LogState>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(query_ctx): Extension<QueryContextRef>,
Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
) -> Result<HttpResponse> {
Expand All @@ -256,6 +262,9 @@ pub async fn log_ingester(

let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

ingest_logs_inner(
handler,
pipeline_name,
Expand Down
13 changes: 10 additions & 3 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use aide::transform::TransformOperation;
Expand All @@ -29,7 +30,7 @@ use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::QueryContextRef;
use session::context::{Channel, QueryContext, QueryContextRef};

use super::header::collect_plan_metrics;
use crate::http::arrow_result::ArrowResponse;
Expand Down Expand Up @@ -70,13 +71,16 @@ pub struct SqlQuery {
pub async fn sql(
State(state): State<ApiState>,
Query(query_params): Query<SqlQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Extension(mut query_ctx): Extension<QueryContext>,
Form(form_params): Form<SqlQuery>,
) -> HttpResponse {
let start = Instant::now();
let sql_handler = &state.sql_handler;
let db = query_ctx.get_db_string();

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
Expand Down Expand Up @@ -232,12 +236,15 @@ impl From<PromqlQuery> for PromQuery {
pub async fn promql(
State(state): State<ApiState>,
Query(params): Query<PromqlQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Extension(mut query_ctx): Extension<QueryContext>,
) -> Response {
let sql_handler = &state.sql_handler;
let exec_start = Instant::now();
let db = query_ctx.get_db_string();

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
Expand Down
Loading

0 comments on commit b741a71

Please sign in to comment.