Workflow Automation Engine
The Workflow Automation Engine is a generic, node-based automation runtime that drives Sales CRM lead nurture end to end. An admin authors a visual graph (trigger → wait → send email / WhatsApp / AI call → branch) on a React-Flow canvas; the backend persists it as JSONB, scans CRM lead segments (or reacts to lead lifecycle events, or runs on demand), enrolls matching leads, and a BullMQ worker advances each enrollment one node at a time — reusing the existing email, WhatsApp, and Aarya AI-calling services rather than re-implementing any send logic. A second, much simpler subsystem under the same "workflow" name stores Mr. Hire recruitment rules + email templates as plain CRUD config.
Status: documented from source on this branch.
Overview
There are two separate things in this codebase that carry the word "workflow". This document covers both, but the headline is the first.
| Subsystem | Schema / store | Author | Executes? | Source |
|---|---|---|---|---|
| Sales Automation Engine (node-based) | mas_crm.workflows, workflow_enrollments, workflow_node_runs | ADMIN | Yes, BullMQ engine | src/services/WorkflowEngineService.ts, src/services/WorkflowService.ts |
| Mr. Hire Workflow Rules (recruitment) | workflow_rules, email_templates | any authed user | No engine here, CRUD config only | src/controllers/WorkflowController.ts, src/entities/WorkflowRule.entity.ts |
Who uses it
- Sales admins (role === ADMIN only; sales reps and even sales heads are intentionally excluded by adminMiddleware) build, enable, monitor, and retry Sales workflows at /api/admin/workflows/*.
- The WorkflowWorker (src/workers/workflow.worker.ts, queue workflowQueue, concurrency 5) is the runtime: it runs the scheduled segment scan, advances each enrollment one node per job, and reacts to lead events.
- Recruiters / Mr. Hire users manage workflow rules and email templates at /api/mr-hire/workflow/* (auth only, no admin gate).
Where it sits in the suite — the Sales engine is a pure consumer of the Sales CRM (RawLead, LeadCallLog, lead email/WhatsApp logs) and of the comms + AI-calling services. It owns its own tables in the mas_crm schema and adds zero new send mechanics; everything routes through LeadEmailService, WhatsAppService / LeadWhatsAppService, and ElevenLabsBatchService.
Key concepts & entities
Glossary
- Workflow: a named automation that can be enabled or disabled. Holds a visual graph (nodes + edges) and a triggerConfig (which leads enter, and how).
- Node: one step in the graph. Types: trigger, wait, sendEmail, sendWhatsApp, aaryaCall, branch (WorkflowNodeType in src/entities/workflow/workflowTypes.ts).
- Edge: a directed connection between two nodes. A branch node's edges carry a sourceHandle (hot / warm / cold / no_engagement) to pick the path.
- Trigger mode: segment (scheduled scan of all matching leads), event (enroll the instant a lifecycle event fires), or manual (admin hand-picks leads via a Run modal).
- Enrollment: one lead's journey through one workflow. Tracks the currentNodeId it sits on and a context scratchpad.
- Node run: one audit row per time the engine executes a node for an enrollment; powers the timeline UI.
- Aarya call: an outbound AI phone call (ElevenLabs) whose recorded duration is later bucketed into hot / warm / cold / no_engagement and used by branch nodes.
Main entities
| Entity | File | Table | Notes |
|---|---|---|---|
| Workflow | src/entities/workflow/Workflow.entity.ts | mas_crm.workflows | graph JSONB, triggerConfig JSONB, cached triggerNodeId, enabled, enrollmentLimit, owningSalesHeadId, createdBy, lastScanAt |
| WorkflowEnrollment | src/entities/workflow/WorkflowEnrollment.entity.ts | mas_crm.workflow_enrollments | workflowId, leadId, status, currentNodeId, context JSONB, waitingForCallLogId, resumeAt, completedAt. Partial unique index (workflowId, leadId) WHERE status IN active/waiting/paused |
| WorkflowNodeRun | src/entities/workflow/WorkflowNodeRun.entity.ts | mas_crm.workflow_node_runs | enrollmentId, workflowId, nodeId, nodeType, status, result JSONB, error |
| WorkflowRule | src/entities/WorkflowRule.entity.ts | workflow_rules | Recruitment rules: condition, conditionValue, action, actionTemplate, enabled, triggerCount, jobPostId, belongsTo |
Type shapes (src/entities/workflow/workflowTypes.ts)
- WorkflowGraph = { nodes: WorkflowNode[]; edges: WorkflowEdge[] }: stored verbatim as JSONB, walked in memory, never queried via SQL.
- WorkflowTriggerConfig: a whitelisted subset of RawLeadService.ListInput (createdFrom/To, assignedTo, assignedCounsellor, owningSalesHeadId, sourceType, vendorApiKeyId, interestLevel, aaryaInterest, webinarStatus, sessionYear, tagIds, leadStatus) plus triggerType and events[].
- LeadEventType: lead.created, lead.assigned, lead.counsellor_assigned, lead.owner_changed, lead.interest_changed, lead.webinar_changed, lead.converted, lead.closed.
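The shapes listed above can be pictured roughly as the following types. This is a sketch under assumptions, not a copy of workflowTypes.ts: the edge fields source and target follow the usual React Flow convention, and the names TriggerConfigSketch, effectiveTriggerType, sampleGraph, and the "welcome" template key are hypothetical.

```typescript
// Sketch only: field names beyond those quoted in the text are assumptions.
type WorkflowNodeType =
  | "trigger" | "wait" | "sendEmail" | "sendWhatsApp" | "aaryaCall" | "branch";
type BranchOutput = "hot" | "warm" | "cold" | "no_engagement";
type TriggerType = "segment" | "event" | "manual";

interface WorkflowNode {
  id: string;
  type: WorkflowNodeType;
  data: Record<string, unknown>; // per-node config, e.g. templateKey or onError
}

interface WorkflowEdge {
  source: string;               // id of the node the edge leaves (assumed name)
  target: string;               // id of the node the edge enters (assumed name)
  sourceHandle?: BranchOutput;  // only meaningful on edges leaving a branch node
}

interface WorkflowGraph {
  nodes: WorkflowNode[];
  edges: WorkflowEdge[];
}

interface TriggerConfigSketch {
  triggerType?: TriggerType; // absent means "segment"
  events?: string[];         // LeadEventType values such as "lead.created"
}

// Resolve the effective trigger mode, applying the documented default.
function effectiveTriggerType(config: TriggerConfigSketch): TriggerType {
  return config.triggerType ?? "segment";
}

// A two-node graph: trigger followed by one email step (hypothetical template key).
const sampleGraph: WorkflowGraph = {
  nodes: [
    { id: "t1", type: "trigger", data: {} },
    { id: "e1", type: "sendEmail", data: { templateKey: "welcome" } },
  ],
  edges: [{ source: "t1", target: "e1" }],
};
```

Because the graph is stored verbatim as JSONB and walked in memory, plain structural types like these are enough; no relational mapping of nodes or edges is needed.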
Architecture
flowchart TD
subgraph FE["Admin UI (React Flow builder)"]
UI["Sales Workflows canvas"]
end
subgraph API["Express API (mounted at /api)"]
SWR["SalesWorkflowRoutes (admin-only)"]
WR["WorkflowRoutes (Mr. Hire rules)"]
end
subgraph CTRL["Controllers"]
SWC["SalesWorkflowController"]
WC["WorkflowController"]
end
subgraph SVC["Services"]
WS["WorkflowService (CRUD + graph validation)"]
ENG["WorkflowEngineService (execution)"]
LES["LeadEventService (event chokepoint)"]
QS["QueueService (workflowQueue)"]
end
subgraph WRK["BullMQ worker"]
WW["workflow.worker (scan / step / event)"]
end
subgraph EXT["Reused send + AI services"]
LEM["LeadEmailService"]
WA["WhatsAppService + LeadWhatsAppService"]
EL["ElevenLabsBatchService (Aarya)"]
ACS["AaryaCallSyncService (call buckets)"]
RLS["RawLeadService (segment query)"]
end
subgraph DB["PostgreSQL (mas_crm schema)"]
WFT["workflows"]
ENR["workflow_enrollments"]
NRUN["workflow_node_runs"]
LEADS["raw_leads + lead_call_logs"]
RULES["workflow_rules + email_templates"]
end
UI --> SWR --> SWC --> WS
WC --> RULES
WR --> WC
WS --> WFT
WS --> ENG
WS --> QS
ENG --> RLS --> LEADS
ENG --> ENR
ENG --> NRUN
ENG --> LEM
ENG --> WA
ENG --> EL
ENG --> ACS
LES --> QS
QS --> WW
WW --> ENG
Data model
erDiagram
WORKFLOW ||--o{ WORKFLOW_ENROLLMENT : "enrolls"
WORKFLOW ||--o{ WORKFLOW_NODE_RUN : "audits"
WORKFLOW_ENROLLMENT ||--o{ WORKFLOW_NODE_RUN : "produces"
RAW_LEAD ||--o{ WORKFLOW_ENROLLMENT : "is enrolled as"
LEAD_CALL_LOG ||--o{ WORKFLOW_ENROLLMENT : "wakes via waitingForCallLogId"
WORKFLOW {
uuid id PK
string name
text description
boolean enabled
jsonb graph
jsonb trigger_config
string trigger_node_id
int enrollment_limit
uuid owning_sales_head_id
uuid created_by
timestamp last_scan_at
}
WORKFLOW_ENROLLMENT {
uuid id PK
uuid workflow_id FK
uuid lead_id FK
enum status
string current_node_id
jsonb context
uuid waiting_for_call_log_id
timestamp resume_at
timestamp completed_at
timestamp enrolled_at
}
WORKFLOW_NODE_RUN {
uuid id PK
uuid enrollment_id FK
uuid workflow_id FK
string node_id
string node_type
enum status
jsonb result
text error
timestamp created_at
}
WORKFLOW_RULE {
uuid id PK
string name
string condition
int conditionValue
string action
string actionTemplate
boolean enabled
int triggerCount
uuid jobPostId FK
uuid belongsTo
}
Notable enums / status fields
- WorkflowEnrollmentStatus: active, waiting, paused, completed, failed, cancelled.
- WorkflowNodeRunStatus: success, failed, skipped, scheduled.
- WorkflowNodeType: trigger, aaryaCall, branch, sendEmail, sendWhatsApp, wait.
- WorkflowTriggerConfig.triggerType: segment (default when absent), event, manual.
- WorkflowRule relates only loosely to JobPost (onDelete: SET NULL); WorkflowEnrollment.leadId is a loose link to mas_crm.raw_leads (no cascade).
API surface
All routes are mounted under /api (src/routes/index.ts). Two route classes:
Sales Automation Engine: SalesWorkflowRoutes (admin only)
Guard on every route: authMiddleware + adminMiddleware (ADMIN role only).
| Method | Path | Auth/role | Purpose |
|---|---|---|---|
| GET | /api/admin/workflows/metadata | ADMIN | Builder dropdown data: email/WhatsApp templates, Aarya agents + phone numbers, vendors, assignees, sales heads, segment field options, lead event types, branch outputs |
| POST | /api/admin/workflows/scan-now | ADMIN | Enqueue an immediate segment scan pass |
| GET | /api/admin/workflows | ADMIN | List workflows + active/total enrollment counts |
| POST | /api/admin/workflows | ADMIN | Create a workflow (validates graph; starts disabled) |
| GET | /api/admin/workflows/:id | ADMIN | Get one workflow (full graph + config) |
| PUT | /api/admin/workflows/:id | ADMIN | Update a workflow (re-validates graph) |
| PATCH | /api/admin/workflows/:id/enabled | ADMIN | Enable / disable; enabling re-wakes paused enrollments + kicks a scan |
| DELETE | /api/admin/workflows/:id | ADMIN | Cancel in-flight enrollments, then delete workflow + runs |
| POST | /api/admin/workflows/:id/run-manual | ADMIN | Enroll a hand-picked leadIds[] (manual-trigger workflows only) |
| GET | /api/admin/workflows/:id/enrollments | ADMIN | List enrollments (page, limit, scope=active\|all) with lead identity + current-step label |
| GET | /api/admin/workflows/:id/enrollments/:eid/runs | ADMIN | Node-run timeline for one enrollment |
| POST | /api/admin/workflows/:id/enrollments/:eid/retry | ADMIN | Rewind to the earliest unresolved failed node and re-activate |
| POST | /api/admin/workflows/:id/enrollments/:eid/retry-from | ADMIN | Re-run from a specific nodeId in the body |
Mr. Hire Workflow Rules + Templates: WorkflowRoutes (auth only)
Guard: authMiddleware. Rows are owned per-user (belongsTo) with shared system defaults (belongsTo IS NULL).
| Method | Path | Auth/role | Purpose |
|---|---|---|---|
| GET | /api/mr-hire/workflow/rules | authed | List own + system-default rules |
| POST | /api/mr-hire/workflow/rules | authed | Create a rule (name, condition, action required) |
| PUT | /api/mr-hire/workflow/rules/:id | authed | Update own or system-default rule |
| DELETE | /api/mr-hire/workflow/rules/:id | authed | Delete own rule only (403 on system defaults) |
| PATCH | /api/mr-hire/workflow/rules/:id/toggle | authed | Flip enabled |
| GET | /api/mr-hire/workflow/email-templates | authed | List own + system-default templates |
| POST | /api/mr-hire/workflow/email-templates | authed | Create a template |
| PUT | /api/mr-hire/workflow/email-templates/:id | authed | Update a template |
| POST | /api/mr-hire/workflow/email-templates/:id/send-test | authed | Send a test email, with placeholder substitution, to the address given in the to field |
| DELETE | /api/mr-hire/workflow/email-templates/:id | authed | Delete own template only |
User journeys
1. Define a workflow (author nodes / edges / trigger)
An admin lays out the graph in the React-Flow builder and saves. WorkflowService.validateGraph enforces structural sanity before anything is persisted, and the single trigger node's id is cached on the row so the engine can enter without re-scanning.
sequenceDiagram
participant Admin
participant API as SalesWorkflowController
participant WS as WorkflowService
participant DB as workflows table
Admin->>API: GET /api/admin/workflows/metadata
API->>WS: getMetadata
WS-->>API: templates, aarya agents, segment fields, event types
API-->>Admin: dropdown data for the builder
Admin->>API: POST /api/admin/workflows with name, graph, triggerConfig
API->>WS: create with createdBy from req.user
WS->>WS: validateGraph checks one trigger, unique ids, valid edges
Note over WS: rejects cycles, branch handles on non-branch nodes, and unconfigured send steps
alt graph invalid
WS-->>API: throw 400 with reason
API-->>Admin: 400 error message
else graph valid
WS->>DB: insert row enabled false with cached triggerNodeId
DB-->>WS: saved workflow
WS-->>API: workflow
API-->>Admin: 201 created
end
Validation rules in validateGraph: exactly one trigger node; no duplicate node ids; every edge references existing nodes; sourceHandle labels only on branch nodes; sendEmail must have a templateKey and sendWhatsApp a templateName; and the graph must be acyclic (DFS three-colour cycle check — a back-edge would loop forever).
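The checks listed above, including the three-colour DFS, can be sketched as follows. This is a minimal illustration and not the source of WorkflowService.validateGraph: the type names, the function name validateGraphSketch, the error strings, and the choice to return a list of problems instead of throwing a 400 are all assumptions.

```typescript
interface SketchNode {
  id: string;
  type: string;
  data?: { templateKey?: string; templateName?: string };
}
interface SketchEdge { source: string; target: string; sourceHandle?: string }
interface SketchGraph { nodes: SketchNode[]; edges: SketchEdge[] }

// Returns human-readable problems; an empty array means every check passed.
function validateGraphSketch(graph: SketchGraph): string[] {
  const errors: string[] = [];
  const byId = new Map<string, SketchNode>();
  const adjacency = new Map<string, string[]>();

  for (const node of graph.nodes) {
    if (byId.has(node.id)) errors.push("duplicate node id: " + node.id);
    byId.set(node.id, node);
    if (node.type === "sendEmail" && !node.data?.templateKey) {
      errors.push("sendEmail node " + node.id + " has no templateKey");
    }
    if (node.type === "sendWhatsApp" && !node.data?.templateName) {
      errors.push("sendWhatsApp node " + node.id + " has no templateName");
    }
  }

  const triggerCount = graph.nodes.filter((n) => n.type === "trigger").length;
  if (triggerCount !== 1) {
    errors.push("expected exactly one trigger node, found " + triggerCount);
  }

  for (const edge of graph.edges) {
    const source = byId.get(edge.source);
    if (!source || !byId.has(edge.target)) {
      errors.push("edge references a missing node: " + edge.source + " -> " + edge.target);
      continue;
    }
    if (edge.sourceHandle && source.type !== "branch") {
      errors.push("sourceHandle on non-branch node " + edge.source);
    }
    adjacency.set(edge.source, (adjacency.get(edge.source) ?? []).concat(edge.target));
  }

  // Three-colour DFS: unset = unvisited, grey = on the current path, black = done.
  const colour = new Map<string, "grey" | "black">();
  const reachesGrey = (id: string): boolean => {
    colour.set(id, "grey");
    for (const next of adjacency.get(id) ?? []) {
      if (colour.get(next) === "grey") return true; // back-edge, so a cycle
      if (!colour.has(next) && reachesGrey(next)) return true;
    }
    colour.set(id, "black");
    return false;
  };
  const cyclic = graph.nodes.some((n) => !colour.has(n.id) && reachesGrey(n.id));
  if (cyclic) errors.push("graph contains a cycle");

  return errors;
}
```

A caller would treat a non-empty result as the reason for the 400 response described in the diagram.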
2. Enable a workflow (and re-wake paused leads)
sequenceDiagram
participant Admin
participant API as SalesWorkflowController
participant WS as WorkflowService
participant QS as QueueService
participant DB as enrollments
Admin->>API: PATCH /api/admin/workflows/:id/enabled with enabled true
API->>WS: setEnabled id true
WS->>DB: save workflow enabled true
WS->>DB: find enrollments status paused
loop each paused enrollment
WS->>DB: set status active
WS->>QS: addWorkflowStepJob enrollmentId
end
WS->>QS: addWorkflowScanJob
WS-->>API: saved
API-->>Admin: 200 enabled
Disabling does the reverse lazily: the engine parks any in-flight enrollment as paused at its next step (it does not actively stop running jobs).
3. Segment scan enrolls newly-matching leads
The repeatable workflowScan job (every 5 minutes) is the default enrollment path for segment-mode workflows. The scan reuses RawLeadService.list so the segment filter is the exact same query the CRM lead table uses.
sequenceDiagram
participant Cron as workflowQueue repeat
participant WW as workflow.worker
participant ENG as WorkflowEngineService
participant RLS as RawLeadService
participant DB as enrollments
participant QS as QueueService
Cron->>WW: workflowScan job
WW->>ENG: runTriggerScan
ENG->>ENG: sweepStaleAaryaWaits then flushAaryaBuffers
ENG->>DB: find workflows enabled true
loop each enabled workflow
alt triggerType event or manual
ENG->>ENG: skip in scan
else segment mode
ENG->>RLS: list with segment filter limit min 200 or enrollmentLimit
RLS-->>ENG: matching lead rows
ENG->>DB: find existing enrollments for these leadIds
Note over ENG: leads already or previously enrolled are skipped
loop each fresh lead
ENG->>DB: insert active enrollment with orIgnore
ENG->>QS: addWorkflowStepJob enrollmentId
end
ENG->>DB: update workflow lastScanAt
end
end
WW-->>Cron: scan complete with enrolled count
The insert uses ON CONFLICT DO NOTHING (orIgnore) against the partial unique index, so concurrent scans or events can never double-enroll a lead. Only when a new row is actually created does the engine queue the first step.
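The effect of the partial unique index plus orIgnore can be modelled in memory as below. This is only a single-threaded stand-in for the outcome: in the real system the atomicity comes from PostgreSQL, not from application code. EnrollmentTableSketch, enrollLeadSketch, and the string pushed onto the queue are hypothetical.

```typescript
type EnrollmentStatus =
  | "active" | "waiting" | "paused" | "completed" | "failed" | "cancelled";

interface EnrollmentRow {
  workflowId: string;
  leadId: string;
  status: EnrollmentStatus;
}

// Statuses covered by the partial unique index on (workflowId, leadId).
const LIVE_STATUSES: EnrollmentStatus[] = ["active", "waiting", "paused"];

class EnrollmentTableSketch {
  private rows: EnrollmentRow[] = [];

  // Mimics INSERT ... ON CONFLICT DO NOTHING against the partial unique index:
  // returns true only when a new row was actually created.
  insertOrIgnore(workflowId: string, leadId: string): boolean {
    const conflict = this.rows.some(
      (r) =>
        r.workflowId === workflowId &&
        r.leadId === leadId &&
        LIVE_STATUSES.indexOf(r.status) !== -1
    );
    if (conflict) return false;
    this.rows.push({ workflowId, leadId, status: "active" });
    return true;
  }
}

// Queue the first step only when the insert really created a row.
function enrollLeadSketch(
  table: EnrollmentTableSketch,
  stepQueue: string[],
  workflowId: string,
  leadId: string
): boolean {
  const created = table.insertOrIgnore(workflowId, leadId);
  if (created) stepQueue.push(workflowId + ":" + leadId); // stand-in for addWorkflowStepJob
  return created;
}
```

Note that the index alone would allow a new row once the earlier one is completed; the scan's separate lookup of existing enrollments is what skips previously enrolled leads.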
4. The engine advances one node per job (the execution loop)
This is the core loop: each workflowStep job runs exactly one node for one enrollment and (usually) queues the next step. A depth-first pending-node queue lives in enrollment.context.queue so non-branch nodes can fan out to multiple successors and still run them sequentially within the same enrollment.
sequenceDiagram
participant QS as workflowQueue
participant WW as workflow.worker
participant ENG as WorkflowEngineService
participant DB as enrollments + node_runs
participant SND as send + AI services
QS->>WW: workflowStep enrollmentId
WW->>ENG: processEnrollmentStep enrollmentId
ENG->>DB: load enrollment
alt status completed failed or cancelled
ENG-->>WW: no-op
else workflow disabled mid-run
ENG->>DB: set status paused
else nodeRunCount over 100
ENG->>DB: finish failed max_node_runs
else lead converted or closed
ENG->>DB: finish completed
else runnable
ENG->>ENG: find current node in graph
alt node missing
ENG->>DB: finish completed end_of_graph
else node found
ENG->>SND: run node by type
ENG->>DB: write node_run row
ENG->>ENG: planNext folds successors into context queue
alt has next node
ENG->>DB: set currentNodeId and status
ENG->>QS: addWorkflowStepJob next
else nothing left
ENG->>DB: finish completed end_of_graph
end
end
end
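The planNext step in the diagram, which folds successors into the front of context.queue, might look like this. It is a sketch under assumptions: the function and type names are hypothetical, branch handling is left out, and the real engine processes one node per job instead of looping in process.

```typescript
interface FlowEdge { source: string; target: string }
interface ContextSketch { queue: string[] }

// Push the current node's successors to the FRONT of the pending queue
// (depth-first), then pop the next node id to run. Returns null when done.
function planNextSketch(
  edges: FlowEdge[],
  currentId: string,
  context: ContextSketch
): string | null {
  const successors = edges
    .filter((e) => e.source === currentId)
    .map((e) => e.target);
  context.queue = successors.concat(context.queue);
  const next = context.queue.shift();
  return next === undefined ? null : next;
}

// Walk a whole graph in process to show the resulting visit order.
// Assumes an acyclic graph, which is what graph validation guarantees.
function walkOrder(edges: FlowEdge[], startId: string): string[] {
  const context: ContextSketch = { queue: [] };
  const visited: string[] = [];
  let current: string | null = startId;
  while (current !== null) {
    visited.push(current);
    current = planNextSketch(edges, current, context);
  }
  return visited;
}
```

For a trigger t that fans out to a and b, where a leads on to c, walkOrder visits t, a, c, b: the whole a branch finishes before b starts, which is the sequential depth-first behaviour described above.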
Per-node behaviour in processEnrollmentStep:
- trigger: log success, advance.
- sendEmail: LeadEmailService.send with the node's templateKey (or custom subject / body). If both the template and the body are missing, the enrollment hard-fails so it is visible and retryable. A 400 send error (no email on file) also hard-fails; transient 5xx/SMTP errors follow the node's onError (skip by default).
- sendWhatsApp: resolves {{name}}, {{phone}}, {{email}}, {{sessionYear}}, and extraData tokens in each template param, sends via WhatsAppService.sendMessage (sender = workflow createdBy), and records a LeadWhatsAppLog row. If there is no phone on file, it logs a failure and (by default) skips ahead.
- wait: schedules a delayed workflowStep job (duration × unit of minutes/hours/days, floored at 1s) and parks the enrollment as waiting with resumeAt.
- aaryaCall: see journey 5.
- branch: see journey 6.
- unknown type: logged as skipped and advanced, so a bad graph never strands a lead.
On any thrown step error, the node is logged failed; the enrollment then either hard-fails (if node.data.onError === 'fail') or skips past and advances. A runaway guard caps any single enrollment at MAX_NODE_RUNS_PER_ENROLLMENT = 100 nodes.
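As one concrete piece of the per-node behaviour, the wait node's delay (duration × unit, floored at 1s) could be computed as below. The names waitDelayMs, resumeAtFor, and MIN_DELAY_MS are hypothetical, and treating a non-finite duration as the minimum delay is an assumption of this sketch.

```typescript
type WaitUnit = "minutes" | "hours" | "days";

const MS_PER_UNIT: Record<WaitUnit, number> = {
  minutes: 60 * 1000,
  hours: 60 * 60 * 1000,
  days: 24 * 60 * 60 * 1000,
};

const MIN_DELAY_MS = 1000; // the 1s floor mentioned in the text

// Delay for the delayed workflowStep job, never below one second.
function waitDelayMs(duration: number, unit: WaitUnit): number {
  const raw = duration * MS_PER_UNIT[unit];
  return isFinite(raw) ? Math.max(MIN_DELAY_MS, raw) : MIN_DELAY_MS;
}

// The timestamp the enrollment would carry in resumeAt while it waits.
function resumeAtFor(now: Date, duration: number, unit: WaitUnit): Date {
  return new Date(now.getTime() + waitDelayMs(duration, unit));
}
```

The floor means a wait node configured with 0 still goes through the queue as a short delayed job and does not run inline.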
5. Aarya AI call node (immediate + batched, watchdog timeout)
An aaryaCall node dispatches an outbound AI phone call via ElevenLabsBatchService.submitBatch, then parks the enrollment waiting on the dispatch call-log. AaryaCallSyncService wakes it when the call's duration is written back; if the call never returns, a watchdog deadline nudges it forward as no_engagement.
sequenceDiagram
participant ENG as WorkflowEngineService
participant EL as ElevenLabsBatchService
participant DB as enrollments + call_logs
participant ACS as AaryaCallSyncService
participant QS as workflowQueue
Note over ENG: fresh arrival at aaryaCall node
alt batchSize is 1 immediate mode
ENG->>EL: submitBatch one lead
EL->>DB: write dispatch LeadCallLog
ENG->>DB: park waiting with waitingForCallLogId and resumeAt timeout
else batchSize over 1 buffered mode
ENG->>DB: park waiting buffered with maxWait deadline
ENG->>ENG: maybeFlushAaryaBuffer when pool full
ENG->>EL: submitBatch claimed leads via FOR UPDATE SKIP LOCKED
ENG->>DB: park each enrollment waiting on its dispatch log
end
Note over ACS: later the call completes and duration is recorded
ACS->>DB: find enrollments waitingForCallLogId in completed logs
ACS->>QS: addWorkflowWakeJob enrollmentId
QS->>ENG: workflowStep
ENG->>DB: read call log duration
alt duration present
ENG->>DB: clear waitingForCallLogId and advance
else still no duration and past watchdog
Note over ENG: scan sweepStaleAaryaWaits nudges it forward
ENG->>DB: advance treated as no_engagement downstream
end
Constants: DEFAULT_AARYA_TIMEOUT_MIN = 1440 (24h watchdog), DEFAULT_AARYA_BATCH_MAX_WAIT_MIN = 120 (2h max wait before a partial batch flushes). Batch claiming uses SELECT ... FOR UPDATE SKIP LOCKED with an aaryaDispatching context marker so concurrent workers never grab the same buffered leads; a failed dispatch un-claims them for retry.
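The two constants translate into simple time arithmetic. The sketch below shows one way the flush decision and the watchdog deadline could be expressed; the two constant values come from the text, while shouldFlushBuffer, watchdogDeadline, and the BufferedWait shape are hypothetical, and the row-level claiming with FOR UPDATE SKIP LOCKED is not modelled.

```typescript
const DEFAULT_AARYA_TIMEOUT_MIN = 1440;        // 24h watchdog
const DEFAULT_AARYA_BATCH_MAX_WAIT_MIN = 120;  // 2h before a partial batch flushes

interface BufferedWait {
  enrollmentId: string;
  bufferedAt: Date; // when the enrollment was parked in the buffer (assumed field)
}

// Flush when the pool is full, or when any buffered lead has waited too long.
function shouldFlushBuffer(
  buffered: BufferedWait[],
  batchSize: number,
  now: Date,
  maxWaitMin: number = DEFAULT_AARYA_BATCH_MAX_WAIT_MIN
): boolean {
  if (buffered.length === 0) return false;
  if (buffered.length >= batchSize) return true;
  const maxWaitMs = maxWaitMin * 60 * 1000;
  return buffered.some((b) => now.getTime() - b.bufferedAt.getTime() >= maxWaitMs);
}

// Deadline after which a call with no recorded duration is swept forward.
function watchdogDeadline(
  dispatchedAt: Date,
  timeoutMin: number = DEFAULT_AARYA_TIMEOUT_MIN
): Date {
  return new Date(dispatchedAt.getTime() + timeoutMin * 60 * 1000);
}
```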
6. Branch node routes on the latest AI-call outcome
sequenceDiagram
participant ENG as WorkflowEngineService
participant DB as lead_call_logs
participant ACS as AaryaCallSyncService
ENG->>DB: latest aarya call duration for lead
alt no call found
ENG->>ENG: bucket no_engagement
else call found
ENG->>ACS: bucketFromDuration seconds
ACS-->>ENG: hot 30s plus warm 10 to 30s cold under 10s
end
ENG->>ENG: pick edge by sourceHandle matching bucket
Note over ENG: no_engagement falls back to cold then first available edge
ENG->>DB: set currentNodeId to matched target and queue step
If no outgoing edge matches the bucket and there is no fallback, the enrollment finishes completed with reason branch_<bucket>_no_edge.
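The bucketing and edge selection can be sketched as two small functions. The thresholds and the branch_<bucket>_no_edge reason come from the text; the function names and return shape are hypothetical, and applying the fallback chain only to no_engagement is this sketch's reading of the diagram note.

```typescript
type Bucket = "hot" | "warm" | "cold" | "no_engagement";

interface BranchEdge {
  source: string;
  target: string;
  sourceHandle?: string;
}

// null means no Aarya call was found for the lead.
function bucketForCall(durationSeconds: number | null): Bucket {
  if (durationSeconds === null) return "no_engagement";
  if (durationSeconds >= 30) return "hot";
  if (durationSeconds >= 10) return "warm";
  return "cold";
}

// Pick the outgoing edge whose sourceHandle matches the bucket.
// no_engagement falls back to the cold edge, then to the first available edge.
function pickBranchTarget(
  edges: BranchEdge[],
  branchId: string,
  bucket: Bucket
): { target: string | null; reason?: string } {
  const outgoing = edges.filter((e) => e.source === branchId);
  const byHandle = (handle: Bucket): BranchEdge | undefined =>
    outgoing.filter((e) => e.sourceHandle === handle)[0];

  let match = byHandle(bucket);
  if (!match && bucket === "no_engagement") {
    match = byHandle("cold") ?? outgoing[0];
  }
  return match
    ? { target: match.target }
    : { target: null, reason: "branch_" + bucket + "_no_edge" };
}
```

With only a hot edge wired up, a warm lead gets no target and the reason branch_warm_no_edge, while a no_engagement lead still follows the hot edge because it is the first available one.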
7. Event-driven enrollment (real time)
CRM mutations publish lifecycle events through LeadEventService (a fire-and-forget chokepoint that never throws into the caller). Event-mode workflows enroll the instant a subscribed event fires, with the segment filter acting as an optional guard.
sequenceDiagram
participant CRM as RawLeadService mutation
participant LES as LeadEventService
participant QS as workflowQueue
participant WW as workflow.worker
participant ENG as WorkflowEngineService
participant RLS as RawLeadService
CRM->>LES: emit lead.assigned leadIds
LES->>QS: addWorkflowEventJob best effort
QS->>WW: workflowEvent eventType leadIds
WW->>ENG: handleLeadEvent eventType leadIds
ENG->>ENG: find enabled event workflows subscribing to this event
loop each workflow and lead
ENG->>RLS: list singleLeadId with segment guard
alt lead still matches guard
ENG->>ENG: enrollLead with orIgnore
else guard not satisfied
ENG->>ENG: skip this lead
end
end
WW-->>QS: enrolled count
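Two ideas from this journey lend themselves to a short sketch: selecting the event-mode workflows that subscribe to an event, and publishing in a way that never throws into the caller. Both functions and the WorkflowSketch shape are hypothetical; the segment guard check via RawLeadService.list is not modelled.

```typescript
type TriggerMode = "segment" | "event" | "manual";

interface WorkflowSketch {
  id: string;
  enabled: boolean;
  triggerConfig: { triggerType?: TriggerMode; events?: string[] };
}

// Enabled, event-mode workflows whose events[] includes the fired event.
function workflowsForEvent(
  workflows: WorkflowSketch[],
  eventType: string
): WorkflowSketch[] {
  return workflows.filter((w) => {
    const mode = w.triggerConfig.triggerType ?? "segment"; // default when absent
    const events = w.triggerConfig.events ?? [];
    return w.enabled && mode === "event" && events.indexOf(eventType) !== -1;
  });
}

// Fire-and-forget: a failing publish is swallowed so the CRM mutation
// that emitted the event is never affected. Returns whether it succeeded.
function emitSafely(publish: () => void): boolean {
  try {
    publish();
    return true;
  } catch {
    return false;
  }
}
```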
8. Manual run (admin hand-picks leads)
sequenceDiagram
participant Admin
participant API as SalesWorkflowController
participant ENG as WorkflowEngineService
participant DB as raw_leads + enrollments
Admin->>API: POST /api/admin/workflows/:id/run-manual leadIds
API->>ENG: runManual id leadIds
alt workflow not manual type
ENG-->>API: 400 not a manual-run workflow
API-->>Admin: error
else manual workflow
ENG->>DB: resolveEnrollableLeadIds existing and in head pool
loop each valid lead
ENG->>DB: enrollLead with orIgnore
end
ENG-->>API: enrolled skipped requested counts
API-->>Admin: 200 Enrolled N skipped M
end
Forged or out-of-pool lead ids are dropped server-side: resolveEnrollableLeadIds keeps only ids that exist and (when the workflow has an owningSalesHeadId) belong to that head's pool.
9. Monitor + retry a stuck lead
sequenceDiagram
participant Admin
participant API as SalesWorkflowController
participant WS as WorkflowService
participant DB as node_runs + enrollments
participant QS as workflowQueue
Admin->>API: GET /api/admin/workflows/:id/enrollments scope active
API->>WS: listEnrollments
WS-->>API: rows with lead identity, current step, lastBranch
Admin->>API: GET enrollments/:eid/runs
API->>WS: listRuns
WS-->>API: node run timeline ordered by createdAt
Admin->>API: POST enrollments/:eid/retry
API->>WS: retryEnrollment
WS->>DB: find earliest failed node with no later success
WS->>DB: rewind set status active currentNodeId completedAt null
WS->>QS: addWorkflowStepJob
API-->>Admin: 200 retried at nodeId
retry rewinds to the earliest still-unresolved failure so a multi-step failure re-walks in order; retry-from rewinds to an explicit nodeId chosen in the UI.
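The rewind target for retry, the earliest failed node with no later success, can be found from the node-run timeline roughly as follows. The NodeRunSketch shape and function name are hypothetical, and matching the later success on the same nodeId is an assumption about what "no later success" means.

```typescript
interface NodeRunSketch {
  nodeId: string;
  status: "success" | "failed" | "skipped" | "scheduled";
  createdAt: number; // epoch ms; the real column is a timestamp
}

// Earliest failed run whose node has not succeeded afterwards, or null.
function findRetryNodeId(runs: NodeRunSketch[]): string | null {
  const ordered = runs.slice().sort((a, b) => a.createdAt - b.createdAt);
  const unresolved = ordered.filter(
    (run, i) =>
      run.status === "failed" &&
      !ordered
        .slice(i + 1)
        .some((later) => later.nodeId === run.nodeId && later.status === "success")
  );
  const first = unresolved[0];
  return first ? first.nodeId : null;
}
```

A node that failed once and later succeeded is skipped, so a multi-step failure is re-walked from the first step that is still broken.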
10. Mr. Hire workflow rule + test email (the simpler subsystem)
sequenceDiagram
participant User
participant API as WorkflowController
participant DB as workflow_rules + email_templates
participant ES as EmailService
User->>API: POST /api/mr-hire/workflow/rules name condition action
API->>DB: insert rule belongsTo user
API-->>User: 201 rule
User->>API: POST email-templates/:id/send-test to
API->>DB: load template own or system default
API->>API: substitute placeholder vars with sample data
API->>ES: sendEmail to subject html
ES-->>API: sent
API-->>User: 200 test email sent
Background jobs & async
Queue: workflowQueue (BullMQ), created in QueueService and drained by src/workers/workflow.worker.ts at concurrency 5.
| Job name | Producer | Schedule | Action |
|---|---|---|---|
| workflowScan | QueueService.scheduleWorkflowScan (boot) + addWorkflowScanJob (admin / enable) | repeatable every 5 minutes, stable jobId: workflow-scan | runTriggerScan: sweep stale Aarya waits, flush Aarya batch buffers, enroll new segment matches |
| workflowStep | addWorkflowStepJob / addWorkflowWakeJob | on demand (with delay for Wait nodes) | processEnrollmentStep: run one node, queue the next |
| workflowEvent | addWorkflowEventJob via LeadEventService | on demand (fire-and-forget) | handleLeadEvent: enroll leads into matching event-mode workflows |
The repeatable scan is registered at startup (src/index.ts → queueService.scheduleWorkflowScan()), and re-registration replaces (not stacks) thanks to the stable job id. removeOnComplete / removeOnFail caps keep the queue tidy. There are no Socket.IO events or inbound webhooks in this domain; the only "callback" is AaryaCallSyncService waking parked enrollments when an ElevenLabs call's duration syncs back.
External integrations
The engine itself integrates nothing directly; it composes existing services. Failures are isolated so one bad lead never strands the rest.
| Integration | Via | Failure / fallback |
|---|---|---|
| Email | LeadEmailService.send (Gmail SMTP under the hood) | Missing template/body or 400 (no email on file) → hard-fail the enrollment (visible + retryable). Transient errors → skip per onError. |
| WhatsApp | WhatsAppService.sendMessage + LeadWhatsAppService.recordSend (Meta Cloud API) | No phone → log fail + skip. Send failure recorded as LeadWhatsAppStatus.FAILED. Builder enforces param count to avoid Meta error #132000. Template list in metadata is best-effort (returns [] if WhatsApp unconfigured). |
| Aarya AI calling | ElevenLabsBatchService.submitBatch / listAgents / listPhoneNumbers | Best-effort in metadata. 24h call watchdog; partial batches flush after 2h. Dispatch failure un-claims buffered leads for retry. |
| CRM segment query | RawLeadService.list | Reused verbatim so segment + event guards always match the CRM list view. |
| Call buckets | AaryaCallSyncService.bucketFromDuration | Thresholds: hot ≥ 30s, warm 10s to under 30s, cold < 10s, no call → no_engagement. |
There are no new env vars introduced by the engine; it inherits Redis (REDIS_HOST / REDIS_PORT) for BullMQ and whatever email / WhatsApp / ElevenLabs config the reused services need. No feature flag gates the engine — it is active whenever a workflow is enabled.
Status lifecycles
Enrollment lifecycle
stateDiagram-v2
[*] --> active : enrolled
active --> waiting : wait node or aarya call parks it
waiting --> active : timer fires or call completes
active --> active : advance to next node
active --> paused : workflow disabled mid-run
paused --> active : workflow re-enabled
active --> completed : end of graph or lead converted or closed
waiting --> completed : end after wait
active --> failed : missing template or max node runs or onError fail
active --> cancelled : workflow deleted
waiting --> cancelled : workflow deleted
paused --> cancelled : workflow deleted
completed --> active : admin retry rewinds
failed --> active : admin retry rewinds
completed --> [*]
failed --> [*]
cancelled --> [*]
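The enrollment lifecycle above can also be read as a transition table. The sketch below encodes only the arrows drawn in the diagram; ALLOWED_TRANSITIONS and canTransition are hypothetical names, and the source does not say that the engine enforces transitions through such a table.

```typescript
type EnrollStatus =
  | "active" | "waiting" | "paused" | "completed" | "failed" | "cancelled";

// One entry per source state, listing the states the diagram lets it reach.
const ALLOWED_TRANSITIONS: Record<EnrollStatus, EnrollStatus[]> = {
  active: ["active", "waiting", "paused", "completed", "failed", "cancelled"],
  waiting: ["active", "completed", "cancelled"],
  paused: ["active", "cancelled"],
  completed: ["active"], // admin retry rewinds
  failed: ["active"],    // admin retry rewinds
  cancelled: [],         // terminal: no arrow leaves this state
};

function canTransition(from: EnrollStatus, to: EnrollStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}
```

Read this way, cancelled is the only state with no way back: completed and failed enrollments can both be rewound by an admin retry.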
Node run status
stateDiagram-v2
[*] --> success : node executed cleanly
[*] --> scheduled : wait delayed or aarya call dispatched or buffered
[*] --> skipped : unknown node type or non-fatal failure skipped past
[*] --> failed : send error or config gap
scheduled --> [*]
success --> [*]
skipped --> [*]
failed --> [*]
Workflow enable state
stateDiagram-v2
[*] --> disabled : created
disabled --> enabled : enable rewakes paused and scans
enabled --> disabled : disable parks in-flight as paused
enabled --> [*] : deleted cancels in-flight
disabled --> [*] : deleted
Edge cases, limits & gotchas
- Two unrelated "workflow" systems. /api/admin/workflows/* (Sales engine, ADMIN-only) and /api/mr-hire/workflow/* (recruitment rules, any authed user) share a name but nothing else. The recruitment WorkflowRule entity stores config only; no engine in this repo executes those rules.
- ADMIN-only, not sales-head. adminMiddleware allows only role === ADMIN; sales reps and sales heads cannot reach the Sales workflow API even though workflows can be pool-scoped via owningSalesHeadId.
- No double enrollment. The partial unique index (workflowId, leadId) WHERE status IN (active, waiting, paused) plus orIgnore (ON CONFLICT DO NOTHING) makes scan + event + manual paths idempotent against races. A previously-completed lead is not re-enrolled by a later scan (re-running is intentionally off by default).
- Graph must be acyclic and singly-triggered. Save fails (400) on cycles, multiple/zero trigger nodes, dangling edges, branch handles on non-branch nodes, or unconfigured send steps.
- Runaway + lifecycle guards. Any enrollment over 100 node runs hard-fails (max_node_runs). Converted/closed leads complete immediately without acting; deleted leads or workflows fail/cancel the enrollment.
- Fan-out is depth-first and sequential. A non-branch node with multiple outgoing edges pushes all targets to the front of context.queue; they run one job at a time within the same enrollment, not in parallel.
- Disable is lazy. Disabling does not stop queued jobs; the next step of each in-flight enrollment parks it as paused. Re-enabling re-wakes paused rows and kicks a fresh scan.
- WhatsApp sender identity. Sends use the workflow's createdBy as the sender user id; a workflow authored by a now-deleted user may send with an empty sender.
- Aarya watchdog vs. real outcome. A call that never reports a duration is swept forward after the timeout and treated as no_engagement, so a branch may route a lead as cold even though the call simply failed to sync.
- Multi-tenant. This domain is Sales-CRM-scoped (mas_crm schema) and not driven by the x-platform header; isolation is via owningSalesHeadId pools, not platform.
- Metadata is best-effort. WhatsApp templates, Aarya agents, and phone numbers all degrade to [] if their upstream is unconfigured, so the builder always loads.