Yazılım

.NET Microservice’lerinde Outbox Pattern’ı ile Eventual Consistency için Atomicity Sağlama

Bildiğimiz gibi microservice architecture’ına adapte olmanın bir çok artı noktası olduğu gibi, maalesef getirdiği bazı zorlayıcı noktaları da bulunmakta. Özellikle data consistency tarafında. Örneğin, monolithic dünyada birden fazla işlemi consistent bir şekilde kolayca gerçekleştirebilmek için, ACID database transaction’larından yararlanabilmekteyiz. Fakat microservice dünyasında işlemlerin farklı service’ler tarafından distributed olarak ele alınması gerektiği için, maalesef işler monolithic dünyası kadar kolay olmamaktadır ve data consistency’i genellikle event’ler aracılığıyla eventual olarak sağlamamız gerekmektedir.

Ayrıca long-running business işlemlerini de consistent bir şekilde gerçekeştirebilmek için, Saga gibi pattern’lerden yararlanabilmekteyiz. Yani her bir microservice’in bireysel olarak sorumluluklarını kendi database’inde yerine getirdikten sonra, diğer microservice’lerin de ilgili sorumluluklarını yerine getirebilmeleri için bir event publish etmesi veya bir orchestrator tarafından ilgili business sürecinin sırasıyla koordine edilmesi şeklinde.

Bu transaction management konusunu daha önce farklı makaleler altında, farklı senaryolar ile ele almıştım. Örneğin “.NET Microservice’lerinde Choreography-based Saga” makalesinde, saga pattern’ini choreography-based olarak kolayca nasıl implemente edebileceğimizden bahsetmiştim. Ek olarak event’ler de resiliency’nin büyük önem taşıdığını ve bu sebeple outbox gibi pattern’ler den yararlanabileceğimizden de bahsetmiştim. Dolayısıyla bu makaleyi de saga makalesinin devamı niteliğinde düşünebiliriz. Ayrıca saga makalesindeki aynı örnek senaryo üzerinden ilerleyeceğim. Bu nedenle konuyu daha iyi anlayabilmek adına, öncelikle bu makaleye hızlı bir göz atmak, faydalı olacaktır.

Distributed dünyada söz konusu business process’lerinin bütünlüğü / data’nın farklı noktalarda consistent ve reliable olması olduğunda, event’lerin resilient bir şekilde publish edilebilmeleri oldukça büyük bir öbnem taşımaktadır.

Örneğin yukarıdaki senaryoda asynchronous bir satın alma süreci bulunmaktadır (saga makaleside kullandığım senaryo). Her bir service ilgili işlemini gerçekleştirdikten sonra, message queue’ya bir event publish etmektedir ve bir sonraki süreci tetiklemektedir.

Peki, ilgili service kendi işlemini kendi database’inde gerçekleştirdikten sonra, ilgili event’ini herhangi bir nedenden dolayı publish edemezse? Örneğin “StockReservedEvent” inin publish edildiğini ve ödeme işlemlerinin “PaymentService” i tarafından gerçekleştirdiğini fakat “PaymentCompletedEvent” ini publish edemediğini düşünelim. Elbette geçici network kaynaklı hataları çeşitli retry mekanizmaları ile ele alabiliriz. Fakat message broker’ın down olduğu gibi farklı senaryolarda ise ilgili event’leri kaybediyor olacağız. Özellikle ödeme işlemi gibi önemli noktalarda inconsistent bir data’ya sahip olmak, sanırım hiç birimizin istemeyeceği bir durum.

Kısacası kritik business process’leri üzerinde çalışıyorsak, tahmin edebileceğimiz gibi data’nın consistent ve accurate durumda olması oldukça önem arz etmektedir ve dolayısıyla ilgili event’lerin atomic tarzda publish edilmeleri faydamıza olacaktır.

Outbox Pattern

Outbox pattern’ı kısaca bizlere event’lerimizi reliable bir şekilde publish edebileceğimiz bir yaklaşım sunmaktadır. Bu yaklaşımda event’leri doğrudan publish etmek yerine, onları ilgili microservice’in database’i içerisinde ilgili business process’inin bir parçası olarak atomic bir şekilde outbox table’ına kaydetmemiz ve daha sonra bir başka service aracılığı ile kaydetmiş olduğumuz event’leri publish etmemiz gerekmektedir. Böylelikle message broker down olsa bile, ilgili data’nın state’ini ve event’leri kaybetmemiş ve eventual consistency’nin reliable ve durable bir şekilde çalışabilmesini sağlamış olacağız.

