Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle segment granularity changes #52

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/src/main/scala/com/metamx/tranquility/beam/Beam.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval

/**
* Beams can accept events and forward them along. The propagate method may throw a DefunctBeamException, which means
* the beam should be discarded (after calling close()).
*/
trait Beam[A]
trait Beam[A] extends DiscoverableInterval
{
/**
* Request propagation of events. The operation may fail in various ways, which tend to be specific to
Expand All @@ -40,3 +41,11 @@ class DefunctBeamException(s: String, t: Throwable) extends Exception(s, t)
{
def this(s: String) = this(s, null)
}

trait DiscoverableInterval
{
/**
* Returns the interval handled by the Beam, can return None if there is no associated interval
* */
def getInterval(): Option[Interval]
}
215 changes: 150 additions & 65 deletions core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class HttpBeam[A: Timestamper](
emitter: ServiceEmitter
) extends Beam[A] with Logging
{

def getInterval() = None

private[this] implicit val timer: Timer = DefaultTimer.twitter

private[this] val port = if (uri.port > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class MemoryBeam[A](
jsonWriter: JsonWriter[A]
) extends Beam[A]
{
def getInterval() = None

def propagate(events: Seq[A]) = {
events.map(event => Jackson.parse[Dict](jsonWriter.asBytes(event))) foreach {
d =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,12 @@ class MergingPartitioningBeam[A](
Future.collect(beams map (_.close())) map (_ => ())
}

def getInterval() = {
beams.headOption match {
case Some(x) => x.getInterval()
case None => None
}
}

override def toString = s"MergingPartitioningBeam(${beams.mkString(", ")})"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology

class NoopBeam[A] extends Beam[A]
{
Expand All @@ -25,4 +27,6 @@ class NoopBeam[A] extends Beam[A]
def close() = Future.Done

override def toString = "NoopBeam()"

def getInterval() = Some(new Interval(0, 0, ISOChronology.getInstanceUTC))
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,11 @@ class RoundRobinBeam[A](
}

override def toString = "RoundRobinBeam(%s)" format beams.mkString(", ")

def getInterval() = {
beams.headOption match {
case Some(x) => x.getInterval()
case None => None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package com.metamx.tranquility.druid
import com.metamx.common.scala.Logging
import com.metamx.common.scala.Predef._
import com.metamx.emitter.service.ServiceEmitter
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.beam.DefunctBeamException
import com.metamx.tranquility.beam.{Beam, DefunctBeamException}
import com.metamx.tranquility.finagle._
import com.metamx.tranquility.typeclass.ObjectWriter
import com.twitter.util.Closable
Expand All @@ -44,6 +43,9 @@ class DruidBeam[A](
objectWriter: ObjectWriter[A]
) extends Beam[A] with Logging with Closable
{

def getInterval() = Some(interval)

private[this] val clients = Map(
tasks map {
task =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ class DruidBeamMaker[A: Timestamper](
}

override def newBeam(interval: Interval, partition: Int) = {
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)
val availabilityGroup = DruidBeamMaker.generateBaseFirehoseId(
location.dataSource,
beamTuning.segmentGranularity,
Expand Down Expand Up @@ -190,10 +186,7 @@ class DruidBeamMaker[A: Timestamper](
// Backwards compatibility (see toDict).
beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC))
}
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)

val partition = int(d("partition"))
val tasks = if (d contains "tasks") {
list(d("tasks")).map(dict(_)).map(d => TaskPointer(str(d("id")), str(d("firehoseId"))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ object DruidBeams
def close() = clusteredBeam.close() map (_ => lifecycle.stop())

override def toString = clusteredBeam.toString

def getInterval() = clusteredBeam.getInterval()

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class BeamPacketizerTest extends FunSuite with Logging
}

override def close() = memoryBeam.close()

def getInterval() = None
}

val acked = new AtomicLong()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.curator.framework.CuratorFramework
import org.joda.time.DateTimeZone
import org.joda.time.DateTime
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology
import org.scala_tools.time.Implicits._
import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
Expand All @@ -68,7 +69,14 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "e")),
SimpleEvent(new DateTime("2012-01-01T01:20Z"), Map("foo" -> "f")),
SimpleEvent(new DateTime("2012-01-01T03:05Z"), Map("foo" -> "g")),
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h"))
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h")),
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "i")),
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "j")),
SimpleEvent(new DateTime("2012-01-01T01:07Z"), Map("foo" -> "k")),
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "l")),
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "m")),
SimpleEvent(new DateTime("2012-01-01T01:09Z"), Map("foo" -> "n")),
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "o"))
) map {
x => x.fields("foo") -> x
}).toMap
Expand All @@ -79,14 +87,18 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
val localZone = new DateTime().getZone

