Caching in distributed applications can pose a complex problem. When multiple instances of an application are running simultaneously (typically websites or web services deployed in the cloud or web farms), it's crucial to ensure that the cache is appropriately invalidated for all application instances.
A common solution to this issue is the use of a centralized cache server (or a cluster of cache servers), such as a Redis server or a Redis cluster. However, operating a cache server or cluster incurs a cost, which may not always be justified for medium-sized applications, such as a small business website.
An alternative solution to distributed caching is to maintain a local in-memory cache in each application instance. Instead of using a shared distributed cache, each application instance caches its data into its local cache. However, when one application instance modifies a piece of data, it must ensure that all instances remove the relevant items from their local cache. This process is known as distributed cache invalidation. It can be achieved easily and inexpensively with a publish/subscribe (Pub/Sub) message bus, which is much less costly than a cache cluster.
Metalama facilitates the easy addition of pub/sub cache invalidation to your existing Metalama caching using either Azure Service Bus or Redis Pub/Sub.
Warning
With pub/sub invalidation, there may be some latency in the invalidation mechanism, i.e., different application instances running on different servers may see different data for a few dozen milliseconds. While generally harmless when application clients are affinitized to one server (for instance, with geo-based request routing), it can cause issues when the same client can randomly connect to different servers.
Using Azure Service Bus
Configuring a topic
The first step is to create a topic. To do this using the Microsoft Azure portal, follow these steps:
Navigate to the Microsoft Azure portal, open the Service Bus panel and create a new Topic. Choose a small value for the time-to-live setting, such as 30 seconds. Visit the Microsoft Azure website for more details.
In the Microsoft Azure portal, create a Shared access policy and include the Send, Listen, and Manage rights. Your application will use this policy.
Copy the primary or secondary connection string to your clipboard.
Configuring your application
Add caching to your application as described in Getting started with Metalama Caching.
Add a reference to the Metalama.Patterns.Caching.Backends.Azure NuGet package.
Return to the code that initialized the Metalama Caching by calling serviceCollection.AddMetalamaCaching or CachingService.Create. Call the WithBackend method and supply a delegate that calls the Memory method. Then, call WithAzureSynchronization and pass the topic connection string as an argument.
30 // Add the caching service. 31 builder.Services.AddMetalamaCaching( 32 caching => 33 caching.WithBackend( 34 backend => 35 backend.Memory() 36 .WithAzureSynchronization( 37 connectionString ) ) );
We recommend initializing the caching service during your application's initialization sequence; otherwise, the service will be initialized lazily upon its first use. Retrieve the ICachingService interface from the IServiceProvider and call the <xref"Metalama.Patterns.Caching.ICachingService.InitializeAsync*> method.
48 await app.Services.GetRequiredService<ICachingService>().InitializeAsync();
Warning
Ensure that the ICachingService is properly disposed of before the application exits. Failure to do so may leave some background cache write operations unprocessed, leading to cache inconsistency.
Example: A Distributed Application Synchronized by Azure Service Bus
The following example simulates a multi-instance application. For ease of testing, both instances live in the same process. Both instances read and write to a shared database simulated by a concurrent dictionary, which sits behind an in-memory cache. These two cache instances are synchronized using WithAzureSynchronization.
1using Metalama.Patterns.Caching.Aspects;
2using System;
3using System.Collections.Concurrent;
4using System.Collections.Generic;
5
6using Metalama.Patterns.Caching;
7
8namespace Doc.AzureSynchronized;
9
10public record Product( string Id, decimal Price, string? Remarks = null );
11
12public sealed class ProductCatalogue
13{
14 // This instance is intentionally shared between both app instances to simulate
15 // a shared database.
16 private static readonly ConcurrentDictionary<string, Product> _dbSimulator
17 = new() { ["corn"] = new Product( "corn", 100, "Initial record." ) };
18
19 public int DbOperationCount { get; private set; }
20
21 [Cache]
22 public Product GetProduct( string productId )
23 {
24 Console.WriteLine( $"Getting the product of {productId} from database." );
25
26 this.DbOperationCount++;
27
28 return _dbSimulator[productId];
29 }
30
31 public void Update( Product product )
32 {
33 if ( !_dbSimulator.ContainsKey( product.Id ) )
34 {
35 throw new KeyNotFoundException();
36 }
37
38 Console.WriteLine( $"Updating the product {product.Id}." );
39
40 this.DbOperationCount++;
41 _dbSimulator[product.Id] = product;
42
Error CS1061: 'ProductCatalogue' does not contain a definition for '_cachingService' and no accessible extension method '_cachingService' accepting a first argument of type 'ProductCatalogue' could be found (are you missing a using directive or an assembly reference?)
43 this._cachingService.Invalidate( this.GetProduct, product.Id );
44 }
45}
1using Metalama.Patterns.Caching.Aspects;
2using System;
3using System.Collections.Concurrent;
4using System.Collections.Generic;
5
6using Metalama.Patterns.Caching;
7using Metalama.Patterns.Caching.Aspects.Helpers;
8using System.Reflection;
9
10namespace Doc.AzureSynchronized;
11
12public record Product(string Id, decimal Price, string? Remarks = null);
13
14public sealed class ProductCatalogue
15{
16 // This instance is intentionally shared between both app instances to simulate
17 // a shared database.
18 private static readonly ConcurrentDictionary<string, Product> _dbSimulator
19 = new() { ["corn"] = new Product("corn", 100, "Initial record.") };
20
21 public int DbOperationCount { get; private set; }
22
23 [Cache]
24 public Product GetProduct(string productId)
25 {
26 static object? Invoke(object? instance, object?[] args)
27 {
28 return ((ProductCatalogue)instance).GetProduct_Source((string)args[0]);
29 }
30
31 return _cachingService.GetFromCacheOrExecute<Product>(_cacheRegistration_GetProduct, this, new object[] { productId }, Invoke);
32 }
33
34 private Product GetProduct_Source(string productId)
35 {
36 Console.WriteLine($"Getting the product of {productId} from database.");
37
38 this.DbOperationCount++;
39
40 return _dbSimulator[productId];
41 }
42
43 public void Update(Product product)
44 {
45 if (!_dbSimulator.ContainsKey(product.Id))
46 {
47 throw new KeyNotFoundException();
48 }
49
50 Console.WriteLine($"Updating the product {product.Id}.");
51
52 this.DbOperationCount++;
53 _dbSimulator[product.Id] = product;
54
55 this._cachingService.Invalidate(this.GetProduct, product.Id);
56 }
57
58 private static readonly CachedMethodMetadata _cacheRegistration_GetProduct;
59 private ICachingService _cachingService;
60
61 static ProductCatalogue()
62 {
63 _cacheRegistration_GetProduct = CachedMethodMetadata.Register(typeof(ProductCatalogue).GetMethod("GetProduct", BindingFlags.Public | BindingFlags.Instance, null, new[] { typeof(string) }, null).ThrowIfMissing("ProductCatalogue.GetProduct(string)"), new CachedMethodConfiguration() { AbsoluteExpiration = null, AutoReload = null, IgnoreThisParameter = null, Priority = null, ProfileName = (string?)null, SlidingExpiration = null }, true);
64 }
65
66 public ProductCatalogue(ICachingService? cachingService = null)
67 {
68 this._cachingService = cachingService ?? throw new System.ArgumentNullException(nameof(cachingService));
69 }
70}
1using System;
2using Metalama.Documentation.Helpers.ConsoleApp;
3using System.Threading;
4using System.Threading.Tasks;
5
6namespace Doc.AzureSynchronized;
7
8public sealed class ConsoleMain : IAsyncConsoleMain
9{
10 private readonly ProductCatalogue _productCatalogue;
11 private readonly string _appName;
12
13 public ConsoleMain( ProductCatalogue productCatalogue, IConsoleHost host )
14 {
15 this._productCatalogue = productCatalogue;
16 this._appName = host.Arguments[0];
17 }
18
19 public async Task ExecuteAsync()
20 {
21 // Force running in parallel.
22 await Task.Yield();
23
24 for ( var i = 0; i < 3; i++ )
25 {
26 for ( var j = 0; j < 3; j++ )
27 {
28 // Getting the product.
29 var corn = this._productCatalogue.GetProduct( "corn" );
30 Console.WriteLine( $"{this._appName} reads and gets {corn}." );
31 await Task.Delay( 20 + Random.Shared.Next( 4 ) );
32 }
33
34 // Updating the product.
35 var updatedCorn = new Product(
36 "corn",
37 100 + Random.Shared.Next( 20 ),
38 $"Updated by {this._appName}, i={i}" );
39
40 Console.WriteLine( $"{this._appName} update {updatedCorn}." );
41 this._productCatalogue.Update( updatedCorn );
42 }
43
44 Console.WriteLine(
45 $"In total, CloudCalculator in {this._appName} performed {this._productCatalogue.DbOperationCount} database operation(s)." );
46 }
47}
Getting the product of corn from database. Getting the product of corn from database. App2 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }. App1 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }. App1 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }. App2 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }. App2 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }. App1 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }. App1 update Product { Id = corn, Price = 111, Remarks = Updated by App1, i=0 }. App2 update Product { Id = corn, Price = 110, Remarks = Updated by App2, i=0 }. Updating the product corn. Updating the product corn. Getting the product of corn from database. Getting the product of corn from database. App1 reads and gets Product { Id = corn, Price = 110, Remarks = Updated by App2, i=0 }. App2 reads and gets Product { Id = corn, Price = 110, Remarks = Updated by App2, i=0 }. App1 reads and gets Product { Id = corn, Price = 110, Remarks = Updated by App2, i=0 }. App2 reads and gets Product { Id = corn, Price = 110, Remarks = Updated by App2, i=0 }. App1 reads and gets Product { Id = corn, Price = 110, Remarks = Updated by App2, i=0 }. App2 reads and gets Product { Id = corn, Price = 110, Remarks = Updated by App2, i=0 }. App2 update Product { Id = corn, Price = 107, Remarks = Updated by App2, i=1 }. App1 update Product { Id = corn, Price = 104, Remarks = Updated by App1, i=1 }. Updating the product corn. Updating the product corn. Getting the product of corn from database. Getting the product of corn from database. App1 reads and gets Product { Id = corn, Price = 107, Remarks = Updated by App2, i=1 }. App2 reads and gets Product { Id = corn, Price = 107, Remarks = Updated by App2, i=1 }. App1 reads and gets Product { Id = corn, Price = 107, Remarks = Updated by App2, i=1 }. App2 reads and gets Product { Id = corn, Price = 107, Remarks = Updated by App2, i=1 }. App1 reads and gets Product { Id = corn, Price = 107, Remarks = Updated by App2, i=1 }. App2 reads and gets Product { Id = corn, Price = 107, Remarks = Updated by App2, i=1 }. App1 update Product { Id = corn, Price = 111, Remarks = Updated by App1, i=2 }. Updating the product corn. App2 update Product { Id = corn, Price = 105, Remarks = Updated by App2, i=2 }. Updating the product corn. In total, CloudCalculator in App2 performed 6 database operation(s). In total, CloudCalculator in App1 performed 6 database operation(s).
1using Metalama.Documentation.Helpers.ConsoleApp;
2using Metalama.Documentation.Helpers.Security;
3using Metalama.Patterns.Caching;
4using Metalama.Patterns.Caching.Backends.Azure;
5using Metalama.Patterns.Caching.Building;
6using Microsoft.Extensions.DependencyInjection;
7using System.Threading.Tasks;
8
9namespace Doc.AzureSynchronized;
10
11internal static class Program
12{
13 public static async Task Main()
14 {
15 // We simulate two applications running in parallel.
16 var app1 = RunApp( "App1" );
17 var app2 = RunApp( "App2" );
18
19 await Task.WhenAll( app1, app2 );
20 }
21
22 private static async Task RunApp( string name )
23 {
24 var builder = ConsoleApp.CreateBuilder();
25
26 // Get the connection string.
27 var connectionString = Secrets.Get( "CacheInvalidationTestServiceBusConnectionString" );
28
29 //
30 // Add the caching service.
31 builder.Services.AddMetalamaCaching(
32 caching =>
33 caching.WithBackend(
34 backend =>
35 backend.Memory()
36 .WithAzureSynchronization(
37 connectionString ) ) );
38 //
39
40 // Add other components as usual.
41 builder.Services.AddAsyncConsoleMain<ConsoleMain>();
42 builder.Services.AddSingleton<ProductCatalogue>();
43
44 // Build the application.
45 await using var app = builder.Build( [name] );
46
47 //
48 await app.Services.GetRequiredService<ICachingService>().InitializeAsync();
49 //
50
51 // Run the application.
52 await app.RunAsync();
53 }
54}
Using Redis Pub/Sub
If you are already using Redis as a storage for Metalama Caching, adding another layer of invalidation is unnecessary as this is already handled by the Redis caching back-end. However, if you already have a Redis cluster but don't want to use it for caching, you can still use it for cache invalidation. An example of this situation is when your Redis server's latency is too high for caching but sufficient for cache invalidation.
No configuration on your Redis server is necessary to use it for cache synchronization.
Add caching to your application as described in Getting started with Metalama Caching.
Add a reference to the Metalama.Patterns.Caching.Backends.Redis NuGet package.
Return to the code that initialized the Metalama Caching by calling serviceCollection.AddMetalamaCaching or CachingService.Create. Call the WithBackend method and supply a delegate that calls the Memory method. Then, call WithRedisSynchronization and pass an instance of RedisCacheSynchronizerConfiguration.
We recommend initializing the caching service during your application's initialization sequence; otherwise, the service will be initialized lazily upon its first use. Retrieve the ICachingService interface from the IServiceProvider and call the <xref"Metalama.Patterns.Caching.ICachingService.InitializeAsync*> method.
Warning
Ensure that the ICachingService is properly disposed of before the application exits. Failure to do so may leave some background cache write operations unprocessed, leading to cache inconsistency.