elsa 外部应用程序集成

一种常见的架构是拥有一个负责协调工作流程执行的工作流引擎,以及一个单独的应用程序,该程序负责执行构成工作流的任务。这些任务可以用任何编程语言实现,并可以托管在任何应用中。

为了了解其工作原理,我们将创建两个使用 webhooks 相互通信的应用程序。此应用代表员工入职流程,其中工作流引擎负责协调流程,而任务执行器负责执行构成流程的任务。

这些任务将由人类完成,但同样的方法也可用于运行自动化任务。最终目标是在任务完成后向工作流服务器反馈信息。

RunTask 活动

为了请求执行任务,我们将使用 RunTask 活动。当此活动执行时,它执行两个步骤:

  1. 它发布一个名为 RunTaskRequest 的领域事件。
  2. 它创建一个书签并等待系统恢复它。

开箱即用,Elsa 不提供处理 RunTaskRequest 领域事件的任何方式。您可以选择自己处理,或者使用 Elsa.Webhooks 包来处理,这正是本示例中我们要做的。

Elsa.Webhooks 包

Elsa.Webhooks 包通过向所有注册的匹配此事件的 webhook 端点发送 HTTP 请求来处理 RunTaskRequest 领域事件。webhook 端点在您的工作流服务器应用中配置,并指向将执行任务的外部应用。然后,外部应用将执行任务,并在完成后向工作流服务器应用发送响应。


示例

为了演示如何工作,我们将创建两个应用:

  1. 托管工作流引擎的工作流服务器应用。
  2. 执行构成工作流任务的任务执行器应用。

入职工作流将协调以下任务:

  1. 接收新员工的姓名和电子邮件地址作为输入。
  2. 运行名为创建邮箱账户的任务。
  3. 运行名为创建 Slack 账户的任务。
  4. 运行名为创建 GitHub 账户的任务。
  5. 运行名为添加到 HR 系统的任务。
  6. 运行名为添加到薪资系统的任务。
  7. 向新员工发送一封电子邮件,欢迎他们加入公司。

我们将让工作流运行第一个任务,一旦完成,剩余的任务将以并行方式运行,因为它们相互独立。

工作流服务器

工作流服务器应用是一个简单的 ASP.NET Core 应用,它托管工作流引擎。它使用 Elsa.Webhooks 包来处理 RunTaskRequest 领域事件。

要设置此应用,请遵循 Elsa Server 指南中的步骤,并添加以下包:

dotnet add package Elsa.Webhooks
dotnet add package Elsa.JavaScript
dotnet add package Elsa.Email
dotnet add package Elsa.Identity
dotnet add package Elsa.EntityFrameworkCore
dotnet add package Elsa.EntityFrameworkCore.Sqlite
dotnet add package Elsa.Workflows.Api

接下来,更新 Program.cs 文件如下:

Program.cs

using Elsa.EntityFrameworkCore.Modules.Management;
using Elsa.EntityFrameworkCore.Modules.Runtime;
using Elsa.Extensions;
using Elsa.Webhooks.Extensions;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddElsa(elsa =>
{
    // 配置管理层以使用EF Core。
    elsa.UseWorkflowManagement(management => management.UseEntityFrameworkCore());

    // 配置运行时层以使用EF Core。
    elsa.UseWorkflowRuntime(runtime => runtime.UseEntityFrameworkCore());

    // 默认身份验证/授权功能。
    elsa.UseIdentity(identity =>
    {
        identity.TokenOptions = options => options.SigningKey = "sufficiently-large-secret-signing-key"; // 此密钥需要至少256位长。
        identity.UseAdminUserProvider();
    });

    // 配置ASP.NET身份验证/授权。
    elsa.UseDefaultAuthentication(auth => auth.UseAdminApiKey());

    // 暴露Elsa API端点。
    elsa.UseWorkflowsApi();

    // 设置SignalR中心以便从服务器接收实时更新。
    elsa.UseRealTimeWorkflows();

    // 启用JavaScript工作流表达式
    elsa.UseJavaScript(options => options.AllowClrAccess = true);

    // 使用电子邮件活动。
    elsa.UseEmail(email =>
    {
        email.ConfigureOptions = options =>
        {
            options.Host = "localhost";
            options.Port = 2525;
        };
    });

    // 注册来自应用的自定义webhook定义(如果有)。
    elsa.UseWebhooks(webhooks => webhooks.WebhookOptions = options => builder.Configuration.GetSection("Webhooks").Bind(options));
});

