FAQ

My pipeline hangs on exit. What do I do?

The Python interpreter cannot exit while NuMap worker threads or processes are alive. This happens when a pipeline is not stopped cleanly. Always call plumber.stop() after a pipeline finishes:

plumber.start([data])
plumber.run()
plumber.wait()
plumber.pause()
plumber.stop()  # required!

If the pipeline is stuck (e.g., during development), you can force-stop:

plumber.pause()
plumber.stop()

As a last resort, press Ctrl+C twice.

When should I use threads vs. processes vs. sub-interpreters?

Worker type     Best for                              Python version
'process'       CPU-bound tasks                       3.12+
'thread'        IO-bound tasks (network, disk)        3.12+
'interpreter'   CPU-bound tasks with lower overhead   3.14+

Use 'thread' when your function waits on I/O (HTTP requests, file reads, database queries). Use 'process' or 'interpreter' when your function does heavy computation. Sub-interpreters have lower spawn overhead than processes but require Python 3.14+.
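
This selection logic can be written down as a small helper. Note that pick_worker_type and its arguments are illustrative only, not part of the NuMap API; they simply encode the table above:

```python
def pick_worker_type(io_bound, has_interp):
    """Map the worker-type table to a worker_type string (illustrative)."""
    if io_bound:
        return 'thread'        # function waits on network/disk
    if has_interp:
        return 'interpreter'   # CPU-bound, lower spawn overhead (3.14+)
    return 'process'           # CPU-bound fallback on 3.12 / 3.13

print(pick_worker_type(io_bound=True, has_interp=False))   # thread
print(pick_worker_type(io_bound=False, has_interp=True))   # interpreter
print(pick_worker_type(io_bound=False, has_interp=False))  # process
```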

How do I pass arguments to a worker function?

Pass positional arguments as the second argument to Worker, and keyword arguments via the kwargs parameter:

# positional: multiply(inbox, 3)
w = Worker(multiply, (3,))

# keyword: power(inbox, exponent=3)
w = Worker(power, kwargs={'exponent': 3})

Can I use lambda functions or closures as workers?

No — at least not with worker_type='process'. Process-based workers require functions to be picklable, which means they must be defined at the top level of a module. Lambdas, closures, and nested functions cannot be pickled.

Thread-based and interpreter-based workers run within a single process and can accept any callable, but for consistency top-level functions are still recommended.
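
You can check picklability directly with the standard pickle module, independently of NuMap:

```python
import pickle

def double(x):
    # Top-level function: pickled by module path + name, so a worker
    # process can re-import it on the other side.
    return x * 2

# A lambda has no importable name, so pickling it fails.
try:
    pickle.dumps(lambda x: x * 2)
    lambda_picklable = True
except Exception:
    lambda_picklable = False

print(lambda_picklable)  # False
```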

What does the @imports decorator do?

The @imports decorator attaches import statements to a function definition. This is required for functions sent to remote RPyC workers, where the remote Python process may not have the same imports as the local one:

from numap import imports

@imports(['numpy', 'scipy.stats'])
def compute(inbox):
    return numpy.mean(inbox[0])

The decorator also supports fallback imports:

@imports(['ujson,json'])  # try ujson first, fall back to json
def parse(inbox):
    return ujson.loads(inbox[0])
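
The fallback list behaves like a plain try/except import chain:

```python
# Roughly what the fallback 'ujson,json' expands to:
try:
    import ujson as json_mod   # fast third-party parser, if installed
except ImportError:
    import json as json_mod    # standard-library fallback

print(json_mod.loads('[1, 2, 3]'))  # [1, 2, 3]
```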

What is the difference between consume, produce, and spawn?

These Piper parameters control non-1:1 input/output mappings:

  • consume=N: batch N input items into one evaluation
  • produce=N: unpack one evaluation result into N output items
  • spawn=N: create N implicit copies of the Piper

See the Produce / Spawn / Consume page for details and examples.
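
The item flow can be sketched in plain Python. This is an illustration of the semantics only; the real batching and unpacking happen inside the pipeline:

```python
from itertools import islice

def consume_batches(items, n):
    # consume=N: the worker receives N input items per evaluation
    it = iter(items)
    while batch := list(islice(it, n)):
        yield batch

data = [1, 2, 3, 4]
print(list(consume_batches(data, 2)))   # [[1, 2], [3, 4]]

# produce=N: one evaluation result is unpacked into N output items
def expand(x):
    return (x, x * 10)                  # each call yields 2 outputs

print([out for x in data for out in expand(x)])
# [1, 10, 2, 20, 3, 30, 4, 40]
```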

How do I access intermediate results?

Set track=True on a Piper to store its results in Plumber.stats['pipers_tracked']:

p = Piper(Worker(my_func), parallel=nmap, track=True)
# ... build and run pipeline ...
plumber.wait()
plumber.pause()
plumber.stop()
print(plumber.stats['pipers_tracked'][p])

My pipeline deadlocks. How do I debug it?

Common causes of deadlock:

  1. Buffer too small: if buffer < stride, the pipeline cannot make progress. Use the default buffer (auto-calculated) unless you have a specific reason to reduce it.

  2. Timeout on chained tasks: do not set timeout on upstream tasks within a shared NuMap. A timed-out upstream result blocks the downstream task.

  3. Output not consumed: if a pipeline has multiple output Pipers and you only consume one, the other's buffer fills and blocks the entire NuMap.

  4. Wrong consume/produce/spawn balance: the constraint produce * spawn (upstream) = consume * spawn (downstream) must hold for every connected pair.
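
The balance constraint in point 4 is one line of arithmetic; io_balanced below is a hypothetical helper, not part of the NuMap API:

```python
def io_balanced(up_produce, up_spawn, down_consume, down_spawn):
    # Items emitted upstream must equal items consumed downstream.
    return up_produce * up_spawn == down_consume * down_spawn

print(io_balanced(2, 1, 2, 1))  # True:  2 produced, 2 consumed
print(io_balanced(4, 1, 2, 1))  # False: 4 produced, only 2 consumed
```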

Debugging steps:

  1. Enable NuMap debug logging:

    import logging
    logging.getLogger('numap').setLevel(logging.DEBUG)
    

  2. Check if the putter or getter thread is blocked (look for "waits for semaphore" or "waits for next task" in the log).

  3. Reduce the problem: use a small input (3-5 items), worker_num=1, stride=1, and worker_type='thread' to isolate the issue.

Can I reuse a Plumber after stopping it?

No. After stop() the Plumber's state is cleared and internal connections are broken. Create a new Plumber instance for each run.

How do I check if sub-interpreters are available?

from numap.NuMap import HASINTERP
print(HASINTERP)  # True on Python 3.14+, False otherwise

You can use this to write portable code:

from numap import NuMap
from numap.NuMap import HASINTERP

if HASINTERP:
    nmap = NuMap(worker_type='interpreter', worker_num=4)
else:
    nmap = NuMap(worker_type='process', worker_num=4)