Skip to content

Examples

This page contains complete, runnable examples that demonstrate PaPy's core features. Each example can be saved as a .py file and executed directly.

Warning

Parallel features of PaPy do not work in the interactive interpreter. Wrap your code in if __name__ == '__main__': when using processes.

End-to-end example: Parallel word-frequency pipeline

This complete example builds a pipeline that processes text through three stages — tokenization, counting, and reporting — with the first two stages running in parallel via a shared NuMap process pool.

flowchart LR
    TEXT["15 lines\nof text"] --> T["tokenize\n(parallel)"]
    T --> C["count words\n(parallel)"]
    C --> R["accumulate\n& report"]

Save this as word_frequency.py and run with python word_frequency.py:

from papy import Worker, Piper, Plumber
from numap import NuMap

# -- Worker functions --
# Each receives inbox[0] from the upstream Piper.

def tokenize(inbox):
    """Split a line into lowercase words, stripping punctuation."""
    line = inbox[0]
    cleaned = line.lower()
    for ch in ".,!?;:'\"()-":
        cleaned = cleaned.replace(ch, "")
    return cleaned.split()

def count_words(inbox):
    """Count word frequencies in a list of words."""
    words = inbox[0]
    counts = {}
    for w in words:
        counts[w] = counts.get(w, 0) + 1
    return counts

_totals = {}

def accumulate_and_print(inbox, top_n=5):
    """Merge per-line counts into running totals and print progress."""
    line_counts = inbox[0]
    for word, n in line_counts.items():
        _totals[word] = _totals.get(word, 0) + n
    total_words = sum(_totals.values())
    print(f"  ... {total_words} total words, {len(_totals)} unique")

# -- Sample data --
SAMPLE_TEXT = [
    "PaPy is a parallel pipeline framework for Python",
    "It uses Workers to wrap functions and Pipers to connect them",
    "NuMap provides the parallel execution engine with threads or processes",
    "Workers compose multiple functions into a single processing node",
    "Pipers define how and where Workers are evaluated in the pipeline",
    "The Dagger connects Pipers into a directed acyclic graph",
    "The Plumber provides the interface to run and monitor a pipeline",
    "Data flows through pipes from upstream Pipers to downstream Pipers",
]

if __name__ == "__main__":
    _totals.clear()

    # 1. Create a shared pool of 4 worker processes
    nmap = NuMap(worker_type="process", worker_num=4)

    # 2. Create Pipers — first two are parallel, last is sequential
    p_tokenize = Piper(Worker(tokenize), parallel=nmap)
    p_count = Piper(Worker(count_words), parallel=nmap)
    p_report = Piper(Worker(accumulate_and_print, kwargs={"top_n": 5}))

    # 3. Build pipeline and run
    plumber = Plumber()
    plumber.add_pipe((p_tokenize, p_count, p_report))
    plumber.start([SAMPLE_TEXT])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()

    # 4. Print final report
    print(f"\nCompleted in {plumber.stats['run_time']:.3f}s")
    ranked = sorted(_totals.items(), key=lambda kv: kv[1], reverse=True)
    for i, (word, count) in enumerate(ranked[:10], 1):
        print(f"  {i:2d}. {word:<15s} {count}")

A complete version with more features is in examples/word_frequency.py.


Example 1: Minimal linear pipeline

A three-stage pipeline that reads numbers, squares them, and prints the results.

flowchart LR
    DATA["[1,2,3,4,5]"] --> P1["square"] --> P2["to_string"] --> P3["printer"]
from papy import Worker, Piper, Plumber

# --- worker functions ---

def square(inbox):
    """Square the input."""
    return inbox[0] ** 2

def to_string(inbox):
    """Format as a string."""
    return f"result = {inbox[0]}"

def printer(inbox):
    """Print and discard (output worker returns None)."""
    print(inbox[0])

# --- build pipeline ---

