Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37194][tests] Fix NPE issue in WatermarkITCase #26048

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

showuon
Copy link
Member

@showuon showuon commented Jan 22, 2025

What is the purpose of the change

Fix NPE issue in WatermarkITCase.
The NPE is because here, the latter operation (i.e. receivedWatermarks.computeIfAbsent) might not be run immediately after the former operation (i.e. attemptIds.compute).

And during the main testing thread, we only wait for attemptIds size is expected. So, after the attemptIds size is expected, when we try to verify assertOperatorReceivedWatermarkValues, we might get null value because in the other thread, the receivedWatermarks.computeIfAbsent is not executed, yet.

To fix this issue, we just need to wait for the receivedWatermarks size, not the attemptIds size. But since this issue exists in many other test cases in WatermarkITCase, I extracted the common codes into a helper method: submitJobAndWaitForOperator3Initialized.

Brief change log

Fix NPE issue in WatermarkITCase.

Verifying this change

This change is already covered by existing tests, such as (please describe tests).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable


// wait all operator3 tasks have initialized completely
tryWaitUntilCondition(
() -> Operator3ProcessFunction.receivedWatermarks.size() == DEFAULT_PARALLELISM);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the main fix, by waiting for receivedWatermarks size, not attemptIds size.

Copy link
Contributor

@codenohup codenohup left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Thanks for the fix.

@flinkbot
Copy link
Collaborator

flinkbot commented Jan 22, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@davidradl
Copy link
Contributor

Reviewed by Chi on 23/01/2025 Need a committer to review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants