Spring WebFlux 请求的处理流程

了解 SpringMvc 的请求流程源码之后,理解 WebFlux 就容易的多,毕竟 WebFlux 处理流程是模仿 Servlet 另起炉灶的。

注解驱动请求的webflux请求处理流程

webflux 版本:

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webflux</artifactId>
      <version>5.3.22</version>
      <scope>compile</scope>
    </dependency>

注解驱动请求的 webflux 请求处理流程

我们可以对比 SpringMVC 的请求流程图对比来看

我们可以看到,处理流程基本一样,有以下主要的点不同

  • 处理核心
    • WebFlux–DispatcherHandler
    • SpringMvc–DispatcherServlet
  • 返回值处理器
    • WebFlux–HandlerResultHandler
    • SpringMvc–HandlerMethodReturnValueHandler
  • 内容协商配置器
    • WebFlux–RequestedContentTypeResolverBuilder
    • SpringMvc–ContentNegotiationConfigurer

Web MVC VS. WebFlux 核心组件对比

核心控制器 DispatcherHandler

核心控制器 DispatcherHandler,等同于阻塞方式的 DispatcherServlet
DispatcherHandler 实现 ApplicationContextAware,那么必然会调用 setApplicationContext 方法

initStrategies 初始化,获取 HandlerMappingHandlerAdapterHandlerResultHandler 的所有实例

protected void initStrategies(ApplicationContext context) {
    //获取HandlerMapping及其子类型的bean
    //HandlerMapping根据请求request获取handler执行链
    Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerMapping.class, true, false);
 
    ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
    //排序
    AnnotationAwareOrderComparator.sort(mappings);
    this.handlerMappings = Collections.unmodifiableList(mappings);
 
    //获取HandlerAdapter及其子类型的bean
    Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerAdapter.class, true, false);
 
    this.handlerAdapters = new ArrayList<>(adapterBeans.values());
    //排序
    AnnotationAwareOrderComparator.sort(this.handlerAdapters);
 
    //获取HandlerResultHandler及其子类型的bean
    Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerResultHandler.class, true, false);
 
    this.resultHandlers = new ArrayList<>(beans.values());
    AnnotationAwareOrderComparator.sort(this.resultHandlers);
}

DispatcherHandler 的总体流程:

  • 1、通过 HandlerMapping(和DispathcherServlet中的HandlerMapping不同)获取到HandlerAdapter放到ServerWebExchange的属性中
  • 2、获取到HandlerAdapter后触发handle方法,得到HandlerResult
  • 3、通过HandlerResult,触发handleResult,针对不同的返回类找到不同的HandlerResultHandler如视图渲染ViewResolutionResultHandlerServerResponseResultHandlerResponseBodyResultHandlerResponseEntityResultHandler不同容器有不同的实现,如ReactorJettyTomcat等。

HandlerMapping

webflux 中引入了一个新的 HandlerMapping,即 RouterFunctionMapping

RouterFunctionMapping 实现了 InitializingBean,因此在其实例化的时候,会调用 afterPropertiesSet 方法

