Skip to content

Commit

Permalink
Common: add an enrichment extracting canonical properties into dedica…
Browse files Browse the repository at this point in the history
…ted contexts (WIP #47)
  • Loading branch information
chuwy committed Jun 7, 2021
1 parent a47072f commit f5f78b7
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import enrichments.{EventEnrichments => EE}
import enrichments.{MiscEnrichments => ME}
import enrichments.registry._
import enrichments.registry.apirequest.ApiRequestEnrichment
import enrichments.registry.extractor.ExtractorEnrichment
import enrichments.registry.pii.PiiPseudonymizerEnrichment
import enrichments.registry.sqlquery.SqlQueryEnrichment
import enrichments.web.{PageEnrichments => WPE}
Expand Down Expand Up @@ -92,8 +93,15 @@ object EnrichmentManager {
enriched.pii = pii.asString
}
}
_ <- extractor[F](processor, raw, enriched, registry.extractor)
} yield enriched

/**
 * Run the extractor enrichment when one is configured
 * @param processor forwarded to the enrichment's `process`
 * @param rawEvent forwarded to the enrichment's `process`
 * @param enriched /!\ MUTABLE enriched event, handed over to the enrichment /!\
 * @param enrichment the extractor enrichment, `None` when it is not configured
 * @return whatever the enrichment's `process` returns, or a successful unit
 *         when there is no enrichment to run
 */
def extractor[F[_]: Monad](
  processor: Processor,
  rawEvent: RawEvent,
  enriched: EnrichedEvent,
  enrichment: Option[ExtractorEnrichment]
): EitherT[F, BadRow, Unit] =
  enrichment.fold(EitherT.rightT[F, BadRow](()))(_.process[F](processor, rawEvent, enriched))

/**
* Run all the enrichments and aggregate the errors if any
* @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments

import cats.Monad
import cats.data.{EitherT, NonEmptyList, ValidatedNel}
import cats.data.{EitherT, ValidatedNel, NonEmptyList}

import cats.effect.Clock
import cats.implicits._

import io.circe._
import io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SelfDescribingData, SchemaKey}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.Client
Expand All @@ -37,6 +37,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.Enrichm
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, CirceUtils}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry._
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.ApiRequestEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor.ExtractorEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii.PiiPseudonymizerEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.SqlQueryEnrichment

Expand Down Expand Up @@ -109,6 +110,8 @@ object EnrichmentRegistry {
): EitherT[F, String, EnrichmentRegistry[F]] =
confs.foldLeft(EitherT.pure[F, String](EnrichmentRegistry[F]())) { (er, e) =>
e match {
case c: ExtractorConf =>
er.map(_.copy(extractor = Some(c.enrichment)))
case c: ApiRequestConf =>
for {
enrichment <- EitherT.right(c.enrichment[F](blocker))
Expand Down Expand Up @@ -250,5 +253,6 @@ final case class EnrichmentRegistry[F[_]](
uaParser: Option[UaParserEnrichment] = None,
userAgentUtils: Option[UserAgentUtilsEnrichment] = None,
weather: Option[WeatherEnrichment[F]] = None,
yauaa: Option[YauaaEnrichment] = None
yauaa: Option[YauaaEnrichment] = None,
extractor: Option[ExtractorEnrichment] = None
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ
HttpApi
}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.{CreateSqlQueryEnrichment, Rdbms, SqlQueryEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor.{ ExtractorEnrichment, Extractable }
import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF

sealed trait EnrichmentConf {
Expand Down Expand Up @@ -209,4 +210,8 @@ object EnrichmentConf {
) extends EnrichmentConf {
def enrichment: YauaaEnrichment = YauaaEnrichment(cacheSize)
}

/**
 * Configuration of the extractor enrichment
 * @param schemaKey schema key carried along with this configuration
 * @param entities extractables handed to the enrichment built by `enrichment`
 * @param erase flag handed to the enrichment built by `enrichment`
 */
