-
Notifications
You must be signed in to change notification settings - Fork 36
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
308 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
102 changes: 102 additions & 0 deletions
102
sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessor.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
/* | ||
* Copyright 2023 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.typelevel.otel4s.sdk.trace | ||
package processor | ||
|
||
import cats.MonadThrow | ||
import cats.effect.std.Console | ||
import cats.syntax.applicative._ | ||
import cats.syntax.applicativeError._ | ||
import cats.syntax.flatMap._ | ||
import cats.syntax.functor._ | ||
import org.typelevel.otel4s.sdk.trace.data.SpanData | ||
import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter | ||
import org.typelevel.otel4s.trace.SpanContext | ||
|
||
import scala.util.control.NonFatal | ||
|
||
/** An implementation of the [[SpanProcessor]] that passes ended | ||
* [[data.SpanData SpanData]] directly to the configured exporter. | ||
* | ||
* @note | ||
* this processor exports spans individually upon completion, resulting in a | ||
* single span per export request. | ||
* | ||
* @see | ||
* [[https://opentelemetry.io/docs/specs/otel/trace/sdk/#simple-processor]] | ||
* | ||
* @tparam F | ||
* the higher-kinded type of a polymorphic effect | ||
*/ | ||
final class SimpleSpanProcessor[F[_]: MonadThrow: Console] private ( | ||
exporter: SpanExporter[F], | ||
exportOnlySampled: Boolean | ||
) extends SpanProcessor[F] { | ||
|
||
val isStartRequired: Boolean = false | ||
val isEndRequired: Boolean = true | ||
|
||
def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] = | ||
MonadThrow[F].unit | ||
|
||
def onEnd(span: SpanData): F[Unit] = { | ||
val canExport = !exportOnlySampled || span.spanContext.isSampled | ||
doExport(span).whenA(canExport) | ||
} | ||
|
||
private def doExport(span: SpanData): F[Unit] = | ||
exporter.exportSpans(List(span)).onError { | ||
case e if NonFatal(e) => | ||
for { | ||
_ <- Console[F].errorln("SimpleSpanExporter: the export has failed") | ||
_ <- Console[F].printStackTrace(e) | ||
} yield () | ||
} | ||
|
||
def forceFlush: F[Unit] = | ||
MonadThrow[F].unit | ||
} | ||
|
||
object SimpleSpanProcessor { | ||
|
||
/** Creates a [[SimpleSpanProcessor]] that passes only '''sampled''' ended | ||
* spans to the given `exporter`. | ||
* | ||
* @param exporter | ||
* the [[exporter.SpanExporter SpanExporter]] to use | ||
*/ | ||
def apply[F[_]: MonadThrow: Console]( | ||
exporter: SpanExporter[F] | ||
): SimpleSpanProcessor[F] = | ||
apply(exporter, exportOnlySampled = true) | ||
|
||
/** Creates a [[SimpleSpanProcessor]] that passes ended spans to the given | ||
* `exporter`. | ||
* | ||
* @param exporter | ||
* the [[exporter.SpanExporter SpanExporter]] to use | ||
* | ||
* @param exportOnlySampled | ||
* whether to export only sampled spans | ||
*/ | ||
def apply[F[_]: MonadThrow: Console]( | ||
exporter: SpanExporter[F], | ||
exportOnlySampled: Boolean | ||
): SimpleSpanProcessor[F] = | ||
new SimpleSpanProcessor[F](exporter, exportOnlySampled) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
190 changes: 190 additions & 0 deletions
190
...ce/src/test/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessorSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
/* | ||
* Copyright 2023 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.typelevel.otel4s.sdk.trace | ||
package processor | ||
|
||
import cats.Foldable | ||
import cats.effect.IO | ||
import cats.syntax.foldable._ | ||
import cats.syntax.traverse._ | ||
import munit.CatsEffectSuite | ||
import munit.ScalaCheckEffectSuite | ||
import org.scalacheck.Arbitrary | ||
import org.scalacheck.Test | ||
import org.scalacheck.effect.PropF | ||
import org.typelevel.otel4s.Attribute | ||
import org.typelevel.otel4s.AttributeKey | ||
import org.typelevel.otel4s.meta.InstrumentMeta | ||
import org.typelevel.otel4s.sdk.common.InstrumentationScope | ||
import org.typelevel.otel4s.sdk.trace.data.SpanData | ||
import org.typelevel.otel4s.sdk.trace.exporter.InMemorySpanExporter | ||
import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter | ||
import org.typelevel.otel4s.trace.Span | ||
import org.typelevel.otel4s.trace.SpanContext | ||
import org.typelevel.otel4s.trace.SpanKind | ||
import org.typelevel.otel4s.trace.Status | ||
|
||
import scala.concurrent.duration.FiniteDuration | ||
|
||
class SimpleSpanProcessorSuite | ||
extends CatsEffectSuite | ||
with ScalaCheckEffectSuite { | ||
|
||
private implicit val spanDataArbitrary: Arbitrary[SpanData] = | ||
Arbitrary(Gens.spanData) | ||
|
||
test("do nothing on start") { | ||
PropF.forAllF { (spans: List[SpanData]) => | ||
for { | ||
exporter <- InMemorySpanExporter.create[IO](None) | ||
processor = SimpleSpanProcessor(exporter, exportOnlySampled = true) | ||
_ <- spans.traverse_(s => processor.onStart(None, constSpanRef(s))) | ||
exported <- exporter.finishedSpans | ||
} yield assert(exported.isEmpty) | ||
} | ||
} | ||
|
||
test("export only sampled spans on end (exportOnlySampled = true)") { | ||
PropF.forAllF { (spans: List[SpanData]) => | ||
val sampled = spans.filter(_.spanContext.isSampled) | ||
|
||
for { | ||
exporter <- InMemorySpanExporter.create[IO](None) | ||
processor = SimpleSpanProcessor(exporter, exportOnlySampled = true) | ||
_ <- spans.traverse_(span => processor.onEnd(span)) | ||
exported <- exporter.finishedSpans | ||
} yield assertEquals(exported, sampled) | ||
} | ||
} | ||
|
||
test("export all spans on end (exportOnlySampled = false)") { | ||
PropF.forAllF { (spans: List[SpanData]) => | ||
for { | ||
exporter <- InMemorySpanExporter.create[IO](None) | ||
processor = SimpleSpanProcessor(exporter, exportOnlySampled = false) | ||
_ <- spans.traverse_(span => processor.onEnd(span)) | ||
exported <- exporter.finishedSpans | ||
} yield assertEquals( | ||
exported.map(_.spanContext.spanIdHex), | ||
spans.map(_.spanContext.spanIdHex) | ||
) | ||
} | ||
} | ||
|
||
test("rethrow export errors") { | ||
PropF.forAllF { (spans: List[SpanData]) => | ||
val error = new RuntimeException("something went wrong") | ||
val exporter = new FailingExporter("error-prone", error) | ||
val processor = SimpleSpanProcessor(exporter, exportOnlySampled = false) | ||
|
||
for { | ||
attempts <- spans.traverse(span => processor.onEnd(span).attempt) | ||
} yield assertEquals(attempts, List.fill(spans.size)(Left(error))) | ||
} | ||
} | ||
|
||
private def constSpanRef(data: SpanData): SpanRef[IO] = { | ||
new SpanRef[IO] with Span.Backend[IO] { | ||
private val noopBackend = Span.Backend.noop[IO] | ||
|
||
def kind: SpanKind = | ||
data.kind | ||
|
||
def scopeInfo: InstrumentationScope = | ||
data.instrumentationScope | ||
|
||
def parentSpanContext: Option[SpanContext] = | ||
data.parentSpanContext | ||
|
||
def name: IO[String] = | ||
IO(data.name) | ||
|
||
def toSpanData: IO[SpanData] = | ||
IO(data) | ||
|
||
def hasEnded: IO[Boolean] = | ||
IO(false) | ||
|
||
def duration: IO[FiniteDuration] = | ||
IO(data.startTimestamp) | ||
|
||
def getAttribute[A](key: AttributeKey[A]): IO[Option[A]] = | ||
IO(data.attributes.get(key).map(_.value)) | ||
|
||
// span.backend | ||
|
||
val meta: InstrumentMeta[IO] = | ||
noopBackend.meta | ||
|
||
val context: SpanContext = | ||
noopBackend.context | ||
|
||
def updateName(name: String): IO[Unit] = | ||
noopBackend.updateName(name) | ||
|
||
def addAttributes(attributes: Attribute[_]*): IO[Unit] = | ||
noopBackend.addAttributes(attributes: _*) | ||
|
||
def addEvent(name: String, attributes: Attribute[_]*): IO[Unit] = | ||
noopBackend.addEvent(name, attributes: _*) | ||
|
||
def addEvent( | ||
name: String, | ||
timestamp: FiniteDuration, | ||
attributes: Attribute[_]* | ||
): IO[Unit] = | ||
noopBackend.addEvent(name, timestamp, attributes: _*) | ||
|
||
def recordException( | ||
exception: Throwable, | ||
attributes: Attribute[_]* | ||
): IO[Unit] = | ||
noopBackend.recordException(exception, attributes: _*) | ||
|
||
def setStatus(status: Status): IO[Unit] = | ||
noopBackend.setStatus(status) | ||
|
||
def setStatus(status: Status, description: String): IO[Unit] = | ||
noopBackend.setStatus(status, description) | ||
|
||
private[otel4s] def end: IO[Unit] = | ||
noopBackend.end | ||
|
||
private[otel4s] def end(timestamp: FiniteDuration): IO[Unit] = | ||
noopBackend.end(timestamp) | ||
} | ||
} | ||
|
||
override protected def scalaCheckTestParameters: Test.Parameters = | ||
super.scalaCheckTestParameters | ||
.withMinSuccessfulTests(10) | ||
.withMaxSize(10) | ||
|
||
private class FailingExporter( | ||
exporterName: String, | ||
onExport: Throwable | ||
) extends SpanExporter[IO] { | ||
def name: String = exporterName | ||
|
||
def exportSpans[G[_]: Foldable](spans: G[SpanData]): IO[Unit] = | ||
IO.raiseError(onExport) | ||
|
||
def flush: IO[Unit] = | ||
IO.unit | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters