Skip to content

About the Parallelism in PaPy

This document explains and gives code snippets how to use the parallel features of PaPy. The first paragraphs introduce the map function and the NuMap object. The types of parallelism, are explained in the later sections where also typical optimizations, bottlenecks of pipelines are covered.

Parallel functionality is provided by the NuMap class. In the most basic mode of operation it can be called exactly like map or the multiprocessing Pool imap method. Setting additional options allows to parallelise the evaluation using Workers of threads, processes or remote processes and share those workers among multiple functions and inputs. A unique feature of NuMap is that it allows to fine-tune the memory-consumption, parallelism and laziness trade-off of nested function maps.

The interpreter will hang on exit if a pipeline does not finish or is halted abnormally

The Python interpreter exits (returns) if all spawned threads or forked processes return. PaPy uses multiple threads to manage the pipeline and evaluates functions in separate threads or processes. All of them need to be stopped before the parent python process can return. This is done automatically whenever a pipeline finishes or some expected exception occurs, in all other cases it is required that the user stops the pipeline manually.

Map basics

A map function applies a function to the items of a sequence. This is in Python expressed with the following syntax:

def power(x):
    return x*x

inp_list = [1,2,3,4]
out_list = list(map(power, inp_list))

The resulting output is [1, 4, 9, 16]. Though unmatched in simplicity this function has several computational drawbacks.

  1. results are evaluated sequentially i.e. power(1), power(2), power(3), power(4) and on a single processor.

  2. The function returns only after all results have been calculated.

  3. The list of results has to fit into memory together with the input.

  4. The results are always returned in order.

The last two issues can be addressed by using the map function as a lazy iterator. Its usage is almost as simple:

inp_list = [1,2,3,4]
out_iterator = map(power, inp_list)
first_result = next(out_iterator)  # returns power(1)
second_result = next(out_iterator) # returns power(2)

There are however a number of differences between the two. The map function returns a result object immediately, but this object is only a link (via the next(out_iterator) call) to the next result to be calculated. The evaluation starts as soon as next is called (lazy evaluation) and the result is returned as soon as it is calculated. The function, argument tuples are still evaluated sequentially and on a single processor, but they are returned as soon as they are needed and only a single result needs to fit into memory.

Python 3.12+ provides implementations of a parallel map where results are evaluated by a pool of worker processes. This functionality comes in three flavours:

from multiprocessing import Pool
pool = Pool()
out_list = pool.map(power, inp_list)
out_iterator1 = pool.imap(power, inp_list)
next(out_iterator1, timeout=1)
out_iterator2 = pool.imap_unordered(power, inp_list)

Warning

This example like most of the following is a code snippet. It is or should be syntactically correct. But might outside of a valid script file.

The Pool.map does exactly the same as the simple map function, but it uses by default all availble cores. This addresses the second drawback. Both imap methods return a result object with the next method like map. Calling it returns the next calculated (imap_unordered) or expected result (imap). The next method has an optional timeout argument i.e. if no result is availble within the limit a TimeoutError is raised. Although it might seem that those implementations have none of the above mentioned drawbacks several implementation choices make them inappropriate for constructing pipelines.

A pipeline is a nested imap

Map functions have a list as input and return a list so they can be nested i.e.:

from math import degrees, radians
list(map(degrees, map(radians, [1,2,3])))
[1.0, 2.0, 3.0]

In this example the function radians is first applied to the elements of the input list then the the results are back-converted to degrees. The order of evaluation:

temp_list = []
temp_list.append(radians(1))
temp_list.append(radians(2))
temp_list.append(radians(3))
result_list = []
result_list.append(degrees(temp_list[0]))
result_list.append(degrees(temp_list[1]))
result_list.append(degrees(temp_list[2]))
<< return list of results >>

If we use a lazy version of the map function i.e.:

map(degrees, map(radians, [1,2,3]))
[1.0, 2.0, 3.0]

The order of calculation changes:

temp_result = radians(1)
result = degree(temp_result)
<<return result>>
temp_result = radians(2)
result = degree(temp_result)
<<return result>>
temp_result = radians(3)
result = degree(temp_result)
<<return result>>

Note that only one temporary result needs to be stored at any given time and that the first result is returned after only two calculations. But multiprocessing Pools imap function yield yet another calculation order.

from multiprocessing import Pool
pool = Pool()
pool.imap(degrees, pool.imap(radians, [1,2,3]))
[1.0, 2.0, 3.0]

