Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: InsertValue will error if input is None #18

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions slurry/sections/_producers.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class InsertValue(TrioSection):
"""Inserts a single user supplied value into the pipeline on startup and then
passes through any further received items unmodified.

If no input is used, the single value will be sent, and InsertValue will close.

:param value: Item to send on startup.
:type value: Any
"""
Expand All @@ -127,5 +129,6 @@ def __init__(self, value: Any) -> None:

async def refine(self, input, output):
await output(self.value)
async for item in input:
await output(item)
if input:
async for item in input:
await output(item)
33 changes: 24 additions & 9 deletions tests/test_producers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import trio

from slurry import Pipeline
from slurry.sections import Repeat, Metronome, InsertValue
from slurry.sections import Repeat, Metronome, InsertValue, _producers

from .fixtures import produce_alphabet

Expand Down Expand Up @@ -51,19 +51,27 @@ async def test_repeat_input(autojump_clock):
break
assert results == [('a', 1), ('a', 2), ('b', 2.5), ('b', 3.5), ('c', 4)]

async def test_metronome():
async def test_metronome(autojump_clock, monkeypatch):
monkeypatch.setattr(_producers, "time", trio.current_time)
async with Pipeline.create(
produce_alphabet(5, max=3),
produce_alphabet(5, max=6, delay=1),
Metronome(5)
) as pipeline, pipeline.tap() as aiter:
results = []
start_time = trio.current_time()
async for item in aiter:
results.append((item, trio.current_time() - start_time))
if len(results) == 2:
break
assert [x[0] for x in results] == ['a', 'b']
assert 5 - results[1][1] + results[0][1] < 0.1
results.append((item, trio.current_time()))
assert results == [(letter, 5.0 * (i + 1)) for i, letter in enumerate("abcde")]

async def test_metronome_no_input(autojump_clock, monkeypatch):
monkeypatch.setattr(_producers, "time", trio.current_time)
async with Pipeline.create(
Metronome(5, "a")
) as pipeline, pipeline.tap() as aiter:
results = []
for _ in range(5):
item = await aiter.__anext__()
results.append((item, trio.current_time()))
assert results == [("a", 5.0 * (i + 1)) for i in range(5)]

async def test_insert_value(autojump_clock):
async with Pipeline.create(
Expand All @@ -73,3 +81,10 @@ async def test_insert_value(autojump_clock):
start_time = trio.current_time()
results = [(v, trio.current_time() - start_time) async for v in aiter]
assert results == [('n', 0), ('a', 1), ('b', 2), ('c', 3)]

async def test_insert_value_no_input(autojump_clock):
async with Pipeline.create(
InsertValue('n')
) as pipeline, pipeline.tap() as aiter:
results = [v async for v in aiter]
assert results == ['n']
Loading