问题描述
假设多个线程在比较代码上存在竞争条件。
private int volatile maxValue;
private AtomicInteger currentValue;
public void constructor() {
this.current = new AtomicInteger(getNewValue());
}
public getNextValue() {
while(true) {
int latestValue = this.currentValue.get();
int nextValue = latestValue + 1;
if(latestValue == maxValue) {//Race condition 1
latestValue = getNewValue();
}
if(currentValue.compareAndSet(latestValue,nextValue) {//Race condition 2
return latestValue;
}
}
}
private int getNewValue() {
int newValue = getFromDb(); //not idempotent
maxValue = newValue + 10;
return newValue;
}
问题:
解决这个问题的显而易见的方法是在 if 条件周围添加同步块/方法。使用并发 api 而不使用任何类型的锁来解决此问题的其他高效方法是什么?
如何摆脱 while 循环,以便我们可以在没有或更少线程争用的情况下获取下一个值?
约束:
接下来的 db 序列将按递增顺序排列,不一定均匀分布。所以它可能是 1、11、31,其中 21 可能是其他节点询问的。请求的下一个值将始终是唯一的。还需要确保所有序列都被使用,一旦我们达到前一个范围的最大值,则只向 db 请求另一个起始序列,依此类推。
示例:
对于增量为 10 的 db next 序列 1,11,31,对于 30 个请求,输出的 next 序列应为 1-10、11-20、31-40。
解决方法
首先:我建议再考虑一次使用 synchronized
,因为:
- 看看这样的代码有多简单:
private int maxValue; private int currentValue; public constructor() { requestNextValue(); } public synchronized int getNextValue() { currentValue += 1; if (currentValue == maxValue) { requestNextValue(); } return currentValue; } private void requestNextValue() { currentValue = getFromDb(); //not idempotent maxValue = currentValue + 10; }
- java 中的锁实际上是 pretty intelligent 并且有 pretty good performance。
- 您在代码中与 DB 对话 - 仅此一项的性能成本可能比锁的性能成本高出几个数量级。
但一般来说,您的竞争条件是因为您独立更新 maxValue
和 currentValue
。
您可以将这 2 个值组合成一个不可变对象,然后以原子方式使用该对象:
private final AtomicReference<State> stateHolder = new AtomicReference<>(newStateFromDb());
public int getNextValue() {
while (true) {
State oldState = stateHolder.get();
State newState = (oldState.currentValue == oldState.maxValue)
? newStateFromDb()
: new State(oldState.currentValue + 1,oldState.maxValue);
if (stateHolder.compareAndSet(oldState,newState)) {
return newState.currentValue;
}
}
}
private static State newStateFromDb() {
int newValue = getFromDb(); // not idempotent
return new State(newValue,newValue + 10);
}
private static class State {
final int currentValue;
final int maxValue;
State(int currentValue,int maxValue) {
this.currentValue = currentValue;
this.maxValue = maxValue;
}
}
修复之后,您接下来可能需要解决以下问题:
- 如何防止多个并行
getFromDb();
(尤其是考虑到该方法是幂等的) - 当一个线程执行
getFromDb();
时,如何防止其他线程在while(true)
循环内忙于自旋并消耗所有可用的cpu时间 - 更多类似问题
解决这些问题中的每一个都可能会使您的代码变得越来越复杂。
所以,恕我直言,这几乎是不值得的 - 锁工作正常并保持代码简单。
,您不能完全避免使用给定约束进行锁定:因为 (1) getFromDb()
返回的每个值都必须使用,并且 (2) 调用 getFromDb()
只允许在 maxValue
被达到,您需要确保对 getFromDb()
的调用互斥。
如果没有约束 (1) 或 (2),您可以使用乐观锁定:
-
如果没有 (1),您可以允许多个线程同时调用
getFromDb()
并选择结果之一丢弃所有其他线程。 -
如果没有 (2),您可以允许多个线程同时调用
getFromDb()
并选择结果之一。其他结果将“保存以备后用”。
解决这个问题的显而易见的方法是在 if 条件周围添加同步块
那是行不通的。让我试着解释一下。
当您遇到条件:if(latestValue == maxValue) { ... }
时,您希望以原子方式更新 maxValue
和 currentValue
。像这样:
latestValue = getNewValue();
currentValue.set(latestValue);
getNewValue
将从 DB
获取您的下一个起始值并更新 maxValue
,但同时,您希望现在将 currentValue
设置为新的起始值.假设情况:
-
您首先从数据库中读取
1
。因此maxValue = 11
,currentValue = 1
。 -
当你达到条件
if(latestValue == maxValue)
时,你想要去数据库获取新的起始位置(假设21
),但同时你希望每个线程现在从21
开始。因此,您还必须设置currentValue
。
现在的问题是,如果你在同步块下写到currentValue
,例如:
if(latestValue == maxValue) {
synchronized (lock) {
latestValue = getNewValue();
currentValue.set(latestValue);
}
}
你还需要在同一个lock
下阅读,否则你就有种族了。最初我认为我可以更聪明一点,做一些类似的事情:
if(latestValue == maxValue) {
synchronized (lock) {
if(latestValue == maxValue) {
latestValue = getNewValue();
currentValue.set(latestValue);
} else {
continue;
}
}
}
以便所有等待 lock
的线程在释放锁时不会将先前写入的值覆盖为 maxValue
。但这仍然是 race
并且会在其他地方引起问题,在不同的情况下,相当微不足道。例如:
-
ThreadA
执行latestValue = getNewValue();
,因此maxValue == 21
。 之前它currentValue.set(latestValue);
-
ThreadB
读取int latestValue = this.currentValue.get();
,看到11
,当然这是假的:if(latestValue == maxValue) {
,所以它可以写12
({{ 1}}) 到nextValue
。这破坏了整个算法。
我看不出有任何其他方法可以使 currentValue
getNextValue
或以其他方式受互斥锁/自旋锁保护。
我真的看不到同步数据库调用的方法 - 除非多次调用数据库不是问题(即检索多个“新值”)。
要消除同步 getNextValue
方法的需要,您可以使用 BlockingQueue
,这将消除原子更新 2 个变量的需要。如果你真的不想使用synchronize关键字,你可以使用一个标志,只让一个线程调用数据库。
它可能看起来像这样(看起来不错,但没有经过测试):
private final BlockingQueue<Integer> nextValues = new ArrayBlockingQueue<>(10);
private final AtomicBoolean updating = new AtomicBoolean();
public int getNextValue() {
while (true) {
Integer nextValue = nextValues.poll();
if (nextValue != null) return nextValue;
else getNewValues();
}
}
private void getNewValues() {
if (updating.compareAndSet(false,true)) {
//we hold the "lock" to run the update
if (!nextValues.isEmpty()) {
updating.set(false);
throw new IllegalStateException("nextValues should be empty here");
}
try {
int newValue = getFromDb(); //not idempotent
for (int i = 0; i < 10; i++) {
nextValues.add(newValue + i);
}
} finally {
updating.set(false);
}
}
}
但是正如其他评论中提到的,这里最昂贵的操作很可能是数据库调用,它保持同步,所以你最好同步所有内容并保持简单,在性能方面几乎没有差异。
,当 getFromDb
命中数据库时,您确实需要一些锁定 - 其他线程应该阻塞而不是也去访问数据库或自旋。真的,如果你每 10 次迭代就这样做,你可能会同步很多。然而,这并不好玩。
任何合理的非微控制器平台都应该支持 AtomicLong
作为无锁。所以我们可以方便地将两个 int
打包成一个原子。
private final AtomicLong combinedValue;
public getNextValue() {
for (;;) {
long combined = combinedValue.get();
int latestValue = (int)combined;
int maxValue = (int)(combined>>32);
int nextValue = latestValue + 1;
long nextCombined = (newValue&0xffffffff) | (maxValue<<32)
if (latestValue == maxValue) {
nextValue();
} else if (currentValue.compareAndSet(combined,nextCombined)) {
return latestValue;
}
}
}
private synchronized void nextValue() {
// Yup,we need to double check with this locking.
long combined = combinedValue.get();
int latestValue = (int)combined;
int maxValue = (int)(combined>>32);
if (latestValue == maxValue) {
int newValue = getFromDb(); //not idempotent
int maxValue = newValue + 10;
long nextCombined = (newValue&0xffffffff) | (maxValue<<32)
combinedValue.set(nextCombined);
}
}
内存分配的另一种方法是将两个值集中到一个对象中并使用 AtomicReference
。但是,我们可以观察到值的变化比最大值更频繁,因此我们可以使用缓慢变化的对象和快速的偏移量。
private static record Segment(
int maxValue,AtomicInteger currentValue
) {
}
private volatile Segment segment;
public getNextValue() {
for (;;) {
Segment segment = this.segment;
int latestValue = segment.currentValue().get();
int nextValue = latestValue + 1;
if (latestValue == segment.maxValue()) {
nextValue();
} else if (segment.currentValue().compareAndSet(
latestValue,nextValue
)) {
return latestValue;
}
}
}
private synchronized void nextValue() {
// Yup,we need to double check with this locking.
Segment segment = this.segment;
int latestValue = segment.currentValue().get();
if (latestValue == segment.maxValue()) {
int newValue = getFromDb(); //not idempotent
int maxValue = newValue + 10;
segment = new Segment(maxValue,new AtomicInteger(newValue));
}
}
(标准免责声明:代码没有经过编译、测试或考虑太多。记录在编写 JDK 时需要一个相当新的。构造函数被省略。)
,多么有趣的问题。正如其他人所说,您可以通过使用 synchronized
关键字来解决问题。
public synchronized int getNextValue() { ... }
但是因为您不想使用该关键字,同时又想避免竞争条件,所以这可能会有所帮助。虽然没有保证。并且请不要要求解释,我会把OutOfBrainException
扔给你。
private volatile int maxValue;
private volatile boolean locked = false; //For clarity.
private AtomicInteger currentValue;
public int getNextValue() {
int latestValue = this.currentValue.get();
int nextValue = latestValue + 1;
if(!locked && latestValue == maxValue) {
locked = true; //Only one thread per time.
latestValue = getNewValue();
currentValue.set(latestValue);
locked = false;
}
while(locked) { latestValue = 0; } //If a thread running in the previous if statement,we need this to buy some time.
//We also need to reset "latestValue" so that when this thread runs the next loop,//it will guarantee to call AtomicInteger.get() for the updated value.
while(!currentValue.compareAndSet(latestValue,nextValue)) {
latestValue = this.currentValue.get();
nextValue = latestValue + 1;
}
return nextValue;
}
或者你可以使用Atomic
来对抗Atomic
。
private AtomicBoolean locked = new AtomicBoolean(false);
public int getNextValue() {
...
if(locked.compareAndSet(false,true)) { //Only one thread per time.
if(latestValue == maxValue) {
latestValue = getNewValue();
currentValue.set(latestValue);
}
locked.set(false);
}
...
,
我想不出消除所有锁定的方法,因为潜在问题是从多个线程访问可变值。但是,可以对您提供的代码进行一些改进,基本上是利用这样一个事实,即当数据由多个线程读取时,除非必须进行写入,否则无需锁定读取,因此使用 Read/Write锁将减少争用。只有 1/10 次才会有“满”写锁
所以代码可以这样重写(把错误放在一边):
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Counter {
private final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock(true);
private final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();
private AtomicInteger currentValue;
private AtomicInteger maxValue;
public Counter() {
int initialValue = getFromDb();
this.currentValue = new AtomicInteger(initialValue);
this.maxValue = new AtomicInteger(initialValue + 10);
}
public int getNextValue() {
readLock.lock();
while (true){
int nextValue = currentValue.getAndIncrement();
if(nextValue<maxValue.get()){
readLock.unlock();
return nextValue;
}
else {
readLock.unlock();
writeLock.lock();
reload();
readLock.lock();
writeLock.unlock();
}
}
}
private void reload(){
int newValue = getFromDb();
if(newValue>maxValue.get()) {
this.currentValue.set(newValue);
this.maxValue.set(newValue + 10);
}
}
private int getFromDb(){
// your implementation
}
}
,
您要解决的业务用例是什么? 下一个场景是否适合您:
- 根据数据库中的计数器要求创建 SQL sequence(基于您的数据库);
- 从数据库中批量获取计数器,例如 50-100 个 ID
- 在应用级别使用 50-100 后,再从 db 中获取 100 个值...
?
,稍微修改过的 Tom Hawtin - tackline's answer 版本以及问题评论中 codeflush.dev 的建议
代码
我添加了一个工作版本的代码并模拟了一个基本的多线程环境。
免责声明:自行决定使用
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
class Seed {
private static final int MSB = 32;
private final int start;
private final int end;
private final long window;
public Seed(int start,int end) {
this.start = start;
this.end = end;
this.window = (((long) end) << MSB) | start;
}
public Seed(long window) {
this.start = (int) window;
this.end = (int) (window >> MSB);
this.window = window;
}
public int getStart() {
return start;
}
public int getEnd() {
return end;
}
public long getWindow() {
return window;
}
// this will not update the state,will only return the computed value
public long computeNextInWindow() {
return window + 1;
}
}
// a mock external seed service to abstract the seed generation and window logic
class SeedService {
private static final int SEED_INIT = 1;
private static final AtomicInteger SEED = new AtomicInteger(SEED_INIT);
private static final int SEQ_LENGTH = 10;
private static final int JITTER_FACTOR = 5;
private final boolean canAddRandomJitterToSeed;
private final Random random;
public SeedService(boolean canJitterSeed) {
this.canAddRandomJitterToSeed = canJitterSeed;
this.random = new Random();
}
public int getSeqLengthForTest() {
return SEQ_LENGTH;
}
public Seed getDefaultWindow() {
return new Seed(1,1);
}
public Seed getNextWindow() {
int offset = SEQ_LENGTH;
// trying to simulate multiple machines with interleaved start seed
if (canAddRandomJitterToSeed) {
offset += random.nextInt(JITTER_FACTOR) * SEQ_LENGTH;
}
final int start = SEED.getAndAdd(offset);
return new Seed(start,start + SEQ_LENGTH);
}
// helper to validate generated ids
public boolean validate(List<Integer> ids) {
Collections.sort(ids);
// unique check
if (ids.size() != new HashSet<>(ids).size()) {
return false;
}
for (int startIndex = 0; startIndex < ids.size(); startIndex += SEQ_LENGTH) {
if (!checkSequence(ids,startIndex)) {
return false;
}
}
return true;
}
// checks a sequence
// relies on 'main' methods usage of SEQ_LENGTH
protected boolean checkSequence(List<Integer> ids,int startIndex) {
final int startRange = ids.get(startIndex);
return IntStream.range(startRange,startRange + SEQ_LENGTH).boxed()
.collect(Collectors.toList())
.containsAll(ids.subList(startIndex,startIndex + SEQ_LENGTH));
}
public void shutdown() {
SEED.set(SEED_INIT);
System.out.println("See you soon!!!");
}
}
class SequenceGenerator {
private final SeedService seedService;
private final AtomicLong currentWindow;
public SequenceGenerator(SeedService seedService) {
this.seedService = seedService;
// initialize currentWindow using seedService
// best to initialize to an old window so that every instance of SequenceGenerator
// will lazy load from seedService during the first getNext() call
currentWindow = new AtomicLong(seedService.getDefaultWindow().getWindow());
}
public synchronized boolean requestSeed() {
Seed seed = new Seed(currentWindow.get());
if (seed.getStart() == seed.getEnd()) {
final Seed nextSeed = seedService.getNextWindow();
currentWindow.set(nextSeed.getWindow());
return true;
}
return false;
}
public int getNext() {
while (true) {
// get current window
Seed seed = new Seed(currentWindow.get());
// exhausted and need to seed again
if (seed.getStart() == seed.getEnd()) {
// this will loop at least one more time to return value
requestSeed();
} else if (currentWindow.compareAndSet(seed.getWindow(),seed.computeNextInWindow())) {
// successfully incremented value for next call. so return current value
return seed.getStart();
}
}
}
}
public class SequenceGeneratorTest {
public static void test(boolean canJitterSeed) throws Exception {
// just some random multithreaded invocation
final int EXECUTOR_THREAD_COUNT = 10;
final Random random = new Random();
final int INSTANCES = 500;
final SeedService seedService = new SeedService(canJitterSeed);
final int randomRps = 500;
final int seqLength = seedService.getSeqLengthForTest();
ExecutorService executorService = Executors.newFixedThreadPool(EXECUTOR_THREAD_COUNT);
Callable<List<Integer>> callable = () -> {
final SequenceGenerator generator = new SequenceGenerator(seedService);
int rps = (1 + random.nextInt(randomRps)) * seqLength;
return IntStream.range(0,rps).parallel().mapToObj(i -> generator.getNext())
.collect(Collectors.toList());
};
List<Future<List<Integer>>> futures = IntStream.range(0,INSTANCES).parallel()
.mapToObj(i -> executorService.submit(callable))
.collect(Collectors.toList());
List<Integer> ids = new ArrayList<>();
for (Future<List<Integer>> f : futures) {
ids.addAll(f.get());
}
executorService.shutdown();
// validate generated ids for correctness
if (!seedService.validate(ids)) {
throw new IllegalStateException();
}
seedService.shutdown();
// summary
System.out.println("count: " + ids.size() + ",unique count: " + new HashSet<>(ids).size());
Collections.sort(ids);
System.out.println("min id: " + ids.get(0) + ",max id: " + ids.get(ids.size() - 1));
}
public static void main(String[] args) throws Exception {
test(true);
System.out.println("Note: ids can be interleaved. if continuous sequence is needed,initialize SeedService with canJitterSeed=false");
final String ruler = Collections.nCopies( 50,"-" ).stream().collect( Collectors.joining());
System.out.println(ruler);
test(false);
System.out.println("Thank you!!!");
System.out.println(ruler);
}
}
,
稍微修改过的 user15102975 的 answer 版本,没有 while 循环和 getFromDb() 模拟实现。
/**
* Lock free sequence counter implementation
*/
public class LockFreeSequenceCounter {
private static final int BATCH_SIZE = 10;
private final AtomicReference<Sequence> currentSequence;
private final ConcurrentLinkedQueue<Integer> databaseSequenceQueue;
public LockFreeSequenceCounter() {
this.currentSequence = new AtomicReference<>(new Sequence(0,0));
this.databaseSequenceQueue = new ConcurrentLinkedQueue<>();
}
/**
* Get next unique id (threadsafe)
*/
public int getNextValue() {
return currentSequence.updateAndGet((old) -> old.next(this)).currentValue;
}
/**
* Immutable class to handle current and max value
*/
private static final class Sequence {
private final int currentValue;
private final int maxValue;
public Sequence(int currentValue,int maxValue) {
this.currentValue = currentValue;
this.maxValue = maxValue;
}
public Sequence next(LockFreeSequenceCounter counter){
return isMaxReached() ? fetchDB(counter) : inc();
}
private boolean isMaxReached(){
return currentValue == maxValue;
}
private Sequence inc(){
return new Sequence(this.currentValue + 1,this.maxValue);
}
private Sequence fetchDB(LockFreeSequenceCounter counter){
counter.databaseSequenceQueue.add(counter.getFromDb());
int newValue = counter.databaseSequenceQueue.poll();
int maxValue = newValue + BATCH_SIZE -1;
return new Sequence(newValue,maxValue);
}
}
/**
* Get unique id from db (mocked)
* return on call #1: 1
* return on call #2: 11
* return on call #3: 31
* Note: this function is not idempotent
*/
private int getFromDb() {
if (dbSequencer.get() == 21){
return dbSequencer.addAndGet(BATCH_SIZE);
} else{
return dbSequencer.getAndAdd(BATCH_SIZE);
}
}
private final AtomicInteger dbSequencer = new AtomicInteger(1);
}