FAQ¶
My pipeline hangs on exit. What do I do?¶
The Python interpreter cannot exit while NuMap worker threads or processes are
alive. This happens when a pipeline is not stopped cleanly. Always call
plumber.stop() after a pipeline finishes:
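A minimal sketch of the shutdown sequence, mirroring the `wait()`/`pause()`/`stop()` calls used elsewhere in this FAQ (the surrounding pipeline setup is assumed):

```python
def shut_down(plumber):
    """Cleanly shut down a finished pipeline so the interpreter can exit."""
    plumber.wait()   # block until all results have been computed
    plumber.pause()  # stop feeding new inputs
    plumber.stop()   # join worker threads/processes
```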
If the pipeline is stuck (e.g., during development), you can force-stop:
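The exact force-stop call depends on your NuMap version; this sketch assumes `pause()` and `stop()` may be called before a run has completed:

```python
def force_stop(plumber):
    """Best-effort shutdown of a stuck pipeline during development.
    Assumption: pause() and stop() are safe to call on a running Plumber."""
    plumber.pause()  # stop dispatching new tasks
    plumber.stop()   # tear down workers without waiting for results
```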
As a last resort, press Ctrl+C twice.
When should I use threads vs. processes vs. sub-interpreters?¶
| Worker type | Best for | Python version |
|---|---|---|
| `'process'` | CPU-bound tasks | 3.12+ |
| `'thread'` | IO-bound tasks (network, disk) | 3.12+ |
| `'interpreter'` | CPU-bound tasks with lower overhead | 3.14+ |
Use 'thread' when your function waits on I/O (HTTP requests, file reads,
database queries). Use 'process' or 'interpreter' when your function does
heavy computation. Sub-interpreters have lower spawn overhead than processes
but require Python 3.14+.
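The decision rule above can be written as a small helper (the function name is hypothetical; the thresholds come from the table):

```python
import sys

def pick_worker_type(io_bound, version=sys.version_info):
    """Choose a worker type per the rules above (sketch)."""
    if io_bound:
        return 'thread'        # waiting on network/disk/database
    if version >= (3, 14):
        return 'interpreter'   # CPU-bound, lower spawn overhead
    return 'process'           # CPU-bound on 3.12/3.13
```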
How do I pass arguments to a worker function?¶
Use the second argument to Worker for positional args and kwargs for
keyword arguments:
```python
# positional: multiply(inbox, 3)
w = Worker(multiply, (3,))

# keyword: power(inbox, exponent=3)
w = Worker(power, kwargs={'exponent': 3})
```
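The worker function receives the inbox first and the extra arguments after it, so the two workers above end up calling `multiply(inbox, 3)` and `power(inbox, exponent=3)`. In plain Python (hypothetical function bodies, following the `inbox[0]` convention used elsewhere in this FAQ):

```python
def multiply(inbox, factor):
    # Worker(multiply, (3,)) calls multiply(inbox, 3)
    return inbox[0] * factor

def power(inbox, exponent=2):
    # Worker(power, kwargs={'exponent': 3}) calls power(inbox, exponent=3)
    return inbox[0] ** exponent

multiply([5], 3)         # 15
power([2], exponent=3)   # 8
```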
Can I use lambda functions or closures as workers?¶
No — at least not with worker_type='process'. Process-based workers
require functions to be picklable, which means they must be defined at the
top level of a module. Lambdas, closures, and nested functions cannot be
pickled.
Thread-based and interpreter-based workers run within a single process and can use any callable, but top-level functions are recommended for consistency.
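The restriction is a pickle limitation rather than anything specific to process workers; a stdlib-only demonstration:

```python
import pickle

def make_adder(n):
    def add(x):          # nested function: a closure over n
        return x + n
    return add

try:
    pickle.dumps(make_adder(1))
    closure_picklable = True
except (pickle.PicklingError, AttributeError):
    closure_picklable = False

# closure_picklable is False: the closure cannot be sent to a process worker
```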
What does the @imports decorator do?¶
The @imports decorator attaches import statements to a function definition.
This is required for functions sent to remote RPyC workers, where the
remote Python process may not have the same imports as the local one:
```python
from numap import imports

@imports(['numpy', 'scipy.stats'])
def compute(inbox):
    return numpy.mean(inbox[0])
```
The decorator also supports fallback imports:
```python
@imports(['ujson,json'])  # try ujson first, fall back to json
def parse(inbox):
    return ujson.loads(inbox[0])
```
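The fallback is equivalent to this stdlib-only pattern, i.e. what you would write by hand without the decorator (sketch):

```python
try:
    import ujson as _json   # fast third-party parser, if installed
except ImportError:
    import json as _json    # stdlib fallback

def parse_text(text):
    return _json.loads(text)

parse_text('{"a": 1}')      # {'a': 1}
```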
What is the difference between consume, produce, and spawn?¶
These Piper parameters control non-1:1 input/output mappings:
- `consume=N`: batch N input items into one evaluation
- `produce=N`: unpack one evaluation result into N output items
- `spawn=N`: create N implicit copies of the `Piper`
See the Produce / Spawn / Consume page for details and examples.
How do I access intermediate results?¶
Set track=True on a Piper to store its results in
Plumber.stats['pipers_tracked']:
```python
p = Piper(Worker(my_func), parallel=nmap, track=True)
# ... build and run pipeline ...
plumber.wait()
plumber.pause()
plumber.stop()
print(plumber.stats['pipers_tracked'][p])
```
My pipeline deadlocks. How do I debug it?¶
Common causes of deadlock:
- **Buffer too small:** if `buffer < stride`, the pipeline cannot make progress. Use the default buffer (auto-calculated) unless you have a specific reason to reduce it.
- **Timeout on chained tasks:** do not set `timeout` on upstream tasks within a shared `NuMap`. A timed-out upstream result blocks the downstream task.
- **Output not consumed:** if a pipeline has multiple output `Piper`s and you only consume one, the other's buffer fills and blocks the entire NuMap.
- **Wrong consume/produce/spawn balance:** the constraint `produce * spawn (upstream) = consume * spawn (downstream)` must hold for every connected pair.
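The balance constraint from the last bullet is a one-line arithmetic check (hypothetical helper):

```python
def is_balanced(produce_up, spawn_up, consume_down, spawn_down):
    """produce * spawn (upstream) must equal consume * spawn (downstream)."""
    return produce_up * spawn_up == consume_down * spawn_down

is_balanced(2, 1, 2, 1)   # True
is_balanced(3, 2, 2, 3)   # True  (6 == 6)
is_balanced(2, 1, 3, 1)   # False (2 != 3)
```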
Debugging steps:

1. Enable NuMap debug logging.
2. Check whether the putter or getter thread is blocked (look for "waits for semaphore" or "waits for next task" in the log).
3. Reduce the problem: use a small input (3-5 items), `worker_num=1`, `stride=1`, and `worker_type='thread'` to isolate the issue.
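For the logging step, a sketch assuming NuMap emits its diagnostics through the stdlib `logging` module (check your version's documentation for the exact logger name):

```python
import logging

# force=True reconfigures the root logger even if logging was already set up
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(name)s %(levelname)s %(message)s',
    force=True,
)
```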
Can I reuse a Plumber after stopping it?¶
No. After stop() the Plumber's state is cleared and internal connections
are broken. Create a new Plumber instance for each run.
How do I check if sub-interpreters are available?¶
You can use this to write portable code:
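A sketch, assuming PEP 734's `concurrent.interpreters` module (Python 3.14+) is the right availability signal:

```python
try:
    from concurrent import interpreters  # available on Python 3.14+
    WORKER_TYPE = 'interpreter'          # lower spawn overhead
except ImportError:
    WORKER_TYPE = 'process'              # portable fallback for 3.12/3.13
```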