在 Java 中,使用线程池可以有效的管理线程,避免因创建过多的线程而造成内存溢出。同时,使用 @Async 注解可以让方法异步执行,有效的提高程序的并发能力。下面是一个使用线程池和 @Async 的使用案例:
创建线程池
首先,我们需要创建一个线程池,在 Spring Boot 中可以使用 @Configuration + @Bean 注解将线程池配置到 Spring 容器中:
package com.thread.conf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @className ExecutorConfig
* @Desc 线程池配置
* @Author 张埔枘
* @Date 2023/2/9 16:56
* @Version 1.0
*/
@Configuration
@EnableAsync
public class ExecutorConfig{
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor getAsyncExecutor() {
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// 默认 超过队列queueCapacity 长度时 直接报错,这是默认策略
// rejection-policy:当 queueCapacity队列长度已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
读取线程池消费情况
package com.thread.conf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @className VisiableThreadPoolTaskExecutor
* @Desc 读取线程池消费情况
* @Author 张埔枘
* @Date 2023/2/9 17:08
* @Version 1.0
*/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
创建异步方法
service 接口
package com.thread.service;
/**
* @className AsyncService
* @Desc
* @Author 张埔枘
* @Date 2023/2/9 17:02
* @Version 1.0
*/
public interface AsyncService {
/**
* 执行异步任务
*/
void executeAsync();
}
实现
package com.thread.service.impl;
import com.thread.service.AsyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @className AsyncServiceImpl
* @Desc 异步方法案例
* @Author 张埔枘
* @Date 2023/2/9 17:02
* @Version 1.0
*/
@Service
public class AsyncServiceImpl implements AsyncService {
private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
@Override
@Async("asyncServiceExecutor")
public void executeAsync() {
logger.info("线程名称:" + Thread.currentThread().getName() + "- start executeAsync");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(Thread.currentThread().getName() + "end executeAsync");
}
}
测试接口
配置文件信息
server.port=8080
async.executor.thread.core_pool_size = 5
async.executor.thread.max_pool_size = 5
async.executor.thread.queue_capacity = 5
async.executor.thread.name.prefix = async-service-
package com.thread.api;
import com.thread.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @className TestController
* @Desc 测试
* @Author 张埔枘
* @Date 2023/2/9 17:05
* @Version 1.0
*/
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private AsyncService asyncService;
@GetMapping("/async")
public void async(){
asyncService.executeAsync();
}
}
Apifox 并发测试
连续请求接口,触发默认处理策略,会直接报错
前面会正常执行,默认 超过队列queueCapacity 长度时 直接报错,这是默认策略
以上代码中,我们在 TestController 中调用了 executeAsync() 方法,它会触发异步任务的执行,最终在控制台中可以看到任务的执行结果。
综上所述,使用线程池和 @Async 注解可以有效的提高程序的并发能力,需要注意的是,在使用线程池时需要注意线程的管理,以避免因线程数量过多而造成的性能问题。
需要注意的是,使用线程池和 @Async 并不是解决所有性能问题的万能药。它们的适用范围主要是:
对于需要长时间执行的任务,如文件下载、图像处理等;
对于需要并发执行的任务,如并行处理多个请求等;
需要注意的是,如果任务执行时间过短,使用线程池和 @Async 可能会引入额外的系统开销,导致性能降低。因此,在使用线程池和 @Async 时需要根据实际情况进行权衡。
在使用 Java 线程池和 @Async 时,还有一些需要注意的事项。
1.线程池的大小需要合理设置:线程池的大小应该根据系统资源和任务性质进行合理设置,避免因线程池过小或过大导致的性能问题。
2.线程池的生命周期需要合理管理:在使用完线程池后,应该及时销毁线程池,以避免线程池中的线程无效占用系统资源。
3.任务需要简单且独立:异步任务应该简单且独立,不依赖于其他任务的状态,避免因任务之间的相互影响导致的问题。
4.异常处理需要注意:在异步任务中可能会发生异常,因此需要设计合理的异常处理机制,以避免异常导致的问题。
多种线程池类型
Java 提供了多种线程池类型,例如 FixedThreadPool、CachedThreadPool 等。根据任务性质和系统资源情况,应该选择合适的线程池类型,以最大化线程池的使用效率。
Java 中提供了四种常用的线程池类型:
- FixedThreadPool:固定线程数量的线程池,适用于执行固定数量的任务。
- CachedThreadPool:可缓存线程池,适用于执行大量短期任务。
- SingleThreadExecutor:单线程线程池,适用于需要保证任务顺序的场景。
- ScheduledThreadPool:调度线程池,适用于执行定时任务和周期性任务。
以上四种线程池都是 Java 标准库提供的,可以通过 Executors 工具类轻松创建。除此之外,Java 还提供了自定义线程池的方法,允许开发人员根据业务需求设计自己的线程池。
代码案例和各自的优势说明
1.FixedThreadPool
代码案例:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
final int index = i;
fixedThreadPool.execute(() -> {
try {
System.out.println(index);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
fixedThreadPool.shutdown();
优势说明:
FixedThreadPool 是一种固定线程数量的线程池,在池中的线程数量固定不变,且所有线程都是处于活动状态。因此,它适用于执行固定数量的任务,可以有效避免线程数量过多导致的资源浪费和系统消耗。
2.CachedThreadPool
代码案例:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
final int index = i;
cachedThreadPool.execute(() -> {
try {
System.out.println(index);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
cachedThreadPool.shutdown();
优势说明:
CachedThreadPool 是一种可缓存线程池,适用于执行大量短期任务。在线程池中,如果没有空闲线程,就会创建新线程;如果有空闲线程,就会重用空闲线程。因此,CachedThreadPool 可以快速处理大量短期任务,但不适用于需要长时间执行的任务。
- SingleThreadExecutor
代码案例:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for(int i = 0; i < 100; i++) {
final int index = i;
singleThreadExecutor.execute(() -> {
try {
System.out.println(index);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
singleThreadExecutor.shutdown();
优势说明:
SingleThreadExecutor 是一种单线程的线程池,它保证任务是按顺序执行的,而不是同时执行的。因此,如果任务依赖于先后顺序,则可以使用
4.SingleThreadExecutor
代码案例:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
scheduledThreadPool.scheduleAtFixedRate(() -> {
System.out.println("running");
}, 1000, 1000, TimeUnit.MILLISECONDS);
优势说明:
ScheduledThreadPool 是一种基于定时任务的线程池,可以在指定的延迟时间后执行任务,或者在指定的固定频率内执行任务。因此,如果需要定期执行任务,可以使用 ScheduledThreadPool。
额外提一个特殊的线程池
WorkStealingPool
代码案例:
ForkJoinPool workStealingPool = new ForkJoinPool();
workStealingPool.execute(() -> {
System.out.println("running");
});
优势说明:
WorkStealingPool 是一种工作窃取线程池,它可以提高任务处理的效率。当一个线程完成了它的任务,它可以从其他线程中窃取未完成的任务进行处理。因此,如果需要提高任务处理效率,可以使用 WorkStealingPool。
线程拒绝策略
在 Java 中,线程池的拒绝策略是指当线程池的任务队列已满,并且线程池的线程数也已达到最大线程数时,对于新任务的处理策略。Java 提供了 4 种预定义的拒绝策略:
- AbortPolicy:直接抛出 RejectedExecutionException 异常。
- DiscardPolicy:直接丢弃任务,不做任何处理。
- DiscardOldestPolicy:丢弃任务队列中最老的任务,然后重新尝试执行任务(重复此过程)。
- CallerRunsPolicy:将任务回退到调用者,由调用者执行该任务。
默认情况下,Java 使用 AbortPolicy 拒绝策略。开发人员可以通过 Executors 类或 ThreadPoolExecutor 类的构造方法来自定义拒绝策略。
线程的异常处理
正常的线程执行出现异常,我们是很难感知的,通常可以通过如下方式来捕获异常:
1.try..catch 捕获
2.线程池提交方法返回的 RunnableFuture 来获取异常
3.为线程设置 UncaughtExceptionHandler 异常处理预案
4.重写ThreadPoolExecutor的afterExecute方法,处理传递的异常引用
代码案例:
class ExtendedExecutor extends ThreadPoolExecutor {
// 这可是jdk文档里面给的例子。。
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}}
线程池有哪几种工作队列
线程池的工作队列可以根据不同的特点和应用场景,分为以下几种类型:
-
有界队列(Bounded Queue):有界队列指的是工作队列的容量是有限的,当队列已满时,新的任务就会被拒绝执行。有界队列可以有效地控制任务数量,防止任务过多导致系统资源耗尽。常见的有界队列实现方式包括数组、链表和环形缓冲区等。
-
无界队列(Unbounded Queue):无界队列指的是工作队列的容量是无限的,当队列已满时,新的任务会一直等待,直到有线程可用。无界队列可以保证所有任务都会被执行,但可能会导致内存占用过高。常见的无界队列实现方式包括链表和ConcurrentLinkedQueue等。
-
优先级队列(Priority Queue):优先级队列指的是工作队列中的任务可以按照优先级排序,高优先级的任务会先被执行。优先级队列可以优先处理紧急的任务,但可能会导致低优先级的任务长时间等待。常见的优先级队列实现方式包括堆和PriorityQueue等。
-
Synchronous Queue:Synchronous Queue是一种特殊的队列,它不存储任何元素,而是在生产者线程和消费者线程之间直接传递任务。当一个任务需要执行时,它会被直接传递给一个空闲的线程,如果没有空闲的线程,那么任务就会一直等待。Synchronous Queue可以实现无缓冲的、同步的任务传递,但可能会导致任务等待过长时间。
-
Delay Queue:Delay Queue是一种支持延迟执行的队列,它可以存储实现了Delayed接口的元素,并根据元素的延迟时间来确定执行顺序。当一个元素的延迟时间过期时,它就会被取出并执行。Delay Queue可以实现延迟执行的任务调度,常见的应用场景包括定时任务和缓存过期等。
需要根据具体的应用场景和需求,选择合适的工作队列类型。
线程池有那几种状态
1.初始状态:线程池刚创建时处于初始状态,此时线程池中没有任何线程。
2.运行状态:线程池通过调用execute或其他任务提交方法开始接受任务,并根据需要创建新线程来执行任务。
3.暂停状态:当线程池被暂停时,线程池将不再接受新的任务,并将当前正在执行的任务执行完毕后结束。
4.关闭状态:线程池被关闭时,线程池将不再接受新的任务,同时尝试中断正在执行的任务并释放线程池中的所有线程。
5.终止状态:当线程池关闭后,所有线程都已被终止,线程池处于终止状态。
这些状态可以通过监控线程池的状态标志位来跟踪。线程池的状态通常由其生命周期的不同阶段决定,例如创建、运行、暂停、关闭和终止。
评论区