Pipeline
rackio_AI.Pipeline()Pipeline is a class based on a simple architectural style called Pipe and Filter, which connects a number of components that process a stream of data, each connected to the next component in the processing pipeline via a Pipe.
The Pipe and Filter architecture is inspired by the Unix technique of connecting the output of an application to the input of another via pipes on the shell.
The Pipe and Filter architecture consists of one or more data sources. The data source is connected to data filters via pipes. Filters process the data they receive, passing them to others filters in the pipeline. The final data is received at a Data Sink.
In the following image you can see graphically this architecture style.

Pipe and filter are used commonly for applications that perform a lot of data processing such as data analytics, data transformation, metadata extraction, and so on.
call(self, func_args, *args)Pipeline is too a callable object, so, when the Pipeline is called this function creates the pipeline architecture. Each component in the pipeline (function or method) may need input arguments in addition to its main argument (data to be processed), so, this arguments differents to the main argument must be passed into pipeline as a list of dicts with the following structure.
func_args = [
{
"args": [],
"kwargs": {},
},
{
"args": [],
"kwargs": {},
},
{
"args": [],
"kwargs": {},
}
]
Each element in func_args represents the input arguments of each component (function or method) in the pipeline.
Each component (function or method) are passed into pipeline after the first argument (func_args) in the callable. So, the first element in func_args represents the arguments for the first component of the pipeline
Parameters
- func_args: (list of dicts) Functions argument of each component in the pipeline
- args: (function or method) Positional arguments that represents each component in the pipeline.
returns
- obj: The main argument passed into each component.
Snippet code
>>> from rackio_AI import Pipeline
>>> import numpy as np
>>> def load(value): return np.array([value, value, value, value])
>>> def power(data, value): return data ** value
>>> def sum(data, value): return data + value
>>> args = [{"args": [], "kwargs": {}}, {"args": [2], "kwargs": {}}, {"args": [1], "kwargs": {}}, {"args": [-2], "kwargs": {}}]
>>> pipeline = Pipeline()
>>> pipeline(args, load, power, sum, sum)
>>> pipeline.start(2)
>>> pipeline.data
array([3, 3, 3, 3], dtype=int32)
start(self, initial_state)This method starts to run the pipeline architecture
Parameters
- initial_state: First argument of the pipeline's source method
returns
None