您现在的位置是:首页 > 文章详情

数据结构与算法的实际应用(有向图)——有依赖的任务并行处理框架

日期:2022-01-15点击:1360

背景

一开始接到一个技改需求,需要将我们系统的一些首页查询接口进行优化,合并为一个聚合接口提供给前端;最初与前端同学进行交流,认为这些首页查询接口没有依赖关系,因此对于我来说解决方案比较简单,直接将需要查询的接口进行并行处理即可:

其中每一个查询抽象成了一个Function,在子线程中调用apply方法执行。

但是在与前端同学联调时,发现其中一个接口的入参依赖另一个接口的结果,本想让前端给我传过来,但是这样的话又有一个接口会分离出去,影响首页加载速度,于是决定还是我后端想办法处理;如此一来,就需要解决查询时的前后依赖关系,一开始想了几种方案:

  • 使用CountDownLatch进行等待
  • 拿到前一个依赖任务的Future,调用get方法等待
  • 有依赖的任务串行执行

前两种方案大同小异,其实都是利用线程间的等待机制,待前置查询执行完成后,唤醒当前线程继续执行查询;第三种方案则简单粗暴,由于时间较紧,经过短暂思考后权利利弊还是决定用第三种方案,毕竟我现在只有两个子查询是有依赖的,这样做也是效率最高的办法;于是在生成查询的supplier中做了下手脚,将有依赖关系的Function通过andThen方法聚合为一个Function,便完成了需求,这样子查询顺序变成了(假设B依赖A):

至此,在需求上来说已经实现完成了,但是从技术角度来说不能算完整:假如新来了一个接口G,G也依赖于A,那么按照这种方案,ABG将会串行执行,然而BG之间没有依赖关系,它俩理应并行;又或者B依赖于A和C,则ABC会串行执行,然而AC应该并行才合理;再极端的例子,B依赖了ACEF,则整个聚合查询都退化为了单线程,显然更不合理。

所以,虽然现阶段这种方案没什么问题,但对于复杂的依赖关系处理是远不够完善的,因此我又思考了有没有更好的方式去处理,并且能够有一定的通用性。

重构的并行任务处理方案

在此类需求中,其实比较重要的一点就是如何处理这些任务的依赖关系,而且尽可能的让他们并行处理,由此引入了一些图的应用。 在我的方案中,有以下几个概念:

  • ExecutionContext:执行的上下文环境,所有子任务共用,用于处理数据交换和输入输出的问题,其实本质上是一个线程安全的Map。
  • ExecutionNode:任务节点,即一个子任务的抽象,内部包含:名称、依赖节点名称、可执行的Function
  • ExecutionGraph:整个任务流的执行图,包含所有的执行节点以及排序后的节点,任务流启动时将按照此类的描述执行。
  • ExecutionFlow:任务流的抽象,本质上也是一个Function,调用apply方法开始执行。
  • FlowConfiguration:任务执行的一些配置,包含前置Action、后置Action、线程执行器、超时时间等。

图的生成

ExecutionGraph是将任务的执行过程抽象成了一个有向无环图,在使用时,传入每个节点的定义及提前定义好的执行过程(Function)即可生成一个图。

其中节点的定义规则为:{节点名称}[{依赖节点A},{依赖节点B},...]

若无依赖,则没有方括号中的内容。 具体在构建执行图时,则相当于是对这个有向图做了一个简单的排序(拓扑排序):

  1. 首先将所有节点加入到remainMap,preMap为空
  2. 找到没有依赖的节点,作为第一层,并将这些节点从remainMap移除,加入到preMap中
  3. 遍历remainMap,找到所有依赖节点均已经在preMap中出现的节点,加入到preMap,并在remainMap移除,这些元素作为下一层
  4. 循环第3步,直至remainMap为空

