
Practical Applications of Data Structures and Algorithms (Directed Graphs): A Parallel Processing Framework for Tasks with Dependencies

Date: 2022-01-15

Background

It started with a technical improvement request: several of the home-page query interfaces in our system needed to be optimized and merged into a single aggregate interface for the frontend. After an initial discussion with the frontend developers, we concluded that these home-page queries had no dependencies between them, so the solution looked straightforward on my side: simply run all the required queries in parallel:

Each query was abstracted as a Function, and its apply method was invoked on a worker thread.
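To make that starting point concrete, here is a minimal sketch of this first version (all names are hypothetical, not the actual production code): each sub-query is a Function, and every one of them is submitted to the thread pool independently.

import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ParallelAggregateSketch {
    // run all independent sub-queries in parallel and collect their results into one response
    static Map<String, Object> aggregate(Map<String, Function<Map<String, Object>, Object>> queries,
                                         Map<String, Object> request,
                                         ExecutorService executor) {
        Map<String, CompletableFuture<Object>> futures = queries.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> CompletableFuture.supplyAsync(() -> e.getValue().apply(request), executor)));
        // wait for every sub-query and merge the results
        return futures.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join()));
    }
}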

However, during joint debugging with the frontend, we found that the input of one interface depended on the result of another. My first thought was to have the frontend pass that value to me, but that would split one interface back out again and hurt the home page's load time, so I decided to handle it on the backend. That meant resolving the ordering dependency between queries, and I initially considered a few options:

  • Wait on a CountDownLatch
  • Hold the Future of the prerequisite task and block on its get method (a rough sketch of this option follows the list)
  • Execute the dependent tasks serially
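For illustration, a rough sketch of the second option (hypothetical names, not our production code): B holds A's Future and blocks on it before it runs.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class FutureWaitDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Function<String, String> queryA = req -> "resultA(" + req + ")";  // stand-in for the real query A
        Function<String, String> queryB = in -> "resultB(" + in + ")";    // B needs A's output as its input

        CompletableFuture<String> futureA =
                CompletableFuture.supplyAsync(() -> queryA.apply("request"), executor);
        // B blocks on A's future, so it only starts once A has produced its result
        CompletableFuture<String> futureB =
                CompletableFuture.supplyAsync(() -> queryB.apply(futureA.join()), executor);

        System.out.println(futureB.join());
        executor.shutdown();
    }
}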

The first two options are much the same: both rely on inter-thread waiting, resuming the current thread once the prerequisite query has finished. The third is blunt but simple. Time was tight, and after briefly weighing the trade-offs I went with the third option; after all, only two sub-queries had a dependency at that point, so this was also the most efficient approach. I tweaked the supplier that produces the queries so that the dependent Functions were merged into a single Function via andThen, which satisfied the requirement. The sub-query order then became (assuming B depends on A):
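A minimal sketch of that andThen trick (again with hypothetical query functions): A and B are chained into one Function that runs serially on a single thread, while the remaining queries still run in parallel with the chain.

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class AndThenDemo {
    public static void main(String[] args) {
        Function<String, String> queryA = req -> "A(" + req + ")";
        Function<String, String> queryB = a -> "B(" + a + ")";    // B consumes A's result
        Function<String, String> queryC = req -> "C(" + req + ")";

        // A and B are merged into one Function and executed serially on one thread,
        // while C still runs in parallel with the A -> B chain
        Function<String, String> aThenB = queryA.andThen(queryB);
        CompletableFuture<String> ab = CompletableFuture.supplyAsync(() -> aThenB.apply("request"));
        CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> queryC.apply("request"));

        System.out.println(ab.join() + " / " + c.join());
    }
}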

At this point the requirement was met, but from a technical standpoint the solution was incomplete. Suppose a new interface G arrives and G also depends on A; under this scheme A, B, and G would execute serially, even though B and G have no dependency between them and really ought to run in parallel. Or suppose B depends on both A and C; then A, B, and C would run serially, although A and C should run in parallel. In the extreme case where B depends on A, C, E, and F, the whole aggregate query degenerates into a single thread, which is clearly even worse.

So although the scheme was fine for the moment, it was far from adequate for complex dependency structures, and I started thinking about a better, more general way to handle them.

The refactored parallel task processing scheme

The crux of this kind of requirement is how to handle the dependencies between tasks while still running them in parallel as much as possible, which is where graphs come in. My design involves the following concepts:

  • ExecutionContext: the execution context shared by all sub-tasks, used for data exchange and for passing inputs and outputs; essentially a thread-safe Map (a minimal sketch follows this list).
  • ExecutionNode: a task node, i.e. the abstraction of a single sub-task, containing its name, the names of the nodes it depends on, and the executable Function.
  • ExecutionGraph: the execution graph of the whole task flow, holding all execution nodes as well as the sorted nodes; when the flow starts, it runs according to this description.
  • ExecutionFlow: the abstraction of the task flow; it is essentially a Function itself, and calling its apply method starts execution.
  • FlowConfiguration: configuration for execution, including the pre-action, the final action, the thread executor, the timeout, and so on.
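As promised above, a minimal sketch of what ExecutionContext might look like based on this description. The real class lives in the repository linked at the end, so the fields and extra helpers here are assumptions; only setVar and getCtx mirror the usage seen in the snippets below.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// a minimal sketch, assuming ExecutionContext is just a thread-safe variable map
public class ExecutionContext {
    private final Map<String, Object> ctx = new ConcurrentHashMap<>();

    // store a variable produced by a node
    public void setVar(String name, Object value) { ctx.put(name, value); }

    // read a variable set by an earlier node (hypothetical helper)
    @SuppressWarnings("unchecked")
    public <T> T getVar(String name) { return (T) ctx.get(name); }

    // expose the raw map, e.g. for expression evaluation
    public Map<String, Object> getCtx() { return ctx; }
}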

Generating the graph

ExecutionGraph abstracts the execution process into a directed acyclic graph. To use it, you pass in the definition of each node together with its predefined execution process (a Function), and a graph is generated.

A node definition follows this rule: {node name}[{dependency A},{dependency B},...]

If a node has no dependencies, the bracketed part is omitted. Building the execution graph then amounts to a simple ordering of this directed graph (a topological sort):

  1. Put all nodes into remainMap; preMap starts out empty.
  2. Find the nodes with no dependencies, make them the first level, remove them from remainMap, and add them to preMap.
  3. Go through remainMap and find every node whose dependencies all already appear in preMap; add those nodes to preMap, remove them from remainMap, and make them the next level.
  4. Repeat step 3 until remainMap is empty.

Note that in step 3, if the size of remainMap does not change after filtering, it means there is a circular dependency (a cycle in the directed graph) or a dependency on a node that does not exist; an exception should be thrown, otherwise we would fall into an infinite loop. The core code is as follows:

/**
 * Build the execution graph; in essence this sorts the nodes.
 *
 * @param preNodes   nodes that have already been processed
 * @param remainNode remaining nodes
 * @param levels     ordered nodes (the result set)
 */
private static void buildGraph(Map<String, ExecutionNode> preNodes,
                               Map<String, ExecutionNode> remainNode,
                               List<List<ExecutionNode>> levels) {
    // stop when no nodes remain
    if (remainNode.isEmpty()) {
        return;
    }
    List<String> currentLevel;
    // remember how many nodes remain
    int remainSize = remainNode.size();
    if (preNodes.isEmpty()) {
        // if preNodes is empty this is the first level: pick the nodes without dependencies, i.e. in-degree 0
        currentLevel = remainNode.values()
                .stream()
                .filter(e -> e.getDepend().length == 0)
                .map(ExecutionNode::getName)
                .collect(Collectors.toList());
        // if there is no dependency-free node, throw
        if (currentLevel.isEmpty()) {
            throw new IllegalStateException("could not found init node");
        }
    } else {
        // otherwise pick the nodes whose prerequisites have all been processed, i.e. the nodes
        // whose in-degree drops to 0 once the earlier levels are removed
        currentLevel = remainNode.entrySet()
                .stream()
                .filter(e -> Stream.of(e.getValue().getDepend()).allMatch(preNodes::containsKey))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
    List<ExecutionNode> nodes = new ArrayList<>(currentLevel.size());
    // currentLevel holds the nodes of the current level: record them in preNodes and remove them from the remaining nodes
    for (String name : currentLevel) {
        nodes.add(remainNode.get(name));
        preNodes.put(name, remainNode.remove(name));
    }
    // if the number of remaining nodes has not changed, the graph contains a cycle or depends on an undefined node
    if (remainNode.size() == remainSize) {
        throw new IllegalStateException("there may be circular dependencies or dependency does not exist in your graph, exception nodes is: " + remainNode.values());
    }
    // add the current level to the result set
    levels.add(nodes);
    // recurse into the next level
    buildGraph(preNodes, remainNode, levels);
}

Example

Here is a quick example. For the following node definitions:

A B C D E[A,B] F[A,E] G[C] H[B,C] I[G,H] J[I] K[D,E] L[E,H] 

The corresponding graph looks like this:
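(The figure from the original article is not reproduced here; written out as text, the levels it depicts are:)

Level 1: A  B  C  D
Level 2: E[A,B]  G[C]  H[B,C]
Level 3: F[A,E]  I[G,H]  K[D,E]  L[E,H]
Level 4: J[I]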

In this way all tasks are divided into four levels: tasks within a level do not depend on one another, while each level's execution depends on the level above it.

Executing the task nodes

Once the tasks have been partitioned by this graph-sorting algorithm, what remains is fairly simple. I could think of two approaches:

  1. Execute each level in parallel, and start the next level only after the whole level has finished.
  2. Use inter-thread waiting: the current task waits until its prerequisite tasks have completed and notified it.

The first approach does not seem great to me. Taking the graph above, consider this situation: L runs for a long time, longer than I and J combined, i.e. t(L) > t(I) + t(J). Then there is no reason for J to wait for the whole third level to finish; J can start as soon as I completes, and the longer J takes, the more time level-by-level waiting wastes. So I chose to block dependent threads with CompletableFuture's get method: as soon as a task's prerequisites have finished, the task can start, without waiting for every task in the previous level to complete.

The plan is as follows:

  1. Iterate over the nodes level by level.

    a. For each node in the current level, create a CompletableFuture. When building its Supplier, first fetch the Futures of all the node's dependencies from futureMap and call get on each of them, blocking the current thread (if a dependency has already finished, get returns immediately; allOf(...).get() could be used here instead — see the sketch after this list).

    b. Put the constructed CompletableFuture into futureMap.

  2. Take all the CompletableFutures out of futureMap and wait for them all to complete with allOf(...).get().
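As mentioned in step 1a, the per-dependency get() loop could also be replaced by a single allOf call; a minimal sketch of that alternative (dependencyFutures is a hypothetical parameter, not part of the framework):

import java.util.List;
import java.util.concurrent.CompletableFuture;

class AllOfWaitSketch {
    // block until every dependency future has completed
    static void waitForDependencies(List<CompletableFuture<?>> dependencyFutures) {
        CompletableFuture
                .allOf(dependencyFutures.toArray(new CompletableFuture[0]))
                .join();
    }
}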

In theory every node starts the moment its prerequisites finish; in practice, because of thread-pool scheduling and similar factors, it may still sit in the blocking queue for a while, but from the perspective of system resource allocation that impact is acceptable. The core implementation follows. (Incidentally, this pattern of passing in parameters and producing a new function even has a fancy name: function currying? =_= )

/**
 * Build an execution flow; the return value is a new function.
 *
 * @param executionGraph    the execution graph
 * @param flowConfiguration execution parameters
 * @return
 */
static ExecutionFlow buildFlow(@NonNull ExecutionGraph executionGraph, @NonNull FlowConfiguration flowConfiguration) {
    SpelExpressionParser expressionParser = new SpelExpressionParser();
    return ctx -> {
        long startTime = System.currentTimeMillis();
        List<List<ExecutionNode>> sortedNodes = executionGraph.getSortedNodes();
        Map<String, CompletableFuture<ExecutionContext>> futureMap = new HashMap<>(16);
        // build the CompletableFutures in graph order
        for (List<ExecutionNode> nodes : sortedNodes) {
            // nodes within one level have no dependencies on each other, and the CompletableFutures they depend on have already been built
            for (ExecutionNode node : nodes) {
                String currNodeName = node.getName();
                CompletableFuture<ExecutionContext> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        long t1 = System.currentTimeMillis();
                        // pre-action
                        ofNullable(flowConfiguration.getPreAction()).ifPresent(c -> c.accept(ctx));
                        String[] depend = node.getDepend();
                        // go through the dependency nodes and make sure they have all finished
                        for (String s : depend) {
                            try {
                                // block and wait
                                futureMap.get(s).get();
                                logger.info("current node [{}] waiting node [{}] done ({}ms)", currNodeName, s, System.currentTimeMillis() - t1);
                            } catch (InterruptedException | ExecutionException e) {
                                throw new IllegalStateException("current node [" + currNodeName + "] waiting node [" + s + "] failed", e);
                            }
                        }
                        long t2 = System.currentTimeMillis();
                        logger.info("current node [{}] start", currNodeName);
                        // pre-execution expression check
                        //noinspection ConstantConditions
                        if (Objects.nonNull(node.getBeforeCheckExpression()) && !node.getBeforeCheckExpression().getValue(ctx.getCtx(), boolean.class)) {
                            throw new IllegalStateException(String.format("before check failed by expression [%s]", node.getBeforeCheckExpression().getExpressionString()));
                        }
                        // execute the node
                        ExecutionContext ret = node.getExecution().apply(ctx);
                        // flag indicating whether the node executed successfully
                        ctx.setVar("$" + currNodeName + "_success", true);
                        // post-execution expression check
                        //noinspection ConstantConditions
                        if (Objects.nonNull(node.getAfterCheckExpression()) && !node.getAfterCheckExpression().getValue(ctx.getCtx(), boolean.class)) {
                            throw new IllegalStateException(String.format("after check failed by expression [%s]", node.getAfterCheckExpression().getExpressionString()));
                        }
                        long tl = System.currentTimeMillis();
                        logger.info("current node [{}] execute done (total: {}ms, execution: {}ms)", currNodeName, tl - t1, tl - t2);
                        return ret;
                    } catch (Throwable e) {
                        logger.warn("current node [{}] execute failed: {}, skip exception={}", currNodeName, e.getMessage(), node.isSkipOnFail(), e);
                        if (node.isSkipOnFail()) {
                            return ctx;
                        }
                        throw e;
                    } finally {
                        // final action
                        ofNullable(flowConfiguration.getFinalAction()).ifPresent(c -> c.accept(ctx));
                    }
                }, flowConfiguration.getExecutor() == null ? ForkJoinPool.commonPool() : flowConfiguration.getExecutor());
                // put the built CompletableFuture into the map
                futureMap.put(node.getName(), future);
            }
        }
        try {
            // combine all futures and wait for every node to finish
            CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0]));
            logger.info("waiting flow execution down, ttl={}", flowConfiguration.getTimeout());
            if (flowConfiguration.getTimeout() > 0) {
                completableFuture.get(flowConfiguration.getTimeout(), TimeUnit.SECONDS);
            } else {
                completableFuture.get();
            }
            logger.info("execution flow success ({}ms)", System.currentTimeMillis() - startTime);
            return ctx;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            logger.warn("execution flow failed: {} ({}ms)", e.getMessage(), System.currentTimeMillis() - startTime);
            throw new IllegalStateException(e);
        }
    };
}

Extended features

This covers the functionality we need, but it is still not complete. For example, when nodes actually execute, some can tolerate failure while others cannot, or we may want to check whether certain parameters meet requirements before and after a node runs. So I enhanced the node-definition expression to support a few extra operations, leaving room for future extensions:

The original format is extended as follows:

{node name}[{dependency A},{dependency B}, ...]({extension A name}({expression}), {extension B name}({expression}), ...)

Skipping failures

Controls whether, when an exception occurs while the current node is executing, the exception is propagated or ignored.

Extension name: skipOnFail

Example: E[A,B](skipOnFail(true))

......
} catch (Throwable e) {
    log.warn("current node [{}] execute failed: {}, skip exception={}", currNodeName, e.getMessage(), node.isSkipOnFail(), e);
    if (node.isSkipOnFail()) {
        return ctx;
    }
    throw e;
} finally {
......

Parameter checks

In scenarios with dependencies, how does the current node know whether the nodes it depends on executed correctly? If an exception was thrown we learn about the failure from the exception, but sometimes no exception surfaces (for instance, when a called sub-method is wrapped by an interceptor, the interceptor may swallow the exception), and only inspecting the data tells us whether execution went well. Doing that check inside each node's own function seems tedious, and this is something I feel the framework should handle uniformly. My solution is to check parameters with expressions: each node gets a before-check and an after-check expression whose result is a boolean; if it evaluates to false, an exception is thrown.

Expression structure: {node name}[{dependency A},{dependency B}, ...](beforeCheck({boolean expression}), afterCheck({boolean expression}))

Example: J[I](beforeCheck([c] == 0 && [k] == 9 && [c] != null), afterCheck([a] == 1))

Expressions follow Spring EL (SpEL) syntax and are evaluated against the variables in the ExecutionContext.
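For readers unfamiliar with SpEL, here is a generic sketch of evaluating a boolean check expression against a map of variables. How exactly the `[a]` shorthand above is mapped onto the ExecutionContext is defined by the framework itself (see the repository), so this sketch uses standard SpEL map-indexing syntax instead.

import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

import java.util.Map;

public class SpelCheckSketch {
    public static void main(String[] args) {
        SpelExpressionParser parser = new SpelExpressionParser();
        Map<String, Object> vars = Map.of("a", 1, "c", 3);

        // the root object is the variable map, so ['a'] reads the entry named "a"
        Expression check = parser.parseExpression("['a'] == 1 && ['c'] >= 1");
        StandardEvaluationContext ctx = new StandardEvaluationContext(vars);

        Boolean passed = check.getValue(ctx, Boolean.class);
        if (Boolean.FALSE.equals(passed)) {
            throw new IllegalStateException("before check failed by expression [" + check.getExpressionString() + "]");
        }
        System.out.println("check passed: " + passed);
    }
}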

A full example

Sample code:

public static void main(String[] args) {
    Random random = new Random();
    // Map each name to a function that sleeps for a random interval to simulate work;
    // some functions also set a few simple variables to simulate output values.
    Map<String, Function<ExecutionContext, ExecutionContext>> functionMap =
            Arrays.stream("A,B,C,D,E,F,G,H,I,J,K,L".split(",")).collect(Collectors.toMap(
                    e -> e,
                    e -> ctx -> {
                        try {
                            int i = 500 + 500 * random.nextInt(5);
                            Thread.sleep(i);
                            ctx.setVar(e, i);
                            switch (e) {
                                case "A": ctx.setVar("a", 1); break;
                                case "B": ctx.setVar("a", 0); break;
                                case "C": ctx.setVar("c", 3); break;
                                case "G": ctx.setVar("g", 9); ctx.setVar("a", 1); break;
                            }
                        } catch (InterruptedException ignored) {
                        }
                        return ctx;
                    }
            ));
    // build the definition of an execution graph
    ExecutionGraph graph = ExecutionGraph.createGraph(new ArrayList<>() {{
        add("A");
        add("B[A](beforeCheck([a] == 1))");
        add("C");
        add("D");
        add("E[A,B](beforeCheck([a]==2); skipOnFail(true))"); // this node will fail its check, but it is configured to be skipped
        add("F[A,E]");
        add("G[C]");
        add("H[B,C]");
        add("I[G,H]");
        add("J[I](beforeCheck( T(java.util.Arrays).asList(1,2,3).contains([c]) and [g] == [c]*[c] and [c] != null); afterCheck([c] >= 1))");
        add("K[D,E]");
        add("L[E,H]");
    }}, functionMap);
    // CompletableFuture uses ForkJoinPool.commonPool by default; try a cachedThreadPool and see what changes.
    // ExecutorService executorService = Executors.newCachedThreadPool();
    ExecutionFlow executionFlow = ExecutionFlow.buildFlow(graph);
    executionFlow.apply(new ExecutionContext());
}

Execution log:

/Library/Java/JavaVirtualMachines/jdk-11.0.8.jdk/Contents/Home/bin/java -Dvisualvm.id=76728482855487 -javaagent:/Users/windlively/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/213.6461.79/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=55008:/Users/windlively/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/213.6461.79/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/windlively/IdeaProjects/leisure/frame/target/classes:/Users/windlively/.m2/repository/org/projectlombok/lombok/1.18.10/lombok-1.18.10.jar:/Users/windlively/.m2/repository/org/springframework/spring-expression/5.2.2.RELEASE/spring-expression-5.2.2.RELEASE.jar:/Users/windlively/.m2/repository/org/springframework/spring-core/5.2.2.RELEASE/spring-core-5.2.2.RELEASE.jar:/Users/windlively/.m2/repository/org/springframework/spring-jcl/5.2.2.RELEASE/spring-jcl-5.2.2.RELEASE.jar:/Users/windlively/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.2.2.RELEASE/spring-boot-starter-logging-2.2.2.RELEASE.jar:/Users/windlively/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar:/Users/windlively/.m2/repository/ch/qos/logback/logback-core/1.2.3/logback-core-1.2.3.jar:/Users/windlively/.m2/repository/org/slf4j/slf4j-api/1.7.29/slf4j-api-1.7.29.jar:/Users/windlively/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.12.1/log4j-to-slf4j-2.12.1.jar:/Users/windlively/.m2/repository/org/apache/logging/log4j/log4j-api/2.12.1/log4j-api-2.12.1.jar:/Users/windlively/.m2/repository/org/slf4j/jul-to-slf4j/1.7.29/jul-to-slf4j-1.7.29.jar:/Users/windlively/.m2/repository/org/apache/commons/commons-lang3/3.9/commons-lang3-3.9.jar ink.windlively.frame.parallelprocessor.Main
12:38:04.122 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionGraph - start create graph, node definition: [A, B[A<-](beforeCheck([a] == 1)), C, D, E[A<-,B<-](beforeCheck([a]==2),skipOnFail(true)), F[A<-,E<-], G[C<-], H[B<-,C<-], I[G<-,H<-], J[I<-](beforeCheck(T(java.util.Arrays).asList(1,2,3).contains([c]) and [g] == [c]*[c] and [c] != null),afterCheck([c] >= 1)), K[D<-,E<-], L[E<-,H<-]]
12:38:04.165 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionGraph - build execution graph success: A C D B[A<-](beforeCheck([a] == 1)) G[C<-] E[A<-,B<-](beforeCheck([a]==2),skipOnFail(true)) H[B<-,C<-] F[A<-,E<-] I[G<-,H<-] K[D<-,E<-] L[E<-,H<-] J[I<-](beforeCheck(T(java.util.Arrays).asList(1,2,3).contains([c]) and [g] == [c]*[c] and [c] != null),afterCheck([c] >= 1))
12:38:04.183 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - waiting flow execution down, ttl=0
12:38:04.183 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [C] start
12:38:04.184 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [A] start
12:38:04.183 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [D] start
12:38:05.189 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [D] execute done (total: 1006ms, execution: 1006ms)
12:38:05.687 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [A] execute done (total: 1504ms, execution: 1503ms)
12:38:05.689 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] waiting node [A] done (498ms)
12:38:05.689 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [B] waiting node [A] done (1506ms)
12:38:05.689 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] waiting node [A] done (1505ms)
12:38:05.690 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [B] start
12:38:06.688 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [C] execute done (total: 2505ms, execution: 2505ms)
12:38:06.689 [ForkJoinPool.commonPool-worker-11] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [G] waiting node [C] done (2505ms)
12:38:06.689 [ForkJoinPool.commonPool-worker-11] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [G] start
12:38:06.689 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] waiting node [D] done (0ms)
12:38:07.247 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [B] execute done (total: 3064ms, execution: 1557ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] waiting node [B] done (3064ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] waiting node [B] done (3064ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] start
12:38:07.248 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] waiting node [C] done (3064ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] start
12:38:07.261 [ForkJoinPool.commonPool-worker-15] WARN ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] execute failed: before check failed by expression [[a]==2], skip exception=true
java.lang.IllegalStateException: before check failed by expression [[a]==2]
	at ink.windlively.frame.parallelprocessor.ExecutionFlow.lambda$buildFlow$2(ExecutionFlow.java:81)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
12:38:07.262 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] waiting node [E] done (2071ms)
12:38:07.262 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] start
12:38:07.262 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] waiting node [E] done (573ms)
12:38:07.262 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] waiting node [E] done (14ms)
12:38:07.262 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] start
12:38:07.767 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] execute done (total: 2576ms, execution: 505ms)
12:38:08.194 [ForkJoinPool.commonPool-worker-11] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [G] execute done (total: 4010ms, execution: 1505ms)
12:38:08.195 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] waiting node [G] done (2506ms)
12:38:09.266 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] execute done (total: 2577ms, execution: 2004ms)
12:38:09.749 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] execute done (total: 5565ms, execution: 2501ms)
12:38:09.750 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] waiting node [H] done (2502ms)
12:38:09.750 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] waiting node [H] done (4062ms)
12:38:09.751 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] start
12:38:09.751 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] start
12:38:10.753 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] execute done (total: 3505ms, execution: 1002ms)
12:38:11.752 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] execute done (total: 6064ms, execution: 2001ms)
12:38:11.753 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [J] waiting node [I] done (4491ms)
12:38:11.753 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [J] start
12:38:12.301 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [J] execute done (total: 5039ms, execution: 548ms)
12:38:12.301 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - execution flow success (8130ms)

Process finished with exit code 0

Afterword

With Spring EL expressions in place, the framework is already very extensible: SpEL supports much of Java's syntax, Spring bean references, method calls, and so on. For example, a whole task flow could be configured purely through expressions, defining each task's execution as a bean method call inside the expression and wiring its inputs and outputs automatically, without having to predefine a functionMap in code.
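To illustrate the SpEL capability being referred to (this is plain Spring API, not part of the framework, and the bean name is hypothetical): with a BeanFactoryResolver registered, an expression can call a Spring bean's method directly, which is what would make expression-only task definitions possible.

import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

public class SpelBeanCallSketch {
    // evaluate an expression such as "@homePageService.queryBanner(#request)" (bean name is hypothetical)
    static Object evaluate(BeanFactory beanFactory, String expression, Object request) {
        StandardEvaluationContext ctx = new StandardEvaluationContext();
        ctx.setBeanResolver(new BeanFactoryResolver(beanFactory)); // makes "@beanName" references resolvable
        ctx.setVariable("request", request);
        return new SpelExpressionParser().parseExpression(expression).getValue(ctx);
    }
}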

This little framework has been running stably in production for a while, serving the heavy query traffic of our business app's home page and cutting the interface's total latency from several seconds down to roughly 300 ms on average. There is still plenty of room for improvement, and feedback and discussion are very welcome.

Source code

GitHub:https://github.com/windlively/leisure/tree/master/frame/src/main/java/ink/windlively/frame/parallelprocessor

Gitee:https://gitee.com/windlively/leisure/tree/master/frame/src/main/java/ink/windlively/frame/parallelprocessor

Original post: https://my.oschina.net/windlively/blog/5398125