diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 00000000..5e9dad15 --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,37 @@ +name: Lint + +env: + GO_VERSION: '1.20.4' + LINTER_VERSION: 'v1.52.2' + +on: push + +jobs: + lint: + runs-on: ubuntu-20.04 + steps: + - uses: actions/setup-go@v3 + with: + go-version: ${{ env.GO_VERSION }} + - uses: actions/checkout@v2 + - name: Cache Vendor + uses: actions/cache@v2 + env: + cache-name: cache-vendor-v1 + with: + path: vendor + key: ${{ env.cache-name }}-${{ env.GO_VERSION }}-${{ hashFiles('go.mod') }}-${{ hashFiles('go.sum') }} + restore-keys: | + ${{ env.cache-name }}-${{ env.GO_VERSION }}- + ${{ env.cache-name }}- + + - name: Populate Vendor + shell: bash + run: | + make vendor + + - shell: bash + run: go install github.com/golangci/golangci-lint/cmd/golangci-lint@${{ env.LINTER_VERSION }} + + - shell: bash + run: golangci-lint run --out-format=github-actions \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 3a4aa58d..d40f3f14 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,8 +8,9 @@ FROM golang:1.20.4-buster as intermediate # Make a directory to place pprof files in. Typically used for itests. RUN mkdir /perf -# Build dependencies +# Build/test dependencies RUN go install github.com/golang/mock/mockgen@v1.6.0 +RUN go install golang.org/x/tools/cmd/goimports@latest WORKDIR /go/src/github.com/Nextdoor/pg-bifrost.git/ diff --git a/Makefile b/Makefile index 1785267f..a8928cbe 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,18 @@ ifeq ($(CI),true) GO_LDFLAGS += -X main.GitRevision=$(GIT_REVISION) -X main.Version=$(GIT_TAG_VERSION) endif +vendor: go.sum go.mod + go mod vendor -v + +check_imports: + @echo "Checking goimports for imports and formatting..." + goimports -l -d . + @goimports -l -d . | xargs echo | xargs test -z 2> /dev/null + +lint: vet check_imports + @echo "Running golangci-lint" + golangci-lint run + vet: @echo "Running go vet ..." go list ./... | xargs go vet @@ -20,7 +32,7 @@ generate: go generate ./... ;\ fi -test: generate vet +test: generate check_imports go clean -testcache || true @echo "Executing tests ..." go test -race -v ${GO_TEST_EXTRAS} ./... @@ -62,4 +74,4 @@ docker_get_binary: @$(DOCKER) cp "pg-bifrost-build":/pg-bifrost target/ @$(DOCKER) rm "pg-bifrost-build" -.PHONY: clean test itests docker_build docker_get_binary +.PHONY: clean test itests docker_build docker_get_binary vendor lint check_imports diff --git a/app/config/headers.go b/app/config/headers.go index 0ee9ac3e..92689edb 100644 --- a/app/config/headers.go +++ b/app/config/headers.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package config diff --git a/app/runner.go b/app/runner.go index feee6326..9e19023c 100644 --- a/app/runner.go +++ b/app/runner.go @@ -17,11 +17,14 @@ package app import ( + "time" + "github.com/Nextdoor/pg-bifrost.git/app/config" "github.com/Nextdoor/pg-bifrost.git/partitioner" "github.com/Nextdoor/pg-bifrost.git/transport/batcher" "github.com/jackc/pgx/v5/pgconn" - "time" + + "os" "github.com/Nextdoor/pg-bifrost.git/filter" "github.com/Nextdoor/pg-bifrost.git/marshaller" @@ -37,7 +40,6 @@ import ( "github.com/Nextdoor/pg-bifrost.git/transport/progress" "github.com/cevaris/ordered_map" "github.com/sirupsen/logrus" - "os" ) var ( @@ -234,7 +236,7 @@ func (m Runner) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() log.Debug("closing global channels") diff --git a/filter/filter.go b/filter/filter.go index c0b37b6a..48002b87 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package filter @@ -99,7 +99,7 @@ func (f *Filter) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() log.Debug("closing output channel") diff --git a/filter/filter_test.go b/filter/filter_test.go index 7d6658ab..5406883f 100644 --- a/filter/filter_test.go +++ b/filter/filter_test.go @@ -12,19 +12,20 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package filter import ( "github.com/Nextdoor/pg-bifrost.git/replication" + "testing" + "time" + + "github.com/Nextdoor/parselogical" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" - "github.com/Nextdoor/parselogical" "github.com/stretchr/testify/assert" - "testing" - "time" ) func _testMessage(relation string, operation string) *replication.WalMessage { diff --git a/main/main.go b/main/main.go index 4dc4b2c9..04d444af 100644 --- a/main/main.go +++ b/main/main.go @@ -274,7 +274,7 @@ func runReplicate( // If it's a SIGUSR1 and the cpuprofile flag was set, then dump pprof files and continue running. // https://golang.org/pkg/runtime/pprof if sig == syscall.SIGUSR1 && cpuprofile != "" { - if stoppedCpuProfiling != true { + if !stoppedCpuProfiling { log.Warn("Stopping CPU profiling") pprof.StopCPUProfile() log.Infof("Wrote pprof cpu profile to: '%s'", cpuprofile) @@ -292,7 +292,7 @@ func runReplicate( // https://golang.org/pkg/runtime/pprof if sig == syscall.SIGUSR2 && memprofile != "" { time.Sleep(1 * time.Second) - memProfile(fmt.Sprintf(memprofile)) + memProfile(memprofile) continue } @@ -300,7 +300,7 @@ func runReplicate( log.Info("pg-bifrost received a shutdown") // Stop profiling on shutdown - if cpuprofile != "" && stoppedCpuProfiling != true { + if cpuprofile != "" && !stoppedCpuProfiling { log.Warn("Stopping CPU profiling") pprof.StopCPUProfile() } diff --git a/marshaller/marshalled_message.go b/marshaller/marshalled_message.go index 8e226190..c84d579e 100644 --- a/marshaller/marshalled_message.go +++ b/marshaller/marshalled_message.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package marshaller diff --git a/marshaller/marshalled_message_test.go b/marshaller/marshalled_message_test.go index 14bcc83e..f1123e6c 100644 --- a/marshaller/marshalled_message_test.go +++ b/marshaller/marshalled_message_test.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package marshaller diff --git a/marshaller/marshaller.go b/marshaller/marshaller.go index 7c3a18ac..55437c66 100644 --- a/marshaller/marshaller.go +++ b/marshaller/marshaller.go @@ -17,7 +17,7 @@ package marshaller import ( - "github.com/json-iterator/go" + jsoniter "github.com/json-iterator/go" "os" "time" @@ -83,7 +83,7 @@ func (m Marshaller) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() log.Debug("closing output channel") diff --git a/marshaller/marshaller_test.go b/marshaller/marshaller_test.go index 885718fc..07a605d5 100644 --- a/marshaller/marshaller_test.go +++ b/marshaller/marshaller_test.go @@ -17,14 +17,15 @@ package marshaller import ( + "testing" + "time" + "github.com/Nextdoor/parselogical" "github.com/Nextdoor/pg-bifrost.git/replication" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" - "testing" - "time" ) func init() { diff --git a/partitioner/partitioner.go b/partitioner/partitioner.go index e5e32f0e..a78d9451 100644 --- a/partitioner/partitioner.go +++ b/partitioner/partitioner.go @@ -12,18 +12,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package partitioner import ( + "os" + "strconv" + "github.com/Nextdoor/pg-bifrost.git/replication" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" "github.com/Nextdoor/pg-bifrost.git/utils" "github.com/sirupsen/logrus" - "os" - "strconv" ) type PartitionMethod int @@ -66,7 +67,7 @@ type Partitioner struct { statsChan chan stats.Stat - method PartitionMethod + method PartitionMethod buckets int } @@ -98,7 +99,7 @@ func (f *Partitioner) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() log.Debug("closing output channel") @@ -144,16 +145,12 @@ func (f *Partitioner) Start() { switch f.method { case PART_METHOD_NONE: partitionKey = "" - break case PART_METHOD_TABLENAME: partitionKey = msg.Pr.Relation - break case PART_METHOD_TXN: partitionKey = msg.Pr.Transaction - break case PART_METHOD_TXN_BUCKET: partitionKey = strconv.Itoa(utils.QuickHash(msg.Pr.Transaction, f.buckets)) - break } msg.PartitionKey = partitionKey diff --git a/partitioner/partitioner_test.go b/partitioner/partitioner_test.go index d642d978..cdc721f0 100644 --- a/partitioner/partitioner_test.go +++ b/partitioner/partitioner_test.go @@ -12,18 +12,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package partitioner import ( + "testing" + + "github.com/Nextdoor/parselogical" "github.com/Nextdoor/pg-bifrost.git/replication" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" - "github.com/Nextdoor/parselogical" "github.com/stretchr/testify/assert" - "testing" - "time" ) func _setupPartitioner(partMethod PartitionMethod, buckets int) (Partitioner, chan *replication.WalMessage, chan stats.Stat) { @@ -36,24 +36,6 @@ func _setupPartitioner(partMethod PartitionMethod, buckets int) (Partitioner, ch return p, in, statsChan } -func _collectOutput(outputChan chan *replication.WalMessage) []*replication.WalMessage { - actual := make([]*replication.WalMessage, 0) - - func() { - for { - select { - case <-time.After(5 * time.Millisecond): - // Got all the stats - return - case m := <-outputChan: - actual = append(actual, m) - } - } - }() - - return actual -} - func _testMessage(relation string, transaction string) *replication.WalMessage { pr := parselogical.ParseResult{ State: parselogical.ParseState{}, @@ -84,7 +66,7 @@ func TestNone(t *testing.T) { go p.Start() in <- msg - outMsg := <- p.OutputChan + outMsg := <-p.OutputChan assert.Equal(t, "", outMsg.PartitionKey) } @@ -97,7 +79,7 @@ func TestTableName(t *testing.T) { go p.Start() in <- msg - outMsg := <- p.OutputChan + outMsg := <-p.OutputChan assert.Equal(t, "users", outMsg.PartitionKey) } @@ -110,7 +92,7 @@ func TestTransaction(t *testing.T) { go p.Start() in <- msg - outMsg := <- p.OutputChan + outMsg := <-p.OutputChan assert.Equal(t, "19", outMsg.PartitionKey) } @@ -121,10 +103,10 @@ func TestTransactionBuckets(t *testing.T) { go p.Start() in <- _testMessage("users", "111111") - outMsgA := <- p.OutputChan + outMsgA := <-p.OutputChan assert.Equal(t, "4", outMsgA.PartitionKey) in <- _testMessage("users", "333333") - outMsgB := <- p.OutputChan + outMsgB := <-p.OutputChan assert.Equal(t, "1", outMsgB.PartitionKey) -} \ No newline at end of file +} diff --git a/replication/client/client.go b/replication/client/client.go index 15a9f926..0766aa4f 100644 --- a/replication/client/client.go +++ b/replication/client/client.go @@ -18,15 +18,16 @@ package client import ( "context" - "github.com/Nextdoor/parselogical" - "github.com/jackc/pglogrepl" - "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgproto3" "os" "strconv" "strings" "time" + "github.com/Nextdoor/parselogical" + "github.com/jackc/pglogrepl" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgproto3" + "github.com/Nextdoor/pg-bifrost.git/replication" "github.com/Nextdoor/pg-bifrost.git/replication/client/conn" "github.com/Nextdoor/pg-bifrost.git/shutdown" @@ -116,7 +117,7 @@ func (c *Replicator) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() close(c.outputChan) diff --git a/replication/client/client_test.go b/replication/client/client_test.go index 69caad3b..5af6082d 100644 --- a/replication/client/client_test.go +++ b/replication/client/client_test.go @@ -22,6 +22,8 @@ import ( "encoding/binary" "errors" "fmt" + "time" + "github.com/Nextdoor/pg-bifrost.git/replication" "github.com/Nextdoor/pg-bifrost.git/replication/client/conn/mocks" "github.com/Nextdoor/pg-bifrost.git/shutdown" @@ -30,7 +32,6 @@ import ( "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5/pgproto3" "github.com/stretchr/testify/assert" - "time" "testing" ) @@ -112,8 +113,7 @@ func waitForShutdown(t *testing.T, mockManager *mocks.MockManagerInterface, hand handler.CancelFunc() // Add a little delay to ensure shutdown ran - var timeout *time.Timer - timeout = time.NewTimer(4000 * time.Millisecond) + var timeout = time.NewTimer(4000 * time.Millisecond) // Check to see if shutdown closed output channel select { @@ -819,8 +819,7 @@ func TestClosedProgressChan(t *testing.T) { close(progChan) // Add a little delay to ensure shutdown ran - var timeout *time.Timer - timeout = time.NewTimer(100 * time.Millisecond) + var timeout = time.NewTimer(100 * time.Millisecond) // Check to see if shutdown closed output channel select { @@ -987,18 +986,14 @@ func TestWithMultipleProgress(t *testing.T) { go replicator.Start(progChan) // Wait for first replicator to start - select { - case <-time.After(100 * time.Millisecond): - } + time.Sleep(100 * time.Millisecond) progChan <- uint64(progress1) progChan <- uint64(progress2) progChan <- uint64(progress3) // Wait a little bit for replicator to process progress - select { - case <-time.After(200 * time.Millisecond): - } + time.Sleep(200 * time.Millisecond) // Wait for shutdown waitForShutdown(t, mockManager, sh, stoppedChan) @@ -1298,8 +1293,7 @@ func TestHeartbeatRequestedShutdown(t *testing.T) { go replicator.Start(progChan) // Add a little delay to ensure shutdown ran - var timeout *time.Timer - timeout = time.NewTimer(1000 * time.Millisecond) + var timeout = time.NewTimer(1000 * time.Millisecond) // Check to see if shutdown closed output channel select { @@ -1581,8 +1575,7 @@ func TestRecoveryFailed(t *testing.T) { sh.CancelFunc() // Add a little delay to ensure shutdown ran - var timeout *time.Timer - timeout = time.NewTimer(100 * time.Millisecond) + var timeout = time.NewTimer(100 * time.Millisecond) // Check to see if shutdown closed output channel select { diff --git a/replication/client/conn/conn.go b/replication/client/conn/conn.go index 503043ab..d9dc5e67 100644 --- a/replication/client/conn/conn.go +++ b/replication/client/conn/conn.go @@ -18,11 +18,12 @@ package conn import ( "context" + "time" + "github.com/cenkalti/backoff/v4" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgproto3" - "time" ) // PgReplConnWrapper is a wrapper struct around pgx.ReplicationConn to help with gomocks diff --git a/replication/client/conn/interface.go b/replication/client/conn/interface.go index 591167c6..561b1de7 100644 --- a/replication/client/conn/interface.go +++ b/replication/client/conn/interface.go @@ -18,6 +18,7 @@ package conn import ( "context" + "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5/pgproto3" ) diff --git a/replication/client/conn/manager.go b/replication/client/conn/manager.go index 9ee5894e..84027213 100644 --- a/replication/client/conn/manager.go +++ b/replication/client/conn/manager.go @@ -18,17 +18,16 @@ package conn import ( "context" + "os" + "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5/pgconn" "github.com/sirupsen/logrus" - "os" - "time" ) var ( - logger = logrus.New() - log = logger.WithField("package", "conn") - logProgressInterval = int64(30 * time.Second) + logger = logrus.New() + log = logger.WithField("package", "conn") ) func init() { diff --git a/replication/message.go b/replication/message.go index 636d8fd6..fcaa0c42 100644 --- a/replication/message.go +++ b/replication/message.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package replication diff --git a/shutdown/shutdown.go b/shutdown/shutdown.go index 435cf621..ab34e285 100644 --- a/shutdown/shutdown.go +++ b/shutdown/shutdown.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package shutdown diff --git a/stats/aggregator/aggregate.go b/stats/aggregator/aggregate.go index 1853e45b..dbeb76a2 100644 --- a/stats/aggregator/aggregate.go +++ b/stats/aggregator/aggregate.go @@ -2,8 +2,9 @@ package aggregator import ( "fmt" - "github.com/Nextdoor/pg-bifrost.git/stats" "math" + + "github.com/Nextdoor/pg-bifrost.git/stats" ) // aggregate type which keeps a unique Stat's metadata, a running count of it's value diff --git a/stats/aggregator/aggregator.go b/stats/aggregator/aggregator.go index 6407f049..56f314aa 100644 --- a/stats/aggregator/aggregator.go +++ b/stats/aggregator/aggregator.go @@ -19,7 +19,6 @@ // // // receive aggregated stats // aggregated_stat := <-out_stats -// package aggregator import ( @@ -112,7 +111,7 @@ func (a *Aggregator) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() log.Debug("closing output channel") @@ -242,7 +241,7 @@ func (a *Aggregator) reportAggregatesWorker() { } if len(reportedBucketBtimes) != 0 { - for bucketTime, _ := range reportedBucketBtimes { + for bucketTime := range reportedBucketBtimes { delete(a.aggregates, bucketTime) } } diff --git a/stats/aggregator/aggregator_test.go b/stats/aggregator/aggregator_test.go index 7da37d6d..4e2ddcb1 100644 --- a/stats/aggregator/aggregator_test.go +++ b/stats/aggregator/aggregator_test.go @@ -116,8 +116,7 @@ func _testStart(t *testing.T, sts []testStat, expecteds []testResult) { // Compare the respective result and expected elements for _, expected := range expecteds { - var expectedStat stats.Stat - expectedStat = stats.NewStatCount("rplclient", + var expectedStat = stats.NewStatCount("rplclient", expected.statName, expected.value, expected.timestamp, @@ -235,24 +234,19 @@ func TestCloseInputChannel(t *testing.T) { go statsAggregator.Start() // Wait to ensure routine started - timeout := time.NewTimer(25 * time.Millisecond) - - select { - case <-timeout.C: - } + time.Sleep(25 * time.Millisecond) // Close input close(in) // Verify output gets closed - timeoutVerify := time.NewTimer(25 * time.Millisecond) select { case _, ok := <-statsAggregator.outputChan: if ok { assert.Fail(t, "output channel not properly closed") } - case <-timeoutVerify.C: + case <-time.After(25 * time.Millisecond): assert.Fail(t, "output channel not closed in time") } } @@ -335,14 +329,12 @@ func TestTerminationContextInSend(t *testing.T) { time.Sleep(250 * time.Millisecond) // Verify output gets closed - timeoutVerify := time.NewTimer(10 * time.Millisecond) - select { case _, ok := <-statsAggregator.outputChan: if ok { assert.Fail(t, "output channel not properly closed") } - case <-timeoutVerify.C: + case <-time.After(10 * time.Millisecond): assert.Fail(t, "output channel not closed in time") } } diff --git a/stats/reporters/datadog/datadog.go b/stats/reporters/datadog/datadog.go index 598b9a92..1d24ee73 100644 --- a/stats/reporters/datadog/datadog.go +++ b/stats/reporters/datadog/datadog.go @@ -63,10 +63,9 @@ func (r *DataDogReporter) Start() { switch s.StatType { case stats.Count: - r.client.Count(statName, s.Value, nil, 1) - break + _ = r.client.Count(statName, s.Value, nil, 1) case stats.Histogram: - r.client.Gauge(statName, float64(s.Value), nil, 1) + _ = r.client.Gauge(statName, float64(s.Value), nil, 1) } err := r.client.Flush() diff --git a/stats/reporters/factory/factory.go b/stats/reporters/factory/factory.go index 34946c17..7dcf128c 100644 --- a/stats/reporters/factory/factory.go +++ b/stats/reporters/factory/factory.go @@ -1,6 +1,8 @@ package factory import ( + "os" + "github.com/DataDog/datadog-go/v5/statsd" "github.com/Nextdoor/pg-bifrost.git/app/config" "github.com/Nextdoor/pg-bifrost.git/shutdown" @@ -9,7 +11,6 @@ import ( "github.com/Nextdoor/pg-bifrost.git/stats/reporters/datadog" "github.com/Nextdoor/pg-bifrost.git/stats/reporters/stdout" "github.com/sirupsen/logrus" - "os" ) var ( @@ -32,7 +33,6 @@ func New(shutdownHandler shutdown.ShutdownHandler, switch reporterType { case reporters.STDOUT: r = stdout.New(shutdownHandler, inputChan) - break case reporters.DATADOG: addr, ok := reporterConfig[config.VAR_NAME_DD_HOST].(string) if !ok { @@ -55,7 +55,6 @@ func New(shutdownHandler shutdown.ShutdownHandler, } r = datadog.New(shutdownHandler, inputChan, c) - break } return r, nil diff --git a/stats/stat.go b/stats/stat.go index c5f131ef..acc7b4dc 100644 --- a/stats/stat.go +++ b/stats/stat.go @@ -1,9 +1,10 @@ package stats import ( - "github.com/google/go-cmp/cmp" "testing" "time" + + "github.com/google/go-cmp/cmp" ) type StatType string diff --git a/transport/batch/generic_batch.go b/transport/batch/generic_batch.go index 865a69ae..5f56aecf 100644 --- a/transport/batch/generic_batch.go +++ b/transport/batch/generic_batch.go @@ -1,9 +1,10 @@ package batch import ( + "time" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/transport/progress" - "time" "github.com/Nextdoor/pg-bifrost.git/transport" "github.com/cevaris/ordered_map" @@ -72,10 +73,7 @@ func (b *GenericBatch) GetPartitionKey() string { } func (b *GenericBatch) IsFull() bool { - if len(b.messages) >= b.maxSize { - return true - } - return false + return len(b.messages) >= b.maxSize } func (b *GenericBatch) IsEmpty() bool { diff --git a/transport/batch/generic_batch_test.go b/transport/batch/generic_batch_test.go index b4612b83..fea813a5 100644 --- a/transport/batch/generic_batch_test.go +++ b/transport/batch/generic_batch_test.go @@ -1,11 +1,12 @@ package batch import ( + "testing" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/transport/progress" "github.com/cevaris/ordered_map" "github.com/stretchr/testify/assert" - "testing" ) func TestAddTransaction(t *testing.T) { @@ -35,9 +36,9 @@ func TestAddTransaction(t *testing.T) { Transaction: "1", } - b.Add(begin) - b.Add(insert) - b.Add(commit) + _, _ = b.Add(begin) + _, _ = b.Add(insert) + _, _ = b.Add(commit) messages := b.GetPayload() messagesSlice, ok := messages.([]*marshaller.MarshalledMessage) @@ -127,11 +128,11 @@ func TestPartialTransaction(t *testing.T) { Transaction: "2", } - b.Add(begin1) - b.Add(insert1) - b.Add(commit1) - b.Add(begin2) - b.Add(insert2) + _, _ = b.Add(begin1) + _, _ = b.Add(insert1) + _, _ = b.Add(commit1) + _, _ = b.Add(begin2) + _, _ = b.Add(insert2) assert.Equal(t, true, b.IsFull(), "Batch should be full") messages := b.GetPayload() @@ -184,7 +185,7 @@ func TestIsEmptyFalse(t *testing.T) { Transaction: "2", } - b.Add(insert) + _, _ = b.Add(insert) assert.Equal(t, false, b.IsEmpty()) assert.Equal(t, 1, b.NumMessages()) @@ -207,7 +208,7 @@ func TestNewBatchFromFactory(t *testing.T) { Transaction: "2", } - b.Add(insert) + _, _ = b.Add(insert) b = f.NewBatch("") assert.Equal(t, true, b.IsEmpty()) diff --git a/transport/batcher/batcher.go b/transport/batcher/batcher.go index 9116ae7d..03002ccd 100644 --- a/transport/batcher/batcher.go +++ b/transport/batcher/batcher.go @@ -2,6 +2,9 @@ package batcher import ( "container/heap" + "os" + "time" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" @@ -10,8 +13,6 @@ import ( "github.com/Nextdoor/pg-bifrost.git/transport/progress" "github.com/Nextdoor/pg-bifrost.git/utils" "github.com/sirupsen/logrus" - "os" - "time" ) var ( @@ -48,7 +49,7 @@ func GetRoutingMethod(name string) BatchRouting { const ( DEFAULT_MAX_MEMORY_BYTES = int64(1024 * 1024 * 100) - TICKER_RATE = 1 * time.Second + TICKER_RATE = 1 * time.Second ) func init() { @@ -123,7 +124,7 @@ func NewBatcher(shutdownHandler shutdown.ShutdownHandler, func safeCloseChan(c chan transport.Batch) { defer func() { // recover if channel is already closed - recover() + _ = recover() }() close(c) @@ -147,7 +148,7 @@ func (b *Batcher) shutdown() { // Also close the transaction progress channel defer func() { // recover if channel is already closed - recover() + _ = recover() }() close(b.txnsSeenChan) @@ -382,10 +383,8 @@ func (b *Batcher) sendBatch(batch transport.Batch) bool { } else { b.roundRobinPosition++ } - break case BATCH_ROUTING_PARTITION: channelIndex = utils.QuickHash(batch.GetPartitionKey(), b.workers) - break } log.Debugf("sending batch to worker %d", channelIndex) diff --git a/transport/batcher/batcher_test.go b/transport/batcher/batcher_test.go index d3c941b0..3b767a6f 100644 --- a/transport/batcher/batcher_test.go +++ b/transport/batcher/batcher_test.go @@ -4,6 +4,8 @@ import ( "testing" "fmt" + "time" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" @@ -14,7 +16,6 @@ import ( "github.com/golang/mock/gomock" "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "time" ) func TestBatchSizeOneOneTxnOneData(t *testing.T) { diff --git a/transport/interfaces.go b/transport/interfaces.go index 6a4e1570..3c68e6b3 100644 --- a/transport/interfaces.go +++ b/transport/interfaces.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package transport diff --git a/transport/manager/manager.go b/transport/manager/manager.go index 297dea6e..acbe895e 100644 --- a/transport/manager/manager.go +++ b/transport/manager/manager.go @@ -9,10 +9,11 @@ // go m.StartBatcher() // go m.StartTransporterGroup() // go m.Start() -// package manager import ( + "log" + "github.com/Nextdoor/pg-bifrost.git/app/config" "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/shutdown" @@ -22,7 +23,6 @@ import ( "github.com/Nextdoor/pg-bifrost.git/transport/factory" "github.com/Nextdoor/pg-bifrost.git/transport/progress" "github.com/cevaris/ordered_map" - "log" ) type TransportManager struct { diff --git a/transport/progress/ledger.go b/transport/progress/ledger.go index fcb033d8..9e1a82a2 100644 --- a/transport/progress/ledger.go +++ b/transport/progress/ledger.go @@ -12,12 +12,11 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package progress import ( - "errors" "fmt" "github.com/cevaris/ordered_map" @@ -84,7 +83,7 @@ func (l *Ledger) updateSeen(seen *Seen) error { // Sanity check. Don't allow overwriting ledger entries. Duplicates should have been removed above. if ledgerEntry.CommitWalStart != 0 { - return errors.New(fmt.Sprintf("Transaction ID %s CommitWalStart was not 0", seen.TimeBasedKey)) + return fmt.Errorf("transaction ID %s CommitWalStart was not 0", seen.TimeBasedKey) } ledgerEntry.TotalMsgs = seen.TotalMsgs diff --git a/transport/progress/ledger_test.go b/transport/progress/ledger_test.go index 0d6b4b7b..49547614 100644 --- a/transport/progress/ledger_test.go +++ b/transport/progress/ledger_test.go @@ -12,13 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package progress import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func _compareLedgerToSeen(seen *Seen, ledger Ledger, t *testing.T) { diff --git a/transport/progress/progress_tracker.go b/transport/progress/progress_tracker.go index 64866145..413dc721 100644 --- a/transport/progress/progress_tracker.go +++ b/transport/progress/progress_tracker.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package progress @@ -74,8 +74,8 @@ type ProgressTracker struct { txnSeenChan <-chan *Seen txnsWritten <-chan *ordered_map.OrderedMap // - statsChan chan stats.Stat - OutputChan chan uint64 // channel to send overall (youngest) progress on + statsChan chan stats.Stat + OutputChan chan uint64 // channel to send overall (youngest) progress on ledger Ledger // ordered map which contains batches seen & written per transaction @@ -87,7 +87,7 @@ type ProgressTracker struct { func New(shutdownHandler shutdown.ShutdownHandler, txnSeenChan <-chan *Seen, txnsWritten <-chan *ordered_map.OrderedMap, - statsChan chan stats.Stat) ProgressTracker { + statsChan chan stats.Stat) ProgressTracker { outputChan := make(chan uint64, outputChanSize) stopChan := make(chan struct{}) @@ -107,7 +107,7 @@ func (p ProgressTracker) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() log.Debug("closing output channel") diff --git a/transport/progress/progress_tracker_test.go b/transport/progress/progress_tracker_test.go index 808b7533..bdcc9e9e 100644 --- a/transport/progress/progress_tracker_test.go +++ b/transport/progress/progress_tracker_test.go @@ -12,17 +12,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package progress import ( "fmt" - "github.com/Nextdoor/pg-bifrost.git/stats" "reflect" "testing" "time" + "github.com/Nextdoor/pg-bifrost.git/stats" + "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/cevaris/ordered_map" "github.com/sirupsen/logrus" @@ -155,7 +156,7 @@ func TestSingleSeenEntry(t *testing.T) { testCase{"seen", nil, &Seen{"1", "1-1", 1, 111}}, } entries := []testExpected{ - testExpected{"1","1-1", 111, 0, 1}, + testExpected{"1", "1-1", 111, 0, 1}, } expecteds := testExpecteds{ entries: entries, @@ -164,7 +165,6 @@ func TestSingleSeenEntry(t *testing.T) { testRunner(t, cases, expecteds, 0) } - // TODO(#9): add a test case to panic if you see two COMMITs with the same timeBasedKey //func TestDoubleSeenEntry(t *testing.T) { // omap1 := ordered_map.NewOrderedMap() @@ -192,8 +192,8 @@ func TestTwoDistinctSeenEntry(t *testing.T) { testCase{"seen", nil, &Seen{"2", "2-1", 1, 222}}, } entries := []testExpected{ - testExpected{"1","1-1", 111, 0, 1}, - testExpected{"2","2-1", 222, 0, 1}, + testExpected{"1", "1-1", 111, 0, 1}, + testExpected{"2", "2-1", 222, 0, 1}, } expecteds := testExpecteds{ entries: entries, @@ -350,11 +350,7 @@ func TestDualShutdown(t *testing.T) { close(written) // Give time for ProgressTracker to handle shutdown - timeout := time.NewTimer(25 * time.Millisecond) - - select { - case <-timeout.C: - } + time.Sleep(25 * time.Millisecond) // Catch panic if it does happen defer func() { diff --git a/transport/progress/utils.go b/transport/progress/utils.go index 14dac50e..5876b3f6 100644 --- a/transport/progress/utils.go +++ b/transport/progress/utils.go @@ -33,7 +33,7 @@ func CompareBatchTransactions(a *ordered_map.OrderedMap, b *ordered_map.OrderedM key := kv.Key.(string) value := kv.Value.(*Written) - if packedVal, ok := b.Get(key); ok == true { + if packedVal, ok := b.Get(key); ok { val := packedVal.(*Written) if val.Transaction != value.Transaction { diff --git a/transport/progress/utils_test.go b/transport/progress/utils_test.go index 0d28e7ef..213bc0fe 100644 --- a/transport/progress/utils_test.go +++ b/transport/progress/utils_test.go @@ -12,14 +12,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package progress import ( + "testing" + "github.com/cevaris/ordered_map" "github.com/stretchr/testify/assert" - "testing" ) func TestIsEqual(t *testing.T) { @@ -84,7 +85,6 @@ func TestIsNotEqualMultiple(t *testing.T) { omapA.Set("1-1", &Written{"1", "1-1", 1}) omapA.Set("2-1", &Written{"2", "2-1", 1}) - omapB := ordered_map.NewOrderedMap() omapB.Set("1-1", &Written{"1", "1-1", 1}) omapB.Set("2-1", &Written{"2", "2-1", 4}) diff --git a/transport/transporters/kinesis/batch/batch.go b/transport/transporters/kinesis/batch/batch.go index 16d26611..bcdea56a 100644 --- a/transport/transporters/kinesis/batch/batch.go +++ b/transport/transporters/kinesis/batch/batch.go @@ -12,12 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package batch import ( "fmt" + "time" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/transport" "github.com/Nextdoor/pg-bifrost.git/transport/progress" @@ -25,7 +27,6 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" "github.com/cevaris/ordered_map" "github.com/pkg/errors" - "time" ) const ( @@ -77,7 +78,6 @@ func (b *KinesisBatch) Add(msg *marshaller.MarshalledMessage) (bool, error) { switch b.partitionMethod { case utils.KINESIS_PART_WALSTART: partitionKey = fmt.Sprintf("%v", msg.WalStart) - break case utils.KINESIS_PART_BATCH: partitionKey = msg.PartitionKey } @@ -140,11 +140,7 @@ func (b *KinesisBatch) CreateTime() int64 { } func (b *KinesisBatch) IsFull() bool { - if len(b.records) >= MAX_RECORDS { - return true - } - - return false + return len(b.records) >= MAX_RECORDS } func (b *KinesisBatch) IsEmpty() bool { diff --git a/transport/transporters/kinesis/batch/batch_test.go b/transport/transporters/kinesis/batch/batch_test.go index c62728b5..01c65389 100644 --- a/transport/transporters/kinesis/batch/batch_test.go +++ b/transport/transporters/kinesis/batch/batch_test.go @@ -12,11 +12,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package batch import ( + "testing" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/transport" "github.com/Nextdoor/pg-bifrost.git/transport/progress" @@ -24,7 +26,6 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" "github.com/cevaris/ordered_map" "github.com/stretchr/testify/assert" - "testing" ) func TestAddTransaction(t *testing.T) { @@ -54,9 +55,9 @@ func TestAddTransaction(t *testing.T) { Transaction: "1", } - batch.Add(begin) - batch.Add(insert) - batch.Add(commit) + _, _ = batch.Add(begin) + _, _ = batch.Add(insert) + _, _ = batch.Add(commit) payload := batch.GetPayload() prre, ok := payload.([]*kinesis.PutRecordsRequestEntry) @@ -70,7 +71,7 @@ func TestAddTransaction(t *testing.T) { assert.Equal(t, insert.Json, prre[0].Data, "Batch should only contain INSERT") omap := ordered_map.NewOrderedMap() - omap.Set("1-1", &progress.Written{Transaction: "1", TimeBasedKey: "1-1", Count:1}) + omap.Set("1-1", &progress.Written{Transaction: "1", TimeBasedKey: "1-1", Count: 1}) assert.Equal(t, true, progress.CompareBatchTransactions(omap, batch.GetTransactions()), "Batch should have expected transactions") } @@ -148,7 +149,7 @@ func TestFullBatchCommit(t *testing.T) { // Verify COMMIT recorded in transactions omap := ordered_map.NewOrderedMap() - omap.Set("0-0", &progress.Written{Transaction: "0", TimeBasedKey: "0-0", Count:MAX_RECORDS}) + omap.Set("0-0", &progress.Written{Transaction: "0", TimeBasedKey: "0-0", Count: MAX_RECORDS}) assert.Equal(t, true, progress.CompareBatchTransactions(omap, batch.GetTransactions()), "Batch should have expected transactions") } @@ -237,7 +238,7 @@ func TestInvalid(t *testing.T) { } func TestClose(t *testing.T) { - batch := NewKinesisBatch("",utils.KINESIS_PART_WALSTART) + batch := NewKinesisBatch("", utils.KINESIS_PART_WALSTART) ok, err := batch.Close() assert.Equal(t, true, ok) diff --git a/transport/transporters/kinesis/config.go b/transport/transporters/kinesis/config.go index 63e5db4f..1e440686 100644 --- a/transport/transporters/kinesis/config.go +++ b/transport/transporters/kinesis/config.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package kinesis diff --git a/transport/transporters/kinesis/factory.go b/transport/transporters/kinesis/factory.go index c05e6929..8ddbef75 100644 --- a/transport/transporters/kinesis/factory.go +++ b/transport/transporters/kinesis/factory.go @@ -17,11 +17,12 @@ package kinesis import ( + "os" + "time" + "github.com/Nextdoor/pg-bifrost.git/app/config" "github.com/Nextdoor/pg-bifrost.git/partitioner" "github.com/Nextdoor/pg-bifrost.git/transport/transporters/kinesis/utils" - "os" - "time" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" diff --git a/transport/transporters/kinesis/transporter/transporter.go b/transport/transporters/kinesis/transporter/transporter.go index 2e390126..7c9f1482 100644 --- a/transport/transporters/kinesis/transporter/transporter.go +++ b/transport/transporters/kinesis/transporter/transporter.go @@ -21,6 +21,7 @@ import ( "time" "fmt" + "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" "github.com/Nextdoor/pg-bifrost.git/transport" @@ -134,7 +135,7 @@ func (t *KinesisTransporter) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() t.log.Debug("closing progress channel") diff --git a/transport/transporters/kinesis/transporter/transporter_test.go b/transport/transporters/kinesis/transporter/transporter_test.go index f43afc9f..2255aa1c 100644 --- a/transport/transporters/kinesis/transporter/transporter_test.go +++ b/transport/transporters/kinesis/transporter/transporter_test.go @@ -18,6 +18,7 @@ package transporter import ( "fmt" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" @@ -28,6 +29,9 @@ import ( "github.com/Nextdoor/pg-bifrost.git/utils" utils_mocks "github.com/Nextdoor/pg-bifrost.git/utils/mocks" + "testing" + "time" + "github.com/aws/aws-sdk-go/service/kinesis" "github.com/cenkalti/backoff/v4" "github.com/cevaris/ordered_map" @@ -35,8 +39,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" - "testing" - "time" ) var ( @@ -74,7 +76,7 @@ func TestPutOk(t *testing.T) { Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) // Expects pk := fmt.Sprintf("%v", marshalledMessage.WalStart) @@ -200,7 +202,7 @@ func TestPutRetryWithError(t *testing.T) { Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) // Expects pk := fmt.Sprintf("%v", marshalledMessage.WalStart) @@ -279,7 +281,7 @@ func TestPutWithFailuresNoError(t *testing.T) { Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) // Expects pk := fmt.Sprintf("%v", marshalledMessage.WalStart) @@ -373,8 +375,8 @@ func TestRetryOnlyFailures(t *testing.T) { Transaction: "124", } - b.Add(&firstMessage) - b.Add(&secondMessage) + _, _ = b.Add(&firstMessage) + _, _ = b.Add(&secondMessage) // Input firstPk := fmt.Sprintf("%v", firstMessage.WalStart) @@ -553,7 +555,7 @@ func TestTerminationContextInRetry(t *testing.T) { Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) // Expects pk := fmt.Sprintf("%v", marshalledMessage.WalStart) @@ -626,7 +628,7 @@ func TestPanicHandling(t *testing.T) { Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) // Expects pk := fmt.Sprintf("%v", marshalledMessage.WalStart) @@ -690,7 +692,7 @@ func TestMissMatchReply(t *testing.T) { Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) // Expects pk := fmt.Sprintf("%v", marshalledMessage.WalStart) diff --git a/transport/transporters/kinesis/utils/kinesis.go b/transport/transporters/kinesis/utils/kinesis.go index 00680526..70dd01d7 100644 --- a/transport/transporters/kinesis/utils/kinesis.go +++ b/transport/transporters/kinesis/utils/kinesis.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package utils diff --git a/transport/transporters/rabbitmq/transporter/connection_test.go b/transport/transporters/rabbitmq/transporter/connection_test.go index 3bcf7b6a..589138e8 100644 --- a/transport/transporters/rabbitmq/transporter/connection_test.go +++ b/transport/transporters/rabbitmq/transporter/connection_test.go @@ -38,7 +38,9 @@ func TestGetConnection(t *testing.T) { if err != nil { t.Error(err) } - defer fakeServer.Stop() + defer func() { + _ = fakeServer.Stop() + }() connMan := NewConnectionManager(connString, log, makeDialer(connString)) conn, err := connMan.GetConnection(context.Background()) @@ -68,13 +70,15 @@ func TestGetConnectionAfterFailure(t *testing.T) { if conn == nil { t.Error() } - fakeServer.Stop() + _ = fakeServer.Stop() err = fakeServer.Start() if err != nil { t.Error(err) } - defer fakeServer.Stop() + defer func() { + _ = fakeServer.Stop() + }() conn, err = connMan.GetConnection(context.Background()) if err != nil { t.Error(err) diff --git a/transport/transporters/rabbitmq/transporter/transporter.go b/transport/transporters/rabbitmq/transporter/transporter.go index a6da46ef..6d735d5a 100644 --- a/transport/transporters/rabbitmq/transporter/transporter.go +++ b/transport/transporters/rabbitmq/transporter/transporter.go @@ -101,7 +101,7 @@ func (t *RabbitMQTransporter) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() t.log.Debug("closing progress channel") diff --git a/transport/transporters/rabbitmq/transporter/transporter_test.go b/transport/transporters/rabbitmq/transporter/transporter_test.go index c1ad873d..e9261f88 100644 --- a/transport/transporters/rabbitmq/transporter/transporter_test.go +++ b/transport/transporters/rabbitmq/transporter/transporter_test.go @@ -18,11 +18,12 @@ package transporter import ( "context" - "github.com/stretchr/testify/assert" "sync" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/NeowayLabs/wabbit" w_amqp "github.com/NeowayLabs/wabbit/amqp" "github.com/NeowayLabs/wabbit/amqptest" @@ -66,7 +67,9 @@ func TestPutOk(t *testing.T) { if err != nil { t.Error(err) } - defer fakeServer.Stop() + defer func() { + _ = fakeServer.Stop() + }() connMan := NewConnectionManager(connString, log, makeDialer(connString)) mockConn, err := connMan.GetConnection(context.Background()) @@ -121,7 +124,7 @@ func TestPutOk(t *testing.T) { Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) mockTime.EXPECT().UnixNano().Return(int64(0)) mockTime.EXPECT().UnixNano().Return(int64(1000 * time.Millisecond)) @@ -231,7 +234,7 @@ func TestConnectionDies(t *testing.T) { Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) mockTime.EXPECT().UnixNano().Return(int64(0)) mockTime.EXPECT().UnixNano().Return(int64(1000 * time.Millisecond)) @@ -242,7 +245,7 @@ func TestConnectionDies(t *testing.T) { expectChan <- 1 }) - fakeServer.Stop() + _ = fakeServer.Stop() in <- b @@ -252,7 +255,9 @@ func TestConnectionDies(t *testing.T) { if err != nil { t.Error(err) } - defer fakeServer.Stop() + defer func() { + _ = fakeServer.Stop() + }() select { case <-time.After(400 * time.Millisecond): @@ -284,7 +289,9 @@ func TestRetryAfterWaitForConnectionError(t *testing.T) { if err != nil { t.Error(err) } - defer fakeServer.Stop() + defer func() { + _ = fakeServer.Stop() + }() connMan := NewConnectionManager(connString, log, makeDialer(connString)) mockConn, err := connMan.GetConnection(context.Background()) @@ -340,7 +347,7 @@ func TestRetryAfterWaitForConnectionError(t *testing.T) { Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) mockTime.EXPECT().UnixNano().Return(int64(0)) mockTime.EXPECT().UnixNano().Return(int64(1000 * time.Millisecond)) diff --git a/transport/transporters/s3/config.go b/transport/transporters/s3/config.go index faa1ff4e..26de020c 100644 --- a/transport/transporters/s3/config.go +++ b/transport/transporters/s3/config.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package s3 @@ -24,7 +24,7 @@ import ( const ( ConfVarBucketName = "s3-bucket" ConfVarKeySpace = "s3-key-space" - ConfVarPutBatchSize = "s3-batch-size" + ConfVarPutBatchSize = "s3-batch-size" ConfVarAwsAccessKeyId = "aws-access-key-id" ConfVarAwsSecretAccessKey = "aws-secret-access-key" ConfVarAwsRegion = "aws-region" diff --git a/transport/transporters/s3/transporter/transporter.go b/transport/transporters/s3/transporter/transporter.go index f9f2c918..aeaf328e 100644 --- a/transport/transporters/s3/transporter/transporter.go +++ b/transport/transporters/s3/transporter/transporter.go @@ -21,7 +21,7 @@ import ( "compress/gzip" "context" "fmt" - "io/ioutil" + "io" "strings" "time" @@ -174,7 +174,7 @@ func (t *S3Transporter) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() t.log.Debug("closing progress channel") @@ -200,7 +200,9 @@ func (t *S3Transporter) transportWithRetry(ctx context.Context, messagesSlice [] if _, err := gz.Write(msg.Json); err != nil { return err, cancelled } - gz.Write([]byte("\n")) + if _, err := gz.Write([]byte("\n")); err != nil { + return err, cancelled + } } if err := gz.Flush(); err != nil { @@ -210,19 +212,15 @@ func (t *S3Transporter) transportWithRetry(ctx context.Context, messagesSlice [] return err, cancelled } - gz.Reset(ioutil.Discard) + gz.Reset(io.Discard) firstWalStart := messagesSlice[0].WalStart - // Free up messagesSlice now that we're done with it - messagesSlice = nil - // Clear out every allocation in the conversion byteArray := buf.Bytes() buf.Reset() byteReader := bytes.NewReader(byteArray) - byteArray = nil // Partition the S3 keys into days year, month, day, hour, full := ts.DateString() diff --git a/transport/transporters/s3/transporter/transporter_test.go b/transport/transporters/s3/transporter/transporter_test.go index 7b18ff68..1cefb6ae 100644 --- a/transport/transporters/s3/transporter/transporter_test.go +++ b/transport/transporters/s3/transporter/transporter_test.go @@ -56,7 +56,7 @@ func resetTimeSource() { // operations. Subsequent reads off of this io.ReadSeeker will not return any data. func streamSeekToBytes(stream io.ReadSeeker) string { buf := new(bytes.Buffer) - buf.ReadFrom(stream) + _, _ = buf.ReadFrom(stream) return buf.String() } @@ -66,12 +66,12 @@ func gzipAsReadSeeker(messagesSlice []*marshaller.MarshalledMessage) io.ReadSeek gz := gzip.NewWriter(&buf) for _, msg := range messagesSlice { - gz.Write(msg.Json) - gz.Write([]byte("\n")) + _, _ = gz.Write(msg.Json) + _, _ = gz.Write([]byte("\n")) } - gz.Flush() - gz.Close() + _ = gz.Flush() + _ = gz.Close() byteArray := buf.Bytes() return bytes.NewReader(byteArray) @@ -98,8 +98,8 @@ func (s *putObjectInputMatcher) Matches(x interface{}) bool { actualBase64 := base64.StdEncoding.EncodeToString([]byte(actualBodyString)) expectedBase64 := base64.StdEncoding.EncodeToString([]byte(s.BodyString)) fmt.Println("Base64 encoding of PutObjectInput.Body Matcher test: ") - fmt.Println(fmt.Sprintf("actual: %s", actualBase64)) - fmt.Println(fmt.Sprintf("expected: %s", expectedBase64)) + fmt.Printf("actual: %s\n", actualBase64) + fmt.Printf("expected: %s\n", expectedBase64) return *actual.Bucket == *s.Bucket && *actual.Key == *s.Key && @@ -151,7 +151,7 @@ func TestSinglePutOk(t *testing.T) { WalStart: 1234, Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) // Expects expectedInput := s3.PutObjectInput{ @@ -217,7 +217,7 @@ func TestSinglePutMultipleRecordsOk(t *testing.T) { WalStart: 1234, Transaction: "123", } - b.Add(&firstMarshalledMessage) + _, _ = b.Add(&firstMarshalledMessage) secondMarshalledMessage := marshaller.MarshalledMessage{ Operation: "UPDATE", @@ -226,7 +226,7 @@ func TestSinglePutMultipleRecordsOk(t *testing.T) { WalStart: 1235, Transaction: "123", } - b.Add(&secondMarshalledMessage) + _, _ = b.Add(&secondMarshalledMessage) // Expects expectedInput := s3.PutObjectInput{ @@ -292,7 +292,7 @@ func TestSingleRecordSinglePutWithFailuresNoError(t *testing.T) { WalStart: 1234, Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) // Expects expectedInputOne := s3.PutObjectInput{ @@ -369,7 +369,7 @@ func TestSingleRecordDoublePutRetriesExhaustedWithError(t *testing.T) { WalStart: 1234, Transaction: "123", } - b.Add(&marshalledMessage) + _, _ = b.Add(&marshalledMessage) // Expects expectedInputOne := s3.PutObjectInput{ diff --git a/transport/transporters/stdout/factory.go b/transport/transporters/stdout/factory.go index 71e99901..1864890e 100644 --- a/transport/transporters/stdout/factory.go +++ b/transport/transporters/stdout/factory.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package stdout @@ -23,8 +23,8 @@ import ( "github.com/Nextdoor/pg-bifrost.git/stats" "github.com/Nextdoor/pg-bifrost.git/transport" "github.com/Nextdoor/pg-bifrost.git/transport/transporters/stdout/transporter" - "github.com/sirupsen/logrus" "github.com/cevaris/ordered_map" + "github.com/sirupsen/logrus" ) var ( diff --git a/transport/transporters/stdout/transporter/transporter.go b/transport/transporters/stdout/transporter/transporter.go index 78266e15..88204833 100644 --- a/transport/transporters/stdout/transporter/transporter.go +++ b/transport/transporters/stdout/transporter/transporter.go @@ -12,12 +12,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package transporter import ( "fmt" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" @@ -68,7 +69,7 @@ func (t StdoutTransporter) shutdown() { defer func() { // recover if channel is already closed - recover() + _ = recover() }() t.log.Debug("closing progress channel") @@ -120,7 +121,7 @@ func (t StdoutTransporter) StartTransporting() { } for _, msg := range messagesSlice { - fmt.Println(fmt.Sprintf("%d: %s", t.id, string(msg.Json))) + fmt.Printf("%d: %s\n", t.id, string(msg.Json)) } // report transactions written in this batch diff --git a/transport/transporters/stdout/transporter/transporter_test.go b/transport/transporters/stdout/transporter/transporter_test.go index f5b600e6..f5985597 100644 --- a/transport/transporters/stdout/transporter/transporter_test.go +++ b/transport/transporters/stdout/transporter/transporter_test.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package transporter @@ -20,6 +20,8 @@ import ( "fmt" "testing" + "time" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" @@ -29,7 +31,6 @@ import ( "github.com/cevaris/ordered_map" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" - "time" ) var ( @@ -67,14 +68,14 @@ func TestBatchHasCommit(t *testing.T) { } b := batch.NewGenericBatch("", 1) - b.Add(&_msgPtr) - b.Add(&_commitPtr) + _, _ = b.Add(&_msgPtr) + _, _ = b.Add(&_commitPtr) in <- b result := <-txns omap := ordered_map.NewOrderedMap() - omap.Set("1-1", &progress.Written{Transaction: "1", TimeBasedKey: "1-1", Count:1}) + omap.Set("1-1", &progress.Written{Transaction: "1", TimeBasedKey: "1-1", Count: 1}) assert.Equal(t, omap, result, "A message sent to the transporter should have a matching progress result.") } @@ -98,13 +99,13 @@ func TestBatchHasNoCommit(t *testing.T) { } b := batch.NewGenericBatch("", 1) - b.Add(&_msgPtr) + _, _ = b.Add(&_msgPtr) in <- b result := <-txns omap := ordered_map.NewOrderedMap() - omap.Set("1-1", &progress.Written{Transaction: "1", TimeBasedKey: "1-1", Count:1}) + omap.Set("1-1", &progress.Written{Transaction: "1", TimeBasedKey: "1-1", Count: 1}) assert.Equal(t, omap, result, "A message sent to the transporter should have a matching progress result.") } diff --git a/utils/time.go b/utils/time.go index 4e17bb2c..b34fd03f 100644 --- a/utils/time.go +++ b/utils/time.go @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - */ +*/ package utils @@ -57,8 +57,8 @@ func (rt RealTime) DateString() ( fullFormat := now.Format("20060102150405") return strconv.Itoa(yearInt), - intDateToNormalString(monthInt), - intDateToNormalString(dayInt), - intDateToNormalString(hourInt), - fullFormat + intDateToNormalString(monthInt), + intDateToNormalString(dayInt), + intDateToNormalString(hourInt), + fullFormat } diff --git a/utils/utils.go b/utils/utils.go index 0ed944a9..a89f6dd2 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -18,12 +18,13 @@ package utils import ( "fmt" + "hash/crc32" + "github.com/Nextdoor/pg-bifrost.git/replication/client/conn" "github.com/cevaris/ordered_map" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5/pgconn" "github.com/pkg/errors" - "hash/crc32" "context" )