diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py index c8e6e7c3968..f32095bee84 100644 --- a/sdks/python/apache_beam/testing/test_pipeline.py +++ b/sdks/python/apache_beam/testing/test_pipeline.py @@ -71,7 +71,8 @@ def __init__( is_integration_test=False, blocking=True, additional_pipeline_args=None, - display_data=None): + display_data=None, + timeout=None): """Initialize a pipeline object for test. Args: @@ -96,6 +97,8 @@ def __init__( included when construction the pipeline options object. display_data (Dict[str, Any]): a dictionary of static data associated with this pipeline that can be displayed when it runs. + timeout (float optional): The time in milliseconds to wait for the pipeline to finish + before raising a timeout exception. If None, will wait indefinitely. Raises: ValueError: if either the runner or options argument is not @@ -107,6 +110,7 @@ def __init__( self.options_list = ( self._parse_test_option_args(argv) + additional_pipeline_args) self.blocking = blocking + self.timeout = timeout if options is None: options = PipelineOptions(self.options_list) super().__init__(runner, options, display_data=display_data) @@ -116,9 +120,9 @@ def run(self, test_runner_api=True): test_runner_api=( False if self.not_use_test_runner_api else test_runner_api)) if self.blocking: - state = result.wait_until_finish() + state = result.wait_until_finish(duration=self.timeout) assert state in (PipelineState.DONE, PipelineState.CANCELLED), \ - "Pipeline execution failed." + "Pipeline execution failed." return result