Vấn đề
Peak traffic của một hệ thống e-commerce là unpredictable. Bình thường 100 orders/giờ, nhưng flash sale có thể là 10,000 orders trong 5 phút đầu.
Nếu order processing pipeline xử lý synchronously, peak load sẽ:
- Overload database với thousands of concurrent writes
- Khiến checkout timeout → frustrated customers
- Potential data corruption nếu không có proper transaction handling
Giải pháp: Decouple order intake từ order processing bằng event streaming.
Event Hub vs Service Bus - Chọn cái nào?
Đây là câu hỏi tôi thấy nhiều người nhầm.
| Khía cạnh | Azure Service Bus | Azure Event Hub |
|---|---|---|
| Pattern | Message queue | Event streaming |
| Message ordering | Per session | Per partition |
| Message retention | Đến khi consumed | Time-based (1-7 days) |
| Replay | Không | Có (seek by offset) |
| Throughput | ~10K msg/s | ~1M events/s per partition |
| Use case | Task queue, RPC | Telemetry, audit log, stream processing |
| Dead letter queue | Built-in | Phải implement |
Với order processing pipeline: Tôi dùng cả hai - pattern phổ biến:
- Event Hub: Ingest high-volume events (order created, payment confirmed, inventory deducted)
- Service Bus: Process each step sequentially với guaranteed delivery và dead-letter handling
Giải thích đơn giản: Event Hub như highway
Hãy nghĩ Event Hub như một highway nhiều làn (partitions). Xe (events) vào highway theo partition key (ví dụ: order_id). Xe trong cùng một làn đi theo thứ tự. Nhiều "đội xe cứu hộ" (consumer groups) có thể đọc tất cả xe mà không ảnh hưởng nhau.
Service Bus thì giống bãi đỗ xe có attendant - xe đến, attendant đưa cho người xử lý, xe không bao giờ đến nơi khác trước khi được confirm xử lý.
Kết hợp cả hai: Highway để nhận nhanh → Bãi đỗ xe để xử lý an toàn.
Code minh họa: End-to-end Order Pipeline
Bước 1: Publish event khi nhận order
public class OrderController : ControllerBase
{
private readonly EventHubProducerClient _eventHubClient;
[HttpPost("orders")]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
{
// Validate và persist order với status "Pending"
var order = await _orderRepository.CreateAsync(new Order
{
Id = Guid.NewGuid(),
CustomerId = request.CustomerId,
Items = request.Items,
Status = OrderStatus.Pending,
CreatedAt = DateTime.UtcNow
});
// Publish event ngay lập tức - non-blocking
var orderEvent = new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount,
CreatedAt = order.CreatedAt
};
var eventData = new EventData(
BinaryData.FromObjectAsJson(orderEvent));
// Partition by CustomerId để đảm bảo sequential processing per customer
await _eventHubClient.SendAsync(
new[] { eventData },
new SendEventOptions { PartitionKey = order.CustomerId.ToString() }
);
// Return ngay - không chờ processing hoàn thành
return Accepted(new { orderId = order.Id, status = "Pending" });
}
}
Bước 2: Consumer xử lý từng event
public class OrderEventProcessor : BackgroundService
{
private readonly EventProcessorClient _processor;
private readonly IOrderProcessingService _orderService;
private readonly ILogger<OrderEventProcessor> _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_processor.ProcessEventAsync += ProcessEventAsync;
_processor.ProcessErrorAsync += ProcessErrorAsync;
await _processor.StartProcessingAsync(stoppingToken);
try
{
await Task.Delay(Timeout.Infinite, stoppingToken);
}
finally
{
await _processor.StopProcessingAsync();
}
}
private async Task ProcessEventAsync(ProcessEventArgs args)
{
var orderEvent = args.Data.EventBody.ToObjectFromJson<OrderCreatedEvent>();
try
{
_logger.LogInformation("Processing order {OrderId}", orderEvent.OrderId);
// Idempotency check - Event Hub có thể deliver at-least-once
if (await _orderService.IsAlreadyProcessedAsync(orderEvent.OrderId))
{
_logger.LogWarning("Order {OrderId} already processed, skipping", orderEvent.OrderId);
await args.UpdateCheckpointAsync();
return;
}
// Process: validate inventory, charge payment, update status
await _orderService.ProcessOrderAsync(orderEvent.OrderId);
// Checkpoint SAU khi process thành công
await args.UpdateCheckpointAsync();
}
catch (InventoryException ex)
{
// Business error - gửi notification, update status, checkpoint
await _orderService.MarkOrderFailedAsync(orderEvent.OrderId, ex.Message);
await args.UpdateCheckpointAsync(); // Vẫn checkpoint để không retry
_logger.LogWarning("Order {OrderId} failed - inventory: {Message}", orderEvent.OrderId, ex.Message);
}
catch (Exception ex)
{
// Infrastructure error - KHÔNG checkpoint → retry tự động
_logger.LogError(ex, "Failed to process order {OrderId}, will retry", orderEvent.OrderId);
// Không gọi UpdateCheckpointAsync() → Event Hub sẽ retry từ checkpoint cũ
}
}
private Task ProcessErrorAsync(ProcessErrorEventArgs args)
{
_logger.LogError(args.Exception, "Event Hub error on partition {PartitionId}", args.PartitionId);
return Task.CompletedTask;
}
}
Bước 3: Idempotency - Quan trọng nhất
public class OrderProcessingService : IOrderProcessingService
{
private readonly IOrderRepository _orderRepository;
private readonly IDistributedCache _cache;
public async Task<bool> IsAlreadyProcessedAsync(Guid orderId)
{
// Check cache first (fast)
var cacheKey = $"order-processed:{orderId}";
if (await _cache.GetAsync(cacheKey) != null)
return true;
// Check database (authoritative)
var order = await _orderRepository.GetByIdAsync(orderId);
return order?.Status != OrderStatus.Pending;
}
public async Task ProcessOrderAsync(Guid orderId)
{
var order = await _orderRepository.GetByIdAsync(orderId);
if (order.Status != OrderStatus.Pending)
return; // Already processed by another consumer
// Use optimistic concurrency to prevent race conditions
await using var transaction = await _orderRepository.BeginTransactionAsync();
try
{
// Update with version check
var updated = await _orderRepository.UpdateStatusWithVersionCheckAsync(
orderId, OrderStatus.Pending, OrderStatus.Processing, order.Version);
if (!updated)
{
// Another consumer already picked this up
return;
}
await _inventoryService.DeductAsync(order);
await _paymentService.ChargeAsync(order);
await _orderRepository.UpdateStatusAsync(orderId, OrderStatus.Confirmed);
await transaction.CommitAsync();
// Cache processed status
await _cache.SetAsync($"order-processed:{orderId}",
new byte[] { 1 },
new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24) });
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
Best practices từ kinh nghiệm production
1. Partition key strategy là critical.
Dùng customerId làm partition key đảm bảo orders của cùng một customer được xử lý theo thứ tự. Tránh dùng orderId (random → uneven distribution không giúp gì).
2. Checkpoint sau khi process, không trước.
Nếu checkpoint trước process rồi process fail → event bị bỏ. Checkpoint sau process → at-least-once delivery, nhưng idempotency check handle được.
3. Separate business errors từ infrastructure errors.
Business error (hết hàng, card declined) → checkpoint, gửi notification, đừng retry mãi.
Infrastructure error (DB timeout, network) → không checkpoint, để Event Hub retry.
4. Monitor lag.
Consumer group lag = số events chưa processed. Nếu lag tăng nhanh → processing không kịp throughput. Alert khi lag > threshold.
Kết
Azure Event Hub + idempotent consumer là một pattern rất solid cho high-throughput order processing. Tôi đã dùng nó trong production với throughput vài ngàn events/giây - stable, scalable, và recovery tốt khi có sự cố.
Cái quan trọng nhất không phải là code - mà là hiểu rõ delivery semantics (at-least-once) và thiết kế idempotency từ đầu.
Tham khảo
- Azure Event Hub documentation: learn.microsoft.com/azure/event-hubs
- Event Hub .NET SDK: github.com/Azure/azure-sdk-for-net
/Son Do - believe in basic
#1percentbetter #Azure #EventHub #ecommerce #dotnet #CloudArchitecture