如何在不使用同步无锁序列计数器实现的情况下修复竞争条件? 免责声明:自行决定使用

问题描述

假设多个线程在比较代码上存在竞争条件。

private int volatile maxValue;
private AtomicInteger currentValue;

public void constructor() {
   this.current = new AtomicInteger(getNewValue());
}

public getNextValue() {
  while(true) {
     int latestValue = this.currentValue.get();
     int nextValue = latestValue + 1;
     if(latestValue == maxValue) {//Race condition 1 
       latestValue = getNewValue();
     }
    if(currentValue.compareAndSet(latestValue,nextValue) {//Race condition 2
      return latestValue;
    }
  }
}

private int getNewValue() {
    int newValue = getFromDb(); //not idempotent
    maxValue = newValue + 10;
    return newValue;
}

问题:

解决这个问题的显而易见的方法是在 if 条件周围添加同步块/方法。使用并发 api 而不使用任何类型的锁来解决此问题的其他高效方法是什么?

如何摆脱 while 循环,以便我们可以在没有或更少线程争用的情况下获取一个值?

约束:

接下来的 db 序列将按递增顺序排列,不一定均匀分布。所以它可能是 1、11、31,其中 21 可能是其他节点询问的。请求的下一个值将始终是唯一的。还需要确保所有序列都被使用,一旦我们达到前一个范围的最大值,则只向 db 请求另一个起始序列,依此类推。

示例:

对于增量为 10 的 db next 序列 1,11,31,对于 30 个请求,输出的 next 序列应为 1-10、11-20、31-40。

解决方法

首先:我建议再考虑一次使用 synchronized,因为:

  1. 看看这样的代码有多简单:
     private int maxValue;
     private int currentValue;
    
     public constructor() {
       requestNextValue();
     }
    
     public synchronized int getNextValue() {
       currentValue += 1;
       if (currentValue == maxValue) {
         requestNextValue();
       }
       return currentValue;
     }
    
     private void requestNextValue() {
       currentValue = getFromDb(); //not idempotent
       maxValue = currentValue + 10;
     }
    
  2. java 中的锁实际上是 pretty intelligent 并且有 pretty good performance
  3. 您在代码中与 DB 对话 - 仅此一项的性能成本可能比锁的性能成本高出几个数量级。

但一般来说,您的竞争条件是因为您独立更新 maxValuecurrentValue
您可以将这 2 个值组合成一个不可变对象,然后以原子方式使用该对象:

private final AtomicReference<State> stateHolder = new AtomicReference<>(newStateFromDb());

public int getNextValue() {
  while (true) {
    State oldState = stateHolder.get();
    State newState = (oldState.currentValue == oldState.maxValue)
        ? newStateFromDb()
        : new State(oldState.currentValue + 1,oldState.maxValue);
    if (stateHolder.compareAndSet(oldState,newState)) {
      return newState.currentValue;
    }
  }
}

private static State newStateFromDb() {
  int newValue = getFromDb(); // not idempotent
  return new State(newValue,newValue + 10);
}


private static class State {

  final int currentValue;
  final int maxValue;

  State(int currentValue,int maxValue) {
    this.currentValue = currentValue;
    this.maxValue = maxValue;
  }
}

修复之后,您接下来可能需要解决以下问题:

  • 如何防止多个并行getFromDb();(尤其是考虑到该方法是幂等的)
  • 当一个线程执行getFromDb();时,如何防止其他线程在while(true)循环内忙于自旋并消耗所有可用的cpu时间
  • 更多类似问题

解决这些问题中的每一个都可能会使您的代码变得越来越复杂。

所以,恕我直言,这几乎是不值得的 - 锁工作正常并保持代码简单。

,

您不能完全避免使用给定约束进行锁定:因为 (1) getFromDb() 返回的每个值都必须使用,并且 (2) 调用 getFromDb() 只允许在 maxValue 被达到,您需要确保对 getFromDb() 的调用互斥。

如果没有约束 (1) 或 (2),您可以使用乐观锁定:

  • 如果没有 (1),您可以允许多个线程同时调用 getFromDb() 并选择结果之一丢弃所有其他线程。

  • 如果没有 (2),您可以允许多个线程同时调用 getFromDb() 并选择结果之一。其他结果将“保存以备后用”。

,

解决这个问题的显而易见的方法是在 if 条件周围添加同步块

那是行不通的。让我试着解释一下。

当您遇到条件:if(latestValue == maxValue) { ... } 时,您希望以原子方式更新 maxValuecurrentValue。像这样:

latestValue = getNewValue();
currentValue.set(latestValue);

getNewValue 将从 DB 获取您的下一个起始值并更新 maxValue,但同时,您希望现在将 currentValue 设置为新的起始值.假设情况:

  • 您首先从数据库中读取 1。因此maxValue = 11currentValue = 1

  • 当你达到条件 if(latestValue == maxValue) 时,你想要去数据库获取新的起始位置(假设 21),但同时你希望每个线程现在从 21 开始。因此,您还必须设置 currentValue

现在的问题是,如果你在同步块下currentValue,例如:

if(latestValue == maxValue) {
   synchronized (lock) {
       latestValue = getNewValue();
       currentValue.set(latestValue);
   }
}

你还需要在同一个lock阅读,否则你就有种族了。最初我认为我可以更聪明一点,做一些类似的事情:

if(latestValue == maxValue) {
    synchronized (lock) {
       if(latestValue == maxValue) {
           latestValue = getNewValue();
           currentValue.set(latestValue);
       } else {
          continue;
       }
    }
}

以便所有等待 lock 的线程在释放锁时不会将先前写入的值覆盖为 maxValue。但这仍然是 race 并且会在其他地方引起问题,在不同的情况下,相当微不足道。例如:

  • ThreadA 执行 latestValue = getNewValue();,因此 maxValue == 21之前currentValue.set(latestValue);

  • ThreadB 读取 int latestValue = this.currentValue.get();,看到 11,当然这是假的:if(latestValue == maxValue) {,所以它可以写 12 ({{ 1}}) 到 nextValue。这破坏了整个算法。

我看不出有任何其他方法可以使 currentValue getNextValue 或以其他方式受互斥锁/自旋锁保护。

,

我真的看不到同步数据库调用的方法 - 除非多次调用数据库不是问题(即检索多个“新值”)。

要消除同步 getNextValue 方法的需要,您可以使用 BlockingQueue,这将消除原子更新 2 个变量的需要。如果你真的不想使用synchronize关键字,你可以使用一个标志,只让一个线程调用数据库。

它可能看起来像这样(看起来不错,但没有经过测试):

private final BlockingQueue<Integer> nextValues = new ArrayBlockingQueue<>(10);
private final AtomicBoolean updating = new AtomicBoolean();

public int getNextValue() {
  while (true) {
    Integer nextValue = nextValues.poll();
    if (nextValue != null) return nextValue;
    else getNewValues();
  }
}

private void getNewValues() {
  if (updating.compareAndSet(false,true)) {
    //we hold the "lock" to run the update
    if (!nextValues.isEmpty()) {
      updating.set(false);
      throw new IllegalStateException("nextValues should be empty here");
    }
    try {
      int newValue = getFromDb(); //not idempotent
      for (int i = 0; i < 10; i++) {
        nextValues.add(newValue + i);
      }
    } finally {
      updating.set(false);
    }
  }
}

但是正如其他评论中提到的,这里最昂贵的操作很可能是数据库调用,它保持同步,所以你最好同步所有内容并保持简单,在性能方面几乎没有差异。

,

getFromDb 命中数据库时,您确实需要一些锁定 - 其他线程应该阻塞而不是也去访问数据库或自旋。真的,如果你每 10 次迭代就这样做,你可能会同步很多。然而,这并不好玩。

任何合理的非微控制器平台都应该支持 AtomicLong 作为无锁。所以我们可以方便地将两个 int 打包成一个原子。

private final AtomicLong combinedValue;

public getNextValue() {
    for (;;) {
        long combined = combinedValue.get();
        int latestValue = (int)combined;
        int maxValue = (int)(combined>>32);

        int nextValue = latestValue + 1;

        long nextCombined = (newValue&0xffffffff) | (maxValue<<32)

        if (latestValue == maxValue) { 
            nextValue();
        } else if (currentValue.compareAndSet(combined,nextCombined)) {
            return latestValue;
        }
    }
}

private synchronized void nextValue() {
    // Yup,we need to double check with this locking.
    long combined = combinedValue.get();
    int latestValue = (int)combined;
    int maxValue = (int)(combined>>32);

    if (latestValue == maxValue) {
        int newValue = getFromDb(); //not idempotent
        int maxValue = newValue + 10;

        long nextCombined = (newValue&0xffffffff) | (maxValue<<32)

        combinedValue.set(nextCombined);
    }
}

内存分配的另一种方法是将两个值集中到一个对象中并使用 AtomicReference。但是,我们可以观察到值的变化比最大值更频繁,因此我们可以使用缓慢变化的对象和快速的偏移量。

private static record Segment(
    int maxValue,AtomicInteger currentValue
) {
}
private volatile Segment segment;

public getNextValue() {
    for (;;) {
        Segment segment = this.segment;
        int latestValue = segment.currentValue().get();
        int nextValue = latestValue + 1;

        if (latestValue == segment.maxValue()) { 
            nextValue();
        } else if (segment.currentValue().compareAndSet(
            latestValue,nextValue
        )) {
            return latestValue;
        }
    }
}

private synchronized void nextValue() {
    // Yup,we need to double check with this locking.
    Segment segment = this.segment;
    int latestValue = segment.currentValue().get();

    if (latestValue == segment.maxValue()) {
        int newValue = getFromDb(); //not idempotent
        int maxValue = newValue + 10;
        segment = new Segment(maxValue,new AtomicInteger(newValue));
    }
}

(标准免责声明:代码没有经过编译、测试或考虑太多。记录在编写 JDK 时需要一个相当新的。构造函数被省略。)

,

多么有趣的问题。正如其他人所说,您可以通过使用 synchronized 关键字来解决问题。

public synchronized int getNextValue() { ... }

但是因为您不想使用该关键字,同时又想避免竞争条件,所以这可能会有所帮助。虽然没有保证。并且请不要要求解释,我会把OutOfBrainException扔给你。

private volatile int maxValue;
private volatile boolean locked = false; //For clarity.
private AtomicInteger currentValue;
    
public int getNextValue() {
    int latestValue = this.currentValue.get();
    int nextValue = latestValue + 1;
        
    if(!locked && latestValue == maxValue) {
        locked = true; //Only one thread per time.
        latestValue = getNewValue();
        currentValue.set(latestValue);
        locked = false;
    }
    while(locked) { latestValue = 0; } //If a thread running in the previous if statement,we need this to buy some time.
    //We also need to reset "latestValue" so that when this thread runs the next loop,//it will guarantee to call AtomicInteger.get() for the updated value.
    while(!currentValue.compareAndSet(latestValue,nextValue)) {
        latestValue = this.currentValue.get();
        nextValue = latestValue + 1;
    }
    return nextValue;
}

或者你可以使用Atomic来对抗Atomic

private AtomicBoolean locked = new AtomicBoolean(false);
        
public int getNextValue() {
...
if(locked.compareAndSet(false,true)) { //Only one thread per time.
    if(latestValue == maxValue) {
        latestValue = getNewValue();
        currentValue.set(latestValue);
    }
    locked.set(false);
}
...
,

我想不出消除所有锁定的方法,因为潜在问题是从多个线程访问可变值。但是,可以对您提供的代码进行一些改进,基本上是利用这样一个事实,即当数据由多个线程读取时,除非必须进行写入,否则无需锁定读取,因此使用 Read/Write锁将减少争用。只有 1/10 次才会有“满”写锁

所以代码可以这样重写(把错误放在一边):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Counter {

    private final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock(true);
    private final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();
    private AtomicInteger currentValue;
    private AtomicInteger maxValue;

    public Counter() {
        int initialValue = getFromDb();
        this.currentValue = new AtomicInteger(initialValue);
        this.maxValue = new AtomicInteger(initialValue + 10);
    }

    public int getNextValue() {
        readLock.lock();
        while (true){
            int nextValue = currentValue.getAndIncrement();
            if(nextValue<maxValue.get()){
                readLock.unlock();
                return nextValue;
            }
            else {
                readLock.unlock();
                writeLock.lock();
                reload();
                readLock.lock();
                writeLock.unlock();
            }
        }
    }

    private void reload(){
        int newValue = getFromDb();
        if(newValue>maxValue.get()) {
            this.currentValue.set(newValue);
            this.maxValue.set(newValue + 10);
        }
    }

    private int getFromDb(){
        // your implementation
    }

}
,

您要解决的业务用例是什么? 下一个场景是否适合您:

  1. 根据数据库中的计数器要求创建 SQL sequence(基于您的数据库);
  2. 从数据库中批量获取计数器,例如 50-100 个 ID
  3. 在应用级别使用 50-100 后,再从 db 中获取 100 个值...

,

稍微修改过的 Tom Hawtin - tackline's answer 版本以及问题评论中 codeflush.dev 的建议

代码

我添加了一个工作版本的代码并模拟了一个基本的多线程环境。

免责声明:自行决定使用

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class Seed {
    private static final int MSB = 32;
    private final int start;
    private final int end;
    private final long window;

    public Seed(int start,int end) {
        this.start = start;
        this.end = end;
        this.window = (((long) end) << MSB) | start;
    }

    public Seed(long window) {
        this.start = (int) window;
        this.end = (int) (window >> MSB);
        this.window = window;
    }

    public int getStart() {
        return start;
    }

    public int getEnd() {
        return end;
    }

    public long getWindow() {
        return window;
    }

    // this will not update the state,will only return the computed value
    public long computeNextInWindow() {
        return window + 1;
    }
}

// a mock external seed service to abstract the seed generation and window logic
class SeedService {

    private static final int SEED_INIT = 1;
    private static final AtomicInteger SEED = new AtomicInteger(SEED_INIT);
    private static final int SEQ_LENGTH = 10;

    private static final int JITTER_FACTOR = 5;
    private final boolean canAddRandomJitterToSeed;
    private final Random random;

    public SeedService(boolean canJitterSeed) {
        this.canAddRandomJitterToSeed = canJitterSeed;
        this.random = new Random();
    }

    public int getSeqLengthForTest() {
        return SEQ_LENGTH;
    }

    public Seed getDefaultWindow() {
        return new Seed(1,1);
    }

    public Seed getNextWindow() {
        int offset = SEQ_LENGTH;

        // trying to simulate multiple machines with interleaved start seed
        if (canAddRandomJitterToSeed) {
            offset += random.nextInt(JITTER_FACTOR) * SEQ_LENGTH;
        }

        final int start = SEED.getAndAdd(offset);
        return new Seed(start,start + SEQ_LENGTH);
    }

    // helper to validate generated ids
    public boolean validate(List<Integer> ids) {
        Collections.sort(ids);
        // unique check
        if (ids.size() != new HashSet<>(ids).size()) {
            return false;
        }

        for (int startIndex = 0; startIndex < ids.size(); startIndex += SEQ_LENGTH) {
            if (!checkSequence(ids,startIndex)) {
                return false;
            }
        }
        return true;
    }

    // checks a sequence
    // relies on 'main' methods usage of SEQ_LENGTH
    protected boolean checkSequence(List<Integer> ids,int startIndex) {
        final int startRange = ids.get(startIndex);
        return IntStream.range(startRange,startRange + SEQ_LENGTH).boxed()
            .collect(Collectors.toList())
            .containsAll(ids.subList(startIndex,startIndex + SEQ_LENGTH));
    }

    public void shutdown() {
        SEED.set(SEED_INIT);
        System.out.println("See you soon!!!");
    }
}

class SequenceGenerator {

    private final SeedService seedService;
    private final AtomicLong currentWindow;

    public SequenceGenerator(SeedService seedService) {
        this.seedService = seedService;

        // initialize currentWindow using seedService
        // best to initialize to an old window so that every instance of SequenceGenerator
        // will lazy load from seedService during the first getNext() call
        currentWindow = new AtomicLong(seedService.getDefaultWindow().getWindow());
    }

    public synchronized boolean requestSeed() {
        Seed seed = new Seed(currentWindow.get());
        if (seed.getStart() == seed.getEnd()) {
            final Seed nextSeed = seedService.getNextWindow();
            currentWindow.set(nextSeed.getWindow());
            return true;
        }
        return false;
    }

    public int getNext() {
        while (true) {
            // get current window
            Seed seed = new Seed(currentWindow.get());

            // exhausted and need to seed again
            if (seed.getStart() == seed.getEnd()) {
                // this will loop at least one more time to return value
                requestSeed();
            } else if (currentWindow.compareAndSet(seed.getWindow(),seed.computeNextInWindow())) {
                // successfully incremented value for next call. so return current value

                return seed.getStart();
            }
        }
    }
}

public class SequenceGeneratorTest {

    public static void test(boolean canJitterSeed) throws Exception {
        // just some random multithreaded invocation

        final int EXECUTOR_THREAD_COUNT = 10;
        final Random random = new Random();
        final int INSTANCES = 500;
        final SeedService seedService = new SeedService(canJitterSeed);
        final int randomRps = 500;
        final int seqLength = seedService.getSeqLengthForTest();

        ExecutorService executorService = Executors.newFixedThreadPool(EXECUTOR_THREAD_COUNT);
        Callable<List<Integer>> callable = () -> {
            final SequenceGenerator generator = new SequenceGenerator(seedService);
            int rps = (1 + random.nextInt(randomRps)) * seqLength;
            return IntStream.range(0,rps).parallel().mapToObj(i -> generator.getNext())
                .collect(Collectors.toList());
        };

        List<Future<List<Integer>>> futures = IntStream.range(0,INSTANCES).parallel()
            .mapToObj(i -> executorService.submit(callable))
            .collect(Collectors.toList());

        List<Integer> ids = new ArrayList<>();
        for (Future<List<Integer>> f : futures) {
            ids.addAll(f.get());
        }

        executorService.shutdown();

        // validate generated ids for correctness
        if (!seedService.validate(ids)) {
            throw new IllegalStateException();
        }

        seedService.shutdown();

        // summary
        System.out.println("count: " + ids.size() + ",unique count: " + new HashSet<>(ids).size());
        Collections.sort(ids);
        System.out.println("min id: " + ids.get(0) + ",max id: " + ids.get(ids.size() - 1));
    }

    public static void main(String[] args) throws Exception {
        test(true);
        System.out.println("Note: ids can be interleaved. if continuous sequence is needed,initialize SeedService with canJitterSeed=false");
        final String ruler = Collections.nCopies( 50,"-" ).stream().collect( Collectors.joining());
        System.out.println(ruler);

        test(false);
        System.out.println("Thank you!!!");
        System.out.println(ruler);
    }
}

,

稍微修改过的 user15102975 的 answer 版本,没有 while 循环和 getFromDb() 模拟实现。

/**
 * Lock free sequence counter implementation
 */
public class LockFreeSequenceCounter {

    private static final int BATCH_SIZE = 10;
    private final AtomicReference<Sequence> currentSequence;
    private final ConcurrentLinkedQueue<Integer> databaseSequenceQueue;

    public LockFreeSequenceCounter() {
        this.currentSequence = new AtomicReference<>(new Sequence(0,0));
        this.databaseSequenceQueue = new ConcurrentLinkedQueue<>();
    }

    /**
     * Get next unique id (threadsafe)
     */
    public int getNextValue() {
        return currentSequence.updateAndGet((old) -> old.next(this)).currentValue;
    }

    /**
     * Immutable class to handle current and max value
     */
    private static final class Sequence {
        private final int currentValue;
        private final int maxValue;

        public Sequence(int currentValue,int maxValue) {
            this.currentValue = currentValue;
            this.maxValue = maxValue;
        }

        public Sequence next(LockFreeSequenceCounter counter){
            return isMaxReached() ? fetchDB(counter) : inc();
        }

        private boolean isMaxReached(){
            return currentValue == maxValue;
        }

        private Sequence inc(){
            return new Sequence(this.currentValue + 1,this.maxValue);
        }

        private Sequence fetchDB(LockFreeSequenceCounter counter){
            counter.databaseSequenceQueue.add(counter.getFromDb());
            int newValue = counter.databaseSequenceQueue.poll();
            int maxValue = newValue + BATCH_SIZE -1;
            return new Sequence(newValue,maxValue);
        }
    }

    /**
     * Get unique id from db (mocked)
     * return on call #1: 1
     * return on call #2: 11
     * return on call #3: 31
     * Note: this function is not idempotent
     */
    private int getFromDb() {
        if (dbSequencer.get() == 21){
            return dbSequencer.addAndGet(BATCH_SIZE);
        } else{
            return dbSequencer.getAndAdd(BATCH_SIZE);
        }
    }
    private final AtomicInteger dbSequencer = new AtomicInteger(1);
}