为客户端发送的每个文件套接字创建新的线程

问题描述

我正在处理带有套接字的应用程序。客户端正在监视特定目录,并且当用户在其中添加文件时,客户端正在通过套接字将此文件发送到服务器端的特定目录。 现在,我已经实现了信号量,可以将许多文件同时发送到服务器。 这是我的问题开始的地方。我正在使用while循环,当客户端尝试发送一些文件时,服务器正在创建它可以的所有线程。在这种情况下4,因此:

static Semaphore semaphore = new Semaphore(4); 
while (true) {       
    MyRunnable t1 = new MyRunnable("Name" + number++);

    Thread thread = new Thread(t1);
    thread.start();                     
}

这是我的Thread类以及内部发生的事情

public class MyRunnable implements Runnable {

        String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        private void saveFile(String path) throws IOException {

            byte[] buffer = new byte[4096]; //4096 16384

            String fileName = dis.readUTF();
            int fileSize = (int) dis.readLong();

            int read = 0;
            int totalRead = 0;

            FileOutputStream fos = new FileOutputStream(path + fileName);
            System.out.println("Name of the file " + fileName);

            while ((read = dis.read(buffer,Math.min(buffer.length,fileSize))) > 0) {
                totalRead += read;
                fileSize -= read;
                System.out.println("Read" + totalRead + " bytes.");
                fos.write(buffer,read);
            }

        }

        public void run() {
            try {
                semaphore.acquire();
                System.out.println(name + " : got the permit!");
                System.out.println("available Semaphore permits : "
                        + semaphore.availablePermits());

                try {
                    saveFile(pathToFiles + login + "\\");
                } finally {

                    // calling release() after a successful acquire()
                    System.out.println(name + " : releasing lock...");
                    semaphore.release();
                    System.out.println(name + " : available Semaphore permits Now: "
                            + semaphore.availablePermits());

                }

            } catch (IOException | InterruptedException e) {
                e.printstacktrace();
            }
        }
    }

这是我的输出,什么都没有发送:

Name0 : got the permit!
Name1 : got the permit!
Name3 : got the permit!
Name2 : got the permit!
available Semaphore permits : 0
available Semaphore permits : 0
available Semaphore permits : 0
available Semaphore permits : 0

这是我的客户代码的一部分

public void watchDirectory(Path path) throws IOException,InterruptedException {
        WatchService watchService
                = FileSystems.getDefault().newWatchService();

        path.register(
                watchService,StandardWatchEventKinds.ENTRY_CREATE
                //     StandardWatchEventKinds.ENTRY_DELETE,// StandardWatchEventKinds.ENTRY_MODIFY
                );

        WatchKey key;
        while ((key = watchService.take()) != null) {
            for (WatchEvent<?> event : key.pollEvents()) {
                System.out.println(
                        "Event kind:" + event.kind()
                                + ". File affected: " + event.context());
                Runnable runnable = () -> {
                    System.out.println("Inside : " + Thread.currentThread().getName());
                    File file = new File(pathToFiles + login + "\\" + event.context());
                    try {
                        Thread.sleep(50);
                        sendFile(file);
                    } catch (IOException | InterruptedException e) {
                        e.printstacktrace();
                    }

                };
                
                System.out.println("Creating Thread...");
                Thread thread = new Thread(runnable);

                System.out.println("Starting Thread...");
                thread.start();
            }
            key.reset();
        }
    }

    public void sendFile(File file) throws IOException {

        FileInputStream fis = new FileInputStream(file);
        byte[] buffer = new byte[4096];  //4096 16384

        // writing name
        dos.writeUTF(file.getName());
        // writing length
        dos.writeLong(file.length());

        System.out.println(file.getName() + " " + file.length());

        int count;

        while ((count = fis.read(buffer)) > 0) {
            dos.write(buffer,count);
        }

    }

我想让一个线程处理一个文件客户端发送的消息。当客户端发送更多文件时,我希望线程能够一致地工作。 有一个好的方法吗?感谢您的帮助。

解决方法

如果要重用一组4个线程,最好的方法是使用固定的线程池执行程序:

ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(4);

threadPoolExecutor.submit( new SomeRunnable() );

(您可以在 documentation。)

在这种情况下,您将在目录监视程序之外创建ExecutorService,然后在监视程序代码中醒来做某事的地方submit()对其执行任务。

有关如何在其自己的线程中监视目录的示例,请参见this question。在该示例之后,目录监视程序类将如下所示:

public class DirWatcher implements Runnable {

    private final Path dir;
    private final WatchService watcher;
    private final WatchKey key;

    // thread pool for handling files in the watched directory
    ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(4);

    @SuppressWarnings("unchecked")
    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>) event;
    }

    public DirWatcher(Path dir) throws IOException {
        this.dir = dir;
        this.watcher = FileSystems.getDefault().newWatchService();
        this.key = dir.register(watcher,ENTRY_CREATE);
    }

    public void run() {
        try {
            for (;;) {
                WatchKey key = watcher.take();

                if (this.key != key) {
                    continue;
                }

                for (WatchEvent<?> event : key.pollEvents()) {
                    WatchEvent<Path> ev = cast(event);
                    String fileName = dir.resolve( ev.context() ).toString();

                    // handle event
                    threadPoolExecutor.submit( new SomeTask( fileName ) );
                }

                // reset key
                if (!key.reset()) {
                    break;
                }
            }
        } catch (InterruptedException x) {
            return;
        }
    }
}

您可以看到ExecutorService实例和submit()调用,您的目录观察者将在其中响应正在创建的新文件,并为Runnable保留一个占位符“ SomeTask”,无论执行什么工作,您都将调用它您想要对文件进行操作。

您可以这样称呼它:

public static void watchThatDirectory(String dirName) throws IOException,InterruptedException {

    Path dir = Paths.get(dirName);
    DirWatcher watcher = new DirWatcher(dir);

    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> future = executor.submit(watcher);
    executor.shutdown();
}

现在目录监视程序线程正在后台运行,并且将启动新线程(最多4个)以处理监视目录中新文件的出现。同时,您可以在主线程中执行其他工作。