diff --git a/.circleci/config.yml b/.circleci/config.yml index 1573731c..1294f39e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -11,36 +11,27 @@ defaults_itests: &defaults_itests docker_layer_caching: true step_library: -- &step_restore_vendor_cache - restore_cache: - keys: - - vendor-{{ checksum "Gopkg.lock" }} - -- &step_save_vendor_cache - save_cache: - key: vendor-{{ checksum "Gopkg.lock" }} - paths: - - vendor - -- &step_test - run: - name: Run go vet and tests - command: | - CI=true make test - - &step_itests run: name: Run integration tests in docker command: | git submodule sync git submodule update --init || (rm -fr .git/config .git/modules && git submodule deinit -f . && git submodule update --init) - CI=true make itests-circleci + make itests-circleci - &step_build_artifact run: name: Run make build command: | - make build + # builds pg-bifrost in a docker container + make docker_build + + # Save the docker container for re-use + mkdir -p docker-cache + docker save -o docker-cache/built-image.tar pg-bifrost:latest + + # pull binary out of docker container + make docker_get_binary /usr/bin/md5sum -b target/pg-bifrost | cut -d' ' -f1 > target/pg-bifrost.md5 cat target/pg-bifrost.md5 /usr/bin/sha1sum -b target/pg-bifrost | cut -d' ' -f1 > target/pg-bifrost.sha1 @@ -48,23 +39,14 @@ step_library: /usr/bin/sha256sum -b target/pg-bifrost | cut -d' ' -f1 > target/pg-bifrost.sha256 cat target/pg-bifrost.sha256 -- &step_build_docker_containers - run: - name: Building integration test docker containers - command: | - make docker - - jobs: build: &build <<: *defaults steps: - checkout - - - *step_restore_vendor_cache + - setup_remote_docker: + docker_layer_caching: true - *step_build_artifact - - *step_test - - *step_save_vendor_cache - persist_to_workspace: root: /go/src/github.com/Nextdoor paths: @@ -84,7 +66,7 @@ jobs: sudo curl -L https://dl.google.com/go/go1.11.4.linux-amd64.tar.gz -o go1.11.4.linux-amd64.tar.gz sudo tar -xf go1.11.4.linux-amd64.tar.gz sudo mv go /usr/local - - *step_build_docker_containers + sudo docker load < docker-cache/built-image.tar - *step_itests - store_test_results: path: ./itests/test_output/ diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..60b039cd --- /dev/null +++ b/.dockerignore @@ -0,0 +1,7 @@ +itests +.idea +.circleci +target +README.md +docs +docker-cache \ No newline at end of file diff --git a/.gitignore b/.gitignore index 174fb11c..2d481eef 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,6 @@ _vendor-*/ # exclude build files target/ -itests/containers/pg-bifrost/app/pg-bifrost # exclude itest outputs itests/tests/*/output/* diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..4c8c3f6d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,34 @@ +# Test and build binary +FROM golang:1.11.4-stretch as intermediate + +# Build dependencies +RUN go get github.com/golang/mock/gomock +RUN go install github.com/golang/mock/mockgen +RUN go get github.com/golang/dep/cmd/dep +RUN go install github.com/golang/dep/cmd/dep + +WORKDIR /go/src/github.com/Nextdoor/pg-bifrost.git/ + +# Copy over gopkg and get deps. This will ensure that +# we don't get the deps each time but only when the files +# change. +COPY Gopkg.lock Gopkg.toml ./ +RUN dep ensure --vendor-only + +COPY . . + +# The CI flag is used to control the auto generation of +# code from interfaces (running go generate). In dev we +# want that to happen automatically but in the CI build +# we only want to use the code that was checked in. When +# CI=true generate is not run. +ARG is_ci +ENV CI=$is_ci + +# Run tests and make the binary +RUN make test && make build + +# Package binary in a scratch container +FROM scratch +COPY --from=intermediate /go/src/github.com/Nextdoor/pg-bifrost.git/target/pg-bifrost / +CMD ["/pg-bifrost"] diff --git a/Makefile b/Makefile index eb635aa9..43c3d6e5 100644 --- a/Makefile +++ b/Makefile @@ -2,15 +2,16 @@ GO_LDFLAGS ?= -w -extldflags "-static" +GIT_REVISION := $(shell git rev-parse --short HEAD) +GIT_TAG_VERSION := $(shell git tag -l --points-at HEAD) + ifeq ($(CI),true) GO_TEST_EXTRAS ?= "-coverprofile=c.out" - GIT_REVISION := $(shell git rev-parse --short HEAD) - VERSION := $(shell git tag -l --points-at HEAD) - GO_LDFLAGS += -X main.GitRevision=$(GIT_REVISION) -X main.Version=$(VERSION) + GO_LDFLAGS += -X main.GitRevision=$(GIT_REVISION) -X main.Version=$(GIT_TAG_VERSION) endif vendor: - dep ensure + dep ensure --vendor-only vet: @echo "Running go vet ..." @@ -29,15 +30,13 @@ test: vendor generate vet itests-circleci: @echo "Running integration tests" + TEST_NAME=test_basic docker-compose -f itests/docker-compose.yml build cd ./itests && ./circleci_split_itests.sh itests: @echo "Running integration tests" - cd ./itests && ./integration_tests.bats -r tests - -docker: - @echo "Building docker integration test images" TEST_NAME=test_basic docker-compose -f itests/docker-compose.yml build + cd ./itests && ./integration_tests.bats -r tests clean: @echo "Removing vendor deps" @@ -53,6 +52,20 @@ build: vendor generate @echo "Creating GO binary" mkdir -p target CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(GO_LDFLAGS)" -o target/pg-bifrost github.com/Nextdoor/pg-bifrost.git/main - ln -f target/pg-bifrost itests/containers/pg-bifrost/app/pg-bifrost -.PHONY: clean test itests +# Standard settings that will be used later +DOCKER := $(shell which docker) + +docker_build: + @echo "Building pg-bifrost docker image" + @$(DOCKER) build -t "pg-bifrost:latest" --build-arg is_ci="${CI}" . + +docker_get_binary: + @echo "Copying binary from docker image" + mkdir -p target + @$(DOCKER) rm "pg-bifrost-build" || true + @$(DOCKER) create --name "pg-bifrost-build" "pg-bifrost:latest" /pg-bifrost + @$(DOCKER) cp "pg-bifrost-build":/pg-bifrost target/ + @$(DOCKER) rm "pg-bifrost-build" + +.PHONY: clean test itests docker_build docker_get_binary diff --git a/README.md b/README.md index 333e4e6e..420f9e2f 100644 --- a/README.md +++ b/README.md @@ -159,11 +159,8 @@ The integration tests are setup and run with: git submodule sync git submodule update --init -# Build binary -make build - -# Create docker images -make docker +# Build binary inside a docker container +make docker_build # Run the integration tests make itests @@ -172,19 +169,13 @@ make itests Example: ``` -[Slava pg-bifrost.git] $ make build -Running go generate ... -Creating GO binary -mkdir -p target -GOOS=linux GOARCH=amd64 go build -o target/pg-bifrost github.com/Nextdoor/pg-bifrost.git/main -ln -f target/pg-bifrost itests/containers/pg-bifrost/app/pg-bifrost - -[Slava pg-bifrost.git] $ make docker -Building docker integration test images -TEST_NAME=test_basic docker-compose -f itests/docker-compose.yml build -Building bifrost -Step 1/3 : FROM alpine:3.8 -... +[Slava pg-bifrost.git] $ make docker_build +Building pg-bifrost docker image +Sending build context to Docker daemon 16.78MB +Step 1/15 : FROM golang:1.11.4-stretch as intermediate + ---> dd46c1256829 + ... + [Slava pg-bifrost.git] $ make itests Running integration tests diff --git a/itests/containers/pg-bifrost/Dockerfile b/itests/containers/pg-bifrost/Dockerfile deleted file mode 100644 index af90bb56..00000000 --- a/itests/containers/pg-bifrost/Dockerfile +++ /dev/null @@ -1,4 +0,0 @@ -FROM alpine:3.8 -ADD app /app -RUN mkdir /lib64 && ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2 -VOLUME /perf \ No newline at end of file diff --git a/itests/containers/pg-bifrost/app/.dummyfile b/itests/containers/pg-bifrost/app/.dummyfile deleted file mode 100644 index e69de29b..00000000 diff --git a/itests/docker-compose.yml b/itests/docker-compose.yml index cb1d2007..ea689613 100644 --- a/itests/docker-compose.yml +++ b/itests/docker-compose.yml @@ -26,7 +26,7 @@ services: bifrost: container_name: bifrost - build: containers/pg-bifrost + image: pg-bifrost:latest env_file: - "./containers/defaults.env" - "./tests/${TEST_NAME}/envfile.env" @@ -37,7 +37,7 @@ services: aliases: - kinesis - postgres - command: /app/pg-bifrost replicate kinesis + command: /pg-bifrost replicate kinesis kinesis-poller: container_name: kinesis-poller diff --git a/replication/client/client_test.go b/replication/client/client_test.go index f4baffd6..1fc359d4 100644 --- a/replication/client/client_test.go +++ b/replication/client/client_test.go @@ -607,6 +607,7 @@ func TestStartWithSendStandbyStatus(t *testing.T) { // Setup progChan := make(chan uint64, 1000) statsChan := make(chan stats.Stat, 1000) + expectChan := make(chan interface{}, 100) sh := shutdown.NewShutdownHandler() replicator := New(sh, statsChan, mockManager, 10) @@ -621,7 +622,8 @@ func TestStartWithSendStandbyStatus(t *testing.T) { mockConn.EXPECT().WaitForReplicationMessage(gomock.Any()).Return(nil, nil).Do( func(_ interface{}) { time.Sleep(time.Millisecond * 5) - }).MinTimes(10) + expectChan <- 1 + }).MinTimes(1) progChan <- uint64(standbyWalStart) @@ -630,6 +632,9 @@ func TestStartWithSendStandbyStatus(t *testing.T) { // Wait a little bit for replicator to process progress select { case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case <-expectChan: + // pass } } @@ -678,11 +683,13 @@ func TestClosedProgressChan(t *testing.T) { } } - timeout = time.NewTimer(50 * time.Millisecond) - - _, ok := <-replicator.shutdownHandler.TerminateCtx.Done() - if ok { - assert.Fail(t, "context not cancelled") + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case _, ok := <-replicator.shutdownHandler.TerminateCtx.Done(): + if ok { + assert.Fail(t, "context not cancelled") + } } } @@ -872,10 +879,14 @@ func TestDeadlineExceeded(t *testing.T) { // test time.Sleep(10 * time.Millisecond) replicator.shutdownHandler.CancelFunc() - time.Sleep(15 * time.Millisecond) - _, ok := <-replicator.outputChan - if ok { - assert.Fail(t, "output channel not properly closed") + + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case _, ok := <-replicator.outputChan: + if ok { + assert.Fail(t, "output channel not properly closed") + } } } @@ -900,7 +911,14 @@ func TestTerminationContext(t *testing.T) { replicator.shutdownHandler.CancelFunc() // wait for shutdown - time.Sleep(5 * time.Millisecond) + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case _, ok := <-replicator.outputChan: + if ok { + assert.Fail(t, "output channel not properly closed") + } + } } func TestGetConnectionError(t *testing.T) { @@ -922,10 +940,13 @@ func TestGetConnectionError(t *testing.T) { go replicator.Start(progChan) - time.Sleep(5 * time.Millisecond) - _, ok := <-replicator.outputChan - if ok { - assert.Fail(t, "output channel not properly closed") + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case _, ok := <-replicator.outputChan: + if ok { + assert.Fail(t, "output channel not properly closed") + } } } @@ -939,10 +960,13 @@ func TestGetConnectionErrorInLoop(t *testing.T) { go replicator.Start(progChan) - time.Sleep(5 * time.Millisecond) - _, ok := <-replicator.outputChan - if ok { - assert.Fail(t, "output channel not properly closed") + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case _, ok := <-replicator.outputChan: + if ok { + assert.Fail(t, "output channel not properly closed") + } } } @@ -961,10 +985,13 @@ func TestPanic(t *testing.T) { go replicator.Start(progChan) - time.Sleep(5 * time.Millisecond) - _, ok := <-replicator.outputChan - if ok { - assert.Fail(t, "output channel not properly closed") + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case _, ok := <-replicator.outputChan: + if ok { + assert.Fail(t, "output channel not properly closed") + } } } @@ -995,10 +1022,13 @@ func TestProgressChanClosed(t *testing.T) { go replicator.Start(progChan) - time.Sleep(5 * time.Millisecond) - _, ok := <-replicator.outputChan - if ok { - assert.Fail(t, "output channel not properly closed") + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case _, ok := <-replicator.outputChan: + if ok { + assert.Fail(t, "output channel not properly closed") + } } } @@ -1021,16 +1051,20 @@ func TestProgressChanClosedDeadline(t *testing.T) { go replicator.Start(progChan) // Wait for replicator to run - time.Sleep(5 * time.Millisecond) - _, ok := <-replicator.outputChan - if ok { - assert.Fail(t, "output channel not properly closed") + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case _, ok := <-replicator.outputChan: + if ok { + assert.Fail(t, "output channel not properly closed") + } } } func TestHeartbeatRequested(t *testing.T) { mockCtrl, replicator, progChan, mockManager, mockConn := getBasicTestSetup(t) defer mockCtrl.Finish() + expectChan := make(chan interface{}, 100) mockManager.EXPECT().GetConn().Return(mockConn, nil).MinTimes(2) @@ -1041,11 +1075,20 @@ func TestHeartbeatRequested(t *testing.T) { // expect to reply status0, _ := pgx.NewStandbyStatus(progress0) status0.ReplyRequested = 1 - mockConn.EXPECT().SendStandbyStatus(EqStatusWithoutTime(status0)).MinTimes(1) + mockConn.EXPECT().SendStandbyStatus(EqStatusWithoutTime(status0)).MinTimes(1).Do( + func(_ interface{}) { + expectChan <- 1 + }) go replicator.Start(progChan) - time.Sleep(20 * time.Millisecond) + // Wait for test to run + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case <-expectChan: + // pass + } } func TestHeartbeatRequestedError(t *testing.T) { @@ -1071,7 +1114,6 @@ func TestHeartbeatRequestedError(t *testing.T) { if ok { assert.Fail(t, "output channel not properly closed") } - } } @@ -1080,6 +1122,7 @@ func TestSendKeepaliveChanFull(t *testing.T) { mockCtrl, replicator, progChan, mockManager, mockConn := getBasicTestSetup(t) defer mockCtrl.Finish() mockManager.EXPECT().GetConn().Return(mockConn, nil).Times(4) + expectChan := make(chan interface{}, 100) // Setup return // table public.customers: INSERT: id[integer]:1 first_name[text]:'Hello' last_name[text]:'World' @@ -1103,13 +1146,21 @@ func TestSendKeepaliveChanFull(t *testing.T) { progress0 := uint64(10) status0, _ := pgx.NewStandbyStatus(progress0) status0.ReplyRequested = 1 - mockConn.EXPECT().SendStandbyStatus(EqStatusWithoutTime(status0)).MinTimes(1) + mockConn.EXPECT().SendStandbyStatus(EqStatusWithoutTime(status0)).Times(1).Do( + func(_ interface{}) { + expectChan <- 1 + }) // Do test go replicator.Start(progChan) - // Wait for shutdown - time.Sleep(time.Millisecond * 15) + // Wait for expect + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case <-expectChan: + // pass + } } func TestSendKeepaliveChanFullError(t *testing.T) { @@ -1117,6 +1168,8 @@ func TestSendKeepaliveChanFullError(t *testing.T) { mockCtrl, replicator, progChan, mockManager, mockConn := getBasicTestSetup(t) defer mockCtrl.Finish() + expectChan := make(chan interface{}, 1) + mockManager.EXPECT().GetConn().Return(mockConn, nil).Times(3) // Setup return @@ -1142,19 +1195,28 @@ func TestSendKeepaliveChanFullError(t *testing.T) { replicator.shutdownHandler.CancelFunc() time.Sleep(time.Millisecond * 1) }).Times(1) - mockManager.EXPECT().Close().Times(1) + mockManager.EXPECT().Close().Times(1).Do(func() { + expectChan <- 1 + }) // Do test go replicator.Start(progChan) // Wait for test to run - time.Sleep(time.Millisecond * 15) + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case <-expectChan: + // pass + } } func TestSendStandbyStatusError(t *testing.T) { mockCtrl, replicator, progChan, mockManager, mockConn := getBasicTestSetup(t) defer mockCtrl.Finish() + expectChan := make(chan interface{}, 100) + mockManager.EXPECT().GetConn().Return(mockConn, nil).MinTimes(2) // server asks for heartbeat @@ -1167,11 +1229,18 @@ func TestSendStandbyStatusError(t *testing.T) { err := errors.New("expected error") mockConn.EXPECT().SendStandbyStatus(EqStatusWithoutTime(status0)).Return(err).Times(1) - mockManager.EXPECT().Close().Times(1) + mockManager.EXPECT().Close().Times(1).Do(func() { + expectChan <- 1 + }) go replicator.Start(progChan) - time.Sleep(20 * time.Millisecond) + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case <-expectChan: + // pass + } } func TestOldOverallProgress(t *testing.T) { @@ -1179,6 +1248,8 @@ func TestOldOverallProgress(t *testing.T) { mockCtrl, replicator, progChan, mockManager, mockConn := getBasicTestSetup(t) defer mockCtrl.Finish() + expectChan := make(chan interface{}, 1) + mockManager.EXPECT().GetConn().Return(mockConn, nil).MinTimes(1) mockConn.EXPECT().WaitForReplicationMessage(gomock.Any()).Return(nil, nil).Do( @@ -1191,11 +1262,19 @@ func TestOldOverallProgress(t *testing.T) { status0, _ := pgx.NewStandbyStatus(progress0) status0.ReplyRequested = 1 - mockConn.EXPECT().SendStandbyStatus(EqStatusWithoutTime(status0)).Times(1) + mockConn.EXPECT().SendStandbyStatus(EqStatusWithoutTime(status0)).Times(1).Do( + func(_ interface{}) { + expectChan <- 1 + }) progChan <- progress0 progChan <- progress1 go replicator.Start(progChan) - time.Sleep(20 * time.Millisecond) + select { + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "did not pass test in time") + case <-expectChan: + // pass + } } diff --git a/transport/transporters/rabbitmq/transporter/transporter_test.go b/transport/transporters/rabbitmq/transporter/transporter_test.go index 6c2f4456..af31f400 100644 --- a/transport/transporters/rabbitmq/transporter/transporter_test.go +++ b/transport/transporters/rabbitmq/transporter/transporter_test.go @@ -18,6 +18,7 @@ package transporter import ( "context" + "github.com/stretchr/testify/assert" "sync" "testing" "time" @@ -154,6 +155,7 @@ func TestConnectionDies(t *testing.T) { in := make(chan transport.Batch, 1000) txns := make(chan *ordered_map.OrderedMap, 1000) statsChan := make(chan stats.Stat, 1000) + expectChan := make(chan interface{}, 10) connString := "amqp://anyhost:anyport/%2fanyVHost" fakeServer := server.NewServer(connString) @@ -235,7 +237,10 @@ func TestConnectionDies(t *testing.T) { mockTime.EXPECT().UnixNano().Return(int64(1000 * time.Millisecond)) mockTime.EXPECT().UnixNano().Return(int64(2000 * time.Millisecond)) mockTime.EXPECT().UnixNano().Return(int64(3000 * time.Millisecond)) - mockTime.EXPECT().UnixNano().Return(int64(4000 * time.Millisecond)) + mockTime.EXPECT().UnixNano().Return(int64(4000 * time.Millisecond)).Do( + func() { + expectChan <- 1 + }) fakeServer.Stop() @@ -249,8 +254,12 @@ func TestConnectionDies(t *testing.T) { } defer fakeServer.Stop() - time.Sleep(time.Millisecond * 25) - + select { + case <-time.After(400 * time.Millisecond): + assert.Fail(t, "did not pass data through in time") + case <-expectChan: + // pass + } // Verify stats expected := []stats.Stat{ stats.NewStatCount("rabbitmq_transport", "success", int64(1), int64(1000*time.Millisecond)),