Servlet3.0新特性:异步处理,太好用了!!!

互联网全栈架构

共 46625字,需浏览 94分钟

 ·

2021-05-24 21:39

最近有粉丝问我,讲 springboot 为什么需要从 servlet 说起,在这里给大家解释一下:servlet 属于非常基础的知识,可能现在开发中很少直接用 servlet 了,但是 springmvc 就是在 servlet 的基础上整起来的,所以基础的东西必须要吃透,基础扎实了,其他的就很容易了,还有 spring 系列还未学完的同学,最近赶紧回头去补补,spring 系列吃透之后,springboot 就是小菜一碟了,springboot 中的一切技术都源于 spring。

spring系列地址:https://mp.weixin.qq.com/s/E7wNLtU-453b9YC3XoUvqQ

好了,咱们继续今天的内容。

springmvc 中的 controller 支持异步处理的功能,不知大家是否有接触过,其内部原理是依靠 servlet 中的异步实现的,所以咱们需要先了解 servlet 中的异步处理。

1、早期 servlet 请求处理流程

servlet3.0 之前,一个请求过来之后,处理过程如下图:

从上图可以看出:请求过来后,从主线程池获取一个线程,处理业务,响应请求,然后将线程还回线程池,整个过程都是由同一个主线程在执行。

这里存在一个问题,通常 web 容器中的主线程数量是有限的,若执行业务的比较耗时,大量请求过来之后,主线程被耗光,新来的请求就会处于等待状态。

而 servlet3.0 中对这个过程做了改进,主线程可以将请求转交给其他线程去处理,比如开发者可以自定义一个线程,然后在自定义的线程中处理请求。

2、servlet3.0 异步处理流程

如下图:

在主线程中开启异步处理,主线程将请求交给其他线程去处理,主线程就结束了,被放回了主线程池,由其他线程继续处理请求。

可能有些朋友会说,直接提升主线程的数量不就可以了么?

老铁们,确实可以,但是咱们的目标是使用最少的线程做更多的事情。

异步处理的流程适合业务处理比较耗时而导致主线程长时间等待的场景,稍后我会给大家上一些案例。

下面咱们来看看 servlet 中异步处理如何使用?

3、servlet3.0 中异步处理使用步骤

step1:开启异步支持

设置@WebServlet 的 asyncSupported 属性为 true,表示支持异步处理

@WebServlet(asyncSupported = true)

step2:启动异步请求

启动异步处理:调用 req.startAsync(request,response)方法,获取异步处理上下文对象 AsyncContext

AsyncContext asyncContext = request.startAsync(request, response);

step3:异步处理业务&完成异步处理

其他线程中执行业务操作,输出结果,并调用 asyncContext.complete()完成异步处理,比如下面 2 种方式:

方式 1:启动一个新的线程来处理请求,代码如下:

new Thread(()->{
 System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
    try {
        //这里休眠2秒,模拟业务耗时
        TimeUnit.SECONDS.sleep(2);
        //这里是子线程,请求在这里被处理了
        asyncContext.getResponse().getWriter().write("ok");
        //调用complete()方法,表示请求请求处理完成
        asyncContext.complete();
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end");
})

方式 2:如下代码,调用 asyncContext.start 方法来处理请求,传递的是一个 Runnable 对象,asyncContext.start 会将传递的 Runnable 放在新的线程中去执行

asyncContext.start(() -> {
    System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
    try {
        //这里休眠2秒,模拟业务耗时
        TimeUnit.SECONDS.sleep(2);
        //这里是子线程,请求在这里被处理了
        asyncContext.getResponse().getWriter().write("ok");
        //5、调用complete()方法,表示请求请求处理完成
        asyncContext.complete();
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end");
});

下面来看几个案例,案例是精华,通过案例可以全面掌握异步处理。

4、案例 1:使用 asyncContext.start 处理异步请求

下面案例代码会输出 4 条日志,注意日志中包含的信息:时间、线程信息、耗时,通过这些信息可以分析主线程什么时候结束的。

package com.javacode2018.springboot.lesson002.demo1;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.ServletException;
import jakarta.servlet.annotation.WebServlet;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

//1.设置@WebServlet的asyncSupported属性为true,表示支持异步处理
@WebServlet(name = "AsyncServlet1",
        urlPatterns = "/asyncServlet1",
        asyncSupported = true
)
public class AsyncServlet1 extends HttpServlet {
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long st = System.currentTimeMillis();
        System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
        //2、启动异步处理:调用req.startAsync(request,response)方法,获取异步处理上下文对象AsyncContext
        AsyncContext asyncContext = request.startAsync(request, response);
        //3、调用start方法异步处理,调用这个方法之后主线程就结束了
        asyncContext.start(() -> {
            long stSon = System.currentTimeMillis();
            System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
            try {
                //这里休眠2秒,模拟业务耗时
                TimeUnit.SECONDS.sleep(2);
                //这里是子线程,请求在这里被处理了
                asyncContext.getResponse().getWriter().write("ok");
                //4、调用complete()方法,表示请求请求处理完成
                asyncContext.complete();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - stSon));
        });
        System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - st));

    }
}

