多线程编程学习五(线程池的创建)

一、概述

在开发过程中,线程池可以带来如下好处:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

New Thread的弊端如下:
       a、每次New Thread新建对象性能差。
       b、线程缺乏统一的管理,可能无限制的新建线程,相互之间竞争,极可能占用过多的系统资源导致死机 或者 OOM。
       c、缺乏更多功能,如定时执行、定期执行、线程中断。

Java提供的四种线程池的好处在于:
       a、重用存在的线程,减少对象创建、消亡的开销,性能佳。
       b、可有效控制最大并发线程数、提供系统资源的使用率,同时避免过多资源竞争,避免堵塞。
       c、提供定时执行、定期执行、单线程、并发数控制等功能。

二、Executors 创建线程池

Java通过Executors提供四种线程池,分别为:

newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待,表示同一时刻只能有这么大的并发数
newScheduledThreadPool 创建一个定时线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO,LIFO,优先级)执行。

三、ThreadPoolExecutor 创建线程池

线程池不建议使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:
      1、 newFixedThreadPool 和 newSingleThreadExecutor:
       主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
      2、newCachedThreadPool 和 newScheduledThreadPool:
       主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

这里介绍三种创建线程池的方式:

Example 1:

    //org.apache.commons.lang3.concurrent.BasicThreadFactory
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

