作者:vivo 互联网服务器团队-Wang Zhi
责任链模式作为常用的设计模式而被大家熟知和使用。本文介绍责任链的常见实现方式,并结合开源框架如Dubbo、Sentinel等进行延伸探讨。
在GoF 的《设计模式》一书中对责任链模定义的:将请求的发送和接收解耦,让多个接收对象都有机会处理这个请求。将这些接收对象串成一条链,并沿着这条链传递这个请求,直到链上的某个接收对象能够处理它为止或者所有接收对象处理一遍。
用通俗的话解释在责任链模式中,多个处理器(接收对象)依次处理同一个请求。一个请求先经过 A 处理器处理,然后再把请求传递给 B 处理器,B 处理器处理完后再传递给 C 处理器,以此类推,形成一个链条。链条上的每个处理器各自承担各自的处理职责,所以叫作责任链模式。
责任链模式有效地降低了发送和接收者之间的耦合度,增强了系统的可扩展性。在责任链的模式下不仅能够针对单个处理器对象进行定制升级(每个处理器对象关注各自的任务),而且能够对整个责任链的处理器对象的顺序的调整以及增删。
本文约定:责任链上的接收对象统一称为处理器;本文中介绍的责任链属于GOF定义中责任链的变种即责任链上的所有处理器都会参与任务的处理。
责任链模式有多种实现方式,从驱动责任链上处理器方式的角度可以分类两类,即责任链驱动 和 责任链处理器自驱动。
2.1 处理器自驱动
public abstract class AbstractHandler { protected Handler next = null; public void setSuccessor(Handler next) { this.next = next; } public abstract void handle();} public class HandlerA extends AbstractHandler { @Override public void handle() { if (next != null) { next.handle(); } }} public class HandlerB extends AbstractHandler { @Override public void handle() { if (next != null) { next.handle(); } }} public class HandlerChain { private AbstractHandler head = null; private AbstractHandler tail = null; public void addHandler(AbstractHandler handler) { handler.setSuccessor(null); if (head == null) { head = handler; tail = handler; return; } tail.setSuccessor(handler); tail = handler; } public void handle() { if (head != null) { head.handle(); } }} public class Application { public static void main(String[] args) { HandlerChain chain = new HandlerChain(); chain.addHandler(new HandlerA()); chain.addHandler(new HandlerB()); chain.handle(); }}
说明:
2.2 责任链驱动
public interface IHandler { void doSomething();} public class HandlerA implements IHandler { @Override public void doSomething() { }} public class HandlerB implements IHandler { @Override public void doSomething() { }} public class HandlerChain { private List<IHandler> handlers = new ArrayList<>(); public void addHandler(IHandler handler) { handlers.add(handler); } public void handle() { for (IHandler handler : handlers) { handler.handle(); } }} public class Application { public static void main(String[] args) { HandlerChain chain = new HandlerChain(); chain.addHandler(new HandlerA()); chain.addHandler(new HandlerB()); chain.handle(); }}
说明:
责任链低耦合高扩展的特点让它在很多开源的框架中被采用,本文选取了开源框架中的Spring Interceptor、Servlet Filter、Dubbo、Sentinel进行责任链的实现介绍,通过对常用框架中责任链应用的了解能够更好掌握责任链落地并在日常的开发中积极的使用。
3.1 Spring Interceptor
3.1.1 Interceptor介绍
![]()
-
Spring中的拦截器(Interceptor) 用于拦截控制器方法的执行,可以在方法执行前后添加自定义逻辑类似于AOP编程思想。
-
Inteceptor的作用时机是在请求(request)进入servlet后,在进入Controller之前进行预处理。
-
Inteceptor的实际应用包括:认证授权、日志记录、字符编码转换,敏感词过滤等等。
-
Inteceptor中责任链的实现会从处理器的介绍,责任链的构建以及责任链的执行三个角度进行阐述。
3.1.2 处理器介绍
public interface HandlerInterceptor { boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception; void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception; void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception;} @Componentpublic class TimeInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { System.out.println("time interceptor preHandle"); return true; } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { System.out.println("time interceptor postHandle"); } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { System.out.println("time interceptor afterCompletion"); }}
说明:
-
处理器Interceptor的接口HandlerInterceptor定义了三个方法,可在控制器方法执行前后添加自定义逻辑。
-
自定义处理器如上的TimeInterceptor需要自定义实现上述3个方法实现自我的逻辑。
-
所有的自定义处理会串联在HandlerExecutionChain类实现的责任链上。
3.1.3 责任链构建
public class HandlerExecutionChain { private final Object handler; private HandlerInterceptor[] interceptors; private List<HandlerInterceptor> interceptorList; private int interceptorIndex = -1; public void addInterceptor(HandlerInterceptor interceptor) { initInterceptorList().add(interceptor); } public void addInterceptors(HandlerInterceptor... interceptors) { if (!ObjectUtils.isEmpty(interceptors)) { CollectionUtils.mergeArrayIntoCollection(interceptors, initInterceptorList()); } } private List<HandlerInterceptor> initInterceptorList() { if (this.interceptorList == null) { this.interceptorList = new ArrayList<HandlerInterceptor>(); if (this.interceptors != null) { CollectionUtils.mergeArrayIntoCollection(this.interceptors, this.interceptorList); } } this.interceptors = null; return this.interceptorList; }}
说明:
3.1.4 责任链执行
public class DispatcherServlet extends FrameworkServlet { protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception { try { try { HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler()); if (!mappedHandler.applyPreHandle(processedRequest, response)) { return; } mv = ha.handle(processedRequest, response, mappedHandler.getHandler()); if (asyncManager.isConcurrentHandlingStarted()) { return; } applyDefaultViewName(processedRequest, mv); mappedHandler.applyPostHandle(processedRequest, response, mv); } catch (Exception ex) { } processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException); } catch (Exception ex) { } finally { } }} public class HandlerExecutionChain { private final Object handler; private HandlerInterceptor[] interceptors; private List<HandlerInterceptor> interceptorList; private int interceptorIndex = -1; boolean applyPreHandle(HttpServletRequest request, HttpServletResponse response) throws Exception { HandlerInterceptor[] interceptors = getInterceptors(); if (!ObjectUtils.isEmpty(interceptors)) { for (int i = 0; i < interceptors.length; i++) { HandlerInterceptor interceptor = interceptors[i]; if (!interceptor.preHandle(request, response, this.handler)) { triggerAfterCompletion(request, response, null); return false; } this.interceptorIndex = i; } } return true; } void applyPostHandle(HttpServletRequest request, HttpServletResponse response, ModelAndView mv) throws Exception { HandlerInterceptor[] interceptors = getInterceptors(); if (!ObjectUtils.isEmpty(interceptors)) { for (int i = interceptors.length - 1; i >= 0; i--) { HandlerInterceptor interceptor = interceptors[i]; interceptor.postHandle(request, response, this.handler, mv); } } }}
说明:
-
在servlet的doDispatch方法中依次触发责任链的applyPreHandle的前置处理方法、applyPostHandle的后置处理方法。
-
前置处理方法applyPreHandle会遍历责任链上的处理器从前往后依次处理,后置处理方法applyPostHandle会遍历责任链上的处理器从后往前依次处理。
-
处理器的驱动由责任链对象负责依次触发,非处理器对象自驱执行。
3.2 Servlet Filter
3.2.1 Filter介绍
![]()
-
Servlet过滤器是在Java Servlet规范2.3中定义的,它能够对Servlet容器的请求和响应对象进行检查和修改,是个典型的责任链。
-
在Servlet被调用之前检查Request对象并支持修改Request Header和Request内容。
-
在Servlet被调用之后检查Response对象并支修改Response Header和Response内容。
3.2.2 处理器介绍
public interface Filter { public void init(FilterConfig filterConfig) throws ServletException; public void doFilter ( ServletRequest request, ServletResponse response, FilterChain chain ) throws IOException, ServletException; public void destroy();} public class TimeFilter implements Filter { @Override public void init(FilterConfig filterConfig) throws ServletException { System.out.println("time filter init"); } @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { System.out.println("time filter doFilter"); filterChain.doFilter(servletRequest, servletResponse); }}
说明:
-
Servlet过滤器类要实现javax.servlet.Filter接口,该接口定义了通用的3个方法。
-
init方法:负责Servlet过滤器的初始化方法,Servlet容器创建Servlet过滤器实例过程中调用这个方法。
-
doFilter方法:当客户请求访问与过滤器关联的URL时,Servlet容器会调用该方法。
-
destroy方法:Servlet容器在销毁过滤器实例前调用该方法,可以释放过滤器占用的资源。
3.2.3 责任链构建
public final class ApplicationFilterChain implements FilterChain { private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0]; private int pos = 0; private int n = 0; void addFilter(ApplicationFilterConfig filterConfig) { for(ApplicationFilterConfig filter:filters) if(filter==filterConfig) return; if (n == filters.length) { ApplicationFilterConfig[] newFilters = new ApplicationFilterConfig[n + INCREMENT]; System.arraycopy(filters, 0, newFilters, 0, n); filters = newFilters; } filters[n++] = filterConfig; }}
说明:
-
ApplicationFilterChain作为Filter的责任链,负责责任链的构建和执行。
-
责任链通过ApplicationFilterConfig类型的数组对象filters保存Filter处理器。
-
责任链上处理器的添加通过保存到数组filters来实现。
3.2.4 责任链执行
public final class ApplicationFilterChain implements FilterChain { private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0]; private int pos = 0; private int n = 0; private void internalDoFilter(ServletRequest request, ServletResponse response) throws IOException, ServletException { ApplicationFilterConfig filterConfig = filters[pos++]; try { Filter filter = filterConfig.getFilter(); if( Globals.IS_SECURITY_ENABLED ) { } else { filter.doFilter(request, response, this); } } catch (Throwable e) { } return; } try { if ((request instanceof HttpServletRequest) && (response instanceof HttpServletResponse) && Globals.IS_SECURITY_ENABLED ) { } else { servlet.service(request, response); } } catch (Throwable e) { e = ExceptionUtils.unwrapInvocationTargetException(e); throw new ServletException(sm.getString("filterChain.servlet"), e); } }}
说明:
-
整个责任链上Filter处理器的执行通过处理器自驱进行实现,而非由责任链对象驱动。
-
Filter处理器的在处理过程中除了执行自我逻辑,会通过filterChain.doFilter
(servletRequest, servletResponse)触发下一个处理器的执行。
3.3 Dubbo
3.3.1 Dubbo Filter介绍
![]()
图片分享自《DUBBO官网》
-
Dubbo的Filter作用时机如上图所示,Filter实现是专门为服务提供方和服务消费方调用过程进行拦截,Dubbo本身的大多功能均基于此扩展点实现,每次远程方法执行该拦截都会被执行。
-
Dubbo官方针对Filter做了很多的原生支持,目前大致有20来个吧,包括我们熟知的RpcContext,accesslog功能都是通过filter来实现了。
-
在实际业务开发中会对Filter接口进行扩展,在服务调用链路中嵌入我们自身的处理逻辑,如日志打印、调用耗时统计等。
3.3.2 处理器介绍
@Activate(group = PROVIDER, value = ACCESS_LOG_KEY)public class AccessLogFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { try { if (ConfigUtils.isNotEmpty(accessLogKey)) { AccessLogData logData = buildAccessLogData(invoker, inv); log(accessLogKey, logData); } } catch (Throwable t) { } return invoker.invoke(inv); }}
说明:
3.3.3 责任链构建
public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol) { this.protocol = protocol; } private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } }; } } return last; }} static final class ProtocolFilterWrapper.1 implements Invoker < T > { final Invoker val$invoker; final Filter val$filter; final Invoker val$next; public Result invoke(Invocation invocation) throws RpcException { return this.val$filter.invoke(this.val$next, invocation); } ProtocolFilterWrapper.1(Invoker invoker, Filter filter, Invoker invoker2) { this.val$invoker = invoker; this.val$filter = filter; this.val$next = invoker2; }}
说明:
3.3.4 责任链执行
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> { public FailfastClusterInvoker(Directory<T> directory) { super(directory); } @Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); try { return invoker.invoke(invocation); } catch (Throwable e) { } }} static final class ProtocolFilterWrapper.1 implements Invoker < T > { final Invoker val$invoker; final Filter val$filter; final Invoker val$next; public Result invoke(Invocation invocation) throws RpcException { return this.val$filter.invoke(this.val$next, invocation); } ProtocolFilterWrapper.1(Invoker invoker, Filter filter, Invoker invoker2) { this.val$invoker = invoker; this.val$filter = filter; this.val$next = invoker2; }
说明:
3.4 Sentinel
3.4.1 Sentinel Slot介绍
![]()
图片分享自《Sentinel官网》
-
Sentinel是面向分布式服务架构的流量治理组件,以流量为切入点提供熔断限流的功能保证系统的稳定性。
-
Sentinel 里面以Entry作为限流的资源对象,每个Entry创建的同时会关联一系列功能插槽(slot chain)。
-
Sentinel提供了通用的原生Slot处理不同的逻辑,同时支持自定义Slot来定制功能。
3.4.2 处理器介绍
public interface ProcessorSlot<T> { void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,Object... args) throws Throwable; void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,Object... args) throws Throwable; void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);} public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { private AbstractLinkedProcessorSlot<?> next = null; @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.next = next; }} public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { DefaultNode node = map.get(context.getName()); context.setCurNode(node); fireEntry(context, resourceWrapper, node, count, prioritized, args); }}
说明:
-
Sentinel中的Slot需要实现
com.alibaba.csp.sentinel.slotchain.ProcessorSlot的通用接口。
-
自定义Slot一般继承抽象类AbstractLinkedProcessorSlot且只要改写entry/exit方法实现自定义逻辑。
-
Slot通过next变量保存下一个处理器Slot对象。
-
在自定义实现的entry方法中需要通过fireEntry触发下一个处理器的执行,在exit方法中通过fireExit触发下一个处理器的执行。
3.4.3 责任链构建
public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); for (ProcessorSlot slot : sortedSlotList) { if (!(slot instanceof AbstractLinkedProcessorSlot)) { continue; } chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; }} public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; }}
说明:
-
ProcessorSlotChain作为Slot的责任链,负责责任链的构建和执行。
-
责任链上的处理器对象
AbstractLinkedProcessorSlot通过保存指向下一个处理器的对象的进行关联,整体以链表的形式进行串联。
-
责任链上的第一个处理器对象first本身不起任何作用,只是保存链表的头部。
3.4.4 责任链执行
public class CtSph implements Sph { private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { Context context = ContextUtil.getContext(); ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); Entry e = new CtEntry(resourceWrapper, chain, context); chain.entry(context, resourceWrapper, null, count, prioritized, args); return e; }} public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; }} public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { private AbstractLinkedProcessorSlot<?> next = null; @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.next = next; }}
说明:
-
整个责任链上处理器的执行通过Invoker对象的驱动,而非责任链对象的驱动。
-
DefaultProcessorSlotChain的entry首先头部对象first,进而触发处理器的自驱实现处理器的执行。
-
整体按照entry →fireEntry →
transformEntry→ entry的循环顺序依次触发处理器的自驱。
在日常项目实践中,责任链的设计模式会在很多业务场景中落地。
譬如对于支持用户生成内容(UGC)的应用来说,用户生成的内容可能包含一些敏感内容如敏感言论或者图片等。针对这种应用场景,可以通过责任链模式设置多个处理器来处理不同的任务,如文本过滤器处理敏感词,图片过滤器处理敏感图片等等。
譬如对于电商服务中的下单流程来说,一个下单流程包含订单拆合单,优惠计算,订单生成等多个步骤,我们可以通过责任链模式设置多个处理器来处理不同的任务等等。
责任链的应用场景非常广泛,在常见的开源框架中有丰富的落地场景,同样在业务开发中也可以根据场景灵活使用。