final case class ExtractorConf(
  schemaKey: SchemaKey,
  entities: Set[Extractable],
  erase: Boolean
) extends EnrichmentConf {

  /** Build the enrichment out of `entities` and `erase` */
  def enrichment: ExtractorEnrichment =
    ExtractorEnrichment(entities = entities, erase = erase)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor

import io.circe.{JsonObject, Json, Decoder}
import cats.implicits._

import com.snowplowanalytics.iglu.core.{SchemaVer, SelfDescribingData, SchemaKey}

import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

/**
 * A group of `EnrichedEvent` properties that is read through reflection and
 * packed into one self-describing entity.
 */
sealed trait Extractable extends Product with Serializable {

  /** Schema attached to the entity built by `process` */
  def schemaKey: SchemaKey

  /** Pairs of an `EnrichedEvent` accessor name and the typed output field it feeds */
  def keys: List[(String, TypedField)]

  // The lookup is wrapped because `getMethod` throws when no accessor matches a key;
  // the failure has to surface as a `Left`, not as an exception.
  private def getters: Either[Throwable, List[(java.lang.reflect.Method, TypedField)]] =
    keys.traverse { case (key, field) =>
      Either.catchNonFatal(Extractable.EventClass.getMethod(key) -> field)
    }

  // Setter name is "set" + capitalized key, taking the field's runtime class as its only argument
  private def erasers =
    keys.map { case (key, f) => Extractable.EventClass.getMethod("set" ++ key.capitalize, f.manifest.runtimeClass) }

  /**
   * Read every property listed in `keys` and collect the non-empty ones into an object
   * keyed by the field's output name.
   * @param event event to read from
   * @return the collected object, or the first reflection or cast failure as `Left`
   */
  def getJson(event: EnrichedEvent): Either[Throwable, JsonObject] =
    getters
      .flatMap { resolved =>
        resolved.traverse { case (getter, to) =>
          // `invoke` itself can throw, so it is guarded before the value reaches `cast`
          Either
            .catchNonFatal(getter.invoke(event))
            .flatMap(raw => to.cast(raw))
            .map(value => to.name -> value)
        }
      }
      .map { kvs => JsonObject.fromIterable(kvs.collect { case (k, Some(v)) => (k, v) }) }

  /**
   * Build the self-describing entity for this extractable.
   * @param event event to read from
   * @return `Right(None)` when no property yielded a value, `Right(Some(entity))` otherwise,
   *         `Left` when reading the properties failed
   */
  def process(event: EnrichedEvent): Either[Throwable, Option[SelfDescribingData[Json]]] =
    getJson(event).map { obj =>
      if (obj.isEmpty) None
      else Some(SelfDescribingData(schemaKey, Json.fromJsonObject(obj)))
    }

  /**
   * Set every property listed in `keys` to `null` through its setter.
   * Mutates `event` in place; reflection failures are not caught here and propagate as exceptions.
   * @param event /!\ MUTABLE enriched event, mutated IN-PLACE /!\
   */
  def erase(event: EnrichedEvent): Unit = {
    erasers.foreach { eraser => eraser.invoke(event, null) }
  }
}

/** Known extractables and the circe decoders used to read them from configuration. */
object Extractable {

  type Extractables = List[Extractable]

  /** Every extractable that can be named in the configuration */
  def All: Extractables = List(MaxMind)

  /**
   * Decodes a single extractable from its case-insensitive name
   * (the lowercased `toString` of one of the members of `All`).
   */
  implicit def extractableCirceDecoder: Decoder[Extractable] =
    Decoder[String].map(_.toLowerCase).emap { e =>
      // `e` is already lowercased by the `map` above
      All
        .find(_.toString.toLowerCase == e)
        .toRight(s"$e is an unknown entity to extract. Try: ${All.map(_.toString.toLowerCase).mkString(", ")} or all")
    }

  /**
   * Decodes either a list of extractable names or the single string "all"
   * (case-insensitive), which stands for every member of `All`.
   */
  implicit def extractablesCirceDecoder: Decoder[Extractables] =
    // The list decoder is built explicitly from the element decoder. Summoning
    // `Decoder[List[Extractable]]` here would pick this very implicit (it is in
    // lexical scope and has exactly that type) and recurse forever.
    Decoder
      .decodeList(extractableCirceDecoder)
      .handleErrorWith(e => Decoder[String].emap(s => if (s.toLowerCase == "all") All.asRight else e.show.asLeft))

  private val EventClass = classOf[EnrichedEvent]

  /** Geo and IP-lookup properties of the enriched event */
  case object MaxMind extends Extractable {
    val schemaKey: SchemaKey = SchemaKey("com.maxmind", "context", "jsonschema", SchemaVer.Full(1, 0, 0))

    def keys: List[(String, TypedField)] = List(
      "geo_country" -> TypedField.Str("country"),
      "geo_region" -> TypedField.Str("region"),
      "geo_city" -> TypedField.Str("city"),
      "geo_zipcode" -> TypedField.Str("zipcode"),
      "geo_latitude" -> TypedField.Flo("latitude"),
      "geo_longitude" -> TypedField.Flo("longitude"),
      "geo_region_name" -> TypedField.Str("region_name"),

      "ip_isp" -> TypedField.Str("isp"),
      "ip_organization" -> TypedField.Str("organization"),
      "ip_domain" -> TypedField.Str("domain"),
      "ip_netspeed" -> TypedField.Str("netspeed")
    )
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor

import java.time.Instant

import cats.Applicative
import cats.implicits._
import cats.data.{EitherT, ValidatedNel, NonEmptyList}

import io.circe.{Error, Json}
import io.circe.parser.parse
import io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SelfDescribingData, SchemaKey}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows.FailureDetails.{EnrichmentFailureMessage, EnrichmentFailure}
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Payload, Failure}
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent.toRawEvent
import com.snowplowanalytics.snowplow.enrich.common.enrichments.MiscEnrichments
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor.ExtractorEnrichment.failedReflection
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{ParseableEnrichment, EnrichmentConf}
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent.toPartiallyEnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils

/**
 * Enrichment asking every configured extractable to build an entity from the
 * event and appending the results to the event's `derived_contexts`.
 * @param entities extractables to run
 * @param erase whether an extractable that produced an entity also erases its source properties
 */
final case class ExtractorEnrichment(entities: Set[Extractable], erase: Boolean) {

  /**
   * Run all extractables and merge their entities into `derived_contexts`.
   * @param processor attached to the bad row on failure
   * @param rawEvent used only to build the bad row payload
   * @param event /!\ MUTABLE enriched event, mutated IN-PLACE /!\
   * @return unit on success; a bad row when an extractable fails or when the
   *         existing `derived_contexts` cannot be decoded
   */
  def process[F[_]: Applicative](processor: Processor, rawEvent: RawEvent, event: EnrichedEvent): EitherT[F, BadRow, Unit] = {
    // Built before anything else because the event may be mutated further down
    val badRowPayload = Payload.EnrichmentPayload(toPartiallyEnrichedEvent(event), toRawEvent(rawEvent))

    val extraction = entities.toList.flatTraverse { extractable =>
      extractable.process(event) match {
        case Left(cause) =>
          Left((extractable.toString.toLowerCase, cause))
        case Right(Some(entity)) =>
          if (erase) extractable.erase(event) // Unsafe mutation
          Right(List(entity))
        case Right(None) =>
          Right(Nil)
      }
    }

    extraction match {
      case Left(failure) =>
        EitherT.leftT[F, Unit](failedReflection(processor, badRowPayload)(failure))
      case Right(Nil) =>
        // Nothing was extracted, derived_contexts stays untouched
        EitherT.rightT[F, BadRow](())
      case Right(extracted) =>
        val existing = Option(event.derived_contexts) match {
          case None =>
            SelfDescribingData(MiscEnrichments.ContextsSchema, List.empty[Json]).asRight
          case Some(raw) =>
            for {
              json <- parse(raw)
              envelope <- json.as[SelfDescribingData[Json]]
              items <- envelope.data.as[List[Json]]
            } yield SelfDescribingData(envelope.schema, items)
        }

        existing
          .map { current => SelfDescribingData(current.schema, (current.data ++ extracted.map(_.normalize)).asJson) }
          .leftMap(ExtractorEnrichment.failedDecoding(processor, badRowPayload))
          .toEitherT[F]
          .map(merged => event.setDerived_contexts(merged.asString)) // Unsafe mutation
    }
  }
}

/** Bad row builders and configuration parsing for the extractor enrichment. */
object ExtractorEnrichment extends ParseableEnrichment {

  /** Wrap one plain-text message into an `EnrichmentFailures` bad row stamped with the current time */
  private def simpleFailure(processor: Processor, payload: Payload.EnrichmentPayload, text: String): BadRow =
    BadRow.EnrichmentFailures(
      processor,
      Failure.EnrichmentFailures(
        Instant.now(),
        NonEmptyList.one(EnrichmentFailure(None, EnrichmentFailureMessage.Simple(text)))
      ),
      payload
    )

  /**
   * Bad row for a `derived_contexts` value that could not be decoded
   * @param processor attached to the bad row
   * @param payload attached to the bad row
   * @param error the circe error, rendered into the message
   */
  def failedDecoding(processor: Processor, payload: Payload.EnrichmentPayload)(error: Error): BadRow =
    simpleFailure(processor, payload, s"Cannot decode derived_contexts. ${error.show}")

  /**
   * Bad row for an extractable whose properties could not be read
   * @param processor attached to the bad row
   * @param payload attached to the bad row
   * @param error name of the extractable paired with the underlying exception
   */
  def failedReflection(processor: Processor, payload: Payload.EnrichmentPayload)(error: (String, Throwable)): BadRow =
    simpleFailure(processor, payload, s"Failed to extract a property for ${error._1}. ${error._2}")

  val supportedSchema: SchemaCriterion =
    SchemaCriterion("com.snowplowanalytics.snowplow.enrichments", "extractor_enrichment_config", "jsonschema", 1, 0, 0)

  /**
   * Read the enrichment configuration
   * @param config JSON holding `parameters.erase` and `parameters.extract`
   * @param schemaKey schema key of the configuration, checked with `isParseable`
   * @param localMode not used by this enrichment
   * @return the configuration, or the accumulated error messages
   */
  def parse(config: Json, schemaKey: SchemaKey, localMode: Boolean): ValidatedNel[String, EnrichmentConf] =
    isParseable(config, schemaKey).toValidatedNel.andThen { _ =>
      val eraseFlag = CirceUtils.extract[Boolean](config, "parameters", "erase")
      val toExtract = CirceUtils.extract[Extractable.Extractables](config, "parameters", "extract")

      (eraseFlag, toExtract)
        .mapN((shouldErase, extractables) => EnrichmentConf.ExtractorConf(schemaKey, extractables.toSet, shouldErase))
        .toValidatedNel
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.extractor

import io.circe.{Encoder, Json}

import cats.implicits._

/**
 * Describes how one reflected property value is type-checked and turned into JSON.
 */
sealed trait TypedField {
  /** Type the reflected value is expected to have */
  type Target
  /** Runtime evidence for `Target`, used for the checked cast in `cast` */
  def manifest: Manifest[Target]
  /** Key under which the value is emitted */
  def name: String
  /** Encoder turning a `Target` into JSON */
  def encoder: Encoder[Target]

  /**
   * Check that the value is a `Target` and encode it.
   * @param anyRef value obtained through reflection, possibly `null`
   * @return `Right(None)` for `null`, `Right(Some(json))` for a value of the expected type,
   *         `Left` when the value has another type or encoding fails
   */
  def cast(anyRef: AnyRef): Either[Throwable, Option[Json]] =
    Either.catchNonFatal {
      Option(anyRef).map { value =>
        // `asInstanceOf[Target]` alone is erased and checks nothing, so the real check is
        // `Class.cast`; both it and the encoding stay inside the guarded block.
        // Relies on `Target` being a reference type, as it is for every current subtype.
        encoder(manifest.runtimeClass.cast(value).asInstanceOf[Target])
      }
    }
}

/** The field types the extractor knows how to emit. */
object TypedField {

  /** A `String` property emitted under `name` */
  case class Str(name: String) extends TypedField {
    type Target = String
    def manifest: Manifest[String] = Predef.manifest[String]
    val encoder: Encoder[String] = implicitly[Encoder[String]]
  }

  /** A boxed `java.lang.Float` property emitted under `name` */
  case class Flo(name: String) extends TypedField {
    type Target = java.lang.Float
    def manifest: Manifest[java.lang.Float] = Predef.manifest[java.lang.Float]
    val encoder: Encoder[java.lang.Float] = implicitly[Encoder[java.lang.Float]]
  }
}

Loading

0 comments on commit f5f78b7

Please sign in to comment.