def buffers = _lock.synchronized {
_buffers.values.map(x => (x.timestamp.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
_buffers.values.map(x => (x.interval.start.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
}

def buffersWithInterval = _lock.synchronized {
_buffers.values.map(x => (x.interval, x.partition, x.open, x.buffer.toSeq)).toSet
}

def beamsList = _lock.synchronized {
_beams.toList
}

class EventBuffer(val timestamp: DateTime, val partition: Int)
class EventBuffer(val interval: Interval, val partition: Int)
{
val buffer: mutable.Buffer[SimpleEvent] = mutable.ListBuffer()
@volatile var open: Boolean = true
Expand All @@ -109,20 +121,24 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
def close() = {
beam.close()
}

def getInterval() = None
}

class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString)
class TestingBeam(val interval: Interval, val partition: Int, val uuid: String = UUID.randomUUID().toString)
extends Beam[SimpleEvent]
{
_lock.synchronized {
_beams += this
}

def getInterval() = Some(interval)

def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized {
if (_events.contains(events("defunct"))) {
Future.exception(new DefunctBeamException("Defunct"))
} else {
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
buffer.open = true
buffer.buffer ++= _events
Future.value(_events.size)
Expand All @@ -131,35 +147,35 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA

def close() = _lock.synchronized {
_beams -= this
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
buffer.open = false
Future.Done
}

def toDict = Dict(
"timestamp" -> timestamp.toString(),
"interval" -> interval.toString,
"partition" -> partition,
"uuid" -> uuid
)
}

class TestingBeamMaker extends BeamMaker[SimpleEvent, TestingBeam]
{
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval.start, partition)
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval, partition)

def toDict(beam: TestingBeam) = {
Dict(
"timestamp" -> beam.timestamp.toString(),
"interval" -> beam.interval.toString,
"partition" -> beam.partition,
"uuid" -> beam.uuid
)
}

def fromDict(d: Dict) = {
val timestamp = new DateTime(d("timestamp"))
val interval= new Interval(d("interval"))
val partition = int(d("partition"))
val uuid = str(d("uuid"))
new TestingBeam(timestamp, partition, uuid)
new TestingBeam(interval, partition, uuid)
}
}

Expand Down Expand Up @@ -353,6 +369,72 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
}
}

test("IncreaseGranularity") {
withLocalCurator {
curator =>
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.MINUTE, windowPeriod = 1.minute)
val newTuning = oldTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)

val beamsA = newBeams(curator, oldTuning)
beamsA.timekeeper.now = start
beamsA.blockagate(Seq("i") map events)
beamsA.blockagate(Seq("i") map events)
beamsA.timekeeper.now = start + 1.minute
beamsA.blockagate(Seq("j") map events)
beamsA.blockagate(Seq("j") map events)

val beamsB = newBeams(curator, newTuning)
beamsB.timekeeper.now = start + 2.minute
beamsB.blockagate(Seq("k") map events)
beamsB.blockagate(Seq("k") map events)
beamsB.blockagate(Seq("l") map events)
beamsB.blockagate(Seq("l") map events)
beamsB.blockagate(Seq("m") map events)
beamsB.blockagate(Seq("m") map events)
beamsB.blockagate(Seq("n") map events)
beamsB.blockagate(Seq("n") map events)

Await.result(beamsA.close())

assert(buffersWithInterval === Set(
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 0, false, Seq("i") map events),
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 1, false, Seq("i") map events),
// "j" and "l" are in same partition as diff beams were used to propagate them
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 0, false, Seq("j", "l") map events),
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 1, false, Seq("j", "l") map events),
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, true, Seq("k", "n") map events),
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, true, Seq("k", "n") map events)
))
}
}

test("DecreaseGranularity") {
withLocalCurator {
curator =>
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)
val newTuning = oldTuning.copy(segmentGranularity = Granularity.MINUTE)

val beamsA = newBeams(curator, oldTuning)
beamsA.timekeeper.now = start
beamsA.blockagate(Seq("i") map events)

val beamsB = newBeams(curator, newTuning)
beamsB.timekeeper.now = start + 4.minute
beamsB.blockagate(Seq("j") map events)
beamsB.blockagate(Seq("n") map events)
beamsB.blockagate(Seq("o") map events)
beamsB.blockagate(Seq("o") map events)
Await.result(beamsB.close())

assert(buffersWithInterval === Set(
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, false, Seq("i", "j") map events),
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, false, Seq("n") map events),
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 0, false, Seq("o") map events),
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 1, false, Seq("o") map events)
))
}
}

test("DefunctBeam") {
withLocalCurator {
curator =>
Expand Down Expand Up @@ -385,10 +467,10 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
))
val desired = List("2012-01-01T00Z", "2012-01-01T00Z", "2012-01-01T01Z", "2012-01-01T01Z").map(new DateTime(_))
val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.timestamp).sortBy(_.millis) != desired) {
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.interval.start).sortBy(_.millis) != desired) {
Thread.sleep(100)
}
assert(beamsList.map(_.timestamp).sortBy(_.millis) === desired)
assert(beamsList.map(_.interval.start).sortBy(_.millis) === desired)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.metamx.tranquility.beam.HashPartitionBeam
import com.twitter.util.Await
import com.twitter.util.Future
import java.util.concurrent.CopyOnWriteArrayList
import org.joda.time.Interval
import org.scalatest.FunSuite
import org.scalatest.Matchers
import scala.collection.JavaConverters._
Expand All @@ -44,6 +45,10 @@ class HashPartitionBeamTest extends FunSuite with Matchers
Future(events.size)
}

override def getInterval() = {
None
}

override def close() = Future.Done
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class SimpleBeam extends Beam[SimpleEvent]
}

def close() = Future.Done

def getInterval() = None
}

object SimpleBeam
Expand Down