
Workflow Automation Engine

The Workflow Automation Engine is a generic, node-based automation runtime that drives Sales CRM lead nurture end to end. An admin authors a visual graph (trigger → wait → send email / WhatsApp / AI call → branch) on a React-Flow canvas; the backend persists it as JSONB, scans CRM lead segments (or reacts to lead lifecycle events, or runs on demand), enrolls matching leads, and a BullMQ worker advances each enrollment one node at a time — reusing the existing email, WhatsApp, and Aarya AI-calling services rather than re-implementing any send logic. A second, much simpler subsystem under the same "workflow" name stores Mr. Hire recruitment rules + email templates as plain CRUD config.

Status: documented from source on this branch.


Overview

Two separate things in this codebase carry the word "workflow". This document covers both but concentrates on the first.

| Subsystem | Schema / store | Author | Executes? | Source |
| --- | --- | --- | --- | --- |
| Sales Automation Engine (node-based) | mas_crm.workflows, workflow_enrollments, workflow_node_runs | ADMIN | Yes — BullMQ engine | src/services/WorkflowEngineService.ts, src/services/WorkflowService.ts |
| Mr. Hire Workflow Rules (recruitment) | workflow_rules, email_templates | any authed user | No engine here — CRUD config only | src/controllers/WorkflowController.ts, src/entities/WorkflowRule.entity.ts |

Who uses it

  • Sales admins (role === ADMIN only — sales reps and even sales heads are intentionally excluded by adminMiddleware) build, enable, monitor, and retry Sales workflows at /api/admin/workflows/*.
  • The WorkflowWorker (src/workers/workflow.worker.ts, queue workflowQueue, concurrency 5) is the runtime: it runs the scheduled segment scan, advances each enrollment one node per job, and reacts to lead events.
  • Recruiters / Mr. Hire users manage workflow rules and email templates at /api/mr-hire/workflow/* (auth only, no admin gate).

Where it sits in the suite — the Sales engine is a pure consumer of the Sales CRM (RawLead, LeadCallLog, lead email/WhatsApp logs) and of the comms + AI-calling services. It owns its own tables in the mas_crm schema and adds zero new send mechanics; everything routes through LeadEmailService, WhatsAppService / LeadWhatsAppService, and ElevenLabsBatchService.


Key concepts & entities

Glossary

  • Workflow — a named, enable/disable-able automation. Holds a visual graph (nodes + edges) and a triggerConfig (which leads enter, and how).
  • Node — one step in the graph. Types: trigger, wait, sendEmail, sendWhatsApp, aaryaCall, branch (WorkflowNodeType in src/entities/workflow/workflowTypes.ts).
  • Edge — a directed connection between two nodes. A branch node's edges carry a sourceHandle (hot / warm / cold / no_engagement) to pick the path.
  • Trigger mode — segment (scheduled scan of all matching leads), event (enroll the instant a lifecycle event fires), or manual (admin hand-picks leads via a Run modal).
  • Enrollment — one lead's journey through one workflow. Tracks the currentNodeId it sits on and a context scratchpad.
  • Node run — one audit row per time the engine executes a node for an enrollment; powers the timeline UI.
  • Aarya call — an outbound AI phone call (ElevenLabs) whose recorded duration is later bucketed into hot/warm/cold/no_engagement and used by branch nodes.

Main entities

| Entity | File | Table | Notes |
| --- | --- | --- | --- |
| Workflow | src/entities/workflow/Workflow.entity.ts | mas_crm.workflows | graph JSONB, triggerConfig JSONB, cached triggerNodeId, enabled, enrollmentLimit, owningSalesHeadId, createdBy, lastScanAt |
| WorkflowEnrollment | src/entities/workflow/WorkflowEnrollment.entity.ts | mas_crm.workflow_enrollments | workflowId, leadId, status, currentNodeId, context JSONB, waitingForCallLogId, resumeAt, completedAt. Partial unique index (workflowId, leadId) WHERE status IN active/waiting/paused |
| WorkflowNodeRun | src/entities/workflow/WorkflowNodeRun.entity.ts | mas_crm.workflow_node_runs | enrollmentId, workflowId, nodeId, nodeType, status, result JSONB, error |
| WorkflowRule | src/entities/WorkflowRule.entity.ts | workflow_rules | Recruitment rules: condition, conditionValue, action, actionTemplate, enabled, triggerCount, jobPostId, belongsTo |

Type shapes (src/entities/workflow/workflowTypes.ts)

  • WorkflowGraph = { nodes: WorkflowNode[]; edges: WorkflowEdge[] } — stored verbatim as JSONB, walked in memory, never queried via SQL.
  • WorkflowTriggerConfig — a whitelisted subset of RawLeadService.ListInput (createdFrom/To, assignedTo, assignedCounsellor, owningSalesHeadId, sourceType, vendorApiKeyId, interestLevel, aaryaInterest, webinarStatus, sessionYear, tagIds, leadStatus) plus triggerType and events[].
  • LeadEventType — lead.created, lead.assigned, lead.counsellor_assigned, lead.owner_changed, lead.interest_changed, lead.webinar_changed, lead.converted, lead.closed.
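
The shapes above can be sketched in TypeScript as follows. This is a minimal illustration, not the contents of workflowTypes.ts: the node types and branch handles come from this document, while the field names on WorkflowNode and WorkflowEdge (id, type, data, source, target) and the helper outgoingEdges are assumptions made for the example.

```typescript
// Node types listed in this document (WorkflowNodeType).
type WorkflowNodeType =
  | "trigger"
  | "wait"
  | "sendEmail"
  | "sendWhatsApp"
  | "aaryaCall"
  | "branch";

// Branch outputs carried on an edge's sourceHandle.
type BranchHandle = "hot" | "warm" | "cold" | "no_engagement";

// ASSUMPTION: field names below are illustrative; the real entity may differ.
interface WorkflowNode {
  id: string;
  type: WorkflowNodeType;
  data: Record<string, unknown>;
}

interface WorkflowEdge {
  source: string;
  target: string;
  sourceHandle?: BranchHandle; // only meaningful on edges leaving a branch node
}

interface WorkflowGraph {
  nodes: WorkflowNode[];
  edges: WorkflowEdge[];
}

// Hypothetical helper: the graph is walked in memory, never queried via SQL.
function outgoingEdges(graph: WorkflowGraph, nodeId: string): WorkflowEdge[] {
  return graph.edges.filter((edge) => edge.source === nodeId);
}

// A tiny graph: trigger -> aaryaCall -> branch -> (hot: email, cold: WhatsApp).
const exampleGraph: WorkflowGraph = {
  nodes: [
    { id: "trigger-1", type: "trigger", data: {} },
    { id: "call-1", type: "aaryaCall", data: {} },
    { id: "branch-1", type: "branch", data: {} },
    { id: "email-1", type: "sendEmail", data: { templateKey: "welcome" } },
    { id: "wa-1", type: "sendWhatsApp", data: { templateName: "nudge" } },
  ],
  edges: [
    { source: "trigger-1", target: "call-1" },
    { source: "call-1", target: "branch-1" },
    { source: "branch-1", target: "email-1", sourceHandle: "hot" },
    { source: "branch-1", target: "wa-1", sourceHandle: "cold" },
  ],
};
```

Because the whole graph is one JSONB value, a lookup such as outgoingEdges(exampleGraph, "branch-1") is a plain array filter after the row is loaded.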

Architecture

flowchart TD
  subgraph FE["Admin UI (React Flow builder)"]
    UI["Sales Workflows canvas"]
  end

  subgraph API["Express API (mounted at /api)"]
    SWR["SalesWorkflowRoutes (admin-only)"]
    WR["WorkflowRoutes (Mr. Hire rules)"]
  end

  subgraph CTRL["Controllers"]
    SWC["SalesWorkflowController"]
    WC["WorkflowController"]
  end

  subgraph SVC["Services"]
    WS["WorkflowService (CRUD + graph validation)"]
    ENG["WorkflowEngineService (execution)"]
    LES["LeadEventService (event chokepoint)"]
    QS["QueueService (workflowQueue)"]
  end

  subgraph WRK["BullMQ worker"]
    WW["workflow.worker (scan / step / event)"]
  end

  subgraph EXT["Reused send + AI services"]
    LEM["LeadEmailService"]
    WA["WhatsAppService + LeadWhatsAppService"]
    EL["ElevenLabsBatchService (Aarya)"]
    ACS["AaryaCallSyncService (call buckets)"]
    RLS["RawLeadService (segment query)"]
  end

  subgraph DB["PostgreSQL (mas_crm schema)"]
    WFT["workflows"]
    ENR["workflow_enrollments"]
    NRUN["workflow_node_runs"]
    LEADS["raw_leads + lead_call_logs"]
    RULES["workflow_rules + email_templates"]
  end

  UI --> SWR --> SWC --> WS
  WC --> RULES
  WR --> WC
  WS --> WFT
  WS --> ENG
  WS --> QS
  ENG --> RLS --> LEADS
  ENG --> ENR
  ENG --> NRUN
  ENG --> LEM
  ENG --> WA
  ENG --> EL
  ENG --> ACS
  LES --> QS
  QS --> WW
  WW --> ENG

Data model

erDiagram
  WORKFLOW ||--o{ WORKFLOW_ENROLLMENT : "enrolls"
  WORKFLOW ||--o{ WORKFLOW_NODE_RUN : "audits"
  WORKFLOW_ENROLLMENT ||--o{ WORKFLOW_NODE_RUN : "produces"
  RAW_LEAD ||--o{ WORKFLOW_ENROLLMENT : "is enrolled as"
  LEAD_CALL_LOG ||--o{ WORKFLOW_ENROLLMENT : "wakes via waitingForCallLogId"

  WORKFLOW {
    uuid id PK
    string name
    text description
    boolean enabled
    jsonb graph
    jsonb trigger_config
    string trigger_node_id
    int enrollment_limit
    uuid owning_sales_head_id
    uuid created_by
    timestamp last_scan_at
  }
  WORKFLOW_ENROLLMENT {
    uuid id PK
    uuid workflow_id FK
    uuid lead_id FK
    enum status
    string current_node_id
    jsonb context
    uuid waiting_for_call_log_id
    timestamp resume_at
    timestamp completed_at
    timestamp enrolled_at
  }
  WORKFLOW_NODE_RUN {
    uuid id PK
    uuid enrollment_id FK
    uuid workflow_id FK
    string node_id
    string node_type
    enum status
    jsonb result
    text error
    timestamp created_at
  }
  WORKFLOW_RULE {
    uuid id PK
    string name
    string condition
    int conditionValue
    string action
    string actionTemplate
    boolean enabled
    int triggerCount
    uuid jobPostId FK
    uuid belongsTo
  }

Notable enums / status fields

  • WorkflowEnrollmentStatus: active, waiting, paused, completed, failed, cancelled.
  • WorkflowNodeRunStatus: success, failed, skipped, scheduled.
  • WorkflowNodeType: trigger, aaryaCall, branch, sendEmail, sendWhatsApp, wait.
  • WorkflowTriggerConfig.triggerType: segment (default when absent), event, manual.
  • WorkflowRule relates only loosely to JobPost (onDelete: SET NULL); WorkflowEnrollment.leadId is a loose link to mas_crm.raw_leads (no cascade).

API surface

All routes are mounted under /api (src/routes/index.ts). Two route classes:

Sales Automation Engine — SalesWorkflowRoutes (admin only)

Guard on every route: authMiddleware + adminMiddleware (ADMIN role only).

| Method | Path | Auth/role | Purpose |
| --- | --- | --- | --- |
| GET | /api/admin/workflows/metadata | ADMIN | Builder dropdown data — email/WhatsApp templates, Aarya agents + phone numbers, vendors, assignees, sales heads, segment field options, lead event types, branch outputs |
| POST | /api/admin/workflows/scan-now | ADMIN | Enqueue an immediate segment scan pass |
| GET | /api/admin/workflows | ADMIN | List workflows + active/total enrollment counts |
| POST | /api/admin/workflows | ADMIN | Create a workflow (validates graph; starts disabled) |
| GET | /api/admin/workflows/:id | ADMIN | Get one workflow (full graph + config) |
| PUT | /api/admin/workflows/:id | ADMIN | Update a workflow (re-validates graph) |
| PATCH | /api/admin/workflows/:id/enabled | ADMIN | Enable / disable; enabling re-wakes paused enrollments + kicks a scan |
| DELETE | /api/admin/workflows/:id | ADMIN | Cancel in-flight enrollments, then delete workflow + runs |
| POST | /api/admin/workflows/:id/run-manual | ADMIN | Enroll a hand-picked leadIds[] (manual-trigger workflows only) |
| GET | /api/admin/workflows/:id/enrollments | ADMIN | List enrollments (page, limit, scope = active or all) with lead identity + current-step label |
| GET | /api/admin/workflows/:id/enrollments/:eid/runs | ADMIN | Node-run timeline for one enrollment |
| POST | /api/admin/workflows/:id/enrollments/:eid/retry | ADMIN | Rewind to the earliest unresolved failed node and re-activate |
| POST | /api/admin/workflows/:id/enrollments/:eid/retry-from | ADMIN | Re-run from a specific nodeId in the body |

Mr. Hire Workflow Rules + Templates — WorkflowRoutes (auth only)

Guard: authMiddleware. Rows are owned per-user (belongsTo) with shared system defaults (belongsTo IS NULL).

| Method | Path | Auth/role | Purpose |
| --- | --- | --- | --- |
| GET | /api/mr-hire/workflow/rules | authed | List own + system-default rules |
| POST | /api/mr-hire/workflow/rules | authed | Create a rule (name, condition, action required) |
| PUT | /api/mr-hire/workflow/rules/:id | authed | Update own or system-default rule |
| DELETE | /api/mr-hire/workflow/rules/:id | authed | Delete own rule only (403 on system defaults) |
| PATCH | /api/mr-hire/workflow/rules/:id/toggle | authed | Flip enabled |
| GET | /api/mr-hire/workflow/email-templates | authed | List own + system-default templates |
| POST | /api/mr-hire/workflow/email-templates | authed | Create a template |
| PUT | /api/mr-hire/workflow/email-templates/:id | authed | Update a template |
| POST | /api/mr-hire/workflow/email-templates/:id/send-test | authed | Send a test email, with placeholder substitution, to the address passed as to |
| DELETE | /api/mr-hire/workflow/email-templates/:id | authed | Delete own template only |

User journeys

1. Define a workflow (author nodes / edges / trigger)

An admin lays out the graph in the React-Flow builder and saves. WorkflowService.validateGraph enforces structural sanity before anything is persisted, and the single trigger node's id is cached on the row so the engine can enter without re-scanning.

sequenceDiagram
  participant Admin
  participant API as SalesWorkflowController
  participant WS as WorkflowService
  participant DB as workflows table

  Admin->>API: GET /api/admin/workflows/metadata
  API->>WS: getMetadata
  WS-->>API: templates, aarya agents, segment fields, event types
  API-->>Admin: dropdown data for the builder
  Admin->>API: POST /api/admin/workflows with name, graph, triggerConfig
  API->>WS: create with createdBy from req.user
  WS->>WS: validateGraph checks one trigger, unique ids, valid edges
  Note over WS: rejects cycles, branch handles on non-branch nodes, and unconfigured send steps
  alt graph invalid
    WS-->>API: throw 400 with reason
    API-->>Admin: 400 error message
  else graph valid
    WS->>DB: insert row enabled false with cached triggerNodeId
    DB-->>WS: saved workflow
    WS-->>API: workflow
    API-->>Admin: 201 created
  end

Validation rules in validateGraph: exactly one trigger node; no duplicate node ids; every edge references existing nodes; sourceHandle labels only on branch nodes; sendEmail must have a templateKey and sendWhatsApp a templateName; and the graph must be acyclic (DFS three-colour cycle check — a back-edge would loop forever).
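
The rules above can be sketched as a single pass over the graph followed by a three-colour DFS. This is a hedged illustration rather than the body of WorkflowService.validateGraph: the function name findGraphProblem, the type names, the error strings, and the choice to return a message instead of throwing a 400 are all assumptions.

```typescript
type NodeType = "trigger" | "wait" | "sendEmail" | "sendWhatsApp" | "aaryaCall" | "branch";

// ASSUMPTION: simplified node/edge shapes for the example only.
interface GraphNode {
  id: string;
  type: NodeType;
  data?: { templateKey?: string; templateName?: string };
}
interface GraphEdge { source: string; target: string; sourceHandle?: string }
interface Graph { nodes: GraphNode[]; edges: GraphEdge[] }

// Returns the first problem found, or null when every rule passes.
function findGraphProblem(graph: Graph): string | null {
  const triggers = graph.nodes.filter((n) => n.type === "trigger");
  if (triggers.length !== 1) return "graph must have exactly one trigger node";

  const byId = new Map<string, GraphNode>();
  for (const node of graph.nodes) {
    if (byId.has(node.id)) return `duplicate node id: ${node.id}`;
    byId.set(node.id, node);
    if (node.type === "sendEmail" && !node.data?.templateKey) {
      return `sendEmail node ${node.id} has no templateKey`;
    }
    if (node.type === "sendWhatsApp" && !node.data?.templateName) {
      return `sendWhatsApp node ${node.id} has no templateName`;
    }
  }

  const successors = new Map<string, string[]>();
  for (const edge of graph.edges) {
    const source = byId.get(edge.source);
    if (!source || !byId.has(edge.target)) return "edge references a missing node";
    if (edge.sourceHandle && source.type !== "branch") {
      return "sourceHandle is only allowed on branch nodes";
    }
    successors.set(edge.source, [...(successors.get(edge.source) ?? []), edge.target]);
  }

  // Three-colour DFS: absent = white (unvisited), grey = on the current path,
  // black = fully explored. Reaching a grey node means a back-edge, i.e. a cycle.
  const colour = new Map<string, "grey" | "black">();
  const hasCycleFrom = (id: string): boolean => {
    colour.set(id, "grey");
    for (const next of successors.get(id) ?? []) {
      const seen = colour.get(next);
      if (seen === "grey") return true;
      if (seen === undefined && hasCycleFrom(next)) return true;
    }
    colour.set(id, "black");
    return false;
  };
  for (const node of graph.nodes) {
    if (!colour.has(node.id) && hasCycleFrom(node.id)) return "graph contains a cycle";
  }
  return null;
}
```

Starting the DFS from every unvisited node, not only the trigger, means a cycle in a disconnected part of the canvas is caught as well.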

2. Enable a workflow (and re-wake paused leads)

sequenceDiagram
  participant Admin
  participant API as SalesWorkflowController
  participant WS as WorkflowService
  participant QS as QueueService
  participant DB as enrollments

  Admin->>API: PATCH /api/admin/workflows/:id/enabled with enabled true
  API->>WS: setEnabled id true
  WS->>DB: save workflow enabled true
  WS->>DB: find enrollments status paused
  loop each paused enrollment
    WS->>DB: set status active
    WS->>QS: addWorkflowStepJob enrollmentId
  end
  WS->>QS: addWorkflowScanJob
  WS-->>API: saved
  API-->>Admin: 200 enabled

Disabling does the reverse lazily: the engine parks any in-flight enrollment as paused at its next step (it does not actively stop running jobs).

3. Segment scan enrolls newly-matching leads

The repeatable workflowScan job (every 5 minutes) is the default enrollment path for segment-mode workflows. The scan reuses RawLeadService.list so the segment filter is the exact same query the CRM lead table uses.

sequenceDiagram
  participant Cron as workflowQueue repeat
  participant WW as workflow.worker
  participant ENG as WorkflowEngineService
  participant RLS as RawLeadService
  participant DB as enrollments
  participant QS as QueueService

  Cron->>WW: workflowScan job
  WW->>ENG: runTriggerScan
  ENG->>ENG: sweepStaleAaryaWaits then flushAaryaBuffers
  ENG->>DB: find workflows enabled true
  loop each enabled workflow
    alt triggerType event or manual
      ENG->>ENG: skip in scan
    else segment mode
      ENG->>RLS: list with segment filter limit min 200 or enrollmentLimit
      RLS-->>ENG: matching lead rows
      ENG->>DB: find existing enrollments for these leadIds
      Note over ENG: leads already or previously enrolled are skipped
      loop each fresh lead
        ENG->>DB: insert active enrollment with orIgnore
        ENG->>QS: addWorkflowStepJob enrollmentId
      end
      ENG->>DB: update workflow lastScanAt
    end
  end
  WW-->>Cron: scan complete with enrolled count

The insert uses ON CONFLICT DO NOTHING (orIgnore) against the partial unique index, so concurrent scans or events can never create a second in-flight enrollment for the same lead in the same workflow. The engine queues the first step only when a new row is actually created.
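
The insert-or-ignore behaviour can be modelled in memory to make the contract explicit. This sketch does not touch a database: EnrollmentStore, insertOrIgnore, and the queueStep callback are hypothetical stand-ins for the real table, the orIgnore insert, and addWorkflowStepJob.

```typescript
type EnrollmentStatus = "active" | "waiting" | "paused" | "completed" | "failed" | "cancelled";

interface Enrollment { workflowId: string; leadId: string; status: EnrollmentStatus }

// Statuses covered by the partial unique index described in this document.
const IN_FLIGHT = new Set<EnrollmentStatus>(["active", "waiting", "paused"]);

// Hypothetical in-memory stand-in for the enrollments table.
class EnrollmentStore {
  private rows: Enrollment[] = [];

  // Mimics INSERT ... ON CONFLICT DO NOTHING against the partial unique index:
  // returns true only when a new row was actually created.
  insertOrIgnore(workflowId: string, leadId: string): boolean {
    const conflict = this.rows.some(
      (row) => row.workflowId === workflowId && row.leadId === leadId && IN_FLIGHT.has(row.status),
    );
    if (conflict) return false;
    this.rows.push({ workflowId, leadId, status: "active" });
    return true;
  }
}

// Queue the first step only when the insert created a row.
function enrollLead(
  store: EnrollmentStore,
  queueStep: (key: string) => void,
  workflowId: string,
  leadId: string,
): boolean {
  const created = store.insertOrIgnore(workflowId, leadId);
  if (created) queueStep(`${workflowId}:${leadId}`);
  return created;
}
```

Calling enrollLead twice for the same workflow and lead queues exactly one step, which is the property the scan, event, and manual paths all rely on.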

4. The engine advances one node per job (the execution loop)

This is the core loop: each workflowStep job runs exactly one node for one enrollment and (usually) queues the next step. A depth-first pending-node queue lives in enrollment.context.queue so non-branch nodes can fan out to multiple successors and still run them sequentially within the same enrollment.

sequenceDiagram
  participant QS as workflowQueue
  participant WW as workflow.worker
  participant ENG as WorkflowEngineService
  participant DB as enrollments + node_runs
  participant SND as send + AI services

  QS->>WW: workflowStep enrollmentId
  WW->>ENG: processEnrollmentStep enrollmentId
  ENG->>DB: load enrollment
  alt status completed failed or cancelled
    ENG-->>WW: no-op
  else workflow disabled mid-run
    ENG->>DB: set status paused
  else nodeRunCount over 100
    ENG->>DB: finish failed max_node_runs
  else lead converted or closed
    ENG->>DB: finish completed
  else runnable
    ENG->>ENG: find current node in graph
    alt node missing
      ENG->>DB: finish completed end_of_graph
    else node found
      ENG->>SND: run node by type
      ENG->>DB: write node_run row
      ENG->>ENG: planNext folds successors into context queue
      alt has next node
        ENG->>DB: set currentNodeId and status
        ENG->>QS: addWorkflowStepJob next
      else nothing left
        ENG->>DB: finish completed end_of_graph
      end
    end
  end
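
The planNext step in the loop above, for non-branch nodes, can be sketched as a front-insert into the pending queue. This is an assumption-laden illustration: the real planNext signature is not shown in this document, and the Edge and EnrollmentContext shapes here are simplified.

```typescript
interface Edge { source: string; target: string }

// ASSUMPTION: context.queue holds pending node ids, next-to-run first.
interface EnrollmentContext { queue: string[] }

// Folds the current node's successors into the queue and returns the next
// node id to run, or null when nothing is left (end of graph).
function planNext(edges: Edge[], currentNodeId: string, context: EnrollmentContext): string | null {
  const successors = edges
    .filter((edge) => edge.source === currentNodeId)
    .map((edge) => edge.target);
  // Depth-first: new successors go to the FRONT, ahead of older siblings.
  context.queue = [...successors, ...context.queue];
  return context.queue.shift() ?? null;
}
```

With edges A→B, A→C, and B→D, starting at A yields the order B, D, C: the B subtree finishes before the sibling C runs, one job at a time.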

Per-node behaviour in processEnrollmentStep:

  • trigger — log success, advance.
  • sendEmail — LeadEmailService.send with the node's templateKey (or custom subject/body). A missing template AND missing body hard-fails the enrollment so it is visible and retryable. A 400 send error (no email on file) also hard-fails; transient 5xx/SMTP errors follow the node's onError (skip by default).
  • sendWhatsApp — resolves {{name}}, {{phone}}, {{email}}, {{sessionYear}}, and extraData tokens in each template param, sends via WhatsAppService.sendMessage (sender = workflow createdBy), and records a LeadWhatsAppLog row. No phone on file logs a failure and (by default) skips past the node.
  • wait — schedules a delayed workflowStep job (duration × unit of minutes/hours/days, floored at 1s), parks the enrollment waiting with resumeAt.
  • aaryaCall — see journey 5.
  • branch — see journey 6.
  • unknown type — logged as skipped and advanced, so a bad graph never strands a lead.
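
Two of the per-node behaviours above are small enough to sketch directly: the wait delay arithmetic and the WhatsApp token substitution. Both helpers are hypothetical. The names waitDelayMs and resolveTokens, the LeadFields shape, the rule that built-in fields win over extraData keys, and the choice to render unknown tokens as an empty string are assumptions, not confirmed engine behaviour.

```typescript
type WaitUnit = "minutes" | "hours" | "days";

const UNIT_MS: Record<WaitUnit, number> = {
  minutes: 60 * 1000,
  hours: 60 * 60 * 1000,
  days: 24 * 60 * 60 * 1000,
};

// duration × unit, floored at one second as described for the wait node.
function waitDelayMs(duration: number, unit: WaitUnit): number {
  return Math.max(1000, duration * UNIT_MS[unit]);
}

// ASSUMPTION: a flattened view of the lead fields the tokens draw from.
interface LeadFields {
  name?: string;
  phone?: string;
  email?: string;
  sessionYear?: string;
  extraData?: Record<string, string>;
}

// Replaces {{token}} placeholders in one template param.
function resolveTokens(param: string, lead: LeadFields): string {
  // Built-in fields are spread last, so they override same-named extraData keys.
  const values: Record<string, string | undefined> = {
    ...(lead.extraData ?? {}),
    name: lead.name,
    phone: lead.phone,
    email: lead.email,
    sessionYear: lead.sessionYear,
  };
  return param.replace(/\{\{\s*(\w+)\s*\}\}/g, (_match: string, key: string) =>
    // hasOwnProperty guards against inherited keys such as "constructor".
    Object.prototype.hasOwnProperty.call(values, key) ? values[key] ?? "" : "",
  );
}
```

For example, a wait of 2 hours becomes a 7,200,000 ms job delay, and a zero-length wait still delays by one second so the step job is never re-queued instantly.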

On any thrown step error, the node is logged failed; the enrollment then either hard-fails (if node.data.onError === 'fail') or skips past the node and advances. A runaway guard caps any single enrollment at MAX_NODE_RUNS_PER_ENROLLMENT = 100 node runs.

5. Aarya AI call node (immediate + batched, watchdog timeout)

An aaryaCall node dispatches an outbound AI phone call via ElevenLabsBatchService.submitBatch, then parks the enrollment waiting on the dispatch call-log. AaryaCallSyncService wakes it when the call's duration is written back; if the call never returns, a watchdog deadline nudges it forward as no_engagement.

sequenceDiagram
  participant ENG as WorkflowEngineService
  participant EL as ElevenLabsBatchService
  participant DB as enrollments + call_logs
  participant ACS as AaryaCallSyncService
  participant QS as workflowQueue

  Note over ENG: fresh arrival at aaryaCall node
  alt batchSize is 1 immediate mode
    ENG->>EL: submitBatch one lead
    EL->>DB: write dispatch LeadCallLog
    ENG->>DB: park waiting with waitingForCallLogId and resumeAt timeout
  else batchSize over 1 buffered mode
    ENG->>DB: park waiting buffered with maxWait deadline
    ENG->>ENG: maybeFlushAaryaBuffer when pool full
    ENG->>EL: submitBatch claimed leads via FOR UPDATE SKIP LOCKED
    ENG->>DB: park each enrollment waiting on its dispatch log
  end
  Note over ACS: later the call completes and duration is recorded
  ACS->>DB: find enrollments waitingForCallLogId in completed logs
  ACS->>QS: addWorkflowWakeJob enrollmentId
  QS->>ENG: workflowStep
  ENG->>DB: read call log duration
  alt duration present
    ENG->>DB: clear waitingForCallLogId and advance
  else still no duration and past watchdog
    Note over ENG: scan sweepStaleAaryaWaits nudges it forward
    ENG->>DB: advance treated as no_engagement downstream
  end

Constants: DEFAULT_AARYA_TIMEOUT_MIN = 1440 (24h watchdog), DEFAULT_AARYA_BATCH_MAX_WAIT_MIN = 120 (2h max wait before a partial batch flushes). Batch claiming uses SELECT ... FOR UPDATE SKIP LOCKED with an aaryaDispatching context marker so concurrent workers never grab the same buffered leads; a failed dispatch un-claims them for retry.
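
The two deadlines can be expressed as small pure functions. The constants are the ones named above; everything else (BufferedLead, shouldFlushBuffer, watchdogDeadline, and measuring the max wait from the oldest buffered lead) is an assumption made to illustrate the timing rules, not the engine's actual code.

```typescript
const DEFAULT_AARYA_TIMEOUT_MIN = 1440; // 24h watchdog
const DEFAULT_AARYA_BATCH_MAX_WAIT_MIN = 120; // 2h before a partial batch flushes

const MS_PER_MIN = 60 * 1000;

// ASSUMPTION: minimal view of one buffered enrollment.
interface BufferedLead { enrollmentId: string; bufferedAt: Date }

// Flush when the pool is full, or when the oldest buffered lead has waited
// at least maxWaitMin (so a partial batch never waits forever).
function shouldFlushBuffer(
  buffer: BufferedLead[],
  batchSize: number,
  now: Date,
  maxWaitMin: number = DEFAULT_AARYA_BATCH_MAX_WAIT_MIN,
): boolean {
  if (buffer.length === 0) return false;
  if (buffer.length >= batchSize) return true;
  const oldestMs = Math.min(...buffer.map((lead) => lead.bufferedAt.getTime()));
  return now.getTime() - oldestMs >= maxWaitMin * MS_PER_MIN;
}

// The resumeAt deadline after which a silent call is swept forward.
function watchdogDeadline(dispatchedAt: Date, timeoutMin: number = DEFAULT_AARYA_TIMEOUT_MIN): Date {
  return new Date(dispatchedAt.getTime() + timeoutMin * MS_PER_MIN);
}
```

In the real engine the claim itself is made with SELECT ... FOR UPDATE SKIP LOCKED; this sketch covers only the decision of when a flush is due.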

6. Branch node routes on the latest AI-call outcome

sequenceDiagram
  participant ENG as WorkflowEngineService
  participant DB as lead_call_logs
  participant ACS as AaryaCallSyncService

  ENG->>DB: latest aarya call duration for lead
  alt no call found
    ENG->>ENG: bucket no_engagement
  else call found
    ENG->>ACS: bucketFromDuration seconds
    ACS-->>ENG: hot 30s plus warm 10 to 30s cold under 10s
  end
  ENG->>ENG: pick edge by sourceHandle matching bucket
  Note over ENG: no_engagement falls back to cold then first available edge
  ENG->>DB: set currentNodeId to matched target and queue step

If no outgoing edge matches the bucket and there is no fallback, the enrollment finishes completed with reason branch_<bucket>_no_edge.
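
The bucketing and edge selection can be sketched as two pure functions. The thresholds follow this document (hot at 30s or more, cold under 10s, so warm covers 10s up to but not including 30s); the function pickBranchEdge, the BranchEdge shape, and the use of null for a missing call are assumptions.

```typescript
type Bucket = "hot" | "warm" | "cold" | "no_engagement";

interface BranchEdge { target: string; sourceHandle?: Bucket }

// seconds === null models "no Aarya call found for this lead".
function bucketFromDuration(seconds: number | null): Bucket {
  if (seconds === null) return "no_engagement";
  if (seconds >= 30) return "hot";
  if (seconds >= 10) return "warm";
  return "cold";
}

// Picks the outgoing edge whose sourceHandle matches the bucket.
// Only no_engagement has a fallback chain: cold, then the first available edge.
// A null result corresponds to finishing with reason branch_<bucket>_no_edge.
function pickBranchEdge(edges: BranchEdge[], bucket: Bucket): BranchEdge | null {
  const exact = edges.find((edge) => edge.sourceHandle === bucket);
  if (exact) return exact;
  if (bucket === "no_engagement") {
    return edges.find((edge) => edge.sourceHandle === "cold") ?? edges[0] ?? null;
  }
  return null;
}
```

A 45-second call therefore routes down the hot edge, while a lead with no call at all on a branch that has only hot and cold edges follows the cold edge.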

7. Event-driven enrollment (real time)

CRM mutations publish lifecycle events through LeadEventService (a fire-and-forget chokepoint that never throws into the caller). Event-mode workflows enroll the instant a subscribed event fires, with the segment filter acting as an optional guard.

sequenceDiagram
  participant CRM as RawLeadService mutation
  participant LES as LeadEventService
  participant QS as workflowQueue
  participant WW as workflow.worker
  participant ENG as WorkflowEngineService
  participant RLS as RawLeadService

  CRM->>LES: emit lead.assigned leadIds
  LES->>QS: addWorkflowEventJob best effort
  QS->>WW: workflowEvent eventType leadIds
  WW->>ENG: handleLeadEvent eventType leadIds
  ENG->>ENG: find enabled event workflows subscribing to this event
  loop each workflow and lead
    ENG->>RLS: list singleLeadId with segment guard
    alt lead still matches guard
      ENG->>ENG: enrollLead with orIgnore
    else guard not satisfied
      ENG->>ENG: skip this lead
    end
  end
  WW-->>QS: enrolled count
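
The "find enabled event workflows subscribing to this event" step in the diagram above amounts to a filter over trigger configs. The sketch below is illustrative: WorkflowSummary and workflowsForEvent are hypothetical names, and only the fields this document mentions (enabled, triggerType, events) are modelled.

```typescript
type LeadEventType =
  | "lead.created"
  | "lead.assigned"
  | "lead.counsellor_assigned"
  | "lead.owner_changed"
  | "lead.interest_changed"
  | "lead.webinar_changed"
  | "lead.converted"
  | "lead.closed";

type TriggerType = "segment" | "event" | "manual";

// ASSUMPTION: a trimmed-down view of a workflow row.
interface WorkflowSummary {
  id: string;
  enabled: boolean;
  triggerConfig: { triggerType?: TriggerType; events?: LeadEventType[] };
}

// Keeps enabled, event-mode workflows that subscribe to the given event.
// triggerType defaults to "segment" when absent, so such rows never match.
function workflowsForEvent(workflows: WorkflowSummary[], eventType: LeadEventType): WorkflowSummary[] {
  return workflows.filter(
    (workflow) =>
      workflow.enabled &&
      (workflow.triggerConfig.triggerType ?? "segment") === "event" &&
      (workflow.triggerConfig.events ?? []).includes(eventType),
  );
}
```

Each workflow that survives this filter still runs its segment guard per lead before enrolling, as the diagram shows.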

8. Manual run (admin hand-picks leads)

sequenceDiagram
  participant Admin
  participant API as SalesWorkflowController
  participant ENG as WorkflowEngineService
  participant DB as raw_leads + enrollments

  Admin->>API: POST /api/admin/workflows/:id/run-manual leadIds
  API->>ENG: runManual id leadIds
  alt workflow not manual type
    ENG-->>API: 400 not a manual-run workflow
    API-->>Admin: error
  else manual workflow
    ENG->>DB: resolveEnrollableLeadIds existing and in head pool
    loop each valid lead
      ENG->>DB: enrollLead with orIgnore
    end
    ENG-->>API: enrolled skipped requested counts
    API-->>Admin: 200 Enrolled N skipped M
  end

Forged or out-of-pool lead ids are dropped server-side: resolveEnrollableLeadIds keeps only ids that exist and (when the workflow has an owningSalesHeadId) belong to that head's pool.
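
The filtering rule can be sketched over an in-memory list of leads. This is not the real resolveEnrollableLeadIds, which queries raw_leads; the LeadRow shape, the assumption that pool membership is read from an owningSalesHeadId field on the lead, and the de-duplication of requested ids are all illustrative.

```typescript
// ASSUMPTION: pool membership is modelled as a field on the lead row.
interface LeadRow { id: string; owningSalesHeadId: string | null }

// Keeps only requested ids that exist and, when the workflow is pool-scoped,
// belong to that sales head's pool. Unknown (forged) ids are silently dropped.
function resolveEnrollableLeadIds(
  requested: string[],
  leads: LeadRow[],
  workflowOwningSalesHeadId: string | null,
): string[] {
  const byId = new Map<string, LeadRow>();
  for (const lead of leads) byId.set(lead.id, lead);

  const uniqueRequested = Array.from(new Set(requested));
  return uniqueRequested.filter((id) => {
    const lead = byId.get(id);
    if (!lead) return false; // id does not exist
    if (workflowOwningSalesHeadId === null) return true; // workflow is not pool-scoped
    return lead.owningSalesHeadId === workflowOwningSalesHeadId;
  });
}
```

The difference between the requested count and the length of the returned list is what the API reports as skipped, alongside leads that were already enrolled.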

9. Monitor + retry a stuck lead

sequenceDiagram
  participant Admin
  participant API as SalesWorkflowController
  participant WS as WorkflowService
  participant DB as node_runs + enrollments
  participant QS as workflowQueue

  Admin->>API: GET /api/admin/workflows/:id/enrollments scope active
  API->>WS: listEnrollments
  WS-->>API: rows with lead identity, current step, lastBranch
  Admin->>API: GET enrollments/:eid/runs
  API->>WS: listRuns
  WS-->>API: node run timeline ordered by createdAt
  Admin->>API: POST enrollments/:eid/retry
  API->>WS: retryEnrollment
  WS->>DB: find earliest failed node with no later success
  WS->>DB: rewind set status active currentNodeId completedAt null
  WS->>QS: addWorkflowStepJob
  API-->>Admin: 200 retried at nodeId

retry rewinds to the earliest still-unresolved failure so a multi-step failure re-walks in order; retry-from rewinds to an explicit nodeId chosen in the UI.
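
The "earliest failed node with no later success" lookup can be sketched over the node-run timeline. The sketch is hypothetical: findRetryNodeId is an invented name, and it assumes "no later success" means no later successful run of the same node, which this document does not spell out.

```typescript
type RunStatus = "success" | "failed" | "skipped" | "scheduled";

interface NodeRun { nodeId: string; status: RunStatus; createdAt: Date }

// Returns the node id to rewind to, or null when no failure is unresolved.
function findRetryNodeId(runs: NodeRun[]): string | null {
  // Work on a copy ordered by createdAt, oldest first.
  const ordered = [...runs].sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
  const firstUnresolved = ordered.find(
    (run, index) =>
      run.status === "failed" &&
      // ASSUMPTION: a later success of the SAME node resolves the failure.
      !ordered.slice(index + 1).some((later) => later.nodeId === run.nodeId && later.status === "success"),
  );
  return firstUnresolved ? firstUnresolved.nodeId : null;
}
```

If node B failed and later succeeded, and node C then failed, the rewind target is C; if both B and C are still failed, it is B, so the steps re-walk in order.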

10. Mr. Hire workflow rule + test email (the simpler subsystem)

sequenceDiagram
  participant User
  participant API as WorkflowController
  participant DB as workflow_rules + email_templates
  participant ES as EmailService

  User->>API: POST /api/mr-hire/workflow/rules name condition action
  API->>DB: insert rule belongsTo user
  API-->>User: 201 rule
  User->>API: POST email-templates/:id/send-test to
  API->>DB: load template own or system default
  API->>API: substitute placeholder vars with sample data
  API->>ES: sendEmail to subject html
  ES-->>API: sent
  API-->>User: 200 test email sent

Background jobs & async

Queue: workflowQueue (BullMQ), created in QueueService and drained by src/workers/workflow.worker.ts at concurrency 5.

| Job name | Producer | Schedule | Action |
| --- | --- | --- | --- |
| workflowScan | QueueService.scheduleWorkflowScan (boot) + addWorkflowScanJob (admin / enable) | repeatable every 5 minutes, stable jobId: workflow-scan | runTriggerScan — sweep stale Aarya waits, flush Aarya batch buffers, enroll new segment matches |
| workflowStep | addWorkflowStepJob / addWorkflowWakeJob | on demand (with delay for Wait nodes) | processEnrollmentStep — run one node, queue the next |
| workflowEvent | addWorkflowEventJob via LeadEventService | on demand (fire-and-forget) | handleLeadEvent — enroll leads into matching event-mode workflows |

The repeatable scan is registered at startup (src/index.ts → queueService.scheduleWorkflowScan()), and re-registration replaces (not stacks) thanks to the stable job id. removeOnComplete / removeOnFail caps keep the queue tidy. There are no Socket.IO events or inbound webhooks in this domain; the only "callback" is AaryaCallSyncService waking parked enrollments when an ElevenLabs call's duration syncs back.


External integrations

The engine itself integrates nothing directly — it composes existing services. Failures are isolated so one bad lead never strands the rest.

| Integration | Via | Failure / fallback |
| --- | --- | --- |
| Email | LeadEmailService.send (Gmail SMTP under the hood) | Missing template/body or 400 (no email on file) → hard-fail the enrollment (visible + retryable). Transient errors → skip per onError. |
| WhatsApp | WhatsAppService.sendMessage + LeadWhatsAppService.recordSend (Meta Cloud API) | No phone → log fail + skip. Send failure recorded as LeadWhatsAppStatus.FAILED. Builder enforces param count to avoid Meta error #132000. Template list in metadata is best-effort (returns [] if WhatsApp unconfigured). |
| Aarya AI calling | ElevenLabsBatchService.submitBatch / listAgents / listPhoneNumbers | Best-effort in metadata. 24h call watchdog; partial batches flush after 2h. Dispatch failure un-claims buffered leads for retry. |
| CRM segment query | RawLeadService.list | Reused verbatim so segment + event guards always match the CRM list view. |
| Call buckets | AaryaCallSyncService.bucketFromDuration | Thresholds: hot ≥ 30s, warm 10–30s, cold < 10s, no call → no_engagement. |

There are no new env vars introduced by the engine; it inherits Redis (REDIS_HOST / REDIS_PORT) for BullMQ and whatever email / WhatsApp / ElevenLabs config the reused services need. No feature flag gates the engine — it is active whenever a workflow is enabled.


Status lifecycles

Enrollment lifecycle

stateDiagram-v2
  [*] --> active : enrolled
  active --> waiting : wait node or aarya call parks it
  waiting --> active : timer fires or call completes
  active --> active : advance to next node
  active --> paused : workflow disabled mid-run
  paused --> active : workflow re-enabled
  active --> completed : end of graph or lead converted or closed
  waiting --> completed : end after wait
  active --> failed : missing template or max node runs or onError fail
  active --> cancelled : workflow deleted
  waiting --> cancelled : workflow deleted
  paused --> cancelled : workflow deleted
  completed --> active : admin retry rewinds
  failed --> active : admin retry rewinds
  completed --> [*]
  failed --> [*]
  cancelled --> [*]

Node run status

stateDiagram-v2
  [*] --> success : node executed cleanly
  [*] --> scheduled : wait delayed or aarya call dispatched or buffered
  [*] --> skipped : unknown node type or non-fatal failure skipped past
  [*] --> failed : send error or config gap
  scheduled --> [*]
  success --> [*]
  skipped --> [*]
  failed --> [*]

Workflow enable state

stateDiagram-v2
  [*] --> disabled : created
  disabled --> enabled : enable rewakes paused and scans
  enabled --> disabled : disable parks in-flight as paused
  enabled --> [*] : deleted cancels in-flight
  disabled --> [*] : deleted

Edge cases, limits & gotchas

  • Two unrelated "workflow" systems. /api/admin/workflows/* (Sales engine, ADMIN-only) and /api/mr-hire/workflow/* (recruitment rules, any authed user) share a name but nothing else. The recruitment WorkflowRule entity stores config only — no engine in this repo executes those rules.
  • ADMIN-only, not sales-head. adminMiddleware allows only role === ADMIN; sales reps and sales heads cannot reach the Sales workflow API even though workflows can be pool-scoped via owningSalesHeadId.
  • No double enrollment. The partial unique index (workflowId, leadId) WHERE status IN (active, waiting, paused) plus orIgnore (ON CONFLICT DO NOTHING) makes the scan, event, and manual paths idempotent against races. The index covers only in-flight rows; a previously completed lead is kept out of later scans by the scan's existing-enrollment lookup (re-running is intentionally off by default).
  • Graph must be acyclic and singly-triggered. Save fails (400) on cycles, multiple/zero trigger nodes, dangling edges, branch handles on non-branch nodes, or unconfigured send steps.
  • Runaway + lifecycle guards. Any enrollment over 100 node runs hard-fails (max_node_runs). Converted/closed leads complete immediately without acting; deleted leads or workflows fail/cancel the enrollment.
  • Fan-out is depth-first and sequential. A non-branch node with multiple outgoing edges pushes all targets to the front of context.queue; they run one job at a time within the same enrollment, not in parallel.
  • Disable is lazy. Disabling does not stop queued jobs; the next step of each in-flight enrollment parks it paused. Re-enabling re-wakes paused rows and kicks a fresh scan.
  • WhatsApp sender identity. Sends use the workflow's createdBy as the sender user id; a workflow authored by a now-deleted user may send with an empty sender.
  • Aarya watchdog vs. real outcome. A call that never reports a duration is swept forward after the timeout and treated as no_engagement — so a branch may route a lead as cold even though the call simply failed to sync.
  • Multi-tenant. This domain is Sales-CRM-scoped (mas_crm schema) and not driven by the x-platform header; isolation is via owningSalesHeadId pools, not platform.
  • Metadata is best-effort. WhatsApp templates, Aarya agents, and phone numbers all degrade to [] if their upstream is unconfigured, so the builder always loads.