再见了,冗长的顺序代码!用工作流来优化它吧!

Avatar
不若风吹尘
2024-06-16T19:43:57
123
0

我们都曾编写过冗长的逐步执行的代码,其中包含众多逻辑分支,因为它们试图执行涉及多个步骤的复杂业务流程。相比写混乱的复杂过程代码,有更好的解决方案。

逐步执行(Procedural)

我们可能都曾编写或见过类似下面的代码:它首先将订单添加并保存到数据库,然后尝试处理信用卡支付,最后发送确认邮件。

public class Procedural
{
    private readonly Database _db;
    private readonly PaymentGateway _paymentGateway;
    private readonly Emailer _email;

    public Procedural(Database db, PaymentGateway paymentGateway, Emailer email)
    {
        _db = db;
        _paymentGateway = paymentGateway;
        _email = email;
    }

    public async Task PlaceOrder(Order order, CreditCard creditCard)
    {
        // Add the Order to DB
        _db.Add(order);

        // Process Payment
        try
        {
            await _paymentGateway.Process(creditCard);
        }
        catch (TaskCanceledException)
        {
            order.Status = OrderStatus.Cancelled;
            _db.Save(order);
            return;
        }

        // Email Confirmation
        try
        {
            await _email.SendConfirmation(order);
        }
        catch (Exception)
        {
            // Swallow/Log because we don't the whole thing to blow up...
        }
    }
}

你可能会注意到一些问题:如果信用卡请求超时抛出异常,我们会标记订单为取消,保存后退出方法。此外,由于不想让整个流程失败,我们还需要包裹发送确认邮件的代码在 try-catch 中。毕竟,订单已成功保存,而且已经扣除了费用。

image-1.png

可以争论说这不符合单一职责原则、关注点分离等,但即便如此,这也无法解决根本问题。问题在于我们没有分布式事务。有多个操作作为工作流的一部分,针对分布式组件。

这是一个相对简单的例子,但你可以想象在自己的上下文中,有许多独立的操作构成更大的工作流。通常,情况会更复杂,因为代码根据发生的情况执行不同的分支。

在理想情况下,一切运行良好。但那些用于控制流程的 try-catch 块(或基于返回值的条件语句)表示了工作流。特别是,我们希望每个操作独立执行,不受其他操作影响。

每个流程步骤独立工作后,可以决定接下来的操作。例如,如果支付处理失败,我们有逻辑来取消订单。

image-2.png

但我们也应该发送电子邮件通知客户订单已取消。在我的例子中,这行代码应该放在哪里?如果发送邮件失败,我们也需要将其包裹在 try-catch 中吗?如果是这样,这将是一场噩梦。

这就是长流程和业务流程最终变成难以遵循、管理和修改的复杂迷宫的方式。

如前所述,你可以将所有这些代码分解成不同的类/函数/方法,以避免代码审查者批评,但这并不能解决底层问题,因为工作流具有分布式特性。

当然,你想要关注点分离。但在一个可以有多个不同代码分支的工作流中保持一致性,需要所有操作独立执行。

image-3.png

工作流(Workflow)

那么解决方案是什么?具体来说,是支持开发异步工作流的工具。如果你使用 .NET,有很多优秀的工具,比如 NServiceBus,它支持工作流。非 .NET 环境下,也有为不同平台提供的 SDK,比如 Temporal

本质上,它们使用队列和消息以及各种消息模式,在工作流的各个任务周围增加容错和故障恢复能力。

