了解 SpringMvc 的请求流程源码之后,理解 WebFlux 就容易的多,毕竟 WebFlux 处理流程是模仿 Servlet 另起炉灶的。
注解驱动请求的webflux请求处理流程
webflux 版本:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.3.22</version>
<scope>compile</scope>
</dependency>
注解驱动请求的 webflux 请求处理流程
我们可以对比 SpringMVC 的请求流程图对比来看
我们可以看到,处理流程基本一样,有以下主要的点不同
- 处理核心
- WebFlux–
DispatcherHandler
- SpringMvc–
DispatcherServlet
- WebFlux–
- 返回值处理器
- WebFlux–
HandlerResultHandler
- SpringMvc–
HandlerMethodReturnValueHandler
- WebFlux–
- 内容协商配置器
- WebFlux–
RequestedContentTypeResolverBuilder
- SpringMvc–
ContentNegotiationConfigurer
- WebFlux–
Web MVC VS. WebFlux 核心组件对比
核心控制器 DispatcherHandler
核心控制器 DispatcherHandler
,等同于阻塞方式的 DispatcherServlet
DispatcherHandler
实现 ApplicationContextAware
,那么必然会调用 setApplicationContext
方法
initStrategies
初始化,获取 HandlerMapping
,HandlerAdapter
,HandlerResultHandler
的所有实例
protected void initStrategies(ApplicationContext context) {
//获取HandlerMapping及其子类型的bean
//HandlerMapping根据请求request获取handler执行链
Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerMapping.class, true, false);
ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
//排序
AnnotationAwareOrderComparator.sort(mappings);
this.handlerMappings = Collections.unmodifiableList(mappings);
//获取HandlerAdapter及其子类型的bean
Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerAdapter.class, true, false);
this.handlerAdapters = new ArrayList<>(adapterBeans.values());
//排序
AnnotationAwareOrderComparator.sort(this.handlerAdapters);
//获取HandlerResultHandler及其子类型的bean
Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerResultHandler.class, true, false);
this.resultHandlers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(this.resultHandlers);
}
DispatcherHandler 的总体流程:
- 1、通过
HandlerMapping
(和DispathcherServlet
中的HandlerMapping
不同)获取到HandlerAdapter
放到ServerWebExchange
的属性中 - 2、获取到
HandlerAdapter
后触发handle
方法,得到HandlerResult
- 3、通过
HandlerResult
,触发handleResult
,针对不同的返回类找到不同的HandlerResultHandler
如视图渲染ViewResolutionResultHandler
、ServerResponseResultHandler
、ResponseBodyResultHandler
、ResponseEntityResultHandler
不同容器有不同的实现,如Reactor
,Jetty
,Tomcat
等。
HandlerMapping
webflux 中引入了一个新的 HandlerMapping
,即 RouterFunctionMapping
RouterFunctionMapping
实现了 InitializingBean
,因此在其实例化的时候,会调用 afterPropertiesSet
方法
public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {
@Nullable
private RouterFunction<?> routerFunction;
//省略部分代码
//afterPropertiesSet()方法 是组件初始化后回调 必须实现InitializingBean接口
//
@Override
public void afterPropertiesSet() throws Exception {
if (CollectionUtils.isEmpty(this.messageReaders)) {
ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();
this.messageReaders = codecConfigurer.getReaders();
}
//初始化routerFunction
if (this.routerFunction == null) {
initRouterFunctions();
}
}
/**
* Initialized the router functions by detecting them in the application context.
* 从应用上下文中查找他们并初始化路由方法
*/
protected void initRouterFunctions() {
if (logger.isDebugEnabled()) {
logger.debug("Looking for router functions in application context: " +
getApplicationContext());
}
//查找合并所有路由方法的bean
List<RouterFunction<?>> routerFunctions = routerFunctions();
if (!CollectionUtils.isEmpty(routerFunctions) && logger.isInfoEnabled()) {
routerFunctions.forEach(routerFunction -> logger.info("Mapped " + routerFunction));
}
//将一个请求中含有多个路由请求方法合并成一个方法
this.routerFunction = routerFunctions.stream()
.reduce(RouterFunction::andOther)
.orElse(null);
}
//查找并合并所有路由方法
private List<RouterFunction<?>> routerFunctions() {
//声明 SortedRouterFunctionsContainer bean
SortedRouterFunctionsContainer container = new SortedRouterFunctionsContainer();
//自动注入到上下文中
obtainApplicationContext().getAutowireCapableBeanFactory().autowireBean(container);
//返回路由
return CollectionUtils.isEmpty(container.routerFunctions) ? Collections.emptyList() :
container.routerFunctions;
}
//省略部分代码
private static class SortedRouterFunctionsContainer {
@Nullable
private List<RouterFunction<?>> routerFunctions;
//由上面的方法 自动注入bean时实现依赖查找,查找所有的 RouterFunction beans
//并注入到 List<RouterFunction<?>> 中。这样就会得到所有实现路由方法的集合
@Autowired(required = false)
public void setRouterFunctions(List<RouterFunction<?>> routerFunctions) {
this.routerFunctions = routerFunctions;
}
}
}
HandlerAdapter
webflux 中引入了一个新的 HandlerAdapter
,即 HandlerFunctionAdapter
webflux 中引入了一个新的 HandlerResultHandler
,即 ServerResponseResultHandler
ServerResponseResultHandler
实现了 InitializingBean
,因此在其实例化的时候,会调用 afterPropertiesSet
方法
流式处理请求 handler()
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
//handlerMappings在initStrategies()方法中已经构造好了
if (this.handlerMappings == null) {
return createNotFoundError();
}
//构造Flux,数据源为handlerMappings集合
return Flux.fromIterable(this.handlerMappings)
//获取Mono<Handler>对象,通过concatMap保证顺序和handlerMappings顺序一致
//严格保证顺序是因为在一个系统中可能存在一个Url有多个能够处理的HandlerMapping的情况
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
//如果next()娶不到值则抛出错误
.switchIfEmpty(createNotFoundError())
//触发HandlerApter的handle方法
.flatMap(handler -> invokeHandler(exchange, handler))
//触发HandlerResultHandler 的handleResult方法
.flatMap(result -> handleResult(exchange, result));
}
触发 HandlerApter
的 handle
方法
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
return getResultHandler(result).handleResult(exchange, result)
.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}
private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
if (this.resultHandlers != null) {
for (HandlerResultHandler resultHandler : this.resultHandlers) {
if (resultHandler.supports(handlerResult)) {
return resultHandler;
}
}
}
throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}
函数式端点请求处理流程
通过上图,我们可以看到,这个处理跟之前的注解驱动请求大有不同,但是请求的流程是万变不离其宗,只是组件有所变化。
接下来我们就跟着流程图一步一步的来解读WebFlux函数端点式请求的源码。
装配阶段
由上图我们可以看到 RouterFunctionMapping
是由WebFluxConfigurationSupport
创建的,接下来看一下RouterFunctions
是怎么合并RouterFunction
的并且如何关联到RouterFunctionMapping
的。
RouterFunctionMapping
的源码,前面已经介绍了。
请求阶段
请求阶段的核心代码就是 org.springframework.web.reactive.DispatcherHandler#handle
方法,我们来看一下源码。
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (logger.isDebugEnabled()) {
ServerHttpRequest request = exchange.getRequest();
logger.debug("Processing " + request.getMethodValue() + " request for [" + request.getURI() + "]");
}
if (this.handlerMappings == null) {
return Mono.error(HANDLER_NOT_FOUND_EXCEPTION);
}
// 1.HTTP请求进来后执行的流程
return Flux.fromIterable(this.handlerMappings) //2 遍历handlerMappings定位RouterFunctionMapping
.concatMap(mapping -> mapping.getHandler(exchange)) // 3.获取HandlerFunction
.next()
.switchIfEmpty(Mono.error(HANDLER_NOT_FOUND_EXCEPTION))
.flatMap(handler -> invokeHandler(exchange, handler)) //4.执行
.flatMap(result -> handleResult(exchange, result)); //5. 处理结果
}
上面的代码已经把大部分的流程说明清楚了,那么我们来看一下 lambda 表达式中每个内部方法的具体实现。
首先我们来看一下步骤3的具体实现 org.springframework.web.reactive.handler.AbstractHandlerMapping#getHandler
@Override
public Mono<Object> getHandler(ServerWebExchange exchange) {
//调用 getHandlerInternal 方法来确定HandlerFunction
return getHandlerInternal(exchange).map(handler -> {
if (CorsUtils.isCorsRequest(exchange.getRequest())) {
CorsConfiguration configA = this.globalCorsConfigSource.getCorsConfiguration(exchange);
CorsConfiguration configB = getCorsConfiguration(handler, exchange);
CorsConfiguration config = (configA != null ? configA.combine(configB) : configB);
if (!getCorsProcessor().process(config, exchange) ||
CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return REQUEST_HANDLED_HANDLER;
}
}
return handler;
});
}
上面一大段代码其实主要来获取 handler 的方法是 getHandlerInternal(exchange)
剩下的部分是 跨域处理的逻辑。
我们看一下 这个方法:
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
if (this.routerFunction != null) {
ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
exchange.getAttributes().put(RouterFunctions.REQUEST_ATTRIBUTE, request);
return this.routerFunction.route(request); //通过路由获取到对应处理的HandlerFunction 也就是执行方法
}
else {
return Mono.empty();
}
}
获取到对应的HandlerFunction
后我们就来执行第四步,调用HandlerFunction
:
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (this.handlerAdapters != null) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) { //判断HandlerAdapters中是否支持之前获取到的handler
return handlerAdapter.handle(exchange, handler); //执行handler 对应下面handle的方法
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter#handle
方法,这个类中的方法就是处理函数式端点请求的 Adapter 具体实现
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
HandlerFunction<?> handlerFunction = (HandlerFunction<?>) handler;
ServerRequest request = exchange.getRequiredAttribute(RouterFunctions.REQUEST_ATTRIBUTE);
return handlerFunction.handle(request) //由lambda模式 (返回值-参数) 无需准确的方法签名
.map(response -> new HandlerResult(handlerFunction, response, HANDLER_FUNCTION_RETURN_TYPE)); //返回HandlerResult对象
}
这里的 lambda 模式比较难理解,主要是看 HandlerFunction
这个函数式接口
@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
/**
* Handle the given request.
* @param request the request to handle
* @return the response
*/
Mono<T> handle(ServerRequest request);
}
我们只需要满足 入参是 ServerRequest
类型 返回值是 Mono<T>
就可以执行。
调用完具体方法之后,我们就可以进行返回值解析序列化了。这里就是 步骤5 处理结果。
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
//获取对应的返回结果处理器并处理
return getResultHandler(result).handleResult(exchange, result)
//如果出现错误或者异常 则选择对应的异常结果处理器进行处理
.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult -> getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}
我们再来看一下getResultHandler
代码
private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
if (this.resultHandlers != null) {
for (HandlerResultHandler resultHandler : this.resultHandlers) {
if (resultHandler.supports(handlerResult)) {
return resultHandler;
}
}
}
throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}
我们看一下resultHandlers
中都含有哪些返回值处理器
通过截图可以看出返回值解析器跟流程图一一对应。
在匹配到对应的返回值解析器之后进行返回值的封装和写会,这里要注意 DataBuffer
是NIO的写处理,最后写回到浏览器客户端。
Ref:http://www.enmalvi.com/2024/02/26/spring-webflux-dispatcherhandler/