Skip to content

Commit

Permalink
decrease memory allocations (#114)
Browse files Browse the repository at this point in the history
* have marshaller re-use memory to avoid allocations

* fix erroneous debug which allocates memory
  • Loading branch information
stlava authored Aug 31, 2023
1 parent 1de09f2 commit de7e724
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 16 deletions.
93 changes: 79 additions & 14 deletions marshaller/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package marshaller

import (
"os"
"sync"
"time"

gojson "github.com/goccy/go-json"
Expand All @@ -35,11 +36,59 @@ var (
log = logger.WithField("package", "marshaller")
)

const epochFormatted = "1970-01-01T00:00:00Z"

func init() {
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.InfoLevel)
}

var colValuesPool = sync.Pool{
New: func() interface{} {
return map[string]string{}
},
}

var usedColValues []map[string]string

func getColValue() map[string]string {
v := colValuesPool.Get().(map[string]string)
usedColValues = append(usedColValues, v)
return v
}

func clearColValues() {
for _, m := range usedColValues {
colValuesPool.Put(m)
}

usedColValues = usedColValues[0:0]
}

var colValuePairPool = sync.Pool{
New: func() interface{} {
return map[string]map[string]string{}
},
}

var usedColValueParis []map[string]map[string]string

func getColValuePair() map[string]map[string]string {
v := colValuePairPool.Get().(map[string]map[string]string)
usedColValueParis = append(usedColValueParis, v)
return v
}

func clearColValuePairs() {
for _, m := range usedColValueParis {
delete(m, "old")
delete(m, "new")
colValuePairPool.Put(m)
}

usedColValueParis = usedColValueParis[0:0]
}

type Marshaller struct {
shutdownHandler shutdown.ShutdownHandler

Expand Down Expand Up @@ -158,24 +207,30 @@ func marshalColumnValue(cv *parselogical.ColumnValue) map[string]string {
if cv.Quoted {
quoted = "true"
}
return map[string]string{"v": cv.Value, "t": cv.Type, "q": quoted}

mp := getColValue()
mp["v"] = cv.Value
mp["t"] = cv.Type
mp["q"] = quoted

return mp
}

// marshalColumnValuePair marshals the column value pairs which shows the old and new columns
func marshalColumnValuePair(newValue *parselogical.ColumnValue, oldValue *parselogical.ColumnValue) map[string]map[string]string {
if oldValue != nil && newValue != nil {
return map[string]map[string]string{
"old": marshalColumnValue(oldValue),
"new": marshalColumnValue(newValue),
}
cvp := getColValuePair()
cvp["old"] = marshalColumnValue(oldValue)
cvp["new"] = marshalColumnValue(newValue)
return cvp
} else if newValue != nil {
return map[string]map[string]string{
"new": marshalColumnValue(newValue),
}
cvp := getColValuePair()
cvp["new"] = marshalColumnValue(newValue)
return cvp
} else if oldValue != nil {
return map[string]map[string]string{
"old": marshalColumnValue(oldValue),
}
cvp := getColValuePair()
cvp["old"] = marshalColumnValue(oldValue)
return cvp
}

return nil
Expand All @@ -185,8 +240,13 @@ func marshalColumnValuePair(newValue *parselogical.ColumnValue, oldValue *parsel
func marshalWalToJson(msg *replication.WalMessage, noMarshalOldValue bool) ([]byte, error) {
lsn := pglogrepl.LSN(msg.WalStart).String()

// ServerTime * 1,000,000 to convert from milliseconds to nanoseconds
t := time.Unix(0, int64(msg.ServerTime)*1000000).UTC().Format(time.RFC3339)
var t string
if msg.ServerTime != 0 {
// ServerTime * 1,000,000 to convert from milliseconds to nanoseconds
t = time.Unix(0, int64(msg.ServerTime)*1000000).UTC().Format(time.RFC3339)
} else {
t = epochFormatted
}
columns := make(map[string]map[string]map[string]string)

for k, v := range msg.Pr.Columns {
Expand Down Expand Up @@ -218,11 +278,16 @@ func marshalWalToJson(msg *replication.WalMessage, noMarshalOldValue bool) ([]by
}
}

return gojson.Marshal(&jsonWalEntry{
ret, err := gojson.Marshal(&jsonWalEntry{
Time: &t,
Lsn: &lsn,
Table: &msg.Pr.Relation,
Operation: &msg.Pr.Operation,
Columns: &columns,
})

clearColValues()
clearColValuePairs()

return ret, err
}
8 changes: 8 additions & 0 deletions marshaller/marshaller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,11 @@ func TestTerminationContextOutput(t *testing.T) {
assert.Fail(t, "output channel not properly closed")
}
}

func BenchmarkMarshalWalToJson(b *testing.B) {
msg := _basicInsertMessage()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, _ = marshalWalToJson(msg, true)
}
}
6 changes: 4 additions & 2 deletions stats/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,10 @@ func computeAggregateKey(s stats.Stat) string {
// isBtimeExpired returns true if the current time is past the bucket time with a grace period
func (a *Aggregator) isBtimeExpired(bucketTime int64) bool {
timeNow := a.timeNow().UnixNano()
log.Debugf(`%s > %s ?`, strconv.FormatInt(timeNow, 10),
strconv.FormatInt(bucketTime+a.aggregateTimeNano+reportGraceNano, 10))
if log.Level == logrus.DebugLevel {
log.Debugf(`%s > %s ?`, strconv.FormatInt(timeNow, 10),
strconv.FormatInt(bucketTime+a.aggregateTimeNano+reportGraceNano, 10))
}
return timeNow > bucketTime+a.aggregateTimeNano+reportGraceNano
}

Expand Down

0 comments on commit de7e724

Please sign in to comment.