public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {
 
   @Nullable
   private RouterFunction<?> routerFunction;
    //省略部分代码
    
    //afterPropertiesSet()方法 是组件初始化后回调 必须实现InitializingBean接口
    //
   @Override
   public void afterPropertiesSet() throws Exception {
      if (CollectionUtils.isEmpty(this.messageReaders)) {
         ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();
         this.messageReaders = codecConfigurer.getReaders();
      }
 
      //初始化routerFunction
      if (this.routerFunction == null) {
         initRouterFunctions();
      }
   }
 
   /**
    * Initialized the router functions by detecting them in the application context.
    * 从应用上下文中查找他们并初始化路由方法
    */
   protected void initRouterFunctions() {
      if (logger.isDebugEnabled()) {
         logger.debug("Looking for router functions in application context: " +
               getApplicationContext());
      }
 
      //查找合并所有路由方法的bean
      List<RouterFunction<?>> routerFunctions = routerFunctions();
      if (!CollectionUtils.isEmpty(routerFunctions) && logger.isInfoEnabled()) {
         routerFunctions.forEach(routerFunction -> logger.info("Mapped " + routerFunction));
      }
       
      //将一个请求中含有多个路由请求方法合并成一个方法
      this.routerFunction = routerFunctions.stream()
            .reduce(RouterFunction::andOther)
            .orElse(null);
   }
 
    //查找并合并所有路由方法
   private List<RouterFunction<?>> routerFunctions() {
       //声明 SortedRouterFunctionsContainer bean
      SortedRouterFunctionsContainer container = new SortedRouterFunctionsContainer();
       //自动注入到上下文中 
      obtainApplicationContext().getAutowireCapableBeanFactory().autowireBean(container);
      //返回路由
      return CollectionUtils.isEmpty(container.routerFunctions) ? Collections.emptyList() :
            container.routerFunctions;
   }
    //省略部分代码
   private static class SortedRouterFunctionsContainer {
 
      @Nullable
      private List<RouterFunction<?>> routerFunctions;
 
       //由上面的方法 自动注入bean时实现依赖查找,查找所有的 RouterFunction beans
       //并注入到 List<RouterFunction<?>> 中。这样就会得到所有实现路由方法的集合
      @Autowired(required = false)
      public void setRouterFunctions(List<RouterFunction<?>> routerFunctions) {
         this.routerFunctions = routerFunctions;
      }
   }
 
}

HandlerAdapter

webflux 中引入了一个新的 HandlerAdapter,即 HandlerFunctionAdapter

webflux 中引入了一个新的 HandlerResultHandler,即 ServerResponseResultHandler

ServerResponseResultHandler 实现了 InitializingBean,因此在其实例化的时候,会调用 afterPropertiesSet 方法

流式处理请求 handler()

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    //handlerMappings在initStrategies()方法中已经构造好了
    if (this.handlerMappings == null) {
        return createNotFoundError();
    }
    //构造Flux,数据源为handlerMappings集合
    return Flux.fromIterable(this.handlerMappings)
            //获取Mono<Handler>对象,通过concatMap保证顺序和handlerMappings顺序一致
            //严格保证顺序是因为在一个系统中可能存在一个Url有多个能够处理的HandlerMapping的情况
            .concatMap(mapping -> mapping.getHandler(exchange))
            .next()
            //如果next()娶不到值则抛出错误
            .switchIfEmpty(createNotFoundError())
            //触发HandlerApter的handle方法
            .flatMap(handler -> invokeHandler(exchange, handler))
            //触发HandlerResultHandler 的handleResult方法
            .flatMap(result -> handleResult(exchange, result));
}

触发 HandlerApterhandle 方法

private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
    return getResultHandler(result).handleResult(exchange, result)
            .onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
                    getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}
 
private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
    if (this.resultHandlers != null) {
        for (HandlerResultHandler resultHandler : this.resultHandlers) {
            if (resultHandler.supports(handlerResult)) {
                return resultHandler;
            }
        }
    }
    throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}

函数式端点请求处理流程

通过上图,我们可以看到,这个处理跟之前的注解驱动请求大有不同,但是请求的流程是万变不离其宗,只是组件有所变化。

接下来我们就跟着流程图一步一步的来解读WebFlux函数端点式请求的源码。

装配阶段

由上图我们可以看到 RouterFunctionMapping是由WebFluxConfigurationSupport创建的,接下来看一下RouterFunctions是怎么合并RouterFunction的并且如何关联到RouterFunctionMapping的。

RouterFunctionMapping 的源码,前面已经介绍了。

请求阶段

请求阶段的核心代码就是 org.springframework.web.reactive.DispatcherHandler#handle方法,我们来看一下源码。

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
   if (logger.isDebugEnabled()) {
      ServerHttpRequest request = exchange.getRequest();
      logger.debug("Processing " + request.getMethodValue() + " request for [" + request.getURI() + "]");
   }
   if (this.handlerMappings == null) {
      return Mono.error(HANDLER_NOT_FOUND_EXCEPTION);
   }
   // 1.HTTP请求进来后执行的流程
   return Flux.fromIterable(this.handlerMappings)  //2 遍历handlerMappings定位RouterFunctionMapping
         .concatMap(mapping -> mapping.getHandler(exchange))   // 3.获取HandlerFunction
         .next()
         .switchIfEmpty(Mono.error(HANDLER_NOT_FOUND_EXCEPTION))  
         .flatMap(handler -> invokeHandler(exchange, handler))   //4.执行
         .flatMap(result -> handleResult(exchange, result));  //5. 处理结果
}