需要注意的是,第3步中,如果remainMap在筛选前后大小不变,则认为是出现了循环依赖,或者依赖了一个不存在的节点,即有向图中出现了环,应抛出异常,否则将陷入死循环。 核心代码如下:

    /**
     * 构建执行图,实质上是对节点做排序
     *
     * @param preNodes   已处理的节点
     * @param remainNode 剩余节点
     * @param levels     有序节点(结果集)
     */
    private static void buildGraph(Map<String, ExecutionNode> preNodes, Map<String, ExecutionNode> remainNode, List<List<ExecutionNode>> levels) {

        // 若剩余节点为空,结束
        if (remainNode.isEmpty()) {
            return;
        }

        List<String> currentLevel;
        // 记录剩余节点个数
        int remainSize = remainNode.size();

        if (preNodes.isEmpty()) {
            // 若preNodes为空,则为第一层,找出无依赖的节点,即入度为0的节点
            currentLevel = remainNode.values()
                    .stream()
                    .filter(e -> e.getDepend().length == 0)
                    .map(ExecutionNode::getName)
                    .collect(Collectors.toList());
            // 若没有无依赖的节点,抛出异常
            if (currentLevel.isEmpty()) {
                throw new IllegalStateException("could not found init node");
            }
        } else {
            // 继续寻找前置节点已处理的节点,相当于继续寻找删除前面的节点依赖后,入度为0的节点
            currentLevel = remainNode.entrySet()
                    .stream()
                    .filter(e -> Stream.of(e.getValue().getDepend()).allMatch(preNodes::containsKey))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        }

        List<ExecutionNode> nodes = new ArrayList<>(currentLevel.size());
        // currentLevel中为当前层级的节点,将其加入preNodes中记录起来,并从剩余节点中移除
        for (String name : currentLevel) {
            nodes.add(remainNode.get(name));
            preNodes.put(name, remainNode.remove(name));
        }

        // 若为剩余节点数量不变,说明图中有环形结构,或依赖了未定义的节点
        if (remainNode.size() == remainSize) {
            throw new IllegalStateException("there may be circular dependencies or dependency does not exist in your graph, exception nodes is: " + remainNode.values());
        }

        // 放入结果集
        levels.add(nodes);
        // 递归至下一层
        buildGraph(preNodes, remainNode, levels);
    }

示例 简单介绍个例子,对于如下节点定义:

A
B
C
D
E[A,B]
F[A,E]
G[C]
H[B,C]
I[G,H]
J[I]
K[D,E]
L[E,H]

则对应的图为:

这样,所有的执行任务被分为了四层,每一层之间的任务是互不依赖的,同时下一层的执行依赖上一层。

任务节点执行方案

按照这种图的排序算法分割之后,剩下的就比较简单了,我能想到的有两种方式:

  1. 每一层并行执行,全部完成之后执行下一层
  2. 利用线程间等待机制,等待前置任务执行完成通知当前线程

第一种我觉得不算很好,例如上图,考虑如下情况:L的执行时间较长,大于I和J的执行时间总和,即 t(L) > t(I) + t(J),则J完全没必要等待第三层全部执行完成,当I结束之后J就可以执行,若t(J) > t(L)时,更是浪费了较多时间。 所以我采用了CompleteFuture的get方法阻塞有依赖的线程,只要前置任务执行完毕,当前任务就可以开始,而不必等待上一层所有的任务执行完毕。

方案如下:

  1. 遍历每一层节点

    a. 对于当前层级的每一个节点,生成CompleteFuture,在构造Supplier时,先从futureMap获取所有的依赖节点的Future,循环调用其get方法,阻塞当前线程(若已经执行完成则直接返回结果,此处也可以使用allOf.get()方法代替)

    b. 将构造完成的CompleteFuture加入futureMap中

  2. 将futureMap所有的CompleteFuture取出来,使用allOf(...).get()方法等待全部执行完成。

