From fae910c1c82f45ead0c6b5693dc622be99817f96 Mon Sep 17 00:00:00 2001 From: szareiangm Date: Mon, 6 Jul 2020 15:31:18 -0400 Subject: [PATCH] Make the emitter generic (Closes #162) --- .../loader/EsLoaderBadRow.scala | 12 ++- .../loader/KinesisPipeline.scala | 3 +- .../loader/{ => emitters}/Emitter.scala | 95 ++++++++++--------- .../loader/executors/NsqSourceExecutor.scala | 6 +- .../EmitterSpec.scala | 14 +-- .../clients/ElasticsearchBulkSender.scala | 1 + 6 files changed, 70 insertions(+), 61 deletions(-) rename core/src/main/scala/com.snowplowanalytics.stream/loader/{ => emitters}/Emitter.scala (66%) diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/EsLoaderBadRow.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/EsLoaderBadRow.scala index bb51af05..d3bf26eb 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/EsLoaderBadRow.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/EsLoaderBadRow.scala @@ -40,11 +40,13 @@ case class EsLoaderBadRow(line: String, errors: NonEmptyList[String]) { .withZone(DateTimeZone.UTC) def toCompactJson = - Json.obj( - "line" -> line.asJson, - "errors" -> errors.asJson, - "failure_tstamp" -> getTstamp(tstamp, tstampFormat).asJson - ).noSpaces + Json + .obj( + "line" -> line.asJson, + "errors" -> errors.asJson, + "failure_tstamp" -> getTstamp(tstamp, tstampFormat).asJson + ) + .noSpaces } object EsLoaderBadRow { diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/KinesisPipeline.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/KinesisPipeline.scala index d46c2f32..8e68b016 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/KinesisPipeline.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/KinesisPipeline.scala @@ -28,6 +28,7 @@ import com.amazonaws.services.kinesis.connectors.interfaces.{ } import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration import com.amazonaws.services.kinesis.connectors.impl.{AllPassFilter, BasicMemoryBuffer} +import emitters.Emitter // This project import com.snowplowanalytics.stream.loader.sinks._ @@ -60,7 +61,7 @@ class KinesisPipeline( ) extends IKinesisConnectorPipeline[ValidatedJsonRecord, EmitterJsonInput] { def getEmitter(configuration: KinesisConnectorConfiguration): IEmitter[EmitterJsonInput] = - new Emitter(bulkSender, goodSink, badSink, bufferRecordLimit, bufferByteLimit) + new Emitter(bulkSender, badSink, bufferRecordLimit, bufferByteLimit) def getBuffer(configuration: KinesisConnectorConfiguration): IBuffer[ValidatedJsonRecord] = new BasicMemoryBuffer[ValidatedJsonRecord](configuration) diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/emitters/Emitter.scala similarity index 66% rename from core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala rename to core/src/main/scala/com.snowplowanalytics.stream/loader/emitters/Emitter.scala index 01b9268f..c88dfe99 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/emitters/Emitter.scala @@ -17,78 +17,66 @@ * governing permissions and limitations there under. 
  */
 package com.snowplowanalytics.stream.loader
+package emitters

 // Amazon
+import cats.data.NonEmptyList
 import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer
 import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
+import com.snowplowanalytics.stream.loader.clients.BulkSender
+import com.snowplowanalytics.stream.loader.sinks.ISink
+import com.snowplowanalytics.stream.loader.EsLoaderBadRow
+
+import scala.collection.mutable.ListBuffer

 // Java
 import java.io.IOException
-import java.util.{List => JList}
+import java.util.List

 // cats
 import cats.data.Validated

 // Scala
-import scala.collection.mutable.ListBuffer
 import scala.collection.JavaConverters._
+import scala.collection.immutable.{List => SList}

 // This project
-import sinks.ISink
-import clients.BulkSender

 /**
  * Emitter class for any sort of BulkSender extension
  *
  * @param bulkSender the BulkSender client to use for the sink
- * @param goodSink the configured GoodSink
  * @param badSink the configured BadSink
  * @param bufferRecordLimit record limit for buffer
  * @param bufferByteLimit byte limit for buffer
  */
-class Emitter(
-  bulkSender: BulkSender[EmitterJsonInput],
-  goodSink: Option[ISink],
+class Emitter[T](
+  bulkSender: BulkSender[T],
   badSink: ISink,
   bufferRecordLimit: Long,
   bufferByteLimit: Long
-) extends IEmitter[EmitterJsonInput] {
-
-  @throws[IOException]
-  override def emit(buffer: UnmodifiableBuffer[EmitterJsonInput]): JList[EmitterJsonInput] =
-    attemptEmit(buffer.getRecords.asScala.toList).asJava
+) extends IEmitter[T] {

   /**
-   * Emits good records to stdout or sink.
-   * All records which sink rejects and all records which failed transformation
+   * This function is called by the AWS Kinesis connector library.
+   * Emits good records to the configured target (Kinesis, Postgres, S3 or Elasticsearch).
+   * All records which the sender rejects and all records which failed transformation
    * get sent to stderr or Kinesis.
    *
-   * @param records list containing EmitterJsonInputs
-   * @return list of inputs which failed transformation or which the sink rejected
+   * @param buffer list containing EmitterInputs
+   * @return list of inputs which failed transformation or which the sender rejected
    */
   @throws[IOException]
-  private def attemptEmit(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
-    if (records.isEmpty) {
-      Nil
+  def emit(buffer: UnmodifiableBuffer[T]): List[T] =
+    if (buffer.getRecords.asScala.isEmpty) {
+      SList.empty[T].asJava
     } else {
-      val (validRecords: List[EmitterJsonInput], invalidRecords: List[EmitterJsonInput]) =
-        records.partition(_._2.isValid)
-      // Send all valid records to stdout / Sink and return those rejected by it
-      val rejects = goodSink match {
-        case Some(s) =>
-          validRecords.foreach {
-            case (_, Validated.Valid(r)) => s.store(r.json.toString, None, true)
-            case _ => ()
-          }
-          Nil
-        case None if validRecords.isEmpty => Nil
-        case _ => emit(validRecords)
-      }
-      invalidRecords ++ rejects
-    }
-  }
+      // Send all valid records to the bulk sender and return the ones it rejected.
+      sliceAndSend(buffer.getRecords.asScala.toList)
+    }.asJava

   /**
+   * This is called from the NsqSourceExecutor.
    * Emits good records to Sink and bad records to Kinesis.
    * All valid records in the buffer get sent to the sink in a bulk request.
    * All invalid requests and all requests which failed transformation get sent to Kinesis.
@@ -96,12 +84,27 @@ class Emitter(
    * @param records List of records to send
    * @return List of inputs which the sink rejected
    */
-  def emit(records: List[EmitterJsonInput]): List[EmitterJsonInput] =
+  def emitList(records: SList[T]): SList[T] =
     for {
       recordSlice <- splitBuffer(records, bufferByteLimit, bufferRecordLimit)
       result <- bulkSender.send(recordSlice)
     } yield result

+  /**
+   * Emits good records via the bulk sender and bad records to Kinesis.
+   * All valid records in the buffer get sent to the sink in bulk requests.
+   * All invalid requests and all requests which failed transformation get sent to Kinesis.
+   *
+   * @param records List of records to send
+   * @return List of inputs which the sink rejected
+   */
+  def sliceAndSend(records: SList[T]): SList[T] = {
+    val failures: SList[SList[T]] = for {
+      recordSlice <- splitBuffer(records, bufferByteLimit, bufferRecordLimit)
+    } yield bulkSender.send(recordSlice)
+    failures.flatten
+  }
+
   /**
    * Splits the buffer into emittable chunks based on the
    * buffer settings defined in the config
@@ -111,16 +114,16 @@
    * @param byteLimit emitter byte limit
    * @param recordLimit emitter record limit
    * @return a list of buffers
    */
-  private def splitBuffer(
-    records: List[EmitterJsonInput],
+  def splitBuffer(
+    records: SList[T],
     byteLimit: Long,
     recordLimit: Long
-  ): List[List[EmitterJsonInput]] = {
+  ): SList[SList[T]] = {
     // partition the records into emittable chunks
-    val remaining: ListBuffer[EmitterJsonInput]     = records.to[ListBuffer]
-    val buffers: ListBuffer[List[EmitterJsonInput]] = new ListBuffer
-    val curBuffer: ListBuffer[EmitterJsonInput]     = new ListBuffer
-    var runningByteCount: Long                      = 0L
+    val remaining: ListBuffer[T]      = records.to[ListBuffer]
+    val buffers: ListBuffer[SList[T]] = new ListBuffer
+    val curBuffer: ListBuffer[T]      = new ListBuffer
+    var runningByteCount: Long        = 0L

     while (remaining.nonEmpty) {
       val record = remaining.remove(0)
@@ -164,9 +167,9 @@
    *
    * @param records List of failed records
    */
-  override def fail(records: JList[EmitterJsonInput]): Unit = {
+  override def fail(records: List[T]): Unit = {
     records.asScala.foreach {
-      case (r: String, Validated.Invalid(fs)) =>
+      case (r: String, Validated.Invalid(fs: NonEmptyList[String])) =>
         val output = EsLoaderBadRow(r, fs).toCompactJson
         badSink.store(output, None, false)
       case (_, Validated.Valid(_)) => ()
diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/NsqSourceExecutor.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/NsqSourceExecutor.scala
index b0fd7971..537b67fd 100644
--- a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/NsqSourceExecutor.scala
+++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/NsqSourceExecutor.scala
@@ -27,6 +27,7 @@ import com.snowplowanalytics.client.nsq.NSQConfig
 import com.snowplowanalytics.client.nsq.callbacks.NSQMessageCallback
 import com.snowplowanalytics.client.nsq.callbacks.NSQErrorCallback
 import com.snowplowanalytics.client.nsq.exceptions.NSQException
+import emitters.Emitter

 //Java
 import java.nio.charset.StandardCharsets.UTF_8
@@ -71,9 +72,8 @@ class NsqSourceExecutor(
   private val msgBuffer = new ListBuffer[EmitterJsonInput]()
   // Emitter instance
   private val emitter =
-    new Emitter(
+    new Emitter[EmitterJsonInput](
       bulkSender,
-      goodSink,
       badSink,
       config.streams.buffer.recordLimit,
       config.streams.buffer.byteLimit)
@@ -98,7 +98,7 @@
       msg.finished()

       if (msgBuffer.size == nsqBufferSize) {
-        val rejectedRecords = emitter.emit(msgBuffer.toList)
+        val rejectedRecords = emitter.emitList(msgBuffer.toList)
         emitter.fail(rejectedRecords.asJava)
         msgBuffer.clear()
       }
diff --git a/core/src/test/scala/com.snowplowanalytics.stream.loader/EmitterSpec.scala b/core/src/test/scala/com.snowplowanalytics.stream.loader/EmitterSpec.scala
index fd47fdcb..eb26b793 100644
--- a/core/src/test/scala/com.snowplowanalytics.stream.loader/EmitterSpec.scala
+++ b/core/src/test/scala/com.snowplowanalytics.stream.loader/EmitterSpec.scala
@@ -14,6 +14,8 @@ package com.snowplowanalytics.stream.loader

 // Java
 import java.util.Properties
+
+import emitters.Emitter
 import org.slf4j.Logger

 // Scala
@@ -75,7 +77,7 @@ class EmitterSpec extends Specification {
     val kcc =
       new KinesisConnectorConfiguration(new Properties, new DefaultAWSCredentialsProviderChain)
-    val eem = new Emitter(fakeSender, None, new StdouterrSink, 1, 1L)
+    val eem = new Emitter(fakeSender, new StdouterrSink, 1, 1L)

     val validInput: EmitterJsonInput   = "good" -> JsonRecord(Json.obj(), None).valid
     val invalidInput: EmitterJsonInput = "bad" -> "malformed event".invalidNel
@@ -96,7 +98,7 @@
     val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
     val ess = new MockElasticsearchSender
-    val eem = new Emitter(ess, None, new StdouterrSink, 1, 1000L)
+    val eem = new Emitter(ess, new StdouterrSink, 1, 1000L)

     val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid
@@ -120,7 +122,7 @@
     val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
     val ess = new MockElasticsearchSender
-    val eem = new Emitter(ess, None, new StdouterrSink, 1, 1000L)
+    val eem = new Emitter(ess, new StdouterrSink, 1, 1000L)

     val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid
@@ -144,7 +146,7 @@
     val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
     val ess = new MockElasticsearchSender
-    val eem = new Emitter(ess, None, new StdouterrSink, 100, 1048576L)
+    val eem = new Emitter(ess, new StdouterrSink, 100, 1048576L)

     val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid
@@ -168,7 +170,7 @@
     val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
     val ess = new MockElasticsearchSender
-    val eem = new Emitter(ess, None, new StdouterrSink, 1, 1048576L)
+    val eem = new Emitter(ess, new StdouterrSink, 1, 1048576L)

     val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid
@@ -192,7 +194,7 @@
     val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
     val ess = new MockElasticsearchSender
-    val eem = new Emitter(ess, None, new StdouterrSink, 2, 200L)
+    val eem = new Emitter(ess, new StdouterrSink, 2, 200L)

     // record size is 95 bytes
     val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid
diff --git a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala
index 8b8da057..018cc20f 100644
--- a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala
+++ b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala
@@ -16,6 +16,7 @@ package clients

 // AWS
 import com.amazonaws.services.kinesis.connectors.elasticsearch.ElasticsearchObject
 import com.amazonaws.auth.AWSCredentialsProvider
+import emitters.Emitter

 // Java
 import com.google.common.base.Charsets
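
Usage sketch (not part of the patch): the wiring below mirrors the updated call sites in NsqSourceExecutor and EmitterSpec. The buffer limits are placeholder values and the helper name flushBuffer is hypothetical.

    import scala.collection.JavaConverters._

    import com.snowplowanalytics.stream.loader.EmitterJsonInput
    import com.snowplowanalytics.stream.loader.clients.BulkSender
    import com.snowplowanalytics.stream.loader.emitters.Emitter
    import com.snowplowanalytics.stream.loader.sinks.ISink

    // Hypothetical helper: bulk-send a buffered batch and route rejects to the bad sink.
    def flushBuffer(
      bulkSender: BulkSender[EmitterJsonInput], // e.g. an ElasticsearchBulkSender
      badSink: ISink,
      buffered: List[EmitterJsonInput]
    ): Unit = {
      // The goodSink constructor parameter is gone; the type parameter now fixes
      // what kind of record the emitter carries.
      val emitter = new Emitter[EmitterJsonInput](bulkSender, badSink, 100L, 1048576L)

      // emitList slices the batch by the record/byte limits and bulk-sends each slice;
      // whatever the sender rejects comes back and is handed to the bad sink via fail.
      val rejected = emitter.emitList(buffered)
      emitter.fail(rejected.asJava)
    }

Note that fail keeps the java.util.List signature required by IEmitter, so the rejects returned by emitList need the .asJava conversion, exactly as the NsqSourceExecutor hunk above does.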
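The point of the generalization is that any BulkSender[T] can now back the emitter, not only the Elasticsearch one. A minimal sketch of such a sender follows, based solely on the surface this patch exercises (send returning the records the target rejected); the actual BulkSender trait may declare further members that a concrete implementation must provide, so treat this as an assumption rather than the trait's full contract.

    import com.snowplowanalytics.stream.loader.clients.BulkSender

    // Hypothetical sender that "delivers" records by printing them and rejects nothing.
    // Only the send member that Emitter calls is sketched here; any other members
    // the BulkSender trait declares would still need implementing.
    class StdoutBulkSender extends BulkSender[String] {
      override def send(records: List[String]): List[String] = {
        records.foreach(r => println(s"delivered: $r"))
        Nil // nothing rejected, so there is nothing for the emitter to fail
      }
    }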