From cf75db4815cad5fee10b0e7d7e9d4a445c7633c3 Mon Sep 17 00:00:00 2001 From: Slava Markeyev Date: Tue, 5 Sep 2023 10:48:23 -0700 Subject: [PATCH] send seen progress in batches (#120) * send seen progress in batches * update tests --- app/runner.go | 2 +- docker-compose.yml | 22 ++++ marshaller/marshaller.go | 2 +- transport/batcher/batcher.go | 29 +++-- transport/batcher/batcher_test.go | 125 +++++++++++--------- transport/factory/factory.go | 2 +- transport/manager/manager.go | 4 +- transport/progress/progress_tracker.go | 14 ++- transport/progress/progress_tracker_test.go | 28 ++--- 9 files changed, 136 insertions(+), 92 deletions(-) diff --git a/app/runner.go b/app/runner.go index ab3aed13..e1c60d81 100644 --- a/app/runner.go +++ b/app/runner.go @@ -171,7 +171,7 @@ func New(shutdownHandler shutdown.ShutdownHandler, statsChan := make(chan stats.Stat, clientBufferSize*5) // Transactions channels for progress reporting to the progress tracker - txnsSeen := make(chan *progress.Seen) // Must be unbuffered to maintain seen -> written ordering + txnsSeen := make(chan []*progress.Seen) // Must be unbuffered to maintain seen -> written ordering txnsWritten := make(chan *ordered_map.OrderedMap, clientBufferSize*5) // Initialize in reverse order diff --git a/docker-compose.yml b/docker-compose.yml index e81b1480..4519dd3b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,3 +8,25 @@ services: - POSTGRES_PASSWORD=pgbifrost ports: - 5432:5432 + networks: + - net + + bifrost: + container_name: bifrost + image: pg-bifrost:fastest + depends_on: + - postgres + ports: + - 6060:6060 + environment: + - BATCH_FLUSH_MAX_AGE=120000 + - BATCH_FLUSH_UPDATE_AGE=1000 + - NO_MARSHAL_OLD_VALUE=true + - WORKERS=1 + - BATCHER_MEMORY_SOFT_LIMIT=104857600 + networks: + - net + command: /pg-bifrost --host postgres --password pgbifrost replicate -s stdout + +networks: + net: \ No newline at end of file diff --git a/marshaller/marshaller.go b/marshaller/marshaller.go index 58235d13..6f2e97e2 100644 --- a/marshaller/marshaller.go +++ b/marshaller/marshaller.go @@ -109,7 +109,7 @@ func New(shutdownHandler shutdown.ShutdownHandler, statsChan chan stats.Stat, noMarshalOldValue bool) Marshaller { - outputChan := make(chan *MarshalledMessage, 1000) + outputChan := make(chan *MarshalledMessage) return Marshaller{shutdownHandler, inputChan, outputChan, statsChan, noMarshalOldValue} } diff --git a/transport/batcher/batcher.go b/transport/batcher/batcher.go index f60a0eb4..940fb348 100644 --- a/transport/batcher/batcher.go +++ b/transport/batcher/batcher.go @@ -62,7 +62,7 @@ type Batcher struct { inputChan <-chan *marshaller.MarshalledMessage // receive single MarshalledMessages outputChans []chan transport.Batch // one output channel per Transporter worker - txnsSeenChan chan<- *progress.Seen // channel to report transactions seen to ProgressTracker + txnsSeenChan chan<- []*progress.Seen // channel to report transactions seen to ProgressTracker statsChan chan stats.Stat tickRate time.Duration // controls frequency that batcher looks for input. This should be non-zero to avoid CPU spin. @@ -75,6 +75,7 @@ type Batcher struct { routingMethod BatchRouting // method to use when routing a batch to an output channel/worker roundRobinPosition int // in Round-robin routing mode this keeps track of the position txnsSeenTimeout time.Duration // timeout to wait on writing seen transactions. If this limit is reached then batcher errors out. + seenList []*progress.Seen } // NewBatcher returns a new Batcher with output channels to use as inputs for Transporters. Note the caller must @@ -82,7 +83,7 @@ type Batcher struct { // check compatibility. func NewBatcher(shutdownHandler shutdown.ShutdownHandler, inputChan <-chan *marshaller.MarshalledMessage, - txnsSeenChan chan<- *progress.Seen, + txnsSeenChan chan<- []*progress.Seen, statsChan chan stats.Stat, tickRate int, // number of milliseconds that batcher will wait to check for input. @@ -120,6 +121,7 @@ func NewBatcher(shutdownHandler shutdown.ShutdownHandler, routingMethod, 0, time.Second * 12, // max time to wait for sending transactions to ProgressTracker via txnsSeenChan + []*progress.Seen{}, } } @@ -206,18 +208,11 @@ func (b *Batcher) StartBatching() { // Update ledger when a COMMIT is seen if msg.Operation == "COMMIT" { - s := &progress.Seen{ + b.seenList = append(b.seenList, &progress.Seen{ Transaction: msg.Transaction, TimeBasedKey: msg.TimeBasedKey, TotalMsgs: totalMsgsInTxn, - CommitWalStart: msg.WalStart} - select { - case b.txnsSeenChan <- s: - log.Debug("seen a BatchTransaction") - case <-time.After(b.txnsSeenTimeout): - log.Error("fatal time out sending a BatchTransaction to the ProgressTracker") - return - } + CommitWalStart: msg.WalStart}) } // Reset counter for number of messages in transaction when @@ -369,6 +364,18 @@ func (b *Batcher) handleTicker() bool { } func (b *Batcher) sendBatch(batch transport.Batch) bool { + + // First flush out seen list + if len(b.seenList) > 0 { + select { + case b.txnsSeenChan <- b.seenList: + b.seenList = []*progress.Seen{} + log.Debug("seen a BatchTransaction") + case <-time.After(b.txnsSeenTimeout): + log.Panic("fatal time out sending a BatchTransaction to the ProgressTracker") + } + } + ok, err := batch.Close() if !ok { log.Error(err) diff --git a/transport/batcher/batcher_test.go b/transport/batcher/batcher_test.go index d535b569..2fe49b39 100644 --- a/transport/batcher/batcher_test.go +++ b/transport/batcher/batcher_test.go @@ -1,20 +1,21 @@ package batcher import ( + "errors" + "fmt" "testing" - "fmt" + "github.com/Nextdoor/pg-bifrost.git/transport" + "github.com/Nextdoor/pg-bifrost.git/transport/mocks" + "github.com/golang/mock/gomock" + "time" "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" - "github.com/Nextdoor/pg-bifrost.git/transport" "github.com/Nextdoor/pg-bifrost.git/transport/batch" - "github.com/Nextdoor/pg-bifrost.git/transport/mocks" "github.com/Nextdoor/pg-bifrost.git/transport/progress" - "github.com/golang/mock/gomock" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -26,7 +27,7 @@ func TestBatchSizeOneOneTxnOneData(t *testing.T) { batchSize := 1 in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(batchSize) @@ -61,11 +62,13 @@ func TestBatchSizeOneOneTxnOneData(t *testing.T) { Transaction: transaction, } - expectedSeen := &progress.Seen{ - Transaction: transaction, - TimeBasedKey: timeBasedKey, - CommitWalStart: commit.WalStart, - TotalMsgs: 1, + expectedSeen := []*progress.Seen{ + { + Transaction: transaction, + TimeBasedKey: timeBasedKey, + CommitWalStart: commit.WalStart, + TotalMsgs: 1, + }, } in <- begin @@ -115,7 +118,7 @@ func TestBatchSizeOneOneTxnTwoData(t *testing.T) { transaction := "1" in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(batchSize) @@ -158,11 +161,13 @@ func TestBatchSizeOneOneTxnTwoData(t *testing.T) { Transaction: transaction, } - expectedSeen := &progress.Seen{ - Transaction: transaction, - TimeBasedKey: timeBasedKey, - CommitWalStart: commit.WalStart, - TotalMsgs: 2, + expectedSeen := []*progress.Seen{ + { + Transaction: transaction, + TimeBasedKey: timeBasedKey, + CommitWalStart: commit.WalStart, + TotalMsgs: 2, + }, } in <- begin @@ -217,7 +222,7 @@ func TestBatchSizeThreeTwoTxnTwoData(t *testing.T) { batchSize := 3 in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(batchSize) sh := shutdown.NewShutdownHandler() @@ -260,11 +265,13 @@ func TestBatchSizeThreeTwoTxnTwoData(t *testing.T) { Transaction: transactionA, } - expectedSeenA := &progress.Seen{ - Transaction: transactionA, - TimeBasedKey: timeBasedKeyA, - CommitWalStart: commitA.WalStart, - TotalMsgs: 2, + expectedSeenA := []*progress.Seen{ + { + Transaction: transactionA, + TimeBasedKey: timeBasedKeyA, + CommitWalStart: commitA.WalStart, + TotalMsgs: 2, + }, } // Transaction 2 @@ -300,11 +307,13 @@ func TestBatchSizeThreeTwoTxnTwoData(t *testing.T) { Transaction: transactionB, } - expectedSeenB := &progress.Seen{ - Transaction: transactionB, - TimeBasedKey: timeBasedKeyB, - CommitWalStart: commitB.WalStart, - TotalMsgs: 2, + expectedSeenB := []*progress.Seen{ + { + Transaction: transactionB, + TimeBasedKey: timeBasedKeyB, + CommitWalStart: commitB.WalStart, + TotalMsgs: 2, + }, } in <- beginA @@ -390,7 +399,7 @@ func TestBatchSizeOneTwoTxnTwoWorkers(t *testing.T) { batchSize := 1 in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(1) sh := shutdown.NewShutdownHandler() @@ -424,11 +433,13 @@ func TestBatchSizeOneTwoTxnTwoWorkers(t *testing.T) { Transaction: transactionA, } - expectedSeenA := &progress.Seen{ - Transaction: transactionA, - TimeBasedKey: timeBasedKeyA, - CommitWalStart: commitA.WalStart, - TotalMsgs: 1, + expectedSeenA := []*progress.Seen{ + { + Transaction: transactionA, + TimeBasedKey: timeBasedKeyA, + CommitWalStart: commitA.WalStart, + TotalMsgs: 1, + }, } // Transaction 2 @@ -454,11 +465,13 @@ func TestBatchSizeOneTwoTxnTwoWorkers(t *testing.T) { Transaction: transactionB, } - expectedSeenB := &progress.Seen{ - Transaction: transactionB, - TimeBasedKey: timeBasedKeyB, - CommitWalStart: commitB.WalStart, - TotalMsgs: 1, + expectedSeenB := []*progress.Seen{ + { + Transaction: transactionB, + TimeBasedKey: timeBasedKeyB, + CommitWalStart: commitB.WalStart, + TotalMsgs: 1, + }, } in <- beginA @@ -513,7 +526,7 @@ func TestBatchSizeOneTwoTxnTwoWorkers(t *testing.T) { func TestInputChannelClose(t *testing.T) { // Setup in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(1) sh := shutdown.NewShutdownHandler() @@ -556,7 +569,7 @@ func TestErrMsgTooBig(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -607,7 +620,7 @@ func TestErrCantFit(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -665,7 +678,7 @@ func TestErrMsgInvalid(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -716,7 +729,7 @@ func TestErrUnknown(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -771,7 +784,7 @@ func TestFlushBatchTimeoutUpdate(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -823,7 +836,7 @@ func TestFlushBatchTimeoutMaxAge(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -875,7 +888,7 @@ func TestFlushFullBatch(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -940,7 +953,7 @@ func TestFlushEmptyBatchTimeout(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -981,7 +994,7 @@ func TestTxnsSeenTimeout(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen) // unbuffered channel which will block (this is the test) + txnSeen := make(chan []*progress.Seen) // unbuffered channel which will block (this is the test) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -990,9 +1003,8 @@ func TestTxnsSeenTimeout(t *testing.T) { b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) b.txnsSeenTimeout = time.Millisecond * 50 - commit := &marshaller.MarshalledMessage{ + commitA := &marshaller.MarshalledMessage{ Operation: "COMMIT", - Json: []byte("{MSG1}"), TimeBasedKey: "1-1", WalStart: 901, PartitionKey: "foo", @@ -1000,10 +1012,11 @@ func TestTxnsSeenTimeout(t *testing.T) { } // Expects + mockBatch.EXPECT().IsFull().Return(true) mockBatchFactory.EXPECT().NewBatch("foo").Return(mockBatch).Times(1) // Loop iteration 1 - add commit - in <- commit + in <- commitA go b.StartBatching() @@ -1025,7 +1038,7 @@ func TestTxnsSeenTimeout(t *testing.T) { } } -func getBasicSetup(t *testing.T) (*gomock.Controller, chan *marshaller.MarshalledMessage, chan *progress.Seen, *Batcher, *mocks.MockBatchFactory, *mocks.MockBatch) { +func getBasicSetup(t *testing.T) (*gomock.Controller, chan *marshaller.MarshalledMessage, chan []*progress.Seen, *Batcher, *mocks.MockBatchFactory, *mocks.MockBatch) { // Setup mock mockCtrl := gomock.NewController(t) @@ -1034,7 +1047,7 @@ func getBasicSetup(t *testing.T) (*gomock.Controller, chan *marshaller.Marshalle // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() @@ -1090,7 +1103,7 @@ func TestAddToBatchSendFatal(t *testing.T) { in <- commit - txnSeen := make(chan *progress.Seen) + txnSeen := make(chan []*progress.Seen) b.txnsSeenChan = txnSeen mockBatch.EXPECT().IsFull().Return(false) @@ -1362,7 +1375,7 @@ func TestPartitionRouting(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) - txnSeen := make(chan *progress.Seen, 1000) + txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() diff --git a/transport/factory/factory.go b/transport/factory/factory.go index 6dd9a969..1aa3c14e 100644 --- a/transport/factory/factory.go +++ b/transport/factory/factory.go @@ -35,7 +35,7 @@ func NewTransport(shutdownHandler shutdown.ShutdownHandler, transportType transport.TransportType, transportConfig map[string]interface{}, inputChan <-chan *marshaller.MarshalledMessage, - txnsSeen chan<- *progress.Seen, + txnsSeen chan<- []*progress.Seen, txnsWritten chan<- *ordered_map.OrderedMap, // Map of statsChan chan stats.Stat, workers int, diff --git a/transport/manager/manager.go b/transport/manager/manager.go index 64527631..77275bc5 100644 --- a/transport/manager/manager.go +++ b/transport/manager/manager.go @@ -27,7 +27,7 @@ import ( type TransportManager struct { inputChan <-chan *marshaller.MarshalledMessage - txnsSeen chan *progress.Seen + txnsSeen chan []*progress.Seen txnsWritten chan *ordered_map.OrderedMap statsChan chan stats.Stat @@ -43,7 +43,7 @@ type TransportManager struct { // returns a TransportManager which can start the Batcher and Transporters as go routines. func New(shutdownHandler shutdown.ShutdownHandler, inputChan <-chan *marshaller.MarshalledMessage, - txnsSeen chan *progress.Seen, + txnsSeen chan []*progress.Seen, txnsWritten chan *ordered_map.OrderedMap, // Map of statsChan chan stats.Stat, transportType transport.TransportType, diff --git a/transport/progress/progress_tracker.go b/transport/progress/progress_tracker.go index 00bcdd98..09d9d443 100644 --- a/transport/progress/progress_tracker.go +++ b/transport/progress/progress_tracker.go @@ -67,7 +67,7 @@ type Written struct { type ProgressTracker struct { shutdownHandler shutdown.ShutdownHandler - txnSeenChan <-chan *Seen + txnSeenChan <-chan []*Seen txnsWritten <-chan *ordered_map.OrderedMap // statsChan chan stats.Stat OutputChan chan uint64 // channel to send overall (youngest) progress on @@ -80,7 +80,7 @@ type ProgressTracker struct { // New creates a progress "table" based on the number of workers (inputChans) and returns a ProgressTracker // txnsSeen MUST be an unbuffered channel func New(shutdownHandler shutdown.ShutdownHandler, - txnSeenChan <-chan *Seen, + txnSeenChan <-chan []*Seen, txnsWritten <-chan *ordered_map.OrderedMap, statsChan chan stats.Stat) ProgressTracker { @@ -110,11 +110,13 @@ func (p ProgressTracker) shutdown() { } // updateSeen adds an entire transaction that was seen by the batcher to the ledger -func (p *ProgressTracker) updateSeen(seen *Seen) error { - err := p.ledger.updateSeen(seen) +func (p *ProgressTracker) updateSeen(seen []*Seen) error { + for _, s := range seen { + err := p.ledger.updateSeen(s) - if err != nil { - return err + if err != nil { + return err + } } return nil diff --git a/transport/progress/progress_tracker_test.go b/transport/progress/progress_tracker_test.go index bdcc9e9e..426c8f15 100644 --- a/transport/progress/progress_tracker_test.go +++ b/transport/progress/progress_tracker_test.go @@ -37,7 +37,7 @@ func init() { type testCase struct { action string writtenMap *ordered_map.OrderedMap - seen *Seen + seen []*Seen } type testExpected struct { @@ -79,8 +79,8 @@ func compareLedger(a Ledger, b Ledger) bool { return true } -func getProgressTracker() (ProgressTracker, chan *Seen, chan *ordered_map.OrderedMap) { - seen := make(chan *Seen) +func getProgressTracker() (ProgressTracker, chan []*Seen, chan *ordered_map.OrderedMap) { + seen := make(chan []*Seen) written := make(chan *ordered_map.OrderedMap, 1000) sh := shutdown.NewShutdownHandler() statsChan := make(chan stats.Stat, 100) @@ -153,7 +153,7 @@ func TestSingleSeenEntry(t *testing.T) { omap.Set("1-1", &LedgerEntry{"1", "1-1", uint64(111), 0, 1}) cases := []testCase{ - testCase{"seen", nil, &Seen{"1", "1-1", 1, 111}}, + testCase{"seen", nil, []*Seen{{"1", "1-1", 1, 111}}}, } entries := []testExpected{ testExpected{"1", "1-1", 111, 0, 1}, @@ -188,8 +188,8 @@ func TestSingleSeenEntry(t *testing.T) { func TestTwoDistinctSeenEntry(t *testing.T) { cases := []testCase{ - testCase{"seen", nil, &Seen{"1", "1-1", 1, 111}}, - testCase{"seen", nil, &Seen{"2", "2-1", 1, 222}}, + testCase{"seen", nil, []*Seen{{"1", "1-1", 1, 111}}}, + testCase{"seen", nil, []*Seen{{"2", "2-1", 1, 222}}}, } entries := []testExpected{ testExpected{"1", "1-1", 111, 0, 1}, @@ -207,7 +207,7 @@ func TestSingleSeenAndWrittenEntryWithoutTicker(t *testing.T) { omap2.Set("1-1", &Written{"1", "1-1", 1}) cases := []testCase{ - testCase{"seen", nil, &Seen{"1", "1-1", 1, 111}}, + testCase{"seen", nil, []*Seen{{"1", "1-1", 1, 111}}}, testCase{"written", omap2, nil}, } @@ -226,7 +226,7 @@ func TestSingleSeenAndWrittenEmitted(t *testing.T) { omap2.Set("1-1", &Written{"1", "1-1", 1}) cases := []testCase{ - testCase{"seen", nil, &Seen{"1", "1-1", 1, 999}}, + testCase{"seen", nil, []*Seen{{"1", "1-1", 1, 999}}}, testCase{"written", omap2, nil}, } @@ -249,8 +249,8 @@ func TestMultipleSeenAndWrittenEmitted(t *testing.T) { omap4.Set("1-1", &Written{"1", "1-1", 2}) cases := []testCase{ - testCase{"seen", nil, &Seen{"1", "1-1", 5, 888}}, - testCase{"seen", nil, &Seen{"2", "2-1", 2, 999}}, + testCase{"seen", nil, []*Seen{{"1", "1-1", 5, 888}}}, + testCase{"seen", nil, []*Seen{{"2", "2-1", 2, 999}}}, testCase{"written", omap1, nil}, testCase{"written", omap2, nil}, testCase{"written", omap3, nil}, @@ -269,8 +269,8 @@ func TestMultipleSeenAndWrittenEmitted(t *testing.T) { func TestSeenAndSeenAgain(t *testing.T) { cases := []testCase{ - testCase{"seen", nil, &Seen{"1", "1-1", 1, 999}}, - testCase{"seen", nil, &Seen{"1", "1-2", 1, 999}}, + testCase{"seen", nil, []*Seen{{"1", "1-1", 1, 999}}}, + testCase{"seen", nil, []*Seen{{"1", "1-2", 1, 999}}}, } entries := []testExpected{ testExpected{"1", "1-2", 999, 0, 1}, @@ -296,10 +296,10 @@ func TestSeenAndSeenAgainThenWritten(t *testing.T) { omap4.Set("1-2", &Written{"1", "1-2", 3}) cases := []testCase{ - testCase{"seen", nil, &Seen{"1", "1-1", 10, 999}}, + testCase{"seen", nil, []*Seen{{"1", "1-1", 10, 999}}}, testCase{"written", omap1, nil}, - testCase{"seen", nil, &Seen{"1", "1-2", 10, 999}}, + testCase{"seen", nil, []*Seen{{"1", "1-2", 10, 999}}}, testCase{"written", omap2, nil}, testCase{"written", omap3, nil}, testCase{"written", omap4, nil},