Skip to content

Commit

Permalink
Merge pull request #643 from tpolecat/otel
Browse files Browse the repository at this point in the history
Add OpenTelemetry support from #368
  • Loading branch information
mpilquist authored Nov 16, 2022
2 parents 231a6ea + 9b180e4 commit bcf9bbe
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 47 deletions.
9 changes: 6 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ lazy val mock = project

lazy val examples = project
.in(file("modules/examples"))
.dependsOn(core.jvm, jaeger, honeycomb, lightstepHttp, datadog, newrelic, log.jvm)
.dependsOn(core.jvm, jaeger, honeycomb, lightstepHttp, datadog, newrelic, log.jvm, opentelemetry)
.enablePlugins(AutomateHeaderPlugin, NoPublishPlugin)
.settings(commonSettings)
.settings(
Expand All @@ -347,7 +347,9 @@ lazy val examples = project
"org.typelevel" %% "log4cats-slf4j" % "2.5.0",
"org.slf4j" % "slf4j-simple" % "2.0.3",
"eu.timepit" %% "refined" % "0.10.1",
"is.cir" %% "ciris" % "2.4.0"
"is.cir" %% "ciris" % "2.4.0",
"io.opentelemetry" % "opentelemetry-exporter-otlp" % "1.12.0",
"io.grpc" % "grpc-okhttp" % "1.38.0", // required for the OpenTelemetry exporter
)
)

Expand All @@ -369,7 +371,7 @@ lazy val logOdin = project

lazy val docs = project
.in(file("modules/docs"))
.dependsOn(mtl.jvm, honeycomb, datadog, jaeger, log.jvm)
.dependsOn(mtl.jvm, honeycomb, datadog, jaeger, log.jvm, opentelemetry)
.enablePlugins(AutomateHeaderPlugin)
.enablePlugins(ParadoxPlugin)
.enablePlugins(ParadoxSitePlugin)
Expand Down Expand Up @@ -399,6 +401,7 @@ lazy val docs = project
"org.http4s" %% "http4s-client" % "0.23.15",
"org.typelevel" %% "log4cats-slf4j" % "2.4.0",
"org.slf4j" % "slf4j-simple" % "2.0.3",
"io.opentelemetry" % "opentelemetry-exporter-otlp" % "1.12.0", // for the opentelemetry example
),
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_3", // pray this does more good than harm
)
1 change: 1 addition & 0 deletions modules/docs/src/main/paradox/backends/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Natchez supports the following tracing back ends. If you're not sure which one y
* [No-Op](noop.md)
* [Odin](odin.md)
* [OpenCensus](opencensus.md)
* [OpenTelemetry](opentelemetry.md)

@@@

Expand Down
86 changes: 86 additions & 0 deletions modules/docs/src/main/paradox/backends/opentelemetry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# OpenTelemetry

