Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk-trace: add SimpleSpanProcessor #388

Merged
merged 1 commit into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.typelevel.otel4s.sdk.trace.data.EventData
import org.typelevel.otel4s.sdk.trace.data.LinkData
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.sdk.trace.data.StatusData
import org.typelevel.otel4s.sdk.trace.processor.SpanProcessor
import org.typelevel.otel4s.trace.Span
import org.typelevel.otel4s.trace.SpanContext
import org.typelevel.otel4s.trace.SpanKind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ trait SpanRef[F[_]] { self: Span.Backend[F] =>

/** Returns the name of the span.
*
* '''Note''': the name of the span can be changed during the lifetime of the
* span by using
* [[org.typelevel.otel4s.trace.Span.updateName Span.updateName]], so this
* value cannot be cached.
* @note
* the name of the span can be changed during the lifetime of the span by
* using [[org.typelevel.otel4s.trace.Span.updateName Span.updateName]], so
* this value cannot be cached.
*/
def name: F[String]

Expand All @@ -73,10 +73,11 @@ trait SpanRef[F[_]] { self: Span.Backend[F] =>
/** Returns the attribute value for the given `key`. Returns `None` if the key
* is absent in the storage.
*
* '''Note''': the attribute values can be changed during the lifetime of the
* span by using
* [[org.typelevel.otel4s.trace.Span.addAttribute Span.addAttribute]], so
* this value cannot be cached.
* @note
* the attribute values can be changed during the lifetime of the span by
* using
* [[org.typelevel.otel4s.trace.Span.addAttribute Span.addAttribute]], so
* this value cannot be cached.
*/
def getAttribute[A](key: AttributeKey[A]): F[Option[A]]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.trace
package processor

import cats.MonadThrow
import cats.effect.std.Console
import cats.syntax.applicative._
import cats.syntax.applicativeError._
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter
import org.typelevel.otel4s.trace.SpanContext

/** An implementation of the [[SpanProcessor]] that passes ended
* [[data.SpanData SpanData]] directly to the configured exporter.
*
* @note
* this processor exports spans individually upon completion, resulting in a
* single span per export request.
*
* @see
* [[https://opentelemetry.io/docs/specs/otel/trace/sdk/#simple-processor]]
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*/
final class SimpleSpanProcessor[F[_]: MonadThrow: Console] private (
exporter: SpanExporter[F],
exportOnlySampled: Boolean
) extends SpanProcessor[F] {

val name: String =
s"SimpleSpanProcessor{exporter=${exporter.name}, exportOnlySampled=$exportOnlySampled}"

val isStartRequired: Boolean = false
val isEndRequired: Boolean = true

def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] =
MonadThrow[F].unit

def onEnd(span: SpanData): F[Unit] = {
val canExport = !exportOnlySampled || span.spanContext.isSampled
doExport(span).whenA(canExport)
}

private def doExport(span: SpanData): F[Unit] =
exporter.exportSpans(List(span)).handleErrorWith { e =>
Console[F].errorln(
s"SimpleSpanExporter: the export has failed: ${e.getMessage}\n${e.getStackTrace.mkString("\n")}\n"
)
}

def forceFlush: F[Unit] =
MonadThrow[F].unit
}

object SimpleSpanProcessor {

/** Creates a [[SimpleSpanProcessor]] that passes only '''sampled''' ended
* spans to the given `exporter`.
*
* @param exporter
* the [[exporter.SpanExporter SpanExporter]] to use
*/
def apply[F[_]: MonadThrow: Console](
exporter: SpanExporter[F]
): SimpleSpanProcessor[F] =
apply(exporter, exportOnlySampled = true)

/** Creates a [[SimpleSpanProcessor]] that passes ended spans to the given
* `exporter`.
*
* @param exporter
* the [[exporter.SpanExporter SpanExporter]] to use
*
* @param exportOnlySampled
* whether to export only sampled spans
*/
def apply[F[_]: MonadThrow: Console](
exporter: SpanExporter[F],
exportOnlySampled: Boolean
): SimpleSpanProcessor[F] =
new SimpleSpanProcessor[F](exporter, exportOnlySampled)

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.typelevel.otel4s
package sdk.trace
package processor

import cats.Applicative
import cats.MonadThrow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.typelevel.otel4s.sdk.trace.data.EventData
import org.typelevel.otel4s.sdk.trace.data.LinkData
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.sdk.trace.data.StatusData
import org.typelevel.otel4s.sdk.trace.processor.SpanProcessor
import org.typelevel.otel4s.trace.SpanContext
import org.typelevel.otel4s.trace.SpanKind
import org.typelevel.otel4s.trace.Status
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright 2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.trace
package processor

import cats.Foldable
import cats.effect.IO
import cats.syntax.foldable._
import cats.syntax.traverse._
import munit.CatsEffectSuite
import munit.ScalaCheckEffectSuite
import org.scalacheck.Arbitrary
import org.scalacheck.Test
import org.scalacheck.effect.PropF
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.AttributeKey
import org.typelevel.otel4s.meta.InstrumentMeta
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.sdk.trace.exporter.InMemorySpanExporter
import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter
import org.typelevel.otel4s.trace.Span
import org.typelevel.otel4s.trace.SpanContext
import org.typelevel.otel4s.trace.SpanKind
import org.typelevel.otel4s.trace.Status

import scala.concurrent.duration.FiniteDuration

class SimpleSpanProcessorSuite
extends CatsEffectSuite
with ScalaCheckEffectSuite {

private implicit val spanDataArbitrary: Arbitrary[SpanData] =
Arbitrary(Gens.spanData)

test("show details in the name") {
val exporter = new FailingExporter(
"error-prone",
new RuntimeException("something went wrong")
)

val processor = SimpleSpanProcessor(exporter, exportOnlySampled = false)

val expected =
"SimpleSpanProcessor{exporter=error-prone, exportOnlySampled=false}"

assertEquals(processor.name, expected)
}

test("do nothing on start") {
PropF.forAllF { (spans: List[SpanData]) =>
for {
exporter <- InMemorySpanExporter.create[IO](None)
processor = SimpleSpanProcessor(exporter, exportOnlySampled = true)
_ <- spans.traverse_(s => processor.onStart(None, constSpanRef(s)))
exported <- exporter.finishedSpans
} yield assert(exported.isEmpty)
}
}

test("export only sampled spans on end (exportOnlySampled = true)") {
PropF.forAllF { (spans: List[SpanData]) =>
val sampled = spans.filter(_.spanContext.isSampled)

for {
exporter <- InMemorySpanExporter.create[IO](None)
processor = SimpleSpanProcessor(exporter, exportOnlySampled = true)
_ <- spans.traverse_(span => processor.onEnd(span))
exported <- exporter.finishedSpans
} yield assertEquals(exported, sampled)
}
}

test("export all spans on end (exportOnlySampled = false)") {
PropF.forAllF { (spans: List[SpanData]) =>
for {
exporter <- InMemorySpanExporter.create[IO](None)
processor = SimpleSpanProcessor(exporter, exportOnlySampled = false)
_ <- spans.traverse_(span => processor.onEnd(span))
exported <- exporter.finishedSpans
} yield assertEquals(
exported.map(_.spanContext.spanIdHex),
spans.map(_.spanContext.spanIdHex)
)
}
}

test("do not rethrow export errors") {
PropF.forAllF { (spans: List[SpanData]) =>
val error = new RuntimeException("something went wrong")
val exporter = new FailingExporter("error-prone", error)
val processor = SimpleSpanProcessor(exporter, exportOnlySampled = false)

for {
attempts <- spans.traverse(span => processor.onEnd(span).attempt)
} yield assertEquals(attempts, List.fill(spans.size)(Right(())))
}
}

private def constSpanRef(data: SpanData): SpanRef[IO] = {
new SpanRef[IO] with Span.Backend[IO] {
private val noopBackend = Span.Backend.noop[IO]

def kind: SpanKind =
data.kind

def scopeInfo: InstrumentationScope =
data.instrumentationScope

def parentSpanContext: Option[SpanContext] =
data.parentSpanContext

def name: IO[String] =
IO(data.name)

def toSpanData: IO[SpanData] =
IO(data)

def hasEnded: IO[Boolean] =
IO(false)

def duration: IO[FiniteDuration] =
IO(data.startTimestamp)

def getAttribute[A](key: AttributeKey[A]): IO[Option[A]] =
IO(data.attributes.get(key).map(_.value))

// span.backend

val meta: InstrumentMeta[IO] =
noopBackend.meta

val context: SpanContext =
noopBackend.context

def updateName(name: String): IO[Unit] =
noopBackend.updateName(name)

def addAttributes(attributes: Attribute[_]*): IO[Unit] =
noopBackend.addAttributes(attributes: _*)

def addEvent(name: String, attributes: Attribute[_]*): IO[Unit] =
noopBackend.addEvent(name, attributes: _*)

def addEvent(
name: String,
timestamp: FiniteDuration,
attributes: Attribute[_]*
): IO[Unit] =
noopBackend.addEvent(name, timestamp, attributes: _*)

def recordException(
exception: Throwable,
attributes: Attribute[_]*
): IO[Unit] =
noopBackend.recordException(exception, attributes: _*)

def setStatus(status: Status): IO[Unit] =
noopBackend.setStatus(status)

def setStatus(status: Status, description: String): IO[Unit] =
noopBackend.setStatus(status, description)

private[otel4s] def end: IO[Unit] =
noopBackend.end

private[otel4s] def end(timestamp: FiniteDuration): IO[Unit] =
noopBackend.end(timestamp)
}
}

override protected def scalaCheckTestParameters: Test.Parameters =
super.scalaCheckTestParameters
.withMinSuccessfulTests(10)
.withMaxSize(10)

private class FailingExporter(
exporterName: String,
onExport: Throwable
) extends SpanExporter[IO] {
def name: String = exporterName

def exportSpans[G[_]: Foldable](spans: G[SpanData]): IO[Unit] =
IO.raiseError(onExport)

def flush: IO[Unit] =
IO.unit
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.trace
package org.typelevel.otel4s.sdk.trace.processor

import cats.data.NonEmptyList
import cats.effect.IO
import munit.FunSuite
import org.typelevel.otel4s.sdk.trace.SpanRef
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.trace.SpanContext

Expand Down