Mensageria Transacional no .NET: Integrando o Padrão Outbox do Brighter com SQL Server e RabbitMQ
Rafael Andrade

Rafael Andrade @actor-dev

About: I'm Rafael the Actor Dev and I like to talk about Actor Models, Brighter, Elixir & Design Patterns Eu me chamo Rafael o Actor Dev e eu gosto de falar sobre Actor Models, Brighter, Elixir & Design Pat

Location:
London, UK
Joined:
Jan 24, 2025

Mensageria Transacional no .NET: Integrando o Padrão Outbox do Brighter com SQL Server e RabbitMQ

Publish Date: Jun 2
0 0

Introdução

No artigo anterior, abordamos os conceitos básicos do padrão Outbox, que é uma estratégia usada em sistemas distribuídos para garantir que operações de banco de dados e mensageria sejam tratadas de forma consistente. Vimos como ele evita inconsistências causadas por falhas durante a publicação de eventos ou comandos, armazenando as mensagens em uma tabela de outbox antes de enviá-las ao broker de mensagens (ex: RabbitMQ).

Projeto

A ideia principal é enviar um comando para criar um pedido (CreateNewOrder). Ao criar o pedido, serão publicados dois eventos: OrderPlaced e OrderPaid. Em caso de falha, nenhuma mensagem deve ser enviada.

Requisitos

Mensagens

Para este projeto, precisaremos das seguintes mensagens:

  • CreateNewOrder (comando)
  • OrderPlaced (evento)
  • OrderPaid (evento)
public class CreateNewOrder : Command(Guid.NewGuid())
{
    public decimal Value { get; set; }
}

public class OrderPlaced : Event(Guid.NewGuid())
{
    public string OrderId { get; set; } = string.Empty;
    public decimal Value { get; set; }
}

public class OrderPaid : Event(Guid.NewGuid())
{
    public string OrderId { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode

Mapeadores de Mensagem

Como apenas os eventos OrderPlaced e OrderPaid são publicados no RabbitMQ, implementamos mapeadores usando serialização JSON:

public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
    public Message MapToMessage(OrderPlaced request)
    {
        var header = new MessageHeader
        {
            Id = request.Id,
            TimeStamp = DateTime.UtcNow,
            Topic = "order-placed",
            MessageType = MessageType.MT_EVENT
        };

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public OrderPlaced MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
    }
}

public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
    public Message MapToMessage(OrderPaid request)
    {
        var header = new MessageHeader
        {
            Id = request.Id,
            TimeStamp = DateTime.UtcNow,
            Topic = "order-paid",
            MessageType = MessageType.MT_EVENT
        };

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public OrderPaid MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
    }
}
Enter fullscreen mode Exit fullscreen mode

Manipuladores de Requisições

Vamos registrar logs para os eventos OrderPlaced e OrderPaid.

public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
    public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("{OrderId} foi registrado com valor {OrderValue}", command.OrderId, command.Value);
        return base.HandleAsync(command, cancellationToken);
    }
}

public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
    public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("{OrderId} foi pago", command.OrderId);
        return base.HandleAsync(command, cancellationToken);
    }
}
Enter fullscreen mode Exit fullscreen mode

Criação do Pedido

O manipulador CreateNewOrderHandler vai esperar 10ms para simular um processo, publicar OrderPlaced, verificar se o valor é divisível por 3 (simulando uma regra de negócio), e publicar OrderPaid somente em casos válidos.