理论上每一个节点只要前置执行完成,它就会开始,但是实际上,由于线程池调度等原因,可能还会在阻塞队列中等待一段时间,但是站在系统资源分配的角度,可以接受其带来的影响。 实现的核心代码:(其实这种传入参数并生成新函数的方式还有一个高大上的名字:函数柯里化?=_= )

    /**
     * 生成一个执行任务,返回的是一个新函数
     *
     * @param executionGraph    执行任务图
     * @param flowConfiguration 执行参数
     * @return
     */
    static ExecutionFlow buildFlow(@NonNull ExecutionGraph executionGraph, @NonNull FlowConfiguration flowConfiguration) {
        SpelExpressionParser expressionParser = new SpelExpressionParser();
        return ctx -> {
            long startTime = System.currentTimeMillis();
            List<List<ExecutionNode>> sortedNodes = executionGraph.getSortedNodes();
            Map<String, CompletableFuture<ExecutionContext>> futureMap = new HashMap<>(16);
            // 按图序构建CompleteFuture
            for (List<ExecutionNode> nodes : sortedNodes) {
                // 同一层级之间的节点没有依赖,且依赖的CompletableFuture一定已经构建完成
                for (ExecutionNode node : nodes) {
                    String currNodeName = node.getName();
                    CompletableFuture<ExecutionContext> future = CompletableFuture.supplyAsync(() -> {
                        try {
                            long t1 = System.currentTimeMillis();
                            // 前置操作
                            ofNullable(flowConfiguration.getPreAction()).ifPresent(c -> c.accept(ctx));
                            String[] depend = node.getDepend();
                            // 遍历依赖节点,确保依赖节点已经执行完成
                            for (String s : depend) {
                                try {
                                    // 阻塞等待
                                    futureMap.get(s).get();
                                    logger.info("current node [{}] waiting node [{}] done ({}ms)", currNodeName, s, System.currentTimeMillis() - t1);
                                } catch (InterruptedException | ExecutionException e) {
                                    throw new IllegalStateException("current node [" + currNodeName + "] waiting node [" + s + "] failed", e);
                                }
                            }
                            long t2 = System.currentTimeMillis();
                            logger.info("current node [{}] start", currNodeName);

                            // 前置表达式参数检查
                            //noinspection ConstantConditions
                            if (Objects.nonNull(node.getBeforeCheckExpression()) &&
                                !node.getBeforeCheckExpression().getValue(ctx.getCtx(), boolean.class)) {
                                throw new IllegalStateException(String.format("before check failed by expression [%s]"
                                        , node.getBeforeCheckExpression().getExpressionString()));
                            }
                            // 执行节点
                            ExecutionContext ret = node.getExecution().apply(ctx);
                            // 用于标识节点是否正确执行
                            ctx.setVar("$" + currNodeName + "_success", true);

                            // 后置表达式参数检查
                            //noinspection ConstantConditions
                            if (Objects.nonNull(node.getAfterCheckExpression()) &&
                                !node.getAfterCheckExpression().getValue(ctx.getCtx(), boolean.class)) {
                                throw new IllegalStateException(String.format("after check failed by expression [%s]"
                                        , node.getAfterCheckExpression().getExpressionString()));
                            }
                            long tl = System.currentTimeMillis();
                            logger.info("current node [{}] execute done (total: {}ms, execution: {}ms)", currNodeName, tl - t1, tl - t2);
                            return ret;
                        } catch (Throwable e) {
                            logger.warn("current node [{}] execute failed: {}, skip exception={}", currNodeName, e.getMessage(), node.isSkipOnFail(), e);
                            if (node.isSkipOnFail()) {
                                return ctx;
                            }
                            throw e;
                        } finally {
                            // 后置操作
                            ofNullable(flowConfiguration.getFinalAction()).ifPresent(c -> c.accept(ctx));
                        }
                    }, flowConfiguration.getExecutor() == null ? ForkJoinPool.commonPool() : flowConfiguration.getExecutor());
                    // 将构建好的CompletableFuture放入Map
                    futureMap.put(node.getName(), future);
                }
            }
            try {
                // 组合所有的Future,等待所有节点行完毕
                CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0]));
                logger.info("waiting flow execution down, ttl={}", flowConfiguration.getTimeout());
                if (flowConfiguration.getTimeout() > 0) {
                    completableFuture.get(flowConfiguration.getTimeout(), TimeUnit.SECONDS);
                } else {
                    completableFuture.get();
                }
                logger.info("execution flow success ({}ms)", System.currentTimeMillis() - startTime);
                return ctx;
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                logger.warn("execution flow failed: {} ({}ms)", e.getMessage(), System.currentTimeMillis() - startTime);
                throw new IllegalStateException(e);
            }
        };
    }

