ποΈ Architecture Deep Dive
This document provides a comprehensive technical overview of the Jet Admin Workflow Engine. It covers the architecture, execution flow, core components, and implementation details for both backend and frontend.
1. High-Level Architectureβ
The workflow engine is designed as an event-driven, distributed system using RabbitMQ for decoupling the Orchestrator from the Task Workers. This allows for scalability and robust failure handling.
2. Core Componentsβ
2.1 Workflow Service (workflow.service.js)β
The entry point for all workflow operations. It handles CRUD for workflow definitions and triggers executions.
executeWorkflow: Starts a production run. Creates atblWorkflowInstancesrecord and queues the Start Node.testWorkflow: Starts a test run. Instead of reading from the DB, it places the in-memory node/edge definition into the execution context (__workflowDefinition), allowing users to test unsaved changes.
2.2 Orchestrator (orchestrator.js)β
The "Brain" of the engine. It consumes Execution Results and decides what to do next.
Key Responsibilities:
- Consume Results: Listens to
workflow.results. - Update State: Updates the workflow instance context with the output of the completed node. Uses optimistic locking to prevent race conditions.
- Determine Next Step:
- If
isTestRun: Uses in-memory graph from context. - If Production: Calls
dagSchedulerto find downstream nodes from the DB.
- If
- Dispatch Jobs: Pushes new jobs to
workflow.tasksfor the next nodes. - Completion: If a terminal node (End Node) is reached, marks the instance as
COMPLETED.
2.3 Task Worker (taskWorker.js)β
The "Muscle" of the engine. It consumes Task Jobs and executes the specific logic for that node type.
Key Responsibilities:
- Consume Tasks: Listens to
workflow.tasks. - Route to Handler: Based on
nodeType(e.g.,javascript,dataQuery), delegates to the specific handler inhandlers/. - Execute: Runs the handler logic. This is statelessβit receives everything it needs (config + context) in the job payload.
- Retry Logic: Handles retries with exponential backoff using specific delayed queues (e.g.,
workflow.tasks_delayed_5000). - Publish Result: Sends success/error payload to
workflow.results.
2.4 State Manager (stateManager.js)β
Handles all database interactions for execution state.
- Optimistic Locking: When updating context, it checks
version. If the version in DB has changed, it retries. This ensures that parallel branches updating the context don't overwrite each other's data.
3. Data Structuresβ
3.1 Context (ctx)β
The contextData generic JSON field in tblWorkflowInstances holds the state.
input: Initial arguments passed to the workflow.- Node Outputs: Each node's output is stored under its
nodeID.{
"input": { "userId": 123 },
"node_abc123": { "success": true, "data": [...] },
"node_xyz789": { "jsResult": 42 }
}
3.2 Job Payloadβ
Message sent to workflow.tasks:
{
"instanceID": "uuid",
"nodeID": "uuid",
"nodeType": "javascript",
"nodeConfig": { "code": "return 1+1" },
"context": { ... }, // Snapshot of context at dispatch time
"workflowID": "uuid"
}
3.3 Result Payloadβ
Message sent to workflow.results:
{
"instanceID": "uuid",
"nodeID": "uuid",
"status": "success", // or 'error'
"output": { "result": 2 },
"nextHandle": "success" // Which output handle to follow (e.g., 'true', 'false', 'error')
}
4. Implementation Details & Code Samplesβ
4.1 Orchestrator Logicβ
The core "Check-Decide-Act" loop in orchestrator.js:
// orchestrator.js
async function handleTaskResult(result) {
const { instanceID, nodeID, status, output, nextHandle } = result;
// 1. Update Context (State)
const updatedInstance = await stateManager.updateContext(
instanceID,
{ [nodeID]: output }, // Merge new output into context
instance.version
);
// 2. Calculate Next Nodes (Decision)
const nextNodes = await dagScheduler.calculateNextNodes(
instance.workflowID,
nodeID,
nextHandle // Follow specific handle (e.g., 'success')
);
// 3. Queue Next Jobs (Action)
for (const nextNode of nextNodes) {
await addNodeJob({
instanceID,
nodeID: nextNode.nodeID,
nodeType: nextNode.nodeType,
nodeConfig: nextNode.nodeConfig,
context: updatedInstance.contextData,
});
}
}
4.2 Node Handler (Backend)β
Example of the JavaScript Node Execution in handlers/javascriptHandler.js. Note the use of vm2 for sandboxing.
// handlers/javascriptHandler.js
const { VM } = require('vm2');
async function execute(nodeConfig, context) {
const { code, outputVariable } = nodeConfig;
// Create sandbox with access to context
const sandbox = {
ctx: context,
console: { log: () => {} }, // Security: disable console
// ... allow safe globals like Math, JSON
};
const vm = new VM({ timeout: 1000, sandbox });
try {
// Run user code
const result = vm.run(code);
return {
output: { [outputVariable]: result },
nextHandle: 'success'
};
} catch (error) {
return {
output: { error: error.message },
nextHandle: 'error' // Follow 'error' edge
};
}
}
4.3 Node Component (Frontend)β
The frontend uses JSON Forms to render the configuration panel. This allows for declarative UI definitions.
// packages/workflow-nodes/src/nodes/javascriptNode.jsx
export const JavascriptNodeConfigurator = ({ data, onChange }) => {
// Define Schema for JSON Forms
const schema = {
type: 'object',
properties: {
code: {
type: 'string',
format: 'code-javascript',
title: 'Network Code'
},
timeoutSeconds: { type: 'integer', default: 30 }
}
};
return (
<JsonForms
schema={schema}
data={data}
onChange={({ data }) => onChange(data)}
renderers={workflowNodeRenderers} // Custom renderers
/>
);
};
5. Adding a New Node Typeβ
To add a new node type (e.g., slackNode):
-
Frontend (
packages/workflow-nodes):- Create
slackNode.jsxwithSlackNode(visual) andSlackNodeConfigurator(config form). - Register in
nodeTypesmap.
- Create
-
Backend Handler (
apps/backend/modules/workflow/workers/handlers):- Create
slackHandler.js:async function execute(config, context) {
/* call slack api */
return { output: { sent: true }, nextHandle: 'success' };
}
module.exports = { execute }; - Register in
handlers/index.js.
- Create
-
Queue: No changes needed! The
taskWorkerdynamically loads the handler based on thenodeTypestring.
6. Error Handling & Retriesβ
- User Errors: Caught in the handler. The handler returns
nextHandle: 'error', allowing the workflow to proceed down an "Error" path (if defined). - System Errors (Crashes): Caught by
taskWorker.- Retry: If
attempts < maxAttempts, the message is sent to a Delayed Queue (RabbitMQx-message-ttl). - Dead Letter: After max retries, it moves to
workflow.tasks.dlq.
- Retry: If
7. Development Tipsβ
- Test Mode: When you run "Test" in the UI, the backend does not create DB nodes. It relies entirely on the JSON payload you send. This is crucial for rapid iteration.
- Context usage: Always use
ctx.to access data from previous nodes. - Logs: Execution logs are stored in
tblNodeExecutionLogs. This is what populates the "Console" in the UI.