// 配置CORS以允许设计师应用托管在不同源上调用API。
builder.Services.AddCors(cors => cors
    .AddDefaultPolicy(policy => policy
        .AllowAnyOrigin() // 仅用于演示目的。应使用特定的源。
        .AllowAnyHeader()
        .AllowAnyMethod()
        .WithExposedHeaders("x-elsa-workflow-instance-id"))); // Elsa Studio为了支持从设计器运行工作流而需要。或者,您可以使用`*`通配符暴露所有标头。

// 添加健康检查。
builder.Services.AddHealthChecks();

// 构建Web应用。
var app = builder.Build();

// 配置Web应用的中间件管道。
app.UseCors();
app.UseRouting(); // 信号R所需。
app.UseAuthentication();
app.UseAuthorization();
app.UseWorkflowsApi(); // 使用Elsa API端点。
app.UseWorkflows(); // 使用Elsa中间件处理映射到HTTP端点活动的HTTP请求。
app.UseWorkflowsSignalRHubs(); // 可选的SignalR集成。Elsa Studio使用SignalR从服务器接收实时更新。

app.Run();

更新appsettings.json并添加以下部分:

{
  "Webhooks": {
    "Endpoints": [
      {
        "EventTypes": ["RunTask"],
        "Url": "https://localhost:5002/api/webhooks/run-task"
      }
    ]
  }
}

此配置告知工作流服务器,每当发布 RunTaskRequest 域事件时,应向任务执行器应用程序发送 webhook 请求。

启动工作流服务器应用并创建 如下工作流

员工入职流程图

上述工作流按照上述描述建模了流程:

  1. 接收新员工的姓名和电子邮件地址作为输入。
  2. 运行名为 创建邮箱账户 的任务。
  3. 运行名为 创建 Slack 账户 的任务。
  4. 运行名为 创建 GitHub 账户 的任务。
  5. 运行名为 添加至 HR 系统 的任务。
  6. 运行名为 添加至薪资系统 的任务。
  7. 向新员工发送邮件,欢迎他们加入公司。

除第一个和最后一个任务外,所有任务都由外部应用程序执行。

让我们逐一查看工作流中的每个活动:

设置员工信息从输入

这是一个 SetVariable 活动,使用以下 JavaScript 表达式将 Employee 变量设置为 Employee 输入的值:

getInput("Employee");

设置变量活动截图

注意,此活动设置了一个名为 Employee 的工作流变量。 确保首先从变量标签页创建这个变量,如上图(1)所示。

创建邮箱账户

这是一个 RunTask 活动,请求外部应用程序执行 创建邮箱账户 任务:

运行任务活动截图

创建 Slack 账户

这与前一个活动相同,但针对的是 创建 Slack 账户 任务,并使用以下表达式设置 Payload 输入:

return {
  employee: getEmployee(),
  description: "为新员工创建一个 Slack 账户。",
};

创建 GitHub 账户

这与前一个活动相同,但针对的是 创建 GitHub 账户 任务,并使用以下表达式设置 Payload 输入:

return {
  employee: getEmployee(),
  description: "为新员工创建一个 GitHub 账户。",
};

添加至 HR 系统

这与前一个活动相同,但针对的是 添加至 HR 系统 任务,并使用以下表达式设置 Payload 输入:

return {
  employee: getEmployee(),
  description: "将新员工添加到 HR 系统中。",
};

发送欢迎邮件

这是一个 SendEmail 活动,向新员工发送欢迎邮件,包含以下设置:

发件人
hr@acme.com
收件人
getEmployee().Email;
主题
`欢迎加入,${getEmployee().Name}!`;
正文
`你好,${getEmployee().Name},<br><br>你的所有账户已经设置完毕,欢迎加入!`;

外部应用程序

外部应用程序是一个 ASP.NET Core MVC 应用,用于执行构成工作流的任务。 该应用包含一个视图以显示用户应完成的任务列表,以及一个控制器来处理来自工作流服务器的 webhook 请求。

安装设置

运行以下 CLI 命令以搭建新项目:

