Workflow Dispatcher Architecture
Deep dive into IWorkflowDispatcher: the core dispatching abstraction for queuing and executing workflows, covering request types, event ordering, and custom dispatcher implementations.
Overview
The IWorkflowDispatcher is Elsa's core abstraction for enqueuing and dispatching workflows for execution. It provides a flexible mechanism to start, resume, and trigger workflows, with support for both in-process and distributed execution scenarios.
Understanding the dispatcher is crucial for:
Custom execution strategies: Implementing background processing, queueing, or distributed workflows
Event-driven architectures: Understanding how triggers and bookmarks flow through the system
Multi-process deployments: Coordinating workflow execution across multiple nodes
Debugging and troubleshooting: Knowing the sequence of events during workflow execution
IWorkflowDispatcher vs IWorkflowRunner vs IWorkflowRuntime
Before diving into the dispatcher, it's important to understand how it relates to other workflow execution services:
IWorkflowRunner
Direct, in-process execution
Synchronous, immediate
Testing, simple workflows, in-process scenarios
IWorkflowRuntime
Runtime abstraction with persistence
Async, with persistence and client API
Most application scenarios, managed execution
IWorkflowDispatcher
Dispatching and queuing abstraction
Async, queue-based
Background processing, distributed systems, custom execution strategies
When to Use Each
IWorkflowRunner: Use when you need immediate, synchronous execution in the same process. Ideal for unit tests or simple, non-persistent workflows.
IWorkflowRuntime: Use for most application scenarios where you need workflow persistence, state management, and the ability to resume workflows. Provides a high-level client API for workflow operations.
IWorkflowDispatcher: Use when you need custom control over how workflows are queued and executed, or when building distributed/multi-process architectures. Also used internally by triggers and the runtime.
IWorkflowDispatcher Interface
The IWorkflowDispatcher defines four primary dispatch methods, each handling a different workflow execution scenario:
public interface IWorkflowDispatcher
{
Task<DispatchWorkflowDefinitionResponse> DispatchAsync(
DispatchWorkflowDefinitionRequest request,
CancellationToken cancellationToken = default);
Task<DispatchWorkflowInstanceResponse> DispatchAsync(
DispatchWorkflowInstanceRequest request,
CancellationToken cancellationToken = default);
Task<DispatchTriggerWorkflowsResponse> DispatchAsync(
DispatchTriggerWorkflowsRequest request,
CancellationToken cancellationToken = default);
Task<DispatchResumeWorkflowsResponse> DispatchAsync(
DispatchResumeWorkflowsRequest request,
CancellationToken cancellationToken = default);
}Dispatch Request Types
1. DispatchWorkflowDefinitionRequest
Purpose: Start a new workflow instance from a workflow definition.
Use Cases:
Starting a workflow via REST API
Programmatically creating and starting workflows
Batch processing where each item starts a new workflow instance
Request Properties:
DefinitionId: The workflow definition IDVersionOptions: Options for selecting the workflow version (latest, specific version, etc.)CorrelationId: Optional correlation ID for tracking related workflowsInput: Dictionary of input parametersInstanceId: Optional predefined instance IDTriggerActivityId: Optional ID of a specific trigger activity to start fromProperties: Additional metadata for the workflow instance
Event Flow:
1. Client calls DispatchAsync(DispatchWorkflowDefinitionRequest)
↓
2. Dispatcher validates the definition exists and is published
↓
3. Dispatcher creates a new workflow instance with the provided inputs
↓
4. Dispatcher enqueues the workflow for execution
↓
5. Background worker/executor picks up the request
↓
6. Workflow execution begins
↓
7. Activities execute in sequence/parallel based on workflow definition
↓
8. Workflow state is persisted (if persistence is enabled)
↓
9. Workflow completes, suspends (on bookmark), or faults
↓
10. Response returned with workflow stateExample:
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Messages;
using Elsa.Workflows.Models;
using Microsoft.Extensions.DependencyInjection;
var dispatcher = serviceProvider.GetRequiredService<IWorkflowDispatcher>();
var request = new DispatchWorkflowDefinitionRequest
{
DefinitionId = "order-processing-workflow",
VersionOptions = VersionOptions.Latest,
CorrelationId = $"order-{orderId}",
Input = new Dictionary<string, object>
{
["OrderId"] = orderId,
["CustomerId"] = customerId,
["Amount"] = orderAmount
}
};
var response = await dispatcher.DispatchAsync(request);
Console.WriteLine($"Workflow instance created: {response.WorkflowInstanceId}");2. DispatchWorkflowInstanceRequest
Purpose: Resume or continue execution of an existing workflow instance.
Use Cases:
Resuming a suspended workflow that was persisted
Re-executing a workflow that faulted
Dispatching a loaded workflow instance for execution
Request Properties:
InstanceId: The ID of the workflow instance to dispatchInput: Optional input to provide to the workflow on resumeBookmarkId: Optional bookmark ID if resuming from a specific bookmarkActivityId: Optional activity ID to resume fromActivityNodeId: Optional activity node ID in the workflow graph
Event Flow:
1. Client calls DispatchAsync(DispatchWorkflowInstanceRequest)
↓
2. Dispatcher loads the workflow instance from persistence
↓
3. Dispatcher validates the instance exists and is in a resumable state
↓
4. Dispatcher enqueues the instance for execution/resumption
↓
5. Background worker picks up the request
↓
6. Workflow execution resumes from the point of suspension or specified activity
↓
7. Activities execute, state is persisted
↓
8. Workflow completes, suspends, or faults
↓
9. Response returned with updated workflow stateExample:
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Messages;
var dispatcher = serviceProvider.GetRequiredService<IWorkflowDispatcher>();
var request = new DispatchWorkflowInstanceRequest
{
InstanceId = workflowInstanceId,
Input = new Dictionary<string, object>
{
["ApprovalDecision"] = "Approved",
["ApprovedBy"] = userId
}
};
var response = await dispatcher.DispatchAsync(request);
Console.WriteLine($"Workflow resumed: {response.WorkflowInstanceId}");3. DispatchTriggerWorkflowsRequest
Purpose: Trigger workflows based on an external stimulus (event, HTTP request, message, etc.).
Use Cases:
HTTP endpoints triggering workflows
Message broker events (RabbitMQ, Azure Service Bus)
Timer/scheduled triggers
Custom event sources
Request Properties:
ActivityTypeName: The type of trigger activityBookmarkPayload: Payload data for bookmark matchingCorrelationId: Optional correlation IDWorkflowInstanceId: Optional specific instance to triggerInput: Input data for triggered workflows
Event Flow:
1. External event occurs (HTTP request, message, timer fires)
↓
2. Trigger handler calls DispatchAsync(DispatchTriggerWorkflowsRequest)
↓
3. Dispatcher queries for workflow definitions with matching triggers
↓
4. Dispatcher filters by trigger type and payload hash
↓
5. For each matching workflow definition:
a. Create new workflow instance
b. Enqueue for execution
↓
6. Background workers pick up instances
↓
7. Workflows execute from the trigger activity
↓
8. State persisted
↓
9. Response includes list of triggered workflow instancesExample:
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Messages;
var dispatcher = serviceProvider.GetRequiredService<IWorkflowDispatcher>();
// Example: Triggering workflows with HTTP endpoint trigger
var request = new DispatchTriggerWorkflowsRequest
{
ActivityTypeName = "Elsa.HttpEndpoint",
BookmarkPayload = new
{
Path = "/api/webhooks/order-created",
Method = "POST"
},
Input = new Dictionary<string, object>
{
["RequestBody"] = requestBody,
["Headers"] = headers
}
};
var response = await dispatcher.DispatchAsync(request);
Console.WriteLine($"Triggered {response.WorkflowInstanceIds.Count} workflow(s)");4. DispatchResumeWorkflowsRequest
Purpose: Resume workflows that are suspended at a bookmark (waiting for an event).
Use Cases:
Resuming workflows waiting for user approval
Continuing workflows after receiving a callback
Processing events for suspended workflows
Timer-based resumption of delayed workflows
Request Properties:
ActivityTypeName: Type of activity that created the bookmarkBookmarkPayload: Payload for matching the bookmarkCorrelationId: Optional correlation IDWorkflowInstanceId: Optional specific instance to resumeInput: Input data to provide on resume
Event Flow:
1. External event occurs (approval received, callback, timer)
↓
2. Event handler calls DispatchAsync(DispatchResumeWorkflowsRequest)
↓
3. Dispatcher queries for bookmarks matching:
- Activity type
- Payload hash
- Optional correlation ID or instance ID
↓
4. For each matching bookmark:
a. Load the suspended workflow instance
b. Validate instance is suspended and bookmark exists
c. Enqueue for resumption
↓
5. Background workers pick up instances
↓
6. Workflows resume from the bookmarked activity
↓
7. Bookmark is "burned" (deleted) if AutoBurn is true
↓
8. Workflow continues execution, state persisted
↓
9. Response includes list of resumed workflow instancesExample:
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Messages;
var dispatcher = serviceProvider.GetRequiredService<IWorkflowDispatcher>();
// Example: Resuming workflows waiting for approval
var request = new DispatchResumeWorkflowsRequest
{
ActivityTypeName = "MyApp.WaitForApproval",
BookmarkPayload = new
{
ApprovalId = approvalId
},
Input = new Dictionary<string, object>
{
["Decision"] = "Approved",
["ApprovedBy"] = userId,
["ApprovedAt"] = DateTime.UtcNow
}
};
var response = await dispatcher.DispatchAsync(request);
Console.WriteLine($"Resumed {response.WorkflowInstanceIds.Count} workflow(s)");Event Ordering and Execution Flow
Understanding the order of events during workflow dispatch is crucial for debugging and implementing custom dispatchers.
Starting a New Workflow (DispatchWorkflowDefinitionRequest)
Detailed Sequence:
Validate Definition: Check that the workflow definition exists and is published
Create Instance: Instantiate a new
WorkflowInstancewith unique IDSet Input: Apply input parameters to the workflow execution context
Set Correlation: Apply correlation ID if provided
Enqueue: Add the dispatch request to the execution queue
Dequeue (by worker): Background worker picks up the request
Load Workflow: Materialize the workflow definition into an executable graph
Initialize Context: Create workflow execution context with variables and state
Execute: Begin execution from the root activity or specified trigger
Persist State: Save workflow state after each activity or at suspension points
Complete/Suspend/Fault: Workflow reaches a terminal state
Return Response: Response includes instance ID and final/current state
Resuming an Existing Workflow (DispatchWorkflowInstanceRequest)
Detailed Sequence:
Validate Instance: Check that the instance exists and is resumable
Load State: Retrieve persisted workflow state from storage
Apply Input: Merge any new input with existing workflow state
Enqueue: Add the resume request to the execution queue
Dequeue (by worker): Background worker picks up the request
Reconstruct Context: Rebuild the workflow execution context from persisted state
Resume Execution: Continue from the point of suspension or specified activity
Persist State: Save updated state after each activity
Complete/Suspend/Fault: Workflow reaches next state transition
Return Response: Response includes updated workflow state
Triggering Workflows (DispatchTriggerWorkflowsRequest)
Detailed Sequence:
Query Triggers: Find all workflow definitions with matching trigger activities
Filter by Type: Match activity type (e.g., HttpEndpoint, TimerTrigger)
Filter by Payload: Match bookmark payload hash
Create Instances: For each matching definition, create a new instance
Set Correlation: Apply correlation ID from the trigger
Batch Enqueue: Add all triggered instances to the execution queue
Dequeue (by workers): Workers pick up and execute each instance
Execute from Trigger: Each workflow starts from the trigger activity
Persist State: State saved for each instance
Return Response: Response includes all triggered instance IDs
Resuming on Bookmark (DispatchResumeWorkflowsRequest)
Detailed Sequence:
Query Bookmarks: Find all bookmarks matching the criteria:
Activity type name
Payload hash
Optional correlation ID or instance ID
Acquire Locks: For each bookmark, acquire distributed lock on the instance
Validate State: Ensure instance is still suspended and bookmark hasn't been burned
Load Instances: Load persisted state for each matching instance
Batch Enqueue: Add all resume requests to the execution queue
Dequeue (by workers): Workers pick up each resume request
Resume from Bookmark: Execution continues from the bookmarked activity
Burn Bookmark: Delete the bookmark if AutoBurn is enabled
Execute Activities: Continue through the workflow
Persist State: Save updated state
Return Response: Response includes all resumed instance IDs
Custom Dispatcher Implementations
Why Implement a Custom Dispatcher?
The default dispatcher (DefaultWorkflowDispatcher) executes workflows immediately in the same process. Custom dispatchers enable:
Background Processing: Queue workflows to a message broker (RabbitMQ, Azure Service Bus, Kafka)
Distributed Execution: Send workflows to specific worker nodes based on criteria (load balancing, affinity)
Priority Queuing: Execute high-priority workflows first
Rate Limiting: Throttle workflow execution to prevent overload
Custom Routing: Route workflows to specialized workers (e.g., CPU-intensive vs I/O-bound)
Implementing a Custom Dispatcher
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Messages;
using System.Text.Json;
// NOTE: This example uses placeholder types (IMessageQueue and WorkflowDispatchMessage)
// for demonstration purposes. Replace these with your actual message queue infrastructure:
// - For RabbitMQ: Use MassTransit.IBus or RabbitMQ.Client
// - For Azure Service Bus: Use Azure.Messaging.ServiceBus.ServiceBusClient
// - For AWS SQS: Use Amazon.SQS.IAmazonSQS
// - For Kafka: Use Confluent.Kafka.IProducer
public class QueueBasedWorkflowDispatcher : IWorkflowDispatcher
{
private readonly IMessageQueue _messageQueue;
private readonly ILogger<QueueBasedWorkflowDispatcher> _logger;
public QueueBasedWorkflowDispatcher(
IMessageQueue messageQueue,
ILogger<QueueBasedWorkflowDispatcher> logger)
{
_messageQueue = messageQueue;
_logger = logger;
}
public async Task<DispatchWorkflowDefinitionResponse> DispatchAsync(
DispatchWorkflowDefinitionRequest request,
CancellationToken cancellationToken = default)
{
_logger.LogInformation(
"Queuing workflow definition {DefinitionId} for execution",
request.DefinitionId);
// Generate instance ID
var instanceId = Guid.NewGuid().ToString();
// Serialize the request and enqueue
var message = new WorkflowDispatchMessage
{
InstanceId = instanceId,
RequestType = "StartDefinition",
Payload = JsonSerializer.Serialize(request)
};
await _messageQueue.EnqueueAsync("workflow-execution-queue", message, cancellationToken);
return new DispatchWorkflowDefinitionResponse
{
WorkflowInstanceId = instanceId,
Status = WorkflowStatus.Pending
};
}
public async Task<DispatchWorkflowInstanceResponse> DispatchAsync(
DispatchWorkflowInstanceRequest request,
CancellationToken cancellationToken = default)
{
_logger.LogInformation(
"Queuing workflow instance {InstanceId} for resumption",
request.InstanceId);
var message = new WorkflowDispatchMessage
{
InstanceId = request.InstanceId,
RequestType = "ResumeInstance",
Payload = JsonSerializer.Serialize(request)
};
await _messageQueue.EnqueueAsync("workflow-execution-queue", message, cancellationToken);
return new DispatchWorkflowInstanceResponse
{
WorkflowInstanceId = request.InstanceId
};
}
public async Task<DispatchTriggerWorkflowsResponse> DispatchAsync(
DispatchTriggerWorkflowsRequest request,
CancellationToken cancellationToken = default)
{
_logger.LogInformation(
"Queuing trigger request for activity type {ActivityType}",
request.ActivityTypeName);
// Query for matching workflow definitions (implementation depends on your store)
var matchingDefinitions = await FindMatchingTriggersAsync(request, cancellationToken);
var instanceIds = new List<string>();
foreach (var definition in matchingDefinitions)
{
var instanceId = Guid.NewGuid().ToString();
instanceIds.Add(instanceId);
var message = new WorkflowDispatchMessage
{
InstanceId = instanceId,
RequestType = "Trigger",
Payload = JsonSerializer.Serialize(new
{
DefinitionId = definition.DefinitionId,
TriggerRequest = request
})
};
await _messageQueue.EnqueueAsync("workflow-execution-queue", message, cancellationToken);
}
return new DispatchTriggerWorkflowsResponse
{
WorkflowInstanceIds = instanceIds
};
}
public async Task<DispatchResumeWorkflowsResponse> DispatchAsync(
DispatchResumeWorkflowsRequest request,
CancellationToken cancellationToken = default)
{
_logger.LogInformation(
"Queuing resume request for activity type {ActivityType}",
request.ActivityTypeName);
// Query for matching bookmarks (implementation depends on your store)
var matchingBookmarks = await FindMatchingBookmarksAsync(request, cancellationToken);
var instanceIds = new List<string>();
foreach (var bookmark in matchingBookmarks)
{
instanceIds.Add(bookmark.WorkflowInstanceId);
var message = new WorkflowDispatchMessage
{
InstanceId = bookmark.WorkflowInstanceId,
RequestType = "Resume",
Payload = JsonSerializer.Serialize(new
{
BookmarkId = bookmark.Id,
ResumeRequest = request
})
};
await _messageQueue.EnqueueAsync("workflow-execution-queue", message, cancellationToken);
}
return new DispatchResumeWorkflowsResponse
{
WorkflowInstanceIds = instanceIds
};
}
// NOTE: The following methods are intentionally incomplete example code.
// They demonstrate the pattern for querying workflow definitions and bookmarks
// but should be implemented based on your specific storage configuration.
private async Task<List<WorkflowDefinition>> FindMatchingTriggersAsync(
DispatchTriggerWorkflowsRequest request,
CancellationToken cancellationToken)
{
// Query your workflow definition store for definitions with triggers matching the request.
// Recommended implementation using Elsa's built-in services:
//
// 1. Inject IWorkflowDefinitionStore from Elsa.Workflows.Management namespace
// 2. Use FindManyAsync with a filter:
// - IsPublished = true
// - Filter by definitions containing trigger activities matching request.ActivityTypeName
// 3. For each definition, check if trigger payload hash matches request.BookmarkPayload
// 4. Return list of matching WorkflowDefinition objects
//
// Example:
// var filter = new WorkflowDefinitionFilter { IsPublished = true };
// var definitions = await _workflowDefinitionStore.FindManyAsync(filter, cancellationToken);
// return definitions.Where(def => HasMatchingTrigger(def, request)).ToList();
throw new NotImplementedException("Implement using IWorkflowDefinitionStore from Elsa.Workflows.Management");
}
private async Task<List<Bookmark>> FindMatchingBookmarksAsync(
DispatchResumeWorkflowsRequest request,
CancellationToken cancellationToken)
{
// Query your bookmark store for bookmarks matching the request.
// Recommended implementation using Elsa's built-in services:
//
// 1. Inject IBookmarkStore from Elsa.Workflows.Runtime namespace
// 2. Use FindManyAsync with a BookmarkFilter:
// - ActivityTypeName = request.ActivityTypeName
// - Hash = compute hash from request.BookmarkPayload
// - Optionally: CorrelationId, WorkflowInstanceId
// 3. Return list of matching Bookmark objects
//
// Example:
// var filter = new BookmarkFilter
// {
// ActivityTypeName = request.ActivityTypeName,
// Hash = _hasher.Hash(request.BookmarkPayload),
// CorrelationId = request.CorrelationId,
// WorkflowInstanceId = request.WorkflowInstanceId
// };
// return await _bookmarkStore.FindManyAsync(filter, cancellationToken);
throw new NotImplementedException("Implement using IBookmarkStore from Elsa.Workflows.Runtime");
}
}Registering a Custom Dispatcher
using Microsoft.Extensions.DependencyInjection;
builder.Services.AddElsa(elsa =>
{
// Replace the default dispatcher with your custom implementation
elsa.Services.AddSingleton<IWorkflowDispatcher, QueueBasedWorkflowDispatcher>();
});Multi-Process and Multi-Node Considerations
When running Elsa in a distributed environment (multiple nodes/processes), understanding dispatcher behavior is critical:
Distributed Locking
The dispatcher itself doesn't implement locking
Locking happens at the execution level via
IDistributedLockProviderWhen resuming workflows, ensure distributed locks prevent concurrent execution of the same instance
Bookmark Resolution
Bookmarks are stored in a shared database
Multiple nodes can query bookmarks simultaneously
The first node to acquire the lock on an instance wins
Bookmark hashing must be deterministic across all nodes
Queue-Based Dispatch
For true distributed execution:
Dispatcher enqueues to a message broker
Worker nodes consume from the queue
Workers execute workflows using
IWorkflowRunnerState is persisted to shared storage
Workers release locks after execution
Singleton Scheduler
For timer/scheduled workflows in clusters:
Use Quartz clustering to ensure only one node schedules timers
Or designate a single "scheduler" node
See Clustering Guide for configuration
Troubleshooting Dispatcher Issues
Workflows Not Starting
Symptoms: Dispatch calls succeed but workflows don't execute
Checks:
Verify the dispatcher is properly registered
Check for background worker or queue consumer running
Verify workflow definition is published
Check logs for exceptions during dispatch or execution
Duplicate Executions
Symptoms: Same workflow executes multiple times from a single trigger
Causes:
Multiple nodes dispatching the same trigger without coordination
Missing distributed locks during resume
Bookmark not burned after first use
Solutions:
Implement distributed locking
Set
AutoBurn = trueon bookmarksUse idempotent activities
Bookmarks Not Matching
Symptoms: Resume requests don't find bookmarks
Causes:
Payload structure mismatch between create and resume
Hash computed differently on different nodes
Case sensitivity in payload properties
Solutions:
Use shared payload classes/records
Ensure consistent serialization settings
Log and compare payload hashes
Related Documentation
Running Workflows - High-level guide to workflow execution
Clustering Guide - Multi-node deployment
Distributed Hosting - Distributed architecture patterns
Blocking Activities & Triggers - Bookmark fundamentals
Troubleshooting Guide - Debugging workflows
Summary
The IWorkflowDispatcher is the core dispatching abstraction in Elsa Workflows:
Four dispatch types: Start definition, resume instance, trigger workflows, resume bookmarks
Event-driven: Enables decoupled, asynchronous workflow execution
Customizable: Implement custom dispatchers for background processing, queuing, and distributed scenarios
Orchestrates execution: Manages the flow from dispatch to enqueue to execution
Foundation for triggers: All triggers use the dispatcher to start/resume workflows
Understanding the dispatcher's role and event ordering helps you:
Design robust distributed workflow systems
Troubleshoot execution issues
Implement custom execution strategies
Optimize workflow performance
For most applications, the default dispatcher works well. Consider custom implementations when you need:
Background/queued processing
Distributed execution across nodes
Custom routing or load balancing
Integration with existing message brokers
Last updated