以下是 NServiceBus Saga 示例。它维护订单的状态。这个示例说明了一些有趣之处:当 PlaceOrder 消息入队时,Saga 开始。处理 PlaceOrder 消息时,它首先设置一个 5 秒超时,以便在必要时取消订单。然后发送 ProcessPayment 消息。超时的原因是,如果在规定时间内未处理付款(这只是一个演示),我们可以取消订单。处理 ProcessPayment 后,发送 EmailConfirmation 消息。最后,当这一切完成时,调用 MarkAsComplete() 结束 Saga。

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<PlaceOrder>,
    IHandleMessages<ProcessPayment>,
    IHandleMessages<EmailConfirmation>,
    IHandleTimeouts<CancelOrder>
{
    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(sagaData => sagaData.OrderId)
            .ToMessage<PlaceOrder>(message => message.OrderId)
            .ToMessage<ProcessPayment>(message => message.OrderId)
            .ToMessage<EmailConfirmation>(message => message.OrderId);
    }

    public async Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        Log.Info($"PlaceOrder received with OrderId {message.OrderId}");

        var timeout = DateTimeOffset.UtcNow.AddSeconds(5);
        Log.Info("Requesting a CancelOrder that will be executed in 5 seconds.");
        await RequestTimeout<CancelOrder>(context, timeout);

        var processPayment = new ProcessPayment
        {
            OrderId = Data.OrderId
        };
        await context.SendLocal(processPayment);
    }

    public async Task Handle(ProcessPayment message, IMessageHandlerContext context)
    {
        Data.PaymentProcessed = true;
        Log.Info($"ProcessPayment received with OrderId {message.OrderId}");

        var emailConfirmation = new EmailConfirmation
        {
            OrderId = Data.OrderId
        };
        await context.SendLocal(emailConfirmation);
    }

    public Task Handle(EmailConfirmation message, IMessageHandlerContext context)
    {
        Log.Info($"Email Confirmation received with OrderId {message.OrderId}");

        MarkAsComplete();

        Log.Info($"Process Complete for OrderId {message.OrderId}");

        return Task.CompletedTask;
    }

    public Task Timeout(CancelOrder state, IMessageHandlerContext context)
    {
        if (Data.PaymentProcessed)
        {
            Log.Info($"Cannot cancel order since payment has been processed for OrderId {Data.OrderId}.");
            return Task.CompletedTask;
        }

        Log.Info($"CompleteOrder not received soon enough OrderId {Data.OrderId}.");

        MarkAsComplete();

        Log.Info($"Process Complete for OrderId {Data.OrderId}");

        return Task.CompletedTask;
    }
}

重要的是,每个 HandleTimeout 方法都是异步调用的。没有逐行执行,也没有时间上的依赖(除了超时)。由于 NServiceBus 的默认设置,一个消息处理失败会被自动重试。你不必为所有这些不同的条件编写大量逻辑,就像在逐步执行的代码中那样。

这背后使用了队列,提供了处理失败的不同方式(如重试、延退、丢弃等)。当然,你可以自己实现这些功能,或者使用像 Polly 这样的库定义重试策略,但这些通常直接内置在消息库中。你无需编写这些代码或底层实现。

更多关于这些概念的讨论,请查看文章:构建健壮架构的 5 个技巧

这里还有另一个使用 Temporal 的相同概念示例,它使用工作流和活动的概念。

[Workflow]
public class MyWorkflow
{
    [WorkflowRun]
    public async Task<MyActivities.Order> RunAsync()
    {
        var order = new MyActivities.Order(Guid.NewGuid());

        await Workflow.ExecuteActivityAsync<MyActivities>(activities => activities.PlaceOrder(order), new ActivityOptions() {
            StartToCloseTimeout = TimeSpan.FromMinutes(5),
        });

        var transactionId = await Workflow.ExecuteActivityAsync<MyActivities, Guid>(activities => activities.ProcessPayment(order, new MyActivities.CreditCard()), new ActivityOptions {
            StartToCloseTimeout = TimeSpan.FromMinutes(5),
        });

        Console.WriteLine($"Transaction Id: {transactionId} for Order Id {order.OrderId}");

        await Workflow.ExecuteActivityAsync<MyActivities>(activities => activities.EmailConfirmation(order), new ActivityOptions {
            StartToCloseTimeout = TimeSpan.FromMinutes(5),
        });

        Console.WriteLine("Workflow Complete");

        return order;
    }
}

public class MyActivities
{
    [Activity]
    public Task PlaceOrder(Order order)
    {
        Console.WriteLine($"Place Order Id {order.OrderId}");
        return Task.CompletedTask;
    }

    [Activity]
    public async Task<Guid> ProcessPayment(Order order, CreditCard creditCard)
    {
        Console.WriteLine($"Process Payment for Order Id {order.OrderId}");
        await Task.Delay(TimeSpan.FromSeconds(5));

        return Guid.NewGuid();
    }

    [Activity]
    public Task EmailConfirmation(Order order)
    {
        Console.WriteLine($"Email Confirmation for Order Id {order.OrderId}");
        return Task.CompletedTask;
    }

    public record Order(Guid OrderId);

    public record CreditCard();
}

逐步执行代码的噩梦

如果你在一个包含大量逐行执行代码的代码库中,这些代码实际上是长流程中的多个操作,请考虑使用消息库或工作流库来简化添加容错的繁重工作。

市面上有专为此目的设计的优秀工具。如果你忽视它们,你可能会尝试自己构建不完善的解决方案,或者在处理工作流程中的失败时创建更多的条件代码混乱。

业务流程和工作流无处不在,通常是非阻塞的。拥抱这种异步性。

译自:Goodbye, Long Procedural Code. Fix It With Workflows

Last Modification : 9/20/2024 4:43:36 AM


In This Document