dotnet new mvc -o EmployeeOnboarding.Web -f net8.0

添加以下 NuGet 包:

cd EmployeeOnboarding.Web
dotnet add package Microsoft.EntityFrameworkCore
dotnet add package Microsoft.EntityFrameworkCore.Sqlite
dotnet add package Microsoft.EntityFrameworkCore.Design
dotnet add package Microsoft.EntityFrameworkCore.Sqlite.Design
dotnet add package Elsa.EntityFrameworkCore

对于此应用,我们将使用 Entity Framework Core 将入职任务存储在 SQLite 数据库中。 首先,让我们这样定义入职任务实体:

Entities/OnboardingTask.cs

namespace EmployeeOnboarding.Web.Entities;

/// <summary>
/// 需要用户完成的任务。
/// </summary>
public class OnboardingTask
{
    /// <summary>
    /// 任务的ID。
    /// </summary>
    public long Id { get; set; }

    /// <summary>
    /// 可用于引用任务的外部ID。
    /// </summary>
    public string ExternalId { get; set; } = default!;

    /// <summary>
    /// 任务所属的入职流程ID。
    /// </summary>
    public string ProcessId { get; set; } = default!;

    /// <summary>
    /// 任务的名称。
    /// </summary>
    public string Name { get; set; } = default!;

    /// <summary>
    /// 任务描述。
    /// </summary>
    public string Description { get; set; } = default!;

    /// <summary>
    /// 正在接受入职的员工姓名。
    /// </summary>
    public string EmployeeName { get; set; } = default!;

    /// <summary>
    /// 正在接受入职的员工电子邮件地址。
    /// </summary>
    public string EmployeeEmail { get; set; } = default!;

    /// <summary>
    /// 任务是否已完成。
    /// </summary>
    public bool IsCompleted { get; set; }

    /// <summary>
    /// 任务创建的日期和时间。
    /// </summary>
    public DateTimeOffset CreatedAt { get; set; }

    /// <summary>
    /// 任务完成的日期和时间。
    /// </summary>
    public DateTimeOffset? CompletedAt { get; set; }
}

接下来,我们将创建数据库上下文:

Data/OnboardingDbContext.cs

using EmployeeOnboarding.Web.Entities;
using Microsoft.EntityFrameworkCore;

namespace EmployeeOnboarding.Web.Data;

public class OnboardingDbContext : DbContext
{
    public OnboardingDbContext(DbContextOptions<OnboardingDbContext> options) : base(options)
    {
    }

    public DbSet<OnboardingTask> Tasks { get; set; } = default!;
}

然后,在 Program.cs 中配置数据库上下文:

Program.cs

builder.Services.AddDbContextFactory<OnboardingDbContext>(options => options.UseSqlite("Data Source=onboarding.db"));

注意我们使用了 DbContextFactory 来创建数据库上下文,这使得我们能够从一个托管服务自动运行迁移,我们将在下一步创建它。

数据库迁移

执行以下命令以生成初始迁移:

dotnet ef migrations add Initial

创建一个托管服务,该服务在应用程序启动时自动运行迁移:

HostedServices/MigrationsHostedService.cs

using EmployeeOnboarding.Web.Data;
using Microsoft.EntityFrameworkCore;

namespace EmployeeOnboarding.Web.HostedServices;

