Quick Start


The aim of DataFlow is to chain existing operators into a workflow and to run that workflow on the user's or institution's infrastructure, whatever that may be.

DataFlow has been designed to make it simple for the user to define a workflow and to run the job. Defining an operator means handling different situations and interacting with the task environment, but DataFlow makes that easy as well.

Hello world

Let’s see how we would implement a Hello World example.

Starting from scratch, we will define a simple operator:

@op_decorator(BasicOP)
def Hello(item, task):
    """
    Say Hello
    """
    # DONE is a sentinel item: it signals that no more data will arrive.
    if item is DONE:
        return

    person = item
    message = f"Hello {person}"
    print(message)
    return message

In this code, item will be any item arriving at the operator, and the operator has to handle it. The special item DONE is received when the user declares that no more data will be sent. In practice, users will rarely write operators; most of their work will be defining workflows and operator inputs.
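
Any per-item operator follows this same pattern. As a sketch, here is a second operator built with the same decorator; the name Shout and its behaviour are ours, purely for illustration, and are not part of DataFlow:

@op_decorator(BasicOP)
def Shout(item, task):
    """
    Uppercase whatever arrives (illustrative operator, not part of DataFlow)
    """
    # DONE marks the end of the stream; nothing left to do here.
    if item is DONE:
        return

    return str(item).upper()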

Once we have an operator, we can define our workflow as follows:

@wf_decorator(wf_class=ThreadWF)
def HelloWF(head, out):
    """
    Say hello to anyone
    """
    hello = Hello()

    # Items flow from head, through the Hello operator, to out.
    head >> hello >> out

Every workflow starts with a head and ends with an out. By default, workflow inputs are sent to the head and results are gathered at the out, while any intermediate item is stored in the task environment.
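
Because operators are chained with >>, longer pipelines follow naturally. As a sketch, reusing the hypothetical Shout operator from above, a two-step workflow could look like this:

@wf_decorator(wf_class=ThreadWF)
def LoudHelloWF(head, out):
    """
    Say hello, loudly (illustrative workflow)
    """
    hello = Hello()
    shout = Shout()

    # Each input passes through Hello, then Shout, before reaching out.
    head >> hello >> shout >> out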

Now we just need to define where our workflow will run:

class HelloWorld(LocalJob):
    """
    Job running the HelloWF workflow locally
    """
    workflow_class = HelloWF

A local job just needs to know which workflow to run. Other kinds of jobs may be more complex to configure, but this depends on the infrastructure.

Now we have all the pieces in place, and our user just needs to run the job:

people = ["Nico", "Gabriel", "Emilio", "Fernan"]

job = job_service.create("HelloWorld")
job.prepare()
for p in people:
    job.submit(p)   # each submitted item is sent to the workflow head
job.done()          # declare that no more data will be sent
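
Each submitted item travels through the workflow to the Hello operator, so running this prints one greeting per person (with ThreadWF the order of the lines may vary):

Hello Nico
Hello Gabriel
Hello Emilio
Hello Fernan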