使用 LF Edge eKuiper 将物联网流处理数据写入 Databend
作者:韩山杰
Databend Cloud 研发工程师
LF Edge eKuiper
LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper 的主要目标是在边缘端提供一个流媒体软件框架(类似于 Apache Flink (opens new window))。eKuiper 的规则引擎允许用户提供基于 SQL 或基于图形(类似于 Node-RED)的规则,在几分钟内创建物联网边缘分析应用。具体介绍可以参考 [LF Edge eKuiper - 超轻量物联网边缘流处理软件(https://ekuiper.org/docs/zh/latest/)。
Databend Sql Sink
eKuiper 支持通过 Golang 或者 Python 在源 (Source)
,SQL 函数
, 目标 (Sink)
三个方面的扩展,通过支持不同的 Sink,允许用户将分析结果发送到不同的扩展系统中。Databend 作为 Sink 也被集成到了 eKuiper plugin 当中,下面通过一个案例来展示如何使用 eKuiper 将物联网流处理数据写入 Databend。
编译 eKuiper 和 Databend Sql Plugin
eKuiper
git clone https://github.com/lf-edge/ekuiper & cd ekuiper
make
Databend Sql Plugin
go build -trimpath --buildmode=plugin -tags databend -o plugins/sinks/Sql.so extensions/sinks/sql/sql.go
编译后的 sink plugin 拷贝到 build 目录:
cp plugins/sinks/Sql.so _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64/plugins/sinks
Databend 建表
在 Databend 中先创建目标表 ekuiper_test:
create table ekuiper_test (name string,size bigint,id bigint);
启动 eKuiperd
cd _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64
./bin/kuiperd
服务正常启动:
创建流(stream) 和 规则 (rule)
eKuiper 提供了两种管理各种流、规则,目标端的方式,一种是通过 ekuiper-manager 的 [docker image](https://hub.docker.com/r/lfedge/ekuiper) 启动可视化管理界面,一种是通过 CLI 工具来管理。这里我们使用 CLI。
创建 stream
流是 eKuiper 中数据源连接器的运行形式。它必须指定一个源类型来定义如何连接到外部资源。这里我们创建一个流,从 json 文件数据源中获取数据,并发送到 eKuiper 中。
首先配置文件数据源,连接器的配置文件位于 /etc/sources/file.yaml
。
default:
# 文件的类型,支持 json, csv 和 lines
fileType: json
# 文件以 eKuiper 为根目录的目录或文件的绝对路径。
# 请勿在此处包含文件名。文件名应在流数据源中定义
path: data
# 读取文件的时间间隔,单位为ms。如果只读取一次,则将其设置为 0
interval: 0
# 读取后,两条数据发送的间隔时间
sendInterval: 0
# 是否并行读取目录中的文件
parallel: false
# 文件读取后的操作
# 0: 文件保持不变
# 1: 删除文件
# 2: 移动文件到 moveTo 定义的位置
actionAfterRead: 0
# 移动文件的位置, 仅用于 actionAfterRead 为 2 的情况
moveTo: /tmp/kuiper/moved
# 是否包含文件头,多用于 csv。若为 true,则第一行解析为文件头。
hasHeader: false
# 定义文件的列。如果定义了文件头,该选项将被覆盖。
# columns: [id, name]
# 忽略开头多少行的内容。
ignoreStartLines: 0
# 忽略结尾多少行的内容。最后的空行不计算在内。
ignoreEndLines: 0
# 使用指定的压缩方法解压缩文件。现在支持`gzip`、`zstd` 方法。
decompression: ""
使用 CLI 创建 steam 名为 stream1
:
./bin/kuiper create stream stream1 '(id BIGINT, name STRING,size BIGINT) WITH (DATASOURCE="test.json", FORMAT="json", TYPE="file");'
Json 文件的内容为:
[
{"id": 1,"size":100, "name": "John Doe"},
{"id": 2,"size":200, "name": "Jane Smith"},
{"id": 3,"size":300, "name": "Kobe Brant"},
{"id": 4,"size":400, "name": "Alen Iverson"}
]
创建 Databend Sink Rule
一个规则代表了一个流处理流程,定义了从将数据输入流的数据源到各种处理逻辑,再到将数据输入到外部系统的动作。eKuiper 有两种方法来定义规则的业务逻辑。要么使用 SQL / 动作组合,要么使用新增加的图 API。
这里我们通过指定 sql
和 actions
属性,以声明的方式定义规则的业务逻辑。其中,sql
定义了针对预定义流运行的 SQL 查询,这将转换数据。然后,输出的数据可以通过 action
路由到多个位置。
规则由 JSON 定义,下面是准备创建的规则 myRule.json:
{
"id": "myRule",
"sql": "SELECT id, name from stream1",
"actions": [
{
"log": {
},
"sql": {
"url": "databend://databend:databend@localhost:8000/default?sslmode=disable",
"table": "ekuiper_test",
"fields": ["id","name"]
}
}
]
}
执行 CLI 创建规则:
./bin/kuiper create rule myRule -f myRule.json
可以查看所创建规则的运行状态:
./bin/kuiper getstatus rule myRule
规则创建后,会立即将符合规则条件的数据发送到目标端,此时我们查看 Databend 的 ekuiper_test 表,可以看到文件数据源中的数据已经被写入到 Databend:
可以看到由于我们的规则 SQL 中只指定了 id
, name
字段,所以这里只有这两个字段被写入。
结论
eKuiper 是 EMQ 旗下的一款流处理软件,其体积小、功能强大,在工业物联网、车辆网、公共数据分析等很多场景中得到广泛使用。本文介绍如何使用 eKuiper 将物联网流处理数据写入 Databend。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
GaussDB数据库SQL系列-定义重载函数
目录 一、前言 二、函数重载的定义 三、GaussDB创建自定义重载函数的事项说明 四、GaussDB数据库中的自定义重载函数示例 示例一:创建package属性重载函数,根据不同的SQL条件获取生成视图 示例二:创建package属性重载函数,根据不同的参数求图形面积 五、小结 一、前言 在本文中,我们将介绍GaussDB数据库中的用户定义函数重载的概念、用法以及示例。用户定义函数是 SQL 中常用的“编程工具”,允许我们自定义函数来处理和操作数据。而函数重载则是指在一个数据库中定义多个具有相同名字但参数不同的函数,以此实现不同的功能。 二、函数重载的定义 函数重载是一种允许在同一个数据库中定义多个同名函数的特性。这些同名函数在参数类型、数量或顺序上有所不同,因此可以根据传入的参数的不同来调用不同的函数。通过函数重载,我们可以使用相同的函数名字来实现多个功能,提高了代码的可读性和重用性。 三、GaussDB创建自定义重载函数的事项说明 兼容PostgreSQL风格的函数或者带有PACKAGE属性的函数支持重载。在指定REPLACE的时候,如果参数个数、类型、返回值有变化,不会替换原有...
-
下一篇
Node.js中常用的设计模式有哪些?
本文由葡萄城技术团队首发。转载请注明出处:葡萄城官网,葡萄城为开发者提供专业的开发工具、解决方案和服务,赋能开发者。 设计模式简介 设计模式是由经验丰富的程序员在日积月累中抽象出的用以解决通用问题的可复用解决方案,它提供了标准化的代码设计方案提升开发体验。Node.js 作为一款用来构建可扩展高性能应用的流行平台,自然也遵循设计模式解决通用问题。本文中,我们将讨论 Node.js 中设计模式的重要性并提供一些代码示例。 构建 Node.js 应用为何需要设计模式 设计模式为软件开发提供了一套标准化的解决方案。构建 Node.js 应用时,善用设计模式能够帮助开发者提升代码质量,节约开发时间,减少出错几率。同时也方便开发人员之间的沟通交流。 示例代码 单例模式 该模式用来保证特定的类在整个应用中只能创建唯一实例。Node.js 中,单例模式可以保证在同一个应用中,每个模块只有唯一实例。 class Singleton { constructor() { if (Singleton.instance) { return Singleton.instance; } Singleton.ins...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- MySQL数据库在高并发下的优化方案
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8编译安装MySQL8.0.19