
Runtime (Logging, Interaction, Errors and Timeouts)

A started workflow evaluates the functions of its processing nodes on the data items traversing the pipeline, using the assigned computational resources. Execution happens in the background, so the user can interact with the running pipeline through the Plumber object. Other workflow components, namely Piper and NuMap instances, log runtime messages; these logs can be saved to disk and inspected when needed.

Logging behavior is customized via papy.util.config, interaction happens through papy.core.Plumber, and exceptions, errors, and timeouts are handled by the processing nodes, i.e. papy.core.Piper instances.

Plumber lifecycle

The Plumber follows a strict state machine. Calling methods out of order raises PlumberError:

```mermaid
stateDiagram-v2
    [*] --> started : start(datas)
    started --> running : run()
    running --> paused : pause()
    running --> finished : all items consumed
    paused --> running : run()
    paused --> stopped : stop()
    finished --> stopped : stop()
    stopped --> [*]
```
| State    | Description |
| -------- | ----------- |
| started  | Pipeline connected and NuMap pools active. No results being retrieved yet. |
| running  | Background plunger thread is pulling results from output Pipers. |
| paused   | Plunger thread stopped. Buffered results remain. NuMap pools stay alive. |
| finished | All input consumed and all results retrieved. Pipeline ready to stop. |
| stopped  | All threads/processes joined. Stats available. |
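The allowed transitions can be thought of as a small lookup table. The class below is an illustrative stand-in for this state machine, not the real Plumber; the running to finished transition happens internally when all items are consumed, so it is not exposed as a method here.

```python
class PlumberError(Exception):
    """Raised when a lifecycle method is called out of order."""

# (previous state, method) -> next state; anything else is an error
TRANSITIONS = {
    ("new", "start"): "started",
    ("started", "run"): "running",
    ("running", "pause"): "paused",
    ("paused", "run"): "running",
    ("paused", "stop"): "stopped",
    ("finished", "stop"): "stopped",
}

class LifecycleSketch:
    """Illustrative stand-in for the Plumber lifecycle state machine."""

    def __init__(self):
        self.state = "new"

    def _transition(self, method):
        try:
            self.state = TRANSITIONS[(self.state, method)]
        except KeyError:
            raise PlumberError(
                "cannot call %s() in state %r" % (method, self.state)) from None

    def start(self): self._transition("start")
    def run(self): self._transition("run")
    def pause(self): self._transition("pause")
    def stop(self): self._transition("stop")
```

Calling, say, stop() on a freshly created pipeline has no entry in the table, which is exactly the out-of-order case that raises PlumberError.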

Typical usage

The correct lifecycle to run a pipeline to completion is:

```python
plumber = Plumber()
plumber.add_pipe((p1, p2, p3))

plumber.start([input_data])   # connect pipers + start NuMaps
plumber.run()                 # start pulling results (background thread)
plumber.wait()                # block until all items consumed
plumber.pause()               # join plunger thread, clear running state
plumber.stop()                # shut down NuMaps, disconnect, record stats
```

Warning

stop() requires that the pipeline is not running. Always call pause() before stop(); calling wait() before pause() ensures all results are consumed before shutting down.

Pausing and resuming

A running pipeline can be paused before completion. While paused, no new results are retrieved, but NuMap workers continue until their buffer fills. A paused pipeline can be either run again or stopped:

```python
plumber.start([data])
plumber.run()
# ... some time later ...
plumber.pause()    # stop pulling results (may lose buffered items)
# inspect state, adjust parameters, etc.
plumber.run()      # resume pulling results
plumber.wait()     # block until finished
plumber.pause()    # clear running state
plumber.stop()
```

Warning

A paused pipeline cannot be "rewound". If input items were consumed before the pause, they will not be re-evaluated.
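The pause/resume mechanics can be mimicked with the standard library alone. The sketch below gates a plunger-like consumer thread with a threading.Event; it illustrates the pattern only and is not papy's implementation.

```python
import queue
import threading

class PausableConsumer:
    """Sketch of a plunger-like thread: pulls results while not paused."""

    def __init__(self, source):
        self.source = source              # queue of results to drain
        self.results = []
        self._resume = threading.Event()
        self._resume.set()                # start unpaused
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._pull, daemon=True)
        self._thread.start()

    def _pull(self):
        while not self._stop.is_set():
            self._resume.wait()           # block here while paused
            try:
                item = self.source.get(timeout=0.05)
            except queue.Empty:
                continue                  # nothing available yet; poll again
            self.results.append(item)

    def pause(self):
        self._resume.clear()              # stop pulling, keep thread alive

    def run(self):
        self._resume.set()                # resume pulling

    def stop(self):
        self._stop.set()
        self._resume.set()                # unblock the thread if paused
        self._thread.join()
```

As in the real pipeline, items pulled before pause() are already consumed from the source; pausing only stops further retrieval, it does not rewind.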

Pipeline statistics

After a pipeline runs, Plumber.stats contains timing and tracking data:

| Key | Type | Description |
| --- | ---- | ----------- |
| start_time | float | Unix timestamp when start() was called. |
| run_time | float | Elapsed seconds from start() to stop(). |
| pipers_tracked | dict | Results for Pipers with track=True (see below). |

Tracking results

By default, results flow through the pipeline and are discarded after the output Piper processes them. To retain intermediate results, set track=True on a Piper:

```python
p_compute = Piper(Worker(my_func), parallel=nmap, track=True)

plumber = Plumber()
plumber.add_pipe((p_input, p_compute, p_output))
plumber.start([data])
plumber.run()
plumber.wait()
plumber.pause()
plumber.stop()

# access tracked results
for piper, results in plumber.stats['pipers_tracked'].items():
    print(piper, results)
```

Note

Tracking stores results in memory. For large datasets, consider writing results to disk in the output worker instead.
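A disk-backed output worker might look like the following sketch. The function name save_result and the JSON-lines format are illustrative choices, assuming the papy convention that a worker function receives its inputs as a sequence (inbox).

```python
import json

def save_result(inbox, path):
    """Illustrative output worker: append each result to a JSON-lines file
    instead of keeping it in memory. Assumes the papy convention that a
    worker function receives its inputs as a sequence (inbox)."""
    result = inbox[0]
    with open(path, "a") as handle:
        handle.write(json.dumps(result) + "\n")
    return result
```

Wrapped in a Worker with the target path supplied as an extra argument, such a function streams results to disk item by item, keeping memory use flat regardless of dataset size.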

Logging

PaPy uses Python's built-in logging module. Both papy and numap loggers are available:

```python
import logging
from papy.util.config import start_logger

# log to console at INFO level
start_logger(log_to_stream=True, log_level=logging.INFO)

# log to file at DEBUG level (very verbose)
start_logger(log_to_file='pipeline.log', log_level=logging.DEBUG)
```

Logger names:

| Logger | Source |
| ------ | ------ |
| papy | Pipeline operations (Piper, Dagger, Plumber) |
| numap | NuMap pool operations (task submission, results, sentinels) |

Tip

Set numap to DEBUG level to diagnose hanging pipelines — it logs every task submission, result retrieval, and sentinel.
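Because papy and numap are ordinary logging loggers, they can also be configured directly with the standard library, without start_logger. A minimal sketch, using only stdlib logging:

```python
import logging

# Attach one console handler to both loggers, with numap at DEBUG
# to surface task submissions, results, and sentinels.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    "%(asctime)s %(name)s %(levelname)s: %(message)s"))

for name, level in (("papy", logging.INFO), ("numap", logging.DEBUG)):
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.addHandler(handler)
```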

Errors and exceptions

PaPy workflows are, by default, resilient to exceptions raised in user-provided worker functions. The exception propagation chain is:

  1. Worker function raises Exception
  2. Worker catches it and returns (not raises) a WorkerError
  3. Piper wraps it as a PiperError and passes it downstream
  4. Downstream Pipers receiving a PiperError as input skip evaluation and propagate it further
  5. Output Piper logs the error

This means a single bad input item does not crash the entire pipeline.
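The wrap-and-forward pattern in steps 2 to 4 can be sketched with stand-in classes. The real WorkerError and PiperError live in papy.core; worker_call and piper_step below are hypothetical helpers written to illustrate the mechanism.

```python
class WorkerError(Exception):
    """Stand-in for papy.core.WorkerError."""

class PiperError(Exception):
    """Stand-in for papy.core.PiperError."""

def worker_call(func, inbox):
    """Call a user function; return (not raise) a WorkerError on failure."""
    try:
        return func(inbox[0])
    except Exception as exc:
        return WorkerError(exc)

def piper_step(func, inbox):
    """Skip evaluation if the input is already an error, else wrap failures."""
    item = inbox[0]
    if isinstance(item, PiperError):
        return item                      # propagate downstream unchanged
    result = worker_call(func, inbox)
    if isinstance(result, WorkerError):
        return PiperError(result)        # wrap and pass downstream
    return result
```

Because errors travel as values rather than raised exceptions, one failing item produces one PiperError flowing to the output, while all other items are processed normally.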

Debug mode

Set debug=True on a Piper to raise exceptions immediately instead of wrapping them. This is useful during development, but the interpreter will hang after the error is raised:

```python
p = Piper(Worker(my_func), debug=True)
```

Dealing with timeouts

Timeouts control how long a Piper waits for a result from its NuMap. If a result is not available within the specified time, a PiperError wrapping a TimeoutError is returned.

```python
p = Piper(Worker(slow_func), parallel=nmap, timeout=5)
```

| Parameter | Where | Effect |
| --------- | ----- | ------ |
| timeout on Piper | `Piper.__init__` | Seconds to wait per result before returning PiperError(TimeoutError) |
| skip on NuMap | `NuMap.__init__` | If True, timed-out results are skipped; if False, retrieval is retried |
| timeout on Plumber.wait() | `Plumber.wait` | Seconds to block waiting for pipeline to finish |

Warning

Do not specify timeouts for chained tasks within a single NuMap instance. A timeout on an upstream task will cause downstream tasks to deadlock because they depend on the upstream result.

```python
nmap = NuMap(worker_type='process', worker_num=4, skip=True)
p = Piper(Worker(unreliable_func), parallel=nmap, timeout=30)
```

With skip=True, timed-out items are dropped and the pipeline continues. Without it, the pipeline retries the same result indefinitely.
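The skip semantics can be sketched with a plain queue standing in for the NuMap result stream. The retrieve helper, n_items, and the max_retries cap are illustrative additions; the cap exists only so the sketch terminates, whereas (per the behavior described above) the real retrieval retries indefinitely.

```python
import queue

def retrieve(source, n_items, timeout, skip, max_retries=3):
    """Sketch of per-result timeout handling during retrieval.

    skip=True drops a timed-out result and moves on; skip=False retries
    the same result (capped here only so the sketch terminates).
    """
    results = []
    for _ in range(n_items):
        retries = 0
        while True:
            try:
                results.append(source.get(timeout=timeout))
                break
            except queue.Empty:
                if skip:
                    break          # drop this result, continue with the next
                retries += 1       # retry the same result
                if retries > max_retries:
                    raise
    return results
```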