在第一个线程锁定到第一个任务时的多个锁中,如何使第二个线程不闲置而是锁定到下一个任务?

问题描述

处理器类 -

public class Processor extends Thread {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    private void doJob1() {
        synchronized (lock1) {
            System.out.println(Thread.currentThread().getName() + " doing job1");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " completed job1");
        }
    }

    private void doJob2() {
        synchronized (lock2) {
            System.out.println(Thread.currentThread().getName() + " doing job2");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " completed job2");
        }
    }

    public void run() {
        doJob1();
        doJob2();
    }
}

主要方法-

    final Processor processor1 = new Processor();
    final Processor processor2 = new Processor();

    processor1.start();
    processor2.start();

第一次运行时,Thread-0 或 Thread-1 获取 job1() 的锁,另一个空闲 5 秒。

一个线程释放 job1() 上的锁并锁定 job2() 后,第二个线程获取 job1() 上的锁。

我希望第二个线程不应该闲置,因为 job1() 被第一个线程锁定,它应该锁定 job2(),然后锁定 job1()。

怎么做?

注意:这是基本的蓝图。实际上,即使有 100 个任务和 5 个线程,我也希望我的代码能够工作。

解决方法

我想,您正在寻找一种方法来“尽可能获取锁,如果它不是免费的就做其他事情”

您可以使用 ReentrantLock#tryLock 来完成。

方法:

仅在调用时未被其他线程持有时才获取锁。

它返回:

true 如果锁是空闲的并且被当前线程获取,或者锁已经被当前线程持有;和 false 否则

这是问题代码的修改版本:

  • 如果第一个任务的锁是空闲的,一个线程将运行第一个任务
  • 否则它会在第二个之后运行

Processor.java

public class Processor extends Thread {
    private static final Lock lock1 = new ReentrantLock();
    private static final Lock lock2 = new ReentrantLock();

    @SneakyThrows
    private void doJob1() {
        System.out.println(Thread.currentThread().getName() + " doing job1");
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + " completed job1");
    }

    @SneakyThrows
    private void doJob2() {
        System.out.println(Thread.currentThread().getName() + " doing job2");
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + " completed job2");
    }

    public void run() {
        boolean executedFirst = false;
        if (lock1.tryLock()) {
            try {
                doJob1();
                executedFirst = true;
            } finally {
                lock1.unlock();
            }
        }
        try {
            lock2.lock();
            doJob2();
        } finally {
            lock2.unlock();
        }

        if (!executedFirst) {
            try {
                lock1.lock();
                doJob1();
            } finally {
                lock1.unlock();
            }
        }
    }

    public static void main(String[] args) {
        new Processor().start();
        new Processor().start();
    }
}

示例输出:

Thread-1 doing job2
Thread-0 doing job1
Thread-0 completed job1
Thread-1 completed job2
Thread-1 doing job1
Thread-0 doing job2
Thread-1 completed job1
Thread-0 completed job2

请注意,lock/tryLockunlock 调用被包围在 try/finally


进入篮筐和球:

Color.java

public enum Color {
    RED,GREEN,BLUE;

    public static Color fromOrdinal(int i) {
        for (Color value : values()) {
            if (value.ordinal() == i) {
                return value;
            }
        }
        throw new IllegalStateException("Unknown ordinal = " + i);
    }
}

Basket.java

@Data(staticConstructor = "of")
public class Basket {
    private final Color color;
    // balls that this basket has
    private final List<Ball> balls = new ArrayList<>();
    private final Lock lock = new ReentrantLock();
}

Ball.java

@Value(staticConstructor = "of")
public class Ball {
    Color color;
}

