本篇文章总结下在工作中使用到的技术。
异步
异步的使用场景在调用某个接口时,需要同步修改其他信息,而其他信息的返回结果是不关心的,使用异步来处理,提高接口的相应速度。
在 Spring Boot 中,可以使用 @Async
注解很方便地使用异步。使用步骤如下
- 配置开启异步,指定线程池(可选)
在配置类中添加 @EnableAsync
即可配置异步。同时手动构造一个线程池,用来执行异步任务
@Configuration
@EnableAsync
public class ThreadPoolConfig {
private static final int CORE_NUM = Runtime.getRuntime().availableProcessors();
@Bean("consumerThreadPool")
public ExecutorService buildThreadPool() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("consumer-thread-%d").build();
ExecutorService threadPool = new ThreadPoolExecutor(CORE_NUM, CORE_NUM * 2, 10L,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), threadFactory,
new ThreadPoolExecutor.AbortPolicy());
return threadPool;
}
}
- 使用异步
使用时,在需要异步的方法上加 @Async
注解。如果需要使用自定义的线程池,使用 @Async("指定的线程池Bean")
需要注意,在 Spring Boot中,使用 @Async
有两个限制:
- 方法必须使用 public
- 不能在同一个类中调用 含有
@Async
注解的方法,会导致注解失效
原因在于:Spring 扫描 bean 会扫描该类方法是否含有 @Async
注解,如果包含,生成一个代理类,利用 apo 为这些方法加上异步逻辑。如果这个异步方法在同一个类中被调用,该方法没有通过异步的代理类,而是由该类的代理,因此没有异步执行逻辑。
即调用方和被调用方是在同一个类中,是无法产生切面的,该对象没有被Spring容器管理
正确的写法如下:在另一个类中维护异步方法
public interface FileCommonService {
@Async("consumerThreadPool")
void deleteFileByIdAsync(String fileId);
}
public class FileCommonServiceImpl implements FileCommonService {
// ...
}
线程池
在业务中会遇到一些请求多个接口的情况,然后组装各个接口的返回数据。接口之间几乎不存在关联,可以独立请求,使用单线程会耗费大量的时间,使用并发,可以大大减少请求时间。
线程池是一种池化技术,使用线程池的好处在于:
- 降低资源消耗。线程的创建和销毁非常耗费资源,使用线程池可以避免频繁地创建和销毁线程,
- 提高响应速度。使用线程池,无序创建线程,立即执行,提高响应速度。
- 更方便的管理线程。使用线程池统一分配管理。
JDK 自带的线程池
JUC 包下的 Executors 工具提供了四种预先创建好的线程池:
- newCachedThreadPool
- newFixedThreadPool
- newScheduledThreadPool
- newSingleThreadExecutor
底层都是预定义参数创建线程池,以 newCachedThreadPool 为例:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
尽量不要使用这四种自带的线程池。
- newFixedThreadPool和newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
- newCachedThreadPool和newScheduledThreadPool: 主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM
线程池主要参数
线程池的定义:
|
|
其中一个主要的参数:
- corePoolSize:核心线程数。
- maximumPoolSize:最大核心线程数
- BlockingQueue:阻塞队列
corePoolSize、maximumPoolSize、BlockingQueue 在提交任务后的关系为:
- 如果 当前运行线程数 < corePoolSize ,立即创建一个线程并执行
- 如果 当前运行线程数 >= corePoolSize,且阻塞队列未满,将任务放入阻塞队列中,等待调用。
- 如果 corePoolSize <= 当前运行线程数 < maximumPoolSize,且阻塞队列已满,创建一个线程并立即执行
- 如果 当前运行线程数 >= maximumPoolSize,且阻塞队列已满,执行拒绝策略。
手动创建线程池
更推荐手动创建线程池。
在 Spring Boot 中,可以通过 @Bean
配置一个线程池
|
|
使用的时候直接引用:
|
|
一般来讲,会根据业务划分多个线程池,不同的业务使用不同的线程池,避免一个业务阻塞导致其他业务产生异常。
分布式锁
在维护另一个同事的功能时有这样一个场景:进入某个页面时,需要对另一个页面的数据进行汇总、计算展示,接口流程大致如下:
- 在页面 A 进行数据的添加、删除、编辑的操作,数据变更时维护一个Redis key,标记数据有变更
- 进入汇总页面 B,读取 Redis 中的key,判断是否需要进行数据的汇总等操作,不需要就返回数据
- 需要重新汇总数据,读取 A 的数据,根据相关逻辑将 A 中的数据进行计算,添加到 B 表中进行添加、删除等操作
- 汇总数据完成,删除 Redis 中的key。
可以发现,2、3是一个耗时操作,以上过程在并发环境存在一个明显的问题:并发场景下,线程1调用接口,读取 Redis(步骤1),进行数据汇总(2、3);此时另一个线程 B 调用接口,读取 Redis,此时 线程1还在处理数据,没有删除 Redis 中的标记,因此同样进入数据汇总(2、3)...在第一个线程未删除 Redis 中的标记时,其他线程也会汇总数据,便会在数据库中产生重复数据。
因此,解决思路应该是:添加一个锁。第一个获取到Redis key 的线程获取到一个锁,并且只有获取到锁的线程能够汇总数据。锁被获取后,其他线程获取不到锁,不进行数据汇总。我们使用的是微服务架构,可能在多台机器上部署,因此,使用线程锁是不行的,必须使用分布式锁。
分布式锁的主要实现有几种流行方案:Redis 、zk。我们的项目没有使用 zk,因此,使用 Redis 是一种不错的方案。
基于 Redis 的分布式锁
关于 Redis 的分布式锁网上有很多,包括各种解决方案、存在的问题、集群部署存在的问题等。简单阐述一下:
- 锁在集群部署下出现的问题。我们的 Redis 使用的是单机部署,可以暂时不考虑
- 使用 redission 完成分布式锁,redisson 足够强大,锁过期、原子性等问题都以解决
最后的实现:
- 首先获取 Redis 锁,然后读取标记判断是否需要添加数据
- 获取到Redis 且需要添加数据,才添加数据。分布式锁同一时间只能有一个线程持有,因此其他线程不添加数据
- 添加数据完成,修改标记,删除锁。
- 此时如果其他线程获取到了锁,标记已经修改,同样不添加数据。