StackExchange.Redis.ConnectionMultiplexer Dispose是否导致类似于UnsubscribeAll的行为?

问题描述

我只是不确定StackExchange.Redis的源代码。处置ConnectionMultiplexer实例是否会导致取消订阅由该实例打开且之前没有手动取消订阅的所有订阅

解决方法

答案似乎是-不,但仍不确定。为什么?因为处置ConnectionMultiplexer不会清除现有的订阅,例如UnsubscribeAll正在这样做。但是至少,处置ConnectionMultiplexer可以防止触发仍存在于处置的ConnectionMultiplexer中的尚未取消订阅的处理程序。

例如,这种测试是绿色的。

    [Fact]
    [Trait("Category",TestCategoryCatalogs.IntegrationTest)]
    [Trait("Category",TestCategoryCatalogs.Involve.REDIS)]
    public async Task Dispose_of_redis_connection_cause_unsubscribe_of_existing_subscriptions()
    {
        await _ExecuteAsync(async sp =>
        {
            IRedisClient redisClient1 = sp.GetService<IRedisClient>();
            IRedisClient redisClient2 = sp.GetService<IRedisClient>();

            // Immulate two applications with different connections to the same redis instance.
            IConnectionMultiplexer connection1 = redisClient1.GetConnection();
            IConnectionMultiplexer connection2 = redisClient2.GetConnection();

            ISubscriber subscriber1 = connection1.GetSubscriber();
            ISubscriber subscriber2 = connection2.GetSubscriber();

            var tcs1 = new TaskCompletionSource<bool>();
            var tcs2 = new TaskCompletionSource<bool>();
            var tcs3 = new TaskCompletionSource<bool>();
            var tcs4 = new TaskCompletionSource<bool>();
            var tcs5 = new TaskCompletionSource<bool>();

            string channel1 = nameof(channel1);

            await subscriber1.SubscribeAsync(channel1,(c,v) =>
                {
                    if (tcs1.Task.IsCompleted)
                        tcs3.SetResult(true);
                    else tcs1.SetResult(true);
                });

            await subscriber2.SubscribeAsync(channel1,v) =>
                {
                    if (tcs2.Task.IsCompleted)
                        if (tcs4.Task.IsCompleted)
                            tcs5.SetResult(true);
                        else tcs4.SetResult(true);
                    else tcs2.SetResult(true);
                });

            await subscriber2.PublishAsync(channel1,"1");

            await Task.WhenAll(tcs1.Task,tcs2.Task);

            connection1.Dispose();

            await subscriber2.PublishAsync(channel1,"2");

            await tcs4.Task.WithTimeout(TimeSpan.FromMilliseconds(1000));

            try
            {
                await tcs3.Task.WithTimeout(TimeSpan.FromMilliseconds(1000));

                throw new Exception("Test has been failed");
            }
            catch (OperationCanceledException)
            {
            }

            FieldInfo subscriptionsFI = connection1.GetType().GetField("subscriptions",System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
            ICollection subscriptions1 = (ICollection) subscriptionsFI.GetValue(connection1);
            ICollection subscriptions2 = (ICollection)subscriptionsFI.GetValue(connection2);

            // Subscription is still there but it seems to unactive.
            subscriptions1.Count.Should().Be(1);
            subscriptions2.Count.Should().Be(1);

            // If we clean them manually on alived connection,we will not have any subscription
            subscriber1.UnsubscribeAll();
            subscriber2.UnsubscribeAll();

            subscriptions1 = (ICollection)subscriptionsFI.GetValue(connection1);
            subscriptions2 = (ICollection)subscriptionsFI.GetValue(connection2);
            subscriptions1.Count.Should().Be(0);
            subscriptions2.Count.Should().Be(0);

            try
            {
                await tcs5.Task.WithTimeout(TimeSpan.FromMilliseconds(1000));

                throw new Exception("Test has been failed");
            }
            catch (OperationCanceledException)
            {
            }
        },_GetDefaultServiceProvider,embededTimeoutSeconds: 10);
    }