发布到 tomcat 中,浏览器中访问下面地址:

http://localhost:8080/asyncServlet1

tomcat 控制台输出

主线程:Thread[http-nio-8080-exec-5,5,main]-1617373260994-start
主线程:Thread[http-nio-8080-exec-5,5,main]-1617373260994-end,耗时(ms):0
子线程:Thread[http-nio-8080-exec-6,5,main]-1617373260994-start
子线程:Thread[http-nio-8080-exec-6,5,main]-1617373262995-end,耗时(ms):2001

主线程耗时 0 毫秒,并不是耗时是 0,而是小于 1 毫秒,太快了,子线程中 sleep 了 2 秒,所以耗时是 2000 毫秒。

大家注意看下浏览器中的请求,在asyncContext.complete();被调用之前,浏览器中的请求一直处于阻塞状态,当这个方法执行完毕之后,浏览器端才会受到响应。如果没有asyncContext.complete();这行代码,请求等上一段时间会超时,异步请求是默认是有超时时间的,tomcat 默认是 30 秒,大家可以试试,在浏览器中通过 F12 可以看到 30 秒后会响应超时。

5、案例 2:自定义线程处理异步请求

案例 1 中,我们使用asyncContext.start来处理异步请求,start 方法内部会使用 web 容器中默认的线程池来处理请求,我们也可以自定义线程来处理异步请求,将案例 1 中asyncContext.start代码替换为下面代码,大家也可以自定义一个线程池,将请求丢到线程池中去处理。

//3、自定义一个线程来处理异步请求
Thread thread = new Thread(() -> {
    long stSon = System.currentTimeMillis();
    System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
    try {
        //这里休眠2秒,模拟业务耗时
        TimeUnit.SECONDS.sleep(2);
        //这里是子线程,请求在这里被处理了
        asyncContext.getResponse().getWriter().write(System.currentTimeMillis() + ",ok");
        //4、调用complete()方法,表示异步请求处理完成
        asyncContext.complete();
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - stSon));
});
thread.setName("自定义线程");
thread.start();

6、案例 3:通过 asyncContext.dispatch 结束异步请求

上面 2 个案例都是通过asyncContext.complete()来结束异步请求的,结束请求还有另外一种方式,子线程中处理完毕业务之后,将结果放在 request 中,然后调用asyncContext.dispatch()转发请求,此时请求又会进入当前 servlet,此时需在代码中判断请求是不是异步转发过来的,如果是的,则从 request 中获取结果,然后输出,这种方式就是 springmvc 处理异步的方式,所以这种看懂了,springmvc 就一目了然了,代码如下

package com.javacode2018.springboot.lesson002.demo1;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.ServletException;
import jakarta.servlet.annotation.WebServlet;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

