
Workflow Engine

The Workflow Engine is a DAG-based execution platform that runs multi-step automation workflows. It is built on a queue-first, append-only-log architecture designed for correctness under concurrent execution across multiple application pods.

Design principles

Before reading anything else, internalize these five rules; they explain nearly every design decision in the engine:

  1. Queue-first: every node execution is a discrete job. No synchronous blocking between nodes.
  2. Append-only log: tblWorkflowInstanceLogs is the single source of truth. Context is assembled by folding log rows; nothing is ever updated in place.
  3. Optimistic concurrency: a version field on tblWorkflowInstances serialises the scheduling decision using a CAS (compare-and-swap) loop. No distributed locks required.
  4. Idempotent log writes: a unique index on (instanceID, nodeID, eventType, nodeAttempt) means duplicate result delivery is discarded before the scheduling loop is reached.
  5. NODE_DISPATCHED sentinel: written atomically by the CAS winner before addNodeJob, giving losers a definitive read to check before re-dispatching.

System components

| Layer | Module | Responsibility |
| --- | --- | --- |
| API / Service | workflow.service.js | CRUD for workflow definitions; triggers execution |
| Queue Transport | queue.config.js (fastq) | In-process FIFO queues for jobs and results |
| Task Worker | taskWorker.js | Executes individual node handlers; reports results |
| Orchestrator | orchestrator.js | Processes results; updates log; decides next nodes |
| State Manager | stateManager.js | All DB reads/writes through a single interface |

Data model

Tables

| Table | Purpose |
| --- | --- |
| tblWorkflows | Workflow definition: title, options, tenant |
| tblWorkflowNodes | Node list: type, config, position |
| tblWorkflowEdge | Directed edges: upstreamNodeID → downstreamNodeID, sourceHandle |
| tblWorkflowInstances | One row per execution: status, version (CAS), timestamps |
| tblWorkflowInstanceLogs | Append-only event log: the single source of truth |

tblWorkflowInstanceLogs schema

Every event in an execution writes one immutable row here. Context assembly, barrier checks, and dispatch guards all read from this table.

| Column | Type | Description |
| --- | --- | --- |
| logID | BigInt PK | Auto-increment; determines ordering for the context fold |
| instanceID | UUID FK | Links to tblWorkflowInstances |
| nodeID | UUID \| null | Null for INPUT_SET and SYSTEM_SET rows |
| eventType | Enum | See event types below |
| nodeStatus | success \| error \| null | Execution result for node events |
| outputVariable | string \| null | Key written into the context object |
| payload | JSONB | Context contribution (empty {} for failure/dispatch rows) |
| errorMessage | string \| null | Non-null only for NODE_FAILED |
| nodeAttempt | integer \| null | Which taskWorker retry produced this row (1 = first attempt) |

Event types

| eventType | Who writes it | Carries payload? | Purpose |
| --- | --- | --- | --- |
| INPUT_SET | stateManager.createInstance | Yes: { input: params } | Records workflow input; seeds context |
| NODE_COMPLETED | orchestrator.handleTaskResult | Yes: node output | Node ran successfully; contributes to context |
| NODE_FAILED | orchestrator.handleTaskResult | No (empty {}) | Node failed; error in errorMessage column |
| NODE_DISPATCHED | Orchestrator CAS winner | No (empty {}) | Signals a node has been queued; prevents duplicate dispatch |
| SYSTEM_SET | Orchestrator (test runs) | Yes: __workflowDefinition etc. | Metadata for test-run execution |

Required schema migrations

Run the following before deploying the concurrency fix. Both columns and both indexes must exist for optimistic locking and idempotency to function correctly.

-- 1. Optimistic lock version field
ALTER TABLE "tblWorkflowInstances"
ADD COLUMN IF NOT EXISTS "version" INTEGER NOT NULL DEFAULT 0;

-- 2. Execution attempt tracking
ALTER TABLE "tblWorkflowInstanceLogs"
ADD COLUMN IF NOT EXISTS "nodeAttempt" INTEGER NULL;

-- 3. Idempotency guard: prevents duplicate log rows from at-least-once delivery
CREATE UNIQUE INDEX IF NOT EXISTS "uq_workflow_log_node_event"
ON "tblWorkflowInstanceLogs" ("instanceID", "nodeID", "eventType", "nodeAttempt")
WHERE "nodeID" IS NOT NULL;

-- 4. Fast lookup for dispatch guard checks
CREATE INDEX IF NOT EXISTS "idx_workflow_log_dispatched"
ON "tblWorkflowInstanceLogs" ("instanceID", "eventType")
WHERE "eventType" = 'NODE_DISPATCHED';

-- 5. Fast lookup for barrier checks (may already exist)
CREATE INDEX IF NOT EXISTS "idx_workflow_log_completed"
ON "tblWorkflowInstanceLogs" ("instanceID", "nodeID", "eventType")
WHERE "eventType" = 'NODE_COMPLETED';

Execution lifecycle

Saved workflow run

Test workflow run

Test runs allow the frontend to execute unsaved workflows from the editor canvas without writing a permanent definition to the database.

Key differences from a production run:

  • The node/edge graph is stored as __workflowDefinition inside a SYSTEM_SET log row, not read from the database on each scheduling step.
  • _resolveTestNextNodes reads from context.__workflowDefinition instead of calling dagScheduler.calculateNextNodes.
  • The barrier check uses contextData.__node_<id> sentinel keys instead of querying NODE_COMPLETED log rows, because test runs use frontend UUID strings as nodeIDs which do not match database UUIDs.

Context assembly

The workflow context is a plain key-value object assembled by folding all INPUT_SET, NODE_COMPLETED, and SYSTEM_SET log rows in logID order. Later rows win on key conflicts.

Each NODE_COMPLETED row writes three keys into its payload:

  • [outputVariable]: the value downstream nodes reference via {{ctx.variableName}}.
  • output (end node only): the final workflowOutput object for widget consumption.
  • __node_<nodeID>: a sentinel used by test-run barrier checks and debugging.

NODE_DISPATCHED rows carry no payload and are intentionally excluded from context assembly.
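As a sketch, the fold can be pictured like this (row shapes follow the schema above; the real assembleContext lives in stateManager.js and reads rows from the database):

```javascript
// Minimal sketch of context assembly: fold payloads of context-carrying
// rows in logID order; later rows win on key conflicts.
const CONTEXT_EVENTS = new Set(['INPUT_SET', 'NODE_COMPLETED', 'SYSTEM_SET']);

function assembleContext(logRows) {
  // logRows are assumed to be sorted by logID ascending
  return logRows
    .filter((row) => CONTEXT_EVENTS.has(row.eventType))
    .reduce((ctx, row) => Object.assign(ctx, row.payload), {});
}

// Example: input seeds the context, then a completed node adds its output
const context = assembleContext([
  { logID: 1, eventType: 'INPUT_SET', payload: { input: { id: 7 } } },
  { logID: 2, eventType: 'NODE_DISPATCHED', payload: {} }, // filtered out
  { logID: 3, eventType: 'NODE_COMPLETED', payload: { result: [1, 2] } },
]);
// context is { input: { id: 7 }, result: [1, 2] }
```

Because the fold is deterministic over an append-only log, the context can always be rebuilt from scratch; no cached state needs to survive a pod restart.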


Concurrency model

The double-dispatch problem

When a join node has multiple incoming branches (joinMode: "all"), those branches may complete at nearly the same time. Two separate handleTaskResult calls then run concurrently, one per branch result.

This is not just a single-process problem. It also occurs across pods, because both calls can write their log rows before either reads getCompletedNodeIDs.

The fix: optimistic locking with CAS

The version field on tblWorkflowInstances is the atomic gate.

CAS retry loop
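A minimal sketch of the pattern, with a hypothetical store object standing in for the real stateManager/Prisma calls (the method names here are illustrative, not the actual API):

```javascript
// Sketch of an optimistic-concurrency scheduling step. The CAS winner
// writes the NODE_DISPATCHED sentinel and queues the job; losers re-read
// and either retry or observe the sentinel and back off.
async function scheduleWithCas(store, instanceID, nodeID, maxRetries = 5) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const { version } = await store.getInstance(instanceID);

    // Definitive read: give up if another pod already dispatched this node.
    if (await store.isDispatched(instanceID, nodeID)) return false;

    // Compare-and-swap: bump version only if it is unchanged since the read.
    const won = await store.casBumpVersion(instanceID, version);
    if (won) {
      await store.logDispatched(instanceID, nodeID); // sentinel row
      await store.addNodeJob(instanceID, nodeID);
      return true;
    }
    // Lost the race: loop re-reads version and the dispatch sentinel.
  }
  throw new Error(`CAS retries exhausted for instance ${instanceID}`);
}
```

The sentinel check before the CAS is what keeps losers from re-dispatching: after the winner's write, every retry sees NODE_DISPATCHED and exits.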

Why not pessimistic locking?

SELECT FOR UPDATE is a valid alternative. The constraint is that every DB call inside handleTaskResult must use the same Prisma transaction client (tx), because PostgreSQL row locks are session-scoped. This would require threading tx into stateManager and dagScheduler.

Optimistic locking avoids that refactor: every query uses its own pool connection as normal, correctness is enforced by the CAS gate, and there is zero blocking overhead in the common case (races are rare; they require two branches to complete within milliseconds).


DAG scheduler

calculateNextNodes

Given a completed node and its output handle, this function returns the set of downstream nodes ready to execute. It makes exactly four DB queries regardless of fan-out width.

Barrier check: isNodeReadyToExecute

For each candidate node, the barrier check evaluates joinMode:

| joinMode | Fires when |
| --- | --- |
| "all" (default) | Every upstream node in the incoming edge set has a NODE_COMPLETED log row |
| "any" | At least one upstream node has a NODE_COMPLETED log row |

Single-parent nodes (incomingEdges.length <= 1) are always ready; no DB query is needed.

For test runs the barrier falls back to checking contextData.__node_<id> key presence, because test runs use frontend UUID strings as nodeIDs.
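A minimal sketch of the barrier logic under these rules (completedIDs stands in for the result of getCompletedNodeIDs; the edge field name is illustrative):

```javascript
// Sketch of the joinMode barrier. completedIDs is assumed to be a Set of
// upstream nodeIDs that already have a NODE_COMPLETED log row.
function isNodeReadyToExecute(incomingEdges, joinMode, completedIDs) {
  if (incomingEdges.length <= 1) return true; // single parent: always ready

  const upstream = incomingEdges.map((edge) => edge.upstreamNodeID);
  return joinMode === 'any'
    ? upstream.some((id) => completedIDs.has(id))   // any one branch done
    : upstream.every((id) => completedIDs.has(id)); // "all" (default)
}
```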


Node handlers

Handler contract

Every handler exports one async function:

async function execute(nodeConfig, context, helpers) {
  // helpers: { instanceID, nodeID, workflowID, resolveTemplate }
  return {
    output: object,      // written into context under outputVariable
    nextHandle: string,  // which outgoing edge to follow
    queueDelay: number,  // optional ms; delays dispatch of next nodes (delay node)
  };
}
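For illustration, a hypothetical uppercase node conforming to this contract (the node type, config key, and handle name are invented for the example):

```javascript
// Hypothetical handler: reads its input from the workflow context via a
// template, transforms it, and returns output plus the edge to follow.
async function execute(nodeConfig, context, helpers) {
  const value = helpers.resolveTemplate(nodeConfig.source, context);
  return {
    output: { upper: String(value).toUpperCase() },
    nextHandle: 'default', // illustrative handle name
  };
}
```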

Handler reference

| Node type | File | Key behaviour |
| --- | --- | --- |
| start | startHandler.js | Passes through; returns { started: true, inputReceived: ctx.input } |
| end | endHandler.js | Collects outputParameters from context; nextHandle: null (terminal) |
| javascript | javascriptHandler.js | Runs user code in an isolated-vm sandbox; IIFE wrapper so return works |
| dataQuery | dataQueryHandler.js | Calls QueryEngine.executeQuery(dataQueryID, resolvedArgs); args resolved via the template engine |
| condition | conditionHandler.js | Evaluates branches in order; returns first truthy branch ID as nextHandle |
| delay | delayHandler.js | Returns queueDelay: N ms; non-blocking: the next node is re-queued with a delay |
| loop | loopHandler.js | Resolves the source array; nextHandle: "loop" (has items) or "done" (empty) |

Template resolution

Handlers resolve {{ctx.*}} expressions via sharedResolveTemplate. Two modes:

  • Single expression: "{{ctx.input.id}}" preserves the original type (number, array, object).
  • String interpolation: "prefix_{{ctx.input.id}}" always returns a string.
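A toy resolver showing how the two modes can behave (this is a sketch, not the actual sharedResolveTemplate implementation):

```javascript
// Toy template resolver: a lone {{ctx.*}} expression keeps its original
// type, while interpolation inside a larger string always stringifies.
function resolveTemplate(template, context) {
  const lookup = (path) =>
    path.replace(/^ctx\./, '').split('.').reduce((obj, key) => obj?.[key], context);

  const single = template.match(/^\{\{\s*(ctx\.[\w.]+)\s*\}\}$/);
  if (single) return lookup(single[1]); // single expression: preserve type

  // interpolation: every match is coerced to a string
  return template.replace(/\{\{\s*(ctx\.[\w.]+)\s*\}\}/g, (_, path) => String(lookup(path)));
}
```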

JavaScript node sandbox

User code runs inside isolated-vm with a configurable timeout (default 30 s). The sandbox exposes only ctx (the current workflow context).

// Both forms work inside the javascript node
return ctx.input.items.map(item => item.id);

// Async
const result = await fetch('https://...');
return await result.json();

errorHandling modes

| Mode | Behaviour |
| --- | --- |
| continue (default) | Return the error output and follow the "error" output handle. Workflow continues. |
| fail_workflow | Throw; propagates to taskWorker retry logic, then marks the workflow FAILED. |

Queue configuration

In-process queues (fastq)

There is no external broker. Two queues exist:

| Queue | Producer | Consumer | Concurrency |
| --- | --- | --- | --- |
| workflow.tasks | addNodeJob | taskWorker._processJob | 10 |
| workflow.results | addResult | orchestrator.handleTaskResult | 10 |
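The wiring can be pictured with a minimal single-concurrency stand-in (the real transport is fastq with concurrency 10; makeQueue and the inlined workers are illustrative):

```javascript
// Toy stand-in for the two-queue wiring in queue.config.js. Jobs are
// processed one at a time here (no backpressure); fastq provides the real
// bounded-concurrency FIFO behaviour.
function makeQueue(worker) {
  const pending = [];
  let busy = false;
  async function drain() {
    if (busy) return; // already draining; new jobs join the current pass
    busy = true;
    while (pending.length) await worker(pending.shift());
    busy = false;
  }
  return { push(job) { pending.push(job); return drain(); } };
}

// Tasks flow to the worker; results flow back to the orchestrator side.
const results = [];
const resultQueue = makeQueue(async (r) => results.push(r));
const taskQueue = makeQueue(async (job) => {
  // taskWorker would run the node handler here, then report the result
  await resultQueue.push({ nodeID: job.nodeID, status: 'success' });
});
```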

Delayed jobs

addNodeJob accepts options.delay (ms). Delayed jobs use setTimeout to push into the task queue after the delay. This is used by the delay node: the handler returns queueDelay: N and the orchestrator forwards this when dispatching the next nodes.

At-least-once delivery

The unique index on (instanceID, nodeID, eventType, nodeAttempt) means any duplicate result delivery throws P2002 on the log write and exits cleanly without re-entering the scheduling loop. The CAS loop is never reached.
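The guard can be sketched as follows (writeLogRow and schedule are hypothetical stand-ins for the log insert and the CAS scheduling step):

```javascript
// Sketch of the idempotency guard: the unique index makes a duplicate log
// write fail with Prisma error code P2002, which is treated as "already
// processed" and short-circuits before any scheduling happens.
async function handleTaskResultOnce(writeLogRow, schedule) {
  try {
    await writeLogRow(); // INSERT into tblWorkflowInstanceLogs
  } catch (err) {
    if (err.code === 'P2002') return 'duplicate_ignored'; // clean exit
    throw err; // anything else is a real failure
  }
  return schedule(); // the CAS loop runs only for the first delivery
}
```

Putting the uniqueness check in the database rather than in application code means the guard holds across pods, not just within one process.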


State manager API

| Method | Returns | Description |
| --- | --- | --- |
| logEvent({...}) | log row | Single write path for all events. Pass nodeAttempt for node events. |
| logEventBulk(events[]) | { count } | Batch INSERT for multiple events. One round-trip. |
| createInstance({...}) | { instance, initialContext } | Creates instance row + INPUT_SET log row. Returns initial context directly. |
| getInstance(instanceID) | instance \| null | Fetches instance row (status, version, flags). Does not assemble context. |
| assembleContext(instanceID) | context object | Folds INPUT_SET + NODE_COMPLETED + SYSTEM_SET payloads in logID order. |
| getCompletedNodeIDs(id, nodeIDs[]) | Set<string> | Barrier check: returns nodeIDs that have NODE_COMPLETED rows. |
| getDispatchedNodeIDs(id, nodeIDs[]) | Set<string> | CAS loser check: returns nodeIDs that have NODE_DISPATCHED rows. |
| completeInstance(id, status) | void | Marks instance COMPLETED / FAILED / CANCELLED. Does not write a log row. |
| getStaleRunningInstances(opts) | instance[] | Recovery sweep: finds RUNNING instances with updatedAt older than threshold. |
| markInstancesAsRecovered(ids[]) | void | Bulk-marks stale instances FAILED; writes a SYSTEM_SET log row per instance. |
| deleteTestInstance(instanceID) | void | Transactionally deletes all log rows and the instance row. Test-run only. |

Error handling and recovery

Node-level retry

  • nodeAttempt is recorded on every log row so the audit trail shows which attempt produced each result.
  • Default maxAttempts is 3 with exponential backoff: 1 s, 2 s, 4 s.
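The schedule corresponds to doubling from a 1 s base; as a sketch:

```javascript
// Backoff before a retry, matching the documented 1 s, 2 s, 4 s series:
// 1000 * 2^(n - 1) milliseconds for the n-th attempt.
function backoffMs(attempt) {
  return 1000 * 2 ** (attempt - 1);
}
// backoffMs(1) → 1000, backoffMs(2) → 2000, backoffMs(3) → 4000
```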

Orchestrator errors

If handleTaskResult itself throws (outside the CAS loop), the catch block calls stateManager.completeInstance(instanceID, "FAILED"). This prevents an instance from being stuck in RUNNING indefinitely.

Stale instance recovery

On application startup, recoverStuckWorkflows runs:

  1. Finds RUNNING instances whose updatedAt is older than 5 minutes.
  2. Marks them FAILED via updateMany.
  3. Writes a SYSTEM_SET log row with __recoveryError for each.
  4. Emits workflow_status_update to any connected sockets.
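Sketched against the state-manager API above (the opts shape and return values are assumptions; socket emission is left to the caller):

```javascript
// Sketch of the startup recovery sweep: find stale RUNNING instances,
// bulk-mark them FAILED, and report their IDs for socket notification.
async function recoverStuckWorkflows(stateManager, thresholdMs = 5 * 60 * 1000) {
  const stale = await stateManager.getStaleRunningInstances({ thresholdMs });
  if (stale.length === 0) return [];

  const ids = stale.map((instance) => instance.instanceID);
  // Marks FAILED and writes a SYSTEM_SET row with __recoveryError per instance
  await stateManager.markInstancesAsRecovered(ids);
  return ids; // caller emits workflow_status_update for each instance
}
```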

Real-time socket events

Server → client

| Event | Room | Payload |
| --- | --- | --- |
| workflow_node_update | instanceID | { instanceID, nodeID, status, output, error } |
| workflow_status_update | instanceID | { instanceID, status, contextData } |

Client → server

| Event | Payload | Effect |
| --- | --- | --- |
| workflow_run_join | { runId: instanceID } | Socket joins the instanceID room to receive node and status events |

Workflow service API

| Method | Description |
| --- | --- |
| getAllWorkflows({ tenantID }) | Returns all workflow definitions for a tenant, including nodes and edges |
| getWorkflowByID({ workflowID }) | Fetches a single workflow definition with nodes and edges |
| createWorkflow({ title, nodes, edges, workflowOptions }) | Transactionally creates workflow + nodes + edges |
| updateWorkflow({ workflowID, nodes, edges }) | Transactionally replaces nodes and edges (delete-all + re-create) |
| deleteWorkflow({ workflowID }) | Transactionally removes nodes, edges, instances, and the workflow |
| executeWorkflow({ workflowID, inputParams }) | Starts a production run; returns { instanceID } immediately |
| testWorkflow({ nodes, edges, inputParams }) | Starts a test run from an unsaved in-memory graph |
| stopTestWorkflow({ instanceID }) | Deletes the test instance and all its log rows |
| getRunStatus(instanceID) | Returns instance status and log history |
| getRunStatusForWidget({ instanceID, widgetType, widgetConfig }) | Returns status with processed widget data (SYNC / replay path) |

Testing

Stress test architecture

The stress test suite at __tests__/stress/workflowStress.test.js drives the full orchestrator → handler → orchestrator cycle in-process without a real queue.

| Component | File | Purpose |
| --- | --- | --- |
| DB mock | inMemoryPrisma.js | In-memory Prisma mock with correct matchesWhere(), version CAS semantics, and P2002 simulation |
| Executor | workflowExecutor.js | drainJobsFor(instanceID): per-instance drain prevents concurrent instances from stealing each other's queue entries |
| Topologies | topologies.js | 12 synthetic workflow shapes: linear, fan-out, fan-in, diamond AND/OR, deep chain, error path, conditional, delay, multi-level join |

Key assertions

  • Join node executes exactly once per instance, even under runParallel (concurrent batch processing).
  • 100 concurrent linear instances all reach COMPLETED.
  • 50 concurrent diamond-AND instances show zero double-dispatch events.
  • All 12 topology shapes reach a terminal state.
  • Duplicate result delivery (P2002 on log write) exits cleanly without double-dispatch.

Operational trade-offs

Advantages of the current design

  • Simple local development: no external broker needed.
  • Fast feedback for editor test runs.
  • Low cognitive overhead while the feature set evolves.
  • The queue API surface is transport-agnostic, so replacing fastq with pg-boss or RabbitMQ requires only changes to queue.config.js.

Current constraints

  • Queue state is process-local and is lost on pod restart.
  • Horizontal scaling is limited: task and result processing must happen on the same pod where the queue was initialised.
  • Delayed retries use in-process timers, which are less resilient than broker-managed delays.

Migration path

Because queue.config.js exposes a stable interface (addNodeJob, addResult, registerTaskWorker, registerResultsWorker), the workflow layer remains fully decoupled from the transport. Replacing the queue with pg-boss or RabbitMQ requires no changes to the orchestrator, task worker, or state manager.


File reference

| File | Role |
| --- | --- |
| orchestrator.js | handleTaskResult CAS loop; startWorkflow; startTestWorkflow |
| stateManager.js | All DB operations: log writes, context assembly, barrier/dispatch checks |
| dagScheduler.js | calculateNextNodes; isNodeReadyToExecute barrier logic |
| taskWorker.js | Node job consumer; handler execution; retry with backoff |
| workflowWorkers.js | Startup: initialises the queue, registers the task worker and results consumer |
| workflow.service.js | Business logic facade: CRUD, executeWorkflow, testWorkflow, stopTestWorkflow |
| queue.config.js | fastq in-process queues; addNodeJob; addResult; registerTaskWorker/ResultsWorker |
| handlers/startHandler.js | Start node: passes input through |
| handlers/endHandler.js | End node: collects outputParameters; terminal |
| handlers/javascriptHandler.js | JS execution in isolated-vm sandbox with timeout |
| handlers/dataQueryHandler.js | QueryEngine adapter; template-resolved args |
| handlers/conditionHandler.js | Branch evaluation: returns first truthy branch as nextHandle |
| handlers/delayHandler.js | Non-blocking delay via queueDelay return value |
| handlers/loopHandler.js | Array iteration scaffolding |
| workflow.socket.controller.js | Socket handler: workflow_run_join |