Boy.java

  • 每个男孩捡一个球 (queue.poll())
  • 跑到篮子里(baskets.get(color) 同色)
  • 根据篮子被占用时的行为,他:
    • 将球扔掉并再次尝试(代码中的选项 a
    • 等待篮子被释放(选项 b
  • 请注意,使用选项 a 时,某个 boy 可能会在另一个扔掉 ball 并且还没有人捡起它时终止(无论如何,有人会捡起来放进篮子里)
@RequiredArgsConstructor
public class Boy implements Runnable {
    private final Map<Color,Basket> baskets;
    private final Queue<Ball> balls;

    @Override
    public void run() {
        Ball ball;
        while ((ball = balls.poll()) != null) {
            Color color = ball.getColor();
            Basket basket = baskets.get(color);
            // a
            if (basket.getLock().tryLock()) {
                try {
                    basket.getBalls().add(ball);
                } finally {
                    basket.getLock().unlock();
                }
            } else {
                balls.offer(ball);
            }

            // b
            /*
            try {
                basket.getLock().lock();
                basket.getBalls().add(ball);
            } finally {
                basket.getLock().unlock();
            }
             */
        }
    }
}

最后main

Queue<Ball> balls = new LinkedBlockingQueue<>();
ThreadLocalRandom.current().ints(0,3)
        .mapToObj(Color::fromOrdinal)
        .map(Ball::of)
        .limit(1000)
        .forEach(balls::add);
Map<Color,Basket> baskets = Map.of(
        Color.RED,Basket.of(Color.RED),Color.GREEN,Basket.of(Color.GREEN),Color.BLUE,Basket.of(Color.BLUE)
);
List<Thread> threads = IntStream.range(0,100)
        .mapToObj(ignore -> new Boy(baskets,balls))
        .map(Thread::new)
        .collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
    thread.join();
}

baskets.forEach((color,basket) -> System.out.println("There are "
        + basket.getBalls().size() + " ball(-s) in " + color + " basket"));

输出示例:

There are 331 ball(-s) in GREEN basket
There are 330 ball(-s) in BLUE basket
There are 339 ball(-s) in RED basket
,

这里有一个稍微复杂的例子,它有一个作业对象和一个指示作业是否已执行的条件变量,以及包装器如何将 ReentrantLock 适配到 try-with-resources 语句的例子.

/**
 * A Job represents a unit of work that needs to be performed once and
 * depends upon a lock which it must hold while the work is performed.
 */
public class Job {
    private final Runnable job;
    private final ReentrantLock lock;
    private boolean hasRun;

    public Job(Runnable job,ReentrantLock lock) {
        this.job = Objects.requireNonNull(job);
        this.lock = Objects.requireNonNull(lock);
        this.hasRun = false;
    }

    /**
     * @returns true if the job has already been run
     */
    public boolean hasRun() {
        return hasRun;
    }

    // this is just to make the test in Processor more readable
    public boolean hasNotRun() {
        return !hasRun;
    }

    /**
     * Tries to perform the job,returning immediately if the job has
     * already been performed or the lock cannot be obtained.
     *
     * @returns true if the job was performed on this invocation
     */
    public boolean tryPerform() {
        if (hasRun) {
            return false;
        }
        try (TryLocker locker = new TryLocker(lock)) {
            if (locker.isLocked()) {
                job.run();
                hasRun = true;
            }
        }
        return hasRun;
    }
}

/**
 * A Locker is an AutoCloseable wrapper around a ReentrantLock.
 */
public class Locker implements AutoCloseable {
    private final ReentrantLock lock;

    public Locker(final ReentrantLock lock) {
        this.lock = lock;
        lock.lock();
    }

    @Override
    public void close() {
        lock.unlock();
    }
}

/**
 * A TryLocker is an AutoCloseable wrapper around a ReentrantLock that calls
 * its tryLock() method and provides a way to test whether than succeeded.
 */
public class TryLocker implements AutoCloseable {
    private final ReentrantLock lock;

    public TryLocker(final ReentrantLock lock) {
        this.lock = lock.tryLock() ?  lock : null;
    }

    public boolean isLocked() {
        return lock != null;
    }

    @Override
    public void close() {
        if (isLocked()) {
            lock.unlock();
        }
    }
}

/**
 * A modified version of the Processor class from the question.
 */
public class Processor extends Thread {
    private static final ReentrantLock lock1 = new ReentrantLock();
    private static final ReentrantLock lock2 = new ReentrantLock();

    private void snooze(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void doJob1() {
        System.out.println(Thread.currentThread().getName() + " doing job1");
        snooze(5000);
        System.out.println(Thread.currentThread().getName() + " completed job1");
    }

    private void doJob2() {
        System.out.println(Thread.currentThread().getName() + " doing job2");
        snooze(5000);
        System.out.println(Thread.currentThread().getName() + " completed job2");
    }

    public void run() {
        Job job1 = new Job(() -> doJob1(),lock1);
        Job job2 = new Job(() -> doJob2(),lock2);

        List<Job> jobs = List.of(job1,job2);

        while (jobs.stream().anyMatch(Job::hasNotRun)) {
            jobs.forEach(Job::tryPerform);
        }
    }

    public static void main(String[] args) {
        final Processor processor1 = new Processor();
        final Processor processor2 = new Processor();

        processor1.start();
        processor2.start();
    }
}

一些注意事项:

  • run() 中的 Processor 方法现在泛化为 n 个作业列表。当任何作业尚未执行时,它会尝试执行它们,并在所有作业完成后完成。
  • TryLocker 类是 AutoCloseable,因此 Job 中的锁定和解锁可以通过在 try-with-resources 中创建它的实例来完成声明。
  • 此处未使用 Locker 类,但演示了如何对阻塞 lock() 调用而不是 tryLock() 调用执行相同的操作。
  • 如果需要,
  • TryLocker 也可以花费一段时间并调用 tryLock 的重载,该重载等待一段时间后放弃;该修改留给读者作为练习。
  • hasNotRun()Job 方法只是为了使 anyMatch(Job::hasNotRun)run()Processor 方法中更具可读性;它可能不会减轻它的重量,可以省去。
  • locker 类不会使用 Objects.requireNonNull 检查传入的锁是否为空;他们通过调用一个方法来立即使用它,所以如果它为空,他们仍然会抛出一个 NPE,但放置一个明确的 requireNonNull 可能会让他们更清楚。
  • locker 类不会在调用 ReentrantLock 之前检查它们是否已经解锁了 unlock() 以使它们具有幂等性;在这种情况下,他们会抛出一个 IllegalMonitorStateException。过去我写了一个带有标志变量的变体来避免这种情况,但由于目的是在 try-with-resources 语句中使用它们,该语句只会调用 {{1} } 方法一次,我觉得如果有人手动调用close方法让他们炸掉比较好。