扩展功能

虽然已经完成了我们需要的功能,但是还不够完善,例如在实际执行每一个节点时,有的节点可以容忍失败,有的不可以,或者想在节点执行前后能检查一些参数是否满足要求,等等,所以我在这里又对节点定义的表达式做了一些增强,以此来支持一些额外的操作,同时也能在后续进行扩展:

在原有的基础上做一下改造:

{节点名称}[{依赖节点A},{依赖节点B}, ...]({扩展操作A名称}({表达式}), {扩展操作A名称}({表达式}), ...)

是否忽略失败

用于在当前节点执行发生异常的时候,是否将异常抛出,还是忽略这个异常

扩展名称:skipOnFail

示例:E[A,B](skipOnFail(true))

......
} catch (Throwable e) {
  log.warn("current node [{}] execute failed: {}, skip exception={}", currNodeName, e.getMessage(), node.isSkipOnFail(), e);
  if (node.isSkipOnFail()) {
    return ctx;
  }
	throw e;
} finally {
......

参数检查

在针对有依赖的场景下,当前节点如何知道所依赖的节点是否执行正常呢,若有异常,我们可以通过异常得知执行失败,但是有时候不会有异常抛出(例如调用的子方法有拦截器的时候,有可能在拦截器中会处理掉异常信息),需要检查数据才能得知是否正确执行,如果把这个检查的操作由节点内部定义的函数来做,似乎是比较繁琐的,而且这一部分我觉得由框架来统一处理掉是比较合适的,我想到的方案是用表达式的形式来检查参数: 给每个节点增加一个前置和后置的表达式,表达式的返回值是boolean类型,若为false则抛出异常;

表达式结构:{节点名称}[{依赖节点A},{依赖节点B}, ...](beforeCheck({布尔表达式}), afterCheck({布尔表达式}))

示例:J[I](beforeCheck([c] == 0 && [k] == 9 && [c] != null), afterCheck([a] == 1))

表达式遵循SpringEL语法,解析时的上下文为ExecutionContext中的变量

执行样例

执行样例代码:

public static void main(String[] args) {
    Random random = new Random();
    // 简单映射一个函数集合,随机暂停一段时间,模拟函数执行,部分函数里面做了一些简单的值的设定,模拟结果输出
    Map<String, Function<ExecutionContext, ExecutionContext>> functionMap = Arrays.stream("A,B,C,D,E,F,G,H,I,J,K,L".split(",")).collect(Collectors.toMap(
            e -> e,
            e -> ctx -> {
                try {
                    int i = 500 + 500 * random.nextInt(5);
                    Thread.sleep(i);
                    ctx.setVar(e, i);
                    switch (e) {
                        case "A":
                            ctx.setVar("a", 1);
                            break;
                        case "B":
                            ctx.setVar("a", 0);
                            break;
                        case "C":
                            ctx.setVar("c", 3);
                            break;
                        case "G":
                            ctx.setVar("g", 9);
                            ctx.setVar("a", 1);
                            break;
                    }
                } catch (InterruptedException ignored) {
                }
                return ctx;
            }
    ));

    // 构造一个执行图的定义
    ExecutionGraph graph = ExecutionGraph.createGraph(new ArrayList<>() {{
        add("A");
        add("B[A](beforeCheck([a] == 1))");
        add("C");
        add("D");
        add("E[A,B](beforeCheck([a]==2); skipOnFail(true))"); // 该节点会发生异常,但是配置了跳过
        add("F[A,E]");
        add("G[C]");
        add("H[B,C]");
        add("I[G,H]");
        add("J[I](beforeCheck( T(java.util.Arrays).asList(1,2,3).contains([c]) and [g] == [c]*[c] and [c] != null); afterCheck([c] >= 1))");
        add("K[D,E]");
        add("L[E,H]");
    }}, functionMap);

    // CompletableFuture默认使用的ForkJoinPool.commonPool,可以试下使用cachedThreadPool会有什么不一样?
    // ExecutorService executorService = Executors.newCachedThreadPool();
    ExecutionFlow executionFlow = ExecutionFlow.buildFlow(graph);
    executionFlow.apply(new ExecutionContext());
}

执行日志:

/Library/Java/JavaVirtualMachines/jdk-11.0.8.jdk/Contents/Home/bin/java -Dvisualvm.id=76728482855487 -javaagent:/Users/windlively/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/213.6461.79/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=55008:/Users/windlively/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/213.6461.79/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/windlively/IdeaProjects/leisure/frame/target/classes:/Users/windlively/.m2/repository/org/projectlombok/lombok/1.18.10/lombok-1.18.10.jar:/Users/windlively/.m2/repository/org/springframework/spring-expression/5.2.2.RELEASE/spring-expression-5.2.2.RELEASE.jar:/Users/windlively/.m2/repository/org/springframework/spring-core/5.2.2.RELEASE/spring-core-5.2.2.RELEASE.jar:/Users/windlively/.m2/repository/org/springframework/spring-jcl/5.2.2.RELEASE/spring-jcl-5.2.2.RELEASE.jar:/Users/windlively/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.2.2.RELEASE/spring-boot-starter-logging-2.2.2.RELEASE.jar:/Users/windlively/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar:/Users/windlively/.m2/repository/ch/qos/logback/logback-core/1.2.3/logback-core-1.2.3.jar:/Users/windlively/.m2/repository/org/slf4j/slf4j-api/1.7.29/slf4j-api-1.7.29.jar:/Users/windlively/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.12.1/log4j-to-slf4j-2.12.1.jar:/Users/windlively/.m2/repository/org/apache/logging/log4j/log4j-api/2.12.1/log4j-api-2.12.1.jar:/Users/windlively/.m2/repository/org/slf4j/jul-to-slf4j/1.7.29/jul-to-slf4j-1.7.29.jar:/Users/windlively/.m2/repository/org/apache/commons/commons-lang3/3.9/commons-lang3-3.9.jar ink.windlively.frame.parallelprocessor.Main
12:38:04.122 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionGraph - start create graph, node definition: [A, B[A<-](beforeCheck([a] == 1)), C, D, E[A<-,B<-](beforeCheck([a]==2),skipOnFail(true)), F[A<-,E<-], G[C<-], H[B<-,C<-], I[G<-,H<-], J[I<-](beforeCheck(T(java.util.Arrays).asList(1,2,3).contains([c]) and [g] == [c]*[c] and [c] != null),afterCheck([c] >= 1)), K[D<-,E<-], L[E<-,H<-]]
12:38:04.165 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionGraph - build execution graph success: 
A		C		D
B[A<-](beforeCheck([a] == 1))		G[C<-]
E[A<-,B<-](beforeCheck([a]==2),skipOnFail(true))		H[B<-,C<-]
F[A<-,E<-]		I[G<-,H<-]		K[D<-,E<-]		L[E<-,H<-]
J[I<-](beforeCheck(T(java.util.Arrays).asList(1,2,3).contains([c]) and [g] == [c]*[c] and [c] != null),afterCheck([c] >= 1))

12:38:04.183 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - waiting flow execution down, ttl=0
12:38:04.183 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [C] start
12:38:04.184 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [A] start
12:38:04.183 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [D] start
12:38:05.189 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [D] execute done (total: 1006ms, execution: 1006ms)
12:38:05.687 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [A] execute done (total: 1504ms, execution: 1503ms)
12:38:05.689 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] waiting node [A] done (498ms)
12:38:05.689 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [B] waiting node [A] done (1506ms)
12:38:05.689 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] waiting node [A] done (1505ms)
12:38:05.690 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [B] start
12:38:06.688 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [C] execute done (total: 2505ms, execution: 2505ms)
12:38:06.689 [ForkJoinPool.commonPool-worker-11] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [G] waiting node [C] done (2505ms)
12:38:06.689 [ForkJoinPool.commonPool-worker-11] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [G] start
12:38:06.689 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] waiting node [D] done (0ms)
12:38:07.247 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [B] execute done (total: 3064ms, execution: 1557ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] waiting node [B] done (3064ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] waiting node [B] done (3064ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] start
12:38:07.248 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] waiting node [C] done (3064ms)
12:38:07.248 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] start
12:38:07.261 [ForkJoinPool.commonPool-worker-15] WARN ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [E] execute failed: before check failed by expression [[a]==2], skip exception=true
java.lang.IllegalStateException: before check failed by expression [[a]==2]
	at ink.windlively.frame.parallelprocessor.ExecutionFlow.lambda$buildFlow$2(ExecutionFlow.java:81)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
