相关文章推荐
豪爽的小狗  ·  HECATE GS02 遊戲USB ...·  5 月前    · 
近视的眼镜  ·  游戏王OCG直播双子构筑浅谈——上篇【构筑篇 ...·  6 月前    · 
好帅的咖啡  ·  第19届三星车险杯世界围棋公开赛·  2 年前    · 
很拉风的红酒  ·  便民||@所有人,好消息!羊城通APP线上商 ...·  2 年前    · 
跑龙套的弓箭  ·  大学生安全教育心得体会- 知乎·  2 年前    · 
小百科  ›  Spring WebFlux运用中的反思与区隔-腾讯云开发者社区-腾讯云
test spring框架 线程池 undertow
瘦瘦的泡面
2 年前
作者头像
干货满满张哈希
0 篇文章

Spring WebFlux运用中的思考与对比

前往专栏
腾讯云
开发者社区
文档 意见反馈 控制台
首页
学习
活动
专区
工具
TVP
文章/答案/技术大牛
发布
首页
学习
活动
专区
工具
TVP
返回腾讯云官网
社区首页 > 专栏 > 干货满满张哈希 > Spring WebFlux运用中的思考与对比

Spring WebFlux运用中的思考与对比

作者头像
干货满满张哈希
发布 于 2021-04-12 14:22:46
945 0
发布 于 2021-04-12 14:22:46
举报

系列目录:

  1. Spring WebFlux运用中的思考与对比
  2. CompletableFuture与Spring的Sleuth结合工具类
  3. CommpetableFuture使用anyOf过程中的一些优化思考
  4. 结合CompletableFuture与Spring的Sleuth结合工具类与allOf以及anyOf

本文基于Spring Cloud Finchley SR4

本文通过几个问题,解析下Spring WebFlux用法最佳实践,并与另一框架Vertx作对比

1. 是否一定要用默认的Web容器,用自己的Web容器是否可以,同时是否可以有web和webflux

是可以的,这样的依赖是可行的( 容器 用tomcat和undertow或者其他都可以,这里使用undertow):

2. 怎样实现真正的异步背压的Reactor模型呢?

这个问题,除此运用像WebFlux和Vertx的框架的人,都会对这个有误解。认为仅仅简单的把webFlux的依赖添加进来,之后接口返回Mono就实现了异步背压的Reactor模型。实际上并不是这样的。 我们来举几个例子,分步骤深入了解下。 首先为了测试方便,我们将web容器的处理http请求线程池的大小改成唯一一个,对于Tomcat,配置:

server.thread.max-thread=1

对于UnderTow(我们这里用的是underTow):

# 设置IO线程数, 它主要执行非阻塞的任务,它们会负责多个连接, 默认设置每个CPU核心一个线程
server.undertow.io-threads=1
# 阻塞任务线程池, 当执行类似servlet请求阻塞IO操作, undertow会从这个线程池中取得线程
# 它的值设置取决于系统线程执行任务的阻塞系数,默认值是IO线程数*8
server.undertow.worker-threads=1

之后,配置Log4J2日志格式为:

test
    yyyy-MM-dd HH:mm:ss.SSS
    %xwEx
        %d{${LOG_DATEFORMAT_PATTERN}} ${LOG_LEVEL_PATTERN} [${springAppName},%X{X-B3-TraceId},%X{X-B3-SpanId}] [${sys:PID}] [%t][%C:%L]: %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}

这样的格式可以使我们看到线程号,还有sleuth的traceId和spanId(我们的项目依赖了sleuth)。 首先编写测试代码,看看直接简单调用并just是否实现了异步背压:

