diff --git a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java index 8203819fcb..38769c630e 100644 --- a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java @@ -22,8 +22,11 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; + +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -42,15 +45,23 @@ import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodec; import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig; import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.client.LambdaClientFactory; import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType; import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import org.opensearch.dataprepper.plugins.lambda.common.util.CountingRetryCondition; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaAsyncClient; import software.amazon.awssdk.services.lambda.model.InvokeResponse; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -58,8 +69,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @ExtendWith(MockitoExtension.class) @@ -373,4 +386,143 @@ private List> createRecords(int numRecords) { } return records; } + + @Test + void testRetryLogicWithThrottlingUsingMultipleThreads() throws Exception { + /* + * This test tries to create multiple parallel Lambda invocations + * while concurrency=1. The first invocation "occupies" the single concurrency slot + * The subsequent invocations should then get a 429 TooManyRequestsException, + * triggering our CountingRetryCondition. 
+ */ + + /* Lambda handler function looks like this: + def lambda_handler(event, context): + # Simulate a slow operation so that + # if concurrency = 1, multiple parallel invocations + # will result in TooManyRequestsException for the second+ invocation. + time.sleep(10) + + # Return a simple success response + return { + "statusCode": 200, + "body": "Hello from concurrency-limited Lambda!" + } + + */ + + functionName = "lambdaExceptionSimulation"; + // Create a CountingRetryCondition + CountingRetryCondition countingRetryCondition = new CountingRetryCondition(); + + // Configure a LambdaProcessorConfig + + // We'll set invocation type to RequestResponse + InvocationType invocationType = mock(InvocationType.class); + when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue()); + when(lambdaProcessorConfig.getInvocationType()).thenReturn(invocationType); + + when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName); + // If your code uses "responseEventsMatch", you can set it: + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); + + // Set up mock ClientOptions for concurrency + small retries + ClientOptions clientOptions = mock(ClientOptions.class); + when(clientOptions.getMaxConnectionRetries()).thenReturn(3); // up to 3 retries + when(clientOptions.getMaxConcurrency()).thenReturn(5); + when(clientOptions.getConnectionTimeout()).thenReturn(Duration.ofSeconds(5)); + when(clientOptions.getApiCallTimeout()).thenReturn(Duration.ofSeconds(30)); + when(lambdaProcessorConfig.getClientOptions()).thenReturn(clientOptions); + + // AWS auth + AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(lambdaRegion)); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(role); + when(awsAuthenticationOptions.getAwsStsExternalId()).thenReturn(null); + when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(null); + when(lambdaProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + + // Setup the mock for getProvider + when(awsCredentialsSupplier.getProvider(any())).thenReturn(awsCredentialsProvider); + + // Mock the factory to inject our CountingRetryCondition into the LambdaAsyncClient + try (MockedStatic mockedFactory = mockStatic(LambdaClientFactory.class)) { + + LambdaAsyncClient clientWithCountingCondition = LambdaAsyncClient.builder() + .region(Region.of(lambdaRegion)) + .credentialsProvider(awsCredentialsProvider) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy( + RetryPolicy.builder() + .retryCondition(countingRetryCondition) + .numRetries(3) + .build() + ) + .build()) + // netty concurrency = 5 to allow parallel requests + .httpClient(NettyNioAsyncHttpClient.builder() + .maxConcurrency(5) + .build()) + .build(); + + mockedFactory.when(() -> + LambdaClientFactory.createAsyncLambdaClient( + any(AwsAuthenticationOptions.class), + any(AwsCredentialsSupplier.class), + any(ClientOptions.class))) + .thenReturn(clientWithCountingCondition); + + // 7) Instantiate the real LambdaProcessor + when(pluginSetting.getName()).thenReturn("lambda-processor"); + when(pluginSetting.getPipelineName()).thenReturn("test-pipeline"); + lambdaProcessor = new LambdaProcessor( + pluginFactory, + pluginSetting, + lambdaProcessorConfig, + awsCredentialsSupplier, + expressionEvaluator + ); + + // Create multiple parallel tasks to call doExecute(...) 
+ // Each doExecute() invocation sends records to Lambda in an async manner. + int parallelInvocations = 5; + ExecutorService executor = Executors.newFixedThreadPool(parallelInvocations); + + List>>> futures = new ArrayList<>(); + for (int i = 0; i < parallelInvocations; i++) { + // Each subset of records calls the processor + List> records = createRecords(2); + Future>> future = executor.submit(() -> { + return lambdaProcessor.doExecute(records); + }); + futures.add(future); + } + + // Wait for all tasks to complete + executor.shutdown(); + boolean finishedInTime = executor.awaitTermination(5, TimeUnit.MINUTES); + if (!finishedInTime) { + throw new RuntimeException("Test timed out waiting for executor tasks to complete."); + } + + // Check results or handle exceptions + for (Future>> f : futures) { + try { + Collection> out = f.get(); + } catch (ExecutionException ee) { + // A 429 from AWS will be thrown as TooManyRequestsException + // If all retries failed, we might see an exception here. + } + } + + // Finally, check that we had at least one retry + // If concurrency=1 is truly enforced, at least some calls should have gotten a 429 + // -> triggered CountingRetryCondition + int retryCount = countingRetryCondition.getRetryCount(); + assertTrue( + retryCount > 0, + "Should have at least one retry due to concurrency-based throttling (429)." + ); + } + } } diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java index 1c31b2174c..abb37f41b8 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java @@ -36,6 +36,9 @@ private LambdaCommonHandler() { } public static boolean isSuccess(InvokeResponse response) { + if(response == null) { + return false; + } int statusCode = response.statusCode(); return statusCode >= 200 && statusCode < 300; } diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java index caaa06431f..b327f2e826 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java @@ -74,6 +74,16 @@ public void addRecord(Record record) { eventCount++; } + void completeCodec() { + if (eventCount > 0) { + try { + requestCodec.complete(this.byteArrayOutputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + @Override public List> getRecords() { return records; @@ -101,11 +111,7 @@ public InvokeRequest getRequestPayload(String functionName, String invocationTyp return null; } - try { - requestCodec.complete(this.byteArrayOutputStream); - } catch (IOException e) { - throw new RuntimeException(e); - } + completeCodec(); SdkBytes payload = getPayload(); payloadRequestSize = payload.asByteArray().length; diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactory.java 
b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactory.java index 91b28e0e36..94a6b2fab7 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactory.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactory.java @@ -5,6 +5,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions; +import org.opensearch.dataprepper.plugins.lambda.common.util.CustomLambdaRetryCondition; import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; @@ -48,13 +49,14 @@ private static ClientOverrideConfiguration createOverrideConfiguration( .maxBackoffTime(clientOptions.getMaxBackoff()) .build(); - final RetryPolicy retryPolicy = RetryPolicy.builder() + final RetryPolicy customRetryPolicy = RetryPolicy.builder() + .retryCondition(new CustomLambdaRetryCondition()) .numRetries(clientOptions.getMaxConnectionRetries()) .backoffStrategy(backoffStrategy) .build(); return ClientOverrideConfiguration.builder() - .retryPolicy(retryPolicy) + .retryPolicy(customRetryPolicy) .addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)) .apiCallTimeout(clientOptions.getApiCallTimeout()) .build(); diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/CountingRetryCondition.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/CountingRetryCondition.java new file mode 100644 index 0000000000..afc7c756dd --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/CountingRetryCondition.java @@ -0,0 +1,23 @@ +package org.opensearch.dataprepper.plugins.lambda.common.util; + +import software.amazon.awssdk.core.retry.RetryPolicyContext; + +import java.util.concurrent.atomic.AtomicInteger; + +//Used ONLY for tests +public class CountingRetryCondition extends CustomLambdaRetryCondition { + private final AtomicInteger retryCount = new AtomicInteger(0); + + @Override + public boolean shouldRetry(RetryPolicyContext context) { + boolean shouldRetry = super.shouldRetry(context); + if (shouldRetry) { + retryCount.incrementAndGet(); + } + return shouldRetry; + } + + public int getRetryCount() { + return retryCount.get(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/CustomLambdaRetryCondition.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/CustomLambdaRetryCondition.java new file mode 100644 index 0000000000..08e3743c13 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/CustomLambdaRetryCondition.java @@ -0,0 +1,17 @@ +package org.opensearch.dataprepper.plugins.lambda.common.util; + +import software.amazon.awssdk.core.retry.conditions.RetryCondition; +import software.amazon.awssdk.core.retry.RetryPolicyContext; + +public class CustomLambdaRetryCondition implements RetryCondition { + + 
@Override + public boolean shouldRetry(RetryPolicyContext context) { + Throwable exception = context.exception(); + if (exception != null) { + return LambdaRetryStrategy.isRetryableException(exception); + } + + return false; + } +} diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/LambdaRetryStrategy.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/LambdaRetryStrategy.java new file mode 100644 index 0000000000..82abf37832 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/LambdaRetryStrategy.java @@ -0,0 +1,104 @@ +package org.opensearch.dataprepper.plugins.lambda.common.util; + +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; +import software.amazon.awssdk.services.lambda.model.TooManyRequestsException; +import software.amazon.awssdk.services.lambda.model.ServiceException; + +import java.util.Set; + + +/** + * Similar to BulkRetryStrategy in the OpenSearch sink. + * Categorizes AWS Lambda exceptions and status codes into + * retryable and non-retryable scenarios. + */ +public final class LambdaRetryStrategy { + + private LambdaRetryStrategy() { + } + + /** + * Possibly a set of “bad request” style errors which might fall + */ + private static final Set BAD_REQUEST_ERRORS = Set.of( + 400, // Bad Request + 422, // Unprocessable Entity + 417, // Expectation Failed + 406 // Not Acceptable + ); + + /** + * Status codes which may indicate a security or policy problem, so we don't retry. + */ + private static final Set NOT_ALLOWED_ERRORS = Set.of( + 401, // Unauthorized + 403, // Forbidden + 405 // Method Not Allowed + ); + + /** + * Examples of input or payload errors that are likely not retryable + * unless the pipeline itself corrects them. + */ + private static final Set INVALID_INPUT_ERRORS = Set.of( + 413, // Payload Too Large + 414, // URI Too Long + 416 // Range Not Satisfiable + ); + + /** + * Example of a “timeout” scenario. Lambda can return 429 for "Too Many Requests" or + * 408 (if applicable) for timeouts in some contexts. + * This can be considered retryable if you want to handle the throttling scenario. + */ + private static final Set TIMEOUT_ERRORS = Set.of( + 408, // Request Timeout + 429 // Too Many Requests (often used as "throttling" for Lambda) + ); + + public static boolean isRetryableStatusCode(final int statusCode) { + return TIMEOUT_ERRORS.contains(statusCode) || (statusCode >= 500 && statusCode < 600); + } + + /* + * Note:isRetryable and isRetryableException should match + */ + public static boolean isRetryableException(final Throwable t) { + if (t instanceof TooManyRequestsException) { + // Throttling => often can retry with backoff + return true; + } + if (t instanceof ServiceException) { + // Usually indicates a 5xx => can retry + return true; + } + if (t instanceof SdkClientException) { + // Possibly network/connection error => can retry + return true; + } + return false; + } + + /** + * Determines if this is definitely NOT retryable (client error or permanent failure). 
+ */ + public static boolean isNonRetryable(final InvokeResponse response) { + if(response == null) return false; + + int statusCode = response.statusCode(); + return BAD_REQUEST_ERRORS.contains(statusCode) + || NOT_ALLOWED_ERRORS.contains(statusCode) + || INVALID_INPUT_ERRORS.contains(statusCode); + } + + /** + * For convenience, you can create more fine-grained checks or + * direct set membership checks (e.g. isBadRequest(...), isTimeout(...)) if you want. + */ + public static boolean isTimeoutError(final InvokeResponse response) { + return TIMEOUT_ERRORS.contains(response.statusCode()); + } + +} + diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java index 786939f5a1..0b9f8b6ffe 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java @@ -267,4 +267,4 @@ public boolean isReadyForShutdown() { public void shutdown() { } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactoryTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactoryTest.java index cd68d73362..9435721384 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactoryTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/client/LambdaClientFactoryTest.java @@ -8,21 +8,33 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions; +import org.opensearch.dataprepper.plugins.lambda.common.util.CountingRetryCondition; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicyContext; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaAsyncClient; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; +import software.amazon.awssdk.services.lambda.model.TooManyRequestsException; import java.util.HashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) class LambdaClientFactoryTest { @Mock @@ -75,4 +87,101 @@ void testCreateAsyncLambdaClientOverrideConfiguration() { 
assertNotNull(overrideConfig.metricPublishers()); assertFalse(overrideConfig.metricPublishers().isEmpty()); } + + @Test + void testRetryConditionIsCalledWithTooManyRequestsException() { + // Arrange + CountingRetryCondition countingRetryCondition = new CountingRetryCondition(); + + // Create mock Lambda client + LambdaAsyncClient mockClient = mock(LambdaAsyncClient.class); + + // Setup mock to return TooManyRequestsException for the first 3 calls + when(mockClient.invoke(any(InvokeRequest.class))) + .thenReturn(CompletableFuture.failedFuture(TooManyRequestsException.builder().build())) + .thenReturn(CompletableFuture.failedFuture(TooManyRequestsException.builder().build())) + .thenReturn(CompletableFuture.failedFuture(TooManyRequestsException.builder().build())); + + // Create test request + InvokeRequest request = InvokeRequest.builder() + .functionName("test-function") + .build(); + + // Simulate retries + for (int i = 0; i < 3; i++) { + try { + CompletableFuture future = mockClient.invoke(request); + RetryPolicyContext context = RetryPolicyContext.builder() + .exception(TooManyRequestsException.builder().build()) + .retriesAttempted(i) + .build(); + + // Test the retry condition + countingRetryCondition.shouldRetry(context); + + future.join(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof TooManyRequestsException); + } + } + + // Verify retry count + assertEquals(3, countingRetryCondition.getRetryCount(), + "Retry condition should have been called exactly 3 times"); + } + + @Test + void testRetryConditionFirstFailsAndThenSucceeds() { + // Arrange + CountingRetryCondition countingRetryCondition = new CountingRetryCondition(); + + // Create mock Lambda client + LambdaAsyncClient mockClient = mock(LambdaAsyncClient.class); + + // Setup mock to return TooManyRequestsException for first 2 calls, then succeed on 3rd + when(mockClient.invoke(any(InvokeRequest.class))) + .thenReturn(CompletableFuture.failedFuture(TooManyRequestsException.builder().build())) + .thenReturn(CompletableFuture.failedFuture(TooManyRequestsException.builder().build())) + .thenReturn(CompletableFuture.completedFuture(InvokeResponse.builder() + .statusCode(200) + .build())); + + // Create test request + InvokeRequest request = InvokeRequest.builder() + .functionName("test-function") + .build(); + + // Track if we reached success + boolean successReached = false; + + // Simulate retries with eventual success + for (int i = 0; i < 3; i++) { + try { + CompletableFuture future = mockClient.invoke(request); + + if (i < 2) { + // For first two attempts, verify retry condition + RetryPolicyContext context = RetryPolicyContext.builder() + .exception(TooManyRequestsException.builder().build()) + .retriesAttempted(i) + .build(); + countingRetryCondition.shouldRetry(context); + } + + InvokeResponse response = future.join(); + if (response.statusCode() == 200) { + successReached = true; + } + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof TooManyRequestsException, + "Exception should be TooManyRequestsException"); + } + } + + // Verify retry count and success + assertEquals(2, countingRetryCondition.getRetryCount(), + "Retry condition should have been called exactly 2 times"); + assertTrue(successReached, "Should have reached successful completion"); + } + } diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java 
b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java index 5c2ee0e8e6..72b3a74d32 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java @@ -66,10 +66,11 @@ import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -128,6 +129,9 @@ public class LambdaProcessorTest { @Mock private Timer lambdaLatencyMetric; + @Mock + private ClientOptions mockClientOptions; + @Mock private LambdaAsyncClient lambdaAsyncClient; @@ -611,4 +615,172 @@ public void testDoExecute_for_strict_and_aggregate_mode(String configFile, assertEquals("[lambda_failure]", record.getData().getMetadata().getTags().toString()); } } + + //NOTE: This test will not pass as invoke failure is handled internally through sdk. + // The first attempt will fail and the second attempt will not even be considered for execution. +// @Test +// public void testDoExecute_retryScenario_successOnSecondAttempt() throws Exception { +// // Arrange +// final List> records = getSampleEventRecords(2); +// +// // First attempt throws TooManyRequestsException => no valid payload +// when(lambdaAsyncClient.invoke(any(InvokeRequest.class))) +// .thenReturn(CompletableFuture.failedFuture( +// TooManyRequestsException.builder() +// .message("First attempt throttled") +// .build() +// )) +// // Second attempt => success with 200 +// .thenReturn(CompletableFuture.completedFuture( +// InvokeResponse.builder() +// .statusCode(200) +// .payload(SdkBytes.fromUtf8String( +// "[{\"successKey1\":\"successValue1\"},{\"successKey2\":\"successValue2\"}]")) +// .build() +// )); +// +// // Create a config which has at least 1 maxConnectionRetries so we can retry once. 
+// final LambdaProcessorConfig config = createLambdaConfigurationFromYaml("lambda-processor-with-retries.yaml"); +// +// // Instantiate the processor +// final LambdaProcessor processor = new LambdaProcessor( +// pluginFactory, +// pluginSetting, +// config, +// awsCredentialsSupplier, +// expressionEvaluator +// ); +// populatePrivateFields(processor); +// +// // Act +// final Collection> resultRecords = processor.doExecute(records); +// +// // Assert +// // Because the second invocation is successful (200), +// // we expect the final records to NOT have the "lambda_failure" tag +// assertEquals(records.size(), resultRecords.size()); +// for (Record record : resultRecords) { +// assertFalse( +// record.getData().getMetadata().getTags().contains("lambda_failure"), +// "Record should NOT have a failure tag after a successful retry" +// ); +// } +// +// // We invoked the lambda client 2 times total: first attempt + one retry +// verify(lambdaAsyncClient, times(2)).invoke(any(InvokeRequest.class)); +// +// // Second attempt is success => increment success counters +// verify(numberOfRequestsSuccessCounter, times(1)).increment(); +// } + + @Test + public void testDoExecute_retryScenario_failsAfterMaxRetries() throws Exception { + // Arrange + final List> records = getSampleEventRecords(3); + + // Simulate a 500 status code (Retryable) + final InvokeResponse failedResponse = InvokeResponse.builder() + .statusCode(500) + .payload(SdkBytes.fromUtf8String("Internal server error")) + .build(); + + // Stub the lambda client to always return failedResponse + when(lambdaAsyncClient.invoke(any(InvokeRequest.class))) + .thenReturn(CompletableFuture.completedFuture(failedResponse)) + .thenReturn(CompletableFuture.completedFuture(failedResponse)) + .thenReturn(CompletableFuture.completedFuture(failedResponse)); + + // Create a config with exactly 1 maxConnectionRetries (allowing 2 total attempts) + final LambdaProcessorConfig config = createLambdaConfigurationFromYaml("lambda-processor-success-config.yaml"); + + // Instantiate the processor + final LambdaProcessor processor = new LambdaProcessor(pluginFactory, pluginSetting, config, + awsCredentialsSupplier, expressionEvaluator); + populatePrivateFields(processor); + + // Act + final Collection> resultRecords = processor.doExecute(records); + + // Assert + // All records should have the "lambda_failure" tag + assertEquals(records.size(), resultRecords.size(), "Result records count should match input records count."); + for (Record record : resultRecords) { + assertTrue(record.getData().getMetadata().getTags().contains("lambda_failure"), + "Record should have 'lambda_failure' tag after all retries fail"); + } + + // Expect 3 invocations: initial attempt + 3 retry + verify(lambdaAsyncClient, atLeastOnce()).invoke(any(InvokeRequest.class)); + // No success counters + verify(numberOfRequestsSuccessCounter, never()).increment(); + // Records failed counter should increment once with the total number of records + verify(numberOfRecordsFailedCounter, times(1)).increment(records.size()); + } + + + @Test + public void testDoExecute_nonRetryableStatusCode_noRetryAttempted() throws Exception { + // Arrange + final List> records = getSampleEventRecords(2); + + // 400 is a client error => non-retryable + final InvokeResponse badRequestResponse = InvokeResponse.builder() + .statusCode(400) + .payload(SdkBytes.fromUtf8String("Bad request")) + .build(); + + when(lambdaAsyncClient.invoke(any(InvokeRequest.class))) + 
.thenReturn(CompletableFuture.completedFuture(badRequestResponse)); + + final LambdaProcessorConfig config = createLambdaConfigurationFromYaml("lambda-processor-with-retries.yaml"); + + final LambdaProcessor processor = new LambdaProcessor(pluginFactory, pluginSetting, config, + awsCredentialsSupplier, expressionEvaluator); + populatePrivateFields(processor); + + // Act + final Collection> resultRecords = processor.doExecute(records); + + // Assert + assertEquals(records.size(), resultRecords.size()); + for (Record record : resultRecords) { + assertTrue(record.getData().getMetadata().getTags().contains("lambda_failure"), + "Non-retryable failure should cause 'lambda_failure' tag"); + } + // Only 1 attempt => no second invoke + verify(lambdaAsyncClient, times(1)).invoke(any(InvokeRequest.class)); + // Fail counters + verify(numberOfRecordsFailedCounter).increment(2); + } + + @Test + public void testDoExecute_nonRetryableException_thrownImmediatelyFail() throws Exception { + // Arrange + final List> records = getSampleEventRecords(2); + + // Some random exception that is not in the list of retryable exceptions + when(lambdaAsyncClient.invoke(any(InvokeRequest.class))) + .thenThrow(new IllegalArgumentException("Non-retryable exception")); + + final LambdaProcessorConfig config = createLambdaConfigurationFromYaml("lambda-processor-with-retries.yaml"); + + final LambdaProcessor processor = new LambdaProcessor(pluginFactory, pluginSetting, config, + awsCredentialsSupplier, expressionEvaluator); + populatePrivateFields(processor); + + // Act + final Collection> resultRecords = processor.doExecute(records); + + // Assert + // We expect no success => all records come back tagged + assertEquals(records.size(), resultRecords.size()); + for (Record record : resultRecords) { + assertTrue(record.getData().getMetadata().getTags().contains("lambda_failure"), + "Record should have 'lambda_failure' after a non-retryable exception"); + } + + // Attempted only once + verify(lambdaAsyncClient, times(1)).invoke(any(InvokeRequest.class)); + verify(numberOfRequestsFailedCounter, times(1)).increment(); + } } diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/utils/CountingHttpClient.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/utils/CountingHttpClient.java new file mode 100644 index 0000000000..feddd99538 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/utils/CountingHttpClient.java @@ -0,0 +1,32 @@ +package org.opensearch.dataprepper.plugins.lambda.utils; + +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.async.AsyncExecuteRequest; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +public class CountingHttpClient implements SdkAsyncHttpClient { + private final SdkAsyncHttpClient delegate; + private final AtomicInteger requestCount = new AtomicInteger(0); + + public CountingHttpClient(SdkAsyncHttpClient delegate) { + this.delegate = delegate; + } + + @Override + public CompletableFuture execute(AsyncExecuteRequest request) { + requestCount.incrementAndGet(); + return delegate.execute(request); + } + + @Override + public void close() { + delegate.close(); + } + + public int getRequestCount() { + return requestCount.get(); + } +} + diff --git 
a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/utils/LambdaRetryStrategyTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/utils/LambdaRetryStrategyTest.java new file mode 100644 index 0000000000..064b24d8fc --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/utils/LambdaRetryStrategyTest.java @@ -0,0 +1,74 @@ +package org.opensearch.dataprepper.plugins.lambda.utils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType; +import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig; +import org.opensearch.dataprepper.plugins.lambda.common.util.LambdaRetryStrategy; +import org.slf4j.Logger; +import software.amazon.awssdk.services.lambda.LambdaAsyncClient; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class LambdaRetryStrategyTest { + + @Mock + private LambdaAsyncClient lambdaAsyncClient; + + @Mock + private Buffer buffer; + + @Mock + private LambdaCommonConfig config; + + @Mock + private Logger logger; + + @BeforeEach + void setUp() { +// when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenReturn(CompletableFuture.completedFuture(InvokeResponse.builder().statusCode(200).build())); + when(config.getClientOptions()).thenReturn(mock(ClientOptions.class)); + when(config.getClientOptions().getMaxConnectionRetries()).thenReturn(3); + when(config.getClientOptions().getBaseDelay()).thenReturn(Duration.ofMillis(100)); + when(config.getFunctionName()).thenReturn("testFunction"); + when(config.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE); + } + + @Test + void testIsRetryable() { + assertTrue(LambdaRetryStrategy.isRetryableStatusCode(429)); + assertTrue(LambdaRetryStrategy.isRetryableStatusCode(500)); + assertFalse(LambdaRetryStrategy.isRetryableStatusCode(200)); + } + + @Test + void testIsNonRetryable() { + assertTrue(LambdaRetryStrategy.isNonRetryable(InvokeResponse.builder().statusCode(400).build())); + assertTrue(LambdaRetryStrategy.isNonRetryable(InvokeResponse.builder().statusCode(403).build())); + assertFalse(LambdaRetryStrategy.isNonRetryable(InvokeResponse.builder().statusCode(500).build())); + assertFalse(LambdaRetryStrategy.isNonRetryable(null)); + } + + @Test + void testIsTimeoutError() { + assertTrue(LambdaRetryStrategy.isTimeoutError(InvokeResponse.builder().statusCode(408).build())); + assertTrue(LambdaRetryStrategy.isTimeoutError(InvokeResponse.builder().statusCode(429).build())); + assertFalse(LambdaRetryStrategy.isTimeoutError(InvokeResponse.builder().statusCode(200).build())); + } + +} diff --git 
a/data-prepper-plugins/aws-lambda/src/test/resources/lambda-processor-with-retries.yaml b/data-prepper-plugins/aws-lambda/src/test/resources/lambda-processor-with-retries.yaml new file mode 100644 index 0000000000..c518d0d335 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/test/resources/lambda-processor-with-retries.yaml @@ -0,0 +1,15 @@ +function_name: "lambdaProcessorTest" +response_events_match: true +tags_on_failure: [ "lambda_failure" ] +batch: + key_name: "osi_key" + threshold: + event_count: 100 + maximum_size: 1mb + event_collect_timeout: 335 +client: + max_retries: 50 + max_concurrency: 5 +aws: + region: "us-east-1" + sts_role_arn: "arn:aws:iam::1234567890:role/sample-pipeine-role" \ No newline at end of file diff --git a/data-prepper-plugins/buffer-common/src/main/java/org/opensearch/dataprepper/buffer/common/BufferAccumulator.java b/data-prepper-plugins/buffer-common/src/main/java/org/opensearch/dataprepper/buffer/common/BufferAccumulator.java index eeaedf4ec1..69960f7dd7 100644 --- a/data-prepper-plugins/buffer-common/src/main/java/org/opensearch/dataprepper/buffer/common/BufferAccumulator.java +++ b/data-prepper-plugins/buffer-common/src/main/java/org/opensearch/dataprepper/buffer/common/BufferAccumulator.java @@ -68,6 +68,7 @@ public void add(final T record) throws Exception { public void flush() throws Exception { try { + LOG.debug("Flushing buffer accumulator"); flushAccumulatedToBuffer(); } catch (final TimeoutException timeoutException) { flushWithBackoff(); @@ -80,11 +81,13 @@ private boolean flushWithBackoff() throws Exception{ boolean flushedSuccessfully; for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; retryCount++) { + LOG.debug("Retrying buffer flush on retry count {}", retryCount); final ScheduledFuture flushBufferFuture = scheduledExecutorService.schedule(() -> { try { flushAccumulatedToBuffer(); return true; } catch (final TimeoutException e) { + LOG.debug("Timed out retrying buffer accumulator"); return false; } }, nextDelay, TimeUnit.MILLISECONDS); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java index 56377c1f22..a97b68d0f3 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java @@ -26,8 +26,8 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig private static final Long DEFAULT_RETENTION_PERIOD = 604800000L; static final boolean DEFAULT_AUTO_COMMIT = false; static final ByteCount DEFAULT_FETCH_MAX_BYTES = ByteCount.parse("50mb"); - static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(500); - static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("1b"); + static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(1000); + static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("2kb"); static final ByteCount DEFAULT_MAX_PARTITION_FETCH_BYTES = ByteCount.parse("1mb"); static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45); static final String DEFAULT_AUTO_OFFSET_RESET = "earliest"; diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java 
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index f8ec9c4d91..336f29fe0f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -64,6 +64,8 @@ public class KafkaBuffer extends AbstractBuffer> { private final AbstractBuffer> innerBuffer; private final ExecutorService executorService; private final Duration drainTimeout; + + private final List consumers; private AtomicBoolean shutdownInProgress; private ByteDecoder byteDecoder; @@ -83,7 +85,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName()); this.shutdownInProgress = new AtomicBoolean(false); final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName()); - final List consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), + this.consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker); this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId())); this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE)); @@ -233,6 +235,9 @@ public void shutdown() { executorService.shutdownNow(); } + LOG.info("Closing {} consumers", consumers.size()); + consumers.forEach(KafkaCustomConsumer::closeConsumer); + innerBuffer.shutdown(); } finally { resetMdc(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index a84f800d8d..a07d2f5130 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -204,6 +204,7 @@ ConsumerRecords doPoll() throws Exception { void consumeRecords() throws Exception { try { ConsumerRecords records = doPoll(); + LOG.debug("Consumed records with count {}", records.count()); if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) { Map offsets = new HashMap<>(); AcknowledgementSet acknowledgementSet = null; @@ -367,6 +368,7 @@ public void run() { boolean retryingAfterException = false; while (!shutdownInProgress.get()) { + LOG.debug("Still running Kafka consumer in start of loop"); try { if (retryingAfterException) { LOG.debug("Pause consuming from Kafka topic due a previous exception."); @@ -382,12 +384,15 @@ public void run() { paused = false; consumer.resume(consumer.assignment()); } + LOG.debug("Still running Kafka consumer preparing to commit offsets and consume records"); synchronized(this) { commitOffsets(false); resetOffsets(); } consumeRecords(); + LOG.debug("Exited consume records"); topicMetrics.update(consumer); + LOG.debug("Updated consumer metrics"); 
retryingAfterException = false; } catch (Exception exp) { LOG.error("Error while reading the records from the topic {}. Retry after 10 seconds", topicName, exp); @@ -475,6 +480,7 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re } long numRetries = 0; while (true) { + LOG.debug("In while loop for processing records, paused = {}", paused); try { if (numRetries == 0) { bufferAccumulator.add(record); @@ -485,7 +491,9 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re } catch (Exception e) { if (!paused && numRetries++ > maxRetriesOnException) { paused = true; + LOG.debug("Preparing to call pause"); consumer.pause(consumer.assignment()); + LOG.debug("Pause was called"); } if (e instanceof SizeOverflowException) { topicMetrics.getNumberOfBufferSizeOverflows().increment(); @@ -493,8 +501,10 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re LOG.debug("Error while adding record to buffer, retrying ", e); } try { + LOG.debug("Sleeping due to exception"); Thread.sleep(RETRY_ON_EXCEPTION_SLEEP_MS); if (paused) { + LOG.debug("Calling doPoll()"); ConsumerRecords records = doPoll(); if (records.count() > 0) { LOG.warn("Unexpected records received while the consumer is paused. Resetting the partitions to retry from last read pointer"); @@ -509,6 +519,7 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re } if (paused) { + LOG.debug("Resuming consumption"); consumer.resume(consumer.assignment()); paused = false; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index 0d091b8af7..1981f6a60a 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -12,6 +12,7 @@ import io.confluent.kafka.serializers.KafkaJsonDeserializer; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.serialization.Deserializer; @@ -167,6 +168,7 @@ public static void setConsumerTopicProperties(final Properties properties, final properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int)topicConfig.getFetchMaxBytes()); properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait()); properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes()); + properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName()); } private void setSchemaRegistryProperties(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topicConfig) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java index 1fd03f8aff..60f5d282bd 100644 --- 
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java @@ -10,6 +10,8 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Instant; import java.util.Objects; @@ -17,6 +19,7 @@ import java.util.HashMap; public class KafkaTopicConsumerMetrics { + private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConsumerMetrics.class); static final String NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS = "numberOfPositiveAcknowledgements"; static final String NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS = "numberOfNegativeAcknowledgements"; static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse"; @@ -82,7 +85,10 @@ private void initializeMetricNamesMap(final boolean topicNameInMetrics) { double max = 0.0; for (Map.Entry> entry : metricValues.entrySet()) { Map consumerMetrics = entry.getValue(); - synchronized(consumerMetrics) { + synchronized (consumerMetrics) { + if (consumerMetrics.get(metricName) == null) { + LOG.debug("No consumer metric for recordsLagMax found"); + } max = Math.max(max, consumerMetrics.get(metricName)); } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index 99f2afa76b..f7cae5e416 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -334,6 +334,7 @@ public void testShutdown_Successful() throws InterruptedException { kafkaBuffer.shutdown(); verify(executorService).shutdown(); verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS)); + verify(consumer).closeConsumer(); } @Test diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java index d7ab374684..f2587a079d 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.PostgresStreamState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; @@ -167,17 +168,20 @@ private void createStreamPartition(RdsSourceConfig sourceConfig) { 
progressState.setWaitForExport(sourceConfig.isExportEnabled()); progressState.setPrimaryKeyMap(getPrimaryKeyMap()); if (sourceConfig.getEngine() == EngineType.MYSQL) { - final MySqlStreamState mySqlStreamState = progressState.getMySqlStreamState(); + final MySqlStreamState mySqlStreamState = new MySqlStreamState(); getCurrentBinlogPosition().ifPresent(mySqlStreamState::setCurrentPosition); mySqlStreamState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(sourceConfig.getTableNames())); + progressState.setMySqlStreamState(mySqlStreamState); } else { // Postgres // Create replication slot, which will mark the starting point for stream final String publicationName = generatePublicationName(); final String slotName = generateReplicationSlotName(); ((PostgresSchemaManager)schemaManager).createLogicalReplicationSlot(sourceConfig.getTableNames(), publicationName, slotName); - progressState.getPostgresStreamState().setPublicationName(publicationName); - progressState.getPostgresStreamState().setReplicationSlotName(slotName); + final PostgresStreamState postgresStreamState = new PostgresStreamState(); + postgresStreamState.setPublicationName(publicationName); + postgresStreamState.setReplicationSlotName(slotName); + progressState.setPostgresStreamState(postgresStreamState); } StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState); sourceCoordinator.createPartition(streamPartition); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java index 130f004960..22935fc6e3 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java @@ -25,6 +25,8 @@ public class LogicalReplicationClient implements ReplicationLogClient { private static final Logger LOG = LoggerFactory.getLogger(LogicalReplicationClient.class); + static final String PROTO_VERSION_KEY = "proto_version"; + static final String VERSION_ONE = "1"; static final String PUBLICATION_NAMES_KEY = "publication_names"; private final ConnectionManager connectionManager; @@ -36,10 +38,10 @@ public class LogicalReplicationClient implements ReplicationLogClient { private volatile boolean disconnectRequested = false; public LogicalReplicationClient(final ConnectionManager connectionManager, - final String replicationSlotName, - final String publicationName) { - this.publicationName = publicationName; + final String publicationName, + final String replicationSlotName) { this.connectionManager = connectionManager; + this.publicationName = publicationName; this.replicationSlotName = replicationSlotName; } @@ -54,6 +56,7 @@ public void connect() { .replicationStream() .logical() .withSlotName(replicationSlotName) + .withSlotOption(PROTO_VERSION_KEY, VERSION_ONE) .withSlotOption(PUBLICATION_NAMES_KEY, publicationName); if (startLsn != null) { logicalStreamBuilder.withStartPosition(startLsn); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java 
b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java index f9881d0063..3d5c1a04b1 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java @@ -158,7 +158,7 @@ void processRelationMessage(ByteBuffer msg) { } void processCommitMessage(ByteBuffer msg) { - int flag = msg.getInt(); + int flag = msg.get(); long commitLsn = msg.getLong(); long endLsn = msg.getLong(); long epochMicro = msg.getLong(); diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraService.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraService.java index 4600d1bdeb..415c4d54c8 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraService.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraService.java @@ -166,7 +166,7 @@ private StringBuilder createIssueFilterCriteria(JiraSourceConfig configuration, .collect(Collectors.joining(DELIMITER, PREFIX, SUFFIX))) .append(CLOSING_ROUND_BRACKET); } - log.error("Created issue filter criteria JiraQl query: {}", jiraQl); + log.info("Created issue filter criteria JiraQl query: {}", jiraQl); return jiraQl; } diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java index 3cb7b9501c..4328181fea 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; +import jakarta.validation.constraints.AssertTrue; import lombok.Getter; import org.opensearch.dataprepper.plugins.source.jira.configuration.AuthenticationConfig; import org.opensearch.dataprepper.plugins.source.jira.configuration.FilterConfig; @@ -30,6 +31,11 @@ public class JiraSourceConfig implements CrawlerSourceConfig { @JsonProperty("hosts") private List hosts; + @AssertTrue(message = "Jira hosts must be a list of length 1") + boolean isValidHosts() { + return hosts != null && hosts.size() == 1; + } + /** * Authentication Config to Access Jira */ diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java index 92420ac319..28a71f55ee 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java @@ -51,13 
diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java
index 92420ac319..28a71f55ee 100644
--- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java
+++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/JiraRestClient.java
@@ -51,13 +51,11 @@ public class JiraRestClient {
     public static final List<Integer> RETRY_ATTEMPT_SLEEP_TIME = List.of(1, 2, 5, 10, 20, 40);
     private static final String TICKET_FETCH_LATENCY_TIMER = "ticketFetchLatency";
     private static final String SEARCH_CALL_LATENCY_TIMER = "searchCallLatency";
-    private static final String PROJECTS_FETCH_LATENCY_TIMER = "projectFetchLatency";
     private static final String ISSUES_REQUESTED = "issuesRequested";
     private final RestTemplate restTemplate;
     private final JiraAuthConfig authConfig;
     private final Timer ticketFetchLatencyTimer;
     private final Timer searchCallLatencyTimer;
-    private final Timer projectFetchLatencyTimer;
     private final Counter issuesRequestedCounter;
     private final PluginMetrics jiraPluginMetrics = PluginMetrics.fromNames("jiraRestClient", "aws");
     private int sleepTimeMultiplier = 1000;
@@ -68,8 +66,6 @@ public JiraRestClient(RestTemplate restTemplate, JiraAuthConfig authConfig) {
         ticketFetchLatencyTimer = jiraPluginMetrics.timer(TICKET_FETCH_LATENCY_TIMER);
         searchCallLatencyTimer = jiraPluginMetrics.timer(SEARCH_CALL_LATENCY_TIMER);
-        projectFetchLatencyTimer = jiraPluginMetrics.timer(PROJECTS_FETCH_LATENCY_TIMER);
-
         issuesRequestedCounter = jiraPluginMetrics.counter(ISSUES_REQUESTED);
     }
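On the metrics cleanup above: a timer obtained from PluginMetrics only reports latency when work is actually wrapped in record(...), and nothing in this diff records to projectFetchLatencyTimer, so the removal presumably drops an always-empty metric rather than losing data. A hypothetical sketch of how the surviving searchCallLatencyTimer is typically driven; the doSearchCall() helper and the import path are assumptions for illustration:

import java.util.function.Supplier;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;

// Hypothetical usage sketch: a PluginMetrics timer publishes samples only when
// a call site is wrapped in record(...). A registered-but-unused timer stays
// at zero samples forever.
class TimerUsageSketch {
    private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("jiraRestClient", "aws");
    private final Timer searchCallLatencyTimer = pluginMetrics.timer("searchCallLatency");

    String timedSearch() {
        // record(Supplier<T>) times the supplier and passes its return value through;
        // the local variable disambiguates from the record(Runnable) overload
        Supplier<String> call = this::doSearchCall;
        return searchCallLatencyTimer.record(call);
    }

    private String doSearchCall() {
        return "response"; // stand-in for the real Jira REST call
    }
}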
@@ -119,20 +115,24 @@ private <T> ResponseEntity<T> invokeRestApi(URI uri, Class<T> responseType) thro
         } catch (HttpClientErrorException ex) {
             HttpStatus statusCode = ex.getStatusCode();
             String statusMessage = ex.getMessage();
-            log.error("An exception has occurred while getting response from Jira search API {}", ex.getMessage());
+            log.error(NOISY, "An exception has occurred while getting response from Jira search API with statusCode {} and error message: {}", statusCode, statusMessage);
             if (statusCode == HttpStatus.FORBIDDEN) {
                 throw new UnAuthorizedException(statusMessage);
             } else if (statusCode == HttpStatus.UNAUTHORIZED) {
-                log.error(NOISY, "Token expired. We will try to renew the tokens now", ex);
+                log.warn(NOISY, "Token expired. We will try to renew the tokens now.");
                 authConfig.renewCredentials();
-            } else if (statusCode == HttpStatus.TOO_MANY_REQUESTS) {
-                log.error(NOISY, "Hitting API rate limit. Backing off with sleep timer.", ex);
+            } else if (statusCode == HttpStatus.TOO_MANY_REQUESTS || statusCode == HttpStatus.SERVICE_UNAVAILABLE || statusCode == HttpStatus.GATEWAY_TIMEOUT) {
+                log.error(NOISY, "Received {}. Backing off with sleep timer for {} seconds.", statusCode, RETRY_ATTEMPT_SLEEP_TIME.get(retryCount));
+            } else {
+                log.error(NOISY, "Received an unexpected status code {} response from Jira.", statusCode, ex);
             }
             try {
                 Thread.sleep((long) RETRY_ATTEMPT_SLEEP_TIME.get(retryCount) * sleepTimeMultiplier);
             } catch (InterruptedException e) {
-                throw new RuntimeException("Sleep in the retry attempt got interrupted", e);
+                throw new RuntimeException("Sleep in the retry attempt got interrupted.");
             }
+        } catch (Exception ex) {
+            log.error(NOISY, "An exception has occurred while getting a response from the Jira search API", ex);
         }
         retryCount++;
     }
diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java
index ddcf1c8468..9640ba3818 100644
--- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java
+++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java
@@ -100,7 +100,7 @@ public String getJiraAccountCloudId() {
                 if (e.getRawStatusCode() == HttpStatus.UNAUTHORIZED.value()) {
                     renewCredentials();
                 }
-                log.error("Error occurred while accessing resources: ", e);
+                log.error("Error occurred while accessing resources. Status code: {}. Error message: {}", e.getStatusCode(), e.getMessage());
             }
         }
         throw new UnAuthorizedException(String.format("Access token expired. Unable to renew even after %s attempts", RETRY_ATTEMPT));
@@ -153,6 +153,7 @@ public void renewCredentials() {
             this.accessToken = (String) oauth2Config.getAccessToken().getValue();
             this.refreshToken = (String) oauth2Config.getRefreshToken().getValue();
             this.expireTime = Instant.now().plusSeconds(10);
+            log.info("Access Token and Refresh Token pair is now refreshed.");
         }
         throw new RuntimeException("Failed to renew access token message:" + ex.getMessage(), ex);
     }
diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/utils/AddressValidation.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/utils/AddressValidation.java
index d6cc166226..e82acb2a07 100644
--- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/utils/AddressValidation.java
+++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/utils/AddressValidation.java
@@ -38,7 +38,7 @@ public static InetAddress getInetAddress(String url) {
         try {
             return InetAddress.getByName(new URL(url).getHost());
         } catch (UnknownHostException | MalformedURLException e) {
-            log.error(INVALID_URL, e);
+            log.error("{}: {}", INVALID_URL, url);
             throw new BadRequestException(e.getMessage(), e);
         }
     }
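For reference on the invokeRestApi retry handling at the top of this file's changes: each failed attempt sleeps according to the fixed schedule in RETRY_ATTEMPT_SLEEP_TIME (in seconds), scaled into milliseconds by the sleepTimeMultiplier of 1000. A self-contained sketch of that loop shape, assuming six attempts and a hypothetical doCall supplier in place of the real REST call; this is not the plugin's exact code:

import java.util.List;
import java.util.function.Supplier;

// Standalone sketch of the backoff shape used in JiraRestClient above:
// attempts sleep 1, 2, 5, 10, 20, then 40 seconds before retrying.
class FixedBackoffSketch {
    static final List<Integer> RETRY_ATTEMPT_SLEEP_TIME = List.of(1, 2, 5, 10, 20, 40);
    static final int SLEEP_TIME_MULTIPLIER = 1000; // seconds -> milliseconds

    static <T> T callWithRetry(Supplier<T> doCall) throws InterruptedException {
        int retryCount = 0;
        while (retryCount < RETRY_ATTEMPT_SLEEP_TIME.size()) {
            try {
                return doCall.get();
            } catch (RuntimeException e) {
                // back off before the next attempt, mirroring the Thread.sleep in the diff
                Thread.sleep((long) RETRY_ATTEMPT_SLEEP_TIME.get(retryCount) * SLEEP_TIME_MULTIPLIER);
                retryCount++;
            }
        }
        throw new RuntimeException("Exhausted " + RETRY_ATTEMPT_SLEEP_TIME.size() + " retry attempts");
    }
}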
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java
index a54e50d36f..4a013e9373 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java
@@ -75,7 +75,7 @@ public LeaderProgressState convertToPartitionState(final String serializedPartit
         try {
             return objectMapper.readValue(serializedPartitionProgressState, LeaderProgressState.class);
         } catch (final JsonProcessingException e) {
-            LOG.error("Unable to convert string to partition progress state class ", e);
+            LOG.error("Unable to convert string to partition progress state class due to {}. Partition progress state string: {}.", e.getMessage(), serializedPartitionProgressState);
             return null;
         }
     }
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java
index f2fc7e4b40..e738c0e19c 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java
@@ -89,7 +89,7 @@ public void run() {
             try {
                 Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
             } catch (InterruptedException ex) {
-                log.warn("Thread interrupted while waiting to retry", ex);
+                log.warn("Thread interrupted while waiting to retry due to {}", ex.getMessage());
            }
         }
     }
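One convention worth noting next to the WorkerScheduler change above: catching InterruptedException and only logging it clears the thread's interrupt flag, so an enclosing loop can no longer observe that shutdown was requested. A hedged sketch of the conventional interrupt-restoring variant, not taken from this PR; the backoff constant's value is assumed for illustration:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: re-setting the interrupt flag after catching InterruptedException
// lets an enclosing `while (!Thread.currentThread().isInterrupted())` loop exit
// cleanly instead of looping on as if nothing happened.
class BackoffInterruptSketch {
    private static final Logger log = LoggerFactory.getLogger(BackoffInterruptSketch.class);
    private static final long RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5000; // assumed value

    static void backOffOnce() {
        try {
            Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
        } catch (InterruptedException ex) {
            log.warn("Thread interrupted while waiting to retry due to {}", ex.getMessage());
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}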