Workflow implementation

Goal

Trigger tasks based on some constraints.

We can implement these constraints in multiple steps:

  • React to one event
  • React to one result
  • React to one event filtered based on its data
  • React to one result filtered based on its data
  • React to multiple events
  • React to multiple results

The current limits of this implementation:

  • Execution graph limited to a tree (no DAG yet)
  • React to one event only
  • React to one result only
  • No data transformation

Specifications

The initial workflow is a simplification of the final one: we will first implement a Tree Graph of execution instead of a Directed Acyclic Graph, and later on maybe a Directed Graph.

The Tree implementation is possible thanks to the parentHash in the execution structure, which references the previous execution.

Every new execution creates a new execution record that points to the previous one (the result that triggered this execution).
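
As a rough sketch, the execution structure could look like this (field names are assumptions, not a final schema):

type Execution struct {
  hash        string                 // unique hash of this execution
  parentHash  string                 // hash of the previous execution; empty for the root execution triggered by the event
  serviceHash string                 // service that runs the task
  taskKey     string                 // task to execute
  inputs      map[string]interface{} // data passed to the task, copied as-is from the triggering event/result
  outputs     map[string]interface{} // result data, later re-emitted as a system event (see below)
}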

For now, we limit the concept to a single event starting the workflow; the workflow can then contain multiple chains of results.

Here is an example:

graph LR
  A[serviceA#eventX] --> C(serviceB#task1)
  C --> D[serviceC#task2]
  C --> E[serviceD#task3]
  C --> F[serviceE#task4]
  D --> G[serviceF#task5]

The workflow engine is responsible for creating this execution graph and has the following logic:
For every event/result, fetch the workflow definitions that match this event/result, resolve the inputs (for now no processing, the data is just passed into the inputs), and create the execution with a link to the previous execution if one exists.
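
A minimal sketch of that logic, assuming a hypothetical createExecution helper (the Workflow structure and matchWorkflows are introduced below):

type Event struct {
  serviceHash string
  key         string
  data        map[string]interface{}
}

func onEventOrResult(store []Workflow, e Event) {
  // Fetch every workflow whose trigger matches this event/result.
  for _, w := range matchWorkflows(store, e) {
    // No processing for now: the event data is passed straight into the inputs.
    inputs := e.data
    // Link to the previous execution when the trigger is a result; stays
    // empty when the trigger is a plain event.
    parentHash := ""
    if e.key == "executionFinished" {
      parentHash, _ = e.data["executionHash"].(string)
    }
    createExecution(w, inputs, parentHash) // hypothetical: persists the new execution
  }
}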

Implementation

The workflow engine will be its own package that runs as a job in its own thread and listens to all events coming from the API.

  • Listen to events
  • Match workflows
  • Fetch previous execution based on the event data
  • Iterate on every matched workflow
  • Create a new execution with the data of the event and the previous executionHash (see the sketch after this list)
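
Put together, the job could be a simple loop over the API's event stream; the channel-based wiring here is an assumption:

type Engine struct {
  workflows []Workflow   // hardcoded system workflows for now (see below)
  events    <-chan Event // fed by the API with every event, including system events
}

func (e *Engine) Run() {
  for ev := range e.events {
    onEventOrResult(e.workflows, ev) // match workflows and create executions (sketch above)
  }
}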

The workflow engine only listens to events; it is not directly linked to the end of a task. In order to react to a result, we need to create a system event that SubmitResult can emit and that the workflow engine will listen to.
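
For example, SubmitResult could emit something like this (the event key and payload fields are assumptions):

// Hypothetical system event emitted when SubmitResult completes, so that the
// workflow engine can react to results the same way it reacts to events.
func emitExecutionFinished(exec Execution, publish func(Event)) {
  publish(Event{
    serviceHash: exec.serviceHash,
    key:         "executionFinished",
    data: map[string]interface{}{
      "executionHash": exec.hash,
      "taskKey":       exec.taskKey,
      "outputs":       exec.outputs,
    },
  })
}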

The user will have the possibility to connect to the result of an execution, but this should be “compiled” into events.

type Task struct {
  serviceHash string // needs to be the hash, not the sid; this will be resolved by the workflow importer once we have one
  taskKey     string
}

type Predicate map[string]string // we can start with a simple equality map, eg: `{ taskKey: "xxx" }`

type Trigger struct {
  serviceHash string
  eventKey    string // "executionFinished" to react to the result of an execution
  filter      Predicate
}

type Workflow struct {
  trigger Trigger
  tasks   []Task
}
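
With these structures, matching a trigger against an incoming event can stay trivial; a sketch, assuming the workflow store is a plain in-memory slice for now:

func matchWorkflows(store []Workflow, e Event) []Workflow {
  var matched []Workflow
  for _, w := range store {
    t := w.trigger
    if t.serviceHash != e.serviceHash || t.eventKey != e.key {
      continue
    }
    // Simple equality filter, eg: { taskKey: "task1" }.
    ok := true
    for k, v := range t.filter {
      if e.data[k] != v {
        ok = false
        break
      }
    }
    if ok {
      matched = append(matched, w)
    }
  }
  return matched
}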

Example based on the graph above:

workflowA:
  trigger:
    serviceHash: serviceA
    eventKey: "eventX"
  tasks:
    - serviceHash: serviceB
      taskKey: "task1"
    - serviceHash: serviceC
      taskKey: "task2"
    - serviceHash: serviceF
      taskKey: "task5"
workflowB:
  trigger:
    serviceHash: serviceB
    eventKey: "executionFinished"
    filter: { taskKey: "task1" }
  tasks:
    - serviceHash: serviceD
      taskKey: "task3"
workflowC:
  trigger:
    serviceHash: serviceB
    eventKey: "executionFinished"
    filter: { taskKey: "task1" }
  tasks:
    - serviceHash: serviceE
      taskKey: "task4"

This way we create our tree: it is composed of multiple workflows and is correct by construction.

One of the limitations here is that we cannot access the data of “eventX” from “workflowB” and “workflowC”. The workaround is to create a task at the end of “workflowA” that aggregates all the data needed by “workflowB” and “workflowC”; this way the new workflows will have all the data.
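
For instance (“serviceG” and “aggregate” are illustrative names, not part of the graph above), workflowA could end with an aggregation task, and workflowB would then trigger on its result instead:

workflowA:
  trigger:
    serviceHash: serviceA
    eventKey: "eventX"
  tasks:
    - serviceHash: serviceB
      taskKey: "task1"
    - serviceHash: serviceG # hypothetical service that merges the data of "eventX" with the result of "task1"
      taskKey: "aggregate"
workflowB:
  trigger:
    serviceHash: serviceG
    eventKey: "executionFinished"
    filter: { taskKey: "aggregate" }
  tasks:
    - serviceHash: serviceD
      taskKey: "task3"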

For now, workflows need to be hardcoded when the core starts (creating system workflows); exposing them to users will come as a separate feature.
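
A sketch of what that hardcoding could look like; the wiring into the engine is an assumption:

// Hypothetical: system workflows are built in code at startup, no user-facing API yet.
func systemWorkflows() []Workflow {
  return []Workflow{
    {
      trigger: Trigger{serviceHash: "serviceA", eventKey: "eventX"},
      tasks: []Task{
        {serviceHash: "serviceB", taskKey: "task1"},
        {serviceHash: "serviceC", taskKey: "task2"},
        {serviceHash: "serviceF", taskKey: "task5"},
      },
    },
  }
}

The engine would then be started with this list, eg: engine := &Engine{workflows: systemWorkflows(), events: eventStream}, where eventStream is the channel fed by the API.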