public class MigrationsHostedService(IServiceProvider serviceProvider) : IHostedService
{
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        using var scope = serviceProvider.CreateScope();
        var dbContextFactory = scope.ServiceProvider.GetRequiredService<IDbContextFactory<OnboardingDbContext>>();
        await using var dbContext = await dbContextFactory.CreateDbContextAsync(cancellationToken);
        await dbContext.Database.MigrateAsync(cancellationToken);
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

Program.cs 中注册此托管服务:

Program.cs

builder.Services.AddHostedService<MigrationsHostedService>();

任务列表

现在我们已经设置了数据访问层,接下来更新主页控制器以显示需要完成的任务列表。为此,我们将为 HomeControllerIndex 操作引入一个名为 IndexViewModel 的视图模型:

Views/Home/IndexViewModel.cs

using EmployeeOnboarding.Web.Entities;

namespace EmployeeOnboarding.Web.Views.Home;

public class IndexViewModel(ICollection<OnboardingTask> tasks)
{
    public ICollection<OnboardingTask> Tasks { get; set; } = tasks;
}

然后更新 HomeController 中的 Index 操作来使用视图模型:

Controllers/HomeController.cs

using EmployeeOnboarding.Web.Data;
using EmployeeOnboarding.Web.Views.Home;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;

namespace EmployeeOnboarding.Web.Controllers;

public class HomeController(OnboardingDbContext dbContext) : Controller
{
    public async Task<IActionResult> Index(CancellationToken cancellationToken)
    {
        var tasks = await dbContext.Tasks.Where(x => !x.IsCompleted).ToListAsync(cancellationToken: cancellationToken);
        var model = new IndexViewModel(tasks);
        return View(model);
    }
}

最后,更新 Index.cshtml 视图以展示任务列表:

Views/Home/Index.cshtml

@model EmployeeOnboarding.Web.Views.Home.IndexViewModel
@{
    ViewData["Title"] = "Home Page";
}

<div class="text-center">
    <h1 class="display-4">Tasks</h1>
    <p>Please complete the following tasks.</p>
</div>

<div class="container">
    <table class="table table-bordered table-hover">
        <thead class="table-light">
        <tr>
            <th scope="col">Task ID</th>
            <th scope="col">Name</th>
            <th scope="col">Description</th>
            <th scope="col">Employee</th>
            <th scope="col"></th>
        </tr>
        </thead>
        <tbody>
        @foreach (var task in Model.Tasks)
        {
            <tr>

                <th scope="row">@task.Id</th>
                <td>@task.Name</td>
                <td>@task.Description</td>
                <td>@($"{task.EmployeeName} <{task.EmployeeEmail}>")</td>
                <td>
                    <form asp-action="CompleteTask">
                        <input type="hidden" name="TaskId" value="@task.Id"/>
                        <button type="submit" class="btn btn-primary">Complete</button>
                    </form>
                </td>
            </tr>
        }
        </tbody>
    </table>
</div>

接收任务

既然我们有了展示任务列表的方法,接下来设置一个 webhook 控制器,用于接收来自工作流程服务器应用的任务。首先,创建一个新的控制器 WebhookController

Controllers/WebhookController.cs

using EmployeeOnboarding.Web.Data;
using EmployeeOnboarding.Web.Entities;
using EmployeeOnboarding.Web.Models;
using Microsoft.AspNetCore.Mvc;

namespace EmployeeOnboarding.Web.Controllers;

[ApiController]
[Route("api/webhooks")]
public class WebhookController(OnboardingDbContext dbContext) : Controller
{
    [HttpPost("run-task")]
    public async Task<IActionResult> RunTask(WebhookEvent webhookEvent)
    {
        var payload = webhookEvent.Payload;
        var taskPayload = payload.TaskPayload;
        var employee = taskPayload.Employee;
        
        var task = new OnboardingTask
        {
            ProcessId = payload.WorkflowInstanceId,
            ExternalId = payload.TaskId,
            Name = payload.TaskName,
            Description = taskPayload.Description,
            EmployeeEmail = employee.Email,
            EmployeeName = employee.Name,
            CreatedAt = DateTimeOffset.Now
        };

        await dbContext.Tasks.AddAsync(task);
        await dbContext.SaveChangesAsync();

        return Ok();
    }
}

上述代码使用了 WebhookEvent 模型来反序列化 webhook 载荷。WebhookEvent 及其相关模型定义如下:

Models/WebhookEvent.cs

namespace EmployeeOnboarding.Web.Models;

public record WebhookEvent(string EventType, RunTaskWebhook Payload, DateTimeOffset Timestamp);

Models/RunTaskWebhook.cs

namespace EmployeeOnboarding.Web.Models;

public record RunTaskWebhook(string WorkflowInstanceId, string TaskId, string TaskName, TaskPayload TaskPayload);

Models/TaskPayload.cs

namespace EmployeeOnboarding.Web.Models;

public record TaskPayload(Employee Employee, string Description);

Models/Employee.cs

namespace EmployeeOnboarding.Web.Models;

public record Employee(string Name, string Email);

完成任务

接下来,我们在 HomeController 中添加 CompleteTask 动作。下面是完整的 HomeController

Controllers/HomeController.cs

using EmployeeOnboarding.Web.Data;
using EmployeeOnboarding.Web.Services;
using EmployeeOnboarding.Web.Views.Home;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;

namespace EmployeeOnboarding.Web.Controllers;

public class HomeController(OnboardingDbContext dbContext, ElsaClient elsaClient) : Controller
{
    public async Task<IActionResult> Index(CancellationToken cancellationToken)
    {
        var tasks = await dbContext.Tasks.Where(x => !x.IsCompleted).ToListAsync(cancellationToken: cancellationToken);
        var model = new IndexViewModel(tasks);
        return View(model);
    }
    