The `natchez-opentelemetry` module provides a backend that uses [OpenTelemetry](https://opentelemetry.io) to report spans.

To use it, add the following dependency.

@@dependency[sbt,Maven,Gradle] {
group="$org$"
artifact="natchez-opentelemetry-2.13"
version="$version$"
}

Then add any exporter, for example:

@@dependency[sbt,Maven,Gradle] {
group="io.opentelemetry"
artifact="opentelemetry-exporter-otlp"
version="1.12.0"
}

## Configuring an OpenTelemetry entrypoint

There are two methods you'll need to construct an `OpenTelemetry` `EndPoint`.

`OpenTelemetry.lift` is used to turn an `F[_]` that constructs a `SpanExporter`, `SpanProcessor` or `SdkTraceProvider` into a `Resource` that will shut it down cleanly.
This takes a `String` of what you've constructed, so we can give a nice error if it fails to shut down cleanly.

The `OpenTelemetry.entryPoint` method takes a boolean called `globallyRegister` which tells it whether to register this `OpenTelemetry` globally. This may be helpful if you have other Java dependencies that use the global tracer. It defaults to false.

It also takes an `OpenTelemetrySdkBuilder => Resource[F, OpenTelemetrySdkBuilder]` so that you can configure the Sdk.

Here's an example of configuring one with the `otlp` exporter with batch span processing:

```scala mdoc:passthrough
import natchez.EntryPoint
import natchez.opentelemetry.OpenTelemetry
import cats.effect._
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import io.opentelemetry.sdk.resources.{Resource => OtelResource}
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.`export`.BatchSpanProcessor

def entryPoint[F[_]: Async]: Resource[F, EntryPoint[F]] =
for {
exporter <- OpenTelemetry.lift(
"OtlpGrpcSpanExporter",
Sync[F].delay {
OtlpGrpcSpanExporter.builder()
.setEndpoint("http://localhost:4317")
.build()
}
)
processor <- OpenTelemetry.lift(
"BatchSpanProcessor",
Sync[F].delay {
BatchSpanProcessor.builder(exporter).build()
}
)
tracer <- OpenTelemetry.lift(
"Tracer",
Sync[F].delay {
SdkTracerProvider.builder()
.setResource(
OtelResource.create(
Attributes.of(ResourceAttributes.SERVICE_NAME, "OpenTelemetryExample")
)
)
.addSpanProcessor(processor)
.build()
}
)
ep <- OpenTelemetry.entryPoint(globallyRegister = true) { builder =>
Resource.eval(Sync[F].delay {
builder
.setTracerProvider(tracer)
.setPropagators(
ContextPropagators.create(W3CTraceContextPropagator.getInstance())
)
}
)}
} yield ep
```
82 changes: 82 additions & 0 deletions modules/examples/src/main/scala/OpenTelemetryExample.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2019-2020 by Rob Norris and Contributors
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT

import cats.data.Kleisli
import cats.effect._
import cats.implicits._
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.sdk.resources.{Resource => OtelResource}
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.`export`.BatchSpanProcessor
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import natchez.{EntryPoint, Span, Trace}
import natchez.opentelemetry.OpenTelemetry

import scala.concurrent.duration.DurationInt

// change this into an object if you'd like to run it
class OpenTelemetryExample extends IOApp {
def entryPoint[F[_]: Async]: Resource[F, EntryPoint[F]] =
for {
exporter <- OpenTelemetry.lift(
"OtlpGrpcSpanExporter",
Sync[F].delay {
OtlpGrpcSpanExporter.builder()
.setEndpoint("http://localhost:4317")
.build()
}
)
processor <- OpenTelemetry.lift(
"BatchSpanProcessor",
Sync[F].delay {
BatchSpanProcessor.builder(exporter).build()
}
)
tracer <- OpenTelemetry.lift(
"Tracer",
Sync[F].delay {
SdkTracerProvider.builder()
.setResource(
OtelResource.create(
Attributes.of(ResourceAttributes.SERVICE_NAME, "OpenTelemetryExample")
)
)
.addSpanProcessor(processor)
.build()
}
)
ep <- OpenTelemetry.entryPoint(globallyRegister = true) { builder =>
Resource.eval(Sync[F].delay {
builder
.setTracerProvider(tracer)
.setPropagators(
ContextPropagators.create(W3CTraceContextPropagator.getInstance())
)
}
)}
} yield ep

override def run(args: List[String]): IO[ExitCode] =
entryPoint[IO].use { ep =>
ep.root("root span").use { span =>
span.put("service.name" -> "natchez opentelemetry example") *>
program[Kleisli[IO, Span[IO], *]].apply(span).as(ExitCode.Success)
}
}

def program[F[_]: Async: Trace]: F[Unit] =
Trace[F].traceId.flatTap(tid => Sync[F].delay { println(s"did some work with traceid of $tid") }) *>
Trace[F].span("outer span") {
Trace[F].put("foo" -> "bar") *>
(Trace[F].span("first thing") {
Temporal[F].sleep(2.seconds)
},
Trace[F].span("second thing") {
Temporal[F].sleep(2.seconds)
}).tupled
}.void
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,67 @@
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT

package natchez.opentelemetry
package natchez
package opentelemetry

import cats.syntax.all._
import cats.effect.{Resource, Sync}
import io.opentelemetry.sdk.OpenTelemetrySdk
import natchez.{EntryPoint, Kernel, Span}
import cats.effect.{Async, Resource, Sync}
import io.opentelemetry.api.{GlobalOpenTelemetry, OpenTelemetry => OTel}
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.sdk.{OpenTelemetrySdk, OpenTelemetrySdkBuilder}

import java.net.URI

object OpenTelemetry {
def entryPointForSdk[F[_] : Sync](sdk: OpenTelemetrySdk): F[EntryPoint[F]] =
Sync[F]
.delay(sdk.getTracer("natchez"))
.map { t =>
new EntryPoint[F] {
def continue(name: String, kernel: Kernel): Resource[F, Span[F]] =
Resource.makeCase(OpenTelemetrySpan.fromKernel(sdk, t, name, kernel))(OpenTelemetrySpan.finish).widen

def root(name: String): Resource[F, Span[F]] =
Resource.makeCase(OpenTelemetrySpan.root(sdk, t, name))(OpenTelemetrySpan.finish).widen

def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] =
Resource
.makeCase(OpenTelemetrySpan.fromKernelOrElseRoot(sdk, t, name, kernel))(OpenTelemetrySpan.finish)
.widen
private final val instrumentationName = "natchez.opentelemetry"

def entryPointFor[F[_] : Sync](otel: OTel): F[OpenTelemetryEntryPoint[F]] =
Sync[F].delay(otel.getTracer("natchez")).map(t =>
OpenTelemetryEntryPoint(otel, t, None)
)

def entryPointFor[F[_] : Sync](otel: OTel, tracer: Tracer, prefix: Option[URI]): OpenTelemetryEntryPoint[F] =
OpenTelemetryEntryPoint(otel, tracer, prefix)

def entryPoint[F[_] : Sync](uriPrefix: Option[URI] = None, globallyRegister: Boolean = false)(
configure: OpenTelemetrySdkBuilder => Resource[F, OpenTelemetrySdkBuilder]
): Resource[F, EntryPoint[F]] = {
val register: OpenTelemetrySdkBuilder => Resource[F, (OpenTelemetrySdk, Tracer)] = { b =>
Resource.make(
Sync[F].delay {
val sdk = if (globallyRegister)
b.buildAndRegisterGlobal()
else
b.build()
val tracer = sdk.getTracer(instrumentationName)
(sdk, tracer)
}
) { case (_, _) =>
Sync[F].delay {
if (globallyRegister)
GlobalOpenTelemetry.resetForTest() // this seems to be the only way to deregister it
}
}
}
Resource.eval(Sync[F].delay { OpenTelemetrySdk.builder() })
.flatMap(configure)
.flatMap(register)
.map { case (sdk, tracer) =>
OpenTelemetryEntryPoint(sdk, tracer, uriPrefix)
}
}

def globalEntryPoint[F[_]: Sync](uriPrefix: Option[URI] = None): F[OpenTelemetryEntryPoint[F]] =
Sync[F].delay {
val ot = GlobalOpenTelemetry.get()
OpenTelemetryEntryPoint(ot, ot.getTracer(instrumentationName), uriPrefix)
}

// Helper methods to help you construct Otel resources that clean themselves up
// We need a name so the failure error can contain something useful
def lift[F[_]: Async, T: Shutdownable](name: String, create: F[T]): Resource[F, T] =
Resource.make(create) { t =>
Sync[F].delay { Shutdownable[T].shutdown(t) }
.flatMap(Utils.asyncFromCompletableResultCode(s"$name cleanup", _))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2019-2020 by Rob Norris and Contributors
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT

package natchez
package opentelemetry

import cats.effect.{Resource, Sync}
import cats.syntax.all._
import io.opentelemetry.api.{OpenTelemetry => OTel}
import io.opentelemetry.api.trace.Tracer

import java.net.URI

final case class OpenTelemetryEntryPoint[F[_] : Sync](sdk: OTel, tracer: Tracer, prefix: Option[URI]) extends EntryPoint[F] {
override def continue(name: String, kernel: Kernel): Resource[F, Span[F]] =
Resource.makeCase(OpenTelemetrySpan.fromKernel(sdk, tracer, prefix, name, kernel))(OpenTelemetrySpan.finish).widen

override def root(name: String): Resource[F, Span[F]] =
Resource.makeCase(OpenTelemetrySpan.root(sdk, tracer, prefix, name))(OpenTelemetrySpan.finish).widen

override def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] =
Resource
.makeCase(OpenTelemetrySpan.fromKernelOrElseRoot(sdk, tracer, prefix, name, kernel))(OpenTelemetrySpan.finish)
.widen
}
Loading

0 comments on commit bcf9bbe

Please sign in to comment.