目 录CONTENT

文章目录

超乎想象的异步处理能力:Java 线程池与 @Async 的完美结合

小张的探险日记
2023-02-12 / 0 评论 / 0 点赞 / 543 阅读 / 10,161 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2023-02-20,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

在 Java 中,使用线程池可以有效的管理线程,避免因创建过多的线程而造成内存溢出。同时,使用 @Async 注解可以让方法异步执行,有效的提高程序的并发能力。下面是一个使用线程池和 @Async 的使用案例:

创建线程池

首先,我们需要创建一个线程池,在 Spring Boot 中可以使用 @Configuration + @Bean 注解将线程池配置到 Spring 容器中:

package com.thread.conf;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * @className ExecutorConfig
 * @Desc 线程池配置
 * @Author 张埔枘
 * @Date 2023/2/9 16:56
 * @Version 1.0
 */
@Configuration
@EnableAsync
public class ExecutorConfig{



    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor getAsyncExecutor() {
        logger.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);


        // 默认 超过队列queueCapacity 长度时 直接报错,这是默认策略

        // rejection-policy:当 queueCapacity队列长度已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        executor.initialize();
        return executor;
    }



}



读取线程池消费情况


package com.thread.conf;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;


/**
 * @className VisiableThreadPoolTaskExecutor
 * @Desc 读取线程池消费情况
 * @Author 张埔枘
 * @Date 2023/2/9 17:08
 * @Version 1.0
 */

public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }

        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

创建异步方法

service 接口

package com.thread.service;

/**
 * @className AsyncService
 * @Desc 
 * @Author 张埔枘
 * @Date 2023/2/9 17:02
 * @Version 1.0
 */
public interface AsyncService {
    /**
     * 执行异步任务
     */
    void executeAsync();
}

实现

package com.thread.service.impl;

import com.thread.service.AsyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @className AsyncServiceImpl
 * @Desc 异步方法案例
 * @Author 张埔枘
 * @Date 2023/2/9 17:02
 * @Version 1.0
 */
@Service
public class AsyncServiceImpl implements AsyncService {


    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        logger.info("线程名称:" + Thread.currentThread().getName() + "- start executeAsync");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info(Thread.currentThread().getName() + "end executeAsync");
    }
}


测试接口

配置文件信息

server.port=8080

async.executor.thread.core_pool_size = 5
async.executor.thread.max_pool_size = 5
async.executor.thread.queue_capacity = 5
async.executor.thread.name.prefix = async-service-

package com.thread.api;

import com.thread.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @className TestController
 * @Desc 测试
 * @Author 张埔枘
 * @Date 2023/2/9 17:05
 * @Version 1.0
 */
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private AsyncService asyncService;

    @GetMapping("/async")
    public void async(){
        asyncService.executeAsync();
    }
}


Apifox 并发测试

连续请求接口,触发默认处理策略,会直接报错
image.png

前面会正常执行,默认 超过队列queueCapacity 长度时 直接报错,这是默认策略

image.png

以上代码中,我们在 TestController 中调用了 executeAsync() 方法,它会触发异步任务的执行,最终在控制台中可以看到任务的执行结果。

综上所述,使用线程池和 @Async 注解可以有效的提高程序的并发能力,需要注意的是,在使用线程池时需要注意线程的管理,以避免因线程数量过多而造成的性能问题。

需要注意的是,使用线程池和 @Async 并不是解决所有性能问题的万能药。它们的适用范围主要是:

对于需要长时间执行的任务,如文件下载、图像处理等;
对于需要并发执行的任务,如并行处理多个请求等;

需要注意的是,如果任务执行时间过短,使用线程池和 @Async 可能会引入额外的系统开销,导致性能降低。因此,在使用线程池和 @Async 时需要根据实际情况进行权衡。

在使用 Java 线程池和 @Async 时,还有一些需要注意的事项。

1.线程池的大小需要合理设置:线程池的大小应该根据系统资源和任务性质进行合理设置,避免因线程池过小或过大导致的性能问题。

2.线程池的生命周期需要合理管理:在使用完线程池后,应该及时销毁线程池,以避免线程池中的线程无效占用系统资源。

3.任务需要简单且独立:异步任务应该简单且独立,不依赖于其他任务的状态,避免因任务之间的相互影响导致的问题。

4.异常处理需要注意:在异步任务中可能会发生异常,因此需要设计合理的异常处理机制,以避免异常导致的问题。

多种线程池类型

Java 提供了多种线程池类型,例如 FixedThreadPool、CachedThreadPool 等。根据任务性质和系统资源情况,应该选择合适的线程池类型,以最大化线程池的使用效率。

Java 中提供了四种常用的线程池类型:

  1. FixedThreadPool:固定线程数量的线程池,适用于执行固定数量的任务。
  2. CachedThreadPool:可缓存线程池,适用于执行大量短期任务。
  3. SingleThreadExecutor:单线程线程池,适用于需要保证任务顺序的场景。
  4. ScheduledThreadPool:调度线程池,适用于执行定时任务和周期性任务。

以上四种线程池都是 Java 标准库提供的,可以通过 Executors 工具类轻松创建。除此之外,Java 还提供了自定义线程池的方法,允许开发人员根据业务需求设计自己的线程池。

代码案例和各自的优势说明

1.FixedThreadPool

