停止内存缓存调用的重入

问题描述

应用需要加载数据并缓存一段时间。我希望如果应用程序的多个部分想要同时访问同一个缓存键,缓存应该足够智能,只加载一次数据并将该调用的结果返回给所有调用者。但是,MemoryCache 没有这样做。如果您并行访问缓存(这通常发生在应用程序中),它会为每次尝试获取缓存值创建一个任务。我认为这段代码会达到预期的结果,但事实并非如此。我希望缓存只运行一个 GetDataAsync 任务,等待它完成,然后使用结果获取其他调用的值。

using Microsoft.Extensions.Caching.Memory;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp4
{
    class Program
    {
        private const string Key = "1";
        private static int number = 0;

        static async Task Main(string[] args)
        {
            var memoryCache = new MemoryCache(new MemoryCacheOptions { });

            var tasks = new List<Task>();
            tasks.Add(memoryCache.GetorCreateAsync(Key,(cacheEntry) => GetDataAsync()));
            tasks.Add(memoryCache.GetorCreateAsync(Key,(cacheEntry) => GetDataAsync()));

            await Task.WhenAll(tasks);

            Console.WriteLine($"The cached value was: {memoryCache.Get(Key)}");
        }

        public static async Task<int> GetDataAsync()
        {
            //Simulate getting a large chunk of data from the database
            await Task.Delay(3000);
            number++;
            Console.WriteLine(number);
            return number;
        }
    }
}

事实并非如此。以上显示了这些结果(不一定按此顺序):

2

1

3

缓存的值为:3

它为每个缓存请求创建一个任务,并丢弃其他两个返回的值。

这会不必要地花费时间,这让我怀疑您是否可以说这个类甚至是线程安全的。 ConcurrentDictionary 具有相同的行为。我测试了它,同样的事情发生了。

有没有办法在任务不运行 3 次的情况下实现所需的行为?

解决方法

有不同的解决方案可用,其中最著名的可能是LazyCache:这是一个很棒的库。

您可能会发现另一个有用的是 FusionCache ⚡?,它是我最近发布的:它具有完全相同的功能(尽管实现方式不同)以及更多。

您正在寻找的功能描述为 here,您可以这样使用它:

var result = await fusionCache.GetOrSetAsync(
  Key,_ => await GetDataAsync(),TimeSpan.FromMinutes(2)
);

您可能还会发现其他一些有趣的功能,例如 fail-safeadvanced timeouts 具有后台工厂完成功能并支持可选的分布式 2nd level

如果你愿意,请告诉我你的想法。

/无耻插头

,

MemoryCache 由您决定如何处理填充缓存键的竞争。在您的情况下,您不希望多个线程竞争来填充一个键,大概是因为这样做很昂贵。

要像这样协调多个线程的工作,您需要一个锁,但在异步代码中使用 C# lock 语句会导致线程池饥饿。幸运的是,SemaphoreSlim 提供了一种执行异步锁定的方法,因此只需创建一个包装底层 IMemoryCache 的受保护内存缓存。

我的第一个解决方案只有一个信号量用于整个缓存,将所有缓存填充任务放在一行中,这不是很聪明,所以这里是更精细的解决方案,每个缓存键都有一个信号量。另一种解决方案可能是通过键的散列选择固定数量的信号量。

sealed class GuardedMemoryCache : IDisposable
{
    readonly IMemoryCache cache;
    readonly ConcurrentDictionary<object,SemaphoreSlim> semaphores = new();

    public GuardedMemoryCache(IMemoryCache cache) => this.cache = cache;

    public async Task<TItem> GetOrCreateAsync<TItem>(object key,Func<ICacheEntry,Task<TItem>> factory)
    {
        var semaphore = GetSemaphore(key);
        await semaphore.WaitAsync();
        try
        {
            return await cache.GetOrCreateAsync(key,factory);
        }
        finally
        {
            semaphore.Release();
            RemoveSemaphore(key);
        }
    }

    public object Get(object key) => cache.Get(key);

    public void Dispose()
    {
        foreach (var semaphore in semaphores.Values)
            semaphore.Release();
    }

    SemaphoreSlim GetSemaphore(object key) => semaphores.GetOrAdd(key,_ => new SemaphoreSlim(1));

    void RemoveSemaphore(object key)
    {
        if (semaphores.TryRemove(key,out var semaphore))
            semaphore.Dispose();
    }
}

如果多个线程尝试填充相同的缓存键,实际上只有一个线程会这样做。其他线程将改为返回创建的值。

假设您使用依赖注入,您可以让 GuardedMemoryCache 实现 IMemoryCache,方法是添加更多的方法,这些方法转发到底层缓存以修改整个应用程序中的缓存行为,只需很少的代码更改。

,

这是一个自定义扩展方法 GetOrCreateExclusiveAsync,类似于原生的 IMemoryCache.GetOrCreateAsync,它可以防止在正常情况下并发调用提供的异步 lambda。目的是在大量使用的情况下提高缓存机制的效率。仍然存在并发的可能性,因此这不能替代线程同步(如果需要)。

此实现还会从缓存中驱逐出错的任务,以便随后重试失败的异步操作。

using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Primitives;

/// <summary>
/// Returns an entry from the cache,or creates a new cache entry using the
/// specified asynchronous factory method. Concurrent invocations are prevented,/// unless the entry is evicted before the completion of the delegate. The errors
/// of failed invocations are not cached.
/// </summary>
public static Task<T> GetOrCreateExclusiveAsync<T>(this IMemoryCache cache,object key,Func<Task<T>> factory,MemoryCacheEntryOptions options = null)
{
    if (!cache.TryGetValue(key,out Task<T> task))
    {
        var entry = cache.CreateEntry(key);
        if (options != null) entry.SetOptions(options);
        var cts = new CancellationTokenSource();
        var newTaskTask = new Task<Task<T>>(async () =>
        {
            try { return await factory().ConfigureAwait(false); }
            catch { cts.Cancel(); throw; }
            finally { cts.Dispose(); }
        });
        var newTask = newTaskTask.Unwrap();
        entry.ExpirationTokens.Add(new CancellationChangeToken(cts.Token));
        entry.Value = newTask;
        entry.Dispose(); // The Dispose actually inserts the entry in the cache
        if (!cache.TryGetValue(key,out task)) task = newTask;
        if (task == newTask)
            newTaskTask.RunSynchronously(TaskScheduler.Default);
        else
            cts.Dispose();
    }
    return task;
}

用法示例:

var cache = new MemoryCache(new MemoryCacheOptions());
string html = await cache.GetOrCreateExclusiveAsync(url,async () =>
{
    return await httpClient.GetStringAsync(url);
},new MemoryCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromMinutes(10)));

此实现在内部使用嵌套任务 (Task<Task<T>>) 而不是惰性任务 (Lazy<Task<T>>) 作为包装器,因为后者的构造在某些情况下容易发生死锁。
参考:Lazy<Task> with asynchronous initializationVSTHRD011 Use AsyncLazy


GitHub 上的相关 API 建议:GetOrCreateExclusive() and GetOrCreateExclusiveAsync(): Exclusive versions of GetOrCreate() and GetOrCreateAsync()

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...