Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Mar 5, 2024
1 parent ab30861 commit 378fe6c
Show file tree
Hide file tree
Showing 16 changed files with 153 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,12 @@ object AtomicFields {
AtomicFields(withLimits)
}

def atomicErrorsToSchemaViolation(errors: NonEmptyList[AtomicError]): FailureDetails.SchemaViolation = {
val messages = errors.map { error =>
ValidatorReport(error.message, Some(error.field), Nil, error.value)
}
val validatorError = ValidatorError.InvalidData(messages)
val clientError = ValidationError(validatorError, None)
def atomicErrorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureDetails.SchemaViolation = {
val clientError = ValidationError(ValidatorError.InvalidData(errors), None)

FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
)
}
}

case class AtomicError(
field: String,
value: Option[String],
message: String
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import cats.data.NonEmptyList

import cats.implicits._

import com.snowplowanalytics.iglu.client.validator.ValidatorReport

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.snowplow.enrich.common.enrichments.AtomicFields.LimitedAtomicField
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

Expand Down Expand Up @@ -50,13 +53,14 @@ object AtomicFieldsLengthValidator {
private def validateField(
event: EnrichedEvent,
atomicField: LimitedAtomicField
): Either[AtomicError, Unit] = {
): Either[ValidatorReport, Unit] = {
val actualValue = atomicField.value.enrichedValueExtractor(event)
if (actualValue != null && actualValue.length > atomicField.limit)
AtomicError(
atomicField.value.name,
Option(actualValue),
s"Field is longer than maximum allowed size ${atomicField.limit}"
ValidatorReport(
s"Field is longer than maximum allowed size ${atomicField.limit}",
Some(atomicField.value.name),
Nil,
Some(actualValue)
).asLeft
else
Right(())
Expand All @@ -65,12 +69,12 @@ object AtomicFieldsLengthValidator {
private def handleAcceptableErrors[F[_]: Monad](
invalidCount: F[Unit],
event: EnrichedEvent,
errors: NonEmptyList[AtomicError]
errors: NonEmptyList[ValidatorReport]
): F[Unit] =
invalidCount *>
Monad[F].pure(
logger.debug(
s"Enriched event not valid against atomic schema. Event id: ${event.event_id}. Invalid fields: ${errors.map(_.field).toList.mkString(", ")}"
s"Enriched event not valid against atomic schema. Event id: ${event.event_id}. Invalid fields: ${errors.map(_.path).toList.flatten.mkString(", ")}"
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import java.lang.{Integer => JInteger}

import cats.syntax.either._

import com.snowplowanalytics.iglu.client.validator.ValidatorReport

/**
* Contains enrichments related to the client - where the client is the software which is using the
* Snowplow tracker. Enrichments relate to browser resolution.
Expand All @@ -34,7 +36,7 @@ object ClientEnrichments {
* @param res The packed string holding the screen dimensions
* @return the ResolutionTuple as a Right, or a Left carrying the error details if `res` could not be parsed
*/
val extractViewDimensions: (String, String) => Either[AtomicError, (JInteger, JInteger)] =
val extractViewDimensions: (String, String) => Either[ValidatorReport, (JInteger, JInteger)] =
(field, res) =>
(res match {
case ResRegex(width, height) =>
Expand All @@ -43,7 +45,7 @@ object ClientEnrichments {
.leftMap(_ => "Could not be converted to java.lang.Integer s")
case _ => s"Does not conform to regex ${ResRegex.toString}".asLeft
}).leftMap { msg =>
AtomicError(field, Option(res), msg)
ValidatorReport(msg, Some(field), Nil, Option(res))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.snowplowanalytics.refererparser._

import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.validator.ValidatorReport

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._
Expand Down Expand Up @@ -115,20 +116,20 @@ object EnrichmentManager {
registryLookup: RegistryLookup[F]
): EitherT[F, BadRow, IgluUtils.EventExtractResult] =
EitherT {
val setup = setupEnrichedEvent(raw, enrichedEvent, etlTstamp, processor).toValidatedNel
IgluUtils
.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
.map((setup, _).mapN((_, extract) => extract))
.map {
_.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
}.toEither
for {
setup <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor).map(_.toValidatedNel)
iglu <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
} yield (setup, iglu)
.mapN((_, extract) => extract)
.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
}
.toEither
}

/**
Expand Down Expand Up @@ -312,34 +313,35 @@ object EnrichmentManager {
}

/** Initialize the mutable [[EnrichedEvent]]. */
private def setupEnrichedEvent(
private def setupEnrichedEvent[F[_]: Monad](
raw: RawEvent,
e: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor
): Either[FailureDetails.SchemaViolation, Unit] = {
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
e.v_collector = raw.source.name // May be updated later if we have a `cv` parameter
e.v_etl = ME.etlVersion(processor)
e.etl_tstamp = EE.toTimestamp(etlTstamp)
e.network_userid = raw.context.userId.map(_.toString).orNull // May be updated later by 'nuid'
e.user_ipaddress = ME
.extractIp("user_ipaddress", raw.context.ipAddress.orNull)
.toOption
.orNull // May be updated later by 'ip'
// May be updated later if we have a `ua` parameter
setUseragent(e, raw.context.useragent)
// Validate that the collectorTstamp exists and is Redshift-compatible
val collectorTstamp = setCollectorTstamp(e, raw.context.timestamp).toValidatedNel
// Map/validate/transform input fields to enriched event fields
val transformed = Transform.transform(raw, e)

(collectorTstamp |+| transformed)
.leftMap(AtomicFields.atomicErrorsToSchemaViolation)
.toEither
}
): F[Either[FailureDetails.SchemaViolation, Unit]] =
Monad[F].pure {
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
e.v_collector = raw.source.name // May be updated later if we have a `cv` parameter
e.v_etl = ME.etlVersion(processor)
e.etl_tstamp = EE.toTimestamp(etlTstamp)
e.network_userid = raw.context.userId.map(_.toString).orNull // May be updated later by 'nuid'
e.user_ipaddress = ME
.extractIp("user_ipaddress", raw.context.ipAddress.orNull)
.toOption
.orNull // May be updated later by 'ip'
// May be updated later if we have a `ua` parameter
setUseragent(e, raw.context.useragent)
// Validate that the collectorTstamp exists and is Redshift-compatible
val collectorTstamp = setCollectorTstamp(e, raw.context.timestamp).toValidatedNel
// Map/validate/transform input fields to enriched event fields
val transformed = Transform.transform(raw, e)

(collectorTstamp |+| transformed)
.leftMap(AtomicFields.atomicErrorsToSchemaViolation)
.toEither
}

def setCollectorTstamp(event: EnrichedEvent, timestamp: Option[DateTime]): Either[AtomicError, Unit] =
def setCollectorTstamp(event: EnrichedEvent, timestamp: Option[DateTime]): Either[ValidatorReport, Unit] =
EE.formatCollectorTstamp(timestamp).map { t =>
event.collector_tstamp = t
().asRight
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import cats.syntax.option._
import org.joda.time.{DateTime, DateTimeZone, Period}
import org.joda.time.format.DateTimeFormat

import com.snowplowanalytics.iglu.client.validator.ValidatorReport

import com.snowplowanalytics.snowplow.badrows._

/** Holds the enrichments related to events. */
Expand Down Expand Up @@ -47,14 +49,14 @@ object EventEnrichments {
* @param collectorTstamp Optional collector timestamp
* @return the Redshift-compatible timestamp string as a Right, or a Left carrying the error details if the timestamp is not set or is not Redshift-compatible
*/
def formatCollectorTstamp(collectorTstamp: Option[DateTime]): Either[AtomicError, String] =
def formatCollectorTstamp(collectorTstamp: Option[DateTime]): Either[ValidatorReport, String] =
collectorTstamp match {
case None => AtomicError("collector_tstamp", None, "Field not set").asLeft
case None => ValidatorReport("Field not set", Some("collector_tstamp"), Nil, None).asLeft
case Some(t) =>
val formattedTimestamp = toTimestamp(t)
if (formattedTimestamp.startsWith("-") || t.getYear > 9999 || t.getYear < 0) {
val msg = s"Formatted as $formattedTimestamp is not Redshift-compatible"
AtomicError("collector_tstamp", t.toString.some, msg).asLeft
ValidatorReport(msg, Some("collector_tstamp"), Nil, Some(t.toString)).asLeft
} else
formattedTimestamp.asRight
}
Expand Down Expand Up @@ -111,25 +113,27 @@ object EventEnrichments {
* @param tstamp The timestamp as stored in the Tracker Protocol
* @return the formatted timestamp String as a Right, or a Left carrying the error details if the value is not in ms since epoch or is not Redshift-compatible
*/
val extractTimestamp: (String, String) => Either[AtomicError, String] =
val extractTimestamp: (String, String) => Either[ValidatorReport, String] =
(field, tstamp) =>
try {
val dt = new DateTime(tstamp.toLong)
val timestampString = toTimestamp(dt)
if (timestampString.startsWith("-") || dt.getYear > 9999 || dt.getYear < 0)
AtomicError(
field,
Option(tstamp),
s"Formatting as $timestampString is not Redshift-compatible"
ValidatorReport(
s"Formatting as $timestampString is not Redshift-compatible",
Some(field),
Nil,
Option(tstamp)
).asLeft
else
timestampString.asRight
} catch {
case _: NumberFormatException =>
AtomicError(
field,
Option(tstamp),
"Not in the expected format: ms since epoch"
ValidatorReport(
"Not in the expected format: ms since epoch",
Some(field),
Nil,
Option(tstamp)
).asLeft
}

Expand All @@ -140,7 +144,7 @@ object EventEnrichments {
* @param eventCode The event code
* @return the event type as a Right, or a Left carrying the error details if the event code is not recognised
*/
val extractEventType: (String, String) => Either[AtomicError, String] =
val extractEventType: (String, String) => Either[ValidatorReport, String] =
(field, code) =>
code match {
case "se" => "struct".asRight
Expand All @@ -153,7 +157,7 @@ object EventEnrichments {
case "pp" => "page_ping".asRight
case _ =>
val msg = "Not a valid event type"
AtomicError(field, Option(code), msg).asLeft
ValidatorReport(msg, Some(field), Nil, Option(code)).asLeft
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import io.circe._

import com.snowplowanalytics.snowplow.badrows.Processor

import com.snowplowanalytics.iglu.client.validator.ValidatorReport

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._

Expand Down Expand Up @@ -44,7 +46,7 @@ object MiscEnrichments {
* @param platform The code for the platform generating this event.
* @return the platform as a Right, or a Left carrying the error details if the code is not a valid platform
*/
val extractPlatform: (String, String) => Either[AtomicError, String] =
val extractPlatform: (String, String) => Either[ValidatorReport, String] =
(field, platform) =>
platform match {
case "web" => "web".asRight // Web, including Mobile Web
Expand All @@ -58,11 +60,11 @@ object MiscEnrichments {
case "headset" => "headset".asRight // AR/VR Headset
case _ =>
val msg = "Not a valid platform"
AtomicError(field, Option(platform), msg).asLeft
ValidatorReport(msg, Some(field), Nil, Option(platform)).asLeft
}

/** Make a String TSV safe */
val toTsvSafe: (String, String) => Either[AtomicError, String] =
val toTsvSafe: (String, String) => Either[ValidatorReport, String] =
(_, value) => CU.makeTsvSafe(value).asRight

/**
Expand All @@ -71,7 +73,7 @@ object MiscEnrichments {
* Here we retrieve the first one as it is supposed to be the client one, c.f.
* https://en.m.wikipedia.org/wiki/X-Forwarded-For#Format
*/
val extractIp: (String, String) => Either[AtomicError, String] =
val extractIp: (String, String) => Either[ValidatorReport, String] =
(_, value) => {
val lastIp = Option(value).map(_.split("[,|, ]").head).orNull
CU.makeTsvSafe(lastIp).asRight
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments
import cats.implicits._
import cats.data.ValidatedNel

import com.snowplowanalytics.iglu.client.validator.ValidatorReport

import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EventEnrichments => EE}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{MiscEnrichments => ME}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{ClientEnrichments => CE}
Expand All @@ -29,7 +31,7 @@ object Transform {
* to "user_ipaddress" in the enriched event
* @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
*/
private[enrichments] def transform(raw: RawEvent, enriched: EnrichedEvent): ValidatedNel[AtomicError, Unit] = {
private[enrichments] def transform(raw: RawEvent, enriched: EnrichedEvent): ValidatedNel[ValidatorReport, Unit] = {
val sourceMap: SourceMap = raw.parameters.collect { case (k, Some(v)) => (k, v) }
val firstPassTransform = enriched.transform(sourceMap, firstPassTransformMap)
val secondPassTransform = enriched.transform(sourceMap, secondPassTransformMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ object CrossNavigationEnrichment extends ParseableEnrichment {
EE.extractTimestamp("sp_dtm", s)
.leftMap { error =>
val f = FailureDetails.EnrichmentFailureMessage.InputData(
error.field,
error.value,
error.path.getOrElse(""),
error.keyword,
error.message
)
FailureDetails.EnrichmentFailure(None, f)
Expand Down
Loading

0 comments on commit 378fe6c

Please sign in to comment.