The Produce / Spawn / Consume Idiom
The Piper class supports three parameters — consume, produce, and
spawn — that control how many input items are consumed per evaluation and
how many output items are generated. These allow non-1:1 mappings between
input and output streams.
The constraint
For any pair of connected Pipers the following must hold:
$$ \text{produce}_{\text{upstream}} \times \text{spawn}_{\text{upstream}} = \text{consume}_{\text{downstream}} \times \text{spawn}_{\text{downstream}} $$
If this constraint is violated, the pipeline will deadlock or produce incorrect results, because the upstream Piper emits a different number of items than the downstream Piper expects.
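The constraint can be checked mechanically before two Pipers are connected. The sketch below is plain Python and not part of PaPy: the helper name `rates_match` and its arguments are hypothetical and simply mirror the four terms of the equation.

```python
def rates_match(up_produce, up_spawn, down_consume, down_spawn):
    """Return True if an upstream/downstream pair satisfies
    produce_up * spawn_up == consume_down * spawn_down.

    Hypothetical helper for illustration; PaPy does not expose this function.
    """
    return up_produce * up_spawn == down_consume * down_spawn


# Upstream produce=2, spawn=1 feeding a downstream with consume=1, spawn=2.
ok = rates_match(2, 1, 1, 2)
# Upstream produce=2, spawn=1 feeding a default downstream (consume=1, spawn=1).
bad = rates_match(2, 1, 1, 1)
print(ok, bad)  # True False
```

A check like this only validates the parameters; it says nothing about whether the worker functions actually return sequences of the declared length.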
consume
The consume parameter specifies how many consecutive input items from
each upstream Piper are batched together before a single evaluation.
flowchart LR
    subgraph "consume=1 (default)"
        A1["item 1"] --> F1["func(item1)"]
        A2["item 2"] --> F2["func(item2)"]
        A3["item 3"] --> F3["func(item3)"]
    end

flowchart LR
    subgraph "consume=3"
        B1["item 1"] --> G["func([item1,\nitem2,\nitem3])"]
        B2["item 2"] --> G
        B3["item 3"] --> G
    end
Use case: aggregate or batch operations, e.g., computing an average over consecutive, non-overlapping windows of N items.
from papy import Worker, Piper, Plumber

def average(inbox):
    """Average a batch of numbers."""
    items = inbox[0]  # inbox[0] holds the batch of `consume` items
    return sum(items) / len(items)

p_avg = Piper(Worker(average), consume=3)
With consume=3, the Piper collects 3 items before calling the worker.
The input length must be a multiple of consume.
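To make the batching concrete, here is a plain-Python model of what consume=3 does to a stream of six items. It does not use PaPy. The helper `batches` is hypothetical, and raising ValueError on a leftover partial batch is a choice made by this sketch, not documented PaPy behaviour.

```python
def batches(items, consume):
    """Group `items` into consecutive, non-overlapping batches of size `consume`."""
    items = list(items)
    if len(items) % consume != 0:
        # Choice made by this sketch; PaPy's handling of a partial batch may differ.
        raise ValueError("input length must be a multiple of consume")
    return [tuple(items[i:i + consume]) for i in range(0, len(items), consume)]


def average(inbox):
    """Average one batch; inbox[0] is the batch, as in the worker above."""
    items = inbox[0]
    return sum(items) / len(items)


# Six inputs with consume=3 give two evaluations of the worker.
results = [average([batch]) for batch in batches([1, 2, 3, 4, 5, 6], 3)]
print(results)  # [2.0, 5.0]
```

Note that the windows do not overlap: items 1 to 3 form the first batch and items 4 to 6 the second.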
produce
The produce parameter specifies how many output items are generated from
each single evaluation. The worker function must return a sequence (list or
tuple) of that length, and each element becomes a separate downstream item.
flowchart LR
    subgraph "produce=3"
        IN["item 1"] --> F["func(item1)\nreturns [a, b, c]"]
        F --> O1["a"]
        F --> O2["b"]
        F --> O3["c"]
    end
Use case: splitting a single item into multiple items — e.g., splitting a FASTA record into individual sequences.
def split_into_three(inbox):
    """Split one item into three."""
    x = inbox[0]
    return [x, x * 10, x * 100]

p_split = Piper(Worker(split_into_three), produce=3)
Note
If repeat=True, the single return value is repeated produce times
instead of being iterated. This is useful when you want to duplicate
items rather than unpack sequences.
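The difference between unpacking and repeating can be modelled in a few lines of plain Python. This is a sketch of the semantics described above, not PaPy's implementation. The helper `emit` and the worker `identity` are hypothetical names introduced for illustration.

```python
def emit(result, produce, repeat=False):
    """Turn one evaluation result into `produce` downstream items."""
    if repeat:
        # The single result is duplicated, not unpacked.
        return [result] * produce
    items = list(result)
    if len(items) != produce:
        raise ValueError("worker must return exactly `produce` items")
    return items


def split_into_three(inbox):
    x = inbox[0]
    return [x, x * 10, x * 100]


def identity(inbox):
    return inbox[0]


# produce=3 without repeat: each returned sequence is unpacked.
unpacked = [item for x in (1, 2) for item in emit(split_into_three([x]), 3)]
# produce=3 with repeat=True: each single result is emitted three times.
repeated = [item for x in (1, 2) for item in emit(identity([x]), 3, repeat=True)]
print(unpacked)  # [1, 10, 100, 2, 20, 200]
print(repeated)  # [1, 1, 1, 2, 2, 2]
```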
spawn
The spawn parameter creates multiple implicit copies of the same Piper
in the pipeline. Each copy processes a different "slice" of the output from
the upstream Piper. This is useful when the upstream Piper produces
multiple items per input and each should be processed independently.
flowchart LR
    subgraph "upstream produce=2"
        A["item"] --> F["func(item)\nreturns [a, b]"]
    end
    subgraph "downstream spawn=2"
        F --> S1["copy 1: process(a)"]
        F --> S2["copy 2: process(b)"]
    end
Use case: a branch point where a single item is split and each part goes through the same processing.
def split(inbox):
    x = inbox[0]
    return [x + 1, x - 1]

def process(inbox):
    return inbox[0] ** 2

p_split = Piper(Worker(split), produce=2)
p_proc = Piper(Worker(process), spawn=2)
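One way to picture the "slices" is as a stride over the upstream output. The plain-Python sketch below assumes that copy k of the downstream Piper sees every spawn-th item starting at position k. This is an illustrative model only, and PaPy's internal scheduling may differ.

```python
def split(inbox):
    x = inbox[0]
    return [x + 1, x - 1]


def process(inbox):
    return inbox[0] ** 2


spawn = 2
# Upstream with produce=2: each input contributes two consecutive items.
stream = [item for x in (3, 5) for item in split([x])]  # [4, 2, 6, 4]

# Assumed model: copy k handles items k, k + spawn, k + 2 * spawn, ...
per_copy = [stream[k::spawn] for k in range(spawn)]  # [[4, 6], [2, 4]]
processed = [[process([item]) for item in part] for part in per_copy]
print(processed)  # [[16, 36], [4, 16]]
```

Under this model the first copy always receives the `x + 1` branch and the second copy always receives the `x - 1` branch.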
Complete example
A pipeline that reads numbers, splits each into two (doubled and tripled), processes each branch, then merges:
from papy import Worker, Piper, Plumber

def split_two(inbox):
    x = inbox[0]
    return [x * 2, x * 3]

def square(inbox):
    return inbox[0] ** 2

def merge(inbox):
    a, b = inbox[0]  # with consume=2, inbox[0] is the batch of two items
    print(f"{a} + {b} = {a + b}")

if __name__ == '__main__':
    p_split = Piper(Worker(split_two), produce=2)
    p_square = Piper(Worker(square), spawn=2)
    p_merge = Piper(Worker(merge), consume=2)

    plumber = Plumber()
    plumber.add_pipe((p_split, p_square, p_merge))
    plumber.start([range(1, 4)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()
The constraint is satisfied: produce(2) * spawn(1) = consume(1) * spawn(2) = 2
on the p_split → p_square connection, and produce(1) * spawn(2) = consume(2) * spawn(1) = 2
on the p_square → p_merge connection.
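For a sense of the values this pipeline should yield, the three stages can be traced in plain Python without PaPy. This is a sequential sketch under the semantics described in this section. The function `simulate` is hypothetical, and it returns the sums instead of printing them.

```python
def split_two(inbox):
    x = inbox[0]
    return [x * 2, x * 3]


def square(inbox):
    return inbox[0] ** 2


def simulate(inputs):
    """Trace split (produce=2), square (spawn=2), and a consume=2 sum in order."""
    totals = []
    for x in inputs:
        doubled, tripled = split_two([x])  # produce=2: two items per input
        a = square([doubled])              # spawn=2: first copy
        b = square([tripled])              # spawn=2: second copy
        totals.append(a + b)               # consume=2: both items in one evaluation
    return totals


print(simulate(range(1, 4)))  # [13, 52, 117]
```

Each result equals 13 times the square of the input, since (2x)² + (3x)² = 13x².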
Summary table
| Parameter | Default | Effect |
|---|---|---|
| consume | 1 | Batch N input items into one evaluation |
| produce | 1 | Split one evaluation result into N output items |
| spawn | 1 | Create N implicit copies of the Piper |
| repeat | False | If True, repeat result instead of iterating it (with produce > 1) |