diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
index fcfe3fe3ce05..0c818f91991f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.internal;
 
 import static com.google.api.client.util.Base64.encodeBase64String;
+import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
 import static org.apache.beam.runners.dataflow.util.Structs.addString;
 import static org.apache.beam.runners.dataflow.util.Structs.addStringList;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
@@ -45,6 +46,9 @@ public class CustomSources {
   private static final String SERIALIZED_SOURCE = "serialized_source";
   @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
+  @VisibleForTesting
+  static final String SERIALIZED_OFFSET_BASED_DEDUPLICATION =
+      "serialized_offset_based_deduplication";
 
   private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
 
@@ -93,6 +97,10 @@ public static com.google.api.services.dataflow.model.Source serializeToCloudSour
       }
       checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
       addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
+      addBoolean(
+          cloudSource.getSpec(),
+          SERIALIZED_OFFSET_BASED_DEDUPLICATION,
+          unboundedSource.offsetDeduplication());
     } else {
       throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index f10f3b91e7aa..a60b8636b244 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -194,6 +194,13 @@ public boolean workIsFailed() {
     return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
   }
 
+  public byte[] getCurrentRecordOffset() {
+    if (activeReader == null) {
+      return new byte[0];
+    }
+    return activeReader.getCurrentRecordOffset();
+  }
+
   public void start(
       @Nullable Object key,
       Work work,
@@ -439,6 +446,10 @@ public Map<Long, Runnable> flushState() {
         throw new RuntimeException("Exception while encoding checkpoint", e);
       }
       sourceStateBuilder.setState(stream.toByteString());
+      byte[] offsetLimit = checkpointMark.getOffsetLimit();
+      if (offsetLimit.length > 0) {
+        sourceStateBuilder.setOffsetLimit(ByteString.copyFrom(offsetLimit));
+      }
     }
 
     outputBuilder.setSourceWatermark(WindmillTimeUtils.harnessToWindmillTimestamp(watermark));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index f83c68ab3c90..015199aac2d4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -215,7 +215,13 @@ public long add(WindowedValue<T> data) throws IOException {
           .setMetadata(metadata);
       keyedOutput.addMessages(builder.build());
       keyedOutput.addMessagesIds(id);
-      return (long) key.size() + value.size() + metadata.size() + id.size();
+
+      ByteString offset = ByteString.copyFrom(context.getCurrentRecordOffset());
+      if (offset.size() > 0) {
+        keyedOutput.addMessageOffsets(offset);
+      }
+
+      return (long) key.size() + value.size() + metadata.size() + id.size() + offset.size();
     }
 
     @Override
diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index 3b3348dbc3fa..5e5de232c872 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -56,6 +56,7 @@ message KeyedMessageBundle {
   optional fixed64 sharding_key = 4;
   repeated Message messages = 2;
   repeated bytes messages_ids = 3;
+  repeated bytes message_offsets = 5;
 }
 
 message LatencyAttribution {
@@ -410,6 +411,7 @@ message SourceState {
   optional bytes state = 1;
   repeated fixed64 finalize_ids = 2;
   optional bool only_finalize = 3;
+  optional bytes offset_limit = 4;
 }
 
 message WatermarkHold {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index 840e4910e2a2..625f99e4bdf7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -93,6 +93,13 @@ public boolean requiresDeduping() {
     return false;
   }
 
+  /**
+   * Returns whether this source is configured for offset-based deduplication by the runner.
+   */
+  public boolean offsetDeduplication() {
+    return false;
+  }
+
   /**
    * A marker representing the progress and state of an {@link
    * org.apache.beam.sdk.io.UnboundedSource.UnboundedReader}.
    */
@@ -139,6 +146,11 @@ public void finalizeCheckpoint() throws IOException {
         // nothing to do
       }
     }
+
+    /* Get offset limit for unbounded source split checkpoint. */
+    default byte[] getOffsetLimit() {
+      return new byte[0];
+    }
   }
 
   /**
@@ -203,6 +215,11 @@ public byte[] getCurrentRecordId() throws NoSuchElementException {
       return EMPTY;
     }
 
+    /* Returns the offset for the current record of this unbounded reader. */
+    public byte[] getCurrentRecordOffset() {
+      return EMPTY;
+    }
+
     /**
      * Returns a timestamp before or at the timestamps of all future elements read by this reader.
      *
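For context, the sketch below (not part of the diff) shows how a custom UnboundedSource's checkpoint mark might feed the new hooks: the source opts in by returning true from offsetDeduplication(), the reader reports a per-record offset via getCurrentRecordOffset(), and the checkpoint mark reports the deduplication bound via getOffsetLimit(). The class and helper names (OffsetCheckpointMark, encodeOffset, lastEmittedOffset) are illustrative only, and the code assumes a Beam build that already contains the API additions above.

// Illustrative only: a checkpoint mark for a hypothetical offset-ordered source.
// Assumes UnboundedSource.CheckpointMark#getOffsetLimit() from the diff above exists.
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.beam.sdk.io.UnboundedSource;

public class OffsetCheckpointMark implements UnboundedSource.CheckpointMark {

  private final long lastEmittedOffset;

  public OffsetCheckpointMark(long lastEmittedOffset) {
    this.lastEmittedOffset = lastEmittedOffset;
  }

  // Big-endian encoding keeps lexicographic byte order aligned with numeric order,
  // which is what a byte[]-based offset comparison would rely on.
  static byte[] encodeOffset(long offset) {
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Acknowledge records up to lastEmittedOffset in the external system, if required.
  }

  // Upper bound of offsets covered by this checkpoint; returning a non-empty value lets
  // the worker populate SourceState.offset_limit when flushing state.
  @Override
  public byte[] getOffsetLimit() {
    return encodeOffset(lastEmittedOffset);
  }
}

The reader side would return the same encoding from getCurrentRecordOffset() for each emitted record, and the source would override offsetDeduplication() to return true. Note that WindmillSink and StreamingModeExecutionContext only attach offsets and offset limits when the returned arrays are non-empty, so sources that keep the defaults stay on the existing record-ID deduplication path.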