Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp testing #190

Merged
merged 6 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/01-tasks/notebook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ dag:
You can run this pipeline as:
runnable execute -f examples/01-tasks/notebook.yaml

start_at: notebook
start_at: hello
steps:
notebook:
hello:
type: task
command_type: notebook
command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project.
Expand Down
4 changes: 2 additions & 2 deletions examples/01-tasks/python_tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ dag:
2 directories, 1 file

The hello.execution.log has the captured stdout of "Hello World!".
start_at: hello_task
start_at: hello
steps:
hello_task:
hello:
type: task
command: examples.common.functions.hello # dotted path to the function.
next: success
16 changes: 8 additions & 8 deletions examples/02-sequential/on_failure_succeed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ dag:

Run this pipeline as:
runnable execute -f examples/02-sequential/on_failure_succeed.yaml
start_at: step_1
start_at: step 1
steps:
step_1:
step 1:
type: task
command_type: shell
command: exit 1 # This will fail!
next: step_2
on_failure: step_4
step_2:
next: step 2
on_failure: step 4
step 2:
type: stub
next: step_3
step_3:
next: step 3
step 3:
type: stub
next: success
step_4:
step 4:
type: stub
next: success
8 changes: 4 additions & 4 deletions examples/03-parameters/passing_parameters_notebook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ dag:

Run the below example as:
runnable execute examples/03-parameters/passing_parameters_notebook.yaml
start_at: write_parameters_from_notebook
start_at: set_parameter
steps:
write_parameters_from_notebook:
set_parameter:
type: task
command_type: notebook
command: examples/common/write_parameters.ipynb
Expand All @@ -26,8 +26,8 @@ dag:
- name: pydantic_param
- name: score
kind: metric
next: read_parameters
read_parameters:
next: get_parameters
get_parameters:
type: task
command: examples.common.functions.read_parameter
next: read_parameters_in_notebook
Expand Down
9 changes: 4 additions & 5 deletions examples/03-parameters/passing_parameters_python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ dag:

Run the pipeline as:
runnable execute -f examples/03-parameters/passing_parameters_python.yaml
start_at: write_parameters
start_at: set_parameter
steps:
write_parameters:
set_parameter:
type: task
command: examples.common.functions.write_parameter
returns:
Expand All @@ -30,9 +30,8 @@ dag:
- name: pydantic_param
- name: score
kind: metric

next: read_parameters
read_parameters:
next: get_parameters
get_parameters:
type: task
command: examples.common.functions.read_parameter
next: success
4 changes: 2 additions & 2 deletions examples/03-parameters/passing_parameters_shell.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ dag:
Run the pipeline as:
runnable execute -f examples/03-parameters/passing_parameters_shell.yaml

start_at: write_parameters_in_shell
start_at: write_parameter
steps:
write_parameters_in_shell:
write_parameter:
type: task
command_type: shell
command: |
Expand Down
65 changes: 65 additions & 0 deletions examples/03-parameters/static_parameters_fail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
The below example showcases setting up known initial parameters for a pipeline
of only python tasks

The initial parameters as defined in the yaml file are:
simple: 1
complex_param:
x: 10
y: "hello world!!"

runnable allows using pydantic models for deeply nested parameters and
casts appropriately based on annotation. eg: read_initial_params_as_pydantic

If no annotation is provided, the parameter is assumed to be a dictionary.
eg: read_initial_params_as_json

You can set the initial parameters from environment variables as well.
eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable

Run this pipeline as:
python examples/03-parameters/static_parameters_python.py