What happens is a little bit unexpected, first the function is evaluated (by multiple processes) and only after all temporary results are calculated the second functions is iteratively applied:

temp_list = []
temp_list.append(radians(1))
temp_list.append(radians(2))
temp_list.append(radians(3))
result = degree(temp_result[0])
<<return result>>
result = degree(temp_result[1])
<<return result>>
result = degree(temp_result[2])
<<return result>>

The results are either returned imediately or stored in a result list. The maximum size of this list is the size of temporary list and the size of the input. The reason for this behaviour is the order by which tasks i.e. (function, data) tuples are submitted to the pool. If one pool handles two functions first all (radians, x) tuples are submited and then all (degrees, x). The outer function is evaluated last so for multiple and computationally expensive functions the first result might be availble after a long lag phase followed by a burst of results.

This problem can be solved by having a separate pool for each function.

from multiprocessing import Pool
pool1 = Pool()
pool2 = Pool()
pool2.imap(degrees, pool1.imap(radians, [1,2,3]))
[1.0, 2.0, 3.0]

Now the execution order is not defined anymore as processes in the two Pools compete for CPU time from the OS. A possible evaluation order might be like this:

temp_list = []
temp_list.append(radians(1))
temp_list.append(radians(2))
result = degree(temp_result[0])
<<return result>>
result = degree(temp_result[1])
<<return result>>
temp_list.append(radians(3))
result = degree(temp_result[2])
<<return result>>

As you can see a temporary result list is still built. Its maximum lenght is not predictable and limited by the lenght of the input. Another drawback is that if the number of functions is big the number of process-pool workers significantly exceeds the number of availble CPUs or CPU-cores, which is inefficient.

NuMap internal architecture

The following diagram shows the internal threading architecture of NuMap. Two manager threads (pool_putter and pool_getter) mediate between the user-facing API and the worker pool:

flowchart LR
    subgraph User API
        AT["add_task()"]
        NX["next()"]
    end

    subgraph Manager Threads
        PP["pool_putter\nthread"]
        PG["pool_getter\nthread"]
    end

    subgraph "Worker Pool"
        W1["Worker 1"]
        W2["Worker 2"]
        W3["Worker N"]
    end

    AT --> WV["_Weave\n(interleaves tasks\nby stride)"]
    WV --> PP
    PP -->|"in_queue"| W1
    PP -->|"in_queue"| W2
    PP -->|"in_queue"| W3
    W1 -->|"out_queue"| PG
    W2 -->|"out_queue"| PG
    W3 -->|"out_queue"| PG
    PG -->|"per-task\nresult queues"| NX

    SEM["Semaphore\n(buffer limit)"] -.->|"controls"| PP

The _Weave object interleaves tasklets from multiple tasks according to the stride parameter. The semaphore enforces the buffer limit — preventing unbounded memory growth. Workers can be threads, processes, or sub-interpreters depending on worker_type.

The task and the tasklet

The imap implementation in PaPy (NuMap) is different as it allows to control the order by which function and data tuples are submitted to the worker pool. It introduces the concept of a task which is a function, input and arguments tuple. The input is a python iterator i.e. an object which has a next method it obviously should return data to be calculated next. The argument is a tuple of parameters which is given to the function for example:

(function, data_iterator, ('rome', 1.17, some_object))   # a task

The NuMap role is to evaluate tasks. To evaluate a task means to evaluate all tasklets:

(function, next(data_iterator), 'rome', 1.17, someobject) # a tasklet
result = function(next(data_iterator), 'rome', 1.17, someobject) # evaluation

Until the data_iterator is empty.

NuMap parallelism is defined by a stride

The stride controls how tasks are interleaved. With two chained tasks and stride=2, the evaluation order looks like this:

sequenceDiagram
    participant PP as pool_putter
    participant Pool as Worker Pool
    participant PG as pool_getter

    Note over PP,PG: stride = 2

    PP->>Pool: radians(1)
    PP->>Pool: radians(2)
    Pool->>PG: result_0
    Pool->>PG: result_1
    PP->>Pool: degrees(result_0)
    PP->>Pool: degrees(result_1)
    Pool->>PG: final_0
    Pool->>PG: final_1

    Note over PP,PG: next stride

    PP->>Pool: radians(3)
    PP->>Pool: radians(4)
    Pool->>PG: result_2
    Pool->>PG: result_3
    PP->>Pool: degrees(result_2)
    PP->>Pool: degrees(result_3)
    Pool->>PG: final_2
    Pool->>PG: final_3

