我们都曾编写过冗长的逐步执行的代码,其中包含众多逻辑分支,因为它们试图执行涉及多个步骤的复杂业务流程。相比写混乱的复杂过程代码,有更好的解决方案。
逐步执行(Procedural)
我们可能都曾编写或见过类似下面的代码:它首先将订单添加并保存到数据库,然后尝试处理信用卡支付,最后发送确认邮件。
public class Procedural
{
private readonly Database _db;
private readonly PaymentGateway _paymentGateway;
private readonly Emailer _email;
public Procedural(Database db, PaymentGateway paymentGateway, Emailer email)
{
_db = db;
_paymentGateway = paymentGateway;
_email = email;
}
public async Task PlaceOrder(Order order, CreditCard creditCard)
{
// Add the Order to DB
_db.Add(order);
// Process Payment
try
{
await _paymentGateway.Process(creditCard);
}
catch (TaskCanceledException)
{
order.Status = OrderStatus.Cancelled;
_db.Save(order);
return;
}
// Email Confirmation
try
{
await _email.SendConfirmation(order);
}
catch (Exception)
{
// Swallow/Log because we don't the whole thing to blow up...
}
}
}
你可能会注意到一些问题:如果信用卡请求超时抛出异常,我们会标记订单为取消,保存后退出方法。此外,由于不想让整个流程失败,我们还需要包裹发送确认邮件的代码在 try-catch
中。毕竟,订单已成功保存,而且已经扣除了费用。
可以争论说这不符合单一职责原则、关注点分离等,但即便如此,这也无法解决根本问题。问题在于我们没有分布式事务。有多个操作作为工作流的一部分,针对分布式组件。
这是一个相对简单的例子,但你可以想象在自己的上下文中,有许多独立的操作构成更大的工作流。通常,情况会更复杂,因为代码根据发生的情况执行不同的分支。
在理想情况下,一切运行良好。但那些用于控制流程的 try-catch
块(或基于返回值的条件语句)表示了工作流。特别是,我们希望每个操作独立执行,不受其他操作影响。
每个流程步骤独立工作后,可以决定接下来的操作。例如,如果支付处理失败,我们有逻辑来取消订单。
但我们也应该发送电子邮件通知客户订单已取消。在我的例子中,这行代码应该放在哪里?如果发送邮件失败,我们也需要将其包裹在 try-catch
中吗?如果是这样,这将是一场噩梦。
这就是长流程和业务流程最终变成难以遵循、管理和修改的复杂迷宫的方式。
如前所述,你可以将所有这些代码分解成不同的类/函数/方法,以避免代码审查者批评,但这并不能解决底层问题,因为工作流具有分布式特性。
当然,你想要关注点分离。但在一个可以有多个不同代码分支的工作流中保持一致性,需要所有操作独立执行。
工作流(Workflow)
那么解决方案是什么?具体来说,是支持开发异步工作流的工具。如果你使用 .NET,有很多优秀的工具,比如 NServiceBus,它支持工作流。非 .NET 环境下,也有为不同平台提供的 SDK,比如 Temporal。
本质上,它们使用队列和消息以及各种消息模式,在工作流的各个任务周围增加容错和故障恢复能力。
以下是 NServiceBus Saga 示例。它维护订单的状态。这个示例说明了一些有趣之处:当 PlaceOrder
消息入队时,Saga 开始。处理 PlaceOrder
消息时,它首先设置一个 5 秒超时,以便在必要时取消订单。然后发送 ProcessPayment
消息。超时的原因是,如果在规定时间内未处理付款(这只是一个演示),我们可以取消订单。处理 ProcessPayment
后,发送 EmailConfirmation
消息。最后,当这一切完成时,调用 MarkAsComplete()
结束 Saga。
public class OrderSaga :
Saga<OrderSagaData>,
IAmStartedByMessages<PlaceOrder>,
IHandleMessages<ProcessPayment>,
IHandleMessages<EmailConfirmation>,
IHandleTimeouts<CancelOrder>
{
static readonly ILog Log = LogManager.GetLogger<OrderSaga>();
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.MapSaga(sagaData => sagaData.OrderId)
.ToMessage<PlaceOrder>(message => message.OrderId)
.ToMessage<ProcessPayment>(message => message.OrderId)
.ToMessage<EmailConfirmation>(message => message.OrderId);
}
public async Task Handle(PlaceOrder message, IMessageHandlerContext context)
{
Log.Info($"PlaceOrder received with OrderId {message.OrderId}");
var timeout = DateTimeOffset.UtcNow.AddSeconds(5);
Log.Info("Requesting a CancelOrder that will be executed in 5 seconds.");
await RequestTimeout<CancelOrder>(context, timeout);
var processPayment = new ProcessPayment
{
OrderId = Data.OrderId
};
await context.SendLocal(processPayment);
}
public async Task Handle(ProcessPayment message, IMessageHandlerContext context)
{
Data.PaymentProcessed = true;
Log.Info($"ProcessPayment received with OrderId {message.OrderId}");
var emailConfirmation = new EmailConfirmation
{
OrderId = Data.OrderId
};
await context.SendLocal(emailConfirmation);
}
public Task Handle(EmailConfirmation message, IMessageHandlerContext context)
{
Log.Info($"Email Confirmation received with OrderId {message.OrderId}");
MarkAsComplete();
Log.Info($"Process Complete for OrderId {message.OrderId}");
return Task.CompletedTask;
}
public Task Timeout(CancelOrder state, IMessageHandlerContext context)
{
if (Data.PaymentProcessed)
{
Log.Info($"Cannot cancel order since payment has been processed for OrderId {Data.OrderId}.");
return Task.CompletedTask;
}
Log.Info($"CompleteOrder not received soon enough OrderId {Data.OrderId}.");
MarkAsComplete();
Log.Info($"Process Complete for OrderId {Data.OrderId}");
return Task.CompletedTask;
}
}
重要的是,每个 Handle
或 Timeout
方法都是异步调用的。没有逐行执行,也没有时间上的依赖(除了超时)。由于 NServiceBus 的默认设置,一个消息处理失败会被自动重试。你不必为所有这些不同的条件编写大量逻辑,就像在逐步执行的代码中那样。
这背后使用了队列,提供了处理失败的不同方式(如重试、延退、丢弃等)。当然,你可以自己实现这些功能,或者使用像 Polly 这样的库定义重试策略,但这些通常直接内置在消息库中。你无需编写这些代码或底层实现。
更多关于这些概念的讨论,请查看文章:构建健壮架构的 5 个技巧。
这里还有另一个使用 Temporal 的相同概念示例,它使用工作流和活动的概念。
[Workflow]
public class MyWorkflow
{
[WorkflowRun]
public async Task<MyActivities.Order> RunAsync()
{
var order = new MyActivities.Order(Guid.NewGuid());
await Workflow.ExecuteActivityAsync<MyActivities>(activities => activities.PlaceOrder(order), new ActivityOptions() {
StartToCloseTimeout = TimeSpan.FromMinutes(5),
});
var transactionId = await Workflow.ExecuteActivityAsync<MyActivities, Guid>(activities => activities.ProcessPayment(order, new MyActivities.CreditCard()), new ActivityOptions {
StartToCloseTimeout = TimeSpan.FromMinutes(5),
});
Console.WriteLine($"Transaction Id: {transactionId} for Order Id {order.OrderId}");
await Workflow.ExecuteActivityAsync<MyActivities>(activities => activities.EmailConfirmation(order), new ActivityOptions {
StartToCloseTimeout = TimeSpan.FromMinutes(5),
});
Console.WriteLine("Workflow Complete");
return order;
}
}
public class MyActivities
{
[Activity]
public Task PlaceOrder(Order order)
{
Console.WriteLine($"Place Order Id {order.OrderId}");
return Task.CompletedTask;
}
[Activity]
public async Task<Guid> ProcessPayment(Order order, CreditCard creditCard)
{
Console.WriteLine($"Process Payment for Order Id {order.OrderId}");
await Task.Delay(TimeSpan.FromSeconds(5));
return Guid.NewGuid();
}
[Activity]
public Task EmailConfirmation(Order order)
{
Console.WriteLine($"Email Confirmation for Order Id {order.OrderId}");
return Task.CompletedTask;
}
public record Order(Guid OrderId);
public record CreditCard();
}
逐步执行代码的噩梦
如果你在一个包含大量逐行执行代码的代码库中,这些代码实际上是长流程中的多个操作,请考虑使用消息库或工作流库来简化添加容错的繁重工作。
市面上有专为此目的设计的优秀工具。如果你忽视它们,你可能会尝试自己构建不完善的解决方案,或者在处理工作流程中的失败时创建更多的条件代码混乱。
业务流程和工作流无处不在,通常是非阻塞的。拥抱这种异步性。
Comments