Problem description
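I have a system of 10 machines, and I need to run a certain task on each machine one by one, in synchronized order. Basically, only one machine should be running that task at any given time. We already use Consul for other purposes, so I was wondering whether we could use Consul for this as well.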
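I read more about it, and it looks like we can use leader election in Consul: each machine tries to acquire a lock, does the work, and releases the lock when it is done. The other machines then try to acquire the lock again and do the same work. This way the work runs on exactly one machine at a time.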
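I decided to use the C# PlayFab ConsulDotNet library, which already has this functionality built in, but I'm happy to switch if there is a better option. In my codebase, the Action method below is called on every machine at almost the same time through a watcher mechanism.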
private void Action() {
    // Try to acquire lock using Consul.
    // If lock acquired then DoTheWork(), otherwise keep waiting until the lock is acquired.
    // Once work is done, release the lock
    // so that some other machine can acquire the lock and do the same work.
}
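To make the intended behaviour concrete, here is an in-process simulation of ten "machines" calling Action at almost the same time. It rests on a loud assumption: a SemaphoreSlim(1, 1) stands in for the Consul lock, so it only shows the shape of acquire, do the work, release in finally; it does not talk to Consul. SerializedWorkDemo and RunMachinesAsync are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SerializedWorkDemo
{
    // Simulates `machineCount` machines calling Action() at (almost) the same time.
    // Returns the highest number of machines ever inside the work section at once,
    // and how many machines finished their work.
    public static async Task<(int MaxConcurrent, int Completed)> RunMachinesAsync(int machineCount)
    {
        // Assumption: stands in for the Consul distributed lock (one holder at a time).
        var distributedLockStandIn = new SemaphoreSlim(1, 1);
        var sync = new object();
        int running = 0, maxConcurrent = 0, completed = 0;

        async Task ActionAsync(int machineId)
        {
            await distributedLockStandIn.WaitAsync();   // acquire: wait as long as needed
            try
            {
                lock (sync)
                {
                    running++;
                    maxConcurrent = Math.Max(maxConcurrent, running);
                }
                Console.WriteLine($"Machine {machineId} is doing the work");
                await Task.Delay(10);                    // placeholder for DoTheWork()
                lock (sync) { completed++; }
            }
            finally
            {
                lock (sync) { running--; }
                distributedLockStandIn.Release();        // release even if the work throws
            }
        }

        await Task.WhenAll(Enumerable.Range(0, machineCount).Select(id => ActionAsync(id)));
        return (maxConcurrent, completed);
    }
}
```

Releasing in finally is the important design choice: if DoTheWork throws and the lock is never released, every other machine stays blocked waiting for it.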
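The idea is that all 10 machines should DoTheWork() one at a time, in synchronized order. Based on this blog and this blog, I decided to adapt their example to fit our needs.
Below is my LeaderElectionService class: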
using System;
using System.Threading;
using System.Threading.Tasks;
using Consul;

public class LeaderElectionService
{
    public LeaderElectionService(string leadershipLockKey)
    {
        this.key = leadershipLockKey;
    }

    public event EventHandler<LeaderChangedEventArgs> LeaderChanged;

    string key;
    CancellationTokenSource cts = new CancellationTokenSource();
    Timer timer;
    bool lastIsHeld = false;
    IDistributedLock distributedLock;

    public void Start()
    {
        // dueTime 0: fire immediately; period Timeout.Infinite: fire once (TryAcquireLock re-arms it).
        timer = new Timer(async (object state) => await TryAcquireLock((CancellationToken)state), cts.Token, 0, Timeout.Infinite);
    }

    private async Task TryAcquireLock(CancellationToken token)
    {
        if (token.IsCancellationRequested)
            return;
        try
        {
            if (distributedLock == null)
            {
                var clientConfig = new ConsulClientConfiguration { Address = new Uri("http://consul.host.domain.com") };
                ConsulClient client = new ConsulClient(clientConfig);
                distributedLock = await client.AcquireLock(new LockOptions(key) { LockTryOnce = true, LockWaitTime = TimeSpan.FromSeconds(3) }, token).ConfigureAwait(false);
            }
            else
            {
                if (!distributedLock.IsHeld)
                {
                    await distributedLock.Acquire(token).ConfigureAwait(false);
                }
            }
        }
        catch (LockMaxAttemptsReachedException ex)
        {
            // This is expected if the lock couldn't be acquired on the first attempt.
            Console.WriteLine(ex.StackTrace);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.StackTrace);
        }
        finally
        {
            bool lockHeld = distributedLock?.IsHeld == true;
            HandleLockStatusChange(lockHeld);
            // Re-trigger the timer after a 10 s delay (in this example). Delay only 7 s if the lock is not held,
            // because the AcquireLock call blocks for ~3 s on every failed attempt.
            timer.Change(lockHeld ? 10000 : 7000, Timeout.Infinite);
        }
    }

    protected virtual void HandleLockStatusChange(bool isHeldNew)
    {
        // Is this the right way to check and do the work here?
        // In general I want to call method "DoTheWork" in "Action" method itself
        // and then release and destroy the session once work is done.
        if (isHeldNew)
        {
            // DoTheWork();
            Console.WriteLine("Hello");
            // And then where should I release the lock so that other machine can try to grab it?
            // distributedLock.Release();
            // distributedLock.Destroy();
        }
        if (lastIsHeld == isHeldNew)
            return;
        else
        {
            lastIsHeld = isHeldNew;
        }
        if (LeaderChanged != null)
        {
            LeaderChangedEventArgs args = new LeaderChangedEventArgs(lastIsHeld);
            foreach (EventHandler<LeaderChangedEventArgs> handler in LeaderChanged.GetInvocationList())
            {
                try
                {
                    handler(this, args);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.StackTrace);
                }
            }
        }
    }
}
Below is my LeaderChangedEventArgs class:
public class LeaderChangedEventArgs : EventArgs
{
    private bool isLeader;

    public LeaderChangedEventArgs(bool isHeld)
    {
        isLeader = isHeld;
    }

    public bool IsLeader { get { return isLeader; } }
}
My use case probably doesn't need many pieces of the code above, but the idea is the same.
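The Start/TryAcquireLock pair in the LeaderElectionService class above relies on a one-shot System.Threading.Timer: it is created with period Timeout.Infinite and re-armed with timer.Change(...) in finally, so the next attempt is only scheduled after the previous one has finished. Below is a self-contained sketch of just that scheduling pattern. RearmingPoller, its maxAttempts stop condition, and retryDelayMs are hypothetical, and the Consul call is replaced by an arbitrary Func<Task>.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// One-shot timer that re-arms itself after each attempt (hypothetical demo class).
public sealed class RearmingPoller : IDisposable
{
    private readonly Func<Task> _attempt;   // stand-in for the Consul call in TryAcquireLock
    private readonly int _maxAttempts;      // hypothetical stop condition for the demo
    private readonly int _retryDelayMs;
    private readonly TaskCompletionSource<bool> _done =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly Timer _timer;
    private int _running;
    private int _attempts;

    public bool OverlapDetected { get; private set; }
    public int Attempts => Volatile.Read(ref _attempts);

    public RearmingPoller(Func<Task> attempt, int maxAttempts, int retryDelayMs)
    {
        _attempt = attempt;
        _maxAttempts = maxAttempts;
        _retryDelayMs = retryDelayMs;
        // Created disarmed so the callback cannot run before the constructor finishes.
        _timer = new Timer(async _ => await TickAsync(), null, Timeout.Infinite, Timeout.Infinite);
    }

    // Fires once right away; the returned task completes after maxAttempts attempts.
    public Task Start()
    {
        _timer.Change(0, Timeout.Infinite);
        return _done.Task;
    }

    private async Task TickAsync()
    {
        if (Interlocked.Increment(ref _running) > 1)
            OverlapDetected = true;         // two attempts running at the same time
        try
        {
            await _attempt();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);  // like the original: log and keep polling
        }
        finally
        {
            Interlocked.Decrement(ref _running);
            if (Interlocked.Increment(ref _attempts) >= _maxAttempts)
                _done.TrySetResult(true);
            else
                _timer.Change(_retryDelayMs, Timeout.Infinite); // re-arm for exactly one more shot
        }
    }

    public void Dispose() => _timer.Dispose();
}
```

With a periodic timer, an attempt that takes longer than the period would overlap the next one; re-arming in finally rules that out, at the cost of the interval being measured from the end of each attempt rather than its start.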
Problem statement
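Now, in my Action method, I want to use the class above to run the task as soon as the lock is acquired, and otherwise keep waiting for the lock. Once the work is done, the session should be released and destroyed so that another machine can grab the lock and do the work. I'm a bit confused about how to use the class above correctly in the method below.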
private void Action() {
    LeaderElectionService electionService = new LeaderElectionService("data/process");
    // electionService.LeaderChanged += (source, arguments) => Console.WriteLine(arguments.IsLeader ? "Leader" : "Slave");
    electionService.Start();
    // Now how do I wait for the lock to be acquired here indefinitely
    // and, once the lock is acquired, do the work and then release and destroy the session
    // so that other machine can grab the lock and do the work?
}
I only recently started with C#, which is why I'm confused about how to use Consul and this library effectively so that this works in production.
Update
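I tried the code below based on your suggestion (I think I had tried this earlier as well), but for some reason, as soon as execution reaches the await distributedLock.Acquire(cancellationToken); line, it returns straight to the Main method. It never gets to my Doing Some Work! printout. Does CreateLock actually work? I expected it to create data/lock in Consul (since it doesn't exist yet), then try to acquire the lock on it and, if the lock is acquired, do the work and then release it for the other machines.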
private static CancellationTokenSource cts = new CancellationTokenSource();

public static void Main(string[] args)
{
    Action(cts.Token);
    Console.WriteLine("Hello World");
}

private static async Task Action(CancellationToken cancellationToken)
{
    const string keyName = "data/lock";
    var clientConfig = new ConsulClientConfiguration { Address = new Uri("http://consul.test.host.com") };
    ConsulClient client = new ConsulClient(clientConfig);
    var distributedLock = client.CreateLock(keyName);
    while (true)
    {
        try
        {
            // Try to acquire lock
            // As soon as it comes to this line,
            // it just goes back to main method automatically. Not sure why
            await distributedLock.Acquire(cancellationToken);
            // Lock is acquired
            // DoTheWork();
            Console.WriteLine("Doing Some Work!");
            // Work is done. Jump out of loop to release the lock
            break;
        }
        catch (LockHeldException)
        {
            // Cannot acquire the lock. Wait a while then retry
            await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
        }
        catch (Exception)
        {
            // Todo: Handle exception thrown by DoTheWork method
            // Here we jump out of the loop to release the lock
            // But you can try to acquire the lock again based on your requirements
            break;
        }
    }
    // Release and destroy the lock
    // so that other machine can grab the lock and do the work
    await distributedLock.Release(cancellationToken);
    await distributedLock.Destroy(cancellationToken);
}
Solution
IMO, the LeaderElectionService from those blogs is overkill for your case.
Update 1
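- ConsulClient is a local variable.
- There is no need to check the IsHeld property.
- There is no need for a while loop, because Acquire blocks indefinitely unless
  - LockTryOnce is set to true in LockOptions, or
  - a timeout is set on the CancellationToken.

Note that after calling Release on the distributed lock (reference), it is not necessary to call the Destroy method.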
private async Task Action(CancellationToken cancellationToken)
{
    const string keyName = "YOUR_KEY";
    var client = new ConsulClient();
    var distributedLock = client.CreateLock(keyName);
    // Try to acquire lock
    // NOTE:
    // Acquire method will block indefinitely unless
    // 1. Set LockTryOnce = true in LockOptions
    // 2. Pass a timeout to cancellation token
    // If Acquire throws, the lock is not held, so there is nothing to release.
    await distributedLock.Acquire(cancellationToken);
    try
    {
        // Lock is acquired
        DoTheWork();
    }
    catch (Exception)
    {
        // TODO: Handle exception thrown by DoTheWork method
    }
    finally
    {
        // Release the lock (not necessary to invoke Destroy method),
        // so that other machine can grab the lock and do the work
        await distributedLock.Release();
    }
}
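As the NOTE in the code says, Acquire waits indefinitely unless a timeout is attached to the token. The sketch below shows that pattern with CancellationTokenSource(TimeSpan), again with a SemaphoreSlim standing in for the Consul lock (an assumption for illustration). TimedAcquireDemo and TryRunLockedAsync are hypothetical names. With Consul.NET the exception raised when the token fires may not be OperationCanceledException, so check what your library version throws before copying the catch clause.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimedAcquireDemo
{
    // Returns true if the work ran, false if the lock could not be acquired within `timeout`.
    // `lockStandIn` is an assumption: it stands in for the Consul distributed lock.
    public static async Task<bool> TryRunLockedAsync(SemaphoreSlim lockStandIn, TimeSpan timeout, Func<Task> doTheWork)
    {
        using var cts = new CancellationTokenSource(timeout); // cancels itself after `timeout`
        try
        {
            await lockStandIn.WaitAsync(cts.Token);            // blocks until acquired or cancelled
        }
        catch (OperationCanceledException)
        {
            return false;                                       // timed out: never acquired, nothing to release
        }

        try
        {
            await doTheWork();
            return true;
        }
        finally
        {
            lockStandIn.Release();                              // always hand the lock back
        }
    }
}
```

Releasing only after a successful acquire avoids trying to release a lock this machine never held.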
Update 2
The reason the OP's code just returns to the Main method is that the Action method is not awaited. If you use C# 7.1 or later, you can make Main async and await the Action method.
public static async Task Main(string[] args)
{
await Action(cts.Token);
Console.WriteLine("Hello World");
}
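A minimal, Consul-free sketch of why the original Main appeared to "jump back": an async method runs synchronously only until its first await on an incomplete task, then returns a Task to its caller. Here Task.Delay stands in for distributedLock.Acquire (an assumption), and AwaitDemo and its methods are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitDemo
{
    // Stand-in for the original Action: Task.Delay replaces distributedLock.Acquire (assumption).
    private static async Task ActionAsync(List<string> log)
    {
        log.Add("Action: before Acquire");
        await Task.Delay(50);                 // control returns to the caller here
        log.Add("Action: Doing Some Work!");
    }

    // Mirrors the original Main: Action is started but not awaited.
    public static async Task<List<string>> NotAwaitedAsync()
    {
        var log = new List<string>();
        Task pending = ActionAsync(log);      // runs only up to the first await
        log.Add("Main: Hello World");         // logged before the work happens
        await pending;                        // demo only: a real non-async Main would already have returned
        return log;
    }

    // Mirrors the fixed async Main: Action is awaited before continuing.
    public static async Task<List<string>> AwaitedAsync()
    {
        var log = new List<string>();
        await ActionAsync(log);
        log.Add("Main: Hello World");
        return log;
    }
}
```

In a real console app, once a non-async Main returns the process exits, so the code after Acquire never gets a chance to run.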