diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle
index 3a988a8248c0..ea7989873712 100644
--- a/runners/core-java/build.gradle
+++ b/runners/core-java/build.gradle
@@ -42,7 +42,6 @@ dependencies {
   implementation project(path: ":model:pipeline", configuration: "shadow")
   implementation project(path: ":sdks:java:core", configuration: "shadow")
   implementation project(path: ":model:job-management", configuration: "shadow")
-  implementation library.java.google_api_services_dataflow
   implementation library.java.vendored_guava_32_1_2_jre
   implementation library.java.joda_time
   implementation library.java.vendored_grpc_1_69_0
@@ -53,6 +52,5 @@ dependencies {
   testImplementation library.java.junit
   testImplementation library.java.mockito_core
   testImplementation library.java.slf4j_api
-  testImplementation(library.java.google_api_services_dataflow)
   testRuntimeOnly library.java.slf4j_simple
 }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
index 7e3af8da6d77..44995b979dd0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
@@ -17,19 +17,8 @@
  */
 package org.apache.beam.runners.core.metrics;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.services.dataflow.model.Base2Exponent;
-import com.google.api.services.dataflow.model.BucketOptions;
-import com.google.api.services.dataflow.model.DataflowHistogramValue;
-import com.google.api.services.dataflow.model.Linear;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DoubleCoder;
@@ -37,14 +26,10 @@
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.util.ByteStringOutputStream;
-import org.apache.beam.sdk.util.HistogramData;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import org.joda.time.Instant;
 
-// TODO(#33093): Refactor out DataflowHistogramValue to be runner agnostic, and rename to
-// remove Dataflow reference.
-
 /** A set of functions used to encode and decode common monitoring info types. */
 public class MonitoringInfoEncodings {
   private static final Coder<Long> VARINT_CODER = VarLongCoder.of();
@@ -178,98 +163,4 @@ public static double decodeDoubleCounter(ByteString payload) {
       throw new RuntimeException(e);
     }
   }
-
-  /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
-  public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
-    try {
-      int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
-
-      DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
-
-      if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
-        HistogramData.LinearBuckets buckets =
-            (HistogramData.LinearBuckets) inputHistogram.getBucketType();
-        Linear linear = new Linear();
-        linear.setNumberOfBuckets(numberOfBuckets);
-        linear.setWidth(buckets.getWidth());
-        linear.setStart(buckets.getStart());
-        outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
-      } else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
-        HistogramData.ExponentialBuckets buckets =
-            (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
-        Base2Exponent base2Exp = new Base2Exponent();
-        base2Exp.setNumberOfBuckets(numberOfBuckets);
-        base2Exp.setScale(buckets.getScale());
-        outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
-      } else {
-        throw new HistogramParsingException(
-            "Unable to encode Int64 Histogram, bucket is not recognized");
-      }
-
-      outputHistogram2.setCount(inputHistogram.getTotalCount());
-
-      List<Long> bucketCounts = new ArrayList<>();
-
-      Arrays.stream(inputHistogram.getBucketCount())
-          .forEach(
-              val -> {
-                bucketCounts.add(val);
-              });
-
-      outputHistogram2.setBucketCounts(bucketCounts);
-
-      ObjectMapper objectMapper = new ObjectMapper();
-      String jsonString = objectMapper.writeValueAsString(outputHistogram2);
-
-      return ByteString.copyFromUtf8(jsonString);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  static class HistogramParsingException extends RuntimeException {
-    public HistogramParsingException(String message) {
-      super(message);
-    }
-  }
-
-  /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
-  public static HistogramData decodeInt64Histogram(ByteString payload) {
-    try {
-      ObjectMapper objectMapper = new ObjectMapper();
-      objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-      JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards
-      DataflowHistogramValue newHist = new DataflowHistogramValue();
-      newHist.setCount(jsonNode.get("count").asLong());
-
-      List<Long> bucketCounts = new ArrayList<>();
-      Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
-      while (itr.hasNext()) {
-        Long item = itr.next().asLong();
-        bucketCounts.add(item);
-      }
-      newHist.setBucketCounts(bucketCounts);
-
-      if (jsonNode.get("bucketOptions").has("linear")) {
-        Linear linear = new Linear();
-        JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
-        linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
-        linear.setWidth(linearNode.get("width").asDouble());
-        linear.setStart(linearNode.get("start").asDouble());
-        newHist.setBucketOptions(new BucketOptions().setLinear(linear));
-      } else if (jsonNode.get("bucketOptions").has("exponential")) {
-        Base2Exponent base2Exp = new Base2Exponent();
-        JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
-        base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
-        base2Exp.setScale(expNode.get("scale").asInt());
-        newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
-      } else {
-        throw new HistogramParsingException(
-            "Unable to parse Int64 Histogram, bucket is not recognized");
-      }
-      return new HistogramData(newHist);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
 }
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
index 100cd4a0e0c2..dde180b150de 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
@@ -17,43 +17,30 @@
  */
 package org.apache.beam.runners.core.metrics;
 
-import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.HistogramParsingException;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeDoubleCounter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
-import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
-import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Collections;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.util.HistogramData;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.joda.time.Instant;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Tests for {@link MonitoringInfoEncodings}. */
 @RunWith(JUnit4.class)
 public class MonitoringInfoEncodingsTest {
-  @Rule
-  public ExpectedLogs monitoringInfoCodingsExpectedLogs =
-      ExpectedLogs.none(MonitoringInfoEncodings.class);
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
   @Test
   public void testInt64DistributionEncoding() {
     DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
@@ -118,36 +105,4 @@ public void testDoubleCounterEncoding() {
     assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload);
     assertEquals(1.0, decodeDoubleCounter(payload), 0.001);
   }
-
-  @Test
-  public void testHistgramInt64EncodingLinearHist() {
-    HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);
-
-    HistogramData inputHistogram = new HistogramData(buckets);
-    inputHistogram.record(5, 10, 15, 20);
-    ByteString payload = encodeInt64Histogram(inputHistogram);
-
-    assertEquals(inputHistogram, decodeInt64Histogram(payload));
-  }
-
-  @Test
-  public void testHistgramInt64EncodingExpHist() {
-    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10);
-    HistogramData inputHistogram = new HistogramData(buckets);
-    inputHistogram.record(2, 4, 8, 16, 32);
-    ByteString payload = encodeInt64Histogram(inputHistogram);
-    assertEquals(inputHistogram, decodeInt64Histogram(payload));
-  }
-
-  @Test
-  public void testHistgramInt64EncodingUnsupportedBucket() {
-    thrown.expect(HistogramParsingException.class);
-    thrown.expectMessage("Unable to encode Int64 Histogram, bucket is not recognized");
-
-    HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of();
-
-    HistogramData inputHistogram = new HistogramData(buckets);
-    inputHistogram.record(2, 4, 8, 16, 32);
-    encodeInt64Histogram(inputHistogram);
-  }
 }
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 527e13949ecf..57e47cef6502 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -75,7 +75,6 @@ dependencies {
   permitUnusedDeclared library.java.antlr
   permitUsedUndeclared library.java.antlr_runtime
   // Required to load constants from the model, e.g. max timestamp for global window
-  provided library.java.google_api_services_dataflow
   shadow project(path: ":model:pipeline", configuration: "shadow")
   shadow project(path: ":model:fn-execution", configuration: "shadow")
   shadow project(path: ":model:job-management", configuration: "shadow")
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
index c1ac4bcfba23..65ccda06be65 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.util;
 
-import com.google.api.services.dataflow.model.DataflowHistogramValue;
 import com.google.auto.value.AutoValue;
 import com.google.auto.value.extension.memoized.Memoized;
 import java.io.Serializable;
@@ -25,8 +24,6 @@
 import java.util.Arrays;
 import java.util.Objects;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -77,41 +74,6 @@ public HistogramData(BucketType bucketType) {
     this.sumOfSquaredDeviations = 0;
   }
 
-  /**
-   * Create a histogram from DataflowHistogramValue proto.
-   *
-   * @param histogramProto DataflowHistogramValue proto used to populate stats for the histogram.
-   */
-  public HistogramData(DataflowHistogramValue histogramProto) {
-    int numBuckets;
-    if (histogramProto.getBucketOptions().getLinear() != null) {
-      double start = histogramProto.getBucketOptions().getLinear().getStart();
-      double width = histogramProto.getBucketOptions().getLinear().getWidth();
-      numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets();
-      this.bucketType = LinearBuckets.of(start, width, numBuckets);
-      this.buckets = new long[bucketType.getNumBuckets()];
-
-      int idx = 0;
-      for (long val : histogramProto.getBucketCounts()) {
-        this.buckets[idx] = val;
-        this.numBoundedBucketRecords += val;
-        idx++;
-      }
-    } else {
-      // Assume it's a exponential histogram if its not linear
-      int scale = histogramProto.getBucketOptions().getExponential().getScale();
-      numBuckets = histogramProto.getBucketOptions().getExponential().getNumberOfBuckets();
-      this.bucketType = ExponentialBuckets.of(scale, numBuckets);
-      this.buckets = new long[bucketType.getNumBuckets()];
-      int idx = 0;
-      for (long val : histogramProto.getBucketCounts()) {
-        this.buckets[idx] = val;
-        this.numBoundedBucketRecords += val;
-        idx++;
-      }
-    }
-  }
-
   public BucketType getBucketType() {
     return this.bucketType;
   }
@@ -331,10 +293,6 @@ public synchronized long getTopBucketCount() {
     return numTopRecords;
   }
 
-  public synchronized long[] getBucketCount() {
-    return buckets;
-  }
-
   public synchronized double getTopBucketMean() {
     return numTopRecords == 0 ? 0 : topRecordsSum / numTopRecords;
   }
@@ -615,42 +573,6 @@ public double getRangeTo() {
 
     // Note: equals() and hashCode() are implemented by the AutoValue.
   }
 
-  /** Used for testing unsupported Bucket formats. */
-  @AutoValue
-  @Internal
-  @VisibleForTesting
-  public abstract static class UnsupportedBuckets implements BucketType {
-
-    public static UnsupportedBuckets of() {
-      return new AutoValue_HistogramData_UnsupportedBuckets(0);
-    }
-
-    @Override
-    public int getBucketIndex(double value) {
-      return 0;
-    }
-
-    @Override
-    public double getBucketSize(int index) {
-      return 0;
-    }
-
-    @Override
-    public double getAccumulatedBucketSize(int index) {
-      return 0;
-    }
-
-    @Override
-    public double getRangeFrom() {
-      return 0;
-    }
-
-    @Override
-    public double getRangeTo() {
-      return 0;
-    }
-  }
-
   @Override
   public synchronized boolean equals(@Nullable Object object) {
     if (object instanceof HistogramData) {
diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle
index 53a3359bfdc4..b213a716dcf9 100644
--- a/sdks/java/harness/build.gradle
+++ b/sdks/java/harness/build.gradle
@@ -30,7 +30,6 @@ dependencies {
   provided project(path: ":model:pipeline", configuration: "shadow")
   provided project(path: ":sdks:java:core", configuration: "shadow")
   provided project(path: ":sdks:java:transform-service:launcher")
-  provided library.java.google_api_services_dataflow
   provided library.java.avro
   provided library.java.jackson_databind
   provided library.java.joda_time
@@ -80,5 +79,4 @@ dependencies {
   shadowTest project(path: ":sdks:java:core", configuration: "shadowTest")
   shadowTestRuntimeClasspath library.java.slf4j_jdk14
   permitUnusedDeclared library.java.avro
-  permitUnusedDeclared library.java.google_api_services_dataflow
 }