NOT: Outbox pattern’ı “at-least-once” delivery yaklaşımıyla çalıştığı için, event’leri outbox table’ından publish edeceğimiz zaman da duplication’lar gerçekleşebilir. Dolayısıyla consumer’ların duplication’ı handle edebilmeleri için, event’lerin içerisine bir identifier dahil etmemiz ve olabildiğince idempotent olabilmelerine özen göstermemiz gerekmektedir.

Tech stack’e bağlı olarak bu pattern’ı uygulamanın farklı yolları bulunmakta. Örneğin message broker olarak Kafka kullanıyorsak, Debezium ile kolay bir şekilde log-based change data capturing işlemi yapabilir ve ilgili değişiklikleri kafka tarafına stream edebiliriz. Farklı bir broker ile ise service bus olarak NServiceBusMassTransit veya Cap kullanıyorsak, onların da sunmuş olduğu outbox özelliklerinden faydalanabilir veya manuel olarak kendimiz de implemente edebiliriz.

Bu makale kapsamında ise implementation işlemini, saga makalesinde kullanmış olduğum asynchronous satın alma süreci örnek senaryosu üzerinde manuel olarak gerçekleştireceğiz.

Implemente Edelim

Outbox pattern’ının implementation işlemini manuel olarak gerçekleştireceğiz çünkü gerçek dünyada herhangi bir ihtiyaç karşısında bu özelliği destekleyen service’bus’ları kullanmıyor olabiliriz veya farklı bir message broker kullanıyor olabiliriz.

Geçtiğimiz sene üzerinde çalıştığım özel bir projede benzer bir ihtiyaçla karşılaşmıştım. Bizim için kritik sayılacak bir noktada nadirende olsa publish olamayan event’lerden dolayı, müşterilerimiz satın aldıkları online hizmetlere maalesef erişememişlerdi. Message broker olarak RabbitMQ ve service bus olarak da EasyNetQ kullandığımız için ödeme işlemleri tarafında outbox pattern’ını manuel olarak implement etme ihtiyacım oluşmuştu.

Şimdi ilk olarak örnek projeye buradan erişim sağlayalım. Payment Service microservice’i içerisinde ödeme işlemlerini gerçekleştikten sonra, PaymentCompletedEvent ‘ini atomic bir şekilde publish etmek istediğimizi varsayalım. Çünkü daha önce aşağıdaki gibi ödeme işlemleri başarıyla gerçekleştikten sonra, direkt olarak ilgili event’i publish ediyorduk.

public class StocksReservedEventConsumer : IConsumeAsync<StocksReservedEvent>
{
    private readonly IPaymentService _paymentService;
    private readonly IBus _bus;

    public StocksReservedEventConsumer(IPaymentService paymentService, IBus bus)
    {
        _paymentService = paymentService;
        _bus = bus;
    }

    public async Task ConsumeAsync(StocksReservedEvent message, CancellationToken cancellationToken = default)
    {   
        Tuple<bool, string> isPaymentCompleted = await _paymentService.DoPaymentAsync(message.WalletId, message.UserId, message.TotalAmount);

        if (isPaymentCompleted.Item1)
        {
            await _bus.PubSub.PublishAsync(new PaymentCompletedEvent
            {
                OrderId = message.OrderId
            });
        }
        else
        {
            await _bus.PubSub.PublishAsync(new PaymentRejectedEvent
            {
                OrderId = message.OrderId,
                Reason = isPaymentCompleted.Item2
            });
        }
    }
}

İlk olarak “PaymentService.Infra” (.NET 5.0) adında bir class library oluşturalım ve ardından aşağıdaki paketleri NuGet üzerinden dahil edelim.

Microsoft.EntityFrameworkCore - 5.0.0
Microsoft.EntityFrameworkCore.SqlServer - 5.0.0

Şimdi event’lerimizi store edeceğimiz outbox table’ını, aşağıdaki gibi “Models” adında bir klasör oluşturarak içerisinde tanımlayalım.

using System;

namespace PaymentService.Infra.Models
{
	public class OutboxMessage
	{
		public OutboxMessage()
		{
			EventDate = DateTime.UtcNow;
		}
		public int Id { get; set; }
		public string EventType { get; set; }
		public string EventPayload { get; set; }
		public DateTime EventDate { get; set; }
		public bool IsSent { get; set; }
		public DateTime? SentDate { get; set; }
	}
}
CREATE TABLE OutboxMessages
(
	Id int IDENTITY PRIMARY KEY,
	EventType nvarchar(255) NOT NULL,
	EventPayload nvarchar(Max) NOT NULL,
	EventDate datetime NOT NULL,
	IsSent bit NOT NULL,
	SentDate datetime
);

