Spring framework
中有一个叫 @Async
的注解。顾名思义,它是一个能将方法异步化的注解。 @Async
可以作用在方法或者类上,只需加上这一注解,就可以轻松将本来同步调用的方法变为异步调用。在实际应用中,如果我们在一段逻辑中不关心某个方法的具体返回值,只是希望调用这个方法,便可以在这个方法加上这个注解,使主逻辑快速返回。
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
/**
* 指定执行器名称。
* 可以是任意一个 java.util.concurrent.Executor 或者 org.springframework.core.task.TaskExecutor
*/
String value() default "";
}
使用方法
环境与配置
@Async
在spring3.0之后就支持了,这里我们测试的环境为:
JAVA 1.8
Spring Boot 1.5.8
Tomcat 7
pom
中的关键配置:
<groupId>org.hxuhao.spring.cloud.demo</groupId>
<artifactId>user-service-provider</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Dalston.SR4</spring-cloud.version>
</properties>
<dependencies>
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--引入 servlet-->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
</dependencies>
主要类
首先,我们一个配置类来开启对 @Async
的支持,当然也可以通过 xml
的形式配置
配置类
@EnableAsync // 开启@Async
@Component
public class AsyncConfig {
}
业务类
然后,我们就可以在需要的方法上加上 @Async
,为了测试,我们会输出执行方法的具体线程。
/**
* 用户控制器
*/
@RestController
@RequestMapping("/users")
public class UserController {
@Autowired
private UserService userService;
@GetMapping(value = "/getUser")
public String getUser() {
System.out.println(String.format("UserController.getUser current thread %s", Thread.currentThread().getName()));
String user = userService.getUser();
userService.asyncFunction();
return user;
}
}
---
/**
* 用户服务 (这里为篇幅省略,就不贴UserService这个接口了)
*/
@Service
public class UserServiceImpl implements UserService {
@Override
public String getUser() {
System.out.println(String.format("UserServiceImpl.getUser current thread %s", Thread.currentThread().getName()));
return "hxuhao233";
}
/**
* 可以异步执行的方法
*/
@Async
@Override
public void asyncFunction() {
System.out.println(String.format("UserServiceImpl.asyncFunction current thread %s", Thread.currentThread().getName()));
return;
}
}
启动应用后通过调用 http://localhost:{port}/users/getUser
,可以看到控制台的输出
// 第一次调用
UserController.getUser current thread http-apr-8083-exec-4
UserServiceImpl.getUser current thread http-apr-8083-exec-4
UserServiceImpl.asyncFunction current thread SimpleAsyncTaskExecutor-2
// 第二次调用
UserController.getUser current thread http-apr-8083-exec-7
UserServiceImpl.getUser current thread http-apr-8083-exec-7
UserServiceImpl.asyncFunction current thread SimpleAsyncTaskExecutor-3
// 第三次调用
UserController.getUser current thread http-apr-8083-exec-10
UserServiceImpl.getUser current thread http-apr-8083-exec-10
UserServiceImpl.asyncFunction current thread SimpleAsyncTaskExecutor-4
通过这段输出,我们发现:加上 @Async
的 asyncFunction
被 SimpleAsyncTaskExecutor
执行了,而不是被Tomcat本身的线程执行,从而实现了
方法的异步调用。那么这个 SimpleAsyncTaskExecutor
是什么呢?
SimpleAsyncTaskExecutor
通过查看源码,我们发现, SimpleAsyncTaskExecutor
其实是 @Async
默认的一种实现方式,因为异步执行肯定用到其他线程池,然而在我们的代码中并没有指定线程池,所以 Spring
默认使用了 SimpleAsyncTaskExecutor
。
需要注意的是: SimpleAsyncTaskExecutor
并不是标准意义上的线程池,因为它对每一次方法调用都是直接 新建一个线程 去执行的,没有复用线程。这是一个非常 危险 的行为,在实际应用中可能为引起巨大的 资源浪费 。
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
implements AsyncListenableTaskExecutor, Serializable {
protected void doExecute(Runnable task) {
// 每次都new一个线程
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
public Thread createThread(Runnable runnable) {
Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
thread.setPriority(getThreadPriority());
thread.setDaemon(isDaemon());
return thread;
}
}
为 @Async
指定线程池
单线程池
为了指定线程池,我们需要把配置类实现 AsyncConfigurer
,它有两个方法需要实现,一个方法指定线程池,另一个方法指定异常处理的方式。
@EnableAsync
@Component
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
// 实际线程池的具体参数等配置应跟实际业务修改。
Executor executor = new ThreadPoolExecutor(2, 2, 1,
TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000),
new CustomizableThreadFactory("async-pool-"));
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
// 异常处理
return new SimpleAsyncUncaughtExceptionHandler();
}
}
修改之后再次调用 http://localhost:{port}/users/getUser
,可以看到控制台输出
// 第一次调用
UserController.getUser current thread http-apr-8083-exec-7
UserServiceImpl.getUser current thread http-apr-8083-exec-7
UserServiceImpl.asyncFunction current thread async-pool-1
// 第二次调用
UserController.getUser current thread http-apr-8083-exec-10
UserServiceImpl.getUser current thread http-apr-8083-exec-10
UserServiceImpl.asyncFunction current thread async-pool-2
// 第三次调用
UserController.getUser current thread http-apr-8083-exec-8
UserServiceImpl.getUser current thread http-apr-8083-exec-8
UserServiceImpl.asyncFunction current thread async-pool-1
这次就是我们自己指定的线程池执行 asyncFunction
了。
指定多线程池
如果我们需要方法的执行不受别的方法影响,或者不同方法在不同的线程中执行,就需要配置多个线程池。
并且使用 @Async
时必须指定执行 执行器的名称
@EnableAsync
@Component
public class AsyncConfig{
@Bean("async-pool-bean-1")
public Executor getExecutor1() {
return new ThreadPoolExecutor(2, 2, 1,
TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000),
new CustomizableThreadFactory("async-pool-1-"));
}
@Bean("async-pool-bean-2")
public Executor getExecutor2() {
return new ThreadPoolExecutor(2, 2, 1,
TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000),
new CustomizableThreadFactory("async-pool-2-"));
}
}
---
@Async("async-pool-bean-1") // 指定由 async-pool-bean-1 执行
@Override
public void asyncFunction() {
System.out.println(String.format("UserServiceImpl.asyncFunction current thread %s", Thread.currentThread().getName()));
return;
}
@Async("async-pool-bean-2") // 指定由 async-pool-bean-2 执行
@Override
public void asyncFunction2() {
System.out.println(String.format("UserServiceImpl.asyncFunction2 current thread %s", Thread.currentThread().getName()));
return;
}
原理
大致分为3块:
- 应用启动时。
- 构造业务类时。
- 异步方法执行时。
应用启动
首先,要使用 @Async
,必须先用 @EnableAsyc
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
/**
* 这里可以自定义使方法异步化的注解,一般我们直接用@Async就行
*/
Class<? extends Annotation> annotation() default Annotation.class;
/**
* 实现异步代理的切面模式,一种是jdk动态代理(默认使用),一种是ASPECJ
*/
AdviceMode mode() default AdviceMode.PROXY;
}
其引入了 AsyncConfigurationSelector
,这是一个选择器,根据 @EnableAsync
中的 mode
来决定使用哪种异步配置。由于使用我们的配置是 AdviceMode.PROXY
,所以这里返回 ProxyAsyncConfiguration
的名称,表示后续会构建 ProxyAsyncConfiguration
。
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] { ProxyAsyncConfiguration.class.getName() }; // 走的这
case ASPECTJ:
return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
default:
return null;
}
}
}
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
protected Executor executor; // 继承自AbstractAsyncConfiguration
protected AsyncUncaughtExceptionHandler exceptionHandler; // 继承自AbstractAsyncConfiguration
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
if (this.executor != null) {
bpp.setExecutor(this.executor);
}
if (this.exceptionHandler != null) {
bpp.setExceptionHandler(this.exceptionHandler);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
ProxyAsyncConfiguration
会构造一个 AsyncAnnotationBeanPostProcessor
。
AsyncAnnotationBeanPostProcessor
会被 spring
添加到bean的后置处理器列表中,可以在bean构造之后在执行某些操作,在后面构造业务类时也会出现。
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
protected final Log logger = LogFactory.getLog(getClass());
private Class<? extends Annotation> asyncAnnotationType; // 异步的注解,比如@Async
private Executor executor; // 默认的线程池
private AsyncUncaughtExceptionHandler exceptionHandler; // 异常处理器
protected Advisor advisor; // (实在不知道怎么翻译。。。理解为一种切面的基础接口,可以用来获取切面) ,继承自AbstractBeanFactoryAwareAdvisingPostProcessor
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
在 setBeanFactory
中, AsyncAnnotationBeanPostProcessor
创建了一个 AsyncAnnotationAdvisor
。
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private AsyncUncaughtExceptionHandler exceptionHandler;
private Advice advice;
private Pointcut pointcut;
public AsyncAnnotationAdvisor(Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(2);
asyncAnnotationTypes.add(Async.class);
this.advice = buildAdvice(executor, this.exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
/**
* 构造切面
*/
protected Advice buildAdvice(Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
return new AnnotationAsyncExecutionInterceptor(executor, exceptionHandler);
}
/**
* 构造切点
*/
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = AnnotationMatchingPointcut.forMethodAnnotation(asyncAnnotationType);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return result;
}
}
AsyncAnnotationAdvisor
构造了一个切点,就包括 @Async
还有我们自定义的异步化注解。
另外还构造了一个切面 AnnotationAsyncExecutionInterceptor
,这就是 @Async
能异步执行的关键,在后面执行业务代码时再介绍。
构造业务类
以上面的 UserServiceImpl
举例
@Service
public class UserServiceImpl implements UserService {
@Override
public String getUser() {
System.out.println(String.format("UserServiceImpl.getUser current thread %s", Thread.currentThread().getName()));
return "hxuhao233";
}
/**
* 可以异步执行的方法
*/
@Async
@Override
public void asyncFunction() {
System.out.println(String.format("UserServiceImpl.asyncFunction current thread %s", Thread.currentThread().getName()));
return;
}
}
当 spring
在构造这个 bean
时,会执行所有 BeanPostProcessor
的postProcessAfterInitialization,这其中就包括上面提到的 AsyncAnnotationBeanPostProcessor
public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory
implements AutowireCapableBeanFactory {
@Override
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
throws BeansException {
Object result = existingBean;
// 依次调用所有注册到spring的PostProcessor的postProcessAfterInitialization方法
for (BeanPostProcessor beanProcessor : getBeanPostProcessors()) {
result = beanProcessor.postProcessAfterInitialization(result, beanName);
if (result == null) {
return result;
}
}
return result;
}
}
在 postProcessAfterInitialization
方法中, UserSerivceImpl
被包起来,返回了一个新的代理类
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
private ConfigurableListableBeanFactory beanFactory;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// ...
// isEligible 是用来判断这个bean是否应该被代理起来,具体到这就是判断这个bean有没有加上@Async注解
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
// 工厂类设置`AsyncAnnotationAdvisor`
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
// 通过工厂创建代理类
return proxyFactory.getProxy(getProxyClassLoader());
}
// ...
}
}
最后 UserServiceImpl
的代理类长这样:
异步方法执行
当实际执行到 UserServiceImpl
的 asyncFunction
时, advisor
中的 AnnotationAsyncExecutionInterceptor
就发挥作用了
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 选择具体执行方法的线程池
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// 构造任务
Callable<Object> task = new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
}
};
// 调用线程池执行具体的方法(asyncFunction)
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
}
因此, asyncFunction
就交给另外的线程执行,实现了方法异步化。
原文:spring框架中的@Async使用以及原理浅析 (1)_Hxuhao2333的博客-CSDN博客
原文:spring框架中的@Async使用以及原理浅析 (2)_Hxuhao2333的博客-CSDN博客
作者: Hxuhao2333