Examples¶
This page contains complete, runnable examples that demonstrate PaPy's core
features. Each example can be saved as a .py file and executed directly.
Warning
Parallel features of PaPy do not work in the interactive interpreter.
Wrap your code in if __name__ == '__main__': when using processes.
End-to-end example: Parallel word-frequency pipeline¶
This complete example builds a pipeline that processes text through three stages — tokenization, counting, and reporting — with the first two stages running in parallel via a shared NuMap process pool.
flowchart LR
TEXT["8 lines\nof text"] --> T["tokenize\n(parallel)"]
T --> C["count words\n(parallel)"]
C --> R["accumulate\n& report"]
Save this as word_frequency.py and run with python word_frequency.py:
from papy import Worker, Piper, Plumber
from numap import NuMap
# -- Worker functions --
# Each receives inbox[0] from the upstream Piper.
def tokenize(inbox):
    """Split a line into lowercase words, stripping punctuation.

    Receives inbox[0] (one line of text) from the upstream Piper and
    returns a list of lowercase tokens.
    """
    # Delete all punctuation characters in one C-level pass instead of
    # chaining a separate .replace() call per character.
    table = str.maketrans("", "", ".,!?;:'\"()-")
    return inbox[0].lower().translate(table).split()
def count_words(inbox):
    """Count word frequencies in a list of words.

    inbox[0] is a list of tokens; returns a dict of token -> count.
    """
    frequencies = {}
    for token in inbox[0]:
        if token in frequencies:
            frequencies[token] += 1
        else:
            frequencies[token] = 1
    return frequencies
# Module-level running totals, shared across calls of the reporting stage.
_totals = {}

