Skip to content

Commit

Permalink
Retry on configurable exception (#6991)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuriyHolinko authored Jan 24, 2025
1 parent cd3b0e7 commit d2b8497
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 41 deletions.
3 changes: 2 additions & 1 deletion buildSrc/src/main/kotlin/otel.japicmp-conventions.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ val latestReleasedVersion: String by lazy {

class AllowNewAbstractMethodOnAutovalueClasses : AbstractRecordingSeenMembers() {
override fun maybeAddViolation(member: JApiCompatibility): Violation? {
val allowableAutovalueChanges = setOf(JApiCompatibilityChangeType.METHOD_ABSTRACT_ADDED_TO_CLASS, JApiCompatibilityChangeType.METHOD_ADDED_TO_PUBLIC_CLASS)
val allowableAutovalueChanges = setOf(JApiCompatibilityChangeType.METHOD_ABSTRACT_ADDED_TO_CLASS,
JApiCompatibilityChangeType.METHOD_ADDED_TO_PUBLIC_CLASS, JApiCompatibilityChangeType.ANNOTATION_ADDED)
if (member.compatibilityChanges.filter { !allowableAutovalueChanges.contains(it.type) }.isEmpty() &&
member is JApiMethod && isAutoValueClass(member.getjApiClass()))
{
Expand Down
8 changes: 7 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
Comparing source compatibility of opentelemetry-sdk-common-1.47.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.46.0.jar
No changes.
**** MODIFIED CLASS: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.RetryPolicy (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++* NEW METHOD: PUBLIC(+) ABSTRACT(+) java.util.function.Predicate<java.io.IOException> getRetryExceptionPredicate()
+++ NEW ANNOTATION: javax.annotation.Nullable
**** MODIFIED CLASS: PUBLIC ABSTRACT STATIC io.opentelemetry.sdk.common.export.RetryPolicy$RetryPolicyBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++* NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.common.export.RetryPolicy$RetryPolicyBuilder setRetryExceptionPredicate(java.util.function.Predicate<java.io.IOException>)
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ ", "
+ "compressorEncoding=gzip, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}"
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=null\\}"
+ ".*" // Maybe additional grpcChannel field, signal specific fields
+ "\\}");
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ ", "
+ "exportAsJson=false, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}"
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=null\\}"
+ ".*" // Maybe additional signal specific fields
+ "\\}");
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
Expand All @@ -35,6 +36,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -68,6 +70,7 @@ public final class JdkHttpSender implements HttpSender {
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headerSupplier;
@Nullable private final RetryPolicy retryPolicy;
private final Predicate<IOException> retryExceptionPredicate;

// Visible for testing
JdkHttpSender(
Expand All @@ -91,6 +94,10 @@ public final class JdkHttpSender implements HttpSender {
this.timeoutNanos = timeoutNanos;
this.headerSupplier = headerSupplier;
this.retryPolicy = retryPolicy;
this.retryExceptionPredicate =
Optional.ofNullable(retryPolicy)
.map(RetryPolicy::getRetryExceptionPredicate)
.orElse(JdkHttpSender::isRetryableException);
}

JdkHttpSender(
Expand Down Expand Up @@ -235,7 +242,7 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
}
}
if (exception != null) {
boolean retryable = isRetryableException(exception);
boolean retryable = retryExceptionPredicate.test(exception);
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.Interceptor;
Expand All @@ -33,7 +34,7 @@ public final class RetryInterceptor implements Interceptor {

private final RetryPolicy retryPolicy;
private final Function<Response, Boolean> isRetryable;
private final Function<IOException, Boolean> isRetryableException;
private final Predicate<IOException> retryExceptionPredicate;
private final Sleeper sleeper;
private final BoundedLongGenerator randomLong;

Expand All @@ -42,7 +43,9 @@ public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isR
this(
retryPolicy,
isRetryable,
RetryInterceptor::isRetryableException,
retryPolicy.getRetryExceptionPredicate() == null
? RetryInterceptor::isRetryableException
: retryPolicy.getRetryExceptionPredicate(),
TimeUnit.NANOSECONDS::sleep,
bound -> ThreadLocalRandom.current().nextLong(bound));
}
Expand All @@ -51,12 +54,12 @@ public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isR
RetryInterceptor(
RetryPolicy retryPolicy,
Function<Response, Boolean> isRetryable,
Function<IOException, Boolean> isRetryableException,
Predicate<IOException> retryExceptionPredicate,
Sleeper sleeper,
BoundedLongGenerator randomLong) {
this.retryPolicy = retryPolicy;
this.isRetryable = isRetryable;
this.isRetryableException = isRetryableException;
this.retryExceptionPredicate = retryExceptionPredicate;
this.sleeper = sleeper;
this.randomLong = randomLong;
}
Expand Down Expand Up @@ -109,7 +112,7 @@ public Response intercept(Chain chain) throws IOException {
}
}
if (exception != null) {
boolean retryable = Boolean.TRUE.equals(isRetryableException.apply(exception));
boolean retryable = retryExceptionPredicate.test(exception);
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
Expand Down Expand Up @@ -144,6 +147,11 @@ private static String responseStringRepresentation(Response response) {
return joiner.toString();
}

// Visible for testing
boolean shouldRetryOnException(IOException e) {
return retryExceptionPredicate.test(e);
}

// Visible for testing
static boolean isRetryableException(IOException e) {
if (e instanceof SocketTimeoutException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
import java.net.HttpRetryException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
Expand All @@ -48,34 +51,38 @@ class RetryInterceptorTest {

@Mock private RetryInterceptor.Sleeper sleeper;
@Mock private RetryInterceptor.BoundedLongGenerator random;
private Function<IOException, Boolean> isRetryableException;
private Predicate<IOException> retryExceptionPredicate;

private RetryInterceptor retrier;
private OkHttpClient client;

@BeforeEach
void setUp() {
// Note: cannot replace this with lambda or method reference because we need to spy on it
isRetryableException =
Logger logger = java.util.logging.Logger.getLogger(RetryInterceptor.class.getName());
logger.setLevel(Level.FINER);
retryExceptionPredicate =
spy(
new Function<IOException, Boolean>() {
new Predicate<IOException>() {
@Override
public Boolean apply(IOException exception) {
return RetryInterceptor.isRetryableException(exception);
public boolean test(IOException e) {
return RetryInterceptor.isRetryableException(e)
|| (e instanceof HttpRetryException
&& e.getMessage().contains("timeout retry"));
}
});

RetryPolicy retryPolicy =
RetryPolicy.builder()
.setBackoffMultiplier(1.6)
.setInitialBackoff(Duration.ofSeconds(1))
.setMaxBackoff(Duration.ofSeconds(2))
.setMaxAttempts(5)
.setRetryExceptionPredicate(retryExceptionPredicate)
.build();

retrier =
new RetryInterceptor(
RetryPolicy.builder()
.setBackoffMultiplier(1.6)
.setInitialBackoff(Duration.ofSeconds(1))
.setMaxBackoff(Duration.ofSeconds(2))
.setMaxAttempts(5)
.build(),
r -> !r.isSuccessful(),
isRetryableException,
sleeper,
random);
retryPolicy, r -> !r.isSuccessful(), retryExceptionPredicate, sleeper, random);
client = new OkHttpClient.Builder().addInterceptor(retrier).build();
}

Expand Down Expand Up @@ -154,7 +161,7 @@ void connectTimeout() throws Exception {
client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute())
.isInstanceOf(SocketTimeoutException.class);

verify(isRetryableException, times(5)).apply(any());
verify(retryExceptionPredicate, times(5)).test(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}
Expand All @@ -174,7 +181,7 @@ void connectException() throws Exception {
.execute())
.isInstanceOfAny(ConnectException.class, SocketTimeoutException.class);

verify(isRetryableException, times(5)).apply(any());
verify(retryExceptionPredicate, times(5)).test(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}
Expand All @@ -190,16 +197,16 @@ private static int freePort() {
@Test
void nonRetryableException() throws InterruptedException {
client = connectTimeoutClient();
// Override isRetryableException so that no exception is retryable
when(isRetryableException.apply(any())).thenReturn(false);
// Override retryPredicate so that no exception is retryable
when(retryExceptionPredicate.test(any())).thenReturn(false);

// Connecting to a non-routable IP address to trigger connection timeout
assertThatThrownBy(
() ->
client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute())
.isInstanceOf(SocketTimeoutException.class);

verify(isRetryableException, times(1)).apply(any());
verify(retryExceptionPredicate, times(1)).test(any());
verify(sleeper, never()).sleep(anyLong());
}

Expand All @@ -214,20 +221,51 @@ private OkHttpClient connectTimeoutClient() {
void isRetryableException() {
// Should retry on connection timeouts, where error message is "Connect timed out" or "connect
// timed out"
assertThat(
RetryInterceptor.isRetryableException(new SocketTimeoutException("Connect timed out")))
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("Connect timed out")))
.isTrue();
assertThat(
RetryInterceptor.isRetryableException(new SocketTimeoutException("connect timed out")))
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("connect timed out")))
.isTrue();
// Shouldn't retry on read timeouts, where error message is "Read timed out"
assertThat(RetryInterceptor.isRetryableException(new SocketTimeoutException("Read timed out")))
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("Read timed out")))
.isFalse();
// Shouldn't retry on write timeouts, where error message is "timeout", or other IOException
assertThat(RetryInterceptor.isRetryableException(new SocketTimeoutException("timeout")))
// Shouldn't retry on write timeouts or other IOException
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("timeout"))).isFalse();
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException())).isTrue();
assertThat(retrier.shouldRetryOnException(new IOException("error"))).isFalse();

// Testing configured predicate
assertThat(retrier.shouldRetryOnException(new HttpRetryException("error", 400))).isFalse();
assertThat(retrier.shouldRetryOnException(new HttpRetryException("timeout retry", 400)))
.isTrue();
}