//1.设置@WebServlet的asyncSupported属性为true,表示支持异步处理
@WebServlet(name = "AsyncServlet3",
        urlPatterns = "/asyncServlet3",
        asyncSupported = true
)
public class AsyncServlet3 extends HttpServlet {
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        System.out.println("请求类型:" + request.getDispatcherType());
        //@1:判断请求类型,如果是异步类型(DispatcherType.ASYNC),则说明是异步转发过来的,将结果输出
        if (request.getDispatcherType() == DispatcherType.ASYNC) {
            System.out.println("响应结果:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
            //从request中获取结果,然后输出
            Object result = request.getAttribute("result");
            response.getWriter().write(result.toString());
            System.out.println("响应结果:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end");
        } else {
            long st = System.currentTimeMillis();
            System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
            //2、启动异步处理:调用req.startAsync(request,response)方法,获取异步处理上下文对象AsyncContext
            AsyncContext asyncContext = request.startAsync(request, response);
            //3、调用start方法异步处理,调用这个方法之后主线程就结束了
            asyncContext.start(() -> {
                long stSon = System.currentTimeMillis();
                System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
                try {
                    //这里休眠2秒,模拟业务耗时
                    TimeUnit.SECONDS.sleep(2);
                    //将结果丢到request中
                    asyncContext.getRequest().setAttribute("result""ok");
                    //转发请求,调用这个方法之后,请求又会被转发到当前的servlet,又会进入当前servlet的service方法
                    //此时请求的类型(request.getDispatcherType())是DispatcherType.ASYNC,所以通过这个值可以判断请求是异步转发过来的
                    //然后在request中将结果取出,对应代码@1,然后输出
                    asyncContext.dispatch();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - stSon));
            });
            System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - st));
        }
    }
}

浏览器中访问

http://localhost:8080/asyncServlet3

tomcat 控制台输出

请求类型:REQUEST
主线程:Thread[http-nio-8080-exec-1,5,main]-1617375432076-start
主线程:Thread[http-nio-8080-exec-1,5,main]-1617375432084-end,耗时(ms):8
子线程:Thread[http-nio-8080-exec-2,5,main]-1617375432084-start
子线程:Thread[http-nio-8080-exec-2,5,main]-1617375434092-end,耗时(ms):2008
请求类型:ASYNC
响应结果:Thread[http-nio-8080-exec-3,5,main]-1617375434100-start
响应结果:Thread[http-nio-8080-exec-3,5,main]-1617375434102-end

7、案例 4:设置异步处理超时时间

异步请求总不能让他一直执行吧,所以咱们可以设置超时时间。

asyncContext.setTimeout(超时时间,毫秒,默认是30秒);

我们案例 1 的代码进行改造,添加一行代码,如下,设置超时时间为 1 秒

然后浏览器中访问一下请求,可以看到超时了,如下

对应的案例源码

package com.javacode2018.springboot.lesson002.demo1;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.ServletException;
import jakarta.servlet.annotation.WebServlet;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

//1.设置@WebServlet的asyncSupported属性为true,表示支持异步处理
@WebServlet(name = "AsyncServlet4",
        urlPatterns = "/asyncServlet4",
        asyncSupported = true
)
public class AsyncServlet4 extends HttpServlet {
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long st = System.currentTimeMillis();
        System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
        //2、启动异步处理:调用req.startAsync(request,response)方法,获取异步处理上下文对象AsyncContext
        AsyncContext asyncContext = request.startAsync(request, response);
        //设置异步处理超时时间为1秒
        asyncContext.setTimeout(1000);
        //3、调用start方法异步处理,调用这个方法之后主线程就结束了
        asyncContext.start(() -> {
            long stSon = System.currentTimeMillis();
            System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
            try {
                //这里休眠2秒,模拟业务耗时
                TimeUnit.SECONDS.sleep(2);
                //这里是子线程,请求在这里被处理了
                asyncContext.getResponse().getWriter().write(System.currentTimeMillis() + ",ok");
                //4、调用complete()方法,表示异步请求处理完成
                asyncContext.complete();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - stSon));
        });
        System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - st));

    }
}

8、案例 5:设置监听器

还可以为异步处理添加监听器,当异步处理完成、发生异常错误、出现超时的时候,会回调监听器中对应的方法,如下:

//添加监听器
asyncContext.addListener(new AsyncListener() {
    @Override
    public void onComplete(AsyncEvent event) throws IOException {
        //异步处理完成会被回调
        event.getAsyncContext().getResponse().getWriter().write("
onComplete"
);
    }

    @Override
    public void onTimeout(AsyncEvent event) throws IOException {
        //超时会被回调
        event.getAsyncContext().getResponse().getWriter().write("
onTimeout"
);
    }

    @Override
    public void onError(AsyncEvent event) throws IOException {
        //发生错误会被回调
        event.getAsyncContext().getResponse().getWriter().write("
onError"
);
    }

    @Override
    public void onStartAsync(AsyncEvent event) throws IOException {
        //开启异步请求调用的方法
        event.getAsyncContext().getResponse().getWriter().write("
onStartAsync"
);
    }
});

