Skip to content

Commit

Permalink
send seen progress in batches (#120)
Browse files Browse the repository at this point in the history
* send seen progress in batches

* update tests
  • Loading branch information
stlava authored Sep 5, 2023
1 parent ebfe027 commit cf75db4
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 92 deletions.
2 changes: 1 addition & 1 deletion app/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func New(shutdownHandler shutdown.ShutdownHandler,
statsChan := make(chan stats.Stat, clientBufferSize*5)

// Transactions channels for progress reporting to the progress tracker
txnsSeen := make(chan *progress.Seen) // Must be unbuffered to maintain seen -> written ordering
txnsSeen := make(chan []*progress.Seen) // Must be unbuffered to maintain seen -> written ordering
txnsWritten := make(chan *ordered_map.OrderedMap, clientBufferSize*5)

// Initialize in reverse order
Expand Down
22 changes: 22 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,25 @@ services:
- POSTGRES_PASSWORD=pgbifrost
ports:
- 5432:5432
networks:
- net

bifrost:
container_name: bifrost
image: pg-bifrost:fastest
depends_on:
- postgres
ports:
- 6060:6060
environment:
- BATCH_FLUSH_MAX_AGE=120000
- BATCH_FLUSH_UPDATE_AGE=1000
- NO_MARSHAL_OLD_VALUE=true
- WORKERS=1
- BATCHER_MEMORY_SOFT_LIMIT=104857600
networks:
- net
command: /pg-bifrost --host postgres --password pgbifrost replicate -s stdout

networks:
net:
2 changes: 1 addition & 1 deletion marshaller/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func New(shutdownHandler shutdown.ShutdownHandler,
statsChan chan stats.Stat,
noMarshalOldValue bool) Marshaller {

outputChan := make(chan *MarshalledMessage, 1000)
outputChan := make(chan *MarshalledMessage)

return Marshaller{shutdownHandler, inputChan, outputChan, statsChan, noMarshalOldValue}
}
Expand Down
29 changes: 18 additions & 11 deletions transport/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Batcher struct {

inputChan <-chan *marshaller.MarshalledMessage // receive single MarshalledMessages
outputChans []chan transport.Batch // one output channel per Transporter worker
txnsSeenChan chan<- *progress.Seen // channel to report transactions seen to ProgressTracker
txnsSeenChan chan<- []*progress.Seen // channel to report transactions seen to ProgressTracker
statsChan chan stats.Stat

tickRate time.Duration // controls frequency that batcher looks for input. This should be non-zero to avoid CPU spin.
Expand All @@ -75,14 +75,15 @@ type Batcher struct {
routingMethod BatchRouting // method to use when routing a batch to an output channel/worker
roundRobinPosition int // in Round-robin routing mode this keeps track of the position
txnsSeenTimeout time.Duration // timeout to wait on writing seen transactions. If this limit is reached then batcher errors out.
seenList []*progress.Seen
}

// NewBatcher returns a new Batcher with output channels to use as inputs for Transporters. Note the caller must
// provide a BatchFactory which is compatible with the downstream Transporter. The Batcher does not check
// check compatibility.
func NewBatcher(shutdownHandler shutdown.ShutdownHandler,
inputChan <-chan *marshaller.MarshalledMessage,
txnsSeenChan chan<- *progress.Seen,
txnsSeenChan chan<- []*progress.Seen,
statsChan chan stats.Stat,

tickRate int, // number of milliseconds that batcher will wait to check for input.
Expand Down Expand Up @@ -120,6 +121,7 @@ func NewBatcher(shutdownHandler shutdown.ShutdownHandler,
routingMethod,
0,
time.Second * 12, // max time to wait for sending transactions to ProgressTracker via txnsSeenChan
[]*progress.Seen{},
}
}

Expand Down Expand Up @@ -206,18 +208,11 @@ func (b *Batcher) StartBatching() {

// Update ledger when a COMMIT is seen
if msg.Operation == "COMMIT" {
s := &progress.Seen{
b.seenList = append(b.seenList, &progress.Seen{
Transaction: msg.Transaction,
TimeBasedKey: msg.TimeBasedKey,
TotalMsgs: totalMsgsInTxn,
CommitWalStart: msg.WalStart}
select {
case b.txnsSeenChan <- s:
log.Debug("seen a BatchTransaction")
case <-time.After(b.txnsSeenTimeout):
log.Error("fatal time out sending a BatchTransaction to the ProgressTracker")
return
}
CommitWalStart: msg.WalStart})
}

// Reset counter for number of messages in transaction when
Expand Down Expand Up @@ -369,6 +364,18 @@ func (b *Batcher) handleTicker() bool {
}

func (b *Batcher) sendBatch(batch transport.Batch) bool {

// First flush out seen list
if len(b.seenList) > 0 {
select {
case b.txnsSeenChan <- b.seenList:
b.seenList = []*progress.Seen{}
log.Debug("seen a BatchTransaction")
case <-time.After(b.txnsSeenTimeout):
log.Panic("fatal time out sending a BatchTransaction to the ProgressTracker")
}
}

ok, err := batch.Close()
if !ok {
log.Error(err)
Expand Down
Loading

0 comments on commit cf75db4

Please sign in to comment.