@Log4j2
@RestController
public class TestController {
    @Autowired
    private TestService testService;
    @RequestMapping("/test")
    public Mono test() {
        log.info("test started");
        return Mono.just(testService.simulateIOTest());
    @Service
    public static class TestService {
        public String simulateIOTest() {
            try {
                //simulate io
                log.info("simulate start");
                TimeUnit.SECONDS.sleep(5);
                log.info("simulate end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            return "hello";
}

并发调用接口,查看日志,发现:

2019-11-12 09:05:41.595  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:41.596  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:46.598  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:46.635  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:46.635  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:51.636  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:51.643  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:51.643  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:56.644  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end

可以明显看出,请求是串行处理的,因为只有一个线程,并且这个线程还在等待请求处理完。这就不符合Reactor模型,处理http请求的线程XNIO-2 task-1应该不等待请求处理完而直接处理下一个请求才对。 把 Mono.just(testService.simulateIOTest()) 替换成 Mono.fromCallable(() -> testService.simulateIOTest()) 等等类似的是一样的效果 ,这里必须自己用其他的线程池,去处理实际请求,处理结束的时候,将结果填写到最外层的Mono里面。这样的话,考虑到代码整洁性不采用纯回调写法,要求每一个调用方法返回的都是Future类型的。这里我们返回CompletableFuture。

@Log4j2
@RestController
public class TestController {
    @Autowired
    private TestService testService;
    @RequestMapping("/test")
    public Mono test() {
        log.info("test started");
        return Mono.create(stringMonoSink -> testService.simulateIOTest().thenApply(s -> {
            log.info("apply");
            //填写成功结果
            stringMonoSink.success(s);
            return s;
    @Service
    public static class TestService {
        public CompletableFuture simulateIOTest() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    //simulate io
                    log.info("simulate start");
                    TimeUnit.SECONDS.sleep(5);
                    log.info("simulate end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                return "hello";
}

结果是:

2019-11-12 09:18:03.457  INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.155  INFO [test,c654462e159fd43e,c654462e159fd43e] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.962  INFO [test,8366a95d002ca25a,8366a95d002ca25a] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.963  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:05.756  INFO [test,5f851d9e2ef49f14,5f851d9e2ef49f14] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:05.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:08.459  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:08.459  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.964  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.964  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:10.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:10.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply

这样,才真正实现了Reactor模型。

3. CompletableFuture线程池管理还有日志追踪

CompletableFuture可以指定线程池,亦可以不指定。如果像上面不指定的话,那么使用的线程池就是Java8之后会默认启动一个大小为CPU核数减一的CommonForkJoinPool去执行。需要指定的话,基本上每个方法都可以额外传入一个线程池作为参数。

最佳实践是,只要涉及到IO的,就交给不同的线程池去做,不同种类的IO的线程池不同。例如,用于 数据库 IO的线程池,用于RPC的线程池,用于缓存访问的线程池等等。

这里还有一个问题存在,就是异步调用,导致spanId和traceId丢失了,例如上面的例子:

2019-11-12 09:18:03.457  INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start

8d6eddc9cc80612f 这个丢失了,导致微服务调用链日志追踪变得不可行,所以,这里我们对于异步的代码,也需要在异步调用前强制设置下spanId和traceId。

综上之后,修改的代码是:

@Log4j2
@RestController
public class TestController {
    @Autowired
    private TestService testService;
    @RequestMapping("/test")
    public Mono test() {
        log.info("test started");
        return Mono.fromFuture(testService.simulateIOTest());
    @Service
    public static class TestService {
        @Autowired
        private Tracer tracer;
        ThreadFactory build = (new ThreadFactoryBuilder()).setNameFormat("test_service_executor-%d").build();
        private ExecutorService executorService = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(131072), build, new ThreadPoolExecutor.AbortPolicy());
        public CompletableFuture simulateIOTest() {
            Span span = tracer.currentSpan();
            return CompletableFuture.supplyAsync(() -> {
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                    //simulate io
                    log.info("simulate start");
                    TimeUnit.SECONDS.sleep(5);
                    log.info("simulate end");
                    return "hello";
                } catch (Exception e) {
                    throw new RuntimeException(e);
 
推荐文章
豪爽的小狗  ·  HECATE GS02 遊戲USB 音效卡使用者指南
5 月前
近视的眼镜  ·  游戏王OCG直播双子构筑浅谈——上篇【构筑篇】 - 哔哩哔哩
6 月前
好帅的咖啡  ·  第19届三星车险杯世界围棋公开赛
2 年前
很拉风的红酒  ·  便民||@所有人,好消息!羊城通APP线上商城上线,残疾人乘车卡 ...
2 年前
跑龙套的弓箭  ·  大学生安全教育心得体会- 知乎
2 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
小百科 - 百科知识指南
© 2024 ~ 沪ICP备11025650号