Introduction

Many computational tasks require sequential processing of data, i.e., a global data set is split into items that are processed separately by multiple chained processing nodes; this is generally called a dataflow. PaPy adapts the flow-based programming paradigm and lets you create data-processing pipelines that support both data and task parallelism. In designing PaPy we tried to keep the API as idiosyncrasy-free as possible, relying on the familiar concepts of map functions and directed acyclic graphs.

Feature summary

This is a list of features of a workflow constructed using the PaPy package and its additional components.

  • Construction of arbitrarily complex pipelines (any directed acyclic graph is a valid workflow)
  • Evaluation is lazy-buffered (allows processing of datasets that do not fit into memory)
  • Flexible local and remote parallelism (local and remote resources can be pooled)
  • Shared local and remote resources (resource pools can be flexibly assigned to processing nodes)
  • Robustness to exceptions
  • Support for time-outs
  • Real-time logging / monitoring
  • OS-independent (really a feature of multiprocessing)
  • Distributed (really a feature of RPyC)
  • Small code-base
  • Tested and documented
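
Lazy-buffered evaluation can be pictured with plain Python generators. This is only a conceptual analogue of the feature above, not PaPy's internals: each stage pulls one item at a time from the previous stage, so the full dataset never has to sit in memory.

```python
# Conceptual analogue of lazy-buffered evaluation (not PaPy's actual
# internals): chained generators pull one item at a time, so the full
# dataset never has to fit into memory at once.

def parse(items):
    for item in items:
        yield int(item)

def transform(items):
    for item in items:
        yield item * item

# The source could be a file with billions of lines; here a small range.
source = (str(i) for i in range(5))
pipeline = transform(parse(source))

print(list(pipeline))  # [0, 1, 4, 9, 16]
```

Nothing is computed until the pipeline is consumed, which is the same property that lets a PaPy workflow stream arbitrarily large inputs.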

How PaPy works

A typical pipeline looks like this:

flowchart LR
    IN1[(input\ndata)] --> P1
    IN2[(input\ndata)] --> P2

    subgraph Pipeline
        P1["Piper\nparse"] -->|pipe| P3["Piper\ntransform"]
        P2["Piper\nfilter"] -->|pipe| P3
        P3 -->|pipe| P4["Piper\nwrite output"]
    end

    subgraph Resources
        N1["NuMap\n4 processes"]
        N2["NuMap\n2 threads"]
    end

    P1 -.->|uses| N1
    P3 -.->|uses| N1
    P2 -.->|uses| N2
    P4 -.->|uses| N2

Data flows through pipes from input Pipers to output Pipers. Each Piper wraps a Worker (one or more composed functions). NuMap instances provide pools of parallel workers and can be shared across multiple Pipers.
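
The roles in the diagram can be sketched in plain Python. The names below (`compose`, `worker`, the stages) are illustrative, not the PaPy API; a `concurrent.futures` pool stands in for a shared NuMap.

```python
# Plain-Python sketch of the diagram's roles (hypothetical names, not
# the PaPy API): a "worker" composes functions, stages are chained like
# pipers, and one executor pool is shared by several stages (as one
# NuMap can be shared by several Pipers).
from concurrent.futures import ThreadPoolExecutor

def compose(*funcs):
    """Worker-like composition: apply funcs left to right to each item."""
    def worker(item):
        for f in funcs:
            item = f(item)
        return item
    return worker

parse = compose(str.strip, int)
square = compose(lambda x: x * x)

pool = ThreadPoolExecutor(max_workers=2)   # shared pool, like a NuMap

# Each stage maps its worker over the previous stage's output.
stage1 = pool.map(parse, [" 1 ", " 2 ", " 3 "])
stage2 = pool.map(square, stage1)

print(list(stage2))  # [1, 4, 9]
pool.shutdown()
```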

Description

Workflows are constructed from components of orthogonal functionality:

  • the function-wrapping Workers
  • the connection-capable Pipers
  • the topology-defining Dagger
  • the parallel-executing NuMaps

The Dagger connects Pipers via pipes into a directed acyclic graph, while NuMaps are assigned to Pipers and evaluate their Workers locally (using threads or processes) or on remote hosts. Workers compose multiple functions; Pipers connect the inputs and outputs of those Workers as defined by the Dagger topology. Data parallelism is possible whenever the input is a collection of independent items (or can be split into one): files, messages, sequences, arrays. PaPy enables task parallelism by allowing the data-processing functions to be evaluated on different computational resources, represented by NuMap instances.
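
How a DAG topology determines evaluation order can be shown with a minimal stdlib sketch (illustrative only, not the Dagger API): nodes are functions, the dependency map plays the role of the pipes, and a topological sort guarantees every node runs after its upstream nodes.

```python
# Minimal DAG-evaluation sketch (illustrative, not the Dagger API).
from graphlib import TopologicalSorter

funcs = {
    "parse":  lambda ins: [int(x) for x in ins[0]],
    "filter": lambda ins: [x for x in ins[0] if x > 1],
    "merge":  lambda ins: ins[0] + ins[1],
}

# node -> upstream dependencies, in input order (the "pipes")
deps = {"parse": [], "filter": [], "merge": ["parse", "filter"]}

# external inputs feed the nodes that have no upstream pipers
external = {"parse": ["1", "2"], "filter": [0, 1, 2, 3]}
results = {}

# topological order: every node runs after all of its dependencies
order = TopologicalSorter({n: set(d) for n, d in deps.items()}).static_order()
for node in order:
    ins = [results[d] for d in deps[node]] if deps[node] else [external[node]]
    results[node] = funcs[node](ins)

print(results["merge"])  # [1, 2, 2, 3]
```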

PaPy is written in and for Python. This means that the user is expected to write Python functions with defined call/return signatures, but the function bodies are largely arbitrary: they can, for example, call a shell script or import a library. PaPy focuses on modularity; functions should be reused and composed within Workers and pipelines.
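
A defined call/return signature can look like the sketch below, which assumes PaPy's "inbox" convention (each function receives a sequence holding one item from each upstream node); the function names and the hand-rolled composition are hypothetical, and papy itself is not imported here.

```python
# Hedged sketch of an inbox-style call/return signature (assumption:
# each function receives a sequence of upstream results; papy is not
# imported here, and these names are hypothetical).
def parse(inbox):
    # inbox holds one item per upstream node
    return int(inbox[0])

def double(inbox):
    return inbox[0] * 2

# composing the two by hand, the way a Worker composes its functions
def composed(inbox):
    return double((parse(inbox),))

print(composed(("21",)))  # 42
```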

A PaPy workflow automatically logs its execution, is resilient to exceptions and timeouts, and should work on all platforms where multiprocessing is available. It can also compute over a cross-platform ad-hoc grid using the RPyC package.
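
One common way to make a pipeline resilient to exceptions, sketched here in plain Python, is to turn a failing item into an error record that flows downstream instead of crashing the run; this is illustrative of the idea only, not PaPy's implementation, and the `robust` wrapper is a made-up name.

```python
# Illustrative sketch (not PaPy's implementation) of exception
# robustness: a failing item becomes an error record, gets logged,
# and the rest of the pipeline keeps running.
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")

def robust(func):
    def wrapper(item):
        if isinstance(item, Exception):      # propagate upstream failures
            return item
        try:
            return func(item)
        except Exception as exc:
            log.warning("item %r failed: %s", item, exc)
            return exc
    return wrapper

parse = robust(int)
square = robust(lambda x: x * x)

results = [square(parse(x)) for x in ["1", "oops", "3"]]
print([r if not isinstance(r, Exception) else "ERROR" for r in results])
# [1, 'ERROR', 9]
```

The good items ("1" and "3") are processed normally while the bogus one ("oops") is logged and carried through as an error, so one bad record never aborts the whole workflow.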

Where/When should PaPy be used?

It is likely that you will benefit from using PaPy if some of the following are true:

  • You need to process large collections of data items.
  • Your data collection is too large to fit into memory.
  • You want to utilize an ad-hoc grid.
  • You need to construct complex workflows or data-flows.
  • You are likely to deal with timeouts or bogus data.
  • The execution of your workflow needs to be logged / monitored.
  • You want to refactor existing code.
  • You want to reuse (wrap) existing code.

Where/When should PaPy not be used?

  • You do not need a workflow, just a way to evaluate a function in parallel (consider NuMap on its own for this).
  • Parallel evaluation improves performance only if the functions have sufficient granularity, i.e., a high enough computation-to-communication ratio.
  • Your input is not a collection (and cannot be split into one), so it does not allow for data parallelism.