def accumulate_and_print(inbox, top_n=5):
    """Merge per-line counts into running totals and print progress.

    Args:
        inbox: inbox[0] is a dict of word -> count for one line.
        top_n: how many of the current most frequent words to include in
            the progress line. (This parameter was previously accepted —
            and passed by the pipeline via kwargs — but never used.)
    """
    line_counts = inbox[0]
    for word, n in line_counts.items():
        _totals[word] = _totals.get(word, 0) + n
    total_words = sum(_totals.values())
    # Live preview of the current top-N words so progress is visible.
    leaders = sorted(_totals.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
    preview = ", ".join(f"{w}:{c}" for w, c in leaders)
    print(f" ... {total_words} total words, {len(_totals)} unique | top: {preview}")
# -- Sample data --
# Eight lines of input; each line becomes one item flowing through the pipeline.
SAMPLE_TEXT = [
    "PaPy is a parallel pipeline framework for Python",
    "It uses Workers to wrap functions and Pipers to connect them",
    "NuMap provides the parallel execution engine with threads or processes",
    "Workers compose multiple functions into a single processing node",
    "Pipers define how and where Workers are evaluated in the pipeline",
    "The Dagger connects Pipers into a directed acyclic graph",
    "The Plumber provides the interface to run and monitor a pipeline",
    "Data flows through pipes from upstream Pipers to downstream Pipers",
]
if __name__ == "__main__":
    _totals.clear()

    # One shared pool of four worker processes serves both parallel stages.
    nmap = NuMap(worker_type="process", worker_num=4)

    # The first two Pipers run on the shared pool; reporting stays sequential
    # because it mutates module-level state.
    p_tokenize = Piper(Worker(tokenize), parallel=nmap)
    p_count = Piper(Worker(count_words), parallel=nmap)
    p_report = Piper(Worker(accumulate_and_print, kwargs={"top_n": 5}))

    # Assemble the linear pipeline and drive it through its lifecycle.
    plumber = Plumber()
    plumber.add_pipe((p_tokenize, p_count, p_report))
    plumber.start([SAMPLE_TEXT])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()

    # Final report: overall timing plus the ten most frequent words.
    print(f"\nCompleted in {plumber.stats['run_time']:.3f}s")
    ranked = sorted(_totals.items(), key=lambda kv: kv[1], reverse=True)
    for i, (word, count) in enumerate(ranked[:10], 1):
        print(f" {i:2d}. {word:<15s} {count}")
A complete version with more features is in examples/word_frequency.py.
Example 1: Minimal linear pipeline¶
A three-stage pipeline that reads numbers, squares them, and prints the results.
flowchart LR
DATA["[1,2,3,4,5]"] --> P1["square"] --> P2["to_string"] --> P3["printer"]
from papy import Worker, Piper, Plumber
# --- worker functions ---
def square(inbox):
    """Return the boxed input raised to the second power."""
    return pow(inbox[0], 2)
def to_string(inbox):
    """Render the upstream value as a human-readable string."""
    value = inbox[0]
    return f"result = {value}"
def printer(inbox):
    """Print the value and discard it (an output worker returns None)."""
    value = inbox[0]
    print(value)
# --- build pipeline ---
if __name__ == '__main__':
    # One sequential Piper per stage.
    p1 = Piper(Worker(square))
    p2 = Piper(Worker(to_string))
    p3 = Piper(Worker(printer))

    # Connect the stages into a linear pipe and run to completion.
    plumber = Plumber()
    plumber.add_pipe((p1, p2, p3))
    plumber.start([range(1, 6)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()
Output:
result = 1
result = 4
result = 9
result = 16
result = 25
Example 2: Parallel pipeline with NuMap¶
Using a shared process pool to speed up CPU-bound work.
from papy import Worker, Piper, Plumber
from numap import NuMap
def heavy_computation(inbox):
    """Simulate CPU-bound work: sum of squares below n * 10000."""
    limit = inbox[0] * 10000
    total = 0
    for i in range(limit):
        total += i * i
    return total
def store(inbox):
    """Accumulate results (in real pipelines, write to file/DB)."""
    result = inbox[0]
    print(f"got: {result}")
if __name__ == '__main__':
    # shared pool of 4 worker processes
    nmap = NuMap(worker_type='process', worker_num=4)

    # Only the CPU-bound stage runs on the pool; storing stays sequential.
    p1 = Piper(Worker(heavy_computation), parallel=nmap)
    p2 = Piper(Worker(store))

    plumber = Plumber()
    plumber.add_pipe((p1, p2))
    plumber.start([range(1, 21)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()
Example 3: Branching pipeline¶
Two branches process the same input differently and merge at the end.
flowchart LR
DATA["input"] --> P1["double"]
DATA --> P2["negate"]
P1 --> P3["combine"]
P2 --> P3
from papy import Worker, Piper, Plumber
def double(inbox):
    """Return twice the boxed input value."""
    value = inbox[0]
    return value + value
def negate(inbox):
    """Return the arithmetic negation of the boxed input value."""
    value = inbox[0]
    return -value
def combine(inbox):
    """Receive the doubled and negated branch values; print and sum them."""
    doubled = inbox[0]
    negated = inbox[1]
    total = doubled + negated
    print(f"{doubled} and {negated} -> sum = {doubled + negated}")
    return total
if __name__ == '__main__':
    # branch= assigns each Piper to a distinct branch off the shared input.
    p_double = Piper(Worker(double), branch=0)
    p_negate = Piper(Worker(negate), branch=1)
    p_combine = Piper(Worker(combine))

    plumber = Plumber()
    # two pipes from the same input converging at p_combine
    plumber.add_pipe((p_double, p_combine))
    plumber.add_pipe((p_negate, p_combine))
    plumber.start([range(1, 6)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()
Output:
2 and -1 -> sum = 1
4 and -2 -> sum = 2
6 and -3 -> sum = 3
8 and -4 -> sum = 4
10 and -5 -> sum = 5
Example 4: Composed workers¶
A Worker can chain multiple functions. Each function receives the output
of the previous one (boxed in a 1-tuple):
from papy import Worker
def parse_line(inbox):
    """Parse a 'key=value' line into a (key, int_value) tuple."""
    raw_key, raw_value = inbox[0].split('=')
    return (raw_key.strip(), int(raw_value.strip()))
def validate(inbox):
    """Pass (key, value) through unchanged; reject negative values."""
    key, value = inbox[0]
    if value >= 0:
        return (key, value)
    raise ValueError(f"negative value for {key}")
def format_output(inbox):
    """Render a (key, value) pair as 'key: value'."""
    key, value = inbox[0]
    return f"{key}: {value}"
# compose all three functions into a single Worker
worker = Worker((parse_line, validate, format_output))

# invoke it directly — no pipeline required
print(worker(["count = 42"]))   # "count: 42"
print(worker(["score = -1"]))   # WorkerError (negative value)
print(worker(["total = 100"]))  # "total: 100"
Example 5: Using NuMap standalone¶
NuMap can be used independently of PaPy as a drop-in replacement for
multiprocessing.Pool.imap:
from numap import NuMap
import math
def process(inbox):
    """Return the factorial of *inbox*.

    Standalone NuMap passes items through directly (no 1-tuple boxing),
    so *inbox* here is the integer itself rather than inbox[0].
    """
    return math.factorial(inbox)
if __name__ == '__main__':
    # Basic usage: one task over one iterable on a 4-process pool.
    nmap = NuMap(worker_type='process', worker_num=4)
    nmap.add_task(process, range(10))
    nmap.start()
    results = list(nmap)
    print(results)
    nmap.stop(ends=[0])
    # [1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880]
Multiple chained tasks¶
from numap import NuMap
def double(inbox):
    """Return twice the raw item (standalone NuMap passes items unboxed)."""
    return inbox + inbox
def add_ten(inbox):
    """Return the raw item increased by ten."""
    return 10 + inbox
if __name__ == '__main__':
    nmap = NuMap(worker_type='thread', worker_num=2)

    # Task 1 feeds task 0's output: double each item, then add ten.
    out0 = nmap.add_task(double, range(5))
    out1 = nmap.add_task(add_ten, out0)
    nmap.start()

    # iterate over the final task's output
    for result in out1:
        print(result)
        # 10, 12, 14, 16, 18

    nmap.stop(ends=[1])
Example 6: Thread workers for IO-bound tasks¶
When tasks are IO-bound (network, disk), thread workers avoid the overhead of process spawning:
import time
from numap import NuMap
def fetch(inbox):
    """Simulate a network request with a fixed latency."""
    time.sleep(0.2)  # simulate latency
    return f"response for {inbox}"
if __name__ == '__main__':
    # Eight threads overlap the simulated latency of sixteen requests.
    nmap = NuMap(worker_type='thread', worker_num=8)
    nmap.add_task(fetch, range(16))

    t0 = time.monotonic()
    nmap.start()
    results = list(nmap)
    elapsed = time.monotonic() - t0
    nmap.stop(ends=[0])

    print(f"Fetched {len(results)} items in {elapsed:.2f}s")
    # ~0.4s with 8 threads vs ~3.2s sequential
Example 7: Sub-interpreter workers (Python 3.14+)¶
Sub-interpreters provide true CPU parallelism with lower overhead than processes:
import sys
from numap import NuMap, HASINTERP
def cpu_work(inbox):
    """CPU-bound work: sum of squares below inbox * 100000."""
    limit = inbox * 100000
    return sum(i * i for i in range(limit))
if __name__ == '__main__':
    # Sub-interpreter workers need Python 3.14+; bail out gracefully otherwise.
    if not HASINTERP:
        print(f"Python {sys.version_info[:2]} — sub-interpreters not available")
        sys.exit(0)

    nmap = NuMap(worker_type='interpreter', worker_num=4, stride=4)
    nmap.add_task(cpu_work, [10] * 8)
    nmap.start()
    results = list(nmap)
    nmap.stop(ends=[0])
    print(f"Computed {len(results)} results with sub-interpreters")
Note
Sub-interpreter workers require Python 3.14+. On earlier versions,
HASINTERP is False. Use worker_type='process' as a fallback.
Example 8: Error handling in pipelines¶
PaPy catches exceptions inside workers and propagates them downstream as
PiperError instances. The pipeline continues processing other items:
from papy import Worker, Piper, Plumber, PiperError
def safe_divide(inbox):
    """Divide 100 by the input — may raise ZeroDivisionError."""
    denominator = inbox[0]
    return 100 / denominator
def report(inbox):
    """Print either the computed result or the propagated error."""
    value = inbox[0]
    if not isinstance(value, PiperError):
        print(f" result: {value}")
    else:
        print(f" error: {value.args[0]}")
if __name__ == '__main__':
    # Two zero inputs will raise inside safe_divide; PaPy forwards the
    # failures downstream instead of aborting the pipeline.
    p1 = Piper(Worker(safe_divide))
    p2 = Piper(Worker(report))

    plumber = Plumber()
    plumber.add_pipe((p1, p2))
    plumber.start([[5, 0, 10, 0, 2]])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()
Output: result lines for the inputs 5, 10, and 2 (20.0, 10.0, 50.0),
interleaved with two "error:" lines for the zero inputs (the exact error
text depends on the PaPy version).
The pipeline processed all 5 items despite two ZeroDivisionError exceptions.
Example 9: Worker function arguments¶
Workers can carry extra positional and keyword arguments that are passed to the function alongside each input item:
from papy import Worker
def multiply(inbox, factor):
    """Scale the boxed input by *factor*."""
    value = inbox[0]
    return value * factor
def power(inbox, exponent=2):
    """Raise the boxed input to *exponent* (squares by default)."""
    return pow(inbox[0], exponent)
# positional args: multiply every input by 3
times3 = Worker(multiply, (3,))
print(times3([7]))  # 21

# keyword args: cube the input
cube = Worker(power, kwargs={'exponent': 3})
print(cube([4]))  # 64

# composed: multiply by 3, then cube
scaled_cube = Worker((multiply, power), ((3,), ()), kwargs=(None, {'exponent': 3}))
print(scaled_cube([2]))  # (2*3)^3 = 216
Example 10: Using built-in utility functions¶
PaPy provides utility functions in papy.util.func for common data-flow
operations:
from papy import Worker, Piper, Plumber
from papy.util.func import ipasser, njoiner
def source_a(inbox):
    """Produce the first branch's tagged value."""
    value = inbox[0]
    return f"hello-{value}"
def source_b(inbox):
    """Produce the second branch's tagged value."""
    value = inbox[0]
    return f"world-{value}"
if __name__ == '__main__':
    # Two branches consume the same input stream.
    pa = Piper(Worker(source_a), branch=0)
    pb = Piper(Worker(source_b), branch=1)

    # njoiner joins all inputs with a separator
    p_join = Piper(Worker(njoiner, kwargs={'join': ' + '}))

    plumber = Plumber()
    # two pipes from the same input converging at p_join
    plumber.add_pipe((pa, p_join))
    plumber.add_pipe((pb, p_join))
    plumber.start([range(3)])
    plumber.run()
    plumber.wait()
    plumber.pause()
    plumber.stop()