public class CreateNewOrderHandler(
    IAmACommandProcessor commandProcessor,
    IUnitOfWork unitOfWork,
    ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
    public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
    {
        await unitOfWork.BeginTransactionAsync(cancellationToken);

        try
        {
            string id = Guid.NewGuid().ToString();
            logger.LogInformation("Criando novo pedido: {OrderId}", id);
            await Task.Delay(10, cancellationToken); // Simula um processo

            _ = await commandProcessor.DepositPostAsync(new OrderPlaced { OrderId = id, Value = command.Value }, cancellationToken: cancellationToken);

            if (command.Value % 3 == 0)
            {
                throw new InvalidOperationException("valor inválido");
            }

            _ = await commandProcessor.DepositPostAsync(new OrderPaid { OrderId = id }, cancellationToken: cancellationToken);
            await unitOfWork.CommitAsync(cancellationToken);

            return await base.HandleAsync(command, cancellationToken);
        }
        catch
        {
            logger.LogError("Dados inválidos");
            await unitOfWork.RollbackAsync(cancellationToken);
            throw;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Entendendo o Funcionamento

  • O IUnitOfWork compartilha a transação SQL do Brighter, garantindo atomicidade (persistência do pedido + gravação no outbox).
  • Os eventos só são publicados se a transação for confirmada.

Configuração do Microsoft SQL Server

Para integrar o padrão Outbox com o SQL Server, primeiro crie a tabela OutboxMessages.

1. Esquema da Tabela SQL

IF OBJECT_ID('OutboxMessages', 'U') IS NULL
BEGIN 
    CREATE TABLE [OutboxMessages]
    (
        [Id] [BIGINT] NOT NULL IDENTITY,
        [MessageId] UNIQUEIDENTIFIER NOT NULL,
        [Topic] NVARCHAR(255) NULL,
        [MessageType] NVARCHAR(32) NULL,
        [Timestamp] DATETIME NULL,
        [CorrelationId] UNIQUEIDENTIFIER NULL,
        [ReplyTo] NVARCHAR(255) NULL,
        [ContentType] NVARCHAR(128) NULL,  
        [Dispatched] DATETIME NULL,
        [HeaderBag] NTEXT NULL,
        [Body] NTEXT NULL,
        PRIMARY KEY ([Id])
    );
END
Enter fullscreen mode Exit fullscreen mode

Registro de Dependências

Registre o outbox e a conexão transacional no contêiner de injeção de dependência:

services
    .AddServiceActivator(opt => { /* Configuração de inscrição */ })
    .UseMsSqlOutbox(new MsSqlConfiguration(ConnectionString, "OutboxMessages"), typeof(SqlConnectionProvider))
    .UseMsSqlTransactionConnectionProvider(typeof(SqlConnectionProvider))
    .UseOutboxSweeper(opt => opt.BatchSize = 10);
Enter fullscreen mode Exit fullscreen mode

Explicação Técnica

  • UseMsSqlOutbox: Associa o outbox ao SQL Server.
  • UseOutboxSweeper: Configura a verificação periódica de mensagens não entregues.

Gerenciamento de Transações

Para garantir atomicidade entre lógica de negócio e publicação de mensagens no Brighter, implementamos IMsSqlTransactionConnectionProvider e IUnitOfWork, compartilhando o mesmo contexto transacional.

a. SqlConnectionProvider

public class SqlConnectionProvider(SqlUnitOfWork sqlConnection) : IMsSqlTransactionConnectionProvider
{
    private readonly SqlUnitOfWork _sqlConnection = sqlConnection;

    public SqlConnection GetConnection()
    {
        return _sqlConnection.Connection;
    }

    public Task<SqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
    {
        return Task.FromResult(_sqlConnection.Connection);
    }

    public SqlTransaction? GetTransaction()
    {
        return _sqlConnection.Transaction;
    }

    public bool HasOpenTransaction => _sqlConnection.Transaction != null;
    public bool IsSharedConnection => true;
}
Enter fullscreen mode Exit fullscreen mode

b. Interface IUnitOfWork

public interface IUnitOfWork
{
    Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable);
    Task CommitAsync(CancellationToken cancellationToken);
    Task RollbackAsync(CancellationToken cancellationToken);
}
Enter fullscreen mode Exit fullscreen mode

c. Implementação do SqlUnitOfWork

public class SqlUnitOfWork(MsSqlConfiguration configuration) : IUnitOfWork
{
    public SqlConnection Connection { get; } = new(configuration.ConnectionString);
    public SqlTransaction? Transaction { get; private set; }

    public async Task BeginTransactionAsync(CancellationToken cancellationToken,
        IsolationLevel isolationLevel = IsolationLevel.Serializable)
    {
        if (Transaction == null)
        {
            if (Connection.State != ConnectionState.Open)
            {
                await Connection.OpenAsync(cancellationToken);
            }
            Transaction = Connection.BeginTransaction(isolationLevel);
        }
    }

    public async Task CommitAsync(CancellationToken cancellationToken)
    {
        if (Transaction != null)
        {
            await Transaction.CommitAsync(cancellationToken);
        }
    }

    public async Task RollbackAsync(CancellationToken cancellationToken)
    {
        if (Transaction != null)
        {
            await Transaction.RollbackAsync(cancellationToken);
        }
    }

    public async Task<SqlCommand> CreateSqlCommandAsync(string sql, SqlParameter[] parameters, CancellationToken cancellationToken)
    {
        if (Connection.State != ConnectionState.Open)
        {
            await Connection.OpenAsync(cancellationToken);
        }

        SqlCommand command = Connection.CreateCommand();

        if (Transaction != null)
        {
            command.Transaction = Transaction;
        }

        command.CommandText = sql;

        if (parameters.Length > 0)
        {
            command.Parameters.AddRange(parameters);
        }

        return command;
    }
}
Enter fullscreen mode Exit fullscreen mode

d. Registro no DI

services
    .AddScoped<SqlUnitOfWork, SqlUnitOfWork>()
    .TryAddScoped<IUnitOfWork>(provider => provider.GetRequiredService<SqlUnitOfWork>());
Enter fullscreen mode Exit fullscreen mode

Conclusão

Ao implementar o padrão Outbox com o Brighter e SQL Server, demonstramos como garantir consistência transacional entre atualizações no banco de dados e publicação de mensagens. Este exemplo mostrou que:

  1. Mensagens só são publicadas se a transação for bem-sucedida:

    • Usando DepositPostAsync, mensagens como OrderPlaced e OrderPaid são armazenadas na tabela OutboxMessages dentro da mesma transação dos dados de negócio.
    • Se o manipulador falhar (ex: erro simulado), a transação é revertida e nenhuma mensagem órfã é enviada.
    • O IMsSqlTransactionConnectionProvider garante que atualizações no banco e registros no outbox compartilhem a mesma transação.
  2. Tolerância a Falhas via Sweeper do Outbox:

    • O UseOutboxSweeper verifica periodicamente por mensagens não entregues e tenta reenviá-las até que o RabbitMQ confirme o recebimento.
    • Isso desacopla a publicação de mensagens da execução do manipulador, garantindo confiabilidade mesmo em falhas temporárias do broker.
  3. Arquitetura Desacoplada:

    • A aplicação se concentra em transações locais.
    • O Brighter trata a entrega de mensagens assincronamente, simplificando a escalabilidade e evitando acoplamento com a infraestrutura de mensageria.

Esta abordagem mostra como o Brighter abstrai complexidade, permitindo que desenvolvedores foquem na lógica de negócio enquanto garante confiabilidade em sistemas distribuídos. Para uso em produção, combine esse padrão com:

  • Ferramentas de monitoramento (ex: Prometheus)
  • Filas de Cartas Mortas (DLQ) para tratar mensagens problemáticas
  • Índices na tabela de outbox nas colunas Dispatched e Timestamp para melhorar performance.

Referências

Comments 0 total

    Add comment