Daha önce örnek projeyi geliştirirken ana konuya odaklanabilmek için bazı noktaları es geçmiştik ve ödeme işlemlerini gerçekleştiren “DoPaymentAsync” method’unu implemente etmemiştik. Bu yüzden şimdi basit bir “Payment” modelini de aşağıdaki gibi projeye dahil edelim.

using System;

namespace PaymentService.Infra.Models
{
	public class Payment
	{
		public Payment()
		{
			PaymentDate = DateTime.UtcNow;
		}

		public int Id { get; set; }
		public int UserId { get; set; }
		public int OrderId { get; set; }
		public int WalletId { get; set; }
		public decimal TotalAmount { get; set; }
		public bool IsPaid { get; set; }
		public DateTime PaymentDate { get; set; }
	}
}
CREATE TABLE Payments
(
	Id int IDENTITY PRIMARY KEY,
	UserId int NOT NULL,
	OrderId int NOT NULL,
	WalletId int NOT NULL,
	TotalAmount decimal(18,4) NOT NULL,
	IsPaid bit NOT NULL,
	PaymentDate datetime NOT NULL
);

Ardından “AppDbContext” adında bir class oluşturarak içerisinde dbcontext’i de tanımlayalım.

using Microsoft.EntityFrameworkCore;
using PaymentService.Models;

namespace PaymentService.Infra
{
	public class AppDbContext : DbContext
	{
		public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
		{ }

		public DbSet<Payment> Payments { get; set; }
		public DbSet<OutboxMessage> OutboxEvents { get; set; }

		protected override void OnModelCreating(ModelBuilder builder)
		{
			builder.Entity<Payment>(entity =>
			{
				entity.HasKey(k => k.Id);
				entity.Property(p => p.TotalAmount).HasPrecision(18, 4);
				entity.ToTable("Payments");
			});

			builder.Entity<OutboxMessage>(entity =>
			{
				entity.HasKey(k => k.Id);
				entity.ToTable("OutboxMessages");
			});

			base.OnModelCreating(builder);
		}
	}
}

Şimdi “PaymentService.Infra” class library’sini Payment Service projesine referans olarak ekleyelim ve  “IPaymentService” interface’i içerisindeki “DoPaymentAsync” method’unun signature’ını da aşağıdaki gibi güncelleyelim.

using System.Threading.Tasks;

namespace PaymentService.Services
{
    public interface IPaymentService
    {
        Task DoPaymentAsync(int orderId, int walletId, int userId, decimal totalAmount);
    }
}

Ardından outbox implementasyonunu “PaymentService” class’ı içerisinde aşağıdaki gibi gerçekleştirelim.

using System.Threading.Tasks;
using Newtonsoft.Json;
using PaymentService.Infra;
using PaymentService.Infra.Models;
using Shared.Contracts;

namespace PaymentService.Services
{
	public class PaymentService : IPaymentService
	{
		private readonly AppDbContext _appDbContext;

		public PaymentService(AppDbContext appDbContext)
		{
			_appDbContext = appDbContext;
		}

		public async Task DoPaymentAsync(int orderId, int walletId, int userId, decimal totalAmount)
		{
			// after payment operation is done...

			var isPaid = true;

			var payment = new Payment
			{
				OrderId = orderId,
				WalletId = walletId,
				UserId = userId,
				TotalAmount = totalAmount,
				IsPaid = isPaid
			};

			await _appDbContext.Payments.AddAsync(payment);

			object paymentResultEvent;
			if (isPaid)
			{
				paymentResultEvent = new PaymentCompletedEvent
				{
					OrderId = orderId
				};
			}
			else
			{
				paymentResultEvent = new PaymentRejectedEvent
				{
					OrderId = orderId,
					Reason = ""
				};
			}

			var outboxMessage = new OutboxMessage
			{
				EventPayload = JsonConvert.SerializeObject(paymentResultEvent),
				EventType = paymentResultEvent.GetType().AssemblyQualifiedName
			};

			await _appDbContext.OutboxEvents.AddAsync(outboxMessage);

			await _appDbContext.SaveChangesAsync();
		}
	}
}