if __name__ == '__main__':
    p1 = Piper(Worker(square))
    p2 = Piper(Worker(to_string))
    p3 = Piper(Worker(printer))

    plumber = Plumber()
    plumber.add_pipe((p1, p2, p3))
    plumber.start([range(1, 6)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()

Output:

result = 1
result = 4
result = 9
result = 16
result = 25

Example 2: Parallel pipeline with NuMap

Using a shared process pool to speed up CPU-bound work.

from papy import Worker, Piper, Plumber
from numap import NuMap

def heavy_computation(inbox):
    """Simulate CPU-bound work."""
    n = inbox[0]
    return sum(i * i for i in range(n * 10000))

def store(inbox):
    """Accumulate results (in real pipelines, write to file/DB)."""
    print(f"got: {inbox[0]}")

if __name__ == '__main__':
    # shared pool of 4 worker processes
    nmap = NuMap(worker_type='process', worker_num=4)

    p1 = Piper(Worker(heavy_computation), parallel=nmap)
    p2 = Piper(Worker(store))

    plumber = Plumber()
    plumber.add_pipe((p1, p2))
    plumber.start([range(1, 21)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()

Example 3: Branching pipeline

Two branches process the same input differently and merge at the end.

flowchart LR
    DATA["input"] --> P1["double"]
    DATA --> P2["negate"]
    P1 --> P3["combine"]
    P2 --> P3
from papy import Worker, Piper, Plumber

def double(inbox):
    return inbox[0] * 2

def negate(inbox):
    return -inbox[0]

def combine(inbox):
    """Receives two inputs: doubled and negated."""
    doubled, negated = inbox[0], inbox[1]
    print(f"{doubled} and {negated} -> sum = {doubled + negated}")
    return doubled + negated

if __name__ == '__main__':
    p_double = Piper(Worker(double), branch=0)
    p_negate = Piper(Worker(negate), branch=1)
    p_combine = Piper(Worker(combine))

    plumber = Plumber()
    # two pipes from the same input converging at p_combine
    plumber.add_pipe((p_double, p_combine))
    plumber.add_pipe((p_negate, p_combine))

    plumber.start([range(1, 6)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()

Output:

2 and -1 -> sum = 1
4 and -2 -> sum = 2
6 and -3 -> sum = 3
8 and -4 -> sum = 4
10 and -5 -> sum = 5

Example 4: Composed workers

A Worker can chain multiple functions. Each function receives the output of the previous one (boxed in a 1-tuple):

from papy import Worker

def parse_line(inbox):
    """Parse 'key=value' into a tuple."""
    line = inbox[0]
    key, value = line.split('=')
    return (key.strip(), int(value.strip()))

def validate(inbox):
    """Reject negative values."""
    key, value = inbox[0]
    if value < 0:
        raise ValueError(f"negative value for {key}")
    return (key, value)

def format_output(inbox):
    """Format as a string."""
    key, value = inbox[0]
    return f"{key}: {value}"

# compose all three into a single worker
w = Worker((parse_line, validate, format_output))

# call it directly (no pipeline needed)
print(w(["count = 42"]))     # "count: 42"
print(w(["score = -1"]))     # WorkerError (negative value)
print(w(["total = 100"]))    # "total: 100"

Example 5: Using NuMap standalone

NuMap can be used independently of PaPy as a drop-in replacement for multiprocessing.Pool.imap:

from numap import NuMap
import math

def process(inbox):
    """Compute factorial."""
    return math.factorial(inbox)

if __name__ == '__main__':
    # basic usage: single task
    nmap = NuMap(worker_type='process', worker_num=4)
    nmap.add_task(process, range(10))
    nmap.start()
    results = list(nmap)
    print(results)
    nmap.stop(ends=[0])
    # [1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880]

Multiple chained tasks

from numap import NuMap

def double(inbox):
    return inbox * 2

def add_ten(inbox):
    return inbox + 10

if __name__ == '__main__':
    nmap = NuMap(worker_type='thread', worker_num=2)
    out0 = nmap.add_task(double, range(5))
    out1 = nmap.add_task(add_ten, out0)
    nmap.start()

    # iterate over the final task's output
    for result in out1:
        print(result)
    # 10, 12, 14, 16, 18

    nmap.stop(ends=[1])

Example 6: Thread workers for IO-bound tasks

When tasks are IO-bound (network, disk), thread workers avoid the overhead of process spawning:

import time
from numap import NuMap

def fetch(inbox):
    """Simulate a network request."""
    time.sleep(0.2)  # simulate latency
    return f"response for {inbox}"

if __name__ == '__main__':
    nmap = NuMap(worker_type='thread', worker_num=8)
    nmap.add_task(fetch, range(16))
    t0 = time.monotonic()
    nmap.start()
    results = list(nmap)
    elapsed = time.monotonic() - t0
    nmap.stop(ends=[0])
    print(f"Fetched {len(results)} items in {elapsed:.2f}s")
    # ~0.4s with 8 threads vs ~3.2s sequential

Example 7: Sub-interpreter workers (Python 3.14+)

Sub-interpreters provide true CPU parallelism with lower overhead than processes:

import sys
from numap import NuMap, HASINTERP

def cpu_work(inbox):
    """CPU-bound: sum of squares."""
    return sum(i * i for i in range(inbox * 100000))

if __name__ == '__main__':
    if not HASINTERP:
        print(f"Python {sys.version_info[:2]} — sub-interpreters not available")
        sys.exit(0)

    nmap = NuMap(worker_type='interpreter', worker_num=4, stride=4)
    nmap.add_task(cpu_work, [10] * 8)
    nmap.start()
    results = list(nmap)
    nmap.stop(ends=[0])
    print(f"Computed {len(results)} results with sub-interpreters")

Note

Sub-interpreter workers require Python 3.14+. On earlier versions, HASINTERP is False. Use worker_type='process' as a fallback.

Example 8: Error handling in pipelines

PaPy catches exceptions inside workers and propagates them downstream as PiperError instances. The pipeline continues processing other items:

from papy import Worker, Piper, Plumber, PiperError

def safe_divide(inbox):
    """Divide 100 by the input — may raise ZeroDivisionError."""
    return 100 / inbox[0]

def report(inbox):
    """Print result or error."""
    value = inbox[0]
    if isinstance(value, PiperError):
        print(f"  error: {value.args[0]}")
    else:
        print(f"  result: {value}")

if __name__ == '__main__':
    p1 = Piper(Worker(safe_divide))
    p2 = Piper(Worker(report))

    plumber = Plumber()
    plumber.add_pipe((p1, p2))
    plumber.start([[5, 0, 10, 0, 2]])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()

Output:

  result: 20.0
  error: division by zero
  result: 10.0
  error: division by zero
  result: 50.0

The pipeline processed all 5 items despite two ZeroDivisionError exceptions.

Example 9: Worker function arguments

Workers can carry extra positional and keyword arguments that are passed to the function alongside each input item:

from papy import Worker

def multiply(inbox, factor):
    return inbox[0] * factor

def power(inbox, exponent=2):
    return inbox[0] ** exponent

# positional args
w1 = Worker(multiply, (3,))        # multiply by 3
print(w1([7]))                      # 21

# keyword args
w2 = Worker(power, kwargs={'exponent': 3})
print(w2([4]))                      # 64

# composed: multiply by 3, then cube
w3 = Worker((multiply, power), ((3,), ()), kwargs=(None, {'exponent': 3}))
print(w3([2]))                      # (2*3)^3 = 216

Example 10: Using built-in utility functions

PaPy provides utility functions in papy.util.func for common data-flow operations:

from papy import Worker, Piper, Plumber
from papy.util.func import ipasser, njoiner

def source_a(inbox):
    return f"hello-{inbox[0]}"

def source_b(inbox):
    return f"world-{inbox[0]}"

if __name__ == '__main__':
    pa = Piper(Worker(source_a), branch=0)
    pb = Piper(Worker(source_b), branch=1)

    # njoiner joins all inputs with a separator
    p_join = Piper(Worker(njoiner, kwargs={'join': ' + '}))

    plumber = Plumber()
    plumber.add_pipe((pa, p_join))
    plumber.add_pipe((pb, p_join))
    plumber.start([range(3)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()