上面的代码已经把大部分的流程说明清楚了,那么我们来看一下 lambda 表达式中每个内部方法的具体实现。

首先我们来看一下步骤3的具体实现 org.springframework.web.reactive.handler.AbstractHandlerMapping#getHandler

@Override
public Mono<Object> getHandler(ServerWebExchange exchange) {
    //调用 getHandlerInternal 方法来确定HandlerFunction
   return getHandlerInternal(exchange).map(handler -> {
      if (CorsUtils.isCorsRequest(exchange.getRequest())) {
         CorsConfiguration configA = this.globalCorsConfigSource.getCorsConfiguration(exchange);
         CorsConfiguration configB = getCorsConfiguration(handler, exchange);
         CorsConfiguration config = (configA != null ? configA.combine(configB) : configB);
         if (!getCorsProcessor().process(config, exchange) ||
               CorsUtils.isPreFlightRequest(exchange.getRequest())) {
            return REQUEST_HANDLED_HANDLER;
         }
      }
      return handler;
   });
}

上面一大段代码其实主要来获取 handler 的方法是 getHandlerInternal(exchange) 剩下的部分是 跨域处理的逻辑。

我们看一下 这个方法:

@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
   if (this.routerFunction != null) {
      ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
      exchange.getAttributes().put(RouterFunctions.REQUEST_ATTRIBUTE, request);
      return this.routerFunction.route(request);  //通过路由获取到对应处理的HandlerFunction 也就是执行方法
   }
   else {
      return Mono.empty();
   }
}

获取到对应的HandlerFunction后我们就来执行第四步,调用HandlerFunction

private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
   if (this.handlerAdapters != null) {
      for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
         if (handlerAdapter.supports(handler)) {  //判断HandlerAdapters中是否支持之前获取到的handler
            return handlerAdapter.handle(exchange, handler);  //执行handler 对应下面handle的方法
         }
      }
   }
   return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}

org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter#handle 方法,这个类中的方法就是处理函数式端点请求的 Adapter 具体实现

@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
   HandlerFunction<?> handlerFunction = (HandlerFunction<?>) handler;
   ServerRequest request = exchange.getRequiredAttribute(RouterFunctions.REQUEST_ATTRIBUTE);
   return handlerFunction.handle(request)   //由lambda模式 (返回值-参数)  无需准确的方法签名
         .map(response -> new HandlerResult(handlerFunction, response, HANDLER_FUNCTION_RETURN_TYPE));   //返回HandlerResult对象 
}

这里的 lambda 模式比较难理解,主要是看 HandlerFunction 这个函数式接口

@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
 
   /**
    * Handle the given request.
    * @param request the request to handle
    * @return the response
    */
   Mono<T> handle(ServerRequest request);
 
}

我们只需要满足 入参是 ServerRequest 类型 返回值是 Mono<T> 就可以执行。

调用完具体方法之后,我们就可以进行返回值解析序列化了。这里就是 步骤5 处理结果。

private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
    //获取对应的返回结果处理器并处理          
   return getResultHandler(result).handleResult(exchange, result)   
       //如果出现错误或者异常 则选择对应的异常结果处理器进行处理
         .onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->                   getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}

我们再来看一下getResultHandler代码

private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
   if (this.resultHandlers != null) {
      for (HandlerResultHandler resultHandler : this.resultHandlers) {
         if (resultHandler.supports(handlerResult)) {
            return resultHandler;
         }
      }
   }
   throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}

我们看一下resultHandlers中都含有哪些返回值处理器

通过截图可以看出返回值解析器跟流程图一一对应。

在匹配到对应的返回值解析器之后进行返回值的封装和写会,这里要注意 DataBuffer 是NIO的写处理,最后写回到浏览器客户端。


Ref:http://www.enmalvi.com/2024/02/26/spring-webflux-dispatcherhandler/