案例代码如下,代码@1通过请求参数中的 timeout 来控制超时时间,@2中让异步处理休眠了 2 秒,稍后我们会模拟超时和不超时两种情况,大家注意关注 tomcat 控制台日志及浏览器中日志,可以看到监听器中哪些方法会被调用。

package com.javacode2018.springboot.lesson002.demo1;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.annotation.WebServlet;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

//1.设置@WebServlet的asyncSupported属性为true,表示支持异步处理
@WebServlet(name = "AsyncServlet5",
        urlPatterns = "/asyncServlet5",
        asyncSupported = true
)
public class AsyncServlet5 extends HttpServlet {
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long st = System.currentTimeMillis();
        System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
        //2、启动异步处理:调用req.startAsync(request,response)方法,获取异步处理上下文对象AsyncContext
        AsyncContext asyncContext = request.startAsync(request, response);
        response.setContentType("text/html;charset=UTF-8");
        //@1:设置异步处理超时时间
        Long timeout = Long.valueOf(request.getParameter("timeout"));
        asyncContext.setTimeout(timeout);
        //添加监听器
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                //异步处理完成会被回调
                System.out.println(Thread.currentThread() + "-" + System.currentTimeMillis() + "-onComplete()");
                event.getAsyncContext().getResponse().getWriter().write("
onComplete"
);
            }

            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                //超时会被回调
                System.out.println(Thread.currentThread() + "-" + System.currentTimeMillis() + "-onTimeout()");
                event.getAsyncContext().getResponse().getWriter().write("
onTimeout"
);
            }

            @Override
            public void onError(AsyncEvent event) throws IOException {
                //发生错误会被回调
                System.out.println(Thread.currentThread() + "-" + System.currentTimeMillis() + "-onError()");
                event.getAsyncContext().getResponse().getWriter().write("
onError"
);
            }

            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                //开启异步请求调用的方法
                System.out.println(Thread.currentThread() + "-" + System.currentTimeMillis() + "-onStartAsync()");
                event.getAsyncContext().getResponse().getWriter().write("
onStartAsync"
);
            }
        });
        //3、调用start方法异步处理,调用这个方法之后主线程就结束了
        asyncContext.start(() -> {
            long stSon = System.currentTimeMillis();
            System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
            try {
                //@2:这里休眠2秒,模拟业务耗时
                TimeUnit.SECONDS.sleep(2);
                //这里是子线程,请求在这里被处理了
                asyncContext.getResponse().getWriter().write(System.currentTimeMillis() + ",ok");
                //4、调用complete()方法,表示异步请求处理完成
                asyncContext.complete();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - stSon));
        });
        System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - st));

    }
}

模拟非超时请求,访问下面地址

http://localhost:8080/asyncServlet5?timeout=5000

输出:

tomcat 控制台输出,可以看出 onComplete 被调用了。

下面模拟超时请求,访问下面地址

http://localhost:8080/asyncServlet5?timeout=100

tomcat 控制台输出

主线程:Thread[http-nio-8080-exec-8,5,main]-1617378344250-start
主线程:Thread[http-nio-8080-exec-8,5,main]-1617378344251-end,耗时(ms):1
子线程:Thread[http-nio-8080-exec-9,5,main]-1617378344251-start
Thread[http-nio-8080-exec-10,5,main]-1617378344634-onTimeout()
Thread[http-nio-8080-exec-10,5,main]-1617378344634-onComplete()
java.lang.IllegalStateException: AsyncContext关联的请求已经完成处理。
 at org.apache.catalina.core.AsyncContextImpl.check(AsyncContextImpl.java:522)
 at org.apache.catalina.core.AsyncContextImpl.getResponse(AsyncContextImpl.java:228)
 at com.javacode2018.springboot.lesson002.demo1.AsyncServlet5.lambda$service$0(AsyncServlet5.java:70)
 at org.apache.catalina.core.AsyncContextImpl$RunnableWrapper.run(AsyncContextImpl.java:548)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
 at java.lang.Thread.run(Thread.java:745)
子线程:Thread[http-nio-8080-exec-9,5,main]-1617378346260-end,耗时(ms):2009

