workflows for streaming jobs
dup05 committed Mar 4, 2024
1 parent f75f070 commit 57c96df
Showing 7 changed files with 117 additions and 108 deletions.
15 changes: 13 additions & 2 deletions .github/workflows/cleanup/action.yml
@@ -10,10 +10,21 @@ inputs:
dataset:
description: "dataset"
required: true
input_gcs_bucket:
description: "Bucket with run time created files"
required: true
job_type:
description: "Batch/Streaming"
required: true

runs:
using: "composite"
steps:
- name: Cleanup
- name: Cleanup BQ Tables
shell: bash
run: bq rm -r -f -d ${{inputs.project_id}}:${{inputs.dataset}}
run: bq rm -r -f -d ${{inputs.project_id}}:${{inputs.dataset}}

# - name: Cleanup GCS files
# if: always() && inputs.job_type == 'streaming'
# shell: bash
# run: gcloud storage rm gs://${{inputs.input_gcs_bucket}}/*
90 changes: 9 additions & 81 deletions .github/workflows/configs/load_tests_details.json
@@ -3,90 +3,18 @@
"name": "Batch1_csv",
"type": "batch",
"file_type": "CSV",
"file_size": "100MB",
"gcs_file_path": "gs://input_dlp_load_test_2/largecsv100MB.csv",
"file_size": "10KB",
"gcs_file_path": "gs://input_dlp_load_test_2/tiny_csv.csv",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

},
{
"name": "Batch2_csv",
"type": "batch",
"file_type": "CSV",
"file_size": "500MB",
"gcs_file_path": "gs://input_dlp_load_test_2/largecsv500MB.csv",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

},
{
"name": "Batch3_csv",
"type": "batch",
"name": "Streamimng1_csv",
"type": "streaming",
"file_type": "CSV",
"file_size": "1GB",
"gcs_file_path": "gs://input_dlp_load_test_2/largecsv1GB.csv",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

},
{
"name": "Batch4_csv",
"type": "batch",
"file_type": "CSV",
"file_size": "2GB",
"gcs_file_path": "gs://input_dlp_load_test_2/largecsv2GB.csv",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

},
{
"name": "Batch5_csv",
"type": "batch",
"file_type": "CSV",
"file_size": "4GB",
"gcs_file_path": "gs://input_dlp_load_test_2/largecsv4GB.csv",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

},
{
"name": "Batch1_avro",
"type": "batch",
"file_type": "AVRO",
"file_size": "100MB",
"gcs_file_path": "gs://input_dlp_load_test_2/largeavro100MB.avro",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

},
{
"name": "Batch2_avro",
"type": "batch",
"file_type": "AVRO",
"file_size": "500MB",
"gcs_file_path": "gs://input_dlp_load_test_2/largeavro500MB.avro",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

},
{
"name": "Batch3_avro",
"type": "batch",
"file_type": "AVRO",
"file_size": "750MB",
"gcs_file_path": "gs://input_dlp_load_test_2/largeavro750MB.avro",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

},
{
"name": "Batch4_avro",
"type": "batch",
"file_type": "AVRO",
"file_size": "1500MB",
"gcs_file_path": "gs://input_dlp_load_test_2/largeavro1500MB.avro",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

},
{
"name": "Batch5_avro",
"type": "batch",
"file_type": "AVRO",
"file_size": "3GB",
"gcs_file_path": "gs://input_dlp_load_test_2/largeavro3GB.avro",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"

"file_size": "10KB/min",
"gcs_file_path": "gs://input_load_test_streaming_job/*.csv",
"deid_template": "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019",
"source_file_bucket": "input_dlp_load_test_2",
"source_file_pattern": "gs://input_dlp_load_test_2/tiny_csv.csv"
}
]
46 changes: 46 additions & 0 deletions .github/workflows/execute-copy-workflow/action.yml
@@ -0,0 +1,46 @@
name: "Execute copy workflow"
description: "Copies files from raw bucket to specified input bucket"

inputs:
raw_bucket:
description: "GCS Raw bucket name"
required: true
source_file:
description: "File name pattern"
required: true
input_gcs_bucket:
description: "GCS bucket name"
required: true
job_id:
description: "Job ID"
required: true
workflow_name:
description: "Workflow name"
required: true
region:
description: "Region"
required: true


runs:
using: "composite"
steps:
- name: Execute the workflow
shell: bash
run: |
not_finished=true
num_executions=1
while [ $num_executions -le 10 ];
do
echo "Executing workflow: "
gcloud workflows run ${{inputs.workflow_name}} \
--call-log-level=log-errors-only \
--data='{"input_bucket": "${{inputs.input_gcs_bucket}}","raw_bucket": "${{inputs.raw_bucket}}","source_file": "${{inputs.source_file}}"}'
num_executions=$((num_executions+1))
sleep 60s
done
- name: Drain the pipeline
shell: bash
run: |
gcloud dataflow jobs drain ${{inputs.job_id}} --region ${{inputs.region}}
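
The generate_files_workflow definition itself is not part of this commit; the action above only invokes it ten times at one-minute intervals. A rough bash sketch of what each execution is assumed to do (copy the source file from the raw bucket into the streaming input bucket under a fresh object name so the bucket's notification fires) would be roughly:

  # Hypothetical equivalent of one generate_files_workflow run; the real workflow
  # receives input_bucket, raw_bucket and source_file as runtime arguments.
  gcloud storage cp "gs://${RAW_BUCKET}/tiny_csv.csv" \
    "gs://${INPUT_BUCKET}/copy_$(date +%s).csv"

RAW_BUCKET and INPUT_BUCKET stand in for the raw_bucket and input_gcs_bucket inputs above; the per-copy object naming scheme is an assumption, not taken from the workflow source.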
21 changes: 21 additions & 0 deletions .github/workflows/load-testing-main.yml
@@ -21,6 +21,7 @@ env:
REGION: "us-central1"
INSPECT_TEMPLATE: "projects/dlp-dataflow-load-test/inspectTemplates/dlp-demo-inspect-latest-1706594483019"
DEID_TEMPLATE: "projects/dlp-dataflow-load-test/deidentifyTemplates/dlp-demo-deid-latest-1706594483019"
PUB_SUB_TOPIC: "projects/dlp-dataflow-load-test/topics/load_test_pub_sub_topic"


jobs:
@@ -54,6 +55,12 @@ jobs:
run: |
matrix=$(jq -c . < .github/workflows/configs/load_tests_details.json)
echo "matrix={\"include\":$(echo $matrix)}" >> $GITHUB_OUTPUT
- name: Test if
id: test-step
if: always() && env.REGION == 'us-west2'
run: |
echo "If condition true"
run-load-test:
@@ -92,6 +99,18 @@
job_name: ${{steps.set-job-params.outputs.job_name}}
job_type: ${{ matrix.type }}

- name: Execute copy files workflow for streaming jobs
id: copy-files
if: always() && matrix.type == 'streaming'
uses: ./.github/workflows/execute-copy-workflow
with:
raw_bucket: ${{ matrix.source_file_bucket }}
source_file: ${{ matrix.source_file_pattern }}
input_gcs_bucket: "input_load_test_streaming_job"
job_id: ${{steps.submit-dataflow-job.outputs.job_id}}
workflow_name: "generate_files_workflow"
region: ${{env.REGION}}

- name: Poll till job finishes
uses: ./.github/workflows/poll-job
with:
@@ -114,6 +133,8 @@
project_id: ${{env.PROJECT_ID}}
job_id: ${{steps.submit-dataflow-job.outputs.job_id}}
dataset: ${{steps.set-job-params.outputs.dataset}}
input_gcs_bucket: "input_load_test_streaming_job"
job_type: ${{ matrix.type }}

publish-test-results:
needs:
18 changes: 0 additions & 18 deletions .github/workflows/load-tests/action.yml

This file was deleted.

4 changes: 0 additions & 4 deletions .github/workflows/scripts/fetchJobMetrics.py
@@ -6,7 +6,6 @@
import uuid
import json
from google.cloud import monitoring_v3
from google.cloud import monitoring_dashboard_v1
import time

VALID_METRICS = ["TotalVcpuTime", "TotalMemoryUsage", "numberOfRowDeidentified", "numberOfRowsRead"]
@@ -172,8 +171,5 @@ def prepare_metrics_data(self, metrics):

test_job_object.write_data_to_bigquery(test_job_object.prepare_metrics_data(job_metrics))




except Exception as e:
print(e)
31 changes: 28 additions & 3 deletions .github/workflows/submit-dataflow-job/action.yml
@@ -48,7 +48,8 @@ runs:
shell: bash
run: bq --location=US mk -d --description "GitHub load test dataset" ${{inputs.dataset}}

- name: Run DLP Pipeline
- name: Run Batch DLP Pipeline
if: always() && inputs.job_type == 'batch'
shell: bash
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
Expand All @@ -68,12 +69,36 @@ runs:
--serviceAccount=demo-service-account@dlp-dataflow-load-test.iam.gserviceaccount.com \
--jobName=${{ inputs.job_name }} "
- name: Get Job ID
- name: Run Streaming DLP Pipeline
if: always() && inputs.job_type == 'streaming'
shell: bash
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=us-central1 \
--streaming --enableStreamingEngine \
--project=${{inputs.project_id}} \
--tempLocation=gs://${{inputs.input_gcs_bucket}}/temp \
--numWorkers=2 \
--maxNumWorkers=3 \
--runner=DataflowRunner \
--filePattern=${{inputs.gcs_file_path}} \
--dataset=${{inputs.dataset}} \
--workerMachineType=n1-highmem-4 \
--inspectTemplateName=${{inputs.inspect_template}} \
--deidentifyTemplateName=${{inputs.deid_template}} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=demo-service-account@dlp-dataflow-load-test.iam.gserviceaccount.com \
--jobName=${{ inputs.job_name }} \
--gcsNotificationTopic=projects/dlp-dataflow-load-test/topics/load_test_pub_sub_topic"
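
The streaming pipeline discovers new files through --gcsNotificationTopic, which presumes the input bucket already publishes OBJECT_FINALIZE events to that Pub/Sub topic. Nothing in this change creates that wiring; if it ever needs to be set up once, a sketch using the bucket and topic named in this workflow would be:

  # One-time setup, assumed to already exist in the load-test project
  gcloud storage buckets notifications create gs://input_load_test_streaming_job \
    --topic=load_test_pub_sub_topic \
    --event-types=OBJECT_FINALIZE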
- name: Get Job ID #TODO: Poll till status not active
id: get-job-id
shell: bash
run: |
sleep 30s
deid_job_data=$(gcloud dataflow jobs list --project ${{inputs.project_id}} --status active --format json --filter="name=${{inputs.job_name}}")
deid_job_data=$(gcloud dataflow jobs list --project ${{inputs.project_id}} --region us-central1 --format json --filter="name=${{inputs.job_name}}")
deid_job_id=$(echo "$deid_job_data" | jq -r '.[].id')
echo "job id is $deid_job_id"
echo "job_id=$deid_job_id" >> $GITHUB_OUTPUT
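
The #TODO above flags that the job lookup still relies on a fixed 30-second sleep before listing jobs. A minimal bash sketch of the suggested polling, assuming the job name is unique within the region, might look like:

  # Retry the lookup for up to ~5 minutes instead of a single fixed sleep
  for attempt in $(seq 1 10); do
    deid_job_id=$(gcloud dataflow jobs list --project ${{inputs.project_id}} --region us-central1 \
      --format json --filter="name=${{inputs.job_name}}" | jq -r '.[].id')
    [ -n "$deid_job_id" ] && break
    sleep 30s
  done
  echo "job_id=$deid_job_id" >> $GITHUB_OUTPUT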