NuMap allows to control the order in which tasklets are evaluated. This is accomplished by the stride parameter. A stride is the number of tasklets from one task submitted before any tasklet from the next task. The default stride is equal to the number of pool workers and should not be smaller.

from numap import NuMap
from math import radians, degrees
nmap = NuMap(worker_num=2)
output = Imap.add_task(radians, [1,2,3,4])
result = Imap.add_task(degrees, output)
Imap.start() # finished adding tasks
next(result) # or Imap.next(task=1) 0 is the first task

In this example the Imap instance has a pool with two workers (by default those workers are separate processes), its default stride is therefore 2. The order in which the tasks will be evaluated is as follows.

temp_list = []
result_list = []
temp_list.append(radians(1))
temp_list.append(radians(2))
result_list.append(degree(temp_list[0]))
<<return result>>
result_list.append(degree(temp_list[1]))
<<return result>>
temp_list = []
result_list = []
temp_list.append(radians(3))
temp_list.append(radians(4))
result_list.append(degree(temp_list[2]))
<<return result>>
result_list.append(degree(temp_list[3]))
<<return result>>

The temp_list (in fact it is a queue) has a defined size limit (stride) and so is the result_list. The details of memory consumption will be explained in the next paragraphs here it suffices to say that a minimum memory requirement of 2 temporary results can ben enfored on this pipeline without loss of efficiency. If the pipeline was longer and had computationally expensive functions it would be noticable that the results from the outer function arrive in burst of 2 or bursts of stirde size.

NuMap needs tasks in the right order

In the previous example two nested tasks have been added to the NuMap function using the add_task method. The general way of working with NuMap is as follows:

#0. import NuMap
from numap import NuMap
#1. define imap keyworded parameters e.g.
numap_instance = NuMap(worker_remote=[['host', 2]])
#2. add tasks
out0 = numap_instance.add_task(function0, input_data)
out1 = numap_instance.add_task(function1, out0)
out2 = numap_instance.add_task(function2, other_data)
#3. start the evaluation
numap_instance.start()
#4.
<< get the results >>

In this section we will focus on step 2. We have submitted 3 tasks to the imap instance. Because the order of submission matters they will be evaluated in the order.

# first stride
function0(input_data[0 .. n]) # where n is stride
function1(out1[0 .. n])
function2(other_data[0 .. n])

# second stride
function0(input_data[n .. n+n]) # where n is stride
function1(out1[n .. n+n])
function2(other_data[n .. n+n])1

# and so on

Because function1 depends on the results from function0 it can't be added as a task before function0. It might seem impossible because function1 takes the output of function 0 (out0) as an argument, but in general the input could be an object created before out0, which is modified with out0 after creation. This is possible because evaluation starts only after the start method is called.

NuMap can limit the memory consumption.

By default the maximum memory consumption of an NuMap instance is equal to the number of tasks times the stide size, but this limit can be changed. Consider an imap instance with two tasks, which are not nested and a stride of 3, this means that the default maximum memory consumption is 6. The evaluation will pause whenever the NuMap instance reaches the limit. In pseudo-python:

list0 = []
list1 = []
# a stride of 3
list0.append(function0(arg0))
list0.append(function0(arg1))
list0.append(function0(arg2)) # list0 has size 3
list1.append(function1(arg0))
list1.append(function1(arg1))
list1.append(function1(arg2)) # list1 has size 3

Because at this moment the two list have together a lenght of 6 no further evaluations takes place. The only way to clear a list is to get results from the output iterators (say out0 for function0 and out1 for function1). If we take a single result say:

result_0_0 = next(out0) # submits function0(arg3) to the pool

memory consumption lowers to 5 and the next task is submitted to the pool.

list0.append(function0(arg3))

memory consumption is once again at 6 and the next task (function0, arg4) waits. By retrieving results from the output iterators we free the temporary result lists (queues) and allow evaluation to proceed. Results do not have to be retrieved in the order the tasks have been submitted to the pool or the order in which the results have been calculated. Assume that in the last example the next method of next(out0) has been called 5 more times:

result_0_1 = next(out0) # submits function0(arg4) to the pool
result_0_2 = next(out0) # submits function0(arg5) to the pool
result_0_3 = next(out0) # submits function1(arg3) to the pool !note 1
result_0_4 = next(out0) # submits function1(arg4) to the pool !note 1
result_0_5 = next(out0) # submits function1(arg5) to the pool !note 1

