AWS S3 Transport with Full Test Suite (#30)
* S3 Transport

* S3 Integration Tests

itests - add both kinesis & s3

add sleep after docker-compose down

dynamic context discovery with formatting

* transporter constructor - remove batchSize as not needed

* Unit Tests

first transporter_test test

error handling - better messaging

more tests

* itests - remove added sleeps

* revert back Dockerfile debug modification

* comments

* PR comments 0

* poller-s3 - use an incremental wait instead of expect counts

* revert changes to mock_kinesis.go

* docs - s3 transport doc for key construction

* s3 transporter - use all underscores for filename

* Remove (revert addition of) start sleep

* itests - poller-s3 - update moving_deadline

* itests - ability to run single test

* s3 - use retryPolicy and not client retries

* s3 - retries unit tests with restart io.Reader

* dockerfile - make test only in CI

* s3 - fatal on seek reset error

* itests - exit on error, dependency ordering, test placement

itests - bugfix - only move deadline on new keys

itests - docker-compose rm after success

itests - runner - use exit on error

itests - have containers clean up their generated & mounted files

circleci - parallelism - increase to 2

itests - docker-compose - dependency ordering with yaml lint

itests - s3-poller - config for single file

itests - pollers - configurations for specific tests

itests - test placement - fix base & kinesis tests

* itests - s3-poller - retry on bucket creation

* PR comments 2018-05-08

* itests - docker-compose - data-poller ordering

* PR comments 2018-05-08 part2

* PR comments 2018-05-08 part3
nehalrp authored May 8, 2019
1 parent ebdf215 commit e23c141
Showing 43 changed files with 5,159 additions and 290 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -54,7 +54,7 @@ jobs:

  itests: &itests
    <<: *defaults_itests
    parallelism: 1
    parallelism: 2
    steps:
      - attach_workspace:
          at: /tmp/workspace
5 changes: 3 additions & 2 deletions Dockerfile
@@ -34,8 +34,9 @@ COPY . .
ARG is_ci
ENV CI=$is_ci

# Run tests and make the binary
RUN make test && make build
# Run tests (if in CI build) then make the binary
RUN test -z "$CI" || make test
RUN make build

# Package binary & certs in a scratch container
FROM scratch
19 changes: 16 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions Makefile
@@ -47,6 +47,11 @@ build: vendor generate
	mkdir -p target
	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(GO_LDFLAGS)" -o target/pg-bifrost github.com/Nextdoor/pg-bifrost.git/main

build_mac: vendor generate
	@echo "Creating GO binary"
	mkdir -p target
	CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o target/pg-bifrost github.com/Nextdoor/pg-bifrost.git/main

# Standard settings that will be used later
DOCKER := $(shell which docker)

4 changes: 4 additions & 0 deletions README.md
@@ -156,6 +156,10 @@ To dump a memory profile, you send pg-bifrost a `SIGUSR2` signal and the `pprof`

In rare scenarios, to investigate WAL standby acknowledgement issues with Postgres (e.g., thrashing on duplicate data, a missing transaction, etc.), the ledger (the progress tracker's data structure that tracks received and acknowledged transactions and their WAL messages) can be dumped to STDOUT by sending a `SIGIO` signal to pg-bifrost.
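
For illustration only, a minimal Python sketch (not part of the repository) of sending these signals to a running pg-bifrost process; discovering the PID with `pgrep` is an assumption about the host environment:

```python
import os
import signal
import subprocess

# Assumption: pg-bifrost runs as a single local process discoverable via pgrep.
pid = int(subprocess.check_output(["pgrep", "-f", "pg-bifrost"]).decode().split()[0])

os.kill(pid, signal.SIGUSR2)  # write a pprof memory profile
os.kill(pid, signal.SIGIO)    # dump the progress tracker's ledger to STDOUT
```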

### Transport Specific

**S3**: [docs/transport/s3.md](docs/transport/s3.md)

## Development

### Bugs
10 changes: 10 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,10 @@
# This docker-compose can be used to bring up a basic postgres container for manual testing
version: '3'
services:
  postgres:
    container_name: postgres
    build: itests/containers/postgres
    environment:
      - POSTGRES_PASSWORD=pgbifrost
    ports:
      - 5432:5432
28 changes: 28 additions & 0 deletions docs/transport/s3.md
@@ -0,0 +1,28 @@
# transport: s3

The S3 transporter puts logical WAL messages as *gzipped* objects in an S3 bucket, partitioned by year, month, and day in increasing order of time. The timestamp used is the time the PUT request is created, *not* the time the data was written to Postgres or read by the pg-bifrost client.

## Key Construction

The basic construction of an S3 key for this transporter is defined as:
```
<bucket name>/<key space>/<year as YYYY>/<month as MM>/<day as DD>/<datetime>_<first wal record id>.gz
```

### Example

For a given configuration:
```
BIFROST_S3_BUCKET=test_bucket
BIFROST_S3_KEY_SPACE=test_files
```

A batch written at `2000-01-02 12:03:04` whose first record has a WAL record ID of `1336674354448` will be put at:
```
test_bucket/test_files/2000/01/02/2000_01_02_12_03_04_1336674354448.gz
```

You may also omit `BIFROST_S3_KEY_SPACE`, which will result in the object being put at:
```
test_bucket/2000/01/02/2000_01_02_12_03_04_1336674354448.gz
```
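
For illustration only, a minimal Python sketch (the transporter itself is written in Go; this snippet is not part of the repository) of how a key in this layout could be assembled and a gzipped batch uploaded with boto3. The bucket, key space, batch contents, and WAL record ID below are placeholder values:

```python
import gzip
from datetime import datetime, timezone

import boto3


def build_key(key_space, first_wal_id, now=None):
    """Build '<key space>/YYYY/MM/DD/YYYY_MM_DD_HH_MM_SS_<first wal record id>.gz'."""
    now = now or datetime.now(timezone.utc)
    date_prefix = now.strftime("%Y/%m/%d")
    timestamp = now.strftime("%Y_%m_%d_%H_%M_%S")
    filename = "{}_{}.gz".format(timestamp, first_wal_id)
    # An empty key space is dropped, matching the omitted BIFROST_S3_KEY_SPACE case.
    return "/".join(p for p in (key_space, date_prefix, filename) if p)


# Placeholder batch contents and configuration values.
batch = b'{"example": "wal record"}\n'
key = build_key("test_files", 1336674354448)

client = boto3.client("s3")
client.put_object(Bucket="test_bucket", Key=key, Body=gzip.compress(batch))
```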
16 changes: 12 additions & 4 deletions itests/common.bash
@@ -48,11 +48,19 @@ _startup() {
    log "Running docker-compose build"
    TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose build

    log "Running docker-compose up"
    TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose up -d
    log "Starting docker-compose data-poller dependencies"
    TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose up -d start-data-poller-dependencies
    sleep 2

    log "Starting docker-compose bifrost dependencies"
    TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose up -d start-bifrost-dependencies
    sleep 2

    log "Starting docker-compose bifrost"
    TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose up -d bifrost

    log "Checking that containers are running..."
    sleep 5
    sleep 2

    _check_container postgres
    _check_container localstack
@@ -257,8 +265,8 @@ teardown() {

    # Print current state of the ledger for debugging
    TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose kill -s IO bifrost # dump ledger to stdout
    sleep 5
    TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose logs bifrost
    TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose logs data-poller

    log "Running docker-compose down"
    TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose down
6 changes: 3 additions & 3 deletions itests/containers/data-poller/app/poller-kinesis.py
@@ -16,10 +16,10 @@
OUT_FILE = os.getenv('OUT_FILE', '/output/test')
STREAM_NAME = os.getenv('STREAM_NAME', 'itests')
ENDPOINT_URL = os.getenv('ENDPOINT_URL', 'http://localstack:4568')
WAIT_TIME = int(os.getenv('WAIT_TIME', '90'))
EXPECTED_COUNT = int(os.getenv('EXPECTED_COUNT', '1'))
AWS_REGION = os.getenv('AWS_REGION', 'us-east-1')
SHARD_COUNT = int(os.getenv('SHARD_COUNT', '1'))
EXPECTED_COUNT = int(os.getenv('EXPECTED_COUNT', '1'))
WAIT_TIME = int(os.getenv('KINESIS_POLLER_WAIT_TIME', '90'))
SHARD_COUNT = int(os.getenv('KINESIS_POLLER_SHARD_COUNT', '1'))

client = boto3.client('kinesis',
                      endpoint_url=ENDPOINT_URL,
157 changes: 157 additions & 0 deletions itests/containers/data-poller/app/poller-s3.py
@@ -0,0 +1,157 @@
"""
This script obtains records from Kinesis and writes them to a local file as
as defined by OUT_FILE. It will exit when no additional files have been read for WAIT_TIME.
"""

import os
import sys
import time

import boto3
from botocore import exceptions
from gzip import GzipFile
from io import BytesIO
from retry import retry

# Variables
OUT_FILE = os.getenv('OUT_FILE', '/output/test')
BUCKET_NAME = os.getenv('BUCKET_NAME', 'itests')
CREATE_BUCKET = bool(os.getenv('CREATE_BUCKET', '1'))
ENDPOINT_URL = os.getenv('ENDPOINT_URL', 'http://localstack:4572')
AWS_REGION = os.getenv('AWS_REGION', 'us-east-1')
EXPECTED_COUNT = int(os.getenv('EXPECTED_COUNT', '1')) # expect number of records (only used for logging)
INITIAL_WAIT_TIME = int(os.getenv('S3_POLLER_INITIAL_WAIT_TIME', '90')) # time to wait for initial list of keys
WAIT_TIME = int(os.getenv('S3_POLLER_WAIT_TIME', '10')) # incremental time to wait for new keys if none have been seen
MAP_KEYS_TO_OUTPUT_FILES = bool(os.getenv('S3_POLLER_MAP_KEYS_TO_OUTPUT_FILES', '')) # whether to create a single output file

client = boto3.client('s3',
endpoint_url=ENDPOINT_URL,
region_name=AWS_REGION)

# Create a bucket
@retry(exceptions.EndpointConnectionError, tries=10, delay=.5)
def _create_bucket(name):
print("Trying to create bucket {}".format(name))
return client.create_bucket(
Bucket=name)


@retry(ValueError, tries=10, delay=.5)
def _get_all_s3_keys(bucket):
"""Get a list of all keys in an S3 bucket."""
keys = []

resp = client.list_objects(Bucket=bucket)

file_list = resp['Contents']

for s3_key in file_list:
keys.append(s3_key['Key'])

return keys


if CREATE_BUCKET:
# Create the bucket
print("Creating a bucket")
try:
_create_bucket(BUCKET_NAME)
except exceptions.EndpointConnectionError:
print("Unable to contact endpoint at {}".format(ENDPOINT_URL))
exit(1)
except exceptions.ClientError as e:
if e.response['Error']['Code'] != 'ResourceInUseException':
raise e


# get initial set of keys with a deadline of INITIAL_WAIT_TIME
all_keys = []
timeout_for_first_keys = time.time() + INITIAL_WAIT_TIME

while True:
if time.time() > timeout_for_first_keys:
print("No data received to poller. Exiting.")
exit(1)

print("Getting initial keys list...")
sys.stdout.flush()
try:
all_keys = _get_all_s3_keys(BUCKET_NAME)
break
except KeyError:
time.sleep(1)
pass

all_keys.sort()

key_i = 0
total = 0

print("Records expected: {}".format(EXPECTED_COUNT))

# Start the moving deadline and iterate over new keys
moving_deadline = time.time() + WAIT_TIME


while time.time() <= moving_deadline:
if key_i >= len(all_keys):
# our pointer is past the length of the keys we have seen, so we wait for more...
print("Waiting for more keys...")
sys.stdout.flush()
time.sleep(1)

remote_keys = _get_all_s3_keys(BUCKET_NAME)
if len(remote_keys) > len(all_keys):
# if there are new keys, update our all_keys list and process
all_keys = list(set(all_keys + remote_keys))
all_keys.sort()

# update deadline as if we had new keys
moving_deadline = time.time() + WAIT_TIME
else:
# else, look back around
continue

record_count = 0

# get object data
resp = client.get_object(
Bucket=BUCKET_NAME,
Key=all_keys[key_i],
)

bytestream = BytesIO(resp['Body'].read())
got_text = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
records = got_text.split('\n')

# filter out any empty lines
records = filter(None, records)

sys.stdout.flush()

# By default we only create a single file no matter how many S3 keys we have
_file_num = 0

if MAP_KEYS_TO_OUTPUT_FILES:
_file_num = key_i

with open(OUT_FILE + "." + str(_file_num), "a") as fp:
for record in records:
fp.write(record)
fp.write('\n')

fp.flush()
record_count += len(records)

# update pointer in keys read
key_i += 1

total += record_count
print("total so far: {}".format(total))

if record_count == 0:
time.sleep(1)
sys.stdout.flush()

print("Records read {}".format(total))
sys.stdout.flush()
1 change: 0 additions & 1 deletion itests/containers/defaults.env
@@ -5,7 +5,6 @@ KINESIS_ERROR_PROBABILITY=0.0
PGHOST=postgres
PGPASSWORD=pgbifrost
CREATE_SLOT=true
BIFROST_KINESIS_STREAM=itests
AWS_ACCESS_KEY_ID=DUMMYACCESSKEYID
AWS_SECRET_ACCESS_KEY=DUMMYSECRETACCESSKEY
AWS_REGION=us-east-1
3 changes: 3 additions & 0 deletions itests/contexts/kinesis.env
@@ -1,3 +1,6 @@
TRANSPORT_SINK=kinesis

LOCALSTACK_PORT=4568
ENDPOINT=http://localstack:4568

BIFROST_KINESIS_STREAM=itests
7 changes: 7 additions & 0 deletions itests/contexts/s3.env
@@ -0,0 +1,7 @@
TRANSPORT_SINK=s3

LOCALSTACK_PORT=4572
ENDPOINT=http://localstack:4572
CREATE_BUCKET=1

BIFROST_S3_BUCKET=itests