Changes to make queryPath optional #162

Open · wants to merge 10 commits into base: master

93 changes: 93 additions & 0 deletions .github/workflows/dlp-pipelines.yml
@@ -11,6 +11,7 @@ env:
PROJECT_ID: "dlp-dataflow-deid-ci-392604"
DATASET_ID: "demo_dataset"
PARQUET_DATASET_ID: "parquet_results"
REID_WITHOUT_QUERY_DATASET_ID: "reid_without_query_results"
REGION: "us-central1"
GCS_BUCKET: "dlp-dataflow-deid-ci-392604-demo-data"
GCS_NOTIFICATION_TOPIC: "projects/dlp-dataflow-deid-ci-392604/topics/dlp-dataflow-deid-ci-gcs-notification-topic"
@@ -73,6 +74,8 @@ jobs:
output7: ${{ steps.gen-uuid.outputs.parquet_deid_job_name }}
output8: ${{ steps.gen-uuid.outputs.parquet_dataset_id }}
output9: ${{ steps.gen-uuid.outputs.output_gcs_bucket }}
output10: ${{ steps.gen-uuid.outputs.reid_without_query_dataset_id }}


steps:
- name: Generate UUID for workflow
@@ -89,6 +92,7 @@ jobs:
echo "parquet_deid_job_name=parquet-deid-$new_uuid" >> "$GITHUB_OUTPUT"
echo "parquet_dataset_id=${{ env.PARQUET_DATASET_ID }}_$modified_uuid" >> "$GITHUB_OUTPUT"
echo "output_gcs_bucket=${{ env.OUTPUT_GCS_BUCKET }}_$modified_uuid" >> "$GITHUB_OUTPUT"
echo "reid_without_query_dataset_id=${{ env.REID_WITHOUT_QUERY_DATASET_ID}}_$modified_uuid" >> "$GITHUB_OUTPUT"

create-dataset:
needs:
@@ -104,9 +108,12 @@ jobs:
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
REID_WITHOUT_QUERY_DATASET_ID: ${{ needs.generate-uuid.outputs.output10 }}
run: |
bq --location=US mk -d --description "GitHub CI workflow dataset" ${{ env.DATASET_ID }}
bq --location=US mk -d --description "GitHub CI workflow dataset to store parquet results" ${{ env.PARQUET_DATASET_ID }}
bq --location=US mk -d --description "GitHub CI workflow dataset to store reid results" ${{ env.REID_WITHOUT_QUERY_DATASET_ID }}


create-output-bucket:
needs:
@@ -741,6 +748,89 @@ jobs:
echo "# records in input query are: $rc_orig."
echo "Verified number of rows in ${{env.INPUT_FILE_NAME}}_re_id: $row_count."

re-identification-without-query:
needs:
- generate-uuid
- create-dataset
- de-identification

runs-on:
- self-hosted

timeout-minutes: 30

steps:
- uses: actions/checkout@v2

- name: Setup Java
uses: actions/setup-java@v1
with:
java-version: 11

- name: Setup Gradle
uses: gradle/gradle-build-action@v2

- name: Run DLP Pipeline
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
REID_WITHOUT_QUERY_DATASET_ID: ${{ needs.generate-uuid.outputs.output10 }}
run: |
gradle run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=1 \
--maxNumWorkers=2 \
--runner=DataflowRunner \
--tableRef=${{env.PROJECT_ID}}:${{env.DATASET_ID}}.${{env.INPUT_FILE_NAME}} \
--dataset=${{env.REID_WITHOUT_QUERY_DATASET_ID}} \
--autoscalingAlgorithm=THROUGHPUT_BASED \
--workerMachineType=n1-highmem-4 \
--deidentifyTemplateName=${{env.REID_TEMPLATE_PATH}} \
--DLPMethod=REID \
--keyRange=1024 \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}}"

- name: Verify BQ table
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
REID_WITHOUT_QUERY_DATASET_ID: ${{ needs.generate-uuid.outputs.output10 }}
run: |
not_verified=true
table_count=0
while $not_verified; do
table_count=$(($(bq query --use_legacy_sql=false --format csv 'SELECT * FROM `${{env.PROJECT_ID}}.${{env.REID_WITHOUT_QUERY_DATASET_ID}}`.__TABLES__ WHERE table_id="${{env.INPUT_FILE_NAME}}_re_id"' | wc -l ) -1))
if [[ "$table_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
done
echo "Verified number of tables in BQ with id ${{env.INPUT_FILE_NAME}}_re_id: $table_count ."

- name: Verify distinct rows
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
REID_WITHOUT_QUERY_DATASET_ID: ${{ needs.generate-uuid.outputs.output10 }}
run: |
rc_orig_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(ID) FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}.${{env.INPUT_FILE_NAME}}`')
rc_orig=$(echo "$rc_orig_json" | jq -r '.[].f0_')
not_verified=true
row_count=0
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(ID) FROM `${{env.PROJECT_ID}}.${{env.REID_WITHOUT_QUERY_DATASET_ID}}.${{env.INPUT_FILE_NAME}}_re_id`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" == "$rc_orig" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
done
echo "# records in input query are: $rc_orig."
echo "Verified number of rows in ${{env.INPUT_FILE_NAME}}_re_id: $row_count."

clean-up:
if: "!cancelled()"
needs:
@@ -752,6 +842,7 @@ jobs:
- de-identification-streaming-write
- de-identification-with-gcs-output
- re-identification
- re-identification-without-query

runs-on:
- self-hosted
@@ -764,9 +855,11 @@
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
REID_WITHOUT_QUERY_DATASET_ID: ${{ needs.generate-uuid.outputs.output10 }}
run: |
bq rm -r -f -d ${{env.PROJECT_ID}}:${{env.DATASET_ID}}
bq rm -r -f -d ${{env.PROJECT_ID}}:${{env.PARQUET_DATASET_ID}}
bq rm -r -f -d ${{env.PROJECT_ID}}:${{env.REID_WITHOUT_QUERY_DATASET_ID}}

- name: Clean up pub_sub file
run: |
4 changes: 3 additions & 1 deletion README.md
@@ -405,6 +405,7 @@ to validate de-identified results:

#### Re-identification from BigQuery


1. Export a SQL query to read and re-identify data from BigQuery. The sample query below selects 10 records.

```
@@ -447,6 +448,7 @@ to validate de-identified results:
stored in `reid_query.sql`. The re-identified results are written to the BigQuery
dataset given by the `dataset` parameter, in a table named after the input table with the `_re_id` suffix.

The `queryPath` parameter is optional. If it is not provided, the pipeline re-identifies the entire BigQuery table referenced by `tableRef`.
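For example, a launch without `queryPath` might look like the sketch below, which mirrors the CI job added in this PR; the bracketed values are placeholders to replace with your own project, region, bucket, dataset, and template:

```
gradle run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=<region> \
--project=<project-id> \
--tempLocation=gs://<bucket>/temp \
--numWorkers=1 \
--maxNumWorkers=2 \
--runner=DataflowRunner \
--tableRef=<project-id>:<dataset>.<input-table> \
--dataset=<output-dataset> \
--autoscalingAlgorithm=THROUGHPUT_BASED \
--workerMachineType=n1-highmem-4 \
--deidentifyTemplateName=<dlp-deidentify-template> \
--DLPMethod=REID \
--keyRange=1024 \
--serviceAccount=<service-account-email>"
```

Apart from dropping `--queryPath`, the invocation should be identical to the query-based run shown earlier in this section.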

### Pipeline Parameters

@@ -471,7 +473,7 @@ to validate de-identified results:
| `recordDelimiter` | (Optional) Record delimiter. | INSPECT/DEID |
| `columnDelimiter` | Column delimiter. Only required in case of a custom delimiter. | INSPECT/DEID |
| `tableRef` | BigQuery table to export from in the form `<project>:<dataset>.<table>`. | REID |
| `queryPath` | Query file for re-identification. | REID |
| `queryPath` | (Optional) Query file for re-identification. | REID |
| `headers` | DLP table headers. Required for the JSONL file type. | INSPECT/DEID |
| `numShardsPerDLPRequestBatching` | (Optional) Number of shards for DLP request batches. Can be used to control the parallelism of DLP requests. The default value is 100. | All |
| `numberOfWorkerHarnessThreads` | (Optional) The number of threads per each worker harness process. | All |
@@ -92,7 +92,7 @@ public interface DLPTextToBigQueryStreamingV2PipelineOptions
void setTableRef(String tableRef);

@Description("read method default, direct, export")
@Default.Enum("EXPORT")
@Default.Enum("DEFAULT")
Method getReadMethod();

void setReadMethod(Method method);
@@ -15,6 +15,7 @@
*/
package com.google.swarm.tokenization.common;

import org.checkerframework.checker.nullness.qual.Nullable;
import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
@@ -39,6 +40,7 @@ public abstract class BigQueryReadTransform

public abstract Integer keyRange();

@Nullable
public abstract String query();

@AutoValue.Builder
@@ -64,23 +66,28 @@ public PCollection<KV<String, TableRow>> expand(PBegin input) {

switch (readMethod()) {
case DEFAULT:
return input
.apply(
"ReadFromBigQuery",
BigQueryIO.readTableRows()
.fromQuery(query())
.usingStandardSql()
.withMethod(Method.DEFAULT))
.apply("AddTableNameAsKey", WithKeys.of(tableRef()));
if (query() != null) {
return input
.apply(
"ReadFromBigQuery",
BigQueryIO.readTableRows()
.fromQuery(query())
.usingStandardSql()
.withMethod(Method.DEFAULT))
.apply("AddTableNameAsKey", WithKeys.of(tableRef()));
} else {
return input
.apply(
"ReadFromBigQuery",
BigQueryIO.readTableRows()
.from(tableRef())
.withMethod(Method.DEFAULT))
.apply("AddTableNameAsKey", WithKeys.of(tableRef()));
}

case EXPORT:
Review comment (Collaborator):

Can we just club EXPORT and DIRECT_READ together and throw the exception "Only DEFAULT Read Method supported"?

case EXPORT: case DIRECT_READ: throw new IllegalArgumentException("Only DEFAULT READ method supported");

return input
.apply(
"ReadFromBigQuery",
BigQueryIO.readTableRows()
.fromQuery(query())
.usingStandardSql()
.withMethod(Method.DEFAULT))
.apply("AddTableNameAsKey", WithKeys.of(tableRef()));
throw new IllegalArgumentException("Export method not supported");
Review comment (Collaborator):

May not be the intended behaviour but since the EXPORT case used to work with DEFAULT read earlier, are we declaring that somewhere before removing the support altogether?


case DIRECT_READ:
throw new IllegalArgumentException("Direct read not supported");
4 changes: 4 additions & 0 deletions src/main/java/com/google/swarm/tokenization/common/Util.java
@@ -440,6 +440,10 @@ private static void validateStorageWriteApiAtLeastOnce(BigQueryOptions options)
}

public static String getQueryFromGcs(String gcsPath) {
if (gcsPath == null) {
LOG.debug("Query path not provided, entire table will be read");
return null;
}
GcsPath path = GcsPath.fromUri(URI.create(gcsPath));
Storage storage = StorageOptions.getDefaultInstance().getService();
BlobId blobId = BlobId.of(path.getBucket(), path.getObject());