@Test
void isRetryableExceptionDefaultBehaviour() {
RetryInterceptor retryInterceptor =
new RetryInterceptor(RetryPolicy.getDefault(), OkHttpHttpSender::isRetryable);
assertThat(
retryInterceptor.shouldRetryOnException(
new SocketTimeoutException("Connect timed out")))
.isTrue();
assertThat(retryInterceptor.shouldRetryOnException(new IOException("Connect timed out")))
.isFalse();
}

@Test
void isRetryableExceptionCustomRetryPredicate() {
RetryInterceptor retryInterceptor =
new RetryInterceptor(
RetryPolicy.builder()
.setRetryExceptionPredicate((IOException e) -> e.getMessage().equals("retry"))
.build(),
OkHttpHttpSender::isRetryable);

assertThat(retryInterceptor.shouldRetryOnException(new IOException("some message"))).isFalse();
assertThat(retryInterceptor.shouldRetryOnException(new IOException("retry"))).isTrue();
assertThat(
retryInterceptor.shouldRetryOnException(
new SocketTimeoutException("Connect timed out")))
.isFalse();
assertThat(RetryInterceptor.isRetryableException(new SocketTimeoutException())).isTrue();
assertThat(RetryInterceptor.isRetryableException(new IOException("error"))).isFalse();
}

