How should thread locking be handled so that other threads wait for a file to download, and then all threads read the file at once?

Problem description

I am running tasks with an ExecutorService created by Executors.newFixedThreadPool().

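Each task downloads a file from a specific URL and saves it to the database if it does not exist yet, or otherwise just reads the file from the database. So this is more like a readers-writer problem: any thread in the executor's thread pool can act as the writer at a given time, while the other threads become readers for subsequent requests.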

I am using a Semaphore to do this, but the problem with this approach is that the subsequent read requests happen sequentially.

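If 4 tasks hit the same URL, I need synchronization until the file has been downloaded and the semaphore released: of the 4 threads, any one can acquire the lock while the remaining 3 wait. Once the download finishes, all 3 remaining threads should read the downloaded file simultaneously. In my case, however, this last step happens one thread after another, which also hurts the project's performance.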

With that use case in mind, here is my sample code:

The following Runnable is passed to the ExecutorService to perform the task on the SharedObjectMT class.

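class DownloadRunnable implements Runnable {
    private final SharedObjectMT sharedObject;
    private final String url;

    DownloadRunnable(SharedObjectMT sharedObject, String url) {
        this.sharedObject = sharedObject;
        this.url = url;
    }

    @Override
    public void run() {
        sharedObject.loadFile(url);
    }
}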
class SharedObjectMT {
    // This HashMap acts as a ConcurrentHashMap-style mapping from URL to Semaphore,
    // so multiple threads requesting the same URL are synchronized only on their
    // corresponding semaphore, while threads requesting different URLs
    // run concurrently.

    private static final HashMap<String, Semaphore> syncMap = new HashMap<>();
    .....
    void loadFile(String url) {
        
        // Let all threads enter sequentially and try to assign a Semaphore for their URL
        // in the map. Only if the URL has never been requested is a new Semaphore (say S1)
        // assigned to it; all other threads with the *same request URL* then use S1 to
        // handle concurrency.

        final Semaphore semaphore;
        synchronized (syncMap) {
            if (syncMap.get(url) == null) {
                syncMap.put(url, new Semaphore(1));
            }
            semaphore = syncMap.get(url); // read inside the lock: HashMap is not thread-safe
        }

        synchronized(semaphore) {
            ............
            ............
            semaphore.acquire();
            String filePath = findInDatabase(url);
            if(filePath != null) {
                semaphore.release(); // no need to hold semaphore since file already downloaded.
                printStatus("Already downloaded file from url = "+url);
            } else {
                // This DownloadThread is actually a mock of my real project where a third-party 
                // library uses a thread to download the file.

                new DownloadThread(() -> {
                    printStatus("Download completed for url= "+ url +". Releasing semaphore.");
                    semaphore.release();
                }).start();
                .............
                .............
            }
        }
    }
}

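I know a single semaphore can't help me here. Perhaps one more semaphore could distinguish read locking from write locking, or some other locking mechanism would work. What should I use for this kind of one-time synchronization?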

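Note: since the real project is in Kotlin, please ignore any syntax errors you find in the code above; this is a basic Java multithreading question, so I am posting it as Java code.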
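For comparison, the JDK already has a primitive built for this kind of one-shot gate: java.util.concurrent.CountDownLatch. The sketch below is an alternative to the approach in the answer, assuming a hypothetical LatchGate class and a caller-supplied downloader function standing in for the real third-party download. The first thread to register a URL downloads it, every other thread for that URL blocks in await(), and a single countDown() releases all of them at once, so the reads run in parallel instead of one after another.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Sketch only: a one-shot, per-URL gate built on CountDownLatch.
class LatchGate {
    // One entry per URL; its latch opens exactly once, after `path` is set.
    private static final class Entry {
        final CountDownLatch done = new CountDownLatch(1);
        String path; // visibility to waiters is guaranteed by countDown()/await()
    }

    private final ConcurrentHashMap<String, Entry> entries = new ConcurrentHashMap<>();
    private final Function<String, String> downloader; // hypothetical: URL -> local path
    final AtomicInteger downloads = new AtomicInteger(); // how many real downloads ran

    LatchGate(Function<String, String> downloader) {
        this.downloader = downloader;
    }

    String load(String url) throws InterruptedException {
        Entry fresh = new Entry();
        Entry existing = entries.putIfAbsent(url, fresh); // atomic: exactly one winner per URL
        if (existing == null) {
            // This thread is the single "writer" for this URL.
            try {
                downloads.incrementAndGet();
                fresh.path = downloader.apply(url);
            } finally {
                fresh.done.countDown(); // releases every waiting reader at once
            }
            return fresh.path;
        }
        existing.done.await(); // "readers" block here, then all continue concurrently
        return existing.path;
    }
}
```

Because actions before countDown() happen-before a successful return from await(), the waiters see the path written by the downloading thread without extra locking. In the question's setup, countDown() would go where semaphore.release() sits in the DownloadThread completion callback. One caveat of this sketch: if the download throws, the waiters are released with a null path; real code would also record the failure.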

Solution

I'm not sure about Kotlin, but I can demonstrate it in Java:

import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DownloadOrRead {
        
    //Utility method,which just generates a random String instance...
    private static String randomString(final int length) {
        String alphabet = "abcdefghijklmnopqrstuvwxyz";
        alphabet += alphabet.toUpperCase();
        alphabet += "0123456789";
        final int alphabetSize = alphabet.length();
        final char[] chars = new char[length];
        final Random rand = new Random();
        for (int i = 0; i < chars.length; ++i)
            chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
        return String.valueOf(chars);
    }
    
    public static class DownLoadCallable implements Callable<String> {
        private final String url;
        
        public DownLoadCallable(final String url) {
            this.url = Objects.requireNonNull(url);
        }
        
        @Override
        public String call() throws IOException,InterruptedException {
            
            /*Utilize url property here to download the file...
            In our case,just simulate a download delay supposedly...*/
            Thread.sleep(5000L + (long) (Math.random() * 10000L));
            
            //Return the file's local path...
            return randomString(20); //In our case,a random String of 20 characters.
        }
    }
    
    //This is the method you are looking for:
    public static String loadPath(final ExecutorService executorService,//Can be shared between calls of loadPath...
                                  final HashMap<String,Future<String>> urlToFuture,//MUST be shared between calls of loadPath!
                                  final String url) //The URL. Can be the same as a URL in a previous call of loadPath.
            throws InterruptedException,ExecutionException {
        final Future<String> future;
        synchronized (urlToFuture) {
            if (!urlToFuture.containsKey(url)) //If nowhere to be seen...
                urlToFuture.put(url,executorService.submit(new DownLoadCallable(url))); //Create a Future...
            future = urlToFuture.get(url); //Obtain the Future (new or old).
        }
        return future.get(); //Outside the synchronized block!
    }
    
    public static void main(final String[] args) {
        
        System.out.println("Creating ExecutorService...");
        final ExecutorService executorService = Executors.newFixedThreadPool(10);
        
        System.out.println("Creating shared map...");
        final HashMap<String,Future<String>> urlToFuture = new HashMap<>();
        
        System.out.println("Creating random URLs...");
        final String[] urls = new String[]{randomString(10),randomString(20),randomString(15)};
        
        try {
            System.out.println("Downloading files sequentially...");
            final Random rand = new Random();
            for (int i = 0; i < 100; ++i)
                System.out.println(loadPath(executorService,urlToFuture,urls[rand.nextInt(urls.length)]));
            
            executorService.shutdown();
            executorService.awaitTermination(10,TimeUnit.MINUTES);
        }
        catch (final InterruptedException | ExecutionException x) {
            System.err.println(x);
        }
    }
}

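The whole idea is to submit a Callable to the ExecutorService to handle the download. We also use the Future returned by the submit method to obtain the desired result path/file/whatever: just call get on the relevant Future object. The only thing that needs synchronization is the Map from URL to Future.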

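When running this test program, you will notice that the first call blocks until its file is downloaded, subsequent calls for the same URL complete immediately (because that URL is already downloaded), and we block only for each new URL that has not been downloaded yet. Here I use just 3 random URLs, each taking 5 to 15 seconds to complete, which gives a total run time of roughly 15 to 45 seconds, since we download them sequentially.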

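That is all there is to the loadPath method. In the sample code above, though, the files are downloaded sequentially. If you also need multiple Threads to download, you can call loadPath from as many Threads as you like; no further synchronization is needed beyond the shared Map.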
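A minimal sketch of calling loadPath from several threads at once, assuming a hypothetical stub task (it counts invocations and returns a fake path) in place of DownLoadCallable, and a copy of loadPath that differs from the one above only in that stub. Several caller threads request the same URL at the same time; the counter shows that only one download ran while every caller received the same path.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentLoadDemo {
    // Counts how many stub "downloads" actually ran (hypothetical, not a real download).
    static final AtomicInteger DOWNLOADS = new AtomicInteger();

    // Same shape as loadPath above: only the map check-and-insert is synchronized.
    static String loadPath(ExecutorService downloadPool,
                           HashMap<String, Future<String>> urlToFuture,
                           String url) throws InterruptedException, ExecutionException {
        final Future<String> future;
        synchronized (urlToFuture) {
            if (!urlToFuture.containsKey(url))
                urlToFuture.put(url, downloadPool.submit(() -> {
                    DOWNLOADS.incrementAndGet();
                    Thread.sleep(200); // simulated download time
                    return "/files/" + url; // hypothetical local path
                }));
            future = urlToFuture.get(url);
        }
        return future.get(); // callers wait here in parallel, outside the lock
    }

    // Starts `callers` threads that all request `url` at once and collects their results.
    static List<String> callConcurrently(int callers, String url)
            throws InterruptedException, ExecutionException {
        ExecutorService downloadPool = Executors.newFixedThreadPool(2);
        ExecutorService callerPool = Executors.newFixedThreadPool(callers);
        HashMap<String, Future<String>> urlToFuture = new HashMap<>(); // shared by all callers
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int i = 0; i < callers; i++)
                pending.add(callerPool.submit(() -> loadPath(downloadPool, urlToFuture, url)));
            List<String> paths = new ArrayList<>();
            for (Future<String> f : pending)
                paths.add(f.get());
            return paths;
        } finally {
            callerPool.shutdown();
            downloadPool.shutdown();
        }
    }
}
```

The callers run in a separate pool from the downloads on purpose: if callers and downloads shared one fixed-size pool, enough callers blocked in future.get() could occupy every worker and leave the download task with no thread to run on.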

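As one can read in this answer here, calling get on the same Future after the operation has completed will always yield the same object, or, if the task failed, throw an ExecutionException wrapping the same cause. The code in this post takes advantage of that.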
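A small sketch that checks this behavior for the Futures returned by ExecutorService.submit, assuming a trivial task that creates a fresh Object and a second task that always throws (both hypothetical stand-ins for a download). The failure check compares the causes rather than the ExecutionException objects themselves, since each get() may wrap the stored cause in a new ExecutionException.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RepeatedGetDemo {
    // True if two get() calls on a completed Future return the very same object.
    static boolean sameResultTwice() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Object> f = pool.submit(() -> new Object()); // the task runs only once
            return f.get() == f.get(); // reference equality: one stored result
        } finally {
            pool.shutdown();
        }
    }

    // True if two get() calls on a failed Future report the very same cause.
    static boolean sameFailureTwice() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> f = pool.submit(() -> {
                throw new IllegalStateException("download failed"); // simulated failure
            });
            Throwable first = causeOf(f);
            Throwable second = causeOf(f);
            return first != null && first == second;
        } finally {
            pool.shutdown();
        }
    }

    // Returns the cause wrapped in the ExecutionException, or null if get() succeeded.
    private static Throwable causeOf(Future<?> f) throws InterruptedException {
        try {
            f.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        }
    }
}
```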

Edit 1:

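Even better, as @drekbour pointed out in the comments, use a ConcurrentHashMap and its computeIfAbsent method for the job, which removes the need for the synchronized block.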
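A minimal sketch of that variant, assuming a hypothetical download stub in place of DownLoadCallable. The synchronized block disappears because ConcurrentHashMap.computeIfAbsent performs the check-and-insert atomically and calls the mapping function at most once per absent key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class ComputeIfAbsentLoad {
    static final AtomicInteger DOWNLOADS = new AtomicInteger(); // counts stub downloads

    // No synchronized block: computeIfAbsent submits the task at most once per URL.
    static String loadPath(ExecutorService executorService,
                           ConcurrentHashMap<String, Future<String>> urlToFuture,
                           String url) throws InterruptedException, ExecutionException {
        Future<String> future = urlToFuture.computeIfAbsent(url,
                u -> executorService.submit(() -> download(u)));
        return future.get(); // blocking happens after the map operation has finished
    }

    // Hypothetical stand-in for DownLoadCallable: pretends to download, returns a path.
    private static String download(String url) throws InterruptedException {
        DOWNLOADS.incrementAndGet();
        Thread.sleep(100); // simulated download time
        return "/files/" + url;
    }
}
```

The mapping function only calls submit, which enqueues the task and returns immediately, so no map bin stays locked while a file downloads. Declaring the parameter as ConcurrentHashMap rather than Map matters here: the at-most-once guarantee comes from ConcurrentHashMap's implementation, while the default Map.computeIfAbsent offers no such atomicity.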
