异步化,你的高并发大杀器
今天来聊聊如何让项目异步化的一些事。
1.同步和异步,阻塞和非阻塞
同步和异步,阻塞和非阻塞, 这个几个词已经是老生常谈,但是常常还是有很多同学分不清楚,以为同步肯定就是阻塞,异步肯定就是非阻塞,其实他们不是一回事。
同步和异步关注的是结果消息的通信机制
- 同步:同步的意思就是调用方需要主动等待结果的返回
- 异步:异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回调函数等。
阻塞和非阻塞主要关注的是等待结果返回调用方的状态
- 阻塞:是指结果返回之前,当前线程被挂起,不做任何事
- 非阻塞:是指结果在返回之前,线程可以做一些其他事,不会被挂起。
可以看见同步和异步,阻塞和非阻塞主要关注的点不同,有人会问同步还能非阻塞,异步还能阻塞?当然是可以的,下面为了更好的说明他们的组合之间的意思,用几个简单的例子说明: 1.同步阻塞:同步阻塞基本也是编程中最常见的模型,打个比方你去商店买衣服,你去了之后发现衣服卖完了,那你就在店里面一直等,期间不做任何事(包括看手机),等着商家进货,直到有货为止,这个效率很低。
2.同步非阻塞:同步非阻塞在编程中可以抽象为一个轮询模式,你去了商店之后,发现衣服卖完了,这个时候不需要傻傻的等着,你可以去其他地方比如奶茶店,买杯水,但是你还是需要时不时的去商店问老板新衣服到了吗。
3.异步阻塞:异步阻塞这个编程里面用的较少,有点类似你写了个线程池,submit然后马上future.get(),这样线程其实还是挂起的。有点像你去商店买衣服,这个时候发现衣服没有了,这个时候你就给老板留给电话,说衣服到了就给我打电话,然后你就守着这个电话,一直等着他响什么事也不做。这样感觉的确有点傻,所以这个模式用得比较少。
4.异步非阻塞:异步非阻塞这也是现在高并发编程的一个核心,也是今天主要讲的一个核心。好比你去商店买衣服,衣服没了,你只需要给老板说这是我的电话,衣服到了就打。然后你就随心所欲的去玩,也不用操心衣服什么时候到,衣服一到,电话一响就可以去买衣服了。
2.同步阻塞 PK 异步非阻塞
上面已经看到了同步阻塞的效率是多么的低,如果使用同步阻塞的方式去买衣服,你有可能一天只能买一件衣服,其他什么事都不能干,如果用异步非阻塞的方式去买,买衣服只是你一天中进行的一个小事。
我们把这个映射到我们代码中,当我们的线程发生一次rpc调用或者http调用,又或者其他的一些耗时的IO调用,发起之后,如果是同步阻塞,我们的这个线程就会被阻塞挂起,直到结果返回,试想一下如果IO调用很频繁那我们的CPU使用率其实是很低很低。正所谓是物尽其用,既然CPU的使用率被IO调用搞得很低,那我们就可以使用异步非阻塞,当发生IO调用时我并不马上关心结果,我只需要把回调函数写入这次IO调用,我这个时候线程可以继续处理新的请求,当IO调用结束结束时,会调用回调函数。而我们的线程始终处于忙碌之中,这样就能做更多的有意义的事了。
这里首先要说明的是,异步化不是万能,异步化并不能缩短你整个链路调用时间长的问题,但是他能极大的提升你的最大qps。一般我们的业务中有两处比较耗时:
- cpu: cpu耗时指的是我们的一般的业务处理逻辑,比如一些数据的运算,对象的序列化。这些异步化是不能解决的,得需要靠一些算法的优化,或者一些高性能框架。
- iowait: io耗时就像我们上面说的,一般发生在网络调用,文件传输中等等,这个时候线程一般会挂起阻塞。而我们的异步化通常用于解决这部分的问题。
3.哪些可以异步化?
上面说了异步化是用于解决IO阻塞的问题,而我们一般项目中可以使用异步化如下:
- servlet异步化,springmvc异步化
- rpc调用如(dubbo,thrift),http调用异步化
- 数据库调用,缓存调用异步化
下面我会从上面几个方面进行异步化的介绍.
4.servlet异步化
对于Java开发程序员来说servlet并不陌生吧,在项目中不论你使用struts2,还是使用的springmvc,本质上都是封装的servlet。但是我们的一般的开发,其实都是使用的同步阻塞模式如下:
上面的模式优点在于编码简单,适合在项目启动初期,访问量较少,或者是CPU运算较多的项目
缺点在于,业务逻辑线程和servlet容器线程是同一个,一般的业务逻辑总得发生点IO,比如查询数据库,比如产生RPC调用,这个时候就会发生阻塞,而我们的servlet容器线程肯定是有限的,当servlet容器线程都被阻塞的时候我们的服务这个时候就会发生拒绝访问,线程不然我当然们可以通过增加机器的一系列手段来解决这个问题,但是俗话说得好靠人不如靠自己,靠别人替我分担请求,还不如我自己搞定。所以在servlet3.0之后支持了异步化,我们采用异步化之后就会变成如下:
在这里我们采用新的线程处理业务逻辑,IO调用的阻塞就不会影响我们的serlvet了,实现异步serlvet的代码也比较简单,如下:
@WebServlet(name = "WorkServlet",urlPatterns = "/work",asyncSupported =true) public class WorkServlet extends HttpServlet{ private static final long serialVersionUID = 1L; @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { this.doPost(req, resp); } @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { //设置ContentType,关闭缓存 resp.setContentType("text/plain;charset=UTF-8"); resp.setHeader("Cache-Control","private"); resp.setHeader("Pragma","no-cache"); final PrintWriter writer= resp.getWriter(); writer.println("老师检查作业了"); writer.flush(); List<String> zuoyes=new ArrayList<String>(); for (int i = 0; i < 10; i++) { zuoyes.add("zuoye"+i);; } //开启异步请求 final AsyncContext ac=req.startAsync(); doZuoye(ac, zuoyes); writer.println("老师布置作业"); writer.flush(); } private void doZuoye(final AsyncContext ac, final List<String> zuoyes) { ac.setTimeout(1*60*60*1000L); ac.start(new Runnable() { @Override public void run() { //通过response获得字符输出流 try { PrintWriter writer=ac.getResponse().getWriter(); for (String zuoye:zuoyes) { writer.println("\""+zuoye+"\"请求处理中"); Thread.sleep(1*1000L); writer.flush(); } ac.complete(); } catch (Exception e) { e.printStackTrace(); } } }); } }
实现serlvet的关键在于http采取了长连接,也就是当请求打过来的时候就算有返回也不会关闭,因为可能还会有数据,直到返回关闭指令。 AsyncContext ac=req.startAsync(); 用于获取异步上下文,后续我们通过这个异步上下文进行回调返回数据,有点像我们买衣服的时候,给老板一个电话,而这个上下文也是一个电话,当有衣服到的时候,也就是当有数据准备好的时候就可以打电话发送数据了。 ac.complete(); 用来进行长链接的关闭。
4.1springmvc异步化
现在其实很少人来进行serlvet编程,都是直接采用现成的一些框架,比如struts2,springmvc。下面介绍下使用springmvc如何进行异步化:
- 首先确认你的项目中的Servlet是3.0以上的!!,其次springMVC4.0+
<dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.2.3.RELEASE</version> </dependency>
- web.xml头部声明,必须要3.0,filter和serverlet设置为异步
<?xml version="1.0" encoding="UTF-8"?> <web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"> <filter> <filter-name>testFilter</filter-name> <filter-class>com.TestFilter</filter-class> <async-supported>true</async-supported> </filter> <servlet> <servlet-name>mvc-dispatcher</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> ......... <async-supported>true</async-supported> </servlet>
- 使用springmvc封装了servlet的AsyncContext,使用起来比较简单。以前我们同步的模式的Controller是返回额ModelAndView,而异步模式直接生成一个defrredResult(支持我们超时扩展)即可保存上下文,下面给出如何和我们HttpClient搭配的简单demo
@RequestMapping(value="/asynctask", method = RequestMethod.GET) public DeferredResult<String> asyncTask() throws IOReactorException { IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); conManager.setMaxTotal(100); conManager.setDefaultMaxPerRoute(100); CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); // Start the client httpclient.start(); //设置超时时间200ms final DeferredResult<String> deferredResult = new DeferredResult<String>(200L); deferredResult.onTimeout(new Runnable() { @Override public void run() { System.out.println("异步调用执行超时!thread id is : " + Thread.currentThread().getId()); deferredResult.setResult("超时了"); } }); System.out.println("/asynctask 调用!thread id is : " + Thread.currentThread().getId()); final HttpGet request2 = new HttpGet("http://www.apache.org/"); httpclient.execute(request2, new FutureCallback<HttpResponse>() { public void completed(final HttpResponse response2) { System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); deferredResult.setResult(request2.getRequestLine() + "->" + response2.getStatusLine()); } public void failed(final Exception ex) { System.out.println(request2.getRequestLine() + "->" + ex); } public void cancelled() { System.out.println(request2.getRequestLine() + " cancelled"); } }); return deferredResult; }
注意: 在serlvet异步化中有个问题是filter的后置结果处理,没法使用,对于我们一些打点,结果统计直接使用serlvet异步是没法用的。在springmvc中就很好的解决了这个问题,springmvc采用了一个比较取巧的方式通过请求转发,能让请求再次过滤器。但是又引入了新的一个问题那就是过滤器会处理两次,这里可以通过SpringMVC源码中自身判断的方法,我们可以在filter中使用下面这句话来进行判断是不是属于springmvc转发过来的请求,从而不处理filter的前置事件,只处理后置事件:
Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE); return asyncManagerAttr instanceof WebAsyncManager ;
5.全链路异步化
上面我们介绍了serlvet的异步化,相信细心的同学都看出来似乎并没有解决根本的问题,我的IO阻塞依然存在,只是换了个位置而已,当IO调用频繁同样会让业务线程池快速变满,虽然serlvet容器线程不被阻塞,但是这个业务依然会变得不可用。
那么怎么才能解决上面的问题呢?答案就是全链路异步化,全链路异步追求的是没有阻塞,打满你的CPU,把机器的性能压榨到极致模型图如下:
具体的NIO client到底做了什么事呢,具体如下面模型:
上面就是我们全链路异步的图了(部分线程池可以优化)。全链路的核心在于只要我们遇到IO调用的时候,我们就可以使用NIO,从而避免阻塞,也就解决了之前说的业务线程池被打满得到尴尬场景。
5.1远程调用异步化
我们一般远程调用使用rpc或者http。对于rpc来说一般thrift,http,motan等支持都异步调用,其内部原理也都是采用事件驱动的NIO模型,对于http来说一般的apachehttpclient和okhttp也都提供了异步调用。 下面简单介绍下Http异步化调用是怎么做的: 首先来看一个例子:
public class HTTPAsyncClientDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException { //具体参数含义下文会讲 //apache提供了ioReactor的参数配置,这里我们配置IO 线程为1 IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); //根据这个配置创建一个ioReactor ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); //asyncHttpClient使用PoolingNHttpClientConnectionManager管理我们客户端连接 PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); //设置总共的连接的最大数量 conManager.setMaxTotal(100); //设置每个路由的连接的最大数量 conManager.setDefaultMaxPerRoute(100); //创建一个Client CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); // Start the client httpclient.start(); // Execute request final HttpGet request1 = new HttpGet("http://www.apache.org/"); Future<HttpResponse> future = httpclient.execute(request1, null); // and wait until a response is received HttpResponse response1 = future.get(); System.out.println(request1.getRequestLine() + "->" + response1.getStatusLine()); // One most likely would want to use a callback for operation result final HttpGet request2 = new HttpGet("http://www.apache.org/"); httpclient.execute(request2, new FutureCallback<HttpResponse>() { //Complete成功后会回调这个方法 public void completed(final HttpResponse response2) { System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); } public void failed(final Exception ex) { System.out.println(request2.getRequestLine() + "->" + ex); } public void cancelled() { System.out.println(request2.getRequestLine() + " cancelled"); } }); } }
下面给出httpAsync的整个类图:
对于我们的HTTPAysncClient 其实最后使用的是InternalHttpAsyncClient,在InternalHttpAsyncClient中有个ConnectionManager,这个就是我们管理连接的管理器,而在httpAsync中只有一个实现那就是PoolingNHttpClientConnectionManager,这个连接管理器中有两个我们比较关心的一个是Reactor,一个是Cpool。
-
Reactor :所有的Reactor这里都是实现了IOReactor接口。在PoolingNHttpClientConnectionManager中会有拥有一个Reactor,那就是DefaultConnectingIOReactor,这个DefaultConnectingIOReactor,负责处理Acceptor。在DefaultConnectingIOReactor有个excutor方法,生成IOReactor也就是我们图中的BaseIOReactor,进行IO的操作。这个模型就是我们上面的1.2.2的模型
-
CPool :在PoolingNHttpClientConnectionManager中有个CPool,主要是负责控制我们连接,我们上面所说的maxTotal和defaultMaxPerRoute,都是由其进行控制,如果每个路由的满了他会断开最老的一个链接,如果总共的total满了他会放入leased队列,释放空间的时候就会将其重新连接。
5.2数据库调用异步化
对于数据库调用一般的框架并没有提供异步化的方法,这里推荐自己封装或者使用网上开源的,这里我们公司有个开源的 https://github.com/ainilife/zebra-dao/blob/master/README_ZH.md 能很好的支持异步化
6.最后
异步化并不是高并发的银弹,但是有了异步化的确能提高你机器的qps,吞吐量等等。上述讲的一些模型如果能合理的做一些优化,然后进行应用,相信能对你的服务有很大的帮助的。
想要获取更多信息请关注我的公众号吧
最后这篇文章被我收录于JGrowing,一个全面,优秀,由社区一起共建的Java学习路线,如果您想参与开源项目的维护,可以一起共建,github地址为:https://github.com/javagrowing/JGrowing 麻烦给个小星星哟。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Object类是如何成为所有类的“父亲”?
前言 也许大家在刚开始学Java的时候就听说过,Object类是所有的类的父类。但是有没有思考过,为什么我们自己创建的类,没有继承Object类,但是却能调用Object类的方法呢? 正文 下面直接看一下截图,分别是自定义类ObjectTest跟Object类。 既然我们没有显式继承Object类,也能调用Object类里的方法,那是什么机制可以达到“自动继承”呢? 1.由于我们的Java是运行在JVM之上的语言,所以有一种可能是,我们的编译器在编译代码的时候,如果该类没有继承任何类,它会自动的帮我们在编译的时候加上默认的父类“Object“,若该类有父类了就不作处理,这样一来我们就可以使用Object里面public跟protected的方法了。 2.还有一种可能是,在编译的时候保持原样,只是在运行的时候去判断有没有显式继承其他类,若没有则默认把Object当成自己父类处理。我们可以直接反编译刚才的class文件来一探究竟。 通过“javap”命令可以看到反编译出来的文件,编译器并没有在编译的时候把“extends Object“添加上去,那么可见是第2种方式实现的。 此时如...
- 下一篇
J2Cache 两级缓存中的 Region 到底是什么东西?
不时有人来询问 J2Cache 里的 Region 到底是什么概念,这里做统一的解答。 J2Cache 的 Region 来源于 Ehcache 的 Region 概念。 一般我们在使用像 Redis、Caffeine、Guava Cache 时都没有 Region 这样的概念,特别是 Redis 是一个大哈希表,更没有这个概念。 在实际的缓存场景中,不同的数据会有不同的 TTL 策略,例如有些缓存数据可以永不失效,而有些缓存我们希望是 30 分钟的有效期,有些是 60 分钟等不同的失效时间策略。在 Redis 我们可以针对不同的 key 设置不同的 TTL 时间。但是一般的 Java 内存缓存框架(如 Ehcache、Caffeine、Guava Cache 等),它没法为每一个 key 设置不同 TTL,因为这样管理起来会非常复杂,而且会检查缓存数据是否失效时性能极差。所以一般内存缓存框架会把一组相同 TTL 策略的缓存数据放在一起进行管理。 J2Cache 的 Region 概念对应关系如下所示: Ehcache region Caffeine Cache Guava Cache...
相关文章
文章评论
共有0条评论来说两句吧...