Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Preliminary EventReplicationDecider #5

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ case class EncodedEvent(metadata: EventMetadata, payload: EventBytes) extends Du
localSequenceNr = localSequenceNr))
}
}

case class FullEvent(encoded: EncodedEvent, decoded: DecodedEvent)
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.rbmhtechnology.eventuate.sandbox

import akka.serialization.Serialization
import akka.serialization.Serializer
import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer
import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer.decode

import scala.reflect.ClassTag
import scala.reflect._
import scala.util.Failure
import scala.util.Success
import scala.util.Try

trait EventCompatibility
case class Compatible(event: FullEvent) extends EventCompatibility
case class MinorIncompatibility(event: FullEvent, required: EventVersion, supported: EventVersion) extends EventCompatibility
case class MajorIncompatibility(schema: String, required: EventVersion, supported: EventVersion) extends EventCompatibility
case class CannotDecode(serializer: Serializer, schema: String, cause: Throwable) extends EventCompatibility
case class NoSerializer(serializerId: Int) extends EventCompatibility
case class NotEventPayloadSerializer(event: FullEvent, serializer: Serializer) extends EventCompatibility
case class NoEventVersion(event: FullEvent, serializerId: Int) extends EventCompatibility

object EventCompatibility {

def eventCompatibility(encoded: EncodedEvent)(implicit serialization: Serialization): EventCompatibility = {
val serializerId = encoded.payload.serializerId
val manifest = encoded.payload.manifest
val compatibility= for {
serializer <- toRight(serialization.serializerByIdentity.get(serializerId), NoSerializer(serializerId))
event <- toRight(decode(encoded).map(FullEvent(encoded, _)), CannotDecode(serializer, manifest.schema, _ : Throwable))
payloadSerializer <- castOrLeft[EventPayloadSerializer, EventCompatibility](serializer, NotEventPayloadSerializer(event, serializer))
eventVersion <- toRight(manifest.eventVersion, NoEventVersion(event, serializerId))
_ <- Left(compareVersions(event, payloadSerializer.eventVersion(manifest.schema), eventVersion)).right
} yield ()
compatibility.left.get
}

private def compareVersions(event: FullEvent, supported: EventVersion, required: EventVersion): EventCompatibility = {
if(supported.majorVersion < required.majorVersion) MajorIncompatibility(event.encoded.payload.manifest.schema, required, supported)
else if(supported.majorVersion == required.majorVersion && supported.minorVersion < required.minorVersion) MinorIncompatibility(event, required, supported)
else Compatible(event)
}

private def castOrLeft[A : ClassTag, L](a: AnyRef, left: L): Either.RightProjection[L, A] =
Either.cond(classTag[A].runtimeClass.isAssignableFrom(a.getClass), a.asInstanceOf[A], left).right

private def toRight[L, R](option: Option[R], left: L): Either.RightProjection[L, R] =
Either.cond(option.isDefined, option.get, left).right

private def toRight[L, R](t: Try[R], makeLeft: Throwable => L): Either.RightProjection[L, R] =
t match {
case Success(r) => Right(r).right
case Failure(ex) => Left(makeLeft(ex)).right
}
}
79 changes: 64 additions & 15 deletions src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
package com.rbmhtechnology.eventuate.sandbox