"""

import os

from examples.common.functions import raise_ex
from runnable import NotebookTask, Pipeline, PythonTask


def main():
read_params_in_notebook = NotebookTask(
name="read_params_in_notebook",
notebook="examples/common/read_parameters.ipynb",
terminate_with_success=True,
)

notebook_pipeline = Pipeline(
steps=[
read_params_in_notebook,
],
)
read_params_and_fail = PythonTask(
function=raise_ex,
name="read_params_and_fail",
terminate_with_success=True,
)

read_params_and_fail.on_failure = notebook_pipeline

python_pipeline = Pipeline(
steps=[
read_params_and_fail,
],
)

python_pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")

return python_pipeline


if __name__ == "__main__":
# Any parameter prefixed by "RUNNABLE_PRM_" will be picked up by runnable
os.environ["RUNNABLE_PRM_envvar"] = "from env"
main()
35 changes: 35 additions & 0 deletions examples/03-parameters/static_parameters_fail.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
dag:
description: |
The below example showcases setting up known initial parameters for a pipeline
of only python tasks

The initial parameters as defined in the yaml file are:
simple: 1
complex_param:
x: 10
y: "hello world!!"

runnable allows using pydantic models for deeply nested parameters and
casts appropriately based on annotation. eg: read_initial_params_as_pydantic

If no annotation is provided, the parameter is assumed to be a dictionary.
eg: read_initial_params_as_json

You can set the initial parameters from environment variables as well.
eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable

Run this pipeline by:
runnable execute -f 03-parameters/static_parameters_python.yaml \
-p examples/common/initial_parameters.yaml
start_at: read_params_and_fail
steps:
read_params_and_fail:
type: task
command: examples.common.functions.raise_ex
next: success
on_failure: read_params_in_notebook
read_params_in_notebook:
type: task
command_type: notebook
command: examples/common/read_parameters.ipynb
next: success
11 changes: 1 addition & 10 deletions examples/03-parameters/static_parameters_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,10 @@ def read_initial_params_as_pydantic(
name="read_params_as_pydantic",
)

"""
Signature of read_initial_params_as_json
def read_initial_params_as_json(
integer: int,
floater: float,
stringer: str,
pydantic_param: Dict[str, Union[int, str]],
):
"""
read_params_as_json = PythonTask(
function=read_initial_params_as_json,
terminate_with_success=True,
name="read_params_json",
name="read_params_as_json",
)

pipeline = Pipeline(
Expand Down
4 changes: 2 additions & 2 deletions examples/03-parameters/static_parameters_python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ dag:
read_params_as_pydantic:
type: task
command: examples.common.functions.read_initial_params_as_pydantic
next: read_params_json
read_params_json:
next: read_params_as_json
read_params_as_json:
type: task
command: examples.common.functions.read_initial_params_as_json
next: success
45 changes: 45 additions & 0 deletions examples/04-catalog/catalog_python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from examples.common.functions import read_files, write_files
from runnable import Catalog, Pipeline, PythonTask, ShellTask


def main():
write_catalog = Catalog(put=["df.csv", "data_folder/data.txt"])
generate_data = PythonTask(
name="generate_data_python",
function=write_files,
catalog=write_catalog,
)

delete_files_command = """
rm df.csv || true && \
rm data_folder/data.txt || true
"""
# delete from local files after generate
# since its local catalog, we delete to show "get from catalog"
delete_local_after_generate = ShellTask(
name="delete_after_generate",
command=delete_files_command,
)

read_catalog = Catalog(get=["df.csv", "data_folder/data.txt"])
read_data_python = PythonTask(
name="read_data_python",
function=read_files,
catalog=read_catalog,
terminate_with_success=True,
)

pipeline = Pipeline(
steps=[
generate_data,
delete_local_after_generate,
read_data_python,
]
)
_ = pipeline.execute()

return pipeline


if __name__ == "__main__":
main()
27 changes: 27 additions & 0 deletions examples/04-catalog/catalog_python.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
dag:
start_at: generate_data_python
steps:
generate_data_python:
type: task
command: examples.common.functions.write_files
catalog:
put:
- df.csv
- data_folder/data.txt
next: delete_files_after_generate
delete_files_after_generate:
type: task
command_type: shell
command: |
rm df.csv || true && \
rm data_folder/data.txt || true
next: read_data_python
read_data_python:
type: task
command_type: python
command: examples.common.functions.read_files
catalog:
get:
- df.csv
- data_folder/data.txt
next: success
13 changes: 0 additions & 13 deletions examples/06-parallel/parallel.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,3 @@
"""
This example demonstrates the use of the Parallel step.

The branches of the parallel step are themselves pipelines and can be defined
as shown in 02-sequential/traversal.py.

WARNING, the function returning the pipeline should not executed
during the definition of the branch in parallel steps.

Run this pipeline as:
python examples/06-parallel/parallel.py
"""

from examples.common.functions import hello
from runnable import NotebookTask, Parallel, Pipeline, PythonTask, ShellTask, Stub

Expand Down
Loading