代码中出现了异常,为什么?

是因为发生超时的时候,onTimeOut 方法执行完毕之后,异步处理就结束了,此时,子线程还在运行,子线程执行到下面这样代码,向客户端输出信息,所以报错了。

asyncContext.getResponse().getWriter().write(System.currentTimeMillis() + ",ok");

9、案例 6:对案例 5 进行改造

对案例 5 进行改造,如下代码,看一下@3处的代码,通过一个原子变量来控制请求是否处理完毕了,代码中有 3 处可能会修改这个变量,通过 cas 操作来控制谁会修改成功,修改成功者,将结果设置到 request.setAttribute 中,然后调用asyncContext.dispatch();转发请求,这种处理方式很好的解决案例 5 中异常问题,springmvc 中异步处理过程这个过程类似,所以这段代码大家一定要好好看看,若能够理解,springmvc 中异步处理的代码可以秒懂。

package com.javacode2018.springboot.lesson002.demo1;

import jakarta.servlet.*;
import jakarta.servlet.annotation.WebServlet;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

//1.设置@WebServlet的asyncSupported属性为true,表示支持异步处理
@WebServlet(name = "AsyncServlet6",
        urlPatterns = "/asyncServlet6",
        asyncSupported = true
)
public class AsyncServlet6 extends HttpServlet {
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        if (request.getDispatcherType() == DispatcherType.ASYNC) {
            response.setContentType("text/html;charset=UTF-8");
            response.getWriter().write(request.getAttribute("result").toString());
        } else {
            long st = System.currentTimeMillis();
            System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
            //2、启动异步处理:调用req.startAsync(request,response)方法,获取异步处理上下文对象AsyncContext
            AsyncContext asyncContext = request.startAsync(request, response);

            //@1:设置异步处理超时时间
            Long timeout = Long.valueOf(request.getParameter("timeout"));
            asyncContext.setTimeout(timeout);
            //@3:用来异步处理是否结束,在这3个地方(子线程中处理完毕时、onComplete、onTimeout)将其更新为true
            AtomicBoolean finish = new AtomicBoolean(false);
            //添加监听器
            asyncContext.addListener(new AsyncListener() {
                @Override
                public void onComplete(AsyncEvent event) throws IOException {
                    //异步处理完成会被回调
                    System.out.println(Thread.currentThread() + "-" + System.currentTimeMillis() + "-onComplete()");
                    if (finish.compareAndSet(falsetrue)) {
                        event.getAsyncContext().getRequest().setAttribute("result""onComplete");
                        //转发请求
                        asyncContext.dispatch();
                    }
                }

                @Override
                public void onTimeout(AsyncEvent event) throws IOException {
                    //超时会被回调
                    System.out.println(Thread.currentThread() + "-" + System.currentTimeMillis() + "-onTimeout()");
                    if (finish.compareAndSet(falsetrue)) {
                        event.getAsyncContext().getRequest().setAttribute("result""onTimeout");
                        //转发请求
                        asyncContext.dispatch();
                    }
                }

                @Override
                public void onError(AsyncEvent event) throws IOException {
                    //发生错误会被回调
                    System.out.println(Thread.currentThread() + "-" + System.currentTimeMillis() + "-onError()");
                    event.getAsyncContext().getResponse().getWriter().write("
onError"
);
                }

                @Override
                public void onStartAsync(AsyncEvent event) throws IOException {
                    //开启异步请求调用的方法
                    System.out.println(Thread.currentThread() + "-" + System.currentTimeMillis() + "-onStartAsync()");
                    event.getAsyncContext().getResponse().getWriter().write("
onStartAsync"
);
                }
            });
            //3、调用start方法异步处理,调用这个方法之后主线程就结束了
            asyncContext.start(() -> {
                long stSon = System.currentTimeMillis();
                System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-start");
                try {
                    //@2:这里休眠2秒,模拟业务耗时
                    TimeUnit.SECONDS.sleep(2);
                    if (finish.compareAndSet(falsetrue)) {
                        asyncContext.getRequest().setAttribute("result""ok");
                        //转发请求
                        asyncContext.dispatch();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("子线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - stSon));
            });
            System.out.println("主线程:" + Thread.currentThread() + "-" + System.currentTimeMillis() + "-end,耗时(ms):" + (System.currentTimeMillis() - st));
        }
    }
}

模拟超时请求

http://localhost:8080/asyncServlet6?timeout=100

tomcat 控制台输出

模拟非超时请求

http://localhost:8080/asyncServlet6?timeout=5000

tomcat 控制台输出

主线程:Thread[http-nio-8080-exec-6,5,main]-1617379567665-start
主线程:Thread[http-nio-8080-exec-6,5,main]-1617379567666-end,耗时(ms):1
子线程:Thread[http-nio-8080-exec-6,5,main]-1617379567667-start
子线程:Thread[http-nio-8080-exec-6,5,main]-1617379569667-end,耗时(ms):2000
Thread[http-nio-8080-exec-10,5,main]-1617379569668-onComplete()

10、案例 7:模拟一个业务场景

业务场景

ServiceA 接受到一个请求之后,将请求发送到 mq,然后主线程就结束了,另外一个服务 ServiceB 从 mq 中取出这条消息,然后对消息进行处理,将处理结果又丢到 mq 中,ServiceA 中监听器监听 mq 中的结果,然后将结果再输出。

案例代码

package com.javacode2018.springboot.lesson002.demo1;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.ServletException;
import jakarta.servlet.annotation.WebServlet;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

//1.设置@WebServlet的asyncSupported属性为true,表示支持异步处理
@WebServlet(name = "AsyncServlet7",
        urlPatterns = "/asyncServlet7",
        asyncSupported = true
)
public class AsyncServlet7 extends HttpServlet {
    Map orderIdAsyncContextMap = new ConcurrentHashMap<>();

    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String orderId = request.getParameter("orderId");
        String result = request.getParameter("result");
        AsyncContext async;
        if (orderId != null && result != null && (async = orderIdAsyncContextMap.get(orderId)) != null) {
            async.getResponse().getWriter().write(String.format("
"
 +
                    "%s:%s:result:%s", Thread.currentThread(), System.currentTimeMillis(), result));
            async.complete();
        } else {
            AsyncContext asyncContext = request.startAsync(request, response);
            orderIdAsyncContextMap.put("1", asyncContext);
            asyncContext.getResponse().setContentType("text/html;charset=utf-8");
            asyncContext.getResponse().getWriter().write(String.format("%s:%s:%s", Thread.currentThread(), System.currentTimeMillis(), "start"));
        }
    }
}

测试过程

step1、启动项目
step2、浏览器中访问:http://localhost:8080/asyncServlet7,会发现浏览器中请求一直处于等待中
step3、等待5秒,用来模拟ServiceB处理耗时
step4、浏览器中访问:http://localhost:8080/asyncServlet7?orderId=1&result=success;用来模拟将结果通知给请求者,这步执行完毕之后,step2会立即收到响应

这里稍微扩展下

可能有些朋友已经想到了,通常我们的项目是集群部署的,假如这个业务场景中 ServiceA 是集群部署的,有 3 台机器【ServiceA1、ServiceA2、ServiceA3】,如果 ServiceB 将处理完成的结果消息丢到 mq 后,如果消息类型是点对点的,那么消息只能被一台机器消费,需要确保 ServiceA 中接受用户请求的机器和最终接受 mq 中消息结果的机器是一台机器才可以,如果接受请求的机器是 ServceA1,而消费结果消息的机器是 ServiceA2,那么 ServiceA1 就一直拿不到结果,直到超时,如何解决?

此时需要广播消息,ServiceB 将处理结果广播出去,ServiceA 所有机器都会监听到这条广播消息。

可以使用 redis 的发布订阅功能解决这个问题,有兴趣的朋友可以研究一下 redis 发布定义的功能。

11、总结

  • 开启异步处理:request.startAsync(request,response) 获取异步处理上下文对象 AsyncContext
  • 设置异步处理超时时间:asyncContext.setTimeout(毫秒)
  • 设置异步处理监听器:asyncContext.addListener,可以添加多个监听器
  • 完成异步处理的 2 种方式:asyncContext.dispatch() 或 asyncContext.complete()

12、源码

https://gitee.com/javacode2018/springboot-series

推荐阅读:
你管这破玩意儿叫 Token?
一举拿下高可用与分布式协调系统设计!

一文读懂微内核架构


互联网全栈架构


浏览 118
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报