after all workers finish list0 the imap reaches a stage where:

  • list0 - will be empty
  • list1 - will have 6 results (arg0 - arg5)
  • task (function0, arg6) will wait to be submitted

List0 is empty and the next task (function0, arg6) cannot be submitted because the total memory consumption is 6. If we would call next(out0) the result would never arrive and the python interpreter would be blocked. A timeout argument can be supplied it causes the next method to raise a TimeoutError if after the specified number of seconds no result is availble.

result_0_6 = next(out0, timeout=2) # raise after 2 seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
multiprocessing.TimeoutError

We have to empty the other functions output (out1) to get the 7th result for out0. Note that the order of task submissions is defined at start by how the memory is freed (the order in which next(out0) and next(out1) are called) does not change this order:

result_1_0 = next(out1) # submits function0(arg6) to the pool
result_0_6 = next(out0) # submits function0(arg7) to the pool

To never run into an block only it is necessary to:

  • retrieve at most stride number of results from any output in a sequence
  • retrieve the result n from outputN before the result n from outputN+1

The most memory efficient way to do this is to get the results in batches of stride size, which is equivalent to the order the tasks have been submitted to the worker pool. If the two functions from the above example were nested this would happen automatically for the inner function. If the the pipeline run in the most memory efficient way the memory consumption can be lowered to the stride of the NuMap this is done using the buffer argument. For example:

nmap = NuMap(stride=3, buffer=5)

Parallel: local vs. remote and threads vs. processes

NuMap supports parallelization using local threads (worker_type='thread'), processes (worker_type='process'), and sub-interpreters (worker_type='interpreter'). Remote threads and processes are run by local processes. NuMap is designed to allow the user to choose the type of parallelization using the worker_type argument and/or by specifying the remote processes using the worker_remote argument.

Choosing the right worker type

process thread interpreter
True CPU parallelism Yes No (GIL) Yes (per-interpreter GIL)
Spawn overhead High (fork/spawn) Low Low
Memory isolation Full None Partial (isolated globals)
Remote execution Yes (RPyC) Yes (RPyC) No
Minimum Python 3.12 3.12 3.14

Use 'process' for CPU-bound tasks on Python 3.12/3.13. Use 'thread' for IO-bound tasks (network, disk). Use 'interpreter' for CPU-bound tasks on Python 3.14+ where you want lower spawn overhead than processes.

Sub-interpreters (Python 3.14+)

Python 3.14 introduced sub-interpreters with per-interpreter GILs (PEP 684). Each sub-interpreter is a fully isolated Python runtime within the same OS process, with its own GIL. This means:

  • True CPU parallelism: multiple sub-interpreters execute bytecode simultaneously, unlike threads which are serialized by the GIL.
  • Low spawn overhead: no fork()/spawn() system call; interpreters are created in microseconds compared to milliseconds for processes.
  • No shared mutable state: globals are isolated per interpreter, avoiding race conditions without explicit locking.

When to use it: CPU-bound tasks on Python 3.14+ where process spawn overhead is measurable (short tasks, large worker counts) or where you want to avoid the pickling cost of inter-process communication.

Requirement: worker functions must be top-level importable callables (same requirement as process workers). Callable class instances (such as PaPy Worker objects) are automatically unwrapped — each function in the task chain is called individually in the sub-interpreter.

from numap import NuMap, HASINTERP

def square(x):
    return x * x

if HASINTERP:
    nmap = NuMap(worker_type='interpreter', worker_num=4)
else:
    nmap = NuMap(worker_type='process', worker_num=4)

nmap.add_task(square, range(100))
nmap.start()
results = list(nmap)
nmap.stop(ends=[0])

Note

Sub-interpreter support requires Python 3.14 or later. On Python 3.12 or 3.13, HASINTERP is False and specifying worker_type='interpreter' raises ImportError. The worker_remote argument is not supported with worker_type='interpreter'.

Because of the global interpreter lock (GIL) in the standard cPython implementation of the Python programming language, only one os-thread can execute interpreted python code. Multiple running threads are assigned timeslices of "code access". In general it is not possible to speed cpu-bound computations using threads. Multiple threads can however speed up certain functions to a certain degree if either the function is:

  • IO-bound (e.g. waits for server responses)
  • uses libraries which release the global interpreter lock