    public async Task<IActionResult> CompleteTask(int taskId, CancellationToken cancellationToken)
    {
        var task = dbContext.Tasks.FirstOrDefault(x => x.Id == taskId);
    
        if (task == null)
            return NotFound();
    
        await elsaClient.ReportTaskCompletedAsync(task.ExternalId, cancellationToken: cancellationToken);
    
        task.IsCompleted = true;
        task.CompletedAt = DateTimeOffset.Now;
    
        dbContext.Tasks.Update(task);
        await dbContext.SaveChangesAsync(cancellationToken);
    
        return RedirectToAction("Index");
    }
}

上述代码使用了 ElsaClient 来报告任务已完成,其定义如下:

Services/ElsaClient.cs

namespace EmployeeOnboarding.Web.Services;

/// <summary>
/// Elsa API 的客户端。
/// </summary>
public class ElsaClient(HttpClient httpClient)
{
    /// <summary>
    /// 报告任务已完成。
    /// </summary>
    /// <param name="taskId">要完成的任务的ID。</param>
    /// <param name="result">任务的结果(可选)。</param>
    /// <param name="cancellationToken">取消操作的令牌(可选)。</param>
    public async Task ReportTaskCompletedAsync(string taskId, object? result = default, CancellationToken cancellationToken = default)
    {
        var url = new Uri($"tasks/{taskId}/complete", UriKind.Relative);
        var request = new { Result = result };
        await httpClient.PostAsJsonAsync(url, request, cancellationToken);
    }
}

HttpClient 是在 Program.cs 中配置的,如下所示:

Program.cs

// Elsa API 客户端配置。
var configuration = builder.Configuration;

builder.Services.AddHttpClient<ElsaClient>(httpClient =>
{
    var url = configuration["Elsa:ServerUrl"]!.TrimEnd('/') + '/';
    var apiKey = configuration["Elsa:ApiKey"]!;
    httpClient.BaseAddress = new Uri(url);
    httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("ApiKey", apiKey);
});

appsettings.json 中,Elsa 部分定义如下:

appsettings.json

{
  "Elsa": {
    "ServerUrl": "https://localhost:5001/elsa/api",
    "ApiKey": "00000000-0000-0000-0000-000000000000"
  }
}

运行以下命令行指令以启动应用:

dotnet run --urls=https://localhost:5002

工作流运行

现在我们有了显示任务列表和完成任务的方法,接下来运行工作流。

发送以下请求以运行工作流:

curl --location 'https://localhost:5001/elsa/api/workflow-definitions/{workflow_definition_id}/execute' \
--header 'Content-Type: application/json' \
--header 'Authorization: ApiKey 00000000-0000-0000-0000-000000000000' \
--data-raw '{
    "input": {
        "Employee": {
            "Name": "Alice Smith",
            "Email": "alice.smith@acme.com"
        }
    }
}'

确保将 {workflow_definition_id} 替换为我们之前创建的实际工作流定义 ID。

上述请求的效果是在数据库中创建一个新的任务,该任务将在 Web 应用中显示。

任务列表1

当你点击 完成 按钮后,该任务将在数据库中被标记为已完成,并且工作流将在其他应用中异步继续执行。 当你刷新任务列表页面时,这个任务会消失,但数据库中会新增 4 个任务:

任务列表2

一旦你完成了所有任务,工作流将向员工发送欢迎邮件,并完成工作流。

员工欢迎邮件

你可以在此处找到本指南的最终源代码:这里

结论

在这篇指南中,我们学习了如何通过使用 webhooks 将外部应用与 Elsa 集成。我们了解了如何创建一个工作流,将任务发送到外部应用,以及如何从外部应用接收任务并报告它们已完成。

在本文档中