Introdução
No artigo anterior, abordamos os conceitos básicos do padrão Outbox, que é uma estratégia usada em sistemas distribuídos para garantir que operações de banco de dados e mensageria sejam tratadas de forma consistente. Vimos como ele evita inconsistências causadas por falhas durante a publicação de eventos ou comandos, armazenando as mensagens em uma tabela de outbox antes de enviá-las ao broker de mensagens (ex: RabbitMQ).
Projeto
A ideia principal é enviar um comando para criar um pedido (CreateNewOrder
). Ao criar o pedido, serão publicados dois eventos: OrderPlaced
e OrderPaid
. Em caso de falha, nenhuma mensagem deve ser enviada.
Requisitos
- .NET 8+
-
Podman (ou Docker) para executar containers locais:
- SQL Server
- RabbitMQ
- Conhecimento prévio sobre Brighter e RabbitMQ.
- Pacotes NuGet necessários:
Mensagens
Para este projeto, precisaremos das seguintes mensagens:
-
CreateNewOrder
(comando) -
OrderPlaced
(evento) -
OrderPaid
(evento)
public class CreateNewOrder : Command(Guid.NewGuid())
{
public decimal Value { get; set; }
}
public class OrderPlaced : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
public decimal Value { get; set; }
}
public class OrderPaid : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
}
Mapeadores de Mensagem
Como apenas os eventos OrderPlaced
e OrderPaid
são publicados no RabbitMQ, implementamos mapeadores usando serialização JSON:
public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
public Message MapToMessage(OrderPlaced request)
{
var header = new MessageHeader
{
Id = request.Id,
TimeStamp = DateTime.UtcNow,
Topic = "order-placed",
MessageType = MessageType.MT_EVENT
};
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPlaced MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
}
}
public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
public Message MapToMessage(OrderPaid request)
{
var header = new MessageHeader
{
Id = request.Id,
TimeStamp = DateTime.UtcNow,
Topic = "order-paid",
MessageType = MessageType.MT_EVENT
};
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPaid MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
}
}
Manipuladores de Requisições
Vamos registrar logs para os eventos OrderPlaced
e OrderPaid
.
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} foi registrado com valor {OrderValue}", command.OrderId, command.Value);
return base.HandleAsync(command, cancellationToken);
}
}
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} foi pago", command.OrderId);
return base.HandleAsync(command, cancellationToken);
}
}
Criação do Pedido
O manipulador CreateNewOrderHandler
vai esperar 10ms para simular um processo, publicar OrderPlaced
, verificar se o valor é divisível por 3 (simulando uma regra de negócio), e publicar OrderPaid
somente em casos válidos.
public class CreateNewOrderHandler(
IAmACommandProcessor commandProcessor,
IUnitOfWork unitOfWork,
ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
{
await unitOfWork.BeginTransactionAsync(cancellationToken);
try
{
string id = Guid.NewGuid().ToString();
logger.LogInformation("Criando novo pedido: {OrderId}", id);
await Task.Delay(10, cancellationToken); // Simula um processo
_ = await commandProcessor.DepositPostAsync(new OrderPlaced { OrderId = id, Value = command.Value }, cancellationToken: cancellationToken);
if (command.Value % 3 == 0)
{
throw new InvalidOperationException("valor inválido");
}
_ = await commandProcessor.DepositPostAsync(new OrderPaid { OrderId = id }, cancellationToken: cancellationToken);
await unitOfWork.CommitAsync(cancellationToken);
return await base.HandleAsync(command, cancellationToken);
}
catch
{
logger.LogError("Dados inválidos");
await unitOfWork.RollbackAsync(cancellationToken);
throw;
}
}
}
Entendendo o Funcionamento
- O
IUnitOfWork
compartilha a transação SQL do Brighter, garantindo atomicidade (persistência do pedido + gravação no outbox). - Os eventos só são publicados se a transação for confirmada.
Configuração do Microsoft SQL Server
Para integrar o padrão Outbox com o SQL Server, primeiro crie a tabela OutboxMessages
.
1. Esquema da Tabela SQL
IF OBJECT_ID('OutboxMessages', 'U') IS NULL
BEGIN
CREATE TABLE [OutboxMessages]
(
[Id] [BIGINT] NOT NULL IDENTITY,
[MessageId] UNIQUEIDENTIFIER NOT NULL,
[Topic] NVARCHAR(255) NULL,
[MessageType] NVARCHAR(32) NULL,
[Timestamp] DATETIME NULL,
[CorrelationId] UNIQUEIDENTIFIER NULL,
[ReplyTo] NVARCHAR(255) NULL,
[ContentType] NVARCHAR(128) NULL,
[Dispatched] DATETIME NULL,
[HeaderBag] NTEXT NULL,
[Body] NTEXT NULL,
PRIMARY KEY ([Id])
);
END
Registro de Dependências
Registre o outbox e a conexão transacional no contêiner de injeção de dependência:
services
.AddServiceActivator(opt => { /* Configuração de inscrição */ })
.UseMsSqlOutbox(new MsSqlConfiguration(ConnectionString, "OutboxMessages"), typeof(SqlConnectionProvider))
.UseMsSqlTransactionConnectionProvider(typeof(SqlConnectionProvider))
.UseOutboxSweeper(opt => opt.BatchSize = 10);
Explicação Técnica
-
UseMsSqlOutbox
: Associa o outbox ao SQL Server. -
UseOutboxSweeper
: Configura a verificação periódica de mensagens não entregues.
Gerenciamento de Transações
Para garantir atomicidade entre lógica de negócio e publicação de mensagens no Brighter, implementamos IMsSqlTransactionConnectionProvider
e IUnitOfWork
, compartilhando o mesmo contexto transacional.
a. SqlConnectionProvider
public class SqlConnectionProvider(SqlUnitOfWork sqlConnection) : IMsSqlTransactionConnectionProvider
{
private readonly SqlUnitOfWork _sqlConnection = sqlConnection;
public SqlConnection GetConnection()
{
return _sqlConnection.Connection;
}
public Task<SqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
{
return Task.FromResult(_sqlConnection.Connection);
}
public SqlTransaction? GetTransaction()
{
return _sqlConnection.Transaction;
}
public bool HasOpenTransaction => _sqlConnection.Transaction != null;
public bool IsSharedConnection => true;
}
b. Interface IUnitOfWork
public interface IUnitOfWork
{
Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable);
Task CommitAsync(CancellationToken cancellationToken);
Task RollbackAsync(CancellationToken cancellationToken);
}
c. Implementação do SqlUnitOfWork
public class SqlUnitOfWork(MsSqlConfiguration configuration) : IUnitOfWork
{
public SqlConnection Connection { get; } = new(configuration.ConnectionString);
public SqlTransaction? Transaction { get; private set; }
public async Task BeginTransactionAsync(CancellationToken cancellationToken,
IsolationLevel isolationLevel = IsolationLevel.Serializable)
{
if (Transaction == null)
{
if (Connection.State != ConnectionState.Open)
{
await Connection.OpenAsync(cancellationToken);
}
Transaction = Connection.BeginTransaction(isolationLevel);
}
}
public async Task CommitAsync(CancellationToken cancellationToken)
{
if (Transaction != null)
{
await Transaction.CommitAsync(cancellationToken);
}
}
public async Task RollbackAsync(CancellationToken cancellationToken)
{
if (Transaction != null)
{
await Transaction.RollbackAsync(cancellationToken);
}
}
public async Task<SqlCommand> CreateSqlCommandAsync(string sql, SqlParameter[] parameters, CancellationToken cancellationToken)
{
if (Connection.State != ConnectionState.Open)
{
await Connection.OpenAsync(cancellationToken);
}
SqlCommand command = Connection.CreateCommand();
if (Transaction != null)
{
command.Transaction = Transaction;
}
command.CommandText = sql;
if (parameters.Length > 0)
{
command.Parameters.AddRange(parameters);
}
return command;
}
}
d. Registro no DI
services
.AddScoped<SqlUnitOfWork, SqlUnitOfWork>()
.TryAddScoped<IUnitOfWork>(provider => provider.GetRequiredService<SqlUnitOfWork>());
Conclusão
Ao implementar o padrão Outbox com o Brighter e SQL Server, demonstramos como garantir consistência transacional entre atualizações no banco de dados e publicação de mensagens. Este exemplo mostrou que:
-
Mensagens só são publicadas se a transação for bem-sucedida:
- Usando
DepositPostAsync
, mensagens comoOrderPlaced
eOrderPaid
são armazenadas na tabelaOutboxMessages
dentro da mesma transação dos dados de negócio. - Se o manipulador falhar (ex: erro simulado), a transação é revertida e nenhuma mensagem órfã é enviada.
- O
IMsSqlTransactionConnectionProvider
garante que atualizações no banco e registros no outbox compartilhem a mesma transação.
- Usando
-
Tolerância a Falhas via Sweeper do Outbox:
- O
UseOutboxSweeper
verifica periodicamente por mensagens não entregues e tenta reenviá-las até que o RabbitMQ confirme o recebimento. - Isso desacopla a publicação de mensagens da execução do manipulador, garantindo confiabilidade mesmo em falhas temporárias do broker.
- O
-
Arquitetura Desacoplada:
- A aplicação se concentra em transações locais.
- O Brighter trata a entrega de mensagens assincronamente, simplificando a escalabilidade e evitando acoplamento com a infraestrutura de mensageria.
Esta abordagem mostra como o Brighter abstrai complexidade, permitindo que desenvolvedores foquem na lógica de negócio enquanto garante confiabilidade em sistemas distribuídos. Para uso em produção, combine esse padrão com:
- Ferramentas de monitoramento (ex: Prometheus)
- Filas de Cartas Mortas (DLQ) para tratar mensagens problemáticas
-
Índices na tabela de outbox nas colunas
Dispatched
eTimestamp
para melhorar performance.