WEBSITE ĐANG PHÁT TRIỂN

Azure Event Hub trong pipeline xử lý đơn hàng real-time

Azure Event Hub không phải message queue - đây là streaming platform. Sự khác biệt này ảnh hưởng đến mọi design decision. Bài này là architecture và code cho order processing pipeline dùng Event Hub - từ kinh nghiệm thực tế.

Vấn đề

Peak traffic của một hệ thống e-commerce là unpredictable. Bình thường 100 orders/giờ, nhưng flash sale có thể là 10,000 orders trong 5 phút đầu.

Nếu order processing pipeline xử lý synchronously, peak load sẽ:

  • Overload database với thousands of concurrent writes
  • Khiến checkout timeout → frustrated customers
  • Potential data corruption nếu không có proper transaction handling

Giải pháp: Decouple order intake từ order processing bằng event streaming.


Event Hub vs Service Bus - Chọn cái nào?

Đây là câu hỏi tôi thấy nhiều người nhầm.

Khía cạnh Azure Service Bus Azure Event Hub
Pattern Message queue Event streaming
Message ordering Per session Per partition
Message retention Đến khi consumed Time-based (1-7 days)
Replay Không Có (seek by offset)
Throughput ~10K msg/s ~1M events/s per partition
Use case Task queue, RPC Telemetry, audit log, stream processing
Dead letter queue Built-in Phải implement

Với order processing pipeline: Tôi dùng cả hai - pattern phổ biến:

  • Event Hub: Ingest high-volume events (order created, payment confirmed, inventory deducted)
  • Service Bus: Process each step sequentially với guaranteed delivery và dead-letter handling

Giải thích đơn giản: Event Hub như highway

Hãy nghĩ Event Hub như một highway nhiều làn (partitions). Xe (events) vào highway theo partition key (ví dụ: order_id). Xe trong cùng một làn đi theo thứ tự. Nhiều "đội xe cứu hộ" (consumer groups) có thể đọc tất cả xe mà không ảnh hưởng nhau.

Service Bus thì giống bãi đỗ xe có attendant - xe đến, attendant đưa cho người xử lý, xe không bao giờ đến nơi khác trước khi được confirm xử lý.

Kết hợp cả hai: Highway để nhận nhanh → Bãi đỗ xe để xử lý an toàn.


Code minh họa: End-to-end Order Pipeline

Bước 1: Publish event khi nhận order

public class OrderController : ControllerBase
{
    private readonly EventHubProducerClient _eventHubClient;

    [HttpPost("orders")]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        // Validate và persist order với status "Pending"
        var order = await _orderRepository.CreateAsync(new Order
        {
            Id = Guid.NewGuid(),
            CustomerId = request.CustomerId,
            Items = request.Items,
            Status = OrderStatus.Pending,
            CreatedAt = DateTime.UtcNow
        });

        // Publish event ngay lập tức - non-blocking
        var orderEvent = new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.TotalAmount,
            CreatedAt = order.CreatedAt
        };

        var eventData = new EventData(
            BinaryData.FromObjectAsJson(orderEvent));

        // Partition by CustomerId để đảm bảo sequential processing per customer
        await _eventHubClient.SendAsync(
            new[] { eventData },
            new SendEventOptions { PartitionKey = order.CustomerId.ToString() }
        );

        // Return ngay - không chờ processing hoàn thành
        return Accepted(new { orderId = order.Id, status = "Pending" });
    }
}

Bước 2: Consumer xử lý từng event

public class OrderEventProcessor : BackgroundService
{
    private readonly EventProcessorClient _processor;
    private readonly IOrderProcessingService _orderService;
    private readonly ILogger<OrderEventProcessor> _logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessEventAsync += ProcessEventAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;

        await _processor.StartProcessingAsync(stoppingToken);

        try
        {
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        finally
        {
            await _processor.StopProcessingAsync();
        }
    }

    private async Task ProcessEventAsync(ProcessEventArgs args)
    {
        var orderEvent = args.Data.EventBody.ToObjectFromJson<OrderCreatedEvent>();

        try
        {
            _logger.LogInformation("Processing order {OrderId}", orderEvent.OrderId);

            // Idempotency check - Event Hub có thể deliver at-least-once
            if (await _orderService.IsAlreadyProcessedAsync(orderEvent.OrderId))
            {
                _logger.LogWarning("Order {OrderId} already processed, skipping", orderEvent.OrderId);
                await args.UpdateCheckpointAsync();
                return;
            }

            // Process: validate inventory, charge payment, update status
            await _orderService.ProcessOrderAsync(orderEvent.OrderId);

            // Checkpoint SAU khi process thành công
            await args.UpdateCheckpointAsync();
        }
        catch (InventoryException ex)
        {
            // Business error - gửi notification, update status, checkpoint
            await _orderService.MarkOrderFailedAsync(orderEvent.OrderId, ex.Message);
            await args.UpdateCheckpointAsync(); // Vẫn checkpoint để không retry
            _logger.LogWarning("Order {OrderId} failed - inventory: {Message}", orderEvent.OrderId, ex.Message);
        }
        catch (Exception ex)
        {
            // Infrastructure error - KHÔNG checkpoint → retry tự động
            _logger.LogError(ex, "Failed to process order {OrderId}, will retry", orderEvent.OrderId);
            // Không gọi UpdateCheckpointAsync() → Event Hub sẽ retry từ checkpoint cũ
        }
    }

    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, "Event Hub error on partition {PartitionId}", args.PartitionId);
        return Task.CompletedTask;
    }
}