代码案例:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
    final int index = i;
    fixedThreadPool.execute(() -> {
        try {
            System.out.println(index);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}
fixedThreadPool.shutdown();


优势说明:

FixedThreadPool 是一种固定线程数量的线程池,在池中的线程数量固定不变,且所有线程都是处于活动状态。因此,它适用于执行固定数量的任务,可以有效避免线程数量过多导致的资源浪费和系统消耗。

2.CachedThreadPool

代码案例:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
    final int index = i;
    cachedThreadPool.execute(() -> {
        try {
            System.out.println(index);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}
cachedThreadPool.shutdown();


优势说明:

CachedThreadPool 是一种可缓存线程池,适用于执行大量短期任务。在线程池中,如果没有空闲线程,就会创建新线程;如果有空闲线程,就会重用空闲线程。因此,CachedThreadPool 可以快速处理大量短期任务,但不适用于需要长时间执行的任务。

  1. SingleThreadExecutor

代码案例:

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for(int i = 0; i < 100; i++) {
	final int index = i;
	singleThreadExecutor.execute(() -> {
		try {
			System.out.println(index);
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	});
}
singleThreadExecutor.shutdown();


优势说明:

SingleThreadExecutor 是一种单线程的线程池,它保证任务是按顺序执行的,而不是同时执行的。因此,如果任务依赖于先后顺序,则可以使用

4.SingleThreadExecutor

代码案例:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
scheduledThreadPool.scheduleAtFixedRate(() -> {
	System.out.println("running");
}, 1000, 1000, TimeUnit.MILLISECONDS);

优势说明:

ScheduledThreadPool 是一种基于定时任务的线程池,可以在指定的延迟时间后执行任务,或者在指定的固定频率内执行任务。因此,如果需要定期执行任务,可以使用 ScheduledThreadPool。

额外提一个特殊的线程池

WorkStealingPool
代码案例:

ForkJoinPool workStealingPool = new ForkJoinPool();
workStealingPool.execute(() -> {
    System.out.println("running");
});

优势说明:

WorkStealingPool 是一种工作窃取线程池,它可以提高任务处理的效率。当一个线程完成了它的任务,它可以从其他线程中窃取未完成的任务进行处理。因此,如果需要提高任务处理效率,可以使用 WorkStealingPool。

线程拒绝策略

在 Java 中,线程池的拒绝策略是指当线程池的任务队列已满,并且线程池的线程数也已达到最大线程数时,对于新任务的处理策略。Java 提供了 4 种预定义的拒绝策略:

  1. AbortPolicy:直接抛出 RejectedExecutionException 异常。
  2. DiscardPolicy:直接丢弃任务,不做任何处理。
  3. DiscardOldestPolicy:丢弃任务队列中最老的任务,然后重新尝试执行任务(重复此过程)。
  4. CallerRunsPolicy:将任务回退到调用者,由调用者执行该任务。

默认情况下,Java 使用 AbortPolicy 拒绝策略。开发人员可以通过 Executors 类或 ThreadPoolExecutor 类的构造方法来自定义拒绝策略。

线程的异常处理

正常的线程执行出现异常,我们是很难感知的,通常可以通过如下方式来捕获异常:
1.try..catch 捕获
2.线程池提交方法返回的 RunnableFuture 来获取异常
3.为线程设置 UncaughtExceptionHandler 异常处理预案
4.重写ThreadPoolExecutor的afterExecute方法,处理传递的异常引用

代码案例:

class ExtendedExecutor extends ThreadPoolExecutor {
    // 这可是jdk文档里面给的例子。。
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
            System.out.println(t);
    }
}}

线程池有哪几种工作队列

线程池的工作队列可以根据不同的特点和应用场景,分为以下几种类型:

  1. 有界队列(Bounded Queue):有界队列指的是工作队列的容量是有限的,当队列已满时,新的任务就会被拒绝执行。有界队列可以有效地控制任务数量,防止任务过多导致系统资源耗尽。常见的有界队列实现方式包括数组、链表和环形缓冲区等。

  2. 无界队列(Unbounded Queue):无界队列指的是工作队列的容量是无限的,当队列已满时,新的任务会一直等待,直到有线程可用。无界队列可以保证所有任务都会被执行,但可能会导致内存占用过高。常见的无界队列实现方式包括链表和ConcurrentLinkedQueue等。

  3. 优先级队列(Priority Queue):优先级队列指的是工作队列中的任务可以按照优先级排序,高优先级的任务会先被执行。优先级队列可以优先处理紧急的任务,但可能会导致低优先级的任务长时间等待。常见的优先级队列实现方式包括堆和PriorityQueue等。

  4. Synchronous Queue:Synchronous Queue是一种特殊的队列,它不存储任何元素,而是在生产者线程和消费者线程之间直接传递任务。当一个任务需要执行时,它会被直接传递给一个空闲的线程,如果没有空闲的线程,那么任务就会一直等待。Synchronous Queue可以实现无缓冲的、同步的任务传递,但可能会导致任务等待过长时间。

  5. Delay Queue:Delay Queue是一种支持延迟执行的队列,它可以存储实现了Delayed接口的元素,并根据元素的延迟时间来确定执行顺序。当一个元素的延迟时间过期时,它就会被取出并执行。Delay Queue可以实现延迟执行的任务调度,常见的应用场景包括定时任务和缓存过期等。

需要根据具体的应用场景和需求,选择合适的工作队列类型。

线程池有那几种状态

1.初始状态:线程池刚创建时处于初始状态,此时线程池中没有任何线程。

2.运行状态:线程池通过调用execute或其他任务提交方法开始接受任务,并根据需要创建新线程来执行任务。

3.暂停状态:当线程池被暂停时,线程池将不再接受新的任务,并将当前正在执行的任务执行完毕后结束。

4.关闭状态:线程池被关闭时,线程池将不再接受新的任务,同时尝试中断正在执行的任务并释放线程池中的所有线程。

5.终止状态:当线程池关闭后,所有线程都已被终止,线程池处于终止状态。

这些状态可以通过监控线程池的状态标志位来跟踪。线程池的状态通常由其生命周期的不同阶段决定,例如创建、运行、暂停、关闭和终止。

0

评论区