The migrations package handles schema creation and versioning for supported databases including SQL Server, PostgreSQL, and MySQL.
Note: For custom schema requirements or unsupported databases, you can use the PostgreSQL schema above as a reference and adapt it to your database's SQL dialect.
Transactions
Dapper operations participate in ambient transactions. For explicit control:
Performance Tuning
Connection Pool Configuration
Batch Operations
Dapper excels at batch operations with low overhead:
Migration Strategy
Since Dapper doesn't manage migrations, use one of these approaches:
Option 1: FluentMigrator
Option 2: DbUp
Option 3: Plain SQL Scripts
Maintain versioned SQL scripts and apply via CI/CD:
Troubleshooting
Connection Issues
Error:Connection refused or timeout
Solutions:
Verify database server is running
Check connection string format
Ensure network connectivity
Schema Mismatch
Error:relation "elsa.workflow_instances" does not exist
Solution: Schema must be created before running the application. Run schema creation scripts.
Performance Issues
Slow queries:
Verify indexes exist
Use database query analyzer (EXPLAIN ANALYZE in PostgreSQL)
-- Create schema
CREATE SCHEMA IF NOT EXISTS elsa;
-- Workflow Definitions
CREATE TABLE elsa.workflow_definitions (
id VARCHAR(255) PRIMARY KEY,
definition_id VARCHAR(255) NOT NULL,
name VARCHAR(500),
description TEXT,
version INT NOT NULL DEFAULT 1,
is_published BOOLEAN NOT NULL DEFAULT FALSE,
is_latest BOOLEAN NOT NULL DEFAULT FALSE,
is_readonly BOOLEAN NOT NULL DEFAULT FALSE,
is_system BOOLEAN NOT NULL DEFAULT FALSE,
materialized_name VARCHAR(500),
provider_name VARCHAR(255),
custom_properties JSONB,
variables JSONB,
inputs JSONB,
outputs JSONB,
outcomes JSONB,
root JSONB,
options JSONB,
use_activity_id_as_node_id BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
UNIQUE (definition_id, version)
);
-- Workflow Instances
CREATE TABLE elsa.workflow_instances (
id VARCHAR(255) PRIMARY KEY,
definition_id VARCHAR(255) NOT NULL,
definition_version_id VARCHAR(255) NOT NULL,
version INT NOT NULL DEFAULT 1,
status VARCHAR(50) NOT NULL,
sub_status VARCHAR(50) NOT NULL,
correlation_id VARCHAR(255),
name VARCHAR(500),
incident_count INT NOT NULL DEFAULT 0,
is_system BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
finished_at TIMESTAMP WITH TIME ZONE,
workflow_state JSONB
);
-- Bookmarks
CREATE TABLE elsa.bookmarks (
id VARCHAR(255) PRIMARY KEY,
activity_type_name VARCHAR(500) NOT NULL,
hash VARCHAR(255) NOT NULL,
workflow_instance_id VARCHAR(255) NOT NULL,
correlation_id VARCHAR(255),
activity_id VARCHAR(255) NOT NULL,
activity_node_id VARCHAR(255),
activity_instance_id VARCHAR(255),
payload JSONB,
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Activity Execution Records
CREATE TABLE elsa.activity_execution_records (
id VARCHAR(255) PRIMARY KEY,
workflow_instance_id VARCHAR(255) NOT NULL,
activity_id VARCHAR(255) NOT NULL,
activity_node_id VARCHAR(255),
activity_type VARCHAR(500) NOT NULL,
activity_type_version INT NOT NULL DEFAULT 1,
activity_name VARCHAR(500),
status VARCHAR(50) NOT NULL,
has_bookmarks BOOLEAN NOT NULL DEFAULT FALSE,
started_at TIMESTAMP WITH TIME ZONE NOT NULL,
completed_at TIMESTAMP WITH TIME ZONE,
activity_state JSONB,
outputs JSONB,
exception JSONB
);
-- Workflow Execution Log Records
CREATE TABLE elsa.workflow_execution_log_records (
id VARCHAR(255) PRIMARY KEY,
workflow_instance_id VARCHAR(255) NOT NULL,
activity_id VARCHAR(255),
activity_node_id VARCHAR(255),
activity_type VARCHAR(500),
activity_type_version INT,
activity_name VARCHAR(500),
message TEXT,
event_name VARCHAR(255),
source VARCHAR(255),
payload JSONB,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
sequence BIGINT NOT NULL
);
-- Workflow Inbox Messages
CREATE TABLE elsa.workflow_inbox_messages (
id VARCHAR(255) PRIMARY KEY,
activity_type_name VARCHAR(500) NOT NULL,
hash VARCHAR(255) NOT NULL,
workflow_instance_id VARCHAR(255),
correlation_id VARCHAR(255),
activity_instance_id VARCHAR(255),
input JSONB,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
expires_at TIMESTAMP WITH TIME ZONE
);
-- Create indexes (see indexing-notes.md for details)
CREATE INDEX idx_workflow_definitions_definition_id ON elsa.workflow_definitions(definition_id);
CREATE INDEX idx_workflow_definitions_is_published ON elsa.workflow_definitions(is_published);
CREATE INDEX idx_workflow_instances_correlation_id ON elsa.workflow_instances(correlation_id);
CREATE INDEX idx_workflow_instances_status ON elsa.workflow_instances(status);
CREATE INDEX idx_workflow_instances_definition_id ON elsa.workflow_instances(definition_id);
CREATE INDEX idx_workflow_instances_updated_at ON elsa.workflow_instances(updated_at);
CREATE INDEX idx_bookmarks_hash ON elsa.bookmarks(hash);
CREATE INDEX idx_bookmarks_activity_type_hash ON elsa.bookmarks(activity_type_name, hash);
CREATE INDEX idx_bookmarks_workflow_instance_id ON elsa.bookmarks(workflow_instance_id);
CREATE INDEX idx_activity_records_workflow_instance ON elsa.activity_execution_records(workflow_instance_id);
CREATE INDEX idx_execution_logs_workflow_instance ON elsa.workflow_execution_log_records(workflow_instance_id);
CREATE INDEX idx_inbox_hash ON elsa.workflow_inbox_messages(hash);
using System.Transactions;
public class MyWorkflowService
{
private readonly IWorkflowInstanceStore _store;
public async Task PerformTransactionalOperation()
{
using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
// Multiple operations in a single transaction
await _store.SaveAsync(instance1);
await _store.SaveAsync(instance2);
scope.Complete(); // Commit
}
}
// PostgreSQL with connection pool settings
var connectionString = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Database = "elsa",
Username = "elsa",
Password = "YOUR_PASSWORD",
MaxPoolSize = 100,
MinPoolSize = 10,
ConnectionIdleLifetime = 300,
CommandTimeout = 60
}.ToString();
dapper.ConnectionFactory = () => new NpgsqlConnection(connectionString);
// Example: Batch delete with Dapper
using var connection = new NpgsqlConnection(connectionString);
await connection.ExecuteAsync(
@"DELETE FROM elsa.workflow_instances
WHERE status = @Status
AND finished_at < @Threshold",
new { Status = "Finished", Threshold = DateTime.UtcNow.AddDays(-30) }
);
[Migration(1)]
public class CreateElsaTables : Migration
{
public override void Up()
{
Execute.Sql(@"CREATE TABLE elsa.workflow_instances (...)");
}
public override void Down()
{
Execute.Sql("DROP TABLE elsa.workflow_instances");
}
}
dotnet add package DbUp
var upgrader = DeployChanges.To
.PostgresqlDatabase(connectionString)
.WithScriptsFromFileSystem("./Migrations")
.Build();
var result = upgrader.PerformUpgrade();