Java线程池应用场景

线程池应用场景:

1、创建线程池 ThreadPoolExecutor

private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;

public static void main(String[] args) {
    //使用阿里巴巴推荐的创建线程池的方式
    //通过ThreadPoolExecutor构造函数自定义参数创建
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.CallerRunsPolicy());
    
    for (int i = 0; i < 10; i++) {
        executor.execute(new ThreadClass(i));
    }
    //终止线程池
    executor.shutdown();
    try {
        executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Finished all threads");
}


/**
 * Runnable 类
 */
static class ThreadClass implements Runnable {
    private int i;
    
    public ThreadClass(int i) {
        this.i = i;
    }
    
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " star " + i);
    
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        System.out.println(Thread.currentThread().getName() + " end " + i);
    }
}

2、并行任务

public R test() {
    // CountDownLatch 计数器,线程完成一个记录一个,计数器递减
    CountDownLatch latch = new CountDownLatch(2);
    // 创建线程池
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // TODO 任务一
            } finally {
                latch.countDown();
            }
        }
    });
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // TODO 任务二
            } finally {
                latch.countDown();
            }
        }
    });

    // 一定记得加上timeout时间,防止阻塞主线程
    try {
        latch.await(20000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    // 等待所有子任务完成,组装信息
    // TODO 

    //关闭线程池
    executorService.shutdown();
    return R.ok();
}

3、定时任务 ScheduledExecutorService

public static void main(String[] args) {
        // 每隔一秒打印
    ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
    ex.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println(DateUtil.date());
        }
    }, 0, 1000, TimeUnit.MILLISECONDS);
}

线程池最佳实践

1、使用 ThreadPoolExecutor 的构造函数声明线程池

线程池必须手动通过 ThreadPoolExecutor 的构造函数来声明,避免使用Executors 类的 newFixedThreadPool 和 newCachedThreadPool ,因为可能会有 OOM 的风险。

Executors 返回线程池对象的弊端如下:

FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

说白了就是:使用有界队列,控制线程创建数量。

2、监测线程池运行状态

你可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件。

除此之外,我们还可以利用 ThreadPoolExecutor 的相关 API做一个简陋的监控。从下图可以看出, ThreadPoolExecutor提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等。

每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。

    /**
     * 打印线程池的状态
     *
     * @param threadPool 线程池对象
     */
    public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-images/thread-pool-status", false));
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            log.info("=========================");
            log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
            log.info("Active Threads: {}", threadPool.getActiveCount());
            log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
            log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
            log.info("=========================");
        }, 0, 1, TimeUnit.SECONDS);
    }

3、建议不同类别的业务用不同的线程池

一般建议是不同的业务使用不同的线程池,配置线程池的时候根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,重心优化系统性能瓶颈相关的业务。

4、别忘记给线程池命名

初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。

默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

1.利用 guava 的 ThreadFactoryBuilder

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)

2.自己实现 ThreadFactor。

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniquifying this
    }

    @Override 
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }

}

5.正确配置线程池参数

有一个简单并且适用面比较广的公式:

CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

参考

https://juejin.cn/post/6844903844355260429
https://javaguide.cn/java/concurrent/java-thread-pool-best-practices.html

本文链接: https://jianz.xyz/index.php/archives/266/

1 + 8 =
快来做第一个评论的人吧~