如何通过 Apache Camel 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luiz
使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实现一个入门应用程序,逐步演示如何配置和使用 Apache Camel 将数据发送到 Elasticsearch。
什么是 Apache Camel?
Apache Camel 是一个开源集成框架,可简化不同系统的连接,使开发人员可以专注于业务逻辑,而不必担心系统通信的复杂性。Camel 的核心概念是 “routes - 路由”,它定义了消息从源到目的地所遵循的路径,可能包括转换、验证和过滤等中间步骤。
Apache Camel 架构
Camel 使用 “components- 组件” 连接不同的系统和协议,例如数据库和消息传递服务,并使用 “endpoints- 端点” 表示消息的入口点和出口点。这些概念提供了模块化和灵活的设计,使配置和管理复杂集成变得更加容易,高效且可扩展。
使用 Elasticsearch 和 Apache Camel
我们将演示如何配置一个简单的 Java 应用程序,该应用程序使用 Apache Camel 将数据导入 Elasticsearch 集群。还将介绍使用 Apache Camel 中定义的路由在 Elasticsearch 中创建、更新和删除数据的过程。
1. 添加依赖项
配置此集成的第一步是将必要的依赖项添加到项目的 pom.xml 文件中。这将包括 Apache Camel 和 Elasticsearch 库。我们将使用新的 Java API 客户端库,因此我们必须导入 camel-elasticsearch 组件,并且版本必须与 camel-core 库相同。
如果你想使用 Java 低级 Rest 客户端,则必须使用 Elasticsearch 低级 Rest 客户端组件。
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>4.7.0</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-elasticsearch</artifactId> <version>4.7.0</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jackson</artifactId> <version>4.7.0</version> </dependency> <dependency> <groupId>co.elastic.clients</groupId> <artifactId>elasticsearch-java</artifactId> <version>8.14.3</version> </dependency>
2. 配置和运行 Camel 上下文
配置首先使用 DefaultCamelContext 类创建一个新的 Camel 上下文,该类是定义和执行路由的基础。接下来,我们配置 Elasticsearch 组件,这将允许 Apache Camel 与 Elasticsearch 集群交互。ESlasticsearchComponent 实例配置为连接到地址 localhost:9200,这是本地 Elasticsearch 集群的默认地址。对于需要身份验证的环境设置,你应该阅读有关如何配置组件和启用基本身份验证的文档,称为 “Configure the component and enable basic authentication - 配置组件和启用基本身份验证”。
public class ESComponent { public static ElasticsearchComponent getInstance() { var elasticsearch = new ElasticsearchComponent(); elasticsearch.setHostAddresses("localhost:9200"); return elasticsearch; } public static String getName() { return "elasticsearch"; } }
然后将该组件添加到 Camel 上下文中,使得定义的路由能够使用该组件在 Elasticsearch 中执行操作。
try (var context = new DefaultCamelContext()) { context.addComponent(ESComponent.getName(), ESComponent.getInstance()); context.addRoutes(new OperationBulkRoute()); context.start(); }
随后,将路由添加到上下文中。我们将创建用于批量索引、更新和删除文档的路由。
3. 配置 Camel 路由
数据索引
我们将配置的第一个路由用于数据索引。我们将使用包含电影目录的 JSON 文件。路由将配置为读取位于 src/main/resources/movies.json 的文件,将 JSON 内容反序列化为 Java 对象,然后应用聚合策略将多条消息合并为一条,从而允许在 Elasticsearch 中进行批量操作。配置了每条消息 500 个项目的大小,即批量将一次索引 500 部电影。
路由 Elasticsearch 操作 bulk:
String URI_BULK_OPERATION = String .format("elasticsearch://elasticsearch?operation=%s&indexName=%s", IndexOperationConfig.BULK_OPERATION, INDEX_NAME);
public class OperationBulkRoute extends RouteBuilder { private static final Log log = LogFactory.getLog(OperationBulkRoute.class); private static final int BULK_SIZE = 500; @Override public void configure() { from("file:src/main/resources?fileName=movies.json&noop=true") .routeId("route-bulk-ingest") .unmarshal().json() .split(body()) .aggregate(constant(true), new BulkAggregationStrategy()) .completionSize(BULK_SIZE) .to(URI_BULK_OPERATION) .process(exchange -> { var body = exchange.getIn().getBody(String.class); log.info(String.format("Response: %s", body)); }) .end(); } }
这批文档将被发送到 Elasticsearch 的批量操作端点。这种方法可确保处理大量数据时的效率和速度。
数据更新
下一个路由是更新文档。我们在上一步中索引了一些电影,现在我们将创建新的路由,通过参考代码搜索文档,然后更新评级字段。
我们设置了一个 Camel 上下文 (DefaultCamelContext),其中注册了一个 Elasticsearch 组件,并添加了一个自定义路由 IngestionRoute。操作首先通过 ProducerTemplate 发送文档代码,然后从 direct:update-ingestion 端点启动路由。
try (var context = new DefaultCamelContext()) { context.addComponent(ESComponent.getName(), ESComponent.getInstance()); context.addRoutes(new IngestionRoute()); context.start(); ProducerTemplate producerTemplate = context.createProducerTemplate(); producerTemplate.sendBody("direct:update-ingestion", documentCode); Thread.sleep(5000); }
接下来,我们有 IngestionRoute,它是此流程的输入端点。该路由执行几个流水线操作。首先,在 Elasticsearch 中进行搜索以按代码 (direct:search-by-id) 定位文档,其中 SearchByCodeProcessor 根据代码组装查询。然后,检索到的文档由 UpdateRatingProcessor 处理,它将结果转换为 Movie 对象,将电影评级(movie rating)更新为特定值,并准备将更新后的文档发送回 Elasticsearch 进行更新。
public class IngestionRoute extends RouteBuilder { private static final Log log = LogFactory.getLog(IngestionRoute.class); @Override public void configure() throws Exception { from("direct:update-ingestion") .pipeline() .to("direct:search-by-id") .to(URI_SEARCH_OPERATION) .to("direct:update-rating") .to(URI_UPDATE_OPERATION) .process(exchange -> { var body = exchange.getIn().getBody(String.class); log.info(String.format("Response: %s", body)); }) .end(); from("direct:search-by-id") .process(new SearchByCodeProcessor()); from("direct:update-rating") .process(new UpdateRatingProcessor()); } }
SearchByCodeProcessor 处理器仅配置为执行搜索查询:
public class SearchByCodeProcessor implements Processor { @Override public void process(Exchange exchange) throws Exception { var code = exchange.getIn().getBody(); String query = "{\n" + " \"query\": {\n" + " \"term\": {\n" + " \"code\": {\n" + " \"value\":" + code + "\n" + " }\n" + " }\n" + " }\n" + "}"; exchange.setProperty("document_code", code); exchange.getIn().setBody(query); } }
UpdateRatingProcessor 处理器负责更新评级字段。
public class UpdateRatingProcessor implements Processor { private final ObjectMapper objectMapper; public UpdateRatingProcessor() { this.objectMapper = new ObjectMapper(); this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } @Override public void process(Exchange exchange) throws Exception { HitsMetadata response = exchange.getIn().getBody(HitsMetadata.class); var code = Long.parseLong(exchange.getProperty("document_code").toString()); if (response != null && response.hits() != null) { var documents = parseToMovies(response); var optionalMovie = documents.stream() .filter(document -> code == (document.getSource().getCode())).findAny(); optionalMovie.ifPresent(document -> { document.getSource().setRating(13.0); Map<String, Object> updateMap = new HashMap<>(); updateMap.put("doc", document.getSource()); exchange.getIn().setHeader("indexId", document.getId()); exchange.getIn().setBody(updateMap); }); } }
数据删除
最后,配置删除文档的路由。在这里,我们将使用文档的 ID 删除文档。在 Elasticsearch 中,要删除文档,我们需要知道文档标识符、存储文档的索引并执行删除请求。在 Apache Camel 中,我们将通过创建新路由来执行此操作,如下所示。
路由从 direct:op-delete 端点开始,该端点作为入口点。当需要删除文档时,将在消息正文中收到其标识符 (_id)。然后,路由使用 simple("${body}") 将 indexId 标头设置为该标识符的值,这会从消息正文中提取 _id。
public class OperationDeleteRoute extends RouteBuilder { private static final Log log = LogFactory.getLog(OperationDeleteRoute.class); @Override public void configure() { from("direct:op-delete") .routeId("route-delete") .setHeader("indexId", simple("${body}")) .to(URI_DELETE_OPERATION) .process(exchange -> { var body = exchange.getIn().getBody(String.class); log.info(String.format("Response: %s", body)); }) .end(); ; } }
String URI_DELETE_OPERATION = String .format("elasticsearch://elasticsearch?operation=%s&indexName=%s", IndexOperationConfig.DELETE_OPERATION, INDEX_NAME);
最后,消息被定向到URI_DELETE_OPERATION指定的端点,该端点连接到 Elasticsearch 以执行相应索引中的文档删除操作。
现在我们已经创建了路由,我们可以创建一个 Camel 上下文(DefaultCamelContext),它被配置为包含 Elasticsearch 组件。
try (var context = new DefaultCamelContext()) { context.addComponent(ESComponent.getName(), ESComponent.getInstance()); context.addRoutes(new OperationDeleteRoute()); context.start(); ProducerTemplate producerTemplate = context.createProducerTemplate(); producerTemplate.sendBody("direct:op-delete", documentId); }
接下来,将 OperationDeleteRoute 类定义的删除路由(delete route)添加到上下文中。初始化上下文后,使用 ProducerTemplate 将应删除的文档的标识符传递给 direct:op-delete 端点,从而触发删除路由。
结论
Apache Camel 与 Elasticsearch 之间的集成允许实现强大而高效的数据提取,利用 Camel 的灵活性来定义可以处理不同数据操作场景(例如索引、更新和删除)的路由。通过此设置,你可以以可扩展的方式编排和自动化复杂流程,确保你的数据在 Elasticsearch 中得到有效管理。此示例演示了如何将这些工具一起使用来创建高效且适应性强的数据提取解决方案。
参考资料
准备好自己尝试一下了吗?开始免费试用。
想要获得 Elastic 认证吗?了解下一期 Elasticsearch 工程师培训何时开始!
原文:https://www.elastic.co/search-labs/blog/elasticsearch-apache-camel-ingest-data

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
3分钟理清QPS、TPS、RT 以及它们之间的关系
在评估系统性能的时候,我们经常会听到 QPS、TPS、RT、吞吐量等等一些概念,包括在一些面试场景下可能也会遇到这些概念,我们来稍微梳理一下。 做一个简单的概念扫盲。 一 QPS QPS(Queries Per Second) 是每秒的查询率,它表示一台服务每秒响应的查询的次数。 具体来说,QPS 反映了系统在高并发环境下处理请求的能力。一个高 QPS 的系统能够在单位时间内处理更多的请求,从而提供更好的用户体验和更高的吞吐量。相反,QPS 较低的系统可能在面对大量请求时会出现响应延迟或请求失败的情况。 举个栗子: 假设服务器 1 秒响应 500 次请求,那么此时 QPS 就是 500。 二 TPS TPS(Transactions Per Second) 表示每秒事务处理的数量,一个事务表示客户端向服务器发送请求,然后响应的过程。 举个栗子 比如用户在 jd 上下单的时候,每当用户下单请求被服务器接受到之后,服务需要保存订单、扣减商品库存、确认支付等等这一些列的操作,所有过程都完成后,将结果响应给客户端。这个完整的过程就是一次事务,TPS 则表示每秒内可以完成多少次这样的请求。 整体...
- 下一篇
包材推荐中的算法应用|得物技术
目录 一、业务背景 二、算法架构 规则 算法 三、算法原理 装箱 装袋 四、衍生应用 切箱 合包 箱型设计 包装方案推荐 五、作者结语 一、业务背景 任何一家电商的商品出库场景中,都涉及到打包——即把订单中的商品用包材进行包裹,常见的打包方式有装袋和装箱。 仓库打包作业主要决策两点: 包材类型:一般根据商品种类判断,比如衣服使用塑料袋、日用品使用纸箱等,具体由人工录入规则维护在系统中,针对订单输出对应可用包材类型; 包材型号:具体取决于装箱还是装袋,要求使用的纸箱的体积最小或者袋子的面积最小,并且能装下订单中的所有商品。在实操中,系统通常通过拆箱逻辑应对订单中商品较多以至最大型号都装不下的情形。 针对第二点,人工决策不仅效率低而且容易造成浪费,主要是包材型号较多,而且是折叠状态。作业人员肉眼难以甄别能装下所有商品且体积最小的型号,最保险的做法是选择较大的纸箱。这样会造成浪费包材,增加履约成本。此外空间利用率不高将导致商品在运输中容易破损,引发客诉甚至退换货。 在这样背景下,需要一套包材推荐算法来解决上述包材浪费和空间利用率不高的问题。 二、算法架构 在详细进入算法细节之前,先大致了...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Hadoop3单机部署,实现最简伪集群
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Windows10,CentOS7,CentOS8安装Nodejs环境
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境