Optimization¶
The throughput of a pipeline is limited most significantly by the slowest
`Piper`. A processing node might be slow because it performs a CPU-intensive
or IO-intensive task, because it waits for some data, or because it
synchronizes with other nodes.
Identifying bottlenecks¶
As a general rule you should optimize the bottleneck(s) only, so it is critical to understand where the bottleneck is and what causes it. Most of your nodes will not limit the throughput of the workflow, while parallelization is quite expensive, so parallelizing non-bottleneck nodes wastes resources. If your pipeline has no obvious bottleneck, it is probably fast enough; the remaining nodes can often share a single pool.
Understanding bottlenecks¶
There are three main categories of bottlenecks:
- **CPU-bound**: the worker function performs heavy computation (number crunching, parsing, compression). The function occupies the CPU for most of its execution time.
  - Fix: assign a `NuMap` with `worker_type='process'` (or `'interpreter'` on Python 3.14+) and `worker_num` equal to the number of CPU cores.
- **IO-bound**: the worker function waits for external resources (network requests, disk reads, database queries). The CPU is mostly idle.
  - Fix: assign a `NuMap` with `worker_type='thread'` and a high `worker_num` (8-64 depending on latency).
- **Synchronization-bound**: the pipeline topology forces sequential execution. This happens when a downstream `Piper` depends on results from a slow upstream `Piper` and both share the same `NuMap`.
  - Fix: assign separate `NuMap` instances to the two `Pipers` so they run in independent pools.
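The effect of separate pools can be sketched with the stdlib `concurrent.futures` as a stand-in for two independent `NuMap` instances (an illustration only, not PaPy code; `slow_stage` and `fast_stage` are hypothetical worker functions):

```python
import concurrent.futures as cf
import time

def slow_stage(x):
    time.sleep(0.05)  # stands in for an expensive upstream computation
    return x * 2

def fast_stage(x):
    return x + 1

# Two separate pools: the fast downstream stage is never queued behind
# the slow upstream stage, as it would be in a single shared pool.
with cf.ThreadPoolExecutor(max_workers=2) as upstream, \
     cf.ThreadPoolExecutor(max_workers=2) as downstream:
    mid = upstream.map(slow_stage, range(4))
    out = list(downstream.map(fast_stage, mid))

print(out)  # [1, 3, 5, 7]
```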
How to measure¶
Use `Plumber.stats['run_time']` for total elapsed time. For per-node timing,
add timing instrumentation inside the worker function:
```python
import time

def timed_func(inbox):
    t0 = time.monotonic()
    result = expensive_work(inbox[0])
    elapsed = time.monotonic() - t0
    print(f"timed_func: {elapsed:.3f}s")
    return result
```
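To instrument several workers without editing each one, a small timing decorator can wrap any inbox-style worker function (a sketch; `timed` is not part of PaPy):

```python
import functools
import time

def timed(func):
    """Wrap an inbox-style worker function to print its per-item runtime."""
    @functools.wraps(func)
    def wrapper(inbox):
        t0 = time.monotonic()
        result = func(inbox)
        print(f"{func.__name__}: {time.monotonic() - t0:.3f}s")
        return result
    return wrapper

@timed
def double(inbox):
    return inbox[0] * 2

print(double([21]))  # prints the timing line, then 42
```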
Stride and buffer tuning¶
The `stride` and `buffer` parameters of `NuMap` control the trade-off between
parallelism and memory consumption.
| Parameter | Effect of increasing | Effect of decreasing |
|---|---|---|
| `stride` | More items in-flight per task; better utilization of workers | Less memory; more synchronization overhead |
| `buffer` | More items buffered across all tasks; smoother flow | Less memory; earlier backpressure |
Rules of thumb:

- Set `stride >= worker_num` so all workers stay busy.
- The default `buffer = stride * num_tasks` is safe for all topologies. Only reduce it if memory is a constraint.
- For chained tasks, the minimum safe buffer is `stride` (one batch in-flight per task at a time).
```python
nmap = NuMap(
    worker_type='process',
    worker_num=4,
    stride=8,   # 2x workers for overlap
    buffer=16,  # 2 strides of headroom
)
```
Unordered results¶
By default, `NuMap` returns results in the same order as the input
(`ordered=True`). This requires holding earlier results in memory while
waiting for slower items. Setting `ordered=False` on the `NuMap` returns
results in the order they complete:
```python
nmap = NuMap(worker_type='process', worker_num=4, ordered=False)
p = Piper(Worker(variable_time_func), parallel=nmap)
```
Unordered mode does not compute faster — it makes results available sooner. This helps when:
- Computation time varies significantly across items.
- A downstream `Piper` sharing the same `NuMap` can start earlier.
- You do not need to correlate output order with input order.
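The difference between input-order and completion-order delivery can be illustrated with the stdlib `concurrent.futures` (a stand-in sketch, not PaPy code; `work` is a hypothetical variable-time function):

```python
import concurrent.futures as cf
import time

def work(x):
    time.sleep(0.05 * (3 - x))  # earlier items deliberately take longer
    return x

with cf.ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(work, i) for i in range(3)]
    # completion order (like ordered=False): each result is available
    # as soon as its item finishes
    completed = [f.result() for f in cf.as_completed(futures)]
    # input order (like ordered=True): earlier results are held back
    # until slower predecessors finish
    ordered = [f.result() for f in futures]

print(completed)  # fastest item first
print(ordered)    # [0, 1, 2]
```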
Warning
Do not use `ordered=False` in branching pipelines unless you are certain
the downstream merge point does not depend on item ordering.
Note
The `ordered` parameter belongs to `NuMap`, not `Piper`. If multiple
`Pipers` share a `NuMap`, they all inherit its ordering behavior.
Addressing serialization¶
When `worker_type='process'`, all data passing between the manager process and
worker processes is serialized via `pickle`. This has two costs:
- CPU time for `pickle.dumps()` / `pickle.loads()` on every item.
- Memory for the serialized byte buffer.
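The per-item cost can be estimated directly with the stdlib `pickle` module (a measurement sketch; the list payload and the `pickle_cost` helper are illustrative, not part of PaPy):

```python
import pickle
import time

def pickle_cost(obj):
    """Return (serialized size in bytes, round-trip time in seconds)."""
    t0 = time.monotonic()
    blob = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    pickle.loads(blob)
    return len(blob), time.monotonic() - t0

size, secs = pickle_cost(list(range(100_000)))
print(f"{size} bytes, {secs:.4f} s per round-trip")
```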
Strategies to reduce serialization overhead¶
**Use threads for small items**: if your data is small and your function
releases the GIL (NumPy, I/O), `worker_type='thread'` avoids serialization
entirely.
**Use sub-interpreters (Python 3.14+)**: `worker_type='interpreter'` avoids
fork/spawn overhead while still requiring serialization for items crossing the
interpreter boundary.
**Reduce item size**: pass file paths or database keys instead of large objects. Let the worker function load the data inside its own process.
```python
def process_file(inbox):
    """Worker receives a file path, not the file contents."""
    import pandas as pd
    path = inbox[0]
    df = pd.read_csv(path)
    # ... process df ...
    return result
```
**Collapse pipers**: if two consecutive `Pipers` use the same `NuMap`, their
data crosses the process boundary twice. Composing them into a single `Worker`
eliminates the intermediate serialization:
```python
# Instead of two pipers with two serialization round-trips:
# p1 = Piper(Worker(func_a), parallel=nmap)
# p2 = Piper(Worker(func_b), parallel=nmap)
# Compose into one:
p = Piper(Worker((func_a, func_b)), parallel=nmap)
```
Distributing computational resources¶
As a general rule, you most likely should not share a single `NuMap` instance
among all `Pipers` within a workflow.

If the throughput of your pipeline is limited by a CPU-intensive task, you
should parallelize that node. PaPy allows you to parallelize CPU-bound
`Pipers`. The CPU power assigned to a node should be proportional to the
computational requirements of its processing task. The recommended number of
`NuMap` pool worker processes equals or slightly exceeds the number of
physical CPU cores on each local or remote computer.
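A starting value can be derived from the local core count (a heuristic sketch; note that `os.cpu_count()` reports logical cores, which on SMT machines may be double the physical count the guideline refers to):

```python
import os

# Logical core count of the local machine; may exceed physical cores.
cores = os.cpu_count() or 1
worker_num = cores + 1  # slight oversubscription keeps all workers busy
print(worker_num)
```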
Shared vs. separate pools¶
| Approach | Pros | Cons |
|---|---|---|
| Shared NuMap | Fewer total processes; simpler resource management | Stride interleaving adds latency; one slow task blocks others |
| Separate NuMaps | Independent parallelism per node; no cross-task blocking | More total processes; higher memory usage |
Recommended pattern: use a shared `NuMap` for lightweight nodes and a
dedicated `NuMap` for the bottleneck node.