Add stateful buffer for lambda sink #5354
// The partial buffer that may not yet have reached threshold.
// Access must be synchronized
private Buffer statefulBuffer;
Should this handle concurrent access?
Yes, I have used "synchronized" on all the methods where this is used. The S3 sink uses a reentrant lock to handle something similar.
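For readers following the thread, here is a minimal sketch of the pattern being discussed: a partial batch kept as instance state, with every method that touches it declared `synchronized`. The class name `StatefulBufferSketch`, the `String` record type, and the count-based threshold are assumptions made for illustration; this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; not the LambdaSink implementation.
final class StatefulBufferSketch {
    private final int threshold;
    // Partial batch carried over between calls; guarded by this object's monitor.
    private List<String> partial = new ArrayList<>();

    StatefulBufferSketch(final int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
    }

    // Adds one record. Returns the full batch once the threshold is reached,
    // otherwise an empty Optional while the batch is still partial.
    synchronized Optional<List<String>> add(final String record) {
        partial.add(record);
        if (partial.size() < threshold) {
            return Optional.empty();
        }
        final List<String> full = partial;
        partial = new ArrayList<>(); // start a fresh partial batch
        return Optional.of(full);
    }

    // Number of records still waiting for the threshold.
    synchronized int pendingCount() {
        return partial.size();
    }
}
```

An explicit `java.util.concurrent.locks.ReentrantLock`, as mentioned for the S3 sink, would give the same mutual exclusion; `synchronized` methods are simply the shorter form when one lock guards the whole object.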
for (Buffer buf : buffersToFlush) {
combinedRecords.addAll(buf.getRecords());
}
I may not be clear on the overall approach here, but why are we combining the buffers? Also, it may not be a big deal, but we are creating an additional copy of the payload here.
I have addressed the comment. This might not be needed for flushing to Lambda, but it is needed for handling failures. Say the "full buffer" records have some failures: only those have to be tagged and forwarded (the failure behaviour); the remaining records should persist for the next doOutput and should not go through the failure-handling mechanism. I have refactored the lambdaCommonHandler to take in a Buffer and moved this logic into the catch block.
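The behaviour described in this reply can be sketched roughly as follows: only the records of a full batch whose send fails go to failure handling, while records that have not yet reached the threshold stay buffered for the next call. The names `FlushFailureSketch`, `sender`, `failedRecords`, and `pendingRecords` are hypothetical, and a `Consumer` stands in for the Lambda invocation; the real handler in the PR may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only; not the lambdaCommonHandler from the PR.
final class FlushFailureSketch {
    private final int threshold;
    private final Consumer<List<String>> sender; // stand-in for the Lambda invocation
    private final List<String> failed = new ArrayList<>();
    private List<String> partial = new ArrayList<>();

    FlushFailureSketch(final int threshold, final Consumer<List<String>> sender) {
        this.threshold = threshold;
        this.sender = sender;
    }

    // Buffers incoming records and flushes every time the threshold is reached.
    synchronized void doOutput(final List<String> records) {
        for (final String record : records) {
            partial.add(record);
            if (partial.size() >= threshold) {
                final List<String> full = partial;
                partial = new ArrayList<>();
                try {
                    sender.accept(full);
                } catch (final RuntimeException e) {
                    // Only the records of the failed full batch are marked as failed.
                    failed.addAll(full);
                }
            }
        }
        // Whatever is left in 'partial' persists until the next doOutput call.
    }

    synchronized List<String> failedRecords() {
        return new ArrayList<>(failed);
    }

    synchronized List<String> pendingRecords() {
        return new ArrayList<>(partial);
    }
}
```

With a threshold of 2 and a sender that always throws, passing three records marks the first two as failed and leaves the third pending, which is the separation the reply argues for.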
verify(numberOfRecordsFailedCounter, times(1)).increment(1);
// Utility to set private fields
private static void setPrivateField(Object target, String fieldName, Object value) {
There is a data-prepper-test-common module that has this kind of common, reusable method for test cases. We can reuse those utilities and avoid adding these methods to every plugin.
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
ced7a2d to 3d9fb37
Looks good to me
0795cdc to debad91
Description
Add Stateful Buffering to LambdaSink
Issues Resolved
Resolves #5353
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.