PaPy API

This part of the documentation is generated automatically from the source code documentation strings. It should be the most up-to-date version. If there is a conflict between the hand-written and generated documentation, please contact the author, e.g. by adding an issue on the project page.

NuMap is a parallel (local or remote), buffered, multi-task, lazy map function, which can use threads and processes.

papy.core

This module provides classes and functions to construct and run a PaPy pipeline.

class papy.core.Dagger(pipers=(), pipes=(), xtras=None)

The Dagger is a directed acyclic graph. It defines the topology of a PaPy pipeline / workflow. It is a subclass of DictGraph. Within the Dagger, DictGraph edges are called pipes and have an inverted direction, which reflects dataflow rather than dependency. Edges can be thought of as dependencies, while pipes describe the dataflow between Pipers, the nodes of the graph.

Arguments:

  • pipers(sequence) [default: ()] A sequence of valid add_piper inputs (see the documentation for the add_piper method).
  • pipes(sequence) [default: ()] A sequence of valid add_pipe inputs (see the documentation for the add_pipe method).
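
For example, a small topology might be assembled as follows (a hedged sketch; the worker function and the Piper names are placeholders, not part of PaPy):

from papy.core import Dagger, Piper, Worker

def passer(inbox):
    # placeholder worker function: passes the first input through
    return inbox[0]

p1 = Piper(Worker(passer))
p2 = Piper(Worker(passer))
p3 = Piper(Worker(passer))

dag = Dagger()
dag.add_pipers((p1, p2, p3))
dag.add_pipe((p1, p2))    # data flows from p1 to p2
dag.add_pipe((p2, p3))    # data flows from p2 to p3
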
add_pipe(pipe, branch=None)

Adds a pipe (A, ..., N), which is an N-tuple of Piper instances. Adding a pipe means to add all the Pipers and connect them in the specified left to right order.

The direction of the edges in the DictGraph is reversed compared to the left to right data-flow in a pipe.

Arguments:

  • pipe(sequence) N-tuple of Piper instances or objects which are valid add_piper arguments. See: Dagger.add_piper and Dagger.resolve.
add_piper(piper, xtra=None, create=True, branch=None)

Adds a Piper instance to this Dagger, but only if the Piper is not already there. Optionally creates a new Piper if the “piper” argument is valid for the Piper constructor. Returns a tuple (new_piper_created, piper_instance) indicating whether a new Piper has been created and the instance of the added Piper. Optionally takes “branch” and “xtra” arguments for the topological node in the graph.

Arguments:

  • piper(Piper, Worker or id(Piper)) Piper instance or object which will be converted to a Piper instance.
  • create(bool) [default: True] Should a new Piper be created if “piper” cannot be resolved in this Dagger?
  • xtra(dict) [default: None] Dictionary of graph.Node properties.
add_pipers(pipers, *args, **kwargs)

Adds a sequence of Piper instances to the Dagger in the specified order. Takes optional arguments for Dagger.add_piper.

Arguments:

  • pipers(sequence of valid add_piper arguments) Sequence of Pipers or valid Dagger.add_piper arguments to be added to the Dagger in the left to right order.
add_pipes(pipes, *args, **kwargs)

Adds a sequence of pipes to the Dagger in the specified order. Takes optional arguments for Dagger.add_pipe.

Arguments:

  • pipes(sequence of valid add_pipe arguments) Sequence of pipes or other valid Dagger.add_pipe arguments to be added to the Dagger in the left to right order.
children_after_parents(piper1, piper2)

Custom compare function. Returns 1 if the first Piper instance is upstream of the second Piper instance, -1 if the first Piper is downstream of the second Piper and 0 if the two Pipers are independent.

Arguments:

  • piper1(Piper) Piper instance.
  • piper2(Piper) Piper instance.
connect(datas=None)

Connects Pipers in the order input -> output, according to the pipes (topology). See Piper.connect. If “datas” is given, the input Pipers will be connected to the input data; see: Dagger.connect_inputs.

Arguments:

  • datas(sequence) [default: None] A valid sequence of input data. See: Dagger.connect_inputs.
connect_inputs(datas)

Connects input Pipers to “datas” input data in the correct order, determined by the Piper.ornament attribute and the Dagger._cmp function.

It is assumed that the input data is in the form of an iterator and that all inputs have the same number of input items. A pipeline will deadlock otherwise.

Arguments:

  • datas (sequence of sequences) An ordered sequence of inputs for all input Pipers.
del_pipe(pipe, forced=False)

Deletes a pipe (A, ..., N), which is an N-tuple of Piper instances. Deleting a pipe means to delete all the connections between Pipers and to delete all the Pipers. If “forced” is False only Pipers which are not used anymore (i.e. have no downstream Pipers) are deleted.

The direction of the edges in the DictGraph is reversed compared to the left to right data-flow in a pipe.

Arguments:

  • pipe(sequence) N-tuple of Piper instances or objects which can be resolved in the Dagger (see: Dagger.resolve). The Pipers are removed in the order from right to left.
  • forced(bool) [default: False] The forced argument will be given to the Dagger.del_piper method. If “forced” is False only Pipers with no outgoing pipes will be deleted.
del_piper(piper, forced=False)

Removes a Piper from the Dagger instance.

Arguments:

  • piper(Piper or id(Piper)) Piper instance or Piper instance id.
  • forced(bool) [default: False] If “forced” is True, will not raise a DaggerError if the Piper has outgoing pipes and will also remove it.
del_pipers(pipers, *args, **kwargs)

Deletes a sequence of Piper instances from the Dagger in the reverse of the specified order. Takes optional arguments for Dagger.del_piper.

Arguments:

  • pipers (sequence of valid del_piper arguments) Sequence of Pipers or valid Dagger.del_piper arguments to be removed from the Dagger in the right to left order.
del_pipes(pipes, *args, **kwargs)

Deletes a sequence of pipes from the Dagger in the specified order. Takes optional arguments for Dagger.del_pipe.

Arguments:

  • pipes(sequence of valid del_pipe arguments) Sequence of pipes or other valid Dagger.del_pipe arguments to be removed from the Dagger in the left to right order.
disconnect(forced=False)

Given the pipeline topology, disconnects Pipers in the order output -> input. This also disconnects inputs. See Dagger.connect, Piper.connect and Piper.disconnect. If “forced” is True NuMap instances will be emptied.

Arguments:

  • forced(bool) [default: False] If set True, all tasks from all NuMap instances used in the Dagger will be removed even if they did not belong to this Dagger.
get_inputs()

Returns Piper instances, which are inputs to the pipeline i.e. have no incoming pipes (outgoing dependency edges).

get_outputs()

Returns Piper instances, which are outputs to the pipeline i.e. have no outgoing pipes (incoming dependency edges).

resolve(piper, forgive=False)

Given a Piper instance or the id of a Piper, returns the Piper instance if it can be resolved; otherwise raises a DaggerError or returns False, depending on the “forgive” argument.

Arguments:

  • piper(Piper or id(Piper)) a Piper instance or its id to be found in the Dagger.
  • forgive(bool) [default: False] If “forgive” is False a DaggerError is raised whenever a Piper cannot be resolved in the Dagger. If “forgive” is True then False is returned.
start()

Given the pipeline topology, starts Pipers in the order input -> output. See Piper.start. Piper instances are started in two stages, which allows them to share NuMaps.

stop()

Stops the Pipers according to pipeline topology.

exception papy.core.DaggerError

Exceptions raised or related to Dagger instances.

class papy.core.Piper(worker, parallel=False, consume=1, produce=1, spawn=1, timeout=None, branch=None, debug=False, name=None, track=False, repeat=False)

Creates a new Piper instance. The Piper is an object that acts as a processing node in a PaPy pipeline.

A Piper can be created from a Worker instance, another Piper instance, or a sequence of functions or Worker instances; in every case a new Piper instance is created.

Piper instances evaluate functions in parallel if they are created with a NuMap instance; otherwise they use the itertools.imap function.

The “produce” and “consume” arguments allow for mappings other than 1:1 between the number of input and output items, while “spawn” allows a Piper to handle additional outputs. Additional outputs are created from the elements of the sequence returned by the wrapped Worker instance.

The product of “produce” and “spawn” of the upstream Piper has to equal the product of “consume” and “spawn” of the downstream Piper, for each pair of pipers connected.

The “branch” argument sets the “branch” attribute of a Piper instance. If two Pipers have no upstream->downstream relation they will be sorted according to their “branch” attributes. If neither of them has a “branch” attribute or both are identical their sort order will be semi-random. Pipers will implicitly inherit the “branch” of an up-stream Piper, thus it is only necessary to specify the branch of a Piper if it is the first one after a branch point.

It is possible to construct pipelines without specifying branches if Pipers which are connected to multiple up-stream Pipers (the order of which is by default semi-random) use Workers that act correctly regardless of the order of results in their inbox.

If “debug” is True exceptions are raised on all errors. This will most likely hang the Python interpreter after the error occurs. Use during development only!

Arguments:

  • worker(Worker, Piper or sequence of functions or Workers)
  • parallel(False or NuMap) [default: False] If parallel is False the Piper instance will not process data-items in parallel
  • consume(int) [default: 1] The number of input items consumed from all directly connected upstream Pipers per one evaluation.
  • produce(int) [default: 1] The number of results to generate for each evaluation result.
  • spawn(int) [default: 1] The number of times this Piper is implicitly added to the pipeline to consume the specified number of results.
  • timeout(int) [default: None] Time to wait until a result is available. Otherwise a PiperError is returned, not raised.
  • branch(object) [default: None] This affects the order of Pipers in the Dagger. Piper instances are sorted according to the data-flow upstream->downstream and their “branch” attributes. The argument can be any object which can be used by the cmp built-in function. If necessary they can override the __cmp__ method.
  • debug(bool) [default: False] Verbose debugging mode. Raises a PiperError on WorkerErrors.
  • name(str) [default: None] A string to identify the Piper.
  • track(bool) [default: False] If True results of this Piper will be tracked by the NuMap (ignored if Piper is linear).
  • repeat(bool) [default: False] If True and “produce” is larger than 1 the evaluated results will be repeated. If False it is assumed that the evaluated results are sequences and “produce” will iterate over that list or tuple.
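
A minimal construction sketch (assuming papy and numap are importable; the worker function and names are illustrative):

from numap.NuMap import NuMap
from papy.core import Piper, Worker

def double(inbox):
    # placeholder worker function
    return inbox[0] * 2

pool = NuMap(worker_num=4)                       # a pool of 4 local worker processes
parallel_piper = Piper(Worker(double), parallel=pool)
linear_piper = Piper(Worker(double))             # evaluated via itertools.imap
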
connect(inbox)

Connects the Piper instance to its upstream Pipers, which should be given as a sequence. This connects this Piper.inbox with the upstream Piper.outbox respecting any “consume”, “spawn” and “produce” arguments.

Arguments:

  • inbox(sequence) sequence of Piper instances.
disconnect(forced=False)

Disconnects the Piper instance from its upstream Pipers or input data if the Piper is the input node of a pipeline.

Arguments:

  • forced(bool) [default: False] If True the Piper will try to forcefully remove all tasks (including the spawned ones) from the NuMap instance.
next()

Returns the next result. If no result is available within the specified (during construction) “timeout” then a PiperError which wraps a TimeoutError is returned.

If the result is a WorkerError it is also wrapped in a PiperError and is returned or raised if “debug” mode was specified at initialization. If the result is a PiperError it is propagated.

start(stages=None)

Makes the Piper ready to return results. This involves starting the provided NuMap instance. If multiple Pipers share a NuMap instance the order in which these Pipers are started is important. The valid order is upstream before downstream. The NuMap instance can only be started once, but the process can be done in 2 stages. This method's “stages” argument is a tuple which can contain any of the numbers 0, 1 and/or 2, specifying which stages of the start routine should be carried out:

  • stage 0 - creates the needed itertools.tee objects.
  • stage 1 - activates the NuMap pool. A call to next will block.
  • stage 2 - activates the NuMap pool managers.

If this Piper shares a NuMap with other Pipers the proper way to start them is to start them in a valid postorder with stages (0, 1) and (2,) separately.

Arguments:

  • stages(tuple) [default: (0,) if linear; (0,1,2) if parallel] Performs the specified stages of the start of a Piper instance. Stage 0 is necessary and sufficient to start a linear Piper which uses an itertools.imap. Stages 1 and 2 are required to start any parallel Piper instance.
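
For example, two Pipers sharing a single NuMap could be started as follows (a sketch; upstream_piper and downstream_piper are assumed to be already constructed and connected, listed upstream first):

for piper in (upstream_piper, downstream_piper):
    piper.start(stages=(0, 1))
for piper in (upstream_piper, downstream_piper):
    piper.start(stages=(2,))
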
stop(forced=False, **kwargs)

Attempts to cleanly stop the Piper instance. A Piper is “started” if its NuMap instance is “started”. Non-parallel Pipers do not have to be started or stopped. A NuMap instance can be stopped by triggering its stopping procedure and retrieving results from the NuMap's end tasks. Because neither the Piper nor the NuMap “knows” which tasks i.e. Pipers are the end tasks, they have to be specified:

end_task_ids = [0, 1]    # A list of NuMap task ids
piper_instance.stop(ends=end_task_ids)

results in:

NuMap_instance.stop(ends=[0, 1])

If the Piper did not finish processing the data before the stop method is called the “forced” argument has to be True:

piper_instance.stop(forced=True, ends=end_task_ids)

If the Piper (and consequently NuMap) is part of a Dagger graph the Dagger.stop method should be called instead. See: NuMap.stop and Dagger.stop.

Note (to be verified): if “forced” is set True but the “ends” argument is not given, the NuMap instance will not try to retrieve any results and will not call the NuMap._stop method.

Arguments:

  • forced(bool) [default: False] The Piper will be forced to stop the NuMap instance.

Additional keyworded arguments are passed to the Piper.imap instance.

exception papy.core.PiperError

Exceptions raised or related to Piper instances.

class papy.core.Plumber(logger_options={}, **kwargs)

The Plumber is a subclass of Dagger and Graph with added run-time methods and a high-level interface for working with PaPy pipelines.

Arguments:

  • dagger(Dagger instance) [default: None] An optional Dagger instance. If None a new one is created.
load(filename)

Instantiates (loads) a pipeline from a source code file.

Arguments:

  • filename(path) location of the pipeline source code.
pause()

Pauses a running pipeline. This will stop retrieving results from the pipeline. Parallel parts of the pipeline will stop after the NuMap buffer has been filled. A paused pipeline can be run or stopped.

run()

Executes a started pipeline by pulling results from its output Pipers. Processing nodes i.e. Pipers with the track attribute set True will have their returned results stored within the Dagger.stats['pipers_tracked'] dictionary. A running pipeline can be paused.

save(filename)

Saves pipeline as a Python source code file.

Arguments:

  • filename(path) Path to save the pipeline source code.
start(datas)

Starts the pipeline by connecting the input Pipers of the pipeline to the input data, connecting the pipeline and starting the NuMap instances.

The order of items in the “datas” argument sequence should correspond to the order of the input Pipers defined by Dagger._cmp and Piper.ornament.

Arguments:

  • datas(sequence) A sequence of external input data in the form of sequences or iterators.
stop()

Stops a paused pipeline. This will trigger a StopIteration in the inputs of the pipeline and retrieve the buffered results. This will stop all Pipers and NuMaps. Python will not terminate cleanly if a pipeline is running or paused.

wait(timeout=None)

Waits (blocks) until a running pipeline finishes.

Arguments:

  • timeout(int) [default: None] Number of seconds to wait before a RuntimeError is raised. The default is to wait indefinitely for the pipeline to finish.
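
A typical run-time sequence might look as follows (a hedged sketch; the construction of the pipeline is omitted and the single input iterable is illustrative):

from papy.core import Plumber

plumber = Plumber()
# ... add Pipers and pipes, as with a Dagger ...
plumber.start([range(10)])    # connect input data, connect Pipers, start NuMaps
plumber.run()                 # begin pulling results from the output Pipers
plumber.wait()                # block until the pipeline finishes
plumber.stop()                # stop all Pipers and NuMaps
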
exception papy.core.PlumberError

Exceptions raised or related to Plumber instances.

class papy.core.Worker(functions, arguments=None, kwargs=None, name=None)

The Worker is an object that composes sequences of functions. When called these functions are evaluated from left to right. The function on the right will receive the return value from the function on the left.

The constructor optionally takes sequences of positional and keyworded arguments for none or all of the composed functions. Positional arguments should be given in a tuple. Each element of this tuple should be a tuple of positional arguments for the corresponding function. If a function does not take positional arguments its corresponding element in the arguments tuple should be an empty tuple i.e. (). Keyworded arguments should also be given in a tuple. Each element of this tuple should be a dictionary of arguments for the corresponding function. If a function does not take any keyworded arguments its corresponding element in the keyworded arguments tuple should be an empty dict i.e. {}. If none of the functions takes arguments of a given type the positional and/or keyworded arguments tuple can be omitted.

All exceptions raised by the functions are caught, wrapped and returned not raised. If the Worker is called with the first argument being a sequence which contains an Exception no function is evaluated and the Exception is re-wrapped and returned.

A Worker instance can be constructed in a variety of ways:

  • with a sequence of functions and optional sequences of positional and keyworded arguments e.g.:

    Worker((func1,         func2,    func3), 
          ((arg11, arg21), (arg21,), ()),
          ({},             {},       {'arg31':arg31}))
    
  • with another Worker instance, which results in their functional equivalence e.g.:

    Worker(worker_instance)
    
  • with multiple Worker instances, where the functions and arguments of the Workers are combined e.g.:

    Worker((worker1, worker2))
    

    this is equivalent to:

    Worker(worker1.task + worker2.task,
           worker1.args + worker2.args,
           worker1.kwargs + worker2.kwargs)
    
  • with a single function and its arguments in a tuple e.g.:

    Worker(function, (arg1, arg2, arg3))
    

    this is equivalent to:

    Worker((function,),((arg1, arg2, arg3),))
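
The following is a hedged construction sketch using a placeholder worker function (not part of PaPy); worker functions receive the "inbox" sequence of inputs as their first argument:

from papy.core import Worker

def tag(inbox, prefix='item: '):
    # placeholder worker function
    return prefix + str(inbox[0])

w1 = Worker(tag)                                  # single function, default arguments
w2 = Worker(tag, ('>> ',))                        # single function with one positional argument
w3 = Worker((tag, tag), (('a: ',), ('b: ',)))     # two composed functions with per-function arguments
w4 = Worker((w1, w2))                             # functions and arguments of w1 and w2 combined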
    
exception papy.core.WorkerError

Exceptions raised or related to Worker instances.

papy.graph

This module implements a graph data structure without explicit edges, using nested Python dictionaries. It provides DictNode and DictGraph.

class papy.graph.DictGraph(nodes=(), edges=(), xtras=None)

A dictionary-based graph data structure. This graph implementation is a little bit unusual as it does not explicitly hold a list of edges. A DictGraph instance is a dictionary, where the keys of the dictionary are hashable object instances (node objects), while the values are DictNode instances (topological nodes). A DictNode instance is also a dictionary, where the keys are node objects and the values are DictNode instances. A Node instance (value) is basically a dictionary of outgoing edges from the node object (key). The edges are indexed by the incoming objects. So we end up with a single recursively nested dictionary which defines the topology of the DictGraph instance. An edge is a tuple of two node objects.

Arguments:

  • nodes(iterable) [default: ()] A sequence of node objects to be added to the graph. See: Graph.add_nodes
  • edges(iterable) [default: ()] A sequence of edges to be added to the graph. See: Graph.add_edges
  • xtras(iterable) [default: None] A sequence of property dictionaries for the added node objects. The topological nodes corresponding to the added node objects will have their Node.xtra attributes updated with the contents of this sequence. Either all or no "xtra" dictionaries must be given.
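
A brief usage sketch (node objects here are plain strings; any hashable objects would work):

from papy.graph import DictGraph

g = DictGraph(nodes=('a', 'b', 'c'), edges=(('a', 'b'), ('b', 'c')))
print(g.nodes())              # all node objects
print(g.edges())              # the two added edges
print(g.incoming_edges('b'))  # edges ending at 'b'
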
add_edge(edge, double=False)

Adds an edge to the DictGraph. An edge is just a pair of node objects. If the node objects are not in the graph they are created.

Arguments:

  • edge(iterable) An ordered pair of node objects. The edge is assumed to have a direction from the first to the second node object.
  • double(bool) [default: False] If True the reverse edge is also added.
add_edges(edges, *args, **kwargs)

Adds edges to the graph. Takes optional arguments for DictGraph.add_edge.

Arguments:

  • edges(iterable) Sequence of edges to be added to the DictGraph.
add_node(node, xtra=None, branch=None)

Adds a node object to the DictGraph. Returns True if a new node object has been added. If the node object is already in the DictGraph returns False.

Arguments:

  • node(object) Node to be added. Any hashable Python object.
  • xtra(dict) [default: None] The newly created topological Node.xtra dictionary will be updated with the contents of this dictionary.
  • branch(object) [default: None] An identifier used to sort topologically equivalent branches.
add_nodes(nodes, xtras=None)

Adds node objects to the graph.

Arguments:

  • nodes(iterable) Sequence of node objects to be added to the DictGraph
  • xtras(iterable) [default: None] Sequence of Node.xtra dictionaries corresponding to the node objects being added. See: Graph.add_node.
clear_nodes()

Clears all nodes in the Graph. See Node.clear.

cmp_branch(node1, node2)

Comparison of node objects based on the "branch" attribute of their topological nodes.

deep_nodes(node)

Returns all reachable node objects from a node object. See: DictNode.deep_nodes.

Arguments:

  • node(object) a node object present in the graph.
del_edge(edge, double=False)

Removes an edge from the DictGraph. An edge is a pair of node objects. The node objects are not removed from the DictGraph.

Arguments:

  • edge(tuple) An ordered pair of node objects. The edge is assumed to have a direction from the first to the second node object.
  • double(bool) [default: False] If True the reverse edge is also removed.
del_edges(edges, *args, **kwargs)

Removes edges from the graph. Takes optional arguments for DictGraph.del_edge.

Arguments:

  • edges(iterable) Sequence of edges to be removed from the DictGraph.
del_node(node)

Removes a node object from the DictGraph. Returns True if a node object has been removed. If the node object is not in the DictGraph raises a KeyError.

Arguments:

  • node(object) node object to be removed. Any hashable Python object.
del_nodes(nodes)

Removes node objects from the graph.

Arguments:

  • nodes(iterable) Sequence of node objects to be removed from the DictGraph. See: DictGraph.del_node.
dfs(node, bucket=None, order='append')

Recursive depth first search. By default (“order” = "append") this returns the node objects in the reverse postorder. To change this into the preorder use a collections.deque as “bucket” and "appendleft" as “order”.

Arguments:

  • bucket(list or collections.deque) [default: None] The user must provide a new list or collections.deque to store the nodes.
  • order(str) [default: "append"] Method of the “bucket” which will be called with the node object that has been examined. Another valid option is "appendleft" for a collections.deque.
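
For example (a sketch; a traversal marks nodes as "discovered", so the graph is cleared between calls):

from collections import deque
from papy.graph import DictGraph

g = DictGraph(edges=(('a', 'b'), ('b', 'c')))

post = []
g.dfs('a', bucket=post)                       # nodes collected via list.append
g.clear_nodes()                               # reset the "discovered"/"examined" flags

pre = deque()
g.dfs('a', bucket=pre, order='appendleft')    # nodes collected via deque.appendleft
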
edges(nodes=None)

Returns a tuple of all edges in the DictGraph; an edge is a pair of node objects.

Arguments:

  • nodes(iterable) [default: None] An iterable of node objects; if specified, the returned edges are limited to those outgoing from one of the specified nodes.
incoming_edges(node)

Returns a tuple of incoming edges for a node object.

Arguments:

  • node(object) node object present in the graph to be queried for incoming edges.
iter_nodes()

Returns an iterator of all node objects in the DictGraph.

node_rank()

Returns the maximum rank for each topological node in the DictGraph. The rank of a node is defined as the number of edges between the node and a node which has rank 0. A topological node has rank 0 if it has no incoming edges.

node_width()

Returns the width of each node in the graph. #TODO

nodes()

Returns a list of all node objects in the DictGraph.

outgoing_edges(node)

Returns a tuple of outgoing edges for a node object.

Arguments:

  • node(object) node object present in the graph to be queried for outgoing edges.
postorder()

Returns a valid postorder of the node objects of the DictGraph if the topology is a directed acyclic graph. This postorder is semi-random, because the order of elements in a dictionary is semi-random and so are the starting nodes of the depth-first search traversal, which determines the postorder, consequently some postorders will be discovered more frequently.

This postorder enforces some determinism on particular ties:

  • topologically equivalent branches are sorted by length (shorter branches come first).
  • if the topological Nodes corresponding to the node objects have a "branch" attribute it will be used to sort the graph from left to right.

However the final postorder is still not deterministic.

rank_width()

Returns the width of each rank in the graph. #TODO

class papy.graph.DictNode(entity=None, xtra=None)

DictNode is the topological node of a DictGraph. Please note that the node object is not the same as the topological node. The node object is any hashable Python object. The topological node is defined for each node object and is a dictionary of other node objects with incoming edges from a single node object.

A node has: "discovered", "examined" and "branch" attributes.

Arguments:

  • entity (object) [default: None] Any hashable object is a valid node object.
  • xtra (dict) [default: None] A dictionary of arbitrary properties of the topological node.
clear()

Sets the "discovered" and "examined" attributes to False.

deep_nodes(allnodes=None)

A recursive method to return a list of all node objects connected from this topological node.

iternodes()

Returns an iterator of node objects directly connected from this topological node.

nodes()

Returns a list of node objects directly connected from this topological node.

papy.util.codefile

Provides template strings for saving PaPy pipelines directly as Python source code.

papy.util.config

Configures logging to monitor the execution of PaPy pipelines and OS-dependent defaults for different variables.

papy.util.config.get_defaults()

Returns a dictionary of variables and their possibly os-dependent defaults.

papy.util.config.start_logger(log_to_file=False, log_to_stream=False, log_to_file_level=20, log_to_stream_level=20, log_filename=None, log_stream=None, log_rotate=True, log_size=524288, log_number=3)

Configures and starts a logger to monitor the execution of a PaPy pipeline.

Arguments:

  • log_to_file(bool) [default: False] Should we write logging messages into a file?
  • log_to_stream(bool or object) [default: False] Should we print logging messages to a stream? If True this defaults to stderr.
  • log_to_file_level(int) [default: INFO] The minimum logging level of messages to be written to file.
  • log_to_stream_level(int) [default: INFO] The minimum logging level of messages to be printed to the stream.
  • log_filename(str) [default: "PaPy_log" or "PaPy_log_$TIME$"] Name of the log file. Ignored if “log_to_file” is False.
  • log_rotate(bool) [default: True] Should we limit the number of logs? Ignored if “log_to_file” is False.
  • log_size(int) [default: 524288] Maximum number of bytes saved in a single log file. Ignored if “log_to_file” is False.
  • log_number(int) [default: 3] Maximum number of rotated log files. Ignored if “log_to_file” is False.
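
A minimal usage sketch (the level values are the standard library logging constants, e.g. 20 corresponds to logging.INFO):

import logging
from papy.util.config import start_logger

start_logger(log_to_stream=True, log_to_stream_level=logging.INFO)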

papy.util.func

A collection of core functions to use in Worker instances; it includes functions for dealing with the inputs/outputs of a pipeline or of Pipers. In general these functions are used to connect Pipers to external inputs/outputs (the pipeline inputs/outputs i.e. streams) or to connect them to other Pipers (via items i.e. transformed elements of the input streams). Based on that distinction two types of functions are provided:

  • stream functions load or save the input stream from or into a single file, therefore they can only be used at the beginnings or ends of a pipeline. Stream loaders are not worker functions, as they are called once (e.g. with the input file name as the argument) and create the input stream in the form of a generator of input items.

  • item functions load, save, process or display data items. These are Worker functions and should be used within Pipers.

No method of interprocess communication, besides the default (inefficient) one, is supported on all platforms. Even among UNIX implementations, details of forking can differ.

papy.util.func.dump_item(inbox, type='file', prefix=None, suffix=None, dir=None, timeout=320, buffer=None)

Writes the first element of the inbox as a file of a specified type. The type can be ‘file’, ‘fifo’ or ‘socket’, corresponding to typical files, named pipes (FIFOs) and TCP sockets. FIFOs and sockets are volatile i.e. exist only as long as the Python process which created them. FIFOs are local i.e. they allow communication only between processes on the same computer.

This function returns a semi-random name of the file written. By default it creates files and FIFOs in the default temporary directory. To use named pipes the operating system has to support both forks and FIFOs (not Windows). Sockets should work on all operating systems.

This function is useful to efficiently communicate between parallel Pipers without the overhead of using queues.

Arguments:

  • type(‘file’, ‘fifo’, ‘socket’) [default: ‘file’] Type of the created file-like object.
  • prefix(str) [default: "tmp_papy_%type%"] Prefix of the file to be created. Should probably identify the Worker and Piper type.
  • suffix(str) [default: ''] Suffix of the file to be created. Should probably identify the format of the serialization protocol e.g. "pickle" or deserialized data e.g. "nubox".
  • dir(str) [default: tempfile.gettempdir()] Directory to save the file to (can be changed only for types "file" and "fifo").
  • timeout(int) [default: 320] Number of seconds to keep the process at the write-end of the "socket" or "pipe" alive.
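
A hedged sketch of pairing dump_item with load_item in two consecutive Workers (construction only; the keyword values are illustrative):

from papy.core import Worker
from papy.util.func import dump_item, load_item

# the upstream Worker writes its input item to a temporary file and
# passes the (semi-random) file name downstream
writer = Worker((dump_item,), kwargs=({'type': 'file'},))
# the downstream Worker re-creates the item from that file and removes it
reader = Worker((load_item,), kwargs=({'type': 'string', 'remove': True},))
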
papy.util.func.dump_pickle_stream(inbox, handle)

Writes the first element of the inbox to the provided stream (data handle) as a pickle. To be used with the load_pickle_stream function.

Arguments:

  • handle(file) A file handle open for writing.
papy.util.func.dump_stream(inbox, handle, delimiter=None)

Writes the first element of the inbox to the provided stream (file handle) delimiting the input by the optional delimiter string. Returns the name of the file being written.

Note that only a single process can have access to a file handle open for writing. Therefore this function should only be used by a non-parallel Piper.

Arguments:

  • handle(file) File handle open for writing.
  • delimiter(str) [default: None] A string which will separate the written items, e.g. "END" becomes "\nEND\n" in the output stream. The default is an empty string, which means that items will be separated by a blank line i.e.: "\n\n" (two new-line characters).
papy.util.func.ipasser(inbox, i=0)

Passes the “i”-th input from inbox. By default passes the first input.

Arguments:

  • i(int) [default: 0]
papy.util.func.json_dumps(inbox)

Serializes the first element of the input using the JSON protocol as implemented by the json Python 2.6 library.

papy.util.func.json_loads(inbox)

Deserializes the first element of the input using the JSON protocol as implemented by the json Python 2.6 library.

papy.util.func.load_item(inbox, type='string', remove=True, buffer=None)

Loads data from a file. Determines the file type ("file", "fifo" or "socket") automatically, but allows specifying the representation type: "string" or "mmap" for memory-mapped access to the file. Returns the loaded item as a str or mmap object. Internally creates an item from a file.

Arguments:

  • type("string" or "mmap") [default: "string"] Determines the type of object the worker returns i.e. the file is read as a string or a memory map. FIFOs cannot be memory mapped.
  • remove(bool) [default: True] Should the file be removed from the filesystem? This is mandatory for FIFOs and sockets. Only files can be used to store data persistently.
papy.util.func.load_pickle_stream(handle)

Creates an object generator from a stream (file handle) containing data in pickles. To be used with the dump_pickle_stream function.

File handles should not be read by different processes.

Arguments:

  • handle(file) A file handle open for reading.
papy.util.func.load_stream(handle, delimiter=None)

Creates a string generator from a stream (file handle) containing data delimited by the delimiter strings. This is a stand-alone function and should be used to feed external data into a pipeline.

Arguments:

  • handle(file) A file handle open for reading.
  • delimiter(str) [default: None] The default means that items will be separated by two new-line characters i.e.: "\n\n".
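
A hedged end-to-end sketch of the stream functions (file names and the delimiter are illustrative):

from papy.util.func import dump_stream, load_stream

# feed a pipeline: a generator of string items read from a delimited file
items = load_stream(open('input.txt'), delimiter='END')

# write the items back out, separated by the same delimiter
out_handle = open('output.txt', 'w')
for item in items:
    dump_stream([item], out_handle, delimiter='END')
out_handle.close()
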
papy.util.func.make_lines(handle, follow=False, wait=0.1)

Creates a line generator from a stream (file handle) containing data in lines.

Arguments:

  • follow(bool) [default: False] If True follows the file after it finishes like ‘tail -f’.
  • wait(float) [default: 0.1] time to wait in seconds between file polls.
papy.util.func.njoiner(inbox, n=None, join='')

String joins and returns the first “n” inputs.

Arguments:

  • n(int) [default: None] All elements of the inbox with an index smaller than this number will be joined.
  • join(str) [default: ""] String which will join the elements of the inbox i.e. join.join().
papy.util.func.npasser(inbox, n=None)

Passes “n” first inputs from inbox. By default passes the whole inbox.

Arguments:

  • n(int) [default: None]
papy.util.func.nzipper(inbox, n=None)

Zips the “n” first inputs from the inbox. By default zips the whole inbox.

Arguments:

  • n(int) [default: None] The default translates to zip(*inbox[:])
papy.util.func.pickle_dumps(inbox)

Serializes the first element of the input using the fastest binary pickle protocol.

papy.util.func.pickle_loads(inbox)

Deserializes the first element of the input using the pickle protocol.

papy.util.func.plugger(inbox)

Returns nothing.

papy.util.func.print_(inbox)

Prints the first element of the inbox.

papy.util.func.sjoiner(inbox, s=None, join='')

String joins input with indices in s.

Arguments:

  • s(sequence) [default: None] tuple or list of indices of the elements which will be joined.
  • join(str) [default: ""] String which will join the elements of the inbox i.e. join.join().
papy.util.func.spasser(inbox, s=None)

Passes inputs with indices in s. By default passes the whole inbox.

Arguments:

  • s(sequence) [default: None] The default translates to a range for all inputs of the “inbox” i.e. range(len(inbox))
papy.util.func.szipper(inbox, s=None)

Zips inputs from the inbox with indices in “s”. By default zips the whole inbox (all indices).

Arguments:

  • s(sequence) [default: None]

papy.util.script

Provides script, a Worker function to interact with arbitrary scripts.

papy.util.script.get_config(lang='python')

Returns language-specific script configuration.

Arguments:

  • lang(‘python’, ‘bash’) which programming language
papy.util.script.script(inbox, cfg)

Executes arbitrary scripts.

Arguments:

  • cfg(dict) Script configuration dictionary.
papy.util.script.write_template(fn, lang='python')

Writes a language-specific script template to a file.

Arguments:

  • fn(string) path to save the template to
  • lang(‘python’, ‘bash’) which programming language
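
A hedged sketch of the intended workflow (the template file name is illustrative; the exact keys of the configuration dictionary are not documented here):

from papy.core import Worker
from papy.util.script import get_config, script, write_template

write_template('my_node.py', lang='python')   # write a script skeleton to edit
cfg = get_config(lang='python')               # language-specific defaults
# ... fill in the remaining cfg entries for the edited script ...
script_worker = Worker(script, (cfg,))        # a Worker that runs the script on items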

papy.util.runtime

Provides a dictionary (possibly shared in the future, but not yet).

papy.util.runtime.get_runtime()

Returns a PAPY_RUNTIME dictionary.

numap.NuMap

This module provides a parallel (local or remote), buffered, multi-task, lazy map function, which can use threads and processes.

class numap.NuMap.NuMap(func=None, iterable=None, args=None, kwargs=None, worker_type=None, worker_num=None, worker_remote=None, stride=None, buffer=None, ordered=True, skip=False, name=None)

NuMap is a parallel (thread- or process-based, local or remote), buffered, multi-task, itertools.imap or multiprocessing.Pool.imap function replacement. Like imap it evaluates a function on elements of a sequence or iterable, and it does so lazily. Laziness can be adjusted via the “stride” and “buffer” arguments. Unlike imap, NuMap supports multiple pairs of function and iterable tasks. The tasks are not queued; rather they are interwoven and share a pool of worker “processes” or “threads” and a memory “buffer”.

Pool

The pool is a set of managed worker processes or threads. The choice of the “worker_type” has a fundamental impact on the performance of the map. As a general rule use “process” if you have multiple CPUs or CPU-cores and your task functions are CPU-bound. Use “thread” if your function is IO-bound e.g. retrieves data from the Web. Increasing the number of workers above the number of CPUs makes sense only if these are “thread” based workers and the evaluated functions are IO-bound. Some CPU-bound tasks might evaluate faster if the number of worker processes equals the number of CPUs + 1. For “thread” based NuMaps a larger number of workers might improve performance. The “worker_num” argument must not include workers needed to run remote processes and can be equal to 0 for purely remote NuMaps.

Iteration

Results are retrieved through iteration. A single NuMap instance supports iteration over results from many tasks, which means that it supports multiple end-points. The default is to iterate over the results from the first task. An iterator for a single task is returned by the NuMap.get_task method.

Order

The tasks can be interdependent i.e. the results from one task can be the input to a second task. The order in which tasks are added to the NuMap instance is important. It affects the order of evaluation and consequently the order in which results should be retrieved. If the tasks are chained then this order must be a valid topological sort (reverse topological order). If the NuMap is ordered the n-th result for a specific task will always be available before the n+1-th result. If “ordered” is False the results will be available in the order they are calculated.

Skipping

The “skip” argument determines how to respond to TimeoutErrors; it is ignored if no “timeout” value is given to the NuMap.next method. If “skip” is True, results which are not calculated on time will be omitted. If “skip” is False an exception will be raised, but the result can be retrieved later. If tasks are chained a TimeoutError will collapse the NuMap evaluation. Do not specify timeouts for chained tasks.

Parallel evaluation

The parallelism of the evaluation is strictly defined by the “stride”, “buffer” and the total number of workers in the pool. The worker number is obviously the upper bound of concurrently evaluated elements. The maximum number of elements from a single task evaluated in parallel is defined by “stride”. The “buffer” limits the maximum number of pending results for all tasks; it is a function of “stride”, but also of the topology of dependencies between the tasks. A long “stride” improves parallelism, but increases “buffer” memory requirements. It should not be smaller than the number of pool workers, otherwise some of them will be idle. The size of the “buffer” is larger than or equal to “stride” because a task might depend on results from multiple up-stream tasks.

The minimum “stride” and “buffer” is 1, therefore the results from the “buffer” must be removed in the same order as input elements are evaluated. Otherwise the NuMap might dead-lock.

An element is buffered until it is returned by the NuMap.next method. Starting the NuMap will cause one element (the first from the first task) to be submitted to the pool. For “stride” equal to 1, the next queued element is the first from the second task, which can enter the pool only if either the first result is retrieved (i.e. NuMap.next returns) or the “buffer” is larger than the “stride”. If the “buffer” is n then n tasklets can enter the pool. A “stride” of n requires at least n elements to enter the pool, therefore the “buffer” cannot be smaller than the “stride”. The minimum “buffer” is the maximum possible number of queued results. This number depends on the interdependencies between tasks and the “stride”. The default is conservative and sufficient for all topologies. If the tasks are chained i.e. the output from one is consumed by another, then at most one i-th element from each chained task is in the pool at any given moment. In those cases the minimum “buffer” to satisfy the worst case number of queued results is lower than the safe default.

Stopping

The NuMap can be stopped at any time, however some buffered results might be lost and up to 2 * “stride” additional input elements consumed. If pending buffered results are not retrieved the NuMap might not shut down properly.

Arguments:

  • func (callable) [default: None] If the NuMap is given a function it is used to define the first and only task of the NuMap
  • iterable (iterable) [default: None] A sequence of first arguments for “func”; required if “func” is given.
  • args (tuple) [default: None] optional, see: NuMap.add_task
  • kwargs (dict) [default: None] optional, see: NuMap.add_task
  • worker_type('process' or 'thread') [default: 'process'] Defines the type of internally spawned pool workers. For multiprocessing.Process-based workers choose 'process'; for threading.Thread-based workers choose 'thread'.
  • worker_num(int) [default: number of CPUs, min: 1] The number of workers to spawn locally. Defaults to the number of available CPUs, which is a reasonable choice for process-based NuMaps.
  • worker_remote(iterable) [default: None] A sequence of (“remote host”, “remote worker_num”) tuples e.g. (('localhost', 2), ('127.0.0.1', 2)); “remote worker_num” is the number of worker processes per remote host. A custom TCP port can be specified, e.g. (('localhost:6666', 2),).
  • stride(int) [default: automatic] The number of elements from a task evaluated in parallel.
  • buffer(int) [default: automatic] The total number of elements (inputs and results) held in the NuMap instance.
  • ordered(bool) [default: True] If True the output of all tasks will be ordered see: order.
  • skip(bool) [default: False] Should we skip a result if trying to retrieve it raised a TimeoutError?
  • name(str) [default: “imap_id(id(object))”] an optional name to associate with this NuMap instance. It should be unique. Useful for code generation.

Restrictions:

  • a completely lazy i.e. “buffer-free” evaluation is not supported
  • if remote workers are enabled, “worker_type” has to be the default “process”.
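
A minimal usage sketch of NuMap as an imap replacement (the mapped function is a placeholder):

from numap.NuMap import NuMap

def square(x):
    return x * x

results = NuMap(square, range(10), worker_num=2)
results.start()                 # start the pool and its managing threads
print(list(results))            # lazily evaluated, ordered results
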
add_task(func, iterable, args=None, kwargs=None, timeout=None, block=True, track=False)

Adds a task to evaluate. A task is a function or callable together with an iterable and optional positional and keyworded arguments. The iterable can be the result iterator of a previously added task (to the same or to a different NuMap instance).

Arguments:

  • func (callable) this will be called on each element of the “iterable” and supplied with arguments “args” and keyworded arguments “kwargs”
  • iterable (iterable) this must be a sequence of picklable objects which will be the first arguments passed to the “func”
  • args (tuple) [default: None] A tuple of optional constant arguments passed to the callable “func” after the first argument from the “iterable”
  • kwargs (dict) [default: None] A dictionary of keyworded arguments passed to “func” after the variable argument from the “iterable” and the arguments from “args”
  • timeout (bool) see: _NuMapTask
  • block (bool) see: _NuMapTask
  • track (bool) [default: False] If True the results (or exceptions) of a task are saved within: self._tasks_tracked[%task_id%] as a {index:result} dictionary. This is only useful if the callable “func” creates persistent data. The dictionary can be used to restore the correct order of the data.
get_task(task=0, timeout=None, block=True)

Returns an iterator whose results are limited to one task. The default iterator, i.e. the one which will be used in a for loop, is the iterator for the first task (task=0). The returned iterator is a _NuMapTask instance.

Compare:

for result_from_task_0 in imap_instance:
    pass

with:

for result_from_task_1 in imap_instance.get_task(task=1):
    pass

a typical use case is:

task_0_iterator = imap_instance.get_task(task=0)
task_1_iterator = imap_instance.get_task(task=1)

for (task_0_res, task_1_res) in izip(task_0_iterator, task_1_iterator):
    pass
next(timeout=None, task=0, block=True)

Returns the next result for the given task. Defaults to 0, which is the first task. If multiple chained tasks are evaluated then the next method of only the last should be called directly.

Arguments:

  • timeout (float) Number of seconds to wait until a TimeoutError is raised.
  • task (int) id of the task from the NuMap instance
  • block (bool) if True call will block until result is available
pop_task(number)

Removes a previously added task from the NuMap instance.

Arguments:

  • number (int or True) A positive integer specifying the number of tasks to pop. If number is set True all tasks will be popped.
start(stages=(1, 2))

Starts the processes or threads in the internal pool and the threads which manage the worker pool input and output queues. The starting mode is split into two stages, which can be initiated separately. After the first stage the worker pool processes or threads are started and the NuMap._started event is set True; a call to the NuMap.next method will block. After the second stage the NuMap._pool_putter and NuMap._pool_getter threads will be running. The NuMap.next method should only be called after this method returns.

Arguments:

  • stages (tuple) [default: (1, 2)] Specifies which stages of the start process to execute, by default both stages.
stop(ends=None, forced=False)

Stops a NuMap instance. If the list of end tasks is specified via the “ends” argument, a call to NuMap.stop will block the calling thread and retrieve (and discard) a maximum of 2 * “stride” results. This will stop the worker pool and the threads which manage its input and output queues.

If the “ends” argument is not specified, but the “forced” argument is, the method does not block and NuMap._stop has to be called after all pending results have been retrieved. Calling NuMap._stop with pending results will dead-lock.

Either “ends” or “forced” has to be True.

Arguments:

  • ends (list) [default: None] A list of task ids which are not consumed within the NuMap instance.
  • forced (bool) [default: False] If “ends” is not None this argument is ignored. If “ends” is None and “forced” is True the NuMap instance will trigger stopping mode.
numap.NuMap.imports(modules, forgive=False)

Should be used as a decorator to attach import statements to function definitions. These imports are added to the global (i.e. module-level of the decorated function) namespace.

Two forms of import statements are supported (in the following examples foo, bar, oof, and rab are modules, not classes or functions):

import foo, bar              # -> @imports(['foo', 'bar'])
import foo.oof as oof            
import bar.rab as rab        # -> @imports(['foo.oof', 'bar.rab'])

It provides support for alternatives:

try:
    import foo
except ImportError:
    import bar

which is expressed as:

@imports(['foo,bar'])

or alternatively:

try:
    import foo.oof as oof
except ImportError:
    import bar.rab as oof

becomes:

@imports(['foo.oof,bar.rab'])

This import is available in the body of the function as oof. All needed imports should be attached for every function (even if two functions are in the same module and have the same globals).

Arguments:

  • modules (list) A list of modules in the following forms ['foo', 'bar', ..., 'baz'] or ['foo.oof', 'bar.rab', ..., 'baz.zab']
  • forgive (bool) [default: False] If True will not raise an ImportError.
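
A hedged usage sketch (the decorated function is a placeholder worker function):

from numap.NuMap import imports

@imports(['time', 'os'])
def stamp(inbox):
    # "time" and "os" are injected into this function's global namespace
    return (time.time(), os.getpid(), inbox[0])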