12:38:07.262 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] waiting node [E] done (2071ms)
12:38:07.262 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] start
12:38:07.262 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] waiting node [E] done (573ms)
12:38:07.262 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] waiting node [E] done (14ms)
12:38:07.262 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] start
12:38:07.767 [ForkJoinPool.commonPool-worker-9] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [F] execute done (total: 2576ms, execution: 505ms)
12:38:08.194 [ForkJoinPool.commonPool-worker-11] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [G] execute done (total: 4010ms, execution: 1505ms)
12:38:08.195 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] waiting node [G] done (2506ms)
12:38:09.266 [ForkJoinPool.commonPool-worker-5] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [K] execute done (total: 2577ms, execution: 2004ms)
12:38:09.749 [ForkJoinPool.commonPool-worker-13] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [H] execute done (total: 5565ms, execution: 2501ms)
12:38:09.750 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] waiting node [H] done (2502ms)
12:38:09.750 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] waiting node [H] done (4062ms)
12:38:09.751 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] start
12:38:09.751 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] start
12:38:10.753 [ForkJoinPool.commonPool-worker-7] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [L] execute done (total: 3505ms, execution: 1002ms)
12:38:11.752 [ForkJoinPool.commonPool-worker-3] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [I] execute done (total: 6064ms, execution: 2001ms)
12:38:11.753 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [J] waiting node [I] done (4491ms)
12:38:11.753 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [J] start
12:38:12.301 [ForkJoinPool.commonPool-worker-15] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - current node [J] execute done (total: 5039ms, execution: 548ms)
12:38:12.301 [main] INFO ink.windlively.frame.parallelprocessor.ExecutionFlow - execution flow success (8130ms)

进程已结束,退出代码0

后记

在引入SpringEL表达式之后,其扩展性已经非常强了,Spring表达式支持Java的许多语法、Spring Bean的调用、方法调用等,例如可以仅用配置表达式的方式,在表达式中通过Bean调用的方式定义任务执行过程,并自动设置出入参,配置出一个任务流程,而无需再在代码中预先定义好functionMap。

该小框架已经上线稳定运行一段时间了,支撑着我们业务APP首页数据大量的查询请求,将原有几秒钟的接口总耗时将至平均300ms左右,但仍有许多可以改进的地方,也欢迎提出意见,共同探讨。

代码地址

GitHub:https://github.com/windlively/leisure/tree/master/frame/src/main/java/ink/windlively/frame/parallelprocessor

Gitee:https://gitee.com/windlively/leisure/tree/master/frame/src/main/java/ink/windlively/frame/parallelprocessor

原文链接:https://my.oschina.net/windlively/blog/5398125
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章