Pipeline模式应用 | 京东云技术团队
本文记录Pipeline设计模式在业务流程编排中的应用
前言
Pipeline模式意为管道模式,又称为流水线模式。旨在通过预先设定好的一系列阶段来处理输入的数据,每个阶段的输出即是下一阶段的输入。
本案例通过定义PipelineProduct(管道产品),PipelineJob(管道任务),PipelineNode(管道节点),完成一整条流水线的组装,并将“原材料”加工为“商品”。其中管道产品负责承载各个阶段的产品信息;管道任务负责不同阶段对产品的加工;管道节点约束了管道产品及任务的关系,通过信号量定义了任务的执行方式。
依赖
工具依赖如下
<!-- 工具类大全 --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>最新版本</version> </dependency>
编程示例
1. 管道产品定义
package com.example.demo.pipeline.model; /** * 管道产品接口 * * @param <S> 信号量 * @author * @date 2023/05/15 11:49 */ public interface PipelineProduct<S> { }
2. 管道任务定义
package com.example.demo.pipeline.model; /** * 管道任务接口 * * @param <P> 管道产品 * @author * @date 2023/05/15 11:52 */ @FunctionalInterface public interface PipelineJob<P> { /** * 执行任务 * * @param product 管道产品 * @return {@link P} */ P execute(P product); }
3. 管道节点定义
package com.jd.baoxian.mall.market.service.pipeline.model; import java.util.function.Predicate; /** * 管道节点定义 * * @param <S> 信号量 * @param <P> 管道产品 * @author * @date 2023/05/15 11:54 */ public interface PipelineNode<S, P extends PipelineProduct<S>> { /** * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob * * @param pipelineJob 管道任务 * @return {@link PipelineNode}<{@link S}, {@link P}> */ PipelineNode<S, P> flax(PipelineJob<P> pipelineJob); /** * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob * * @param signal 信号 * @param pipelineJob 管道任务 * @return {@link PipelineNode}<{@link S}, {@link P}> */ PipelineNode<S, P> flax(S signal, PipelineJob<P> pipelineJob); /** * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob * * @param predicate 信号 * @param pipelineJob 管道任务 * @return {@link PipelineNode}<{@link S}, {@link P}> */ PipelineNode<S, P> flax(Predicate<S> predicate, PipelineJob<P> pipelineJob); /** * 管道节点-任务执行 * * @param product 管道产品 * @return {@link P} */ P execute(P product); }
4. 管道产品、任务,节点的实现
4.1 管道产品
package com.example.demo.pipeline.factory; import com.example.demo.model.request.DemoReq; import com.example.demo.model.response.DemoResp; import com.example.demo.pipeline.model.PipelineProduct; import lombok.*; /** * 样例-管道产品 * * @author * @date 2023/05/15 14:04 */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class DemoPipelineProduct implements PipelineProduct<DemoPipelineProduct.DemoSignalEnum> { /** * 信号量 */ private DemoSignalEnum signal; /** * 产品-入参及回参 */ private DemoProductData productData; /** * 异常信息 */ private Exception exception; /** * 流程Id */ private String tradeId; @Data @Builder @NoArgsConstructor @AllArgsConstructor public static class DemoProductData { /** * 待验证入参 */ private DemoReq userRequestData; /** * 待验证回参 */ private DemoResp userResponseData; } /** * 产品-信号量 * * @author * @date 2023/05/15 13:54 */ @Getter public enum DemoSignalEnum { /** * */ NORMAL(0, "正常"), /** * */ CHECK_NOT_PASS(1, "校验不通过"), /** * */ BUSINESS_ERROR(2, "业务异常"), /** * */ LOCK_ERROR(3, "锁处理异常"), /** * */ DB_ERROR(4, "事务处理异常"), ; /** * 枚举码值 */ private final int code; /** * 枚举描述 */ private final String desc; /** * 构造器 * * @param code * @param desc */ DemoSignalEnum(int code, String desc) { this.code = code; this.desc = desc; } } }
4.2 管道任务(抽象类)
package com.example.demo.pipeline.factory.job; import cn.hutool.core.util.ClassUtil; import cn.hutool.json.JSONUtil; import com.example.demo.pipeline.factory.DemoPipelineProduct; import com.example.demo.pipeline.model.PipelineJob; import lombok.extern.slf4j.Slf4j; /** * 管道任务-抽象层 * * @author * @date 2023/05/15 19:48 */ @Slf4j public abstract class AbstractDemoJob implements PipelineJob<DemoPipelineProduct> { /** * 公共执行逻辑 * * @param product 产品 * @return */ @Override public DemoPipelineProduct execute(DemoPipelineProduct product) { DemoPipelineProduct.DemoSignalEnum newSignal; try { newSignal = execute(product.getTradeId(), product.getProductData()); } catch (Exception e) { product.setException(e); newSignal = DemoPipelineProduct.DemoSignalEnum.BUSINESS_ERROR; } product.setSignal(newSignal); defaultLogPrint(product.getTradeId(), product); return product; } /** * 子类执行逻辑 * * @param tradeId 流程Id * @param productData 请求数据 * @return * @throws Exception 异常 */ abstract DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception; /** * 默认的日志打印 */ public void defaultLogPrint(String tradeId, DemoPipelineProduct product) { if (!DemoPipelineProduct.DemoSignalEnum.NORMAL.equals(product.getSignal())) { log.info("流水线任务处理异常:流程Id=【{}】,信号量=【{}】,任务=【{}】,参数=【{}】", tradeId, product.getSignal(), ClassUtil.getClassName(this, true), JSONUtil.toJsonStr(product.getProductData()), product.getException()); } } }
4.3 管道节点
package com.example.demo.pipeline.factory; import cn.hutool.core.util.ClassUtil; import cn.hutool.json.JSONUtil; import com.example.demo.pipeline.model.PipelineJob; import com.example.demo.pipeline.model.PipelineNode; import lombok.extern.slf4j.Slf4j; import java.util.function.Predicate; /** * 审核-管道节点 * * @author * @date 2023/05/15 14:32 */ @Slf4j public class DemoPipelineNode implements PipelineNode<DemoPipelineProduct.DemoSignalEnum, DemoPipelineProduct> { /** * 下一管道节点 */ private DemoPipelineNode next; /** * 当前管道任务 */ private PipelineJob<DemoPipelineProduct> job; /** * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob * * @param pipelineJob 管道任务 * @return {@link DemoPipelineNode} */ @Override public DemoPipelineNode flax(PipelineJob<DemoPipelineProduct> pipelineJob) { return flax(DemoPipelineProduct.DemoSignalEnum.NORMAL, pipelineJob); } /** * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob * * @param signal 信号 * @param pipelineJob 管道任务 * @return {@link DemoPipelineNode} */ @Override public DemoPipelineNode flax(DemoPipelineProduct.DemoSignalEnum signal, PipelineJob<DemoPipelineProduct> pipelineJob) { return flax(signal::equals, pipelineJob); } /** * 节点组装,上个管道过来的信号运行 predicate 后是true的话,执行 pipelineJob * * @param predicate * @param pipelineJob * @return */ @Override public DemoPipelineNode flax(Predicate<DemoPipelineProduct.DemoSignalEnum> predicate, PipelineJob<DemoPipelineProduct> pipelineJob) { this.next = new DemoPipelineNode(); this.job = (job) -> { if (predicate.test(job.getSignal())) { return pipelineJob.execute(job); } else { return job; } }; return next; } /** * 管道节点-任务执行 * * @param product 管道产品 * @return */ @Override public DemoPipelineProduct execute(DemoPipelineProduct product) { // 执行当前任务 try { product = job == null ? product : job.execute(product); return next == null ? product : next.execute(product); } catch (Exception e) { log.error("流水线处理异常:流程Id=【{}】,任务=【{}】,参数=【{}】", product.getTradeId(), ClassUtil.getClassName(job, true), JSONUtil.toJsonStr(product.getProductData()), product.getException()); return null; } } }
5. 业务实现
通过之前的定义,我们已经可以通过Pipeline完成流水线的搭建,接下来以“审核信息提交”这一业务场景,完成应用。
5.1 定义Api、入参、回参
package com.example.demo.api; import com.example.demo.model.request.DemoReq; import com.example.demo.model.response.DemoResp; import com.example.demo.pipeline.factory.PipelineForManagerSubmit; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * 演示-API * * @author * @date 2023/08/06 16:27 */ @Service public class DemoManagerApi { /** * 管道-审核提交 */ @Resource private PipelineForManagerSubmit pipelineForManagerSubmit; /** * 审核提交 * * @param requestData 请求数据 * @return {@link DemoResp} */ public DemoResp managerSubmit(DemoReq requestData) { return pipelineForManagerSubmit.managerSubmitCheck(requestData); } } package com.example.demo.model.request; /** * 演示入参 * * @author * @date 2023/08/06 16:33 */ public class DemoReq { } package com.example.demo.model.response; import lombok.Data; /** * 演示回参 * * @author * @date 2023/08/06 16:33 */ @Data public class DemoResp { /** * 成功标识 */ private Boolean success = false; /** * 结果信息 */ private String resultMsg; /** * 构造方法 * * @param message 消息 * @return {@link DemoResp} */ public static DemoResp buildRes(String message) { DemoResp response = new DemoResp(); response.setResultMsg(message); return response; } }
5.2 定义具体任务
假定审核提交的流程需要包含:参数验证、加锁、解锁、事务提交
package com.example.demo.pipeline.factory.job; import cn.hutool.json.JSONUtil; import com.example.demo.model.request.DemoReq; import com.example.demo.pipeline.factory.DemoPipelineProduct; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /** * 加锁-实现层 * * @author * @date 2023/05/17 17:00 */ @Service @Slf4j public class CheckRequestLockJob extends AbstractDemoJob { /** * 子类执行逻辑 * * @param tradeId 流程Id * @param productData 请求数据 * @return * @throws Exception 异常 */ @Override DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception { DemoReq userRequestData = productData.getUserRequestData(); log.info("任务[{}]加锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId); return DemoPipelineProduct.DemoSignalEnum.NORMAL; } } package com.example.demo.pipeline.factory.job; import cn.hutool.json.JSONUtil; import com.example.demo.model.request.DemoReq; import com.example.demo.pipeline.factory.DemoPipelineProduct; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /** * 解锁-实现层 * * @author * @date 2023/05/17 17:00 */ @Service @Slf4j public class CheckRequestUnLockJob extends AbstractDemoJob { /** * 子类执行逻辑 * * @param tradeId 流程Id * @param productData 请求数据 * @return * @throws Exception 异常 */ @Override DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception { DemoReq userRequestData = productData.getUserRequestData(); log.info("任务[{}]解锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId); return DemoPipelineProduct.DemoSignalEnum.NORMAL; } } package com.example.demo.pipeline.factory.job; import cn.hutool.json.JSONUtil; import com.example.demo.model.request.DemoReq; import com.example.demo.pipeline.factory.DemoPipelineProduct; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * 审核-参数验证-实现类 * * @author * @date 2023/05/15 19:50 */ @Slf4j @Component public class ManagerCheckParamJob extends AbstractDemoJob { /** * 执行基本入参验证 * * @param tradeId * @param productData 请求数据 * @return */ @Override DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) { /* * 入参验证 */ DemoReq userRequestData = productData.getUserRequestData(); log.info("任务[{}]入参验证,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId); // 非空验证 // 有效验证 // 校验通过,退出 return DemoPipelineProduct.DemoSignalEnum.NORMAL; } } package com.example.demo.pipeline.factory.job; import cn.hutool.json.JSONUtil; import com.example.demo.model.request.DemoReq; import com.example.demo.model.response.DemoResp; import com.example.demo.pipeline.factory.DemoPipelineProduct; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /** * 审核-信息提交-业务实现 * * @author * @date 2023/05/12 14:36 */ @Service @Slf4j public class ManagerSubmitJob extends AbstractDemoJob { /** * 子类执行逻辑 * * @param tradeId 流程Id * @param productData 请求数据 * @return * @throws Exception 异常 */ @Override DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception { DemoReq userRequestData = productData.getUserRequestData(); try { /* * DB操作 */ log.info("任务[{}]信息提交,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId); productData.setUserResponseData(DemoResp.buildRes("成功")); } catch (Exception ex) { log.error("审核-信息提交-DB操作失败,入参:{}", JSONUtil.toJsonStr(userRequestData), ex); throw ex; } return DemoPipelineProduct.DemoSignalEnum.NORMAL; } }
5.3 完成流水线组装
针对入回参转换,管道任务执行顺序及执行信号量的构建
package com.example.demo.pipeline.factory; import com.example.demo.model.request.DemoReq; import com.example.demo.model.response.DemoResp; import com.example.demo.pipeline.factory.job.CheckRequestLockJob; import com.example.demo.pipeline.factory.job.CheckRequestUnLockJob; import com.example.demo.pipeline.factory.job.ManagerCheckParamJob; import com.example.demo.pipeline.factory.job.ManagerSubmitJob; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Objects; import java.util.UUID; /** * 管道工厂入口-审核流水线 * * @author * @date 2023/05/15 19:52 */ @Slf4j @Service @RequiredArgsConstructor public class PipelineForManagerSubmit { /** * 审核-管道节点 */ private final DemoPipelineNode managerSubmitNode = new DemoPipelineNode(); /** * 审核-管道任务-提交-防刷锁-加锁 */ private final CheckRequestLockJob checkRequestLockJob; /** * 审核-管道任务-提交-防刷锁-解锁 */ private final CheckRequestUnLockJob checkRequestUnLockJob; /** * 审核-管道任务-参数验证 */ private final ManagerCheckParamJob managerCheckParamJob; /** * 审核-管道任务-事务操作 */ private final ManagerSubmitJob managerSubmitJob; /** * 组装审核的处理链 */ @PostConstruct private void assembly() { assemblyManagerSubmit(); } /** * 组装处理链 */ private void assemblyManagerSubmit() { managerSubmitNode // 参数验证及填充 .flax(managerCheckParamJob) // 防刷锁 .flax(checkRequestLockJob) // 事务操作 .flax(managerSubmitJob) // 锁释放 .flax((ignore) -> true, checkRequestUnLockJob); } /** * 审核-提交处理 * * @param requestData 入参 * @return */ public DemoResp managerSubmitCheck(DemoReq requestData) { DemoPipelineProduct initialProduct = managerSubmitCheckInitial(requestData); DemoPipelineProduct finalProduct = managerSubmitNode.execute(initialProduct); if (Objects.isNull(finalProduct) || Objects.nonNull(finalProduct.getException())) { return DemoResp.buildRes("未知异常"); } return finalProduct.getProductData().getUserResponseData(); } /** * 审核-初始化申请的流水线数据 * * @param requestData 入参 * @return 初始的流水线数据 */ private DemoPipelineProduct managerSubmitCheckInitial(DemoReq requestData) { // 初始化 return DemoPipelineProduct.builder() .signal(DemoPipelineProduct.DemoSignalEnum.NORMAL) .tradeId(UUID.randomUUID().toString()) .productData(DemoPipelineProduct.DemoProductData.builder().userRequestData(requestData).build()) .build(); } }
总结
本文重点为管道模式的抽象与应用,上述示例仅为个人理解。实际应用中,此案例长于应对各种规则冗杂的业务场景,便于规则编排。
待改进点:
-
各个任务其实隐含了执行的先后顺序,此项内容可进一步实现;
-
针对最后“流水线组装”这一步,可通过配置描述的方式,进一步抽象,从而将变动控制在每个“管道任务”的描述上,针对规则项做到“可插拔”式处理。
作者:京东保险 侯亚东
来源:京东云开发者社区 转载请注明来源

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
一文讲解如何从 Clickhouse 迁移数据至 DolphinDB
ClickHouse是 Yandex 公司于2016年开源的 OLAP 列式数据库管理系统,主要用于 WEB 流量分析。凭借面向列式存储、支持数据压缩、完备的 DBMS 功能、多核心并行处理的特点,ClickHouse 被广泛应用于广告流量、移动分析、网站分析等领域。 DolphinDB是一款国产高性能分布式时序数据库,拥有图灵完备性的编程语言 DolphinDB Script 和高性能低延时的流计算框架,为海量结构化数据的快速存储、检索、分析及计算提供一站式解决方案,适用于量化金融及工业物联网等领域。自发布以来,DolphinDB 凭借卓越的性能、低维护成本、操作简单、可扩展性高等优势,吸引了大量的国内外用户。 本教程旨在为 ClickHouse 使用者提供一份迁移数据至 DolphinDB 的简明参考。 1. DolphinDB vs ClickHouse 1.1 DolphinDB 与 ClickHouse 的相同点 分布式存储和分析:DolphinDB 和 ClickHouse 都设计为支持大规模数据的分布式存储和分析。它们可以处理海量数据,并提供快速的查询和分析能力。 列式存...
- 下一篇
从ClickHouse通往MySQL的几条道路 | 京东物流技术团队
一、应用背景简介 ClickHouse 是 Yandex(俄罗斯最大的搜索引擎)开源的一个用于实时数据分析的基于列存储的数据库,其处理数据的速度比传统方法快 100-1000 倍。ClickHouse 的性能超过了目前市场上可比的面向列的 DBMS,每秒钟每台服务器每秒处理数亿至十亿多行和数十千兆字节的数据。它是一个用于联机分析(OLAP)的列式数据库管理系统;(OLAP是仓库型数据库,主要是读取数据,做复杂数据分析,侧重技术决策支持,提供直观简单的结果) 那 ClickHouse OLAP 适用场景有:1)读多于写;2)大宽表,读大量行但是少量列,结果集较小;3)数据批量写入,且数据不更新或少更新;4)无需事务,数据一致性要求低;5)灵活多变,不适合预先建模。 MySQL是一个关系型数据库管理系统,广泛用于各种应用程序和网站开发。MySQL容易上手和学习,已经被广泛应用于各种生产环境中有良好的稳定性和可靠性,MySQL支持事务处理,能够保证数据的完整性和一致性,适合需要复杂数据处理和事务控制的应用。 在我们应用中的使用场景来看,简单来说通常会看中了clickhouse在处理大批量数据...
相关文章
文章评论
共有0条评论来说两句吧...