import akka.actor._
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import com.rbmhtechnology.eventuate.sandbox.EventCompatibility.eventCompatibility
import com.rbmhtechnology.eventuate.sandbox.EventReplicationDecider.ReplicationDecision.replicationContinues
import com.rbmhtechnology.eventuate.sandbox.EventReplicationDecider.ReplicationDecision.unwrapKeep
import com.rbmhtechnology.eventuate.sandbox.EventReplicationDecider.Filter
import com.rbmhtechnology.eventuate.sandbox.EventReplicationDecider.ReplicationDecision
import com.rbmhtechnology.eventuate.sandbox.EventReplicationDecider.Stop
import com.rbmhtechnology.eventuate.sandbox.EventReplicationDecider.StopOnUnserializableKeepOthers
import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol._
import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter
import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._
Expand Down Expand Up @@ -44,26 +53,32 @@ trait EventLogOps {
progressStore.getOrElse(logId, 0L)

def emissionWrite(events: Seq[EncodedEvent]): Seq[EncodedEvent] =
write(events, (evt, snr) => evt.emitted(id, snr))
write[EncodedEvent](events, identity, (evt, snr) => evt.emitted(id, snr))

def replicationWrite(events: Seq[EncodedEvent]): Seq[EncodedEvent] =
write(events.filter(causalityFilter(_versionVector).apply), (evt, snr) => evt.replicated(id, snr))
def replicationWrite(events: Seq[FullEvent]): Seq[FullEvent] =
write[FullEvent](
events.filter(ev => causalityFilter(_versionVector)(ev.encoded)),
_.encoded,
(evt, snr) => {
val encoded = evt.encoded.replicated(id, snr)
evt.copy(encoded, evt.decoded.copy(metadata = encoded.metadata))
})

def progressWrite(progresses: Map[String, Long]): Unit =
progressStore = progressStore ++ progresses

private def write(events: Seq[EncodedEvent], prepare: (EncodedEvent, Long) => EncodedEvent): Seq[EncodedEvent] = {
private def write[A](as: Seq[A], getEncoded: A => EncodedEvent, prepare: (A, Long) => A): Seq[A] = {
var snr = _sequenceNr
var cvv = _versionVector
var log = eventStore

val written = events.map { event =>
val written = as.map { event =>
snr = snr + 1L

val prepared = prepare(event, snr)

cvv = cvv.merge(prepared.metadata.vectorTimestamp)
log = log :+ prepared
cvv = cvv.merge(getEncoded(prepared).metadata.vectorTimestamp)
log = log :+ getEncoded(prepared)

prepared
}
Expand Down Expand Up @@ -92,7 +107,10 @@ trait EventSubscribers {
class EventLog(val id: String, val targetFilters: Map[String, ReplicationFilter], val sourceFilter: ReplicationFilter) extends Actor with EventLogOps with EventSubscribers {
import EventLog._

import context.system
private var replicationDeciders: Map[String, EventReplicationDecider] = Map.empty

private implicit val serialization =
SerializationExtension(context.system)

override def receive = {
case Subscribe(subscriber) =>
Expand All @@ -108,29 +126,60 @@ class EventLog(val id: String, val targetFilters: Map[String, ReplicationFilter]
val decoded = encoded.zip(events).map { case (enc, dec) => dec.copy(enc.metadata) }
sender() ! WriteSuccess(decoded)
publish(decoded)
case ReplicationWrite(events, sourceLogId, progress) =>
val encoded = replicationWrite(events); progressWrite(Map(sourceLogId -> progress))
val decoded = decode(encoded)
sender() ! ReplicationWriteSuccess(encoded, sourceLogId, progress, versionVector)
publish(decoded)
case ReplicationWrite(encodedEvents, sourceLogId, progress) =>
val (continued, stopped) = split(encodedEvents, sourceLogId)
val filtered = continued
.filter(_ != Filter)
.map(unwrapKeep)

val events = replicationWrite(filtered.toList)
if(stopped.isEmpty) progressWrite(Map(sourceLogId -> progress))
val response =
failureIfStoppedOnFirst(continued ++ stopped)
.getOrElse(ReplicationWriteSuccess(events.map(_.encoded), sourceLogId, progress, versionVector))
sender() ! response
publish(events.map(_.decoded))
case GetReplicationProgressAndVersionVector(logId) =>
sender() ! GetReplicationProgressAndVersionVectorSuccess(progressRead(logId), versionVector)

case AddDecider(sourceLogId: String, decider: EventReplicationDecider) =>
replicationDeciders += sourceLogId -> decider
case RemoveDecider(sourceLogId: String) =>
replicationDeciders -= sourceLogId
}

private def split(encodedEvents: Seq[EncodedEvent], sourceLogId: String): (Stream[ReplicationDecision], Stream[ReplicationDecision]) = {
Stream(encodedEvents: _*)
.map(eventCompatibility)
.map(replicationDeciders.getOrElse(sourceLogId, StopOnUnserializableKeepOthers).decide)
.span(replicationContinues)
}

def targetFilter(targetLogId: String): ReplicationFilter =
targetFilters.getOrElse(targetLogId, NoFilter)

private def failureIfStoppedOnFirst(decisions: Stream[ReplicationDecision]): Option[ReplicationWriteFailure] =
decisions.headOption.collect {
case Stop(reason) => ReplicationWriteFailure(new ReplicationStoppedException(reason))
}
}

object EventLog {

case class AddDecider(sourceLogId: String, eventReplicationDecider: EventReplicationDecider)
case class RemoveDecider(sourceLogId: String)

def props(id: String): Props =
props(id, Map.empty, NoFilter)

def props(id: String, targetFilters: Map[String, ReplicationFilter], sourceFilter: ReplicationFilter): Props =
Props(new EventLog(id, targetFilters, sourceFilter))

def encode(events: Seq[DecodedEvent])(implicit system: ActorSystem): Seq[EncodedEvent] =
def encode(events: Seq[DecodedEvent])(implicit serialization: Serialization): Seq[EncodedEvent] =
events.map(EventPayloadSerializer.encode)

def decode(events: Seq[EncodedEvent])(implicit system: ActorSystem): Seq[DecodedEvent] =
def decode(events: Seq[EncodedEvent])(implicit serialization: Serialization): Seq[DecodedEvent] =
events.map(e => EventPayloadSerializer.decode(e).get)

class ReplicationStoppedException(compatibility: EventCompatibility) extends IllegalStateException(s"Replication stooped: $compatibility")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.rbmhtechnology.eventuate.sandbox

import com.rbmhtechnology.eventuate.sandbox.EventReplicationDecider.ReplicationDecision

// TODO find better names
trait EventReplicationDecider {
def decide(eventCompatibility: EventCompatibility): ReplicationDecision
}

object EventReplicationDecider {

trait ReplicationDecision
case class Keep(event: FullEvent) extends ReplicationDecision
case class Stop(reason: EventCompatibility) extends ReplicationDecision
case object Filter extends ReplicationDecision

object ReplicationDecision {
def unwrapKeep(decision: ReplicationDecision): FullEvent = decision match {
case Keep(event) => event
}
def replicationContinues(decision: ReplicationDecision): Boolean =
!decision.isInstanceOf[Stop]
}

object StopOnIncompatibility extends EventReplicationDecider {
override def decide(eventCompatibility: EventCompatibility): ReplicationDecision =
eventCompatibility match {
case Compatible(event) => Keep(event)
case _ => Stop(eventCompatibility)
}
}

object StopOnUnserializableKeepOthers extends EventReplicationDecider {
override def decide(eventCompatibility: EventCompatibility): ReplicationDecision =
eventCompatibility match {
case Compatible(event) => Keep(event)
case MinorIncompatibility(event, _, _) => Keep(event)
case NoEventVersion(event, _) => Keep(event)
case NotEventPayloadSerializer(event, _) => Keep(event)
case _ => Stop(eventCompatibility)
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import java.util.function.UnaryOperator

import akka.actor._
import akka.pattern.{ask, pipe}

import com.rbmhtechnology.eventuate.sandbox.EventLog.AddDecider
import com.rbmhtechnology.eventuate.sandbox.EventLog.RemoveDecider
import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter
import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._
import com.typesafe.config._
Expand Down Expand Up @@ -51,9 +52,15 @@ class ReplicationEndpoint(

def connect(remoteEndpoint: ReplicationEndpoint): Future[String] =
connect(remoteEndpoint.connectionAcceptor)
def connect(remoteEndpoint: ReplicationEndpoint, eventReplicationDeciders: Map[String, EventReplicationDecider]): Future[String] =
connect(remoteEndpoint.connectionAcceptor, eventReplicationDeciders)

def connect(remoteAcceptor: ActorRef): Future[String] =
def connect(remoteAcceptor: ActorRef, eventReplicationDeciders: Map[String, EventReplicationDecider] = Map.empty): Future[String] = {
remoteAcceptor.ask(GetReplicationSourceLogs(logNames))(settings.askTimeout).mapTo[GetReplicationSourceLogsSuccess].map { reply =>
eventReplicationDeciders.foreach { case (logName, decider) =>
eventLogs.get(logName).foreach(_ ! AddDecider(logId(reply.endpointId, logName), decider))
}
//TODO make sure deciders are added before replicators are started
val replicators = reply.sourceLogs.map {
case (logName, sourceLog) =>
val sourceLogId = logId(reply.endpointId, logName)
Expand All @@ -63,9 +70,14 @@ class ReplicationEndpoint(
addConnection(reply.endpointId, replicators.toSet)
reply.endpointId
}
}

def disconnect(remoteEndpointId: String): Unit =
def disconnect(remoteEndpointId: String): Unit = {
removeConnection(remoteEndpointId).foreach(system.stop)
eventLogs.foreach { case (logName, eventLog) =>
eventLog ! RemoveDecider(logId(remoteEndpointId, logName))
}
}

def terminate(): Future[Terminated] =
system.terminate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ object ReplicationProtocol {

case class ReplicationWrite(events: Seq[EncodedEvent], sourceLogId: String, progress: Long)
case class ReplicationWriteSuccess(events: Seq[EncodedEvent], sourceLogId: String, progress: Long, targetVersionVector: VectorTime)
case class ReplicationWriteFailure(cause: Throwable)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.rbmhtechnology.eventuate.sandbox.serializer

import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import akka.serialization.Serialization
import akka.serialization.Serializer
import akka.serialization.SerializerWithStringManifest
import com.rbmhtechnology.eventuate.sandbox.DecodedEvent
Expand All @@ -17,18 +16,18 @@ abstract class EventPayloadSerializer extends SerializerWithStringManifest {
}

object EventPayloadSerializer {
def encode(event: DecodedEvent)(implicit system: ActorSystem): EncodedEvent =
def encode(event: DecodedEvent)(implicit serialization: Serialization): EncodedEvent =
EncodedEvent(event.metadata, serializePayload(event.payload))

def decode(event: EncodedEvent)(implicit system: ActorSystem): Try[DecodedEvent] =
def decode(event: EncodedEvent)(implicit serialization: Serialization): Try[DecodedEvent] =
deserializePayload(event.payload)
.map(payload => DecodedEvent(event.metadata, payload))

def isDeserializable(event: EncodedEvent)(implicit system: ActorSystem): Boolean =
def isDeserializable(event: EncodedEvent)(implicit serialization: Serialization): Boolean =
deserializePayload(event.payload).isSuccess

private def serializePayload(payload: AnyRef)(implicit system: ActorSystem): EventBytes = {
val serializer = SerializationExtension(system).findSerializerFor(payload)
private def serializePayload(payload: AnyRef)(implicit serialization: Serialization): EventBytes = {
val serializer = serialization.findSerializerFor(payload)
EventBytes(serializer.toBinary(payload), serializer.identifier, eventManifest(serializer, payload))
}

Expand All @@ -51,14 +50,14 @@ object EventPayloadSerializer {
case _ => None
}

private def deserializePayload(payload: EventBytes)(implicit system: ActorSystem): Try[AnyRef] =
private def deserializePayload(payload: EventBytes)(implicit serialization: Serialization): Try[AnyRef] =
if (payload.manifest.isStringManifest) {
SerializationExtension(system).deserialize(
serialization.deserialize(
payload.bytes,
payload.serializerId,
payload.manifest.schema)
} else {
val manifestClass = Class.forName(payload.manifest.schema)
SerializationExtension(system).deserialize(payload.bytes, manifestClass).map(_.asInstanceOf[AnyRef])
serialization.deserialize(payload.bytes, manifestClass).map(_.asInstanceOf[AnyRef])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package com.rbmhtechnology.eventuate.sandbox

import akka.actor._
import akka.pattern.ask
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.testkit._
import akka.util.Timeout
import com.rbmhtechnology.eventuate.sandbox.EventReplicationDecider.StopOnUnserializableKeepOthers
import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol._
import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._
import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer
Expand All @@ -22,12 +25,12 @@ object EventLogSpec {
val LogId2 = "L2"
val LogId3 = "L3"

class ExcludePayload(payload: String)(implicit system: ActorSystem) extends ReplicationFilter {
class ExcludePayload(payload: String)(implicit serialization: Serialization) extends ReplicationFilter {
override def apply(event: EncodedEvent): Boolean =
EventPayloadSerializer.decode(event).get.payload != payload
}

def excludePayload(payload: String)(implicit system: ActorSystem): ReplicationFilter =
def excludePayload(payload: String)(implicit serialization: Serialization): ReplicationFilter =
new ExcludePayload(payload)
}

Expand All @@ -46,6 +49,8 @@ class EventLogSpec extends TestKit(ActorSystem("test")) with WordSpecLike with M
implicit override val patienceConfig =
PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis), interval = Span(100, Millis))

implicit val serialization: Serialization = SerializationExtension(system)

override protected def beforeEach(): Unit =
log = system.actorOf(EventLog.props(LogId1, Map(LogId2 -> excludePayload("y")), excludePayload("z")))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import akka.actor._
import akka.pattern.ask
import akka.testkit._
import akka.util.Timeout

import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol._
import com.rbmhtechnology.eventuate.sandbox.ReplicationEndpoint._

import org.scalatest._

object ReplicationEndpointSpec {
Expand Down
Loading