Gördüğümüz gibi ödeme işlemlerini gerçekleştikten sonra “Payment” kayıtı ile birlikte ilgili “PaymentCompletedEvent” ini de outbox table’ına aynı transaction altında ilgili business’ın bir parçası olarak kaydediyoruz. Böylelikle ilgili event’in herhangi bir hata karşısında dayanıklı olduğundan emin olacağız ve diğer ilgili domain’lerin de consistent bir state’e sahip olabilmelerini sağlayacağız.

Şimdi “StocksReservedEvent” ini consume ettiğimiz “StocksReservedEventConsumer” class’ına gidelim ve aşağıdaki gibi refacor edelim.

using System.Threading;
using System.Threading.Tasks;
using EasyNetQ.AutoSubscribe;
using PaymentService.Services;
using Shared.Contracts;

namespace PaymentService.Consumers
{
	public class StocksReservedEventConsumer : IConsumeAsync<StockReservedEvent>
	{
		private readonly IPaymentService _paymentService;

		public StocksReservedEventConsumer(IPaymentService paymentService)
		{
			_paymentService = paymentService;
		}

		public async Task ConsumeAsync(StocksReservedEvent message, CancellationToken cancellationToken = default)
		{
			await _paymentService.DoPaymentAsync(message.OrderId, message.WalletId, message.UserId, message.TotalAmount);
		}
	}
}

Ardından “appsettings.json” file’ına oluşturmuş olduğumuz örnek “PaymentDB” nin connection string bilgilerini ekleyelim ve “Program.cs” class’ında gerekli injection işlemlerini de gerçekleştirelim.

"ConnectionStrings": {
  "PaymentDB": "Server=127.0.0.1;Database=PaymentDB;Trusted_Connection=true;MultipleActiveResultSets=true"
}
services.AddDbContext<AppDbContext>(x => x.UseSqlServer(hostContext.Configuration.GetConnectionString("PaymentDB")));

Böylelikle outbox pattern’ının ilk kısmını tamamlamış olduk.

Hızlı bir test yapabilmek için Order Service, Stock Service ve Payment Service’i çalıştıralım, ardından Order Service’in Swagger UI‘ı üzerinden bir test siparişi oluşturalım.

Gördüğümüz gibi “Payment” kayıtı ile birlikte, ilgili event kayıt’ı da “OutboxMessages” table’ına kayıt edilmiş durumda. Şimdi ise bu table’daki kayıtları dinleyecek ve publish edecek bir background service’ine ihtiyacımız var.

Bunun için solution içerisine “PaymentServiceOutboxWorker” adında ayrı bir console application’ı oluşturalım ve ardından “PaymentService.Infra” ve “Shared.Contracts” class library’lerini referans ekleyelim. Ayrıca NuGet üzerinden aşağıdaki paket’leri de projeye dahil edelim.

Microsoft.Extensions.Configuration.Json - 5.0.0
Microsoft.Extensions.DependencyInjection - 5.0.2
Microsoft.Extensions.Hosting - 5.0.0
EasyNetQ - 6.3.1

Şimdi proje içerisinde “Worker” adında bir class tanımlayalım ve aşağıdaki gibi code’layalım.

using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EasyNetQ;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using PaymentService.Infra;
using PaymentService.Infra.Models;

namespace PaymentServiceOutboxWorker
{
	public class Worker : BackgroundService
	{
		private readonly IServiceScopeFactory _scopeFactory;

		public Worker(IServiceScopeFactory scopeFactory)
		{
			_scopeFactory = scopeFactory;
		}

		protected override async Task ExecuteAsync(CancellationToken stoppingToken)
		{
			while (!stoppingToken.IsCancellationRequested)
			{
				await PublishOutboxMessages(stoppingToken);
			}
		}

		private async Task PublishOutboxMessages(CancellationToken stoppingToken)
		{
			try
			{
				using var scope = _scopeFactory.CreateScope();
				await using var appDbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();

				IBus bus = scope.ServiceProvider.GetRequiredService<IBus>();

				List<OutboxMessage> messages = appDbContext.OutboxMessages.Where(om => !om.IsSent).ToList();

				foreach (OutboxMessage outboxMessage in messages)
				{
					try
					{
						var messageType = Type.GetType(outboxMessage.EventType);
						var message = JsonConvert.DeserializeObject(outboxMessage.EventPayload, messageType!);

						await bus.PubSub.PublishAsync(message, messageType);

						outboxMessage.IsSent = true;
						outboxMessage.SentDate = DateTime.UtcNow;

						appDbContext.OutboxMessages.Update(outboxMessage);
						await appDbContext.SaveChangesAsync();
					}
					catch (Exception e)
					{
						Console.WriteLine(e);
					}
				}
			}
			catch (Exception e)
			{
				Console.WriteLine(e);
			}

			await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
		}
	}
}

