Skip to content

Commit

Permalink
Offset-based deduplication for unbounded source and Dataflow runner
Browse files Browse the repository at this point in the history
  • Loading branch information
tomstepp committed Jan 14, 2025
1 parent f1f079b commit d204f6a
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.internal;

import static com.google.api.client.util.Base64.encodeBase64String;
import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
import static org.apache.beam.runners.dataflow.util.Structs.addString;
import static org.apache.beam.runners.dataflow.util.Structs.addStringList;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
Expand Down Expand Up @@ -45,6 +46,9 @@
public class CustomSources {
private static final String SERIALIZED_SOURCE = "serialized_source";
@VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
@VisibleForTesting
static final String SERIALIZED_OFFSET_BASED_DEDUPLICATION =
"serialized_offset_based_deduplication";

private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);

Expand Down Expand Up @@ -93,6 +97,10 @@ public static com.google.api.services.dataflow.model.Source serializeToCloudSour
}
checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
addBoolean(
cloudSource.getSpec(),
SERIALIZED_OFFSET_BASED_DEDUPLICATION,
unboundedSource.offsetDeduplication());
} else {
throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ public boolean workIsFailed() {
return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
}

/**
 * Returns the current record offset reported by the active reader.
 *
 * @return the value of {@code activeReader.getCurrentRecordOffset()}, or an empty array when
 *     there is no active reader
 */
public byte[] getCurrentRecordOffset() {
  // No reader means there is no current record; report "no offset" as an empty array.
  return activeReader == null ? new byte[0] : activeReader.getCurrentRecordOffset();
}

public void start(
@Nullable Object key,
Work work,
Expand Down Expand Up @@ -439,6 +446,10 @@ public Map<Long, Runnable> flushState() {
throw new RuntimeException("Exception while encoding checkpoint", e);
}
sourceStateBuilder.setState(stream.toByteString());
byte[] offsetLimit = checkpointMark.getOffsetLimit();
if (offsetLimit.length > 0) {
sourceStateBuilder.setOffsetLimit(ByteString.copyFrom(offsetLimit));
}
}
outputBuilder.setSourceWatermark(WindmillTimeUtils.harnessToWindmillTimestamp(watermark));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,13 @@ public long add(WindowedValue<T> data) throws IOException {
.setMetadata(metadata);
keyedOutput.addMessages(builder.build());
keyedOutput.addMessagesIds(id);
return (long) key.size() + value.size() + metadata.size() + id.size();

ByteString offset = ByteString.copyFrom(context.getCurrentRecordOffset());
if (offset.size() > 0) {
keyedOutput.addMessageOffsets(offset);
}

return (long) key.size() + value.size() + metadata.size() + id.size() + offset.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ message KeyedMessageBundle {
optional fixed64 sharding_key = 4;
repeated Message messages = 2;
repeated bytes messages_ids = 3;
repeated bytes message_offsets = 5;
}

message LatencyAttribution {
Expand Down Expand Up @@ -410,6 +411,7 @@ message SourceState {
optional bytes state = 1;
repeated fixed64 finalize_ids = 2;
optional bool only_finalize = 3;
optional bytes offset_limit = 4;
}

message WatermarkHold {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ public boolean requiresDeduping() {
return false;
}

/**
* Returns whether this source is configured for offset-based deduplication by the runner.
*
* <p>The default implementation returns {@code false}.
* NOTE(review): presumably sources opt in by overriding this method -- confirm against
* implementing sources.
*
* @return {@code true} if offset-based deduplication is configured for this source; {@code
*     false} by default
*/
public boolean offsetDeduplication() {
return false;
}

/**
* A marker representing the progress and state of an {@link
* org.apache.beam.sdk.io.UnboundedSource.UnboundedReader}.
Expand Down Expand Up @@ -139,6 +146,11 @@ public void finalizeCheckpoint() throws IOException {
// nothing to do
}
}

/**
* Returns the offset limit for the unbounded source split checkpoint.
*
* <p>The default implementation returns an empty array.
* NOTE(review): an empty array presumably means "no offset limit" -- confirm with the runner
* code that consumes this value.
*
* @return the offset limit as a byte array; empty by default
*/
default byte[] getOffsetLimit() {
return new byte[0];
}
}

/**
Expand Down Expand Up @@ -203,6 +215,11 @@ public byte[] getCurrentRecordId() throws NoSuchElementException {
return EMPTY;
}

/**
* Returns the offset for the current record of this unbounded reader.
*
* <p>The default implementation returns {@code EMPTY}.
* NOTE(review): {@code EMPTY} is declared outside this view; presumably an empty byte array
* meaning "no offset available" -- confirm.
*
* @return the current record's offset as a byte array; {@code EMPTY} unless overridden
*/
public byte[] getCurrentRecordOffset() {
return EMPTY;
}

/**
* Returns a timestamp before or at the timestamps of all future elements read by this reader.
*
Expand Down

0 comments on commit d204f6a

Please sign in to comment.