问题描述
我正在使用ExecutorService fixedThreadPool()运行任务。
这里的任务定义为从特定的URL下载文件,如果文件不存在则保存到数据库中,或者仅从数据库中读取文件。因此,这更像是一个读取器-写入器问题,其中执行者线程池的任何线程都可以一次充当写入器,而其他线程将成为后续请求的读取器。
我正在使用Semaphore来执行此操作,但是这种方法的问题是随后的读取请求是顺序发生的。
如果要使用4个任务击相同的URL,我需要进行同步,直到下载文件并释放信号量为止,即,在4个线程中,任何人都可以获取锁,其余3个正在等待。下载完成后,其余所有3个线程应同时读取下载的文件。但是,就我而言,这最后一步是相继发生的,这也会对项目绩效产生影响。
说了上述用例之后,下面是我的示例代码:
以下Runnable传递给ExecutorService以对SharedObject类执行任务。
class DownloadRunnable(SharedObjectMT sharedObject,String url) implement Runnable {
void run() {
sharedObject.loadFile(url);
}
}
class SharedObjectMT {
// This Hashmap acts ConcurrentHashMap with URL and semaphore mapping. So
// multiple threads requesting for the same URL will only be synchronized on their
// corresponding semaphore. And threads requesting for different URLs
// will run concurrently.
private static HashMap<String,Semaphore> syncMap = new HashMap<>();
.....
void loadFile(String url) {
// Let all threads enter sequentially and try to assign a Semaphore for their url in the
// hashmap. If the url has never been requested,then only a Semaphore(say S1) will be
// assigned to that url. And for all the other threads with *same request url*,this
// Semaphore(S1) will be used to handle concurrency.
synchronized(syncMap) {
if(syncMap[url] == null) {
syncMap[url] = new Semaphore(1);
}
}
Semaphore semaphore = syncMap[url];
synchronized(semaphore) {
............
............
semaphore.acquire();
String filePath = findInDatabase(url);
if(filePath != null) {
semaphore.release(); // no need to hold semaphore since file already downloaded.
printStatus("Already downloaded file from url = "+url);
} else {
// This DownloadThread is actually a mock of my real project where a third-party
// library uses a thread to download the file.
DownloadThread(() -> {
printStatus("Download completed for url= "+ url +". Releasing semaphore.");
semaphore.release();
}).start();
.............
.............
}
}
}
}
我知道一个信号量无法帮助我。也许我们可以再使用1个信号量来区分读写锁定或任何其他锁定机制。因此,对于这种类型的一次性同步需要使用什么帮助。
注意:由于实际项目在Kotlin中,因此如果您在上面的代码中发现任何语法错误,请忽略,但这是一个基本的Java多线程问题,因此我将其发布为Java代码。
解决方法
我不确定Kotlin,但是我可以用Java进行演示:
import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class DownloadOrRead {
//Utility method,which just generates a random String instance...
private static String randomString(final int length) {
String alphabet = "abcdefghijklmnopqrstuvwxyz";
alphabet += alphabet.toUpperCase();
alphabet += "0123456789";
final int alphabetSize = alphabet.length();
final char[] chars = new char[length];
final Random rand = new Random();
for (int i = 0; i < chars.length; ++i)
chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
return String.valueOf(chars);
}
public static class DownLoadCallable implements Callable<String> {
private final String url;
public DownLoadCallable(final String url) {
this.url = Objects.requireNonNull(url);
}
@Override
public String call() throws IOException,InterruptedException {
/*Utilize url property here to download the file...
In our case,just simulate a download delay supposedly...*/
Thread.sleep(5000L + (long) (Math.random() * 10000L));
//Return the file's local path...
return randomString(20); //In our case,a random String of 20 characters.
}
}
//This is the method you are looking for:
public static String loadPath(final ExecutorService executorService,//Can be shared between calls of loadPath...
final HashMap<String,Future<String>> urlToFuture,//MUST be shared between calls of loadPath!
final String url) //The URL. Can be the same as a URL in a previous call of loadPath.
throws InterruptedException,ExecutionException {
final Future<String> future;
synchronized (urlToFuture) {
if (!urlToFuture.containsKey(url)) //If nowhere to be seen...
urlToFuture.put(url,executorService.submit(new DownLoadCallable(url))); //Create a Future...
future = urlToFuture.get(url); //Obtain the Future (new or old).
}
return future.get(); //Outside the synchronized block!
}
public static void main(final String[] args) {
System.out.println("Creating ExecutorService...");
final ExecutorService executorService = Executors.newFixedThreadPool(10);
System.out.println("Creating shared map...");
final HashMap<String,Future<String>> urlToFuture = new HashMap<>();
System.out.println("Creating random URLs...");
final String[] urls = new String[]{randomString(10),randomString(20),randomString(15)};
try {
System.out.println("Downloading files sequencially...");
final Random rand = new Random();
for (int i = 0; i < 100; ++i)
System.out.println(loadPath(executorService,urlToFuture,urls[rand.nextInt(urls.length)]));
executorService.shutdown();
executorService.awaitTermination(10,TimeUnit.MINUTES);
}
catch (final InterruptedException | ExecutionException x) {
System.err.println(x);
}
}
}
整个想法是向Callable
提交ExecutorService
来处理下载。我们还利用Future
方法返回的submit
来获得所需的结果路径/文件/任何内容。只需在所需的get
对象上调用Future
就可以了。唯一需要同步的是Map
的URL Future
。
您会注意到,在运行该测试程序时,第一个文件将被阻止直到下载,然后对同一URL的后续调用将立即完成(因为该URL已下载),而我们仅针对每个新URL进行阻止(尚未下载)。在这种情况下,我只使用3个随机URL,每个URL需要5到15秒才能完成,这使我们大约有15到45秒的正常运行时间,因为我们顺序下载它们。
到此结束loadPath
方法。但是在上面的示例代码中,文件是按顺序下载的。如果您还需要多个Thread
来下载,则可以从许多loadPath
中调用Thread
(不需要共享Map
之外的其他地方进行进一步的同步)。>
正如人们可以读到this answer here一样,似乎在操作完成后调用同一get
的{{1}}方法,将总是产生相同的对象或抛出相同的{{ 1}}如果失败。在本文中提供的代码中,这是我们利用的优势。
编辑1:
甚至更好的是,如@drekbour在评论中指出的那样,为工作使用Future
和Exception
,就像这样:
computeIfAbsent