
The Produce / Spawn / Consume Idiom

The Piper class supports three parameters — consume, produce, and spawn — that control how many input items are consumed per evaluation and how many output items are generated. These allow non-1:1 mappings between input and output streams.

The constraint

For any pair of connected Pipers the following must hold:

$$ \text{produce}_{\text{upstream}} \times \text{spawn}_{\text{upstream}} = \text{consume}_{\text{downstream}} \times \text{spawn}_{\text{downstream}} $$

If this constraint is violated, the pipeline will deadlock or produce incorrect results.
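The constraint is plain arithmetic, so it can be checked before a pipeline is started. The sketch below is a hypothetical helper, not part of papy. It models each Piper's settings with an assumed `PiperShape` dataclass instead of reading them from real `Piper` objects.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PiperShape:
    """Hypothetical stand-in holding a Piper's consume/produce/spawn settings."""
    consume: int = 1
    produce: int = 1
    spawn: int = 1


def connection_ok(upstream: PiperShape, downstream: PiperShape) -> bool:
    """Check produce_up * spawn_up == consume_down * spawn_down for one connection."""
    return upstream.produce * upstream.spawn == downstream.consume * downstream.spawn


# Settings used by the complete example later on this page.
split = PiperShape(produce=2)
square = PiperShape(spawn=2)
merge = PiperShape(consume=2)

print(connection_ok(split, square))                  # True: 2 * 1 == 1 * 2
print(connection_ok(square, merge))                  # True: 1 * 2 == 2 * 1
print(connection_ok(PiperShape(produce=3), square))  # False: 3 * 1 != 1 * 2
```

Run the check once for every upstream/downstream pair in a pipe. A single failing pair is enough to stall the whole pipeline.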

consume

The consume parameter specifies how many consecutive input items from each upstream Piper are batched together before a single evaluation.

flowchart LR
    subgraph "consume=1 (default)"
        A1["item 1"] --> F1["func(item1)"]
        A2["item 2"] --> F2["func(item2)"]
        A3["item 3"] --> F3["func(item3)"]
    end
flowchart LR
    subgraph "consume=3"
        B1["item 1"] --> G["func([item1,\nitem2,\nitem3])"]
        B2["item 2"] --> G
        B3["item 3"] --> G
    end

Use case: aggregate or batch operations, e.g., averaging each consecutive, non-overlapping block of N items.

from papy import Worker, Piper, Plumber

def average(inbox):
    """Average a batch of numbers."""
    items = inbox[0]  # inbox[0] is a list of `consume` items from the upstream Piper
    return sum(items) / len(items)

p_avg = Piper(Worker(average), consume=3)

With consume=3, the Piper collects 3 items before calling the worker. The input length must be a multiple of consume.
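To show what the worker receives, the batching can be imitated in plain Python. This is a sketch of the behaviour described above, not papy's implementation. `batch_inputs` is a hypothetical helper that groups one upstream stream into non-overlapping lists of `consume` items and wraps each list in a one-element inbox, so that `inbox[0]` in `average` is the batch. Raising `ValueError` on a bad input length is a choice made for this sketch only.

```python
from typing import Iterable, List


def batch_inputs(items: Iterable[float], consume: int) -> List[list]:
    """Hypothetical helper: split items into non-overlapping batches of `consume`.

    Each batch is wrapped in a one-element list (the inbox), because there is
    a single upstream Piper; inbox[0] is therefore the batch itself.
    """
    items = list(items)
    if len(items) % consume != 0:
        raise ValueError("input length must be a multiple of consume")
    return [[items[i:i + consume]] for i in range(0, len(items), consume)]


def average(inbox):
    """Average a batch of numbers (same worker function as above)."""
    items = inbox[0]
    return sum(items) / len(items)


inboxes = batch_inputs([1, 2, 3, 4, 5, 6], consume=3)
print(inboxes)                        # [[[1, 2, 3]], [[4, 5, 6]]]
print([average(b) for b in inboxes])  # [2.0, 5.0]
```

Because the batches do not overlap, six inputs yield two averages rather than the four a sliding window of width 3 would give.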

produce

The produce parameter specifies how many output items are generated from each single evaluation. The worker function must return a sequence (list or tuple) of that length, and each element becomes a separate downstream item.

flowchart LR
    subgraph "produce=3"
        IN["item 1"] --> F["func(item1)\nreturns [a, b, c]"]
        F --> O1["a"]
        F --> O2["b"]
        F --> O3["c"]
    end

Use case: splitting a single item into a fixed number of items, e.g., splitting a FASTA record into individual sequences.

def split_into_three(inbox):
    """Split one item into three."""
    x = inbox[0]
    return [x, x * 10, x * 100]

p_split = Piper(Worker(split_into_three), produce=3)
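The flattening that `produce` performs can be traced without papy. In the sketch below, `run_with_produce` is a hypothetical helper, not a papy API. It calls the worker once per input item, checks that exactly `produce` elements come back, and emits them as separate, consecutive output items. The ordering is an assumption made for illustration.

```python
def split_into_three(inbox):
    """Split one item into three (same worker function as above)."""
    x = inbox[0]
    return [x, x * 10, x * 100]


def run_with_produce(worker, items, produce):
    """Hypothetical simulation of produce: flatten each result into `produce` items."""
    outputs = []
    for item in items:
        result = worker([item])  # one upstream Piper -> one-element inbox
        if len(result) != produce:
            raise ValueError(f"worker returned {len(result)} items, expected {produce}")
        outputs.extend(result)   # each element becomes a separate downstream item
    return outputs


print(run_with_produce(split_into_three, [1, 2], produce=3))
# [1, 10, 100, 2, 20, 200]
```

Two inputs become six outputs, so a downstream Piper must absorb three items per upstream evaluation, for example with `spawn=3` or `consume=3`.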

Note

If repeat=True, the single return value is repeated produce times instead of being iterated. This is useful when you want to duplicate items rather than unpack sequences.

def duplicate(inbox):
    return inbox[0]  # returns a single value, not a sequence

p_dup = Piper(Worker(duplicate), produce=3, repeat=True)
# input: [5] -> output: [5, 5, 5]
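The difference between iterating and repeating matters most when the returned value is itself iterable, such as a string. The sketch below uses a hypothetical `expand` function, not part of papy, that mirrors the two behaviours described in the note.

```python
def expand(result, produce, repeat=False):
    """Hypothetical simulation of how one evaluation's result becomes output items."""
    if repeat:
        # The same object is emitted `produce` times.
        return [result] * produce
    # Without repeat, the result is unpacked element by element.
    items = list(result)
    if len(items) != produce:
        raise ValueError("result length must equal produce")
    return items


print(expand(5, produce=3, repeat=True))      # [5, 5, 5]
print(expand("abc", produce=3))               # ['a', 'b', 'c']
print(expand("abc", produce=3, repeat=True))  # ['abc', 'abc', 'abc']
```

With `repeat=True`, a non-iterable value like `5` is fine. Without it, the same value could not be unpacked at all.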

spawn

The spawn parameter creates multiple implicit copies of the same Piper in the pipeline. Each copy processes a different "slice" of the output from the upstream Piper. This is useful when the upstream Piper produces multiple items per input and each should be processed independently.

flowchart LR
    subgraph "upstream produce=2"
        A["item"] --> F["func(item)\nreturns [a, b]"]
    end
    subgraph "downstream spawn=2"
        F --> S1["copy 1: process(a)"]
        F --> S2["copy 2: process(b)"]
    end

Use case: a branch point where a single item is split and each part goes through the same processing.

def split(inbox):
    x = inbox[0]
    return [x + 1, x - 1]

def process(inbox):
    return inbox[0] ** 2

p_split = Piper(Worker(split), produce=2)
p_proc = Piper(Worker(process), spawn=2)
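One way to picture `spawn` is round-robin distribution. With `spawn=2`, the first copy sees the 1st, 3rd, 5th, ... items of the upstream output, and the second copy sees the 2nd, 4th, 6th, .... This strided assignment matches the diagram above, but it is an assumption about papy's internals. The sketch uses plain list slicing, and `simulate` is a hypothetical name.

```python
def split(inbox):
    x = inbox[0]
    return [x + 1, x - 1]


def process(inbox):
    return inbox[0] ** 2


def simulate(inputs, spawn=2):
    """Hypothetical trace of split (produce=2) feeding process (spawn=2)."""
    # Upstream: each evaluation yields two items, flattened into one stream.
    stream = [y for x in inputs for y in split([x])]
    # Downstream: assume copy k receives every spawn-th item, starting at offset k.
    return [[process([y]) for y in stream[k::spawn]] for k in range(spawn)]


print(simulate([3, 10]))  # [[16, 121], [4, 81]]
```

Under this assumption, copy 1 always handles the `x + 1` values and copy 2 always handles the `x - 1` values. That is the "each part goes through the same processing" branch point.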

Complete example

A pipeline that reads numbers, splits each into two (doubled and tripled), squares each part in one of two spawned copies of the same Piper, then merges the two results:

from papy import Worker, Piper, Plumber

def split_two(inbox):
    x = inbox[0]
    return [x * 2, x * 3]

def square(inbox):
    return inbox[0] ** 2

def merge(inbox):
    a, b = inbox[0]  # consume=2: inbox[0] is the batch of two squared values
    print(f"{a} + {b} = {a + b}")

if __name__ == '__main__':
    p_split = Piper(Worker(split_two), produce=2)
    p_square = Piper(Worker(square), spawn=2)
    p_merge = Piper(Worker(merge), consume=2)

    plumber = Plumber()
    plumber.add_pipe((p_split, p_square, p_merge))
    plumber.start([range(1, 4)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()

The constraint is satisfied: produce(2) * spawn(1) = consume(1) * spawn(2) = 2 on the p_split → p_square connection, and produce(1) * spawn(2) = consume(2) * spawn(1) = 2 on the p_square → p_merge connection.
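To see what the complete example should print, its data flow can be traced in plain Python without papy. The sketch makes two assumptions that this page does not confirm. First, the spawned copies receive items round-robin. Second, their results are re-interleaved in input order before `merge` consumes them in pairs. `simulate_pipeline` is a hypothetical name.

```python
def split_two(inbox):
    x = inbox[0]
    return [x * 2, x * 3]


def square(inbox):
    return inbox[0] ** 2


def simulate_pipeline(numbers, spawn=2):
    """Hypothetical trace of split_two (produce=2) -> square (spawn=2) -> merge (consume=2)."""
    # produce=2: flatten every split_two result into a single stream.
    stream = [y for x in numbers for y in split_two([x])]
    # spawn=2: assume copy k squares every spawn-th item starting at offset k.
    copies = [[square([y]) for y in stream[k::spawn]] for k in range(spawn)]
    # Re-interleave the copies' outputs back into one ordered stream.
    merged_stream = [v for group in zip(*copies) for v in group]
    # consume=2: merge receives non-overlapping batches of two items.
    lines = []
    for i in range(0, len(merged_stream), 2):
        a, b = merged_stream[i:i + 2]
        lines.append(f"{a} + {b} = {a + b}")
    return lines


for line in simulate_pipeline(range(1, 4)):
    print(line)
# 4 + 9 = 13
# 16 + 36 = 52
# 36 + 81 = 117
```

Each line pairs the squared doubled value with the squared tripled value of the same input, which is the pairing the constraint check guarantees.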

Summary table

| Parameter | Default | Effect |
|-----------|---------|--------|
| consume | 1 | Batch N input items into one evaluation |
| produce | 1 | Split one evaluation result into N output items |
| spawn | 1 | Create N implicit copies of the Piper |
| repeat | False | If True, repeat the result instead of iterating over it (with produce > 1) |