springboot的async的编程

在 Spring 中,使用 @Async 注解可以实现异步方法调用,让被修饰的方法在被调用时在一个新的线程中执行,而不会阻塞主线程。这对于一些耗时的操作或需要后台处理的任务非常有用,比如处理日志、发送邮件、短信等。

要使用 @Async 注解,需要进行以下配置:

  1. 在 Spring 配置文件中启用异步支持。可以通过在 XML 配置文件中添加以下内容来实现:
1
2
xmlCopy code<task:annotation-driven executor="asyncExecutor" />
<task:executor id="asyncExecutor" pool-size="5" />

这里配置了一个线程池 asyncExecutor,可以根据需要调整线程池大小。

  1. 在启动类上添加 @EnableAsync 注解,开启异步支持。
  2. 在需要异步执行的方法上添加 @Async 注解。

示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
javaCopy codeimport org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    @Async
    public void doAsyncTask() {
        // 异步任务的逻辑
        System.out.println("Async method is running in a new thread.");
    }

    // 另一个异步方法
    @Async
    public void doAnotherAsyncTask() {
        // 异步任务的逻辑
        System.out.println("Another async method is running in a new thread.");
    }
}

在上面的示例中,MyService 类中的 doAsyncTaskdoAnotherAsyncTask 方法被标记为异步方法,当这些方法被调用时,会在新的线程中执行。

需要注意的是,异步方法必须在 Spring 管理的 Bean 中才能正常工作。通常情况下,将 @Async 注解放在类级别上会将该类的所有方法都标记为异步方法。如果只想让某个方法异步执行,就将 @Async 注解放在该方法上。

同时,由于异步方法会在新线程中执行,所以不能依赖于主线程中的任何变量或上下文。确保在异步方法中处理好所需的数据和资源。

要在 Spring Boot 中使用 @Async 注解,需要按照以下步骤:

  1. 在启动类上加上 @EnableAsync 注解,开启异步支持。
  2. 在需要异步执行的方法上添加 @Async 注解,并可以指定使用的线程池名称,如 @Async("asyncServiceExecutor")
  3. 配置自定义的线程池,如果不进行配置,则使用默认的线程池。配置可以在一个专门的配置类中进行,使用 ThreadPoolTaskExecutor 来创建线程池,设置线程池的参数,如核心线程数、最大线程数、队列大小、线程名前缀等。

使用 @Async 注解后,被标记的方法在被调用时,会在一个新的线程中执行,而不会阻塞主线程。这样可以实现并发处理,提高系统的性能和吞吐量。

同时,文本中还提到了解决异步调用中的一些问题:

  • 异步方法失效问题:如果异步方法在同一个类内调用,由于 Spring 使用动态代理来实现异步调用,同类调用时并不会经过 Spring 容器获取代理对象,导致注解失效。解决方法是将需要异步执行的方法单独抽取成一个类,确保调用的是代理对象而不是对象本身。
  • 异步方法无法被私有方法调用:异步方法不能被私有化,因为异步方法调用需要通过 Spring 代理对象,而私有方法无法被代理。
  • 异步方法无法被静态方法调用:异步方法也不能被静态方法调用,因为静态方法属于类本身,不属于实例,无法通过 Spring 代理来调用。
  • 其他问题:异步调用可能出现线程安全的问题,需要注意共享资源的访问。

综上所述,异步调用在适当的场景下可以提高系统性能和并发处理能力,但也需要小心处理可能出现的问题。正确地配置和使用异步方法,可以使系统更加高效和稳定。

异步请求与异步调用的区别

两者的使用场景不同,异步请求用来解决并发请求对服务器造成的压力,从而提高对请求的吞吐量;而异步调用是用来做一些非主线流程且不需要实时计算和响应的任务,比如同步日志到 kafka 中做日志分析等。

异步请求是会一直等待 response 相应的,需要返回结果给客户端的;而异步调用我们往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心。

异步请求的实现

方式一:Servlet 方式实现异步请求

@RequestMapping(value = “/email/servletReq”, method = GET)

