Make the emitter generic (Closes #162) #168

Open: wants to merge 1 commit into master
@@ -40,11 +40,13 @@ case class EsLoaderBadRow(line: String, errors: NonEmptyList[String]) {
.withZone(DateTimeZone.UTC)

def toCompactJson =
Json.obj(
"line" -> line.asJson,
"errors" -> errors.asJson,
"failure_tstamp" -> getTstamp(tstamp, tstampFormat).asJson
).noSpaces
Json
.obj(
"line" -> line.asJson,
"errors" -> errors.asJson,
"failure_tstamp" -> getTstamp(tstamp, tstampFormat).asJson
)
.noSpaces
}

object EsLoaderBadRow {
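The hunk above only reformats the `toCompactJson` call chain; its output is unchanged. As a point of reference, a minimal standalone sketch of the payload it builds, with the field names taken from the diff (the values and timestamp below are illustrative, not from the project):

```scala
import io.circe.Json
import io.circe.syntax._

// Hypothetical example of the compact bad-row JSON built by toCompactJson.
// The keys ("line", "errors", "failure_tstamp") come from the diff above;
// the values are made up for illustration.
val compact: String = Json
  .obj(
    "line"           -> "unparseable-event-payload".asJson,
    "errors"         -> List("error: field 'collector_tstamp' missing").asJson,
    "failure_tstamp" -> "2018-01-01 12:00:00.000".asJson
  )
  .noSpaces
// compact is a single-line JSON string, e.g.
// {"line":"unparseable-event-payload","errors":["..."],"failure_tstamp":"2018-01-01 12:00:00.000"}
```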
@@ -28,6 +28,7 @@ import com.amazonaws.services.kinesis.connectors.interfaces.{
}
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration
import com.amazonaws.services.kinesis.connectors.impl.{AllPassFilter, BasicMemoryBuffer}
import emitters.Emitter

// This project
import com.snowplowanalytics.stream.loader.sinks._
@@ -60,7 +61,7 @@ class KinesisPipeline(
) extends IKinesisConnectorPipeline[ValidatedJsonRecord, EmitterJsonInput] {

def getEmitter(configuration: KinesisConnectorConfiguration): IEmitter[EmitterJsonInput] =
new Emitter(bulkSender, goodSink, badSink, bufferRecordLimit, bufferByteLimit)
new Emitter(bulkSender, badSink, bufferRecordLimit, bufferByteLimit)

def getBuffer(configuration: KinesisConnectorConfiguration): IBuffer[ValidatedJsonRecord] =
new BasicMemoryBuffer[ValidatedJsonRecord](configuration)
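With `goodSink` dropped from the constructor, `getEmitter` now only wires the bulk sender, the bad sink and the buffer limits. The type parameter of the generic `Emitter` is inferred from `bulkSender` here; written out explicitly, the override would read as in this sketch (a restatement of the hunk above, not new code from the PR):

```scala
// Sketch only: the getEmitter override with the inferred type parameter made explicit.
// bulkSender, badSink, bufferRecordLimit and bufferByteLimit are KinesisPipeline fields.
def getEmitter(configuration: KinesisConnectorConfiguration): IEmitter[EmitterJsonInput] =
  new Emitter[EmitterJsonInput](bulkSender, badSink, bufferRecordLimit, bufferByteLimit)
```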
@@ -17,91 +17,94 @@
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.stream.loader
package emitters

// Amazon
import cats.data.NonEmptyList
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
import com.snowplowanalytics.stream.loader.clients.BulkSender
import com.snowplowanalytics.stream.loader.sinks.ISink
import com.snowplowanalytics.stream.loader.EsLoaderBadRow

import scala.collection.mutable.ListBuffer

// Java
import java.io.IOException
import java.util.{List => JList}
import java.util.{List => List}

// cats
import cats.data.Validated

// Scala
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._
import scala.collection.immutable.{List => SList}

// This project
import sinks.ISink
import clients.BulkSender

/**
* Emitter class for any sort of BulkSender Extension
*
* @param bulkSender The bulkSender Client to use for the sink
* @param goodSink the configured GoodSink
* @param badSink the configured BadSink
* @param bufferRecordLimit record limit for buffer
* @param bufferByteLimit byte limit for buffer
*/
class Emitter(
bulkSender: BulkSender[EmitterJsonInput],
goodSink: Option[ISink],
class Emitter[T](
bulkSender: BulkSender[T],
badSink: ISink,
bufferRecordLimit: Long,
bufferByteLimit: Long
) extends IEmitter[EmitterJsonInput] {

@throws[IOException]
override def emit(buffer: UnmodifiableBuffer[EmitterJsonInput]): JList[EmitterJsonInput] =
attemptEmit(buffer.getRecords.asScala.toList).asJava
) extends IEmitter[T] {

/**
* Emits good records to stdout or sink.
* All records which sink rejects and all records which failed transformation
* This function is called from AWS library.
* Emits good records to Kinesis, Postgres, S3 or Elasticsearch.
* All records which Elasticsearch rejects and all records which failed transformation
* get sent to stderr or Kinesis.
*
* @param records list containing EmitterJsonInputs
* @return list of inputs which failed transformation or which the sink rejected
* @param buffer list containing EmitterInputs
* @return list of inputs which failed transformation or which Elasticsearch rejected
*/
@throws[IOException]
private def attemptEmit(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
if (records.isEmpty) {
Nil
def emit(buffer: UnmodifiableBuffer[T]): List[T] =
if (buffer.getRecords.asScala.isEmpty) {
null
} else {
val (validRecords: List[EmitterJsonInput], invalidRecords: List[EmitterJsonInput]) =
records.partition(_._2.isValid)
// Send all valid records to stdout / Sink and return those rejected by it
val rejects = goodSink match {
case Some(s) =>
validRecords.foreach {
case (_, Validated.Valid(r)) => s.store(r.json.toString, None, true)
case _ => ()
}
Nil
case None if validRecords.isEmpty => Nil
case _ => emit(validRecords)
}
invalidRecords ++ rejects
}
}
// Send all valid records to the bulk sender and return rejected/unvalidated ones.
sliceAndSend(buffer.getRecords.asScala.toList)
}.asJava

/**
* This is called from NsqSourceExecutor
* Emits good records to Sink and bad records to Kinesis.
* All valid records in the buffer get sent to the sink in a bulk request.
* All invalid requests and all requests which failed transformation get sent to Kinesis.
*
* @param records List of records to send
* @return List of inputs which the sink rejected
*/
def emit(records: List[EmitterJsonInput]): List[EmitterJsonInput] =
def emitList(records: SList[T]): SList[T] =
for {
recordSlice <- splitBuffer(records, bufferByteLimit, bufferRecordLimit)
result <- bulkSender.send(recordSlice)
} yield result

/**
* Emits good records to Elasticsearch and bad records to Kinesis.
* All valid records in the buffer get sent to Elasticsearch in a bulk request.
* All invalid requests and all requests which failed transformation get sent to Kinesis.
*
* @param records List of records to send to Elasticsearch
* @return List of inputs which Elasticsearch rejected
*/
def sliceAndSend(records: SList[T]): SList[T] = {
val failures: SList[SList[T]] = for {
recordSlice <- splitBuffer(records, bufferByteLimit, bufferRecordLimit)
} yield bulkSender.send(recordSlice)
failures.flatten
}

/**
* Splits the buffer into emittable chunks based on the
* buffer settings defined in the config
@@ -111,16 +114,16 @@ class Emitter(
* @param recordLimit emitter record limit
* @return a list of buffers
*/
private def splitBuffer(
records: List[EmitterJsonInput],
def splitBuffer(
records: SList[T],
byteLimit: Long,
recordLimit: Long
): List[List[EmitterJsonInput]] = {
): SList[SList[T]] = {
// partition the records into chunks
val remaining: ListBuffer[EmitterJsonInput] = records.to[ListBuffer]
val buffers: ListBuffer[List[EmitterJsonInput]] = new ListBuffer
val curBuffer: ListBuffer[EmitterJsonInput] = new ListBuffer
var runningByteCount: Long = 0L
val remaining: ListBuffer[T] = records.to[ListBuffer]
val buffers: ListBuffer[SList[T]] = new ListBuffer
val curBuffer: ListBuffer[T] = new ListBuffer
var runningByteCount: Long = 0L

while (remaining.nonEmpty) {
val record = remaining.remove(0)
@@ -164,9 +167,9 @@
*
* @param records List of failed records
*/
override def fail(records: JList[EmitterJsonInput]): Unit = {
override def fail(records: List[T]): Unit = {
records.asScala.foreach {
case (r: String, Validated.Invalid(fs)) =>
case (r: String, Validated.Invalid(fs: NonEmptyList[String])) =>
val output = EsLoaderBadRow(r, fs).toCompactJson
badSink.store(output, None, false)
case (_, Validated.Valid(_)) => ()
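Since `Emitter` no longer hard-codes `EmitterJsonInput`, it can in principle be reused with any record type that has a matching `BulkSender`. A minimal sketch under that assumption; `MyRecord`, `buildEmitter` and `flush` are hypothetical names, not part of the PR:

```scala
// Hypothetical usage sketch of the generic emitter (illustrative names only).
import com.snowplowanalytics.stream.loader.clients.BulkSender
import com.snowplowanalytics.stream.loader.emitters.Emitter
import com.snowplowanalytics.stream.loader.sinks.ISink

final case class MyRecord(id: String, payload: String)

// Assumes some BulkSender[MyRecord] implementation exists to be passed in.
def buildEmitter(sender: BulkSender[MyRecord], badSink: ISink): Emitter[MyRecord] =
  new Emitter[MyRecord](
    sender,
    badSink,
    bufferRecordLimit = 100L,
    bufferByteLimit   = 1048576L
  )

// emitList flushes valid records through the sender in limit-sized chunks and
// returns the records the sender rejected.
def flush(emitter: Emitter[MyRecord], records: List[MyRecord]): List[MyRecord] =
  emitter.emitList(records)
```

Note that `fail` still pattern-matches its input as `(String, Validated[...])` pairs, so a record type that is not an `EmitterJsonInput`-style tuple would also need its own bad-row handling.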
@@ -27,6 +27,7 @@ import com.snowplowanalytics.client.nsq.NSQConfig
import com.snowplowanalytics.client.nsq.callbacks.NSQMessageCallback
import com.snowplowanalytics.client.nsq.callbacks.NSQErrorCallback
import com.snowplowanalytics.client.nsq.exceptions.NSQException
import emitters.Emitter

//Java
import java.nio.charset.StandardCharsets.UTF_8
@@ -71,9 +72,8 @@ class NsqSourceExecutor(
private val msgBuffer = new ListBuffer[EmitterJsonInput]()
// ElasticsearchEmitter instance
private val emitter =
new Emitter(
new Emitter[EmitterJsonInput](
bulkSender,
goodSink,
badSink,
config.streams.buffer.recordLimit,
config.streams.buffer.byteLimit)
@@ -98,7 +98,7 @@ class NsqSourceExecutor(
msg.finished()

if (msgBuffer.size == nsqBufferSize) {
val rejectedRecords = emitter.emit(msgBuffer.toList)
val rejectedRecords = emitter.emitList(msgBuffer.toList)
emitter.fail(rejectedRecords.asJava)
msgBuffer.clear()
}
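For reference, the NSQ executor now calls `emitList` directly instead of going through the connector's `UnmodifiableBuffer`, and `fail` still expects a `java.util.List`, hence the `asJava` conversion. A sketch of that round trip, wrapping the flush shown above in a helper (`flushIfFull` is a hypothetical name; the field types are assumed from context):

```scala
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._

// Sketch of the NSQ flush path from the diff above.
def flushIfFull(
  msgBuffer: ListBuffer[EmitterJsonInput],
  nsqBufferSize: Int,
  emitter: Emitter[EmitterJsonInput]
): Unit =
  if (msgBuffer.size == nsqBufferSize) {
    val rejected = emitter.emitList(msgBuffer.toList) // records the sender rejected
    emitter.fail(rejected.asJava)                     // fail takes a java.util.List
    msgBuffer.clear()
  }
```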
@@ -14,6 +14,8 @@ package com.snowplowanalytics.stream.loader

// Java
import java.util.Properties

import emitters.Emitter
import org.slf4j.Logger

// Scala
@@ -75,7 +77,7 @@ class EmitterSpec extends Specification {

val kcc =
new KinesisConnectorConfiguration(new Properties, new DefaultAWSCredentialsProviderChain)
val eem = new Emitter(fakeSender, None, new StdouterrSink, 1, 1L)
val eem = new Emitter(fakeSender, new StdouterrSink, 1, 1L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid
val invalidInput: EmitterJsonInput = "bad" -> "malformed event".invalidNel
@@ -96,7 +98,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 1, 1000L)
val eem = new Emitter(ess, new StdouterrSink, 1, 1000L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid

@@ -120,7 +122,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 1, 1000L)
val eem = new Emitter(ess, new StdouterrSink, 1, 1000L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid

@@ -144,7 +146,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 100, 1048576L)
val eem = new Emitter(ess, new StdouterrSink, 100, 1048576L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid

@@ -168,7 +170,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 1, 1048576L)
val eem = new Emitter(ess, new StdouterrSink, 1, 1048576L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid

@@ -192,7 +194,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 2, 200L)
val eem = new Emitter(ess, new StdouterrSink, 2, 200L)

// record size is 95 bytes
val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid
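The last spec above pairs roughly 95-byte records with a record limit of 2 and a byte limit of 200, so a flush of several records should go out two at a time. Because this PR also makes `splitBuffer` public, that chunking could be asserted directly; a hedged sketch in the same specs2 style, reusing `eem` and `validInput` from the spec (the exact chunk sizes depend on how the limits are applied inside `splitBuffer`, which is truncated above):

```scala
// Hypothetical assertion: with ~95-byte records, recordLimit = 2 and
// byteLimit = 200L, five records would be expected to split as 2 + 2 + 1.
val inputs = List.fill(5)(validInput)
val chunks = eem.splitBuffer(inputs, byteLimit = 200L, recordLimit = 2L)
chunks.map(_.size) must_== List(2, 2, 1)
```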
@@ -16,6 +16,7 @@ package clients
// AWS
import com.amazonaws.services.kinesis.connectors.elasticsearch.ElasticsearchObject
import com.amazonaws.auth.AWSCredentialsProvider
import emitters.Emitter

// Java
import com.google.common.base.Charsets