问题描述
我一直在使用this answer中排队的锁定代码,并为此编写了单元测试。 作为参考,锁定代码:
public sealed class FifoMutex
{
private readonly object innerLock = new object();
private volatile int ticketsCount = 0;
private volatile int ticketToRide = 1;
private readonly ThreadLocal<int> reenter = new ThreadLocal<int>();
public void Enter()
{
reenter.Value++;
if (reenter.Value > 1)
return;
int myTicket = Interlocked.Increment(ref ticketsCount);
Monitor.Enter(innerLock);
while (true)
{
if (myTicket == ticketToRide)
{
return;
}
else
{
Monitor.Wait(innerLock);
}
}
}
public void Exit()
{
if (reenter.Value > 0)
reenter.Value--;
if (reenter.Value > 0)
return;
Interlocked.Increment(ref ticketToRide);
Monitor.pulseAll(innerLock);
Monitor.Exit(innerLock);
}
}
我的测试代码:
[TestClass]
public class FifoMutexTests
{
public static ConcurrentQueue<string> Queue;
[Testinitialize]
public void Setup()
{
Queue = new ConcurrentQueue<string>();
}
[TestCleanup]
public void TearDown()
{
Queue = null;
}
[TestMethod]
public void TestFifoMutex()
{
int noOfThreads = 10;
int[] threadSleepTimes = new int[noOfThreads];
string[] threadNames = new string[noOfThreads];
Random r = new Random();
for (int i = 0; i < noOfThreads; i++)
{
threadSleepTimes[i] = r.Next(0,250);
threadNames[i] = "Thread " + i;
}
for (int i = 0; i < noOfThreads; i++)
{
FifoMutexTestUser user = new FifoMutexTestUser();
Thread newThread = new Thread(user.DoWork);
newThread.Name = threadNames[i];
newThread.Start(threadSleepTimes[i]);
}
Thread.Sleep(3000);
var receivedThreadNamesInorder = Queue.ToArray();
Assert.AreEqual(threadNames.Length,receivedThreadNamesInorder.Length);
for (int i = 0; i < receivedThreadNamesInorder.Length; i++)
{
Assert.AreEqual(threadNames[i],receivedThreadNamesInorder[i]);
}
}
}
使用此测试互斥锁用户:
public class FifoMutexTestUser
{
private readonly static FifoMutex fifoMutex = new FifoMutex();
public void DoWork(object sleepTime)
{
try
{
fifoMutex.Enter();
Thread.Sleep((int)sleepTime);
FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
}
finally
{
fifoMutex.Exit();
}
}
}
从本质上讲,我正在创建十个线程,每个线程将随机睡眠一段时间,然后将其名称放入主测试类的静态并发队列中。线程是从同一用户类的不同实例构建的,该类具有静态fifo互斥属性。这种情况类似于我自己的用例(我有多个使用方类,它们从不同的地方接收消息,并且我需要后端严格按顺序处理消息,但也要严格按消息到达的顺序处理消息。)
但是此测试无效。所有线程都排队其所有名称,但顺序不正确。从第二个代码段的最后一个for循环中,我读到它们实际上是按随机顺序执行的,这正是fifo互斥锁要防止的内容。
这就是问题。对我的测试代码进行了一次小的调整,所有这些工作都像一个魅力。
for (int i = 0; i < noOfThreads; i++)
{
FifoMutexTestUser user = new FifoMutexTestUser();
Thread newThread = new Thread(user.DoWork);
Thread.Sleep(1);
newThread.Name = threadNames[i];
newThread.Start(threadSleepTimes[i]);
}
现在,我在启动所有线程的循环(第二个片段的第二个循环)中睡眠了一毫秒,这是最小的可能间隔。如果这样做,则所有线程都以正确的顺序排入它们的名称,并且我的测试成功100%的时间。
所以我想知道为什么这么小的睡眠时间会有所作为。我对编译不是很了解,但是我的第一个猜测是启动所有线程的循环正在由编译器编译或优化,并且在此过程中线程的顺序会发生变化?
或者(也许更可能)替代,我的测试代码(或互斥代码)是否有问题?
解决方法
(如果我正确理解问题的话)您似乎认为线程实际上将启动(在这种情况下将执行DoWork
),并按照在它们上调用Thread.Start
的顺序来获取互斥量。但是,事实并非如此。
假设您有10个线程(“ id”从1到10),然后依次对它们调用Thead.Start
-这不是 表示它们实际上将从该线程开始订购。您在线程1上调用start,然后在线程2上调用start,然后可能首先执行线程2(而非1)的DoWork
。您可以通过以下方式更改测试代码来观察这一点:
public class FifoMutexTestUser {
private readonly int _id;
public FifoMutexTestUser(int id) {
_id = id;
}
private readonly static FifoMutex fifoMutex = new FifoMutex();
public void DoWork(object sleepTime)
{
Console.WriteLine("Thread started: " + _id);
try
{
fifoMutex.Enter();
Thread.Sleep((int)sleepTime);
FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
}
finally
{
fifoMutex.Exit();
}
}
}
然后在其中传递循环变量(与执行断言的threadNames
相对应):
for (int i = 0; i < noOfThreads; i++)
{
FifoMutexTestUser user = new FifoMutexTestUser(i);
Thread newThread = new Thread(user.DoWork);
newThread.Name = threadNames[i];
newThread.Start(threadSleepTimes[i]);
}
您会看到类似这样的结果(结果可能会有所不同):
Thread started: 9
Thread started: 1
Thread started: 0
Thread started: 2
Thread started: 3
Thread started: 4
Thread started: 5
Thread started: 6
Thread started: 7
Thread started: 8
因此,在此运行中,您最后一次调用Thread.Start
的线程实际上首先启动。但是更多的是-如果线程先启动(首先是启动,我们的意思是这里DoWork
开始执行)-这并不意味着它将首先获取互斥锁,因为线程是并行执行的,并且代码在fifoMutex.Enter
之外并且fifoMutex.Exit
(以及在实际获取或发布互斥锁之前和之后的那些函数内)不受任何同步结构的保护-任何线程都可以首先抢占互斥锁。
有时(并非总是)添加延迟会给以前的(在循环中)线程带来好处,因此它有更多机会首先实际抓住互斥体。如果您很幸运,以便线程尝试以正确的顺序获取互斥体,那么您的FifoMutex
确保它们将按该顺序解除阻塞。但是,您获取互斥量的顺序在您的测试代码中不确定。