private Response sendRequest() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import static io.opentelemetry.api.internal.Utils.checkArgument;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.time.Duration;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/**
* Configuration for exporter exponential retry policy.
Expand Down Expand Up @@ -66,6 +69,13 @@ public static RetryPolicyBuilder builder() {
/** Returns the backoff multiplier. */
public abstract double getBackoffMultiplier();

/**
* Returns the predicate used to determine if thrown exception is retryableor {@code null} if no
* predicate was set.
*/
@Nullable
public abstract Predicate<IOException> getRetryExceptionPredicate();

/** Builder for {@link RetryPolicy}. */
@AutoValue.Builder
public abstract static class RetryPolicyBuilder {
Expand Down Expand Up @@ -96,6 +106,10 @@ public abstract static class RetryPolicyBuilder {
*/
public abstract RetryPolicyBuilder setBackoffMultiplier(double backoffMultiplier);

/** Set the predicate to determine if retry should happen based on exception. */
public abstract RetryPolicyBuilder setRetryExceptionPredicate(
Predicate<IOException> retryExceptionPredicate);

abstract RetryPolicy autoBuild();

/** Build and return a {@link RetryPolicy} with the values of this builder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ void build() {
assertThat(retryPolicy.getInitialBackoff()).isEqualTo(Duration.ofMillis(2));
assertThat(retryPolicy.getMaxBackoff()).isEqualTo(Duration.ofSeconds(1));
assertThat(retryPolicy.getBackoffMultiplier()).isEqualTo(1.1);
assertThat(retryPolicy.getRetryExceptionPredicate()).isEqualTo(null);
}

@Test
Expand Down

0 comments on commit d2b8497

Please sign in to comment.