public void servletReq (HttpServletRequest request, HttpServletResponse response) {

AsyncContext asyncContext = request.startAsync();

//设置***:可设置其开始、完成、异常、超时等事件的回调处理

asyncContext.addListener(new AsyncListener() {

@Override

public void onTimeout(AsyncEvent event) throws IOException {

System.out.println(“超时了…”);

//做一些超时后的相关操作…

}

@Override

public void onStartAsync(AsyncEvent event) throws IOException {

System.out.println(“线程开始”);

}

@Override

public void onError(AsyncEvent event) throws IOException {

System.out.println(“发生错误:"+event.getThrowable());

}

@Override

public void onComplete(AsyncEvent event) throws IOException {

System.out.println(“执行完成”);

//这里可以做一些清理资源的操作…

}

});

//设置超时时间

asyncContext.setTimeout(20000);

asyncContext.start(new Runnable() {

@Override

public void run() {

try {

Thread.sleep(10000);

System.out.println(“内部线程:” + Thread.currentThread().getName());

asyncContext.getResponse().setCharacterEncoding(“utf-8”);

asyncContext.getResponse().setContentType(“text/html;charset=UTF-8”);

asyncContext.getResponse().getWriter().println(“这是异步的请求返回”);

} catch (Exception e) {

System.out.println(“异常:"+e);

}

//异步请求完成通知

//此时整个请求才完成

asyncContext.complete();

}

});

//此时之类 request 的线程连接已经释放了

System.out.println(“主线程:” + Thread.currentThread().getName());

}

方式二:使用很简单,直接返回的参数包裹一层 callable 即可,可以继承 WebMvcConfigurerAdapter 类来设置默认线程池和超时处理

@RequestMapping(value = “/email/callableReq”, method = GET)

@ResponseBody

public Callable callableReq () {

System.out.println(“外部线程:” + Thread.currentThread().getName());

return new Callable() {

@Override

public String call() throws Exception {

Thread.sleep(10000);

System.out.println(“内部线程:” + Thread.currentThread().getName());

return “callable!”;

}

};

}

@Configuration

public class RequestAsyncPoolConfig extends WebMvcConfigurerAdapter {

@Resource

private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;

@Override

public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {

//处理 callable 超时

configurer.setDefaultTimeout(60*1000);

configurer.setTaskExecutor(myThreadPoolTaskExecutor);

configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());

}

@Bean

public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {

return new TimeoutCallableProchttp://essingInterceptor();

}

}

方式三:和方式二差不多,在 Callable 外包一层,给 WebAsyncTask 设置一个超时回调,即可实现超时处理

@RequestMapping(value = “/email/webAsyncReq”, method = GET)

@ResponseBody

public WebAsyncTask webAsyncReq () {

System.out.println(“外部线程:” + Thread.currentThread().getName());

Callable result = () -> {

System.out.println(“内部线程开始:” + Thread.currentThread().getName());

try {

TimeUnit.SECONDS.sleep(4);

} catch (Exception e) {

// TODO: handle exception

}

logger.info(“副线程返回”);

System.out.println(“内部线程返回:” + Thread.currentThread().getName());

return “success”;

};

WebAsyncTask wat = new WebAsyncTask(3000L, result);

wat.onTimeout(new Callable() {

@Override

public String call() throws Exception {

// TODO Auto-generated method stub

return “超时”;

}

});

return wat;

}

方式四:DeferredResult 可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。

@RequestMapping(value = “/email/deferredResultReq”, method = GET)

@ResponseBody

public DeferredResult deferredResultReq () {

System.out.println(“外部线程:” + Thread.currentThread().getName());

//设置超时时间

DeferredResult result = new DeferredResult(60*1000L);

//处理超时事件 采用委托机制

result.onTimeout(new Runnable() {

@Override

public void run() {

System.out.println(“DeferredResult 超时”);

result.setResult(“超时了!”);

}

});

result.onCompletion(new Runnable() {

@Override

public void run() {

//完成后

System.out.println(“调用完成”);

}

});

myThreadPoolTaskExecutor.execute(new Runnable() {

@Override

public void run() {

//处理业务逻辑

System.out.println(“内部线程:” + Thread.currentThread().getName());

//返回结果

result.setResult(“DeferredResult!!”);

}

});

return result;

}

SpringBoot 中异步调用的使用

1、介绍

异步请求的处理。除了异步请求,一般上我们用的比较多的应该是异步调用。通常在开发过程中,会遇到一个方法是和实际业务无关的,没有紧密性的。比如记录日志信息等业务。这个时候正常就是启一个新线程去做一些业务处理,让主线程异步的执行其他业务。

2、使用方式(基于 spring 下)

需要在启动类加入@EnableAsync 使异步调用@Async 注解生效

在需要异步执行的方法上加入此注解即可@Async(“threadPool”),threadPool 为自定义线程池

代码略。。。就俩标签,自己试一把就可以了

3、注意事项

在默认情况下,未设置 TaskExecutor 时,默认是使用 SimpleAsyncTaskExecutor 这个线程池,但此线程不是真正意义上的线程池,因为线程不重用,每次调用都会创建一个新的线程。可通过控制台日志输出可以看出,每次输出线程名都是递增的。所以最好我们来自定义一个线程池。

调用的异步方法,不能为同一个类的方法(包括同一个类的内部类),简单来说,因为 Spring 在启动扫描时会为其创建一个代理类,而同类调用时,还是调用本身的代理类的,所以和平常调用是一样的。

其他的注解如@Cache 等也是一样的道理,说白了,就是 Spring 的代理机制造成的。所以在开发中,最好把异步服务单独抽出一个类来管理。下面会重点讲述。

4、什么情况下会导致@Async 异步方法会失效?

a.调用同一个类下注有@Async 异步方法:在 spring 中像@Async 和@Transactional、cache 等注解本质使用的是动态代理,其实 Spring 容器在初始化的时候 Spring 容器会将含有 AOP 注解的类对象“替换”为代理对象(简单这么理解),那么注解失效的原因就很明显了,就是因为调用方法的是对象本身而不是代理对象,因为没有经过 Spring 容器,那么解决方法也会沿着这个思路来解决。

b.调用的是静态(static )方法

c.调用(private)私有化方法

5、解决 4 中问题 1 的方式(其它 2,3 两个问题自己注意下就可以了)

将要异步执行的方法单独抽取成一个类,原理就是当你把执行异步的方法单独抽取成一个类的时候,这个类肯定是被 Spring 管理的,其他 Spring 组件需要调用的时候肯定会注入进去,这时候实际上注入进去的就是代理类了。

其实我们的注入对象都是从 Spring 容器中给当前 Spring 组件进行成员变量的赋值,由于某些类使用了 AOP 注解,那么实际上在 Spring 容器中实际存在的是它的代理对象。那么我们就可以通过上下文获取自己的代理对象调用异步方法。

@Controller

@RequestMapping("/app”)

public class EmailController {

//获取 ApplicationContext 对象方式有多种,这种最简单,其它的大家自行了解一下

@Autowired

private ApplicationContext applicationContext;

@RequestMapping(value = “/email/asyncCall”, method = GET)

@ResponseBody

public Map asyncCall () {

Map resMap = new HashMap();

try{

//这样调用同类下的异步方法是不起作用的

//this.testAsyncTask();

//通过上下文获取自己的代理对象调用异步方法

EmailController emailController = (EmailController)applicationContext.getBean(EmailController.class);

emailController.testAsyncTask();

resMap.put(“code”,200);

}catch (Exception e) {

resMap.put(“code”,400);

logger.error(“error!",e);

}

return resMap;

}

//注意一定是 public,且是非 static 方法

@Async

public void testAsyncTask() throws InterruptedException {

Thread.sleep(10000);

System.out.println(“异步任务执行完成!”);

}

}

6、开启 cglib 代理,手动获取 Spring 代理类,从而调用同类下的异步方法。

首先,在启动类上加上@EnableAspectJAutoProxy(exposeProxy = true)注解。

代码实现,如下:

@Service

@Transactional(value = “transactionManager”, readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Throwable.class)

public class EmailService {

@Autowired

private ApplicationContext applicationContext;

@Async

public void testSyncTask() throws InterruptedException {

Thread.sleep(10000);

System.out.println(“异步任务执行完成!”);

}

public void asyncCallTwo() throws InterruptedException {

//this.testSyncTask();

// EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);

// emailService.testSyncTask();

boolean isAop = AopUtils.isAopProxy(EmailController.class);//是否是代理对象;

boolean isCglib = AopUtils.isCglibProxy(EmailController.class); //是否是 CGLIB 方式的代理对象;

boolean isJdk = AopUtils.isJdkDynamicProxy(EmailController.class); //是否是 JDK 动态代理方式的代理对象;

//以下才是重点!!!

EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);

EmailService proxy = (EmailService) AopContext.currentProxy();

System.out.println(emailService == proxy ? true : false);

proxy.testSyncTask();

System.out.println(“end!!!”);

}

异步编程如何单元测试

由于上述两种方式都存在一定的缺陷, 因此我们不得不进一步的研究文档和源码, 以寻求更合适的测试方式.

查看 Spring 官方提供的引导教程 Creating Asynchronous Methods (opens new window)后发现, 这篇文章中仅介绍了如何开启异步功能并如何使用 @Async 注解, 但是完全没有介绍如何对其进行测试. 前往其对应的仓库 https://github.com/spring-guides/gs-async-method (opens new window), 也没有发现测试相关的内容. 😢

考虑到平时学习异步相关的内容时都会牵扯到 Executor, 因此以它为切入点, 想办法把测试用例中的异步执行器替换成同步执行器, 但是怎么去替换, 大家都不懂, 看来只能研究源码了 🕵️‍♂️.

spring-context-5.3.10.jar 为例, 查看 @EnableAsync 注解发现它导入了一个 AsyncConfigurationSelector 类:

1
2
3
// ...
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync { ... }

查看 AsyncConfigurationSelector 后发现其默认使用了 ProxyAsyncConfiguration 提供的配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// ...
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
	// ...
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY: // 默认使用 JDK 的 Proxy 实现代理功能
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

查看 ProxyAsyncConfiguration 时终于出现了 Executor 相关的内容:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// ...
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
	// ...
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler); // OHHHHHHHHHHHHHHHHHHHHHHHHH
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}
}

