Skip to content

Commit

Permalink
sdk-trace: add SimpleSpanProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
iRevive committed Dec 16, 2023
1 parent 208574d commit cb534e6
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.typelevel.otel4s.sdk.trace.data.EventData
import org.typelevel.otel4s.sdk.trace.data.LinkData
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.sdk.trace.data.StatusData
import org.typelevel.otel4s.sdk.trace.processor.SpanProcessor
import org.typelevel.otel4s.trace.Span
import org.typelevel.otel4s.trace.SpanContext
import org.typelevel.otel4s.trace.SpanKind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ trait SpanRef[F[_]] { self: Span.Backend[F] =>

/** Returns the name of the span.
*
* '''Note''': the name of the span can be changed during the lifetime of the
* span by using
* [[org.typelevel.otel4s.trace.Span.updateName Span.updateName]], so this
* value cannot be cached.
* @note
* the name of the span can be changed during the lifetime of the span by
* using [[org.typelevel.otel4s.trace.Span.updateName Span.updateName]], so
* this value cannot be cached.
*/
def name: F[String]

Expand All @@ -73,10 +73,11 @@ trait SpanRef[F[_]] { self: Span.Backend[F] =>
/** Returns the attribute value for the given `key`. Returns `None` if the key
* is absent in the storage.
*
* '''Note''': the attribute values can be changed during the lifetime of the
* span by using
* [[org.typelevel.otel4s.trace.Span.addAttribute Span.addAttribute]], so
* this value cannot be cached.
* @note
* the attribute values can be changed during the lifetime of the span by
* using
* [[org.typelevel.otel4s.trace.Span.addAttribute Span.addAttribute]], so
* this value cannot be cached.
*/
def getAttribute[A](key: AttributeKey[A]): F[Option[A]]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.trace
package processor

import cats.MonadThrow
import cats.effect.std.Console
import cats.syntax.applicative._
import cats.syntax.applicativeError._
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter
import org.typelevel.otel4s.trace.SpanContext

/** An implementation of the [[SpanProcessor]] that passes ended
* [[data.SpanData SpanData]] directly to the configured exporter.
*
* @note
* this processor exports spans individually upon completion, resulting in a
* single span per export request.
*
* @see
* [[https://opentelemetry.io/docs/specs/otel/trace/sdk/#simple-processor]]
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*/
final class SimpleSpanProcessor[F[_]: MonadThrow: Console] private (
exporter: SpanExporter[F],
exportOnlySampled: Boolean
) extends SpanProcessor[F] {

val name: String =
s"SimpleSpanProcessor{exporter=${exporter.name}, exportOnlySampled=$exportOnlySampled}"

val isStartRequired: Boolean = false
val isEndRequired: Boolean = true

def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] =
MonadThrow[F].unit

def onEnd(span: SpanData): F[Unit] = {
val canExport = !exportOnlySampled || span.spanContext.isSampled
doExport(span).whenA(canExport)
}

private def doExport(span: SpanData): F[Unit] =
exporter.exportSpans(List(span)).handleErrorWith { e =>
Console[F].errorln(
s"SimpleSpanExporter: the export has failed: ${e.getMessage}\n${e.getStackTrace.mkString("\n")}\n"
)
}

def forceFlush: F[Unit] =
MonadThrow[F].unit
}

object SimpleSpanProcessor {

/** Creates a [[SimpleSpanProcessor]] that passes only '''sampled''' ended
* spans to the given `exporter`.
*
* @param exporter
* the [[exporter.SpanExporter SpanExporter]] to use
*/
def apply[F[_]: MonadThrow: Console](
exporter: SpanExporter[F]
): SimpleSpanProcessor[F] =
apply(exporter, exportOnlySampled = true)

/** Creates a [[SimpleSpanProcessor]] that passes ended spans to the given
* `exporter`.
*
* @param exporter
* the [[exporter.SpanExporter SpanExporter]] to use
*
* @param exportOnlySampled
* whether to export only sampled spans
*/
def apply[F[_]: MonadThrow: Console](
exporter: SpanExporter[F],
exportOnlySampled: Boolean
): SimpleSpanProcessor[F] =
new SimpleSpanProcessor[F](exporter, exportOnlySampled)

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.typelevel.otel4s
package sdk.trace
package processor

import cats.Applicative
import cats.MonadThrow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.typelevel.otel4s.sdk.trace.data.EventData
import org.typelevel.otel4s.sdk.trace.data.LinkData
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.sdk.trace.data.StatusData
import org.typelevel.otel4s.sdk.trace.processor.SpanProcessor
import org.typelevel.otel4s.trace.SpanContext
import org.typelevel.otel4s.trace.SpanKind
import org.typelevel.otel4s.trace.Status
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright 2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.trace
package processor

import cats.Foldable
import cats.effect.IO
import cats.syntax.foldable._
import cats.syntax.traverse._
import munit.CatsEffectSuite
import munit.ScalaCheckEffectSuite
import org.scalacheck.Arbitrary
import org.scalacheck.Test
import org.scalacheck.effect.PropF
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.AttributeKey
import org.typelevel.otel4s.meta.InstrumentMeta
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.sdk.trace.exporter.InMemorySpanExporter
import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter
import org.typelevel.otel4s.trace.Span
import org.typelevel.otel4s.trace.SpanContext
import org.typelevel.otel4s.trace.SpanKind
import org.typelevel.otel4s.trace.Status

import scala.concurrent.duration.FiniteDuration

class SimpleSpanProcessorSuite
extends CatsEffectSuite
with ScalaCheckEffectSuite {

private implicit val spanDataArbitrary: Arbitrary[SpanData] =
Arbitrary(Gens.spanData)

test("show details in the name") {
val exporter = new FailingExporter(
"error-prone",
new RuntimeException("something went wrong")
)

val processor = SimpleSpanProcessor(exporter, exportOnlySampled = false)

val expected =
"SimpleSpanProcessor{exporter=error-prone, exportOnlySampled=false}"

assertEquals(processor.name, expected)
}

test("do nothing on start") {
PropF.forAllF { (spans: List[SpanData]) =>
for {
exporter <- InMemorySpanExporter.create[IO](None)
processor = SimpleSpanProcessor(exporter, exportOnlySampled = true)
_ <- spans.traverse_(s => processor.onStart(None, constSpanRef(s)))
exported <- exporter.finishedSpans
} yield assert(exported.isEmpty)
}
}

test("export only sampled spans on end (exportOnlySampled = true)") {
PropF.forAllF { (spans: List[SpanData]) =>
val sampled = spans.filter(_.spanContext.isSampled)

for {
exporter <- InMemorySpanExporter.create[IO](None)
processor = SimpleSpanProcessor(exporter, exportOnlySampled = true)
_ <- spans.traverse_(span => processor.onEnd(span))
exported <- exporter.finishedSpans
} yield assertEquals(exported, sampled)
}
}

test("export all spans on end (exportOnlySampled = false)") {
PropF.forAllF { (spans: List[SpanData]) =>
for {
exporter <- InMemorySpanExporter.create[IO](None)
processor = SimpleSpanProcessor(exporter, exportOnlySampled = false)
_ <- spans.traverse_(span => processor.onEnd(span))
exported <- exporter.finishedSpans
} yield assertEquals(
exported.map(_.spanContext.spanIdHex),
spans.map(_.spanContext.spanIdHex)
)
}
}

test("do not rethrow export errors") {
PropF.forAllF { (spans: List[SpanData]) =>
val error = new RuntimeException("something went wrong")
val exporter = new FailingExporter("error-prone", error)
val processor = SimpleSpanProcessor(exporter, exportOnlySampled = false)

for {
attempts <- spans.traverse(span => processor.onEnd(span).attempt)
} yield assertEquals(attempts, List.fill(spans.size)(Right(())))
}
}

private def constSpanRef(data: SpanData): SpanRef[IO] = {
new SpanRef[IO] with Span.Backend[IO] {
private val noopBackend = Span.Backend.noop[IO]

def kind: SpanKind =
data.kind

def scopeInfo: InstrumentationScope =
data.instrumentationScope

def parentSpanContext: Option[SpanContext] =
data.parentSpanContext

def name: IO[String] =
IO(data.name)

def toSpanData: IO[SpanData] =
IO(data)

def hasEnded: IO[Boolean] =
IO(false)

def duration: IO[FiniteDuration] =
IO(data.startTimestamp)

def getAttribute[A](key: AttributeKey[A]): IO[Option[A]] =
IO(data.attributes.get(key).map(_.value))

// span.backend

val meta: InstrumentMeta[IO] =
noopBackend.meta

val context: SpanContext =
noopBackend.context

def updateName(name: String): IO[Unit] =
noopBackend.updateName(name)

def addAttributes(attributes: Attribute[_]*): IO[Unit] =
noopBackend.addAttributes(attributes: _*)

def addEvent(name: String, attributes: Attribute[_]*): IO[Unit] =
noopBackend.addEvent(name, attributes: _*)

def addEvent(
name: String,
timestamp: FiniteDuration,
attributes: Attribute[_]*
): IO[Unit] =
noopBackend.addEvent(name, timestamp, attributes: _*)

def recordException(
exception: Throwable,
attributes: Attribute[_]*
): IO[Unit] =
noopBackend.recordException(exception, attributes: _*)

def setStatus(status: Status): IO[Unit] =
noopBackend.setStatus(status)

def setStatus(status: Status, description: String): IO[Unit] =
noopBackend.setStatus(status, description)

private[otel4s] def end: IO[Unit] =
noopBackend.end

private[otel4s] def end(timestamp: FiniteDuration): IO[Unit] =
noopBackend.end(timestamp)
}
}

override protected def scalaCheckTestParameters: Test.Parameters =
super.scalaCheckTestParameters
.withMinSuccessfulTests(10)
.withMaxSize(10)

private class FailingExporter(
exporterName: String,
onExport: Throwable
) extends SpanExporter[IO] {
def name: String = exporterName

def exportSpans[G[_]: Foldable](spans: G[SpanData]): IO[Unit] =
IO.raiseError(onExport)

def flush: IO[Unit] =
IO.unit
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.trace
package org.typelevel.otel4s.sdk.trace.processor

import cats.data.NonEmptyList
import cats.effect.IO
import munit.FunSuite
import org.typelevel.otel4s.sdk.trace.SpanRef
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.trace.SpanContext

Expand Down

0 comments on commit cb534e6

Please sign in to comment.