Burada basit bir şekilde “BackgroundService” base class’ını kullanarak bir long-running background service’i tanımlıyoruz. Her 5 saniye de bir çalışacak olan bu service, outbox table’ını poll ederek publish edilmemiş olan event’lerin publish işlemlerini kesin bir şekilde gerçekleştirecektir.

Böylece bizim için kritik olan event’lerin ilgili business process’inin devam edebilmesi için kaybolmadan istenilen hedefe ulaştığından emin olabileceğiz.

Şimdi “Program.cs” class’ını aşağıdaki gibi güncelleyerek gerekli configuration ve injection işlemlerini gerçekleştirebiliriz.

using Microsoft.Extensions.Hosting;
using EasyNetQ;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using PaymentService.Infra;

namespace PaymentServiceOutboxWorker
{
	class Program
	{
		static void Main(string[] args)
		{
			CreateHostBuilder(args).Build().Run();
		}

		static IHostBuilder CreateHostBuilder(string[] args) =>
			Host.CreateDefaultBuilder(args)
				.ConfigureServices((hostContext, services) =>
				{
					services.AddDbContext<AppDbContext>(x => x.UseSqlServer(hostContext.Configuration.GetConnectionString("PaymentDB")));

					var bus = RabbitHutch.CreateBus(hostContext.Configuration["RabbitMQ:ConnectionString"]);

					services.AddSingleton<IBus>(bus);

					services.AddHostedService<Worker>();
				});
	}
}

Ardından “appsettings.json” dosyasını aşağıdaki gibi tanımlayabiliriz.

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "RabbitMQ": {
    "ConnectionString": "host=localhost;username=guest;password=guest"
  },
  "ConnectionStrings": {
    "PaymentDB": "Server=127.0.0.1;Database=PaymentDB;Trusted_Connection=true;MultipleActiveResultSets=true"
  }
}

Hepsi bu kadar. “PaymentServiceOutboxWorker” service’ini çalıştırdığımızda event’lerin başarılı bir şekilde ilgili queue’ya publish edildiğini görebiliriz.

Bazı Noktalar

Bu pattern’ın “at-least-once” delivery yaklaşımıyla çalıştığından bahsetmiştik (örnek kod’dan da görebileceğimiz üzere). Yani ilgili event’lerin birden fazla publish edildiği senaryolar ile karşı karşıya kalabiliriz.

Outbox pattern’ı ile data bütünlüğünü sağlamaya çalışırken, diğer taraftan duplication’lar yaratarak sistemin ve ilgili business process’inin yine inconsistent data’ya sahip olmasına sebep olabiliriz. Bu sebeple olabildiğince event’lerimizin ve ilgili event-consumer’larının idempotent olabilmelerine özen göstermemiz gerekmektedir.

Örneğin event’lerin içerisine bir identifier dahil edebilir ve consumer tarafında her bir event’in kaydını database’de tutabiliriz. Böylece herhangi bir event’i işlemeden önce, daha önce işlenip işlenmediğinden emin olabiliriz. Inbox pattern olarak isimlendirilen bu yaklaşımla da kısaca exactly-once processing’i garanti altına alabilmekteyiz.

Ayrıca event ordering’e ihtiyaç duyulan noktalarda da, ilgili event’leri belirleyecek olduğumuz identifier’lara göre sıralayıp, işleye de biliriz.

Kısaca

Maalesef microservice dünyasında communication ve data bütünlüğü sorunlarını çözmek, bazen oldukça zahmetli ve dikkat gereken bir konu. Bazen bir event’in fail olması, sistemdeki tutarsızlık nedeniyle diğer business process’lerinin de fail olmasına neden olabilir. Bu sebeple kritik event’lerin atomic bir şekilde gönderilebilmeleri ve idempotent consumer’lar tarafından işlenebilmeleri oldukça önemli bir konu. Her ne kadar basit bir çözüm gibi görünse de outbox&inbox pattern’ları, bazen gereksiz bir karardan dolayı sistemimiz için bir overkill haline de gelebilirler. Bu yüzden ilgili business use-case’lerimiz için neyin doğru olacağına karar verip, onu uygulamamız en doğrusu olacaktır.

Referanslar

https://microservices.io/patterns/data/transactional-outbox.html

İlgili Makaleler

Bir Yorum

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

Başa dön tuşu