ProxyAsyncConfiguration 使用了父类 AbstractAsyncConfiguration 提供的 executor 用于配置 AsyncAnnotationBeanPostProcessor, 而这个 executor 又是由一个带有 @Autowired 注解的方法初始化的 😉.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// ...
public abstract class AbstractAsyncConfiguration implements ImportAware {
	@Nullable
	protected Supplier<Executor> executor;
	// ...
	@Autowired(required = false)
	void setConfigurers(Collection<AsyncConfigurer> configurers) {
		if (CollectionUtils.isEmpty(configurers)) {return;}
		if (configurers.size() > 1) {throw new IllegalStateException("Only one AsyncConfigurer may exist");}
		AsyncConfigurer configurer = configurers.iterator().next();
		this.executor = configurer::getAsyncExecutor;
		this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
	}
}

这样的话, 我们只需要向 Spring 提供一个 AsyncConfigurer 对象, 然后在 getAsyncExecutor 方法中返回一个同步执行的 Executor 是不是就行了呢? 尝试添加一个 AsyncConfig 到测试代码集:

1
2
3
4
5
6
7
8
@Component
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        // 测试环境下立即执行异步任务
        return new org.springframework.core.task.SyncTaskExecutor();
    }
}

这里我们借用了 Spring 提供的 SyncTaskExecutor, 它的实现方式非常简单:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class SyncTaskExecutor implements TaskExecutor, Serializable {

	/**
	 * Executes the given {@code task} synchronously, through direct
	 * invocation of it's {@link Runnable#run() run()} method.
	 * @throws IllegalArgumentException if the given {@code task} is {@code null}
	 */
	@Override
	public void execute(Runnable task) {
		Assert.notNull(task, "Runnable must not be null");
		task.run();
	}
}

然后将 AsyncConfig 导入到我们的测试用例中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@SpringBootTest
@Import(AsyncConfig.class)
class ReplaceExecutorAsyncServceTest {

	@Autowired
	AsyncService service;

	@Test
	void test() {
		service.myAsyncMethod();
		// 原本的异步业务现在已经是同步执行了
		assertTrue(service.done);
	}
}

再次运行测试发现测试通过, 说明测试用例已经使用我们提供的同步执行器在运行原本的异步业务了 🎉.

这样, 只需要在希望同步测试的用例中导入 AsyncConfig 即可像平常一样对业务进行断言了, 既不用修改业务代码, 也不用浪费时间做不必要的等待.

#

参考文档:

这次事故不仅仅是 RestTemplate 的锅还有@Async

https://www.modb.pro/db/136939

https://www.eolink.com/news/post/24448.html

Licensed under CC BY-NC-SA 4.0
最后更新于 Jan 06, 2025 05:52 UTC
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up