The first case applies to all function which depend on user interaction and other blocking Input/Output operations like reading or writing a file. the second case applies mostly to external compiled libraries doing cpu-intensive calculations. Those libraries can natively be multithreaded. An NuMap running tasks in worker threads is therfore a good choice if the tasks submitted to the function are IO-bound or if it uses a library which releases the GIL for significant periods of time.

The GIL can be circumvented by forking the python interpreter process instead of spawning additional threads within. Such forked processes have separate memory space and are seen by the operating system as another python interpreter. Multiple processes have each their own GIL and are therefore suited to parallelize intepreted python code. This parallelization will make the computation faster if the operating system has enough resources to support the processes. If a function is CPU-bound the most limiting resource is the CPU and therefore the number of processes should not exceed the number of availble CPUs. Using multiple processes within on parent process is called multiprocessing and Python 3.12+ supports this feature via the multiprocessing module out of the box on most operating systems (Linux, MacOSX, Windows). It is also possible to parallelize local computations using the RPyC library. For cpu-bound tasks the additional overhead is minor.

If a single machine is not fast enough for the task, distributed computing i.e. computation on remote physical computers might be considered. NuMap supports distributed computation using the RPyC library. To use this feature a "classic" RPyC server has to be running on a remote host. This server allows clients (i.e. NuMap instances) to connect to them and execute functions. An RPyC server can be both thread or process based. Only a process-based RPyC will be able to use multiple CPUs on the remote computer (not supported on Windows). An NuMap which connects to remote servers spawns a new process for each thread/process spawned/forked remotely (by the RPyC server).

Note

NuMap has no possibility to change the thread/process nature of the remote server.

In the following example snippet:

nmap = NuMap(worker_type='process', worker_num=2, worker_remote=[['host1',1], ['host2', 4]])

An NuMap instance is created which uses worker processes. It has a total of 7 local worker processes. Five of the local worker processes do the computation on remote hosts: 4 on host2 nad 1 on host1.

The following is illegal, because remote threads/processes can be managed only by 'process' workers:

nmap = NuMap(worker_type='process', worker_num=2, worker_remote=[['host1',1], ['host2', 4]])

The RPyC server can also be started on the local machine ('localhost'). In this example the NuMap instance has a total of 2 local worker processes which manage two remote processes which happen to exist on the same physical machine.

nmap = NuMap(worker_type='process', worker_num=0, worker_remote=[['localhost', 2]])

If the order of the results is not important

NuMap supports unordered results via the ordered argument:

nmap = NuMap(ordered=True)  # the default is ordered
nmap = NuMap(ordered=False) # random order of results

If the results are allowed to be unordered the evaluation might be significantly faster under certain circumstances described later, but this order is not reproducible. As a general advice do not use unordered Imap instances in branched papy pipelines, unless you really know what you are doing.

Timeouts and skipping

NuMap supports timeouts. A TimeoutError (from the multiprocessing module) is raised whenever a result for a given task cannot be returned within approximately the number of seconds specified. The timeout is only approximate because NuMap uses multiple threads to manage the input and output queues for the worker threads/process. The thread, which receives result might not not have access to the interpreter when the timeout passes.

The skipping argument allows to skip results which did timeout. If skipping is not specified NuMap will try to return the same result (for this task) once more. If the timeout is not specified skipping is ignored. If NuMap is used with nested tasks a timeout should in practice not be specified unless NuMap is used within from a piper object or the NuMap instance

and timeouts are specified the skipping argument should be true, the reason for this is that

The parallel stride revisited

In one of the previous sections it has been described how NuMap allows for parallelism by introducing the concept of the stride. To recapitulate a stride is the number of tasklets submitted to the pool for a specific task to be executed/evaluated in parallel. Another tasklet can be submitted if the buffer is larger than the stride or as soon as any of the results of the parallel evaluations is retrieved. The user can add multiple tasks to one NuMap instance. If the evaluation is CPU-bound and the NuMap uses worker processes an optimal speed up equal to the number of CPUs or CPU-cores. However because of inter task dependencies i.e. nested functions, the speed-up might be smaller as a result of the memory trade-off and task submission order.

  1. The output needs to be retrieved. If the output of an NuMap instance is not retrieved it will pause whenever it fill the buffer of temporary results, therefore it is important to retrieve the results as soon as they are ready. Because results generaly arrive in batches of stride size it is best to try to retrieve stride number of results from each task. If the NuMap is used within a papy pipeline the Plumber has a separate thread (started using the plunge method) dedicated to keep the NuMap evaluating.

  2. Variable tasklet calculation times

If the next task depends on the results from the first task then it's first tasklet