问题描述
处理器类 -
public class Processor extends Thread {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
private void doJob1() {
synchronized (lock1) {
System.out.println(Thread.currentThread().getName() + " doing job1");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job1");
}
}
private void doJob2() {
synchronized (lock2) {
System.out.println(Thread.currentThread().getName() + " doing job2");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job2");
}
}
public void run() {
doJob1();
doJob2();
}
}
主要方法-
final Processor processor1 = new Processor();
final Processor processor2 = new Processor();
processor1.start();
processor2.start();
第一次运行时,Thread-0 或 Thread-1 获取 job1() 的锁,另一个空闲 5 秒。
第一个线程释放 job1() 上的锁并锁定 job2() 后,第二个线程获取 job1() 上的锁。
我希望第二个线程不应该闲置,因为 job1() 被第一个线程锁定,它应该锁定 job2(),然后锁定 job1()。
怎么做?
注意:这是基本的蓝图。实际上,即使有 100 个任务和 5 个线程,我也希望我的代码能够工作。
解决方法
我想,您正在寻找一种方法来“尽可能获取锁,如果它不是免费的就做其他事情”。
您可以使用 ReentrantLock#tryLock
来完成。
方法:
仅在调用时未被其他线程持有时才获取锁。
它返回:
true
如果锁是空闲的并且被当前线程获取,或者锁已经被当前线程持有;和 false
否则
这是问题代码的修改版本:
- 如果第一个任务的锁是空闲的,一个线程将运行第一个任务
- 否则它会在第二个之后运行
Processor.java
:
public class Processor extends Thread {
private static final Lock lock1 = new ReentrantLock();
private static final Lock lock2 = new ReentrantLock();
@SneakyThrows
private void doJob1() {
System.out.println(Thread.currentThread().getName() + " doing job1");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job1");
}
@SneakyThrows
private void doJob2() {
System.out.println(Thread.currentThread().getName() + " doing job2");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job2");
}
public void run() {
boolean executedFirst = false;
if (lock1.tryLock()) {
try {
doJob1();
executedFirst = true;
} finally {
lock1.unlock();
}
}
try {
lock2.lock();
doJob2();
} finally {
lock2.unlock();
}
if (!executedFirst) {
try {
lock1.lock();
doJob1();
} finally {
lock1.unlock();
}
}
}
public static void main(String[] args) {
new Processor().start();
new Processor().start();
}
}
示例输出:
Thread-1 doing job2
Thread-0 doing job1
Thread-0 completed job1
Thread-1 completed job2
Thread-1 doing job1
Thread-0 doing job2
Thread-1 completed job1
Thread-0 completed job2
请注意,lock
/tryLock
和 unlock
调用被包围在 try
/finally
进入篮筐和球:
Color.java
:
public enum Color {
RED,GREEN,BLUE;
public static Color fromOrdinal(int i) {
for (Color value : values()) {
if (value.ordinal() == i) {
return value;
}
}
throw new IllegalStateException("Unknown ordinal = " + i);
}
}
Basket.java
:
@Data(staticConstructor = "of")
public class Basket {
private final Color color;
// balls that this basket has
private final List<Ball> balls = new ArrayList<>();
private final Lock lock = new ReentrantLock();
}
Ball.java
@Value(staticConstructor = "of")
public class Ball {
Color color;
}
Boy.java
- 每个男孩捡一个球 (
queue.poll()
) - 跑到篮子里(
baskets.get(color)
同色) - 根据篮子被占用时的行为,他:
- 将球扔掉并再次尝试(代码中的选项
a
) - 等待篮子被释放(选项
b
)
- 将球扔掉并再次尝试(代码中的选项
- 请注意,使用选项
a
时,某个boy
可能会在另一个扔掉ball
并且还没有人捡起它时终止(无论如何,有人会捡起来放进篮子里)
@RequiredArgsConstructor
public class Boy implements Runnable {
private final Map<Color,Basket> baskets;
private final Queue<Ball> balls;
@Override
public void run() {
Ball ball;
while ((ball = balls.poll()) != null) {
Color color = ball.getColor();
Basket basket = baskets.get(color);
// a
if (basket.getLock().tryLock()) {
try {
basket.getBalls().add(ball);
} finally {
basket.getLock().unlock();
}
} else {
balls.offer(ball);
}
// b
/*
try {
basket.getLock().lock();
basket.getBalls().add(ball);
} finally {
basket.getLock().unlock();
}
*/
}
}
}
最后main
:
Queue<Ball> balls = new LinkedBlockingQueue<>();
ThreadLocalRandom.current().ints(0,3)
.mapToObj(Color::fromOrdinal)
.map(Ball::of)
.limit(1000)
.forEach(balls::add);
Map<Color,Basket> baskets = Map.of(
Color.RED,Basket.of(Color.RED),Color.GREEN,Basket.of(Color.GREEN),Color.BLUE,Basket.of(Color.BLUE)
);
List<Thread> threads = IntStream.range(0,100)
.mapToObj(ignore -> new Boy(baskets,balls))
.map(Thread::new)
.collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
baskets.forEach((color,basket) -> System.out.println("There are "
+ basket.getBalls().size() + " ball(-s) in " + color + " basket"));
输出示例:
There are 331 ball(-s) in GREEN basket
There are 330 ball(-s) in BLUE basket
There are 339 ball(-s) in RED basket
,
这里有一个稍微复杂的例子,它有一个作业对象和一个指示作业是否已执行的条件变量,以及包装器如何将 ReentrantLock
适配到 try-with-resources 语句的例子.
/**
* A Job represents a unit of work that needs to be performed once and
* depends upon a lock which it must hold while the work is performed.
*/
public class Job {
private final Runnable job;
private final ReentrantLock lock;
private boolean hasRun;
public Job(Runnable job,ReentrantLock lock) {
this.job = Objects.requireNonNull(job);
this.lock = Objects.requireNonNull(lock);
this.hasRun = false;
}
/**
* @returns true if the job has already been run
*/
public boolean hasRun() {
return hasRun;
}
// this is just to make the test in Processor more readable
public boolean hasNotRun() {
return !hasRun;
}
/**
* Tries to perform the job,returning immediately if the job has
* already been performed or the lock cannot be obtained.
*
* @returns true if the job was performed on this invocation
*/
public boolean tryPerform() {
if (hasRun) {
return false;
}
try (TryLocker locker = new TryLocker(lock)) {
if (locker.isLocked()) {
job.run();
hasRun = true;
}
}
return hasRun;
}
}
/**
* A Locker is an AutoCloseable wrapper around a ReentrantLock.
*/
public class Locker implements AutoCloseable {
private final ReentrantLock lock;
public Locker(final ReentrantLock lock) {
this.lock = lock;
lock.lock();
}
@Override
public void close() {
lock.unlock();
}
}
/**
* A TryLocker is an AutoCloseable wrapper around a ReentrantLock that calls
* its tryLock() method and provides a way to test whether than succeeded.
*/
public class TryLocker implements AutoCloseable {
private final ReentrantLock lock;
public TryLocker(final ReentrantLock lock) {
this.lock = lock.tryLock() ? lock : null;
}
public boolean isLocked() {
return lock != null;
}
@Override
public void close() {
if (isLocked()) {
lock.unlock();
}
}
}
/**
* A modified version of the Processor class from the question.
*/
public class Processor extends Thread {
private static final ReentrantLock lock1 = new ReentrantLock();
private static final ReentrantLock lock2 = new ReentrantLock();
private void snooze(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void doJob1() {
System.out.println(Thread.currentThread().getName() + " doing job1");
snooze(5000);
System.out.println(Thread.currentThread().getName() + " completed job1");
}
private void doJob2() {
System.out.println(Thread.currentThread().getName() + " doing job2");
snooze(5000);
System.out.println(Thread.currentThread().getName() + " completed job2");
}
public void run() {
Job job1 = new Job(() -> doJob1(),lock1);
Job job2 = new Job(() -> doJob2(),lock2);
List<Job> jobs = List.of(job1,job2);
while (jobs.stream().anyMatch(Job::hasNotRun)) {
jobs.forEach(Job::tryPerform);
}
}
public static void main(String[] args) {
final Processor processor1 = new Processor();
final Processor processor2 = new Processor();
processor1.start();
processor2.start();
}
}
一些注意事项:
-
run()
中的Processor
方法现在泛化为 n 个作业列表。当任何作业尚未执行时,它会尝试执行它们,并在所有作业完成后完成。 -
TryLocker
类是AutoCloseable
,因此Job
中的锁定和解锁可以通过在 try-with-resources 中创建它的实例来完成声明。 - 此处未使用
Locker
类,但演示了如何对阻塞lock()
调用而不是tryLock()
调用执行相同的操作。
如果需要, -
TryLocker
也可以花费一段时间并调用tryLock
的重载,该重载等待一段时间后放弃;该修改留给读者作为练习。 -
hasNotRun()
的Job
方法只是为了使anyMatch(Job::hasNotRun)
在run()
的Processor
方法中更具可读性;它可能不会减轻它的重量,可以省去。 - locker 类不会使用
Objects.requireNonNull
检查传入的锁是否为空;他们通过调用一个方法来立即使用它,所以如果它为空,他们仍然会抛出一个 NPE,但放置一个明确的 requireNonNull 可能会让他们更清楚。 - locker 类不会在调用
ReentrantLock
之前检查它们是否已经解锁了unlock()
以使它们具有幂等性;在这种情况下,他们会抛出一个IllegalMonitorStateException
。过去我写了一个带有标志变量的变体来避免这种情况,但由于目的是在 try-with-resources 语句中使用它们,该语句只会调用 {{1} } 方法一次,我觉得如果有人手动调用close方法让他们炸掉比较好。