Optimization

The throughput of a pipeline is limited most by its slowest Piper. A processing node can be slow because it performs a CPU-intensive or IO-intensive task, because it waits for external data, or because it blocks while synchronizing with other nodes.

Identifying bottlenecks

As a general rule, optimize only the bottleneck(s). It is therefore critical to understand where the bottleneck is and what causes it.

There is a good reason for this: most of your nodes will not limit the throughput of the workflow, while parallelization is quite expensive. If your pipeline has no obvious bottleneck it is probably fast enough, and a single shared pool may serve all of its nodes.

Understanding bottlenecks

There are three main categories of bottlenecks:

CPU-bound: the worker function performs heavy computation (number crunching, parsing, compression). The function occupies the CPU for most of its execution time.

  • Fix: assign a NuMap with worker_type='process' (or 'interpreter' on Python 3.14+) and worker_num equal to the number of CPU cores.

IO-bound: the worker function waits for external resources (network requests, disk reads, database queries). The CPU is mostly idle.

  • Fix: assign a NuMap with worker_type='thread' and a high worker_num (8-64 depending on latency).

Synchronization-bound: the pipeline topology forces sequential execution. This happens when a downstream Piper depends on results from a slow upstream Piper and both share the same NuMap.

  • Fix: assign separate NuMap instances to the two Pipers so they run in independent pools.
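
The three fixes above can be condensed into a small sizing sketch. `pool_settings` is a hypothetical helper (not part of PaPy), and the dicts it returns are meant as keyword arguments for a NuMap constructor as used elsewhere in this chapter:

```python
import os

def pool_settings(kind):
    """Hypothetical helper: map a bottleneck category to NuMap keyword
    arguments, following the fixes listed above."""
    if kind == "cpu":
        # One worker per CPU core for compute-heavy functions.
        return {"worker_type": "process", "worker_num": os.cpu_count() or 1}
    if kind == "io":
        # Many threads, since each spends most of its time waiting.
        return {"worker_type": "thread", "worker_num": 32}
    raise ValueError(f"unknown bottleneck kind: {kind!r}")

# e.g. NuMap(**pool_settings("cpu")) for the compute-bound node
print(pool_settings("io"))
```

For a synchronization-bound pipeline, no single setting helps; the fix is structural (two NuMap instances), as shown in the shared-vs-separate pools discussion later in this chapter.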

How to measure

Use Plumber.stats['run_time'] for total elapsed time. For per-node timing, add timing instrumentation inside the worker function:

import time

def timed_func(inbox):
    t0 = time.monotonic()
    result = expensive_work(inbox[0])
    elapsed = time.monotonic() - t0
    print(f"timed_func: {elapsed:.3f}s")
    return result
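
Per-item prints get noisy for long inputs. A sketch using only the standard library that collects timings into a list and reports a summary instead (the `timed` decorator and `square` worker are illustrative, not PaPy API):

```python
import statistics
import time

timings = []  # elapsed seconds, one entry per call

def timed(func):
    # Decorator: record how long each call to a worker function takes.
    def wrapper(inbox):
        t0 = time.monotonic()
        result = func(inbox)
        timings.append(time.monotonic() - t0)
        return result
    return wrapper

@timed
def square(inbox):
    return inbox[0] ** 2

for i in range(5):
    square([i])

print(f"calls: {len(timings)}, median: {statistics.median(timings):.6f}s")
```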

Stride and buffer tuning

The stride and buffer parameters of NuMap control the trade-off between parallelism and memory consumption.

stride
  • Increasing: more items in-flight per task; better utilization of workers.
  • Decreasing: less memory; more synchronization overhead.

buffer
  • Increasing: more items buffered across all tasks; smoother flow.
  • Decreasing: less memory; earlier backpressure.

Rules of thumb:

  • Set stride >= worker_num so all workers stay busy.
  • The default buffer = stride * num_tasks is safe for all topologies. Only reduce it if memory is a constraint.
  • For chained tasks, the minimum safe buffer is stride (one batch in-flight per task at a time).

nmap = NuMap(
    worker_type='process',
    worker_num=4,
    stride=8,    # 2x workers for overlap
    buffer=16,   # 2 strides of headroom
)

Unordered results

By default, NuMap returns results in the same order as the input (ordered=True). This requires holding earlier results in memory while waiting for slower items. Setting ordered=False on the NuMap returns results in the order they complete:

nmap = NuMap(worker_type='process', worker_num=4, ordered=False)
p = Piper(Worker(variable_time_func), parallel=nmap)

Unordered mode does not compute faster; it makes results available sooner. This helps when:

  • Computation time varies significantly across items.
  • A downstream Piper sharing the same NuMap can start earlier.
  • You do not need to correlate output order with input order.
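
Outside PaPy, the same effect can be demonstrated with the standard library: `as_completed` yields results in completion order, the analogue of ordered=False (an illustration, not PaPy code):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_first(x):
    # Item 0 takes much longer than the rest.
    time.sleep(0.2 if x == 0 else 0.0)
    return x

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(slow_first, i) for i in range(4)]
    completion_order = [f.result() for f in as_completed(futures)]

# Item 0 was submitted first but arrives last; the fast items
# did not have to wait behind it.
print(completion_order)
```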

Warning

Do not use ordered=False in branching pipelines unless you are certain the downstream merge point does not depend on item ordering.

Note

The ordered parameter belongs to NuMap, not Piper. If multiple Pipers share a NuMap, they all inherit its ordering behavior.

Addressing serialization

When worker_type='process', all data passing between the manager process and worker processes is serialized via pickle. This has two costs:

  1. CPU time for pickle.dumps() / pickle.loads() on every item.
  2. Memory for the serialized byte buffer.
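
To see the first cost directly, the round-trip can be measured with nothing but the standard library; the payload here is a stand-in for one item crossing the process boundary, and sizes and timings will vary by machine:

```python
import pickle
import time

payload = list(range(100_000))  # stand-in for one inter-process item

t0 = time.monotonic()
blob = pickle.dumps(payload)   # manager -> worker direction
restored = pickle.loads(blob)  # worker side deserializes
elapsed = time.monotonic() - t0

print(f"{len(blob)} bytes, round-trip in {elapsed:.4f}s")
```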

Strategies to reduce serialization overhead

Use threads for small items: if your data is small and your function releases the GIL (NumPy, I/O), worker_type='thread' avoids serialization entirely.
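
A quick way to see why threads skip serialization: they share the parent process's objects directly, so nothing is copied or pickled (stdlib illustration, not PaPy code):

```python
from concurrent.futures import ThreadPoolExecutor

seen = []  # lives in the parent; every thread appends to it in place

def record(i):
    # No pickling: all threads mutate the very same list object.
    seen.append(i)
    return i * 2

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(record, range(8)))

print(sorted(seen), results)
```

With worker_type='process', the parent would instead see an untouched `seen`, because each worker mutates its own pickled copy.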

Use sub-interpreters (Python 3.14+): worker_type='interpreter' avoids fork/spawn overhead while still requiring serialization for items crossing the interpreter boundary.

Reduce item size: pass file paths or database keys instead of large objects. Let the worker function load the data inside its own process.

def process_file(inbox):
    """Worker receives a file path, not the file contents."""
    import pandas as pd
    path = inbox[0]
    df = pd.read_csv(path)
    # ... process df ...
    return result

Collapse pipers: if two consecutive Pipers use the same NuMap, their data crosses the process boundary twice. Composing them into a single Worker eliminates the intermediate serialization:

# Instead of two pipers with two serialization round-trips:
# p1 = Piper(Worker(func_a), parallel=nmap)
# p2 = Piper(Worker(func_b), parallel=nmap)

# Compose into one:
p = Piper(Worker((func_a, func_b)), parallel=nmap)

Distributing computational resources

As a general rule, you should not share a single NuMap instance among all Pipers within a workflow.

If the throughput of your pipeline is limited by a CPU-intensive task, parallelize that node. PaPy allows you to parallelize CPU-bound Pipers, and the CPU power assigned to a node should be proportional to the computational requirements of its task. The recommended number of NuMap pool worker processes equals, or slightly exceeds, the number of physical CPU cores on each local or remote computer.

Shared vs. separate pools

Shared NuMap
  • Pros: fewer total processes; simpler resource management.
  • Cons: stride interleaving adds latency; one slow task blocks others.

Separate NuMaps
  • Pros: independent parallelism per node; no cross-task blocking.
  • Cons: more total processes; higher memory usage.

Recommended pattern: use a shared NuMap for lightweight nodes and a dedicated NuMap for the bottleneck node.

nmap_heavy = NuMap(worker_type='process', worker_num=8)
nmap_light = NuMap(worker_type='thread', worker_num=2)

p_parse = Piper(Worker(parse), parallel=nmap_light)
p_compute = Piper(Worker(heavy_compute), parallel=nmap_heavy)
p_write = Piper(Worker(write_output), parallel=nmap_light)