Runtime (Logging, Interaction, Errors and Timeouts)¶
A started workflow evaluates the functions of the processing nodes on the data
items traversing the pipeline, using the assigned computational resources. The
execution happens in the background, i.e. the user is free to interact
with the running pipeline via the Plumber object. Other components of
a workflow, i.e. Piper and NuMap instances, log runtime messages. These
logs can be saved to disk and inspected when needed.
Logging behavior is customized via papy.util.config, interaction is possible
through papy.core.Plumber, and exceptions, errors and timeouts are handled by
the processing nodes, i.e. papy.core.Piper instances.
Plumber lifecycle¶
The Plumber follows a strict state machine. Calling methods out of order
raises PlumberError:
stateDiagram-v2
[*] --> started : start(datas)
started --> running : run()
running --> paused : pause()
running --> finished : all items consumed
paused --> running : run()
paused --> stopped : stop()
finished --> stopped : stop()
stopped --> [*]
| State | Description |
|---|---|
| `started` | Pipeline connected and NuMap pools active. No results being retrieved yet. |
| `running` | Background plunger thread is pulling results from output Pipers. |
| `paused` | Plunger thread stopped. Buffered results remain. NuMap pools stay alive. |
| `finished` | All input consumed and all results retrieved. Pipeline ready to stop. |
| `stopped` | All threads/processes joined. Stats available. |
Typical usage¶
The correct lifecycle to run a pipeline to completion is:
plumber = Plumber()
plumber.add_pipe((p1, p2, p3))
plumber.start([input_data]) # connect pipers + start NuMaps
plumber.run() # start pulling results (background thread)
plumber.wait() # block until all items consumed
plumber.pause() # join plunger thread, clear running state
plumber.stop() # shut down NuMaps, disconnect, record stats
Warning
stop() requires the pipeline to not be running. Always call
pause() before stop(). Calling wait() before pause() ensures
all results are consumed before shutting down.
Pausing and resuming¶
A running pipeline can be paused before completion. While paused, no new results are retrieved, but NuMap workers continue until their buffer fills. A paused pipeline can be either run again or stopped:
plumber.start([data])
plumber.run()
# ... some time later ...
plumber.pause() # stop pulling results (may lose buffered items)
# inspect state, adjust parameters, etc.
plumber.run() # resume pulling results
plumber.wait() # block until finished
plumber.pause() # clear running state
plumber.stop()
Warning
A paused pipeline cannot be "rewound". If input items were consumed before the pause, they will not be re-evaluated.
Pipeline statistics¶
After a pipeline runs, Plumber.stats contains timing and tracking data:
| Key | Type | Description |
|---|---|---|
| `start_time` | float | Unix timestamp when start() was called. |
| `run_time` | float | Elapsed seconds from start() to stop(). |
| `pipers_tracked` | dict | Results for Pipers with track=True (see below). |
Tracking results¶
By default, results flow through the pipeline and are discarded after the
output Piper processes them. To retain intermediate results, set
track=True on a Piper:
p_compute = Piper(Worker(my_func), parallel=nmap, track=True)
plumber = Plumber()
plumber.add_pipe((p_input, p_compute, p_output))
plumber.start([data])
plumber.run()
plumber.wait()
plumber.pause()
plumber.stop()
# access tracked results
for piper, results in plumber.stats['pipers_tracked'].items():
print(piper, results)
Note
Tracking stores results in memory. For large datasets, consider writing results to disk in the output worker instead.
Logging¶
PaPy uses Python's built-in logging module. Both papy and numap
loggers are available:
import logging
from papy.util.config import start_logger
# log to console at INFO level
start_logger(log_to_stream=True, log_level=logging.INFO)
# log to file at DEBUG level (very verbose)
start_logger(log_to_file='pipeline.log', log_level=logging.DEBUG)
Logger names:
| Logger | Source |
|---|---|
| `papy` | Pipeline operations (Piper, Dagger, Plumber) |
| `numap` | NuMap pool operations (task submission, results, sentinels) |
Tip
Set numap to DEBUG level to diagnose hanging pipelines — it logs
every task submission, result retrieval, and sentinel.
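Because these are ordinary Python loggers, their levels can also be adjusted independently with the standard logging module instead of start_logger. A minimal sketch, assuming only the logger names listed above:

```python
import logging

# Attach one console handler to both loggers with a shared format.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(name)s %(levelname)s %(message)s'))
for name in ('papy', 'numap'):
    logging.getLogger(name).addHandler(handler)

# Make numap verbose (to trace task submissions, results and sentinels)
# while keeping the pipeline-level papy logger quieter.
logging.getLogger('numap').setLevel(logging.DEBUG)
logging.getLogger('papy').setLevel(logging.INFO)
```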
Errors and exceptions¶
PaPy workflows are by default resistant to exceptions raised in user-provided worker functions. The exception propagation chain is:
- A worker function raises an exception
- `Worker` catches it and returns (not raises) a `WorkerError`
- `Piper` wraps it as a `PiperError` and passes it downstream
- Downstream `Piper`s receiving a `PiperError` as input skip evaluation and propagate it further
- The output `Piper` logs the error
This means a single bad input item does not crash the entire pipeline.
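The "wrap, don't raise" chain above can be illustrated with a plain-Python sketch. These are not papy's actual classes, just stand-ins showing the propagation logic:

```python
class WorkerError(Exception):
    """Stand-in for papy's wrapper around a worker-function exception."""

class PiperError(Exception):
    """Stand-in for papy's wrapper that flows downstream instead of raising."""

def worker(func, item):
    # A Worker catches exceptions and returns (not raises) a WorkerError.
    try:
        return func(item)
    except Exception as exc:
        return WorkerError(exc)

def piper(func, item):
    # A Piper receiving a PiperError skips evaluation and propagates it.
    if isinstance(item, PiperError):
        return item
    result = worker(func, item)
    if isinstance(result, WorkerError):
        return PiperError(result)  # wrap and pass downstream
    return result

def bad(x):
    return 1 / x  # raises ZeroDivisionError for x == 0

items = [1, 0, 2]
stage1 = [piper(bad, i) for i in items]
stage2 = [piper(lambda x: x * 10, i) for i in stage1]
# The bad item becomes a PiperError and travels through stage2 untouched;
# the good items are processed normally: [10.0, PiperError(...), 5.0]
```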
Debug mode¶
Set debug=True on a Piper to raise exceptions immediately instead of
wrapping them. This is useful during development, but note that it can hang the
interpreter after the error.
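As a fragment (not runnable on its own; my_func and nmap are the placeholders used elsewhere in this section):

```python
# Development: exceptions from my_func are raised immediately, not wrapped.
p_dev = Piper(Worker(my_func), parallel=nmap, debug=True)

# Production (default): exceptions become PiperErrors and flow downstream.
p_prod = Piper(Worker(my_func), parallel=nmap)
```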
Dealing with timeouts¶
Timeouts control how long a Piper waits for a result from its NuMap.
If a result is not available within the specified time, a PiperError
wrapping a TimeoutError is returned.
| Parameter | Where | Effect |
|---|---|---|
| `timeout` on Piper | `Piper.__init__` | Seconds to wait per result before returning PiperError(TimeoutError) |
| `skip` on NuMap | `NuMap.__init__` | If True, timed-out results are skipped; if False, retrieval is retried |
| `timeout` on Plumber.wait() | `Plumber.wait` | Seconds to block waiting for the pipeline to finish |
Warning
Do not specify timeouts for chained tasks within a single NuMap
instance. A timeout on an upstream task will cause downstream tasks to
deadlock because they depend on the upstream result.
Recommended timeout pattern¶
nmap = NuMap(worker_type='process', worker_num=4, skip=True)
p = Piper(Worker(unreliable_func), parallel=nmap, timeout=30)
With skip=True, timed-out items are dropped and the pipeline continues.
Without it, the pipeline retries the same result indefinitely.