Skip to content

Commit

Permalink
fix: no default otel endpoint for the operator (#154)
Browse files Browse the repository at this point in the history
* fix: no default otel endpoint for the operator

The operator no longer has a default opentelemetry endpoint and will
only collect and send traces if an endpoint is configured.

This might address the memory leak, but either way it's a nice
quality-of-life improvement, as we have not been collecting traces from
the operator anyway.

* fix: enable logs even if otlp tracing export is disabled

With this change, logs will still be printed to STDOUT even if no OTLP
endpoint is configured.
  • Loading branch information
nathanielc authored Feb 19, 2024
1 parent 7e5d9ad commit 039e15b
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 82 deletions.
17 changes: 2 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ tokio = { version = "1", features = ["full", "tracing"] }
tonic = { version = "0.8" }
tracing = "0.1.37"
tracing-opentelemetry = "0.22"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
tracing-log = "0.1.3"
tracing-subscriber = { version = "0.3", features = [
"json",
"env-filter",
"tracing-log",
] }

[patch.crates-io]
goose = { git = "https://github.com/3box/goose.git", branch = "feat/all-metrics-digest" }
Expand Down
106 changes: 60 additions & 46 deletions common/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use opentelemetry_sdk::{
reader::{DefaultAggregationSelector, DefaultTemporalitySelector},
MeterProvider,
},
runtime, trace, Resource,
runtime, Resource,
};
use prometheus::{Encoder, TextEncoder};
use tokio::{sync::oneshot, task::JoinHandle};
Expand All @@ -25,55 +25,69 @@ use tracing_subscriber::{filter::LevelFilter, prelude::*, EnvFilter, Registry};
static PROM_REGISTRY: OnceLock<prometheus::Registry> = OnceLock::new();

/// Initialize tracing
pub async fn init_tracing(otlp_endpoint: String) -> Result<()> {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_endpoint.clone()),
)
.with_trace_config(trace::config().with_resource(Resource::new(vec![
opentelemetry::KeyValue::new(
"hostname",
gethostname::gethostname()
.into_string()
.expect("hostname should be valid utf-8"),
),
opentelemetry::KeyValue::new("service.name", "keramik"),
])))
.install_batch(runtime::Tokio)?;

// Setup filters
// Default to INFO if no env is specified
pub async fn init_tracing(otlp_endpoint: Option<String>) -> Result<()> {
//// Setup log filter
//// Default to INFO if no env is specified
let log_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env()?;
let otlp_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env()?;

// Setup tracing layers
let telemetry = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(otlp_filter);
let logger = tracing_subscriber::fmt::layer()
.with_ansi(true)
.compact()
.with_filter(log_filter);

let collector = Registry::default().with(telemetry).with(logger);

#[cfg(feature = "tokio-console")]
let collector = {
let console_filter = EnvFilter::builder().parse("tokio=trace,runtime=trace")?;
let console_layer = console_subscriber::spawn().with_filter(console_filter);
collector.with(console_layer)
};

// Initialize tracing
tracing::subscriber::set_global_default(collector)?;

// If we have an otlp_endpoint setup export of traces
if let Some(otlp_endpoint) = otlp_endpoint {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_endpoint.clone()),
)
.with_trace_config(
opentelemetry_sdk::trace::config().with_resource(Resource::new(vec![
opentelemetry::KeyValue::new(
"hostname",
gethostname::gethostname()
.into_string()
.expect("hostname should be valid utf-8"),
),
opentelemetry::KeyValue::new("service.name", "keramik"),
])),
)
.install_batch(runtime::Tokio)?;

// Setup otlp export filter
// Default to INFO if no env is specified
let otlp_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env()?;

// Setup tracing layers
let telemetry = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(otlp_filter);
// Setup logging to stdout
let logger = tracing_subscriber::fmt::layer()
.with_ansi(true)
.pretty()
.with_filter(log_filter);

let collector = Registry::default().with(telemetry).with(logger);

#[cfg(feature = "tokio-console")]
let collector = {
let console_filter = EnvFilter::builder().parse("tokio=trace,runtime=trace")?;
let console_layer = console_subscriber::spawn().with_filter(console_filter);
collector.with(console_layer)
};

tracing::subscriber::set_global_default(collector)?;
} else {
// Setup basic log only tracing
let logger = tracing_subscriber::fmt::layer()
.with_ansi(true)
.pretty()
.with_filter(log_filter);
tracing_subscriber::registry().with(logger).init()
}
Ok(())
}

Expand Down
3 changes: 0 additions & 3 deletions k8s/operator/manifests/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ spec:
containerPort: 9464
protocol: TCP
env:
# We are pointing to tempo or grafana tracing agent's otlp grpc receiver port
- name: OPERATOR_OTLP_ENDPOINT
value: "https://otel:4317"
- name: RUST_LOG
value: "info"
#readinessProbe:
Expand Down
2 changes: 0 additions & 2 deletions operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ controller = [
"dep:thiserror",
"dep:tokio",
"dep:tracing",
"dep:tracing-log",
# Enable keramik-common/telemetry feature if the controller is enabled.
"keramik-common/telemetry",
"kube/client",
Expand Down Expand Up @@ -69,7 +68,6 @@ serde_yaml = { version = "0.9.21", optional = true }
thiserror = { version = "1", optional = true }
tokio = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
tracing-log = { workspace = true, optional = true }

[dev-dependencies]
expect-patch.workspace = true
Expand Down
14 changes: 5 additions & 9 deletions operator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ use anyhow::Result;
use clap::{command, Parser, Subcommand};
use keramik_common::telemetry;
use opentelemetry::global::{shutdown_meter_provider, shutdown_tracer_provider};
use tracing::info;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Cli {
#[command(subcommand)]
command: Command,

#[arg(
long,
env = "OPERATOR_OTLP_ENDPOINT",
default_value = "http://localhost:4317"
)]
otlp_endpoint: String,
#[arg(long, env = "OPERATOR_OTLP_ENDPOINT")]
otlp_endpoint: Option<String>,

#[arg(long, env = "OPERATOR_PROM_BIND", default_value = "0.0.0.0:9464")]
prom_bind: String,
Expand All @@ -31,12 +28,11 @@ pub enum Command {

#[tokio::main]
async fn main() -> Result<()> {
tracing_log::LogTracer::init()?;

let args = Cli::parse();
telemetry::init_tracing(args.otlp_endpoint.clone()).await?;
telemetry::init_tracing(args.otlp_endpoint).await?;
let (metrics_controller, metrics_server_shutdown, metrics_server_join) =
telemetry::init_metrics_prom(&args.prom_bind.parse()?).await?;
info!("starting operator");

match args.command {
Command::Daemon => {
Expand Down
2 changes: 0 additions & 2 deletions runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ schemars = "0.8.12"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing-log.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
multibase.workspace = true

[dev-dependencies]
test-log = "0.2"

5 changes: 2 additions & 3 deletions runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,10 @@ pub enum CommandResult {

#[tokio::main]
async fn main() -> Result<()> {
tracing_log::LogTracer::init()?;

let args = Cli::parse();
telemetry::init_tracing(args.otlp_endpoint.clone()).await?;
telemetry::init_tracing(Some(args.otlp_endpoint.clone())).await?;
let metrics_controller = telemetry::init_metrics_otlp(args.otlp_endpoint.clone()).await?;
info!("starting runner");

let meter = global::meter("keramik");
let runs = meter
Expand Down

0 comments on commit 039e15b

Please sign in to comment.