Bước 3: Idempotency - Quan trọng nhất

public class OrderProcessingService : IOrderProcessingService
{
    private readonly IOrderRepository _orderRepository;
    private readonly IDistributedCache _cache;

    public async Task<bool> IsAlreadyProcessedAsync(Guid orderId)
    {
        // Check cache first (fast)
        var cacheKey = $"order-processed:{orderId}";
        if (await _cache.GetAsync(cacheKey) != null)
            return true;

        // Check database (authoritative)
        var order = await _orderRepository.GetByIdAsync(orderId);
        return order?.Status != OrderStatus.Pending;
    }

    public async Task ProcessOrderAsync(Guid orderId)
    {
        var order = await _orderRepository.GetByIdAsync(orderId);

        if (order.Status != OrderStatus.Pending)
            return; // Already processed by another consumer

        // Use optimistic concurrency to prevent race conditions
        await using var transaction = await _orderRepository.BeginTransactionAsync();

        try
        {
            // Update with version check
            var updated = await _orderRepository.UpdateStatusWithVersionCheckAsync(
                orderId, OrderStatus.Pending, OrderStatus.Processing, order.Version);

            if (!updated)
            {
                // Another consumer already picked this up
                return;
            }

            await _inventoryService.DeductAsync(order);
            await _paymentService.ChargeAsync(order);
            await _orderRepository.UpdateStatusAsync(orderId, OrderStatus.Confirmed);

            await transaction.CommitAsync();

            // Cache processed status
            await _cache.SetAsync($"order-processed:{orderId}",
                new byte[] { 1 },
                new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24) });
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

Best practices từ kinh nghiệm production

1. Partition key strategy là critical.

Dùng customerId làm partition key đảm bảo orders của cùng một customer được xử lý theo thứ tự. Tránh dùng orderId (random → uneven distribution không giúp gì).

2. Checkpoint sau khi process, không trước.

Nếu checkpoint trước process rồi process fail → event bị bỏ. Checkpoint sau process → at-least-once delivery, nhưng idempotency check handle được.

3. Separate business errors từ infrastructure errors.

Business error (hết hàng, card declined) → checkpoint, gửi notification, đừng retry mãi.

Infrastructure error (DB timeout, network) → không checkpoint, để Event Hub retry.

4. Monitor lag.

Consumer group lag = số events chưa processed. Nếu lag tăng nhanh → processing không kịp throughput. Alert khi lag > threshold.


Kết

Azure Event Hub + idempotent consumer là một pattern rất solid cho high-throughput order processing. Tôi đã dùng nó trong production với throughput vài ngàn events/giây - stable, scalable, và recovery tốt khi có sự cố.

Cái quan trọng nhất không phải là code - mà là hiểu rõ delivery semantics (at-least-once) và thiết kế idempotency từ đầu.

Tham khảo

  • Azure Event Hub documentation: learn.microsoft.com/azure/event-hubs
  • Event Hub .NET SDK: github.com/Azure/azure-sdk-for-net

/Son Do - believe in basic

#1percentbetter #Azure #EventHub #ecommerce #dotnet #CloudArchitecture


Bài viết liên quan

Xem thêm
E-commerce & Search Systems

Caching strategy cho product catalog - invalidation mới là bài toán khó

Cache thì dễ thêm vào. Nhưng khi product price thay đổi lúc 11:59 PM trước flash sale lúc 12:00 AM, bạn mới biết cache invalidation khó như thế nào. Bài này đi sâu vào các pattern thực tế và code C# cho product catalog. Phil Karlton có câu nói nổi tiếng: "There are only two hard things in Computer Science: cache invalidation and naming things." Tôi thêm vào: cache invalidation trong e-commerce còn khó hơn cache invalidation ở chỗ khác. Vì trong e-commerce, data thay đổi liên tục - price, stock, promotion - và mỗi inconsistency đều có thể cost bạn money (hoặc cost khách hàng).

E-commerce & Search Systems

Search relevance: tại sao người dùng tìm 'áo đỏ' lại ra 'váy xanh

Search relevance không phải là "tìm từ nào match từ đó". Đằng sau một kết quả tìm kiếm là cả một hệ thống scoring phức tạp - và nếu không hiểu nó, bạn sẽ cứ nhận complaint "search dở" mà không biết fix ở đâu.