-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-37206][Runtime] Fix initialization of batching timer service in async state operators #26071
base: master
Are you sure you want to change the base?
Conversation
* Sets the current key. Timers that are due to be fired are collected and will be triggered. | ||
*/ | ||
@Override | ||
public void setCurrentKey(K currentKey) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The core difference between timer service for async operators and the original one is here. Replace direct call of triggerTarget.onXXXXXTime()
with a wrapper of maintainContextAndProcess
@fredia @codenohup Would you please take a look? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the fix! @Zakelly
I have verified this PR by running a WordCount job in DataStream V2 API and batch mode.
Thanks @codenohup ! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, overall LGTM, only one minor comment.
} else if (factory instanceof AsyncKeyedStateBackendAdaptor) { | ||
theFactory = | ||
(BatchExecutionKeyedStateBackend<K>) | ||
((AsyncKeyedStateBackendAdaptor<K>) factory).getKeyedStateBackend(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check ((AsyncKeyedStateBackendAdaptor<K>) factory).getKeyedStateBackend()
is BatchExecutionKeyedStateBackend
here?
And update the description of IllegalStateException
below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated accordingly.
…n async state operators
What is the purpose of the change
We have ship two abstract stream operators for async state processing, where a async timer service (manager) is introduced. Actually these operators can run in sync mode, this is especially the case for batch execution mode. Currently there is an issue when initializing batching timer service in those operators. This PR fixes this to enable batch execution for operators with async state.
Brief change log
BatchExecutionInternalTimeServiceWithAsyncState
and properly create this in async state operators.Verifying this change
This change added test
BatchExecutionInternalTimeServiceWithAsyncStateTest
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation