问题描述
我正在处理带有套接字的应用程序。客户端正在监视特定目录,并且当用户在其中添加文件时,客户端正在通过套接字将此文件发送到服务器端的特定目录。 现在,我已经实现了信号量,可以将许多文件同时发送到服务器。 这是我的问题开始的地方。我正在使用while循环,当客户端尝试发送一些文件时,服务器正在创建它可以的所有线程。在这种情况下4,因此:
static Semaphore semaphore = new Semaphore(4);
while (true) {
MyRunnable t1 = new MyRunnable("Name" + number++);
Thread thread = new Thread(t1);
thread.start();
}
这是我的Thread类以及内部发生的事情
public class MyRunnable implements Runnable {
String name;
public MyRunnable(String name) {
this.name = name;
}
private void saveFile(String path) throws IOException {
byte[] buffer = new byte[4096]; //4096 16384
String fileName = dis.readUTF();
int fileSize = (int) dis.readLong();
int read = 0;
int totalRead = 0;
FileOutputStream fos = new FileOutputStream(path + fileName);
System.out.println("Name of the file " + fileName);
while ((read = dis.read(buffer,Math.min(buffer.length,fileSize))) > 0) {
totalRead += read;
fileSize -= read;
System.out.println("Read" + totalRead + " bytes.");
fos.write(buffer,read);
}
}
public void run() {
try {
semaphore.acquire();
System.out.println(name + " : got the permit!");
System.out.println("available Semaphore permits : "
+ semaphore.availablePermits());
try {
saveFile(pathToFiles + login + "\\");
} finally {
// calling release() after a successful acquire()
System.out.println(name + " : releasing lock...");
semaphore.release();
System.out.println(name + " : available Semaphore permits Now: "
+ semaphore.availablePermits());
}
} catch (IOException | InterruptedException e) {
e.printstacktrace();
}
}
}
这是我的输出,什么都没有发送:
Name0 : got the permit!
Name1 : got the permit!
Name3 : got the permit!
Name2 : got the permit!
available Semaphore permits : 0
available Semaphore permits : 0
available Semaphore permits : 0
available Semaphore permits : 0
这是我的客户代码的一部分
public void watchDirectory(Path path) throws IOException,InterruptedException {
WatchService watchService
= FileSystems.getDefault().newWatchService();
path.register(
watchService,StandardWatchEventKinds.ENTRY_CREATE
// StandardWatchEventKinds.ENTRY_DELETE,// StandardWatchEventKinds.ENTRY_MODIFY
);
WatchKey key;
while ((key = watchService.take()) != null) {
for (WatchEvent<?> event : key.pollEvents()) {
System.out.println(
"Event kind:" + event.kind()
+ ". File affected: " + event.context());
Runnable runnable = () -> {
System.out.println("Inside : " + Thread.currentThread().getName());
File file = new File(pathToFiles + login + "\\" + event.context());
try {
Thread.sleep(50);
sendFile(file);
} catch (IOException | InterruptedException e) {
e.printstacktrace();
}
};
System.out.println("Creating Thread...");
Thread thread = new Thread(runnable);
System.out.println("Starting Thread...");
thread.start();
}
key.reset();
}
}
public void sendFile(File file) throws IOException {
FileInputStream fis = new FileInputStream(file);
byte[] buffer = new byte[4096]; //4096 16384
// writing name
dos.writeUTF(file.getName());
// writing length
dos.writeLong(file.length());
System.out.println(file.getName() + " " + file.length());
int count;
while ((count = fis.read(buffer)) > 0) {
dos.write(buffer,count);
}
}
我想让一个线程处理一个文件客户端发送的消息。当客户端发送更多文件时,我希望线程能够一致地工作。 有一个好的方法吗?感谢您的帮助。
解决方法
如果要重用一组4个线程,最好的方法是使用固定的线程池执行程序:
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(4);
threadPoolExecutor.submit( new SomeRunnable() );
(您可以在 documentation。)
在这种情况下,您将在目录监视程序之外创建ExecutorService,然后在监视程序代码中醒来做某事的地方submit()
对其执行任务。
有关如何在其自己的线程中监视目录的示例,请参见this question。在该示例之后,目录监视程序类将如下所示:
public class DirWatcher implements Runnable {
private final Path dir;
private final WatchService watcher;
private final WatchKey key;
// thread pool for handling files in the watched directory
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(4);
@SuppressWarnings("unchecked")
static <T> WatchEvent<T> cast(WatchEvent<?> event) {
return (WatchEvent<T>) event;
}
public DirWatcher(Path dir) throws IOException {
this.dir = dir;
this.watcher = FileSystems.getDefault().newWatchService();
this.key = dir.register(watcher,ENTRY_CREATE);
}
public void run() {
try {
for (;;) {
WatchKey key = watcher.take();
if (this.key != key) {
continue;
}
for (WatchEvent<?> event : key.pollEvents()) {
WatchEvent<Path> ev = cast(event);
String fileName = dir.resolve( ev.context() ).toString();
// handle event
threadPoolExecutor.submit( new SomeTask( fileName ) );
}
// reset key
if (!key.reset()) {
break;
}
}
} catch (InterruptedException x) {
return;
}
}
}
您可以看到ExecutorService
实例和submit()
调用,您的目录观察者将在其中响应正在创建的新文件,并为Runnable保留一个占位符“ SomeTask”,无论执行什么工作,您都将调用它您想要对文件进行操作。
您可以这样称呼它:
public static void watchThatDirectory(String dirName) throws IOException,InterruptedException {
Path dir = Paths.get(dirName);
DirWatcher watcher = new DirWatcher(dir);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(watcher);
executor.shutdown();
}
现在目录监视程序线程正在后台运行,并且将启动新线程(最多4个)以处理监视目录中新文件的出现。同时,您可以在主线程中执行其他工作。