diff --git a/examples/01-tasks/notebook.yaml b/examples/01-tasks/notebook.yaml
index d0c48965..1c9d83e8 100644
--- a/examples/01-tasks/notebook.yaml
+++ b/examples/01-tasks/notebook.yaml
@@ -22,9 +22,9 @@ dag:
     You can run this pipeline as:
       runnable execute -f examples/01-tasks/notebook.yaml
 
-  start_at: notebook
+  start_at: hello
   steps:
-    notebook:
+    hello:
       type: task
       command_type: notebook
       command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project.
diff --git a/examples/01-tasks/python_tasks.yaml b/examples/01-tasks/python_tasks.yaml
index c5e5d6d4..78391b7a 100644
--- a/examples/01-tasks/python_tasks.yaml
+++ b/examples/01-tasks/python_tasks.yaml
@@ -15,9 +15,9 @@ dag:
       2 directories, 1 file
     The hello.execution.log has the captured stdout of "Hello World!".
 
-  start_at: hello_task
+  start_at: hello
   steps:
-    hello_task:
+    hello:
       type: task
       command: examples.common.functions.hello # dotted path to the function.
       next: success
diff --git a/examples/02-sequential/on_failure_succeed.yaml b/examples/02-sequential/on_failure_succeed.yaml
index 27a19b58..02200ff7 100644
--- a/examples/02-sequential/on_failure_succeed.yaml
+++ b/examples/02-sequential/on_failure_succeed.yaml
@@ -15,20 +15,20 @@ dag:
     Run this pipeline as:
       runnable execute -f examples/02-sequential/on_failure_succeed.yaml
 
-  start_at: step_1
+  start_at: step 1
   steps:
-    step_1:
+    step 1:
       type: task
       command_type: shell
       command: exit 1 # This will fail!
-      next: step_2
-      on_failure: step_4
-    step_2:
+      next: step 2
+      on_failure: step 4
+    step 2:
       type: stub
-      next: step_3
-    step_3:
+      next: step 3
+    step 3:
       type: stub
       next: success
-    step_4:
+    step 4:
       type: stub
       next: success
diff --git a/examples/03-parameters/passing_parameters_notebook.yaml b/examples/03-parameters/passing_parameters_notebook.yaml
index aa56d28f..9e5ba0f8 100644
--- a/examples/03-parameters/passing_parameters_notebook.yaml
+++ b/examples/03-parameters/passing_parameters_notebook.yaml
@@ -11,9 +11,9 @@ dag:
     Run the below example as:
       runnable execute examples/03-parameters/passing_parameters_notebook.yaml
 
-  start_at: write_parameters_from_notebook
+  start_at: set_parameter
   steps:
-    write_parameters_from_notebook:
+    set_parameter:
       type: task
       command_type: notebook
       command: examples/common/write_parameters.ipynb
@@ -26,8 +26,8 @@ dag:
         - name: pydantic_param
         - name: score
           kind: metric
-      next: read_parameters
-    read_parameters:
+      next: get_parameters
+    get_parameters:
       type: task
       command: examples.common.functions.read_parameter
       next: read_parameters_in_notebook
diff --git a/examples/03-parameters/passing_parameters_python.yaml b/examples/03-parameters/passing_parameters_python.yaml
index 1919e786..1d45646b 100644
--- a/examples/03-parameters/passing_parameters_python.yaml
+++ b/examples/03-parameters/passing_parameters_python.yaml
@@ -16,9 +16,9 @@ dag:
     Run the pipeline as:
       runnable execute -f examples/03-parameters/passing_parameters_python.yaml
 
-  start_at: write_parameters
+  start_at: set_parameter
   steps:
-    write_parameters:
+    set_parameter:
       type: task
       command: examples.common.functions.write_parameter
       returns:
@@ -30,9 +30,8 @@ dag:
         - name: pydantic_param
         - name: score
           kind: metric
-
-      next: read_parameters
-    read_parameters:
+      next: get_parameters
+    get_parameters:
       type: task
       command: examples.common.functions.read_parameter
       next: success
diff --git a/examples/03-parameters/passing_parameters_shell.yaml b/examples/03-parameters/passing_parameters_shell.yaml
index 51977425..350fd5ea 100644
--- a/examples/03-parameters/passing_parameters_shell.yaml
+++ b/examples/03-parameters/passing_parameters_shell.yaml
@@ -12,9 +12,9 @@ dag:
     Run the pipeline as:
       runnable execute -f examples/03-parameters/passing_parameters_shell.yaml
 
-  start_at: write_parameters_in_shell
+  start_at: write_parameter
   steps:
-    write_parameters_in_shell:
+    write_parameter:
       type: task
       command_type: shell
       command: |
diff --git a/examples/03-parameters/static_parameters_fail.py b/examples/03-parameters/static_parameters_fail.py
new file mode 100644
index 00000000..ae93c5e7
--- /dev/null
+++ b/examples/03-parameters/static_parameters_fail.py
@@ -0,0 +1,65 @@
+"""
+The below example showcases setting up known initial parameters for a pipeline
+of only python tasks
+
+The initial parameters as defined in the yaml file are:
+  simple: 1
+  complex_param:
+    x: 10
+    y: "hello world!!"
+
+runnable allows using pydantic models for deeply nested parameters and
+casts appropriately based on annotation. eg: read_initial_params_as_pydantic
+
+If no annotation is provided, the parameter is assumed to be a dictionary.
+eg: read_initial_params_as_json
+
+You can set the initial parameters from environment variables as well.
+eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable
+
+Run this pipeline as:
+  python examples/03-parameters/static_parameters_fail.py
+
+"""
+
+import os
+
+from examples.common.functions import raise_ex
+from runnable import NotebookTask, Pipeline, PythonTask
+
+
+def main():
+    read_params_in_notebook = NotebookTask(
+        name="read_params_in_notebook",
+        notebook="examples/common/read_parameters.ipynb",
+        terminate_with_success=True,
+    )
+
+    notebook_pipeline = Pipeline(
+        steps=[
+            read_params_in_notebook,
+        ],
+    )
+    read_params_and_fail = PythonTask(
+        function=raise_ex,
+        name="read_params_and_fail",
+        terminate_with_success=True,
+    )
+
+    read_params_and_fail.on_failure = notebook_pipeline
+
+    python_pipeline = Pipeline(
+        steps=[
+            read_params_and_fail,
+        ],
+    )
+
+    python_pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")
+
+    return python_pipeline
+
+
+if __name__ == "__main__":
+    # Any parameter prefixed by "RUNNABLE_PRM_" will be picked up by runnable
+    os.environ["RUNNABLE_PRM_envvar"] = "from env"
+    main()
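The failure examples above and below rely on runnable picking up any environment variable prefixed with `RUNNABLE_PRM_` as an initial parameter. A minimal sketch of that pickup behaviour, assuming JSON-ish decoding of values; the helper below is illustrative and is not runnable's actual implementation:

```python
import json
import os

RUNNABLE_PRM_PREFIX = "RUNNABLE_PRM_"


def parameters_from_env() -> dict:
    """Collect RUNNABLE_PRM_* variables into a parameter dict (illustrative)."""
    params = {}
    for key, value in os.environ.items():
        if key.startswith(RUNNABLE_PRM_PREFIX):
            name = key[len(RUNNABLE_PRM_PREFIX):]
            try:
                # numbers, lists and dicts would come through as JSON
                params[name] = json.loads(value)
            except json.JSONDecodeError:
                # plain strings, e.g. "from env", pass through unchanged
                params[name] = value
    return params


os.environ["RUNNABLE_PRM_envvar"] = "from env"
assert parameters_from_env()["envvar"] == "from env"
```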
diff --git a/examples/03-parameters/static_parameters_fail.yaml b/examples/03-parameters/static_parameters_fail.yaml
new file mode 100644
index 00000000..66c67926
--- /dev/null
+++ b/examples/03-parameters/static_parameters_fail.yaml
@@ -0,0 +1,35 @@
+dag:
+  description: |
+    The below example showcases setting up known initial parameters for a pipeline
+    of only python tasks
+
+    The initial parameters as defined in the yaml file are:
+      simple: 1
+      complex_param:
+        x: 10
+        y: "hello world!!"
+
+    runnable allows using pydantic models for deeply nested parameters and
+    casts appropriately based on annotation. eg: read_initial_params_as_pydantic
+
+    If no annotation is provided, the parameter is assumed to be a dictionary.
+    eg: read_initial_params_as_json
+
+    You can set the initial parameters from environment variables as well.
+    eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable
+
+    Run this pipeline as:
+      runnable execute -f examples/03-parameters/static_parameters_fail.yaml \
+      -p examples/common/initial_parameters.yaml
+  start_at: read_params_and_fail
+  steps:
+    read_params_and_fail:
+      type: task
+      command: examples.common.functions.raise_ex
+      next: success
+      on_failure: read_params_in_notebook
+    read_params_in_notebook:
+      type: task
+      command_type: notebook
+      command: examples/common/read_parameters.ipynb
+      next: success
diff --git a/examples/03-parameters/static_parameters_python.py b/examples/03-parameters/static_parameters_python.py
index 0a38c7b9..6f0cc9be 100644
--- a/examples/03-parameters/static_parameters_python.py
+++ b/examples/03-parameters/static_parameters_python.py
@@ -47,19 +47,10 @@ def read_initial_params_as_pydantic(
         name="read_params_as_pydantic",
     )
 
-    """
-    Signature of read_initial_params_as_json
-    def read_initial_params_as_json(
-        integer: int,
-        floater: float,
-        stringer: str,
-        pydantic_param: Dict[str, Union[int, str]],
-    ):
-    """
     read_params_as_json = PythonTask(
         function=read_initial_params_as_json,
         terminate_with_success=True,
-        name="read_params_json",
+        name="read_params_as_json",
     )
 
     pipeline = Pipeline(
diff --git a/examples/03-parameters/static_parameters_python.yaml b/examples/03-parameters/static_parameters_python.yaml
index 8448cc1f..c57b1f4a 100644
--- a/examples/03-parameters/static_parameters_python.yaml
+++ b/examples/03-parameters/static_parameters_python.yaml
@@ -26,8 +26,8 @@ dag:
     read_params_as_pydantic:
       type: task
       command: examples.common.functions.read_initial_params_as_pydantic
-      next: read_params_json
-    read_params_json:
+      next: read_params_as_json
+    read_params_as_json:
       type: task
       command: examples.common.functions.read_initial_params_as_json
       next: success
diff --git a/examples/04-catalog/catalog_python.py b/examples/04-catalog/catalog_python.py
new file mode 100644
index 00000000..088ed768
--- /dev/null
+++ b/examples/04-catalog/catalog_python.py
@@ -0,0 +1,45 @@
+from examples.common.functions import read_files, write_files
+from runnable import Catalog, Pipeline, PythonTask, ShellTask
+
+
+def main():
+    write_catalog = Catalog(put=["df.csv", "data_folder/data.txt"])
+    generate_data = PythonTask(
+        name="generate_data_python",
+        function=write_files,
+        catalog=write_catalog,
+    )
+
+    delete_files_command = """
+        rm df.csv || true && \
+        rm data_folder/data.txt || true
+    """
+    # delete the local files after generate
+    # since it's a local catalog, we delete to show "get from catalog"
+    delete_local_after_generate = ShellTask(
+        name="delete_after_generate",
+        command=delete_files_command,
+    )
+
+    read_catalog = Catalog(get=["df.csv", "data_folder/data.txt"])
+    read_data_python = PythonTask(
+        name="read_data_python",
+        function=read_files,
+        catalog=read_catalog,
+        terminate_with_success=True,
+    )
+
+    pipeline = Pipeline(
+        steps=[
+            generate_data,
+            delete_local_after_generate,
+            read_data_python,
+        ]
+    )
+    _ = pipeline.execute()
+
+    return pipeline
+
+
+if __name__ == "__main__":
+    main()
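The Catalog `put`/`get` pattern above stages artifacts through run-scoped storage, which is why the example can delete the local copies in between. A rough sketch of what the local file-system catalog does conceptually, assuming the `.catalog/<run_id>/` layout that the test assertions later in this patch inspect (the helpers are illustrative, not runnable's API):

```python
import shutil
from pathlib import Path


def put(run_id: str, file: str) -> None:
    """Copy a produced artifact into the run's catalog, e.g. after generate_data."""
    target = Path(".catalog") / run_id / file
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(file, target)


def get(run_id: str, file: str) -> None:
    """Restore an artifact from the run's catalog into the workspace before a step."""
    source = Path(".catalog") / run_id / file
    Path(file).parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, file)
```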
diff --git a/examples/04-catalog/catalog_python.yaml b/examples/04-catalog/catalog_python.yaml
new file mode 100644
index 00000000..54990700
--- /dev/null
+++ b/examples/04-catalog/catalog_python.yaml
@@ -0,0 +1,27 @@
+dag:
+  start_at: generate_data_python
+  steps:
+    generate_data_python:
+      type: task
+      command: examples.common.functions.write_files
+      catalog:
+        put:
+          - df.csv
+          - data_folder/data.txt
+      next: delete_files_after_generate
+    delete_files_after_generate:
+      type: task
+      command_type: shell
+      command: |
+        rm df.csv || true && \
+        rm data_folder/data.txt || true
+      next: read_data_python
+    read_data_python:
+      type: task
+      command_type: python
+      command: examples.common.functions.read_files
+      catalog:
+        get:
+          - df.csv
+          - data_folder/data.txt
+      next: success
diff --git a/examples/06-parallel/parallel.py b/examples/06-parallel/parallel.py
index 4be49423..0aaed3a2 100644
--- a/examples/06-parallel/parallel.py
+++ b/examples/06-parallel/parallel.py
@@ -1,16 +1,3 @@
-"""
-This example demonstrates the use of the Parallel step.
-
-The branches of the parallel step are themselves pipelines and can be defined
-as shown in 02-sequential/traversal.py.
-
-WARNING, the function returning the pipeline should not executed
-during the definition of the branch in parallel steps.
-
-Run this pipeline as:
-    python examples/06-parallel/parallel.py
-"""
-
 from examples.common.functions import hello
 from runnable import NotebookTask, Parallel, Pipeline, PythonTask, ShellTask, Stub
diff --git a/examples/06-parallel/parallel_branch_fail.py b/examples/06-parallel/parallel_branch_fail.py
new file mode 100644
index 00000000..fd0b4732
--- /dev/null
+++ b/examples/06-parallel/parallel_branch_fail.py
@@ -0,0 +1,87 @@
+from examples.common.functions import hello, raise_ex
+from runnable import NotebookTask, Parallel, Pipeline, PythonTask, ShellTask, Stub
+
+
+def traversal_success():
+    """
+    Build the branch pipeline and return it without executing it.
+
+    The same pipeline can be run independently from the command line.
+
+    WARNING: the function returning the pipeline should not execute it
+    during the definition of the branch in parallel steps.
+    """
+    stub_task = Stub(name="hello stub")
+
+    python_task = PythonTask(
+        name="hello python",
+        function=hello,
+    )
+
+    shell_task = ShellTask(
+        name="hello shell",
+        command="echo 'Hello World!'",
+    )
+
+    notebook_task = NotebookTask(
+        name="hello notebook",
+        notebook="examples/common/simple_notebook.ipynb",
+        terminate_with_success=True,
+    )
+
+    # The pipeline has a mix of tasks.
+    # The order of execution follows the order of the tasks in the list.
+    pipeline = Pipeline(steps=[stub_task, python_task, shell_task, notebook_task])
+
+    return pipeline
+
+
+def traversal_fail():
+    """
+    Build the branch pipeline and return it without executing it.
+
+    The same pipeline can be run independently from the command line.
+
+    WARNING: the function returning the pipeline should not execute it
+    during the definition of the branch in parallel steps.
+    """
+    stub_task = Stub(name="hello stub")
+
+    python_task = PythonTask(
+        name="hello python",
+        function=raise_ex,
+    )
+
+    shell_task = ShellTask(
+        name="hello shell",
+        command="echo 'Hello World!'",
+    )
+
+    notebook_task = NotebookTask(
+        name="hello notebook",
+        notebook="examples/common/simple_notebook.ipynb",
+        terminate_with_success=True,
+    )
+
+    # The pipeline has a mix of tasks.
+    # The order of execution follows the order of the tasks in the list.
+    pipeline = Pipeline(steps=[stub_task, python_task, shell_task, notebook_task])
+
+    return pipeline
+
+
+def main():
+    parallel_step = Parallel(
+        name="parallel_step",
+        terminate_with_success=True,
+        branches={"branch1": traversal_success(), "branch2": traversal_fail()},
+    )
+
+    pipeline = Pipeline(steps=[parallel_step])
+
+    pipeline.execute()
+    return pipeline
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/06-parallel/parallel_branch_fail.yaml b/examples/06-parallel/parallel_branch_fail.yaml
new file mode 100644
index 00000000..1891df9a
--- /dev/null
+++ b/examples/06-parallel/parallel_branch_fail.yaml
@@ -0,0 +1,72 @@
+success_branch: &success_branch
+  description: |
+    Use this pattern to define a repeatable branch.
+
+    This pipeline is the same as the one defined in examples/02-sequential/traversal.yaml
+  start_at: hello stub
+  steps:
+    hello stub:
+      type: stub
+      next: hello python
+    hello python:
+      type: task
+      command_type: python
+      command: examples.common.functions.hello # dotted path to the function.
+      next: hello shell
+    hello shell:
+      type: task
+      command_type: shell
+      command: echo "Hello World!" # Command to run
+      next: hello notebook
+    hello notebook:
+      type: task
+      command_type: notebook
+      command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project.
+      next: success
+
+
+fail_branch: &fail_branch
+  description: |
+    Use this pattern to define a repeatable branch.
+
+    This pipeline is the same as the one defined in examples/02-sequential/traversal.yaml
+  start_at: hello stub
+  steps:
+    hello stub:
+      type: stub
+      next: hello python
+    hello python:
+      type: task
+      command_type: python
+      command: examples.common.functions.raise_ex # dotted path to the function.
+      next: hello shell
+    hello shell:
+      type: task
+      command_type: shell
+      command: echo "Hello World!" # Command to run
+      next: hello notebook
+    hello notebook:
+      type: task
+      command_type: notebook
+      command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project.
+      next: success
+
+
+dag:
+  description: |
+    This example demonstrates the use of the Parallel step.
+
+    parallel step takes a mapping of branches which are pipelines themselves.
+
+    Run this pipeline as:
+      runnable execute -f examples/06-parallel/parallel_branch_fail.yaml
+
+
+  start_at: parallel_step
+  steps:
+    parallel_step:
+      type: parallel
+      next: success
+      branches:
+        branch1: *success_branch
+        branch2: *fail_branch
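The YAML above reuses one branch definition through anchors (`&success_branch`) and aliases (`*success_branch`); the Python SDK analogue is a factory function per branch, as parallel_branch_fail.py does. A condensed sketch of that pattern, using only constructs that appear in this patch and therefore only runnable inside this repository:

```python
from examples.common.functions import hello
from runnable import Parallel, Pipeline, PythonTask


def make_branch(suffix: str) -> Pipeline:
    """Each call builds a fresh branch pipeline - the SDK analogue of a YAML anchor."""
    task = PythonTask(
        name=f"hello {suffix}",
        function=hello,
        terminate_with_success=True,
    )
    return Pipeline(steps=[task])


parallel_step = Parallel(
    name="parallel_step",
    terminate_with_success=True,
    branches={"branch1": make_branch("one"), "branch2": make_branch("two")},
)

pipeline = Pipeline(steps=[parallel_step])
```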
diff --git a/examples/07-map/map.py b/examples/07-map/map.py
index 4a5809e5..206462de 100644
--- a/examples/07-map/map.py
+++ b/examples/07-map/map.py
@@ -1,27 +1,3 @@
-"""
-map states allows to repeat a branch for each value of an iterable.
-
-The below example can written, in python, as:
-
-chunks = [1, 2, 3]
-
-for chunk in chunks:
-    # Any task within the pipeline can access the value of chunk as an argument.
-    processed = process_chunk(chunk)
-
-    # The value of processed for every iteration is the value returned by the steps
-    # of the current execution. For example, the value of processed
-    # for chunk=1, is chunk*10 = 10 for downstream steps.
-    read_processed_chunk(chunk, processed)
-
-# Outside of loop, processed is a list of all the processed chunks.
-# This is also called as the reduce pattern.
-
-assert processed == [chunk * 10 for chunk in chunks]
-
-Run this pipeline as:
-    python examples/07-map/map.py
-"""
-
 from examples.common.functions import (
     assert_default_reducer,
     process_chunk,
@@ -105,7 +81,7 @@ def main():
     # Create a map state which iterates over a list of chunks.
     # chunk is the value of the iterable.
     map_state = Map(
-        name="map state",
+        name="map_state",
         iterate_on="chunks",
         iterate_as="chunk",
         branch=iterable_branch(execute=False),
diff --git a/examples/07-map/map_fail.py b/examples/07-map/map_fail.py
new file mode 100644
index 00000000..fce32506
--- /dev/null
+++ b/examples/07-map/map_fail.py
@@ -0,0 +1,109 @@
+from examples.common.functions import (
+    assert_default_reducer,
+    process_chunk_fail,
+    read_processed_chunk,
+)
+from runnable import Map, NotebookTask, Pipeline, PythonTask, ShellTask
+
+
+def iterable_branch(execute: bool = True):
+    """
+    Use the pattern of using "execute" to control the execution of the pipeline.
+
+    The same pipeline can be run independently from the command line.
+
+    WARNING: If the execution is not controlled by "execute", the pipeline will be executed
+    even during the definition of the branch in parallel steps.
+    """
+    # The python function to process a single chunk of data.
+    # In the example, we are multiplying the chunk by 10.
+    process_chunk_task_python = PythonTask(
+        name="execute_python",
+        function=process_chunk_fail,
+        returns=["processed_python"],
+    )
+
+    # return parameters within a map branch have to be unique
+    # The notebook takes in the value of processed_python as an argument
+    # and returns a new parameter "processed_notebook" which is 10*processed_python
+    process_chunk_task_notebook = NotebookTask(
+        name="execute_notebook",
+        notebook="examples/common/process_chunk.ipynb",
+        returns=["processed_notebook"],
+    )
+
+    # following the pattern, the shell takes in the value of processed_notebook as an argument
+    # and returns a new parameter "processed_shell" which is 10*processed_notebook.
+    shell_command = """
+    if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
+        && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
+        echo "yaay"
+    else
+        echo "naay"
+        exit 1;
+    fi
+    export processed_shell=$( expr 10 '*' "$processed_notebook")
+    """
+
+    process_chunk_task_shell = ShellTask(
+        name="execute_shell",
+        command=shell_command,
+        returns=["processed_shell"],
+    )
+
+    # A downstream step of process_ which reads the parameter "processed".
+    # The value of processed is within the context of the branch.
+    # For example, for chunk=1, the value of processed_python is chunk*10 = 10
+    # the value of processed_notebook is processed_python*10 = 100
+    # the value of processed_shell is processed_notebook*10 = 1000
+    read_chunk = PythonTask(
+        name="read processed chunk",
+        function=read_processed_chunk,
+        terminate_with_success=True,
+    )
+
+    pipeline = Pipeline(
+        steps=[
+            process_chunk_task_python,
+            process_chunk_task_notebook,
+            process_chunk_task_shell,
+            read_chunk,
+        ],
+    )
+
+    if execute:
+        pipeline.execute()
+
+    return pipeline
+
+
+def main():
+    # Create a map state which iterates over a list of chunks.
+    # chunk is the value of the iterable.
+    map_state = Map(
+        name="map_state",
+        iterate_on="chunks",
+        iterate_as="chunk",
+        branch=iterable_branch(execute=False),
+    )
+
+    # Outside of the loop, processed is a list of all the processed chunks.
+    # This is also called as the reduce pattern.
+    # the value of processed_python is [10, 20, 30]
+    # the value of processed_notebook is [100, 200, 300]
+    # the value of processed_shell is [1000, 2000, 3000]
+    collect = PythonTask(
+        name="collect",
+        function=assert_default_reducer,
+        terminate_with_success=True,
+    )
+
+    pipeline = Pipeline(steps=[map_state, collect])
+
+    pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")
+
+    return pipeline
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/07-map/map_fail.yaml b/examples/07-map/map_fail.yaml
new file mode 100644
index 00000000..6ad53a1f
--- /dev/null
+++ b/examples/07-map/map_fail.yaml
@@ -0,0 +1,74 @@
+branch: &branch
+  start_at: execute_python
+  steps:
+    execute_python:
+      type: task
+      command: examples.common.functions.process_chunk_fail
+      returns:
+        - name: processed_python
+      next: execute_notebook
+    execute_notebook:
+      type: task
+      command_type: notebook
+      command: examples/common/process_chunk.ipynb
+      returns:
+        - name: processed_notebook
+      next: execute_shell
+    execute_shell:
+      type: task
+      command_type: shell
+      command: |
+        if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
+          && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
+          echo "yaay"
+        else
+          echo "naay"
+          exit 1;
+        fi
+        export processed_shell=$( expr 10 '*' "$processed_notebook")
+      returns:
+        - name: processed_shell
+      next: read_chunk
+    read_chunk:
+      type: task
+      command: examples.common.functions.read_processed_chunk
+      next: success
+
+
+dag:
+  description: |
+    map states allow repeating a branch for each value of an iterable.
+
+    The below example can be written, in python, as:
+
+    chunks = [1, 2, 3]
+
+    for chunk in chunks:
+      # Any task within the pipeline can access the value of chunk as an argument.
+      processed = process_chunk(chunk)
+
+      # The value of processed for every iteration is the value returned by the steps
+      # of the current execution. For example, the value of processed
+      # for chunk=1, is chunk*10 = 10 for downstream steps.
+      read_processed_chunk(chunk, processed)
+
+    # Outside of loop, processed is a list of all the processed chunks.
+    # This is also called as the reduce pattern.
+    assert processed == [chunk * 10 for chunk in chunks]
+
+    Run this pipeline as:
+      runnable execute -f examples/07-map/map_fail.yaml \
+      -p examples/common/initial_parameters.yaml
+
+  start_at: map_state
+  steps:
+    map_state:
+      type: map
+      branch: *branch
+      iterate_on: chunks
+      iterate_as: chunk
+      next: collect
+    collect:
+      type: task
+      command: examples.common.functions.assert_default_reducer
+      next: success
diff --git a/examples/configs/mocked-config-debug.yaml b/examples/08-mocking/mocked-config-debug.yaml
similarity index 100%
rename from examples/configs/mocked-config-debug.yaml
rename to examples/08-mocking/mocked-config-debug.yaml
diff --git a/examples/configs/mocked-config-simple.yaml b/examples/08-mocking/mocked-config-simple.yaml
similarity index 100%
rename from examples/configs/mocked-config-simple.yaml
rename to examples/08-mocking/mocked-config-simple.yaml
diff --git a/examples/configs/mocked-config-unittest.yaml b/examples/08-mocking/mocked-config-unittest.yaml
similarity index 100%
rename from examples/configs/mocked-config-unittest.yaml
rename to examples/08-mocking/mocked-config-unittest.yaml
diff --git a/examples/configs/mocked-config.yaml b/examples/08-mocking/mocked-config.yaml
similarity index 100%
rename from examples/configs/mocked-config.yaml
rename to examples/08-mocking/mocked-config.yaml
diff --git a/examples/configs/mocked_map_parameters.yaml b/examples/08-mocking/mocked_map_parameters.yaml
similarity index 100%
rename from examples/configs/mocked_map_parameters.yaml
rename to examples/08-mocking/mocked_map_parameters.yaml
diff --git a/examples/common/functions.py b/examples/common/functions.py
index c21ce968..1b8f4190 100644
--- a/examples/common/functions.py
+++ b/examples/common/functions.py
@@ -126,6 +126,16 @@ def process_chunk(chunk: int):
     return chunk * 10
 
 
+def process_chunk_fail(chunk: int):
+    """
+    An example function that processes a chunk of data.
+    We are multiplying the chunk by 10.
+    """
+    if chunk == 1:
+        raise Exception("This is a failure")
+    return chunk * 10
+
+
 def read_processed_chunk(
     chunk: int, processed_python: int, processed_notebook: int, processed_shell: int
 ):
diff --git a/examples/common/read_parameters.ipynb b/examples/common/read_parameters.ipynb
index b8935014..38a6b3fb 100644
--- a/examples/common/read_parameters.ipynb
+++ b/examples/common/read_parameters.ipynb
@@ -15,7 +15,8 @@
     "stringer = None\n",
     "floater = None\n",
     "pydantic_param = None\n",
-    "score = None"
+    "score = None\n",
+    "envvar = None"
    ]
   },
   {
@@ -27,7 +28,8 @@
    "source": [
     "assert integer == 1\n",
     "assert stringer == \"hello\"\n",
-    "assert floater == 3.14"
+    "assert floater == 3.14\n",
+    "assert envvar == \"from env\""
    ]
   },
   {
diff --git a/extensions/nodes/nodes.py b/extensions/nodes/nodes.py
index 62929162..7018fce4 100644
--- a/extensions/nodes/nodes.py
+++ b/extensions/nodes/nodes.py
@@ -661,6 +661,10 @@ def fan_in(self, map_variable: TypeMapVariable = None, **kwargs):
 
         self._context.run_log_store.add_step_log(step_log, self._context.run_id)
 
+        # If we failed, we return without any collection
+        if not step_log.status == defaults.SUCCESS:
+            return
+
         # Apply the reduce function and reduce the returns of the task nodes.
         # The final value of the parameter is the result of the reduce function.
         reducer_f = self.get_reducer_function()
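For context on what `fan_in` now skips when an iteration fails: the default reducer collects each branch's return into a list across iterations. A plain-Python sketch of that semantics, using the values the map examples above describe in their comments (this is illustrative arithmetic, not runnable's implementation):

```python
# Default "reduce" semantics, sketched for chunks [1, 2, 3]; the names mirror
# the return parameters of the map branch (execute_python/notebook/shell).
chunks = [1, 2, 3]

processed_python = [chunk * 10 for chunk in chunks]       # [10, 20, 30]
processed_notebook = [p * 10 for p in processed_python]   # [100, 200, 300]
processed_shell = [p * 10 for p in processed_notebook]    # [1000, 2000, 3000]

assert processed_shell == [chunk * 1000 for chunk in chunks]
```

If any branch fails (as for chunk=1 with `process_chunk_fail`), the early return above leaves these lists uncollected rather than reducing over partial results.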
diff --git a/runnable/cli.py b/runnable/cli.py
index 1a93181e..f7699267 100644
--- a/runnable/cli.py
+++ b/runnable/cli.py
@@ -65,7 +65,7 @@ def execute(
     run_id: Annotated[
         str,
         typer.Argument(
-            envvar="RUNNABLE_RUN_ID",
+            envvar=defaults.ENV_RUN_ID,
             help="An optional run_id, one would be generated if its not provided",
         ),
     ] = "",
diff --git a/runnable/graph.py b/runnable/graph.py
index d27dd798..8fc0644b 100644
--- a/runnable/graph.py
+++ b/runnable/graph.py
@@ -76,7 +76,6 @@ def get_node_by_internal_name(self, internal_name: str) -> "BaseNode":
         for _, value in self.nodes.items():
             if value.internal_name == internal_name:
                 return value
-        print("graph", internal_name)
         raise exceptions.NodeNotFoundError(internal_name)
 
     def __str__(self):  # pragma: no cover
diff --git a/runnable/tasks.py b/runnable/tasks.py
index a59e0097..43a1e05c 100644
--- a/runnable/tasks.py
+++ b/runnable/tasks.py
@@ -31,7 +31,26 @@
 logging.getLogger("stevedore").setLevel(logging.CRITICAL)
 
 
-# TODO: This has to be an extension
+class TeeIO(io.StringIO):
+    """
+    A custom class to write to the buffer and the output stream at the same time.
+    """
+
+    def __init__(self, output_stream=sys.stdout):
+        super().__init__()
+        self.output_stream = output_stream
+
+    def write(self, s):
+        super().write(s)  # Write to the buffer
+        self.output_stream.write(s)  # Write to the output stream
+
+    def flush(self):
+        super().flush()
+        self.output_stream.flush()
+
+
+buffer = TeeIO()
+sys.stdout = buffer
 
 
 class TaskReturns(BaseModel):
@@ -152,6 +171,7 @@ def execution_context(
             key: value
             for key, value in params.items()
             if isinstance(value, JsonParameter)
+            or isinstance(value, MetricParameter)
         }
         parameters_in = copy.deepcopy(params)
 
@@ -274,7 +294,7 @@ def execute_command(
                 f"Calling {func} from {module} with {filtered_parameters}"
             )
 
-            out_file = io.StringIO()
+            out_file = TeeIO()
             with contextlib.redirect_stdout(out_file):
                 user_set_parameters = f(
                     **filtered_parameters
@@ -284,16 +304,15 @@ def execute_command(
                 raise exceptions.CommandCallError(
                     f"Function call: {self.command} did not succeed.\n"
                 ) from e
-
-            attempt_log.input_parameters = params.copy()
-
-            if map_variable:
-                attempt_log.input_parameters.update(
-                    {
-                        k: JsonParameter(value=v, kind="json")
-                        for k, v in map_variable.items()
-                    }
-                )
+            finally:
+                attempt_log.input_parameters = params.copy()
+                if map_variable:
+                    attempt_log.input_parameters.update(
+                        {
+                            k: JsonParameter(value=v, kind="json")
+                            for k, v in map_variable.items()
+                        }
+                    )
 
             if self.returns:
                 if not isinstance(user_set_parameters, tuple):  # make it a tuple
@@ -448,6 +467,7 @@ def execute_command(
             ) as params,
             self.expose_secrets() as _,
         ):
+            attempt_log.input_parameters = params.copy()
             copy_params = copy.deepcopy(params)
 
             if map_variable:
@@ -476,7 +496,7 @@ def execute_command(
             }
             kwds.update(ploomber_optional_args)
 
-            out_file = io.StringIO()
+            out_file = TeeIO()
             with contextlib.redirect_stdout(out_file):
                 pm.execute_notebook(**kwds)
             task_console.print(out_file.getvalue())
@@ -635,6 +655,7 @@ def execute_command(
             ) as params:
                 subprocess_env.update({k: v.get_value() for k, v in params.items()})
 
+                attempt_log.input_parameters = params.copy()
                 # Json dumps all runnable environment variables
                 for key, value in subprocess_env.items():
                     if isinstance(value, str):
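The new `TeeIO` both captures stdout for the execution log and echoes it to the real stream, where the previous `io.StringIO` swallowed it until the task finished. A standalone sketch of that behaviour, mirroring the class added above (the `return len(s)` is an addition for strict stream compliance, not in the patched class):

```python
import contextlib
import io
import sys


class TeeIO(io.StringIO):
    """Write to an in-memory buffer and an output stream at the same time."""

    def __init__(self, output_stream=sys.stdout):
        super().__init__()
        self.output_stream = output_stream

    def write(self, s):
        super().write(s)             # capture into the buffer
        self.output_stream.write(s)  # echo to the real stream
        return len(s)

    def flush(self):
        super().flush()
        self.output_stream.flush()


# Capture the real stdout explicitly so the tee does not recurse into itself
# once redirect_stdout swaps sys.stdout for our buffer.
out_file = TeeIO(output_stream=sys.__stdout__)
with contextlib.redirect_stdout(out_file):
    print("Hello World!")  # appears live *and* is captured

assert out_file.getvalue() == "Hello World!\n"
```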
diff --git a/tests/assertions.py b/tests/assertions.py
new file mode 100644
index 00000000..094d2dfd
--- /dev/null
+++ b/tests/assertions.py
@@ -0,0 +1,160 @@
+import os
+import re
+from functools import lru_cache
+from pathlib import Path
+from typing import Optional
+
+from runnable import defaults
+
+
+@lru_cache
+def load_run_log(run_id):
+    from runnable import context
+
+    run_log = context.run_context.run_log_store.get_run_log_by_id(run_id, full=True)
+    return run_log
+
+
+def should_be_successful():
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    assert (
+        run_log.status == defaults.SUCCESS
+    ), f"Expected successful, got {run_log.status}"
+
+
+def should_be_failed():
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    assert run_log.status == defaults.FAIL, f"Expected failed, got {run_log.status}"
+
+
+def should_have_num_steps(num_steps: int) -> None:
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    assert (
+        len(run_log.steps) == num_steps
+    ), f"Expected {num_steps} steps, got {len(run_log.steps)}"
+
+
+def should_step_be_successful(step_name: str):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    step = run_log.steps[step_name]
+    assert step.status == defaults.SUCCESS, f"Expected successful, got {step.status}"
+
+
+def should_step_be_failed(step_name: str):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    step = run_log.steps[step_name]
+    assert step.status == defaults.FAIL, f"Expected failed, got {step.status}"
+
+
+def should_step_have_parameters(step_name: str, parameters: dict):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    step = run_log.steps[step_name]
+    func_parameters = {
+        parameter: value.value
+        for parameter, value in step.attempts[0].input_parameters.items()
+    }
+
+    assert parameters == func_parameters
+
+
+def should_branch_step_have_parameters(
+    parent_step_name: str, branch_name: str, branch_step_name: str, key: str, value
+):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    step = run_log.steps[parent_step_name]
+
+    branch = step.branches[f"{parent_step_name}.{branch_name}"]
+    step_name = f"{parent_step_name}.{branch_name}.{branch_step_name}"
+    func_parameters = {
+        parameter: value.value
+        for parameter, value in branch.steps[step_name]
+        .attempts[0]
+        .input_parameters.items()
+    }
+
+    assert func_parameters[key] == value
+
+
+def should_step_have_output_parameters(step_name: str, parameters: dict):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    step = run_log.steps[step_name]
+    func_parameters = {
+        parameter: value.value
+        for parameter, value in step.attempts[0].output_parameters.items()
+    }
+
+    assert parameters == func_parameters
+
+
+def should_have_catalog_execution_logs():
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+
+    step_names = run_log.steps.keys()
+    contents = os.listdir(f".catalog/{run_id}")
+
+    for step_name in step_names:
+        logfile_name = "".join(x for x in step_name if x.isalnum())
+        pattern = rf"{re.escape(logfile_name)}...\.execution\.log"
+
+        assert any(
+            re.search(pattern, s) for s in contents
+        ), f"No execution log found for step {step_name}."
+
+
+def should_have_catalog_contents(files: Optional[list[str]] = None):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+
+    contents = os.listdir(f".catalog/{run_id}")
+
+    for file_name in files or []:
+        pattern = rf"{file_name}"
+
+        assert any(
+            re.search(pattern, s) for s in contents
+        ), f"No catalog entry found matching {file_name}."
+
+
+def should_branch_have_steps(step_name, branch_name: str, num_steps: int):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    step = run_log.steps[step_name]
+    branch = step.branches[f"{step_name}.{branch_name}"]
+
+    assert len(branch.steps) == num_steps
+
+
+def should_branch_be_successful(step_name, branch_name: str):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    step = run_log.steps[step_name]
+    branch = step.branches[f"{step_name}.{branch_name}"]
+
+    assert branch.status == defaults.SUCCESS
+
+
+def should_branch_be_failed(step_name, branch_name: str):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+    run_log = load_run_log(run_id)
+    step = run_log.steps[step_name]
+    branch = step.branches[f"{step_name}.{branch_name}"]
+
+    assert branch.status == defaults.FAIL
+
+
+def should_have_notebook_output(name: str):
+    run_id = os.environ[defaults.ENV_RUN_ID]
+
+    catalog_location = Path(f".catalog/{run_id}")
+    path = catalog_location / name
+
+    assert path.is_file()
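These helpers are composed in tests/test_examples.py with `functools.partial` so each example tuple carries its own post-run checks, bound up front and invoked with no arguments after the pipeline runs. A minimal, self-contained illustration of the binding pattern (the simplified helper below stands in for the real ones, which read the run id from the environment instead of taking arguments):

```python
from functools import partial


def should_have_num_steps(num_steps: int, run_steps: list) -> None:
    """A simplified stand-in for tests.assertions.should_have_num_steps."""
    assert len(run_steps) == num_steps, f"Expected {num_steps}, got {len(run_steps)}"


run_steps = ["hello", "success"]

# Bind expectations now, evaluate after the run, exactly like
# `[assertion() for assertion in assertions]` does in the tests below.
assertions = [partial(should_have_num_steps, 2, run_steps)]
for assertion in assertions:
    assertion()
```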
diff --git a/tests/test_examples.py b/tests/test_examples.py
index 7c5924e4..81b8848e 100644
--- a/tests/test_examples.py
+++ b/tests/test_examples.py
@@ -2,9 +2,14 @@
 import os
 import subprocess
 from contextlib import contextmanager
+from functools import partial
 
 import pytest
 
+import tests.assertions as conditions
+from runnable import defaults
+from runnable.utils import generate_run_id
+
 
 def list_python_examples():
     for example in python_examples:
@@ -15,9 +20,11 @@ def list_python_examples():
 def runnable_context():
     from runnable import context as runnable_context
 
-    yield
-    del os.environ["RUNNABLE_RUN_ID"]
-    runnable_context.run_context = None
+    try:
+        yield
+    finally:
+        del os.environ[defaults.ENV_RUN_ID]
+        runnable_context.run_context = None
 
 
 @contextmanager
@@ -47,7 +54,9 @@ def chunked_fs_context():
 @contextmanager
 def mocked_context():
     with runnable_context():
-        os.environ["RUNNABLE_CONFIGURATION_FILE"] = "examples/08-mocking/default.yaml"
+        os.environ["RUNNABLE_CONFIGURATION_FILE"] = (
+            "examples/08-mocking/mocked-config-simple.yaml"
+        )
         os.environ["RUNNABLE_PRM_envvar"] = "from env"
         yield
         del os.environ["RUNNABLE_CONFIGURATION_FILE"]
@@ -81,27 +90,525 @@ def argo_context():
         del os.environ["RUNNABLE_CONFIGURATION_FILE"]
 
 
-contexts = [default_context, chunked_fs_context, mocked_context, argo_context]
+contexts = [
+    default_context,
+    chunked_fs_context,
+]  # , mocked_context, argo_context]
 
+# file, fails, ignore_contexts, parameters_file, assertions
 python_examples = [
-    # ("01-tasks/notebook", False, []),
-    ("01-tasks/python_tasks", False, []),
-    ("01-tasks/scripts", False, []),
-    ("01-tasks/stub", False, []),
-    ("02-sequential/default_fail", True, []),
-    # ("02-sequential/on_failure_fail", True, []),  # need https://github.com/AstraZeneca/runnable/issues/156
-    # ("02-sequential/on_failure_succeed", False, []),  # https://github.com/AstraZeneca/runnable/issues/156
-    ("02-sequential/traversal", False, []),
-    ("03-parameters/passing_parameters_notebook", False, []),
-    ("03-parameters/passing_parameters_python", False, []),
-    ("03-parameters/passing_parameters_shell", False, []),
-    ("03-parameters/static_parameters_non_python", False, []),
-    ("03-parameters/static_parameters_python", False, []),
-    ("04-catalog/catalog", False, [mocked_context]),
-    ("06-parallel/parallel", False, []),
-    ("06-parallel/nesting", False, []),
-    ("07-map/map", False, []),
-    ("07-map/custom_reducer", False, []),
+    (
+        "01-tasks/python_tasks",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 2),
+            partial(conditions.should_have_catalog_execution_logs),
+        ],
+    ),
+    (
+        "01-tasks/scripts",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 2),
+            partial(conditions.should_have_catalog_execution_logs),
+        ],
+    ),
+    (
+        "01-tasks/stub",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 4),
+            partial(conditions.should_have_catalog_execution_logs),
+        ],
+    ),
+    (
+        "01-tasks/notebook",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 2),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(
+                conditions.should_have_notebook_output,
+                "examples/common/simple_notebook-hello_out.ipynb",
+            ),
+        ],
+    ),
+    (
+        "02-sequential/traversal",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 5),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_step_be_successful, "hello stub"),
+            partial(conditions.should_step_be_successful, "hello python"),
+            partial(conditions.should_step_be_successful, "hello shell"),
+            partial(conditions.should_step_be_successful, "hello notebook"),
+        ],
+    ),
+    (
+        "02-sequential/default_fail",
+        True,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 3),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_failed),
+            partial(conditions.should_step_be_successful, "step 1"),
+            partial(conditions.should_step_be_failed, "step 2"),
+        ],
+    ),
+    (
+        "02-sequential/on_failure_fail",
+        True,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 3),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_failed),
+            partial(conditions.should_step_be_successful, "step 4"),
+            partial(conditions.should_step_be_failed, "step 1"),
+        ],
+    ),
+    (
+        "02-sequential/on_failure_succeed",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 3),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_step_be_successful, "step 4"),
+            partial(conditions.should_step_be_failed, "step 1"),
+        ],
+    ),
+    (
+        "03-parameters/static_parameters_python",
+        False,
+        [],
+        "examples/common/initial_parameters.yaml",
+        [
+            partial(conditions.should_have_num_steps, 3),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_step_be_successful, "read_params_as_pydantic"),
+            partial(conditions.should_step_be_successful, "read_params_as_json"),
+            partial(
+                conditions.should_step_have_parameters,
+                "read_params_as_pydantic",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "chunks": [1, 2, 3],
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_parameters,
+                "read_params_as_json",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "chunks": [1, 2, 3],
+                    "envvar": "from env",
+                },
+            ),
+        ],
+    ),
+    (
+        "03-parameters/static_parameters_non_python",
+        False,
+        [],
+        "examples/common/initial_parameters.yaml",
+        [
+            partial(conditions.should_have_num_steps, 3),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_step_be_successful, "read_params_in_notebook"),
+            partial(conditions.should_step_be_successful, "read_params_in_shell"),
+            partial(
+                conditions.should_step_have_parameters,
+                "read_params_in_notebook",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "chunks": [1, 2, 3],
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_parameters,
+                "read_params_in_shell",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "chunks": [1, 2, 3],
+                    "envvar": "from env",
+                },
+            ),
+        ],
+    ),
+    (
+        "03-parameters/static_parameters_fail",
+        False,
+        [],
+        "examples/common/initial_parameters.yaml",
+        [
+            partial(conditions.should_have_num_steps, 3),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_step_be_failed, "read_params_and_fail"),
+            partial(conditions.should_step_be_successful, "read_params_in_notebook"),
+            partial(
+                conditions.should_step_have_parameters,
+                "read_params_in_notebook",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "chunks": [1, 2, 3],
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_parameters,
+                "read_params_and_fail",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "chunks": [1, 2, 3],
+                    "envvar": "from env",
+                },
+            ),
+        ],
+    ),
+    (
+        "03-parameters/passing_parameters_python",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 3),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_step_be_successful, "set_parameter"),
+            partial(
+                conditions.should_step_have_parameters,
+                "set_parameter",
+                {
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_output_parameters,
+                "set_parameter",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "score": 0.9,
+                    "df": "df",
+                },
+            ),
+            partial(
+                conditions.should_step_have_parameters,
+                "get_parameters",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "score": 0.9,
+                    "df": "df",
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_output_parameters,
+                "get_parameters",
+                {},
+            ),
+        ],
+    ),
+    (
+        "03-parameters/passing_parameters_notebook",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 4),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_step_be_successful, "set_parameter"),
+            partial(
+                conditions.should_step_have_parameters,
+                "set_parameter",
+                {
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_output_parameters,
+                "set_parameter",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "score": 0.9,
+                    "df": "df",
+                },
+            ),
+            partial(
+                conditions.should_step_have_parameters,
+                "get_parameters",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "score": 0.9,
+                    "df": "df",
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_output_parameters,
+                "get_parameters",
+                {},
+            ),
+            partial(
+                conditions.should_step_have_parameters,
+                "read_parameters_in_notebook",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "score": 0.9,
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_output_parameters,
+                "read_parameters_in_notebook",
+                {},
+            ),
+        ],
+    ),
+    (
+        "03-parameters/passing_parameters_shell",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 4),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_step_be_successful, "write_parameter"),
+            partial(
+                conditions.should_step_have_parameters,
+                "write_parameter",
+                {
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_output_parameters,
+                "write_parameter",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "score": 0.9,
+                },
+            ),
+            partial(
+                conditions.should_step_have_parameters,
+                "read_parameters",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "score": 0.9,
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_output_parameters,
+                "read_parameters",
+                {},
+            ),
+            partial(
+                conditions.should_step_have_parameters,
+                "read_parameters_in_shell",
+                {
+                    "integer": 1,
+                    "floater": 3.14,
+                    "stringer": "hello",
+                    "pydantic_param": {"x": 10, "foo": "bar"},
+                    "score": 0.9,
+                    "envvar": "from env",
+                },
+            ),
+            partial(
+                conditions.should_step_have_output_parameters,
+                "read_parameters_in_shell",
+                {},
+            ),
+        ],
+    ),
+    (
+        "04-catalog/catalog_python",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 4),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_step_be_successful, "generate_data_python"),
+            partial(conditions.should_step_be_successful, "read_data_python"),
+            partial(
+                conditions.should_have_catalog_contents,
+                ["df.csv", "data_folder"],
+            ),
+        ],
+    ),
+    (
+        "06-parallel/parallel",
+        False,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 2),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_branch_have_steps, "parallel_step", "branch1", 5),
+            partial(conditions.should_branch_have_steps, "parallel_step", "branch2", 5),
+            partial(conditions.should_branch_be_successful, "parallel_step", "branch1"),
+            partial(conditions.should_branch_be_successful, "parallel_step", "branch2"),
+        ],
+    ),
+    (
+        "06-parallel/parallel_branch_fail",
+        True,
+        [],
+        "",
+        [
+            partial(conditions.should_have_num_steps, 2),
+            partial(conditions.should_be_failed),
+            partial(conditions.should_have_catalog_execution_logs),
+            partial(conditions.should_branch_have_steps, "parallel_step", "branch1", 5),
+            partial(conditions.should_branch_have_steps, "parallel_step", "branch2", 3),
+            partial(conditions.should_branch_be_successful, "parallel_step", "branch1"),
+            partial(conditions.should_branch_be_failed, "parallel_step", "branch2"),
+        ],
+    ),
+    (
+        "07-map/map",
+        False,
+        [],
+        "examples/common/initial_parameters.yaml",
+        [
+            partial(conditions.should_have_num_steps, 3),
+            partial(conditions.should_be_successful),
+            partial(conditions.should_branch_have_steps, "map_state", "1", 5),
+            partial(conditions.should_branch_have_steps, "map_state", "2", 5),
+            partial(conditions.should_branch_have_steps, "map_state", "3", 5),
+            partial(
+                conditions.should_branch_step_have_parameters,
+                "map_state",
+                "1",
+                "execute_python",
+                key="chunk",
+                value=1,
+            ),
+            partial(
+                conditions.should_branch_step_have_parameters,
+                "map_state",
+                "1",
+                "execute_notebook",
+                key="processed_python",
+                value=10,
+            ),
+            partial(
+                conditions.should_branch_step_have_parameters,
+                "map_state",
+                "1",
+                "execute_shell",
+                key="processed_notebook",
+                value=100,
+            ),
+        ],
+    ),
+    (
+        "07-map/map_fail",
+        True,
+        [],
+        "examples/common/initial_parameters.yaml",
+        [
+            partial(conditions.should_have_num_steps, 3),
+            partial(conditions.should_be_failed),
+            partial(conditions.should_branch_have_steps, "map_state", "1", 2),
+            partial(conditions.should_branch_have_steps, "map_state", "2", 5),
+            partial(conditions.should_branch_have_steps, "map_state", "3", 5),
+            partial(conditions.should_branch_be_failed, "map_state", "1"),
+            partial(conditions.should_branch_be_successful, "map_state", "2"),
+            partial(conditions.should_branch_be_successful, "map_state", "3"),
+            partial(
+                conditions.should_branch_step_have_parameters,
+                "map_state",
+                "1",
+                "execute_python",
+                key="chunk",
+                value=1,
+            ),
+            partial(
+                conditions.should_branch_step_have_parameters,
+                "map_state",
+                "2",
+                "execute_notebook",
+                key="processed_python",
+                value=20,
+            ),
+            partial(
+                conditions.should_branch_step_have_parameters,
+                "map_state",
+                "2",
+                "execute_shell",
+                key="processed_notebook",
+                value=200,
+            ),
+        ],
+    ),
 ]
 
 
@@ -112,7 +619,7 @@ def argo_context():
 def test_python_examples(example, context, monkeypatch, mocker):
     print(f"Testing {example}...")
 
-    mod, status, ignore_contexts = example
+    mod, fails, ignore_contexts, _, assertions = example
     if context in ignore_contexts:
         return
 
@@ -125,10 +632,13 @@ def test_python_examples(example, context, monkeypatch, mocker):
     from runnable import exceptions
 
     try:
+        os.environ[defaults.ENV_RUN_ID] = generate_run_id()
         f()
+        [assertion() for assertion in assertions]
+
     except exceptions.ExecutionFailedError:
         print("Example failed")
-        if not status:
+        if not fails:
             raise
 
 
@@ -138,73 +648,77 @@ def argo_context():
 @pytest.mark.e2e
 def test_yaml_examples(example, context):
     print(f"Testing {example}...")
-    file, status, ignore_contexts = example
+    file, fails, ignore_contexts, parameters_file, assertions = example
     if context in ignore_contexts:
         return
 
     context = context()
     example_file = f"examples/{file}.yaml"
-    parameters_file = "examples/common/initial_parameters.yaml"
 
     with context:
        from runnable import exceptions
        from runnable.entrypoints import execute_pipeline_yaml_spec
 
+        run_id = generate_run_id()
+        os.environ[defaults.ENV_RUN_ID] = run_id
        try:
            execute_pipeline_yaml_spec(
-                pipeline_file=example_file, parameters_file=parameters_file
+                pipeline_file=example_file,
+                parameters_file=parameters_file,
+                run_id=os.environ[defaults.ENV_RUN_ID],
            )
+            [assertion() for assertion in assertions]
        except exceptions.ExecutionFailedError:
-            if not status:
+            if not fails:
                raise
 
 
-@pytest.mark.parametrize("example", list_python_examples())
-@pytest.mark.container
-def test_python_examples_container(example):
-    print(f"Testing {example}...")
+# @pytest.mark.parametrize("example", list_python_examples())
+# @pytest.mark.container
+# def test_python_examples_container(example):
+#     print(f"Testing {example}...")
 
-    mod, status, _ = example
-    context = container_context()
+#     mod, fails, _ = example
+#     context = container_context()
 
-    imported_module = importlib.import_module(f"examples.{mod.replace('/', '.')}")
-    f = getattr(imported_module, "main")
-    with context:
-        from runnable import context, exceptions
+#     imported_module = importlib.import_module(f"examples.{mod.replace('/', '.')}")
+#     f = getattr(imported_module, "main")
+#     with context:
+#         from runnable import context, exceptions
 
-        try:
-            f()
-        except exceptions.ExecutionFailedError:
-            print("Example failed")
-            if not status:
-                raise
-        finally:
-            context.run_context = None
+#         try:
+#             f()
+#         except exceptions.ExecutionFailedError:
+#             print("Example failed")
+#             if not fails:
+#                 raise
+#         finally:
+#             context.run_context = None
 
 
-@pytest.mark.parametrize("example", list_python_examples())
-@pytest.mark.container
-def test_yaml_examples_container(example):
-    print(f"Testing {example}...")
-    file, status, _ = example
+# @pytest.mark.parametrize("example", list_python_examples())
+# @pytest.mark.container
+# def test_yaml_examples_container(example):
+#     print(f"Testing {example}...")
+#     file, fails, _ = example
 
-    context = container_context()
+#     context = container_context()
 
-    example_file = f"examples/{file}.yaml"
-    parameters_file = "examples/common/initial_parameters.yaml"
+#     example_file = f"examples/{file}.yaml"
+#     parameters_file = "examples/common/initial_parameters.yaml"
 
-    with context:
-        from runnable import exceptions
-        from runnable.entrypoints import execute_pipeline_yaml_spec
+#     with context:
+#         from runnable import exceptions
+#         from runnable.entrypoints import execute_pipeline_yaml_spec
 
-        try:
-            execute_pipeline_yaml_spec(
-                pipeline_file=example_file, parameters_file=parameters_file
-            )
-        except exceptions.ExecutionFailedError:
-            if not status:
-                raise
+#         try:
+#             execute_pipeline_yaml_spec(
+#                 pipeline_file=example_file, parameters_file=parameters_file
+#             )
+#         except exceptions.ExecutionFailedError:
+#             if not fails:
+#                 raise
 
 
 # TODO: add tests for jobs