diff --git a/src/main/java/com/google/swarm/tokenization/DLPTextToBigQueryStreamingV2.java b/src/main/java/com/google/swarm/tokenization/DLPTextToBigQueryStreamingV2.java
index 18f42f6d..74f6611b 100644
--- a/src/main/java/com/google/swarm/tokenization/DLPTextToBigQueryStreamingV2.java
+++ b/src/main/java/com/google/swarm/tokenization/DLPTextToBigQueryStreamingV2.java
@@ -46,17 +46,27 @@
 import com.google.swarm.tokenization.txt.ConvertTxtToDLPRow;
 import com.google.swarm.tokenization.txt.ParseTextLogDoFn;
 import com.google.swarm.tokenization.txt.TxtReaderSplitDoFn;
+
+import java.text.DecimalFormat;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.FileIO.ReadableFile;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -64,6 +74,10 @@
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -300,30 +314,25 @@ private static void runInspectAndDeidPipeline(
           .get(Util.deidSuccessGCS)
           .apply(GroupByKey.create())
           .apply(
-              "WriteORCToGCS",
+              "WriteParquetToGCS",
               ParDo.of(new ParquetWriterDoFn(options.getOutputBucket(), parquetSchemaMapping))
-                  .withSideInputs(parquetSchemaMapping));
-
-      //            .apply(
-      //                FileIO.<String, KV<String, GenericRecord>>writeDynamic()
-      //                    .by((SerializableFunction<KV<String, GenericRecord>,
-      //                        String>) KV::getKey)
-      //                    .via(
-      //                        Contextful.fn(
-      //                            (SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue
-      //                        ),
-      //                        ParquetIO.sink(schema)
-      //
-      //                            .withCompressionCodec(CompressionCodecName.SNAPPY)
-      //
-      //
-      //                    )
-      //                    .withTempDirectory("/tmp/temp-beam")
-      //                    .to(options.getOutputBucket())
-      //                    .withNumShards(1)
-      //                    .withDestinationCoder(StringUtf8Coder.of())
-      //            );
+                  .withSideInputs(parquetSchemaMapping))
+          .setCoder(KvCoder.of(StringUtf8Coder.of(), GenericRecordCoder.of()))
+          .apply(
+              FileIO.<String, KV<String, GenericRecord>>writeDynamic()
+                  .by((SerializableFunction<KV<String, GenericRecord>, String>) KV::getKey)
+                  .via(
+                      Contextful.fn(
+                          (SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue
+                      ),
+                      ParquetIO.sink(Schema.parse("{\"type\":\"record\",\"name\":\"schema\",\"fields\":[{\"name\":\"primitive_field\",\"type\":\"string\"},{\"name\":\"complex_field\",\"type\":{\"type\":\"record\",\"name\":\"complex_field\",\"fields\":[{\"name\":\"nested_field1\",\"type\":\"string\"},{\"name\":\"nested_field2\",\"type\":\"string\"}]}}]}")).withCompressionCodec(CompressionCodecName.SNAPPY)
+                  )
+                  .withTempDirectory(options.getOutputBucket() + "/temp-beam")
+                  .to(options.getOutputBucket())
+                  .withNaming(key -> new ParquetFileNaming(key, ".parquet"))
+                  .withNumShards(1)
+                  .withDestinationCoder(StringUtf8Coder.of())
+          );
     } else {
       inspectDeidRecords
           .get(Util.deidSuccessGCS)
@@ -347,6 +356,32 @@ private static void runInspectAndDeidPipeline(
     }
   }
 
+  public static class ParquetFileNaming implements FileIO.Write.FileNaming {
+    private String fileName;
+    private String suffix;
+
+    public ParquetFileNaming(String fileName, String suffix) {
+      this.fileName = fileName;
+      this.suffix = suffix;
+    }
+
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized String getFilename(
+        @UnknownKeyFor @NonNull @Initialized BoundedWindow window,
+        @UnknownKeyFor @NonNull @Initialized PaneInfo pane,
+        @UnknownKeyFor @NonNull @Initialized int numShards,
+        @UnknownKeyFor @NonNull @Initialized int shardIndex,
+        @UnknownKeyFor @NonNull @Initialized Compression compression) {
+      StringBuilder res = new StringBuilder(this.fileName);
+      String numShardsStr = String.valueOf(numShards);
+      DecimalFormat df =
+          new DecimalFormat("000000000000".substring(0, Math.max(5, numShardsStr.length())));
+      res.append("-").append(df.format(shardIndex)).append("-of-").append(df.format(numShards));
+      res.append(this.suffix);
+      return res.toString();
+    }
+  }
+
   private static void runReidPipeline(
       Pipeline p, DLPTextToBigQueryStreamingV2PipelineOptions options) {
     // TODO: there is no reason for this method to key elements by table reference because
diff --git a/src/main/java/com/google/swarm/tokenization/parquet/ParquetWriterDoFn.java b/src/main/java/com/google/swarm/tokenization/parquet/ParquetWriterDoFn.java
index c3aaf6e4..25c938ce 100644
--- a/src/main/java/com/google/swarm/tokenization/parquet/ParquetWriterDoFn.java
+++ b/src/main/java/com/google/swarm/tokenization/parquet/ParquetWriterDoFn.java
@@ -115,15 +115,5 @@ public void processElement(ProcessContext context) throws IOException {
         };
 
     tableRowIterable.forEach(assignGenericRowValue);
-
-    File parquetFile = new File(filePath);
-
-    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
-    try (DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(writer)) {
-      fileWriter.create(schema, parquetFile);
-      for (GenericRecord record : genericRecordList) {
-        fileWriter.append(record);
-      }
-    }
   }
 }
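Note on the new ParquetFileNaming (not part of the patch): the shard-name padding comes from DecimalFormat. The sketch below reproduces the naming rule in isolation so the output format is easy to verify; the class name "NamingSketch" and the "user_table" key are illustrative only, and the Beam window/pane arguments are omitted because getFilename does not use them.

    import java.text.DecimalFormat;

    // Standalone illustration of the shard-name format produced by ParquetFileNaming#getFilename.
    // Only the padding rule mirrors the patch; names and values here are hypothetical.
    public class NamingSketch {
      public static void main(String[] args) {
        String key = "user_table"; // destination key chosen by FileIO.writeDynamic().by(KV::getKey)
        int numShards = 1;         // the pipeline pins .withNumShards(1)
        int shardIndex = 0;

        // Pad to at least 5 digits, or to the digit count of numShards if that is larger.
        String numShardsStr = String.valueOf(numShards);
        DecimalFormat df =
            new DecimalFormat("000000000000".substring(0, Math.max(5, numShardsStr.length())));

        String name = key + "-" + df.format(shardIndex) + "-of-" + df.format(numShards) + ".parquet";
        System.out.println(name); // prints: user_table-00000-of-00001.parquet
      }
    }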