Example 2:

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

    Common Thread Pool
    ExecutorService pool = new ThreadPoolExecutor(5,200,0L,TimeUnit.MILLISECONDS,1)">new LinkedBlockingQueue<Runnable>(1024),namedThreadFactory,1)">new ThreadPoolExecutor.AbortPolicy());

    pool.execute(()-> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();gracefully shutdown

Example 3:

    <bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        property name="corePoolSize" value="10" />
        ="maxPoolSize"="100" ="queueCapacity"="2000" ="threadFactory"= threadFactory ="rejectedExecutionHandler">
            ref local="rejectedExecutionHandler" </property>
    bean>
    //in code
    userThreadPool.execute(thread);

 

tips:ThreadPoolExecutor详解可以参考:https://www.cnblogs.com/jmcui/p/11552583.html

四、自建线程池

    我们要建一个简单的线程池,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面,面对过量任务的提交能够平缓的劣化。

public interface ThreadPool<Job extends Runnable> {

    /**
     * 执行一个Job,这个Job需要实现Runnable
     *
     * @param job
     */
    void execute(Job job);

    
     * 关闭线程池
      shutdown();

    
     * 增加工作者线程
     *
     *  num
     void addWorkers(int num);

    
     * 减少工作者线程
     *
     * void removeWorker(
     * 得到正在等待执行的任务数量
     *
     * @return
      getJobSize();
}
ThreadPool

class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    
     * 线程池最大限制数、默认的数量、最小的数量
     private static final Integer MAX_WORKER_NUMBERS = 10;
    final Integer DEFAULT_WORKER_NUMBERS = 5final Integer MIN_WORKER_NUMBERS = 1
     * 这是一个待工作列表,将会向里面插入工作
     final LinkedList<Job> jobs = new LinkedList<>();
    
     * 工作者列表(固定数目的线程,不断去执行  jobs 中的任务)
     final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    
     * 工作者线程的数量
     int workerNum = DEFAULT_WORKER_NUMBERS;
    
     * 线程编号生成
     private AtomicLong threadNum =  AtomicLong();

    public DefaultThreadPool() {
        initializeWokers(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool( num) {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWokers(workerNum);
    }

    @Override
     execute(Job job) {
        if (job != null) {
             添加一个工作,然后进行通知
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    @Override
     shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
    }

    @Override
     num) {
         (jobs) {
             限制新增的Worker数量不能超过最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWokers(num);
            this.workerNum += num;
        }
    }

    @Override
    if (num >= .workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
            按照给定的数量停止Worker
            int count = 0;
            while (count < num) {
                 每次都移除第一个线程
                Worker worker = workers.get(0);
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
     getJobSize() {
        return jobs.size();
    }

    
     * 初始化线程工作者
     *
     * void initializeWokers(for (int i = 0; i < num; i++) {
            Worker worker =  Worker();
            workers.add(worker);
            Thread thread = new Thread(worker,"ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    
     * 工作者,负责消费任务
     class Worker implements Runnable {
        
         * 是否工作
         */
        volatile boolean running = true;

        @Override
         run() {
            while (running) {
                Job job;
                 (jobs) {
                     如果工作者列表是空的,那么就wait
                     (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException ex) {
                             感知到外部对 WorkerThread 的中断操作,返回
                            Thread.currentThread().interrupt();
                            ;
                        }
                    }
                    取出一个Job
                    job = jobs.removeFirst();
                }
                ) {
                     {
                        job.run();
                    }  (Exception ex) {
                         忽略Job执行中的Exception
                    }
                }
            }
        }

         shutdown() {
            running = false;
        }
    }
}
DefaultThreadPool

    可以看到,线程池的本质就是使用了一个线程安全的工作队列(workers)连接工作者线程和客户端线程,客户端线程将任务(job)放入工作队列后便返回,而工作者线程则不断地从工作队列上取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户端提交了一个任务之后会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者线程会被唤醒。

    我们利用自建的线程池来构造一个简单的 Web 服务器,这个 Web 服务器用来处理 HTTP 请求,目前只能处理简单的文本和 JPG 图片内容。这个 Web 服务器使用 main 线程不断地接受客户端 Socket 的连接,将连接以及请求提交给线程池处理,这样使得 Web 服务器能够同时处理多个客户端请求。

class SimpleHttpServer {

    
     * 处理HttpRequest的线程池
     static ThreadPool<HttpRequestHandler> THREAD_POOL = new DefaultThreadPool<>(1);
    
     * SimpleHttpServer的根路径(可以理解成 Tomcat 的 Root 目录)
     static String basePath;
     ServerSocket serverSocket;
    
     * 服务监听端口
     int port = 8080;

    void setPort( port) {
        if (port > 0) {
            SimpleHttpServer.port = port;
        }
    }

     setBasePath(String basePath) {
        if (basePath != null && new File(basePath).exists() &&  File(basePath).isDirectory()) {
            SimpleHttpServer.basePath = basePath;
        }
    }

    
     * 启动SimpleHttpServer
     *
     * @throws Exception
     void start() throws Exception {
        serverSocket =  ServerSocket(port);
        Socket socket;
        while ((socket = serverSocket.accept()) !=  接收一个客户端Socket,生成一个HttpRequestHandler,放入线程池执行
            THREAD_POOL.execute( HttpRequestHandler(socket));
        }
        serverSocket.close();
    }


    class HttpRequestHandler  Runnable {

        private Socket socket;

         HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
         socket 输入
            BufferedReader reader =  socket 输出
            PrintWriter out = ;
            BufferedReader br = ;
            InputStream in =  {
                reader = new BufferedReader( InputStreamReader(socket.getInputStream()));
                String header = reader.readLine();
                 由相对路径计算出绝对路径
                String filePath = basePath + header.split("\\s+")[1];
                out =  PrintWriter(socket.getOutputStream());
                 如果请求资源的后缀为jpg或者ico,则读取资源并输出
                if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                    in =  FileInputStream(filePath);
                    ByteArrayOutputStream baos =  ByteArrayOutputStream();
                     i;
                    while ((i = in.read()) != -1) {
                        baos.write(i);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: image/jpeg");
                    out.println("Content-Length: " + array.length);
                    out.println("");
                    socket.getOutputStream().write(array,0,array.length);
                } else {
                    br = new InputStreamReader( FileInputStream(filePath)));
                    out =  PrintWriter(socket.getOutputStream());
                    out.println("HTTP/1.1 200 OK");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println("");
                    String line = ;
                    while ((line = br.readLine()) != ) {
                        out.println(line);
                    }
                }
                out.flush();
            }  (Exception ex) {
                out.println("HTTP/1.1 500");
                out.println("");
                out.flush();
            } finally {
                close(br,in,reader,out,socket);
            }
        }
    }

    
     * 关闭流或者Socket
     *
     *  closeables
      close(Closeable... closeables) {
        if (closeables !=  (Closeable closeable : closeables) {
                 {
                    closeable.close();
                }  (Exception ex) {
                }
            }
        }
    }


    void main(String[] args)  Exception {
        SimpleHttpServer.setPort(-1);
        SimpleHttpServer.setBasePath("D:\\");
        SimpleHttpServer.start();
    }
}
SimpleHttpServer

    以上案例参考自《Java 并发编程的艺术》

相关文章

摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠...
摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠...
今天犯了个错:“接口变动,伤筋动骨,除非你确定只有你一个...
Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:...
本文目录 线程与多线程 线程的运行与创建 线程的状态 1 线程...