如何将数据从 AWS S3 导入到 Elastic Cloud - 第 1 部分:Elastic Serverless Forwarder
作者:来自 Elastic Hemendra Singh Lodhi
这是多部分博客系列的第一部分,探讨了将数据从 AWS S3 导入 Elastic Cloud 的不同选项。
Elasticsearch 提供了多种从 AWS S3 存储桶导入数据的选项,允许客户根据其特定需求和架构策略选择最合适的方法。
这些是从 AWS S3 导入数据的主要选项:
- Elastic Serverless Forwarder (ESF) - 我们在本博客中的重点
- Elastic Agent - 第 2 部分
- Elastic S3 Native Connector - 第 3 部分
选项比较
Features | ESF | Elastic Agent | S3 Connector |
---|---|---|---|
Logs | ✅ | ✅ | ✅[[^1]] |
Metrics | ❌ | ✅ | ✅[[^2]] |
Cost | Medium-Lambda,SQS | Low-EC2,SQS | Low-Elastic Enterprise Search |
Scaling | Auto - Unlimited | EC2 instance size | Enterprise Search Node size |
Operation | Low - Monitor Lambda function | High - Manage Agents | Low |
PrivateLink | ✅ | ✅ | NA (Pull from S3) |
Primary Use Case | Logs |
注 1:由于 AWS 对可触发 Lambda 函数的服务有限制,并且你无法使用 CloudWatch 指标上的订阅过滤器调用 Lambda,因此 ESF 不支持指标收集。但是,考虑到成本,可以将指标存储在 S3 中,并通过 SQS 触发提取到 Elastic。
注 2:虽然 S3 连接器可以从 S3 存储桶中提取日志和指标,但它最适合提取内容、文件、图像和其他数据类型
在本博客中,我们将重点介绍如何使用 Elastic Serverless Forwarder (ESF) 从 AWS S3 中提取数据。在接下来的部分中,我们将探索 Elastic Agent 和 Elastic S3 Native Connector 方法。
让我们开始吧。
按照以下步骤启动 Elastic Cloud 部署:
Elastic Cloud
1)如果尚未创建,请创建一个帐户,并在 AWS 中创建 Elastic 部署。
2)创建部署后,请记下 Elasticsearch 端点。可以在 Elastic Cloud 控制台的-> Manage -> Deployments 下找到它。
Elastic Serverless Forwarder
Elastic Serverless Forwarder 是一个 AWS Lambda 函数,可将 VPC Flow 日志、WAF、Cloud Trail 等日志从 AWS 环境转发到 Elastic。它可用于将数据发送到 Elastic Cloud 以及进行自我管理部署。
功能
- 支持多个输入
- S3(通过 SQS 事件通知)
- Kinesis 数据流
- CloudWatch Logs 订阅过滤器
- SQS 消息负载
- 使用 “continuing queue” 和 “replay queue”(由无服务器转发器自动创建)至少传递一次
- 支持通过 PrivateLink 进行数据传输,允许在 AWS 虚拟私有云(或 VPC)内而不是在公共网络上传输数据。
- Lambda 函数是一种 AWS 无服务器计算托管服务,可根据代码执行请求自动扩展
- 函数执行时间经过优化,并根据需要分配最佳内存大小
- 按使用量付费定价,只需为 Lambda 函数执行期间的计算时间和 SQS 事件通知付费
数据流
我们将使用 S3 输入和 SQS 通知将 VPC 流日志发送到 Elastic Cloud:
- VPC 流日志配置为写入 S3 存储桶
- 将日志写入 S3 存储桶后,S3 事件通知 (S3:ObjectCreated) 将发送到 SQS
- 包含事件元数据的 SQS 事件通知触发 Lambda 函数,该函数从存储桶中读取日志
- 部署转发器时会创建连续队列(Continuing queue),并确保至少交付一次。转发器会跟踪上次发送的事件,并在转发器函数超过 15 分钟的运行时间(Lambda 最大默认值)时帮助处理待处理事件
- 部署转发器时也会创建重放队列(Replay queue),并处理日志提取异常。转发器会跟踪失败的事件并将其写入重放队列以供以后提取。例如,在我的测试中,我输入了错误的 Elastic API 密钥,导致身份验证失败,从而填满了重放队列。你可以启用重播队列作为 ESF lambda 函数的触发器,以再次使用来自 S3 存储桶的消息。首先解决交付失败很重要;否则消息将在重放队列中累积。你可以永久设置此触发器,但可能需要根据消息失败问题删除/重新启用。要启用触发器,请转到 SQS -> elastic-serverless-forwarder-replay-queue- -> under Lambda triggers -> Configure Lambda function trigger -> Select the ESF lamnda function
设置
1)创建 S3 存储桶 s3-vpc-flow-logs-elastic 来存储 VPC 流日志
AWS Console -> S3 -> Create bucket.。你可以将其他设置保留为默认设置,也可以根据要求进行更改:
复制存储桶 ARN,下一步配置流日志时需要此 ARN:
2)启用 VPC Flow 日志并发送到 S3 bucket s3-vpc-flow-logs-elastic
AWS Console -> VPC -> Select VPC -> Flow logs。保留其他设置或根据要求进行更改:
提供流日志的名称,选择要应用的过滤器、聚合间隔和流日志存储的目标:
完成后,它将如下所示,以 S3 为目的地。今后,通过此 VPC 的所有流量都将存储在存储桶 s3-vpc-flow-logs-elastic 中:
3)创建 SQS 队列
注 1:在与 S3 存储桶相同的区域中创建 SQS 队列
注 2:将可 visiblity timeout 设置为 910 秒,比 AWS Lambda 函数最大运行时间 900 秒多 10 秒。
AWS Console -> Amazon SQS -> Create queue
提供队列名称并将可见性超时更新为 910 秒。Lambda 函数最多运行 900 秒(15 分钟),为可见性超时设置更高的值允许消费者 Elastic Serverless Forwarder(ESF)处理并从队列中删除消息:
更新 SQS 访问策略(高级)以允许 S3 存储桶向 SQS 队列发送通知。将 account-id 替换为你的 AWS 帐户 ID。保留其他选项的默认设置。
在这里,我们指定 S3 从 S3 存储桶向 SQS 队列 (ARN) 发送消息:
{ "Version": "2012-10-17", "Id": "example-ID", "Statement": [ { "Sid": "example-statement-ID", "Effect": "Allow", "Principal": { "Service": "s3.amazonaws.com" }, "Action": "SQS:SendMessage", "Resource": "arn:aws:sqs:ap-southeast-2:<account-id>:sqs-vpc-flow-logs-elastic-serverless-forwarder", "Condition": { "StringEquals": { "aws:SourceAccount": "<account-id>" }, "ArnLike": { "aws:SourceArn": "arn:aws:s3:::s3-vpc-flow-logs-elastic" } } } ] }
有关 AWS 集成的权限要求(IAM 用户)的更多详细信息,请参见此处。
在“详细信息”下的队列设置中复制 SQS ARN:
4)在 S3 存储桶中启用 VPC 流日志事件通知
AWS Console > S3. Select bucket s3-vpc-flow-logs-elastic
-> Properties and Create event notification
提供名称以及你想要触发 SQS 的事件类型。我们已选择在将任何对象添加到存储桶时创建对象:
选择 destination 为 SQS queue 并选择 sqs-vpc-flow-logs-elastic-serverless-forwarder:
保存后,配置将如下所示:
创建另一个 S3 存储桶来存储 Elastic Serverless Forwarder 的配置文件:
创建一个名为 config.yaml 的文件并使用以下配置进行更新。完整选项集在此处:
inputs: - type: "s3-sqs" id: "arn:aws:sqs:ap-southeast-2:xxxxxxxxxx:sqs-vpc-flow-logs-elastic-serverless-forwarder" outputs: - type: "elasticsearch" args: # either elasticsearch_url or cloud_id, elasticsearch_url takes precedence if both are included elasticsearch_url: "https://e286410s58ae4ad6a446c10596ked613.ap-southeast-2.aws.found.io:443" #cloud_id: "cloud_id:bG9jYWxob3N0OjkyMDAkMA==" # either api_key or username/password, username/password takes precedence if both are included api_key: "LlVqN3Q1RUi3TThuexxxxxxxxxx9RlJRdjniY0JubktEdm9oOUtaNU9mdw==" #username: "username" #password: "password" #es_datastream_name: "aws.vpcflow" es_dead_letter_index: "esf-dead-letter-index" # optional batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760
输入类型:s3-sqs。我们使用带有 SQS 通知选项的 S3
输出:
- elasticsearch_url:来自上述 Elastic Cloud 部署创建部分的 elasticsearch 端点
- api_key:使用此处的说明创建 Elasticsearch API 密钥(用户 API 密钥)
- es_datastream_name:转发器支持自动路由 aws.cloudtrail、aws.cloudwatch_logs、aws.elb_logs、aws.firewall_logs、aws.vpcflow 和 aws.waf 日志。对于其他日志类型,你可以将其设置为所需的命名约定。
将其他选项保留为默认值。
将 config.yaml 上传到 s3 存储桶 s3-vpc-flow-logs-serverless-forwarder-config 中:
6)安装 AWS 集成资产
Elastic 集成预先打包了资产,可简化收集、解析、索引和可视化。集成使用具有特定索引命名约定的数据流,这有助于入门。转发器也可以写入任何其他流名称。
按照步骤安装 Elastic AWS 集成。
Kibana -> Management -> Integrations,搜索 AWS:
7)部署 Elastic Serverless Forwarder
有几种方法可以从 SAR(Serverless Application Repository)部署 Elastic Serverless Forwarder:
- 使用 AWS 控制台
- 使用 AWS Cloudformation
- 使用 Terraform
- 直接部署可提供更多自定义选项
我们将使用 AWS 控制台选项来部署 ESF。
注意:直接使用 AWS 控制台时,每个区域只允许部署一次。
AWS Console -> Lambda -> Application -> Create Application,搜索 elastic-serverless-forwarder:
在应用程序设置下提供以下详细信息:
- Application name -
elastic-serverless-forwarder
- ElasticServerlessForwarderS3Buckets -
s3-vpc-flow-logs-elastic
- ElasticServerlessForwarderS3ConfigFile -
s3://s3-vpc-flow-logs-serverless-forwarder-config/config.yaml
- ElasticServerlessForwarderS3SQSEvent -
arn:aws:sqs:ap-southeast-2:xxxxxxxxxxx:sqs-vpc-flow-logs-elastic-serverless-forwarder
部署成功后,Lambda 部署的状态应为 “Create Complete”:
以下是成功部署 ESF 后自动创建的 SQS 队列:
一切设置正确后,S3 存储桶 s3-vpc-flow-logs-elastic 中发布的流日志将向 SQS 发送通知,你将看到队列 sqs-vpc-flow-logs-elastic-serverless-forwarder 中可供 ESF 使用的消息。
如果出现诸如 SQS 消息数持续增加等问题,请检查 Lambda 执行日志 Lambda -> Application -> serverlessrepo-elastic-serverless-forwarder-ElasticServerlessForwarderApplication*
-> Monitoring -> Cloudwatch Log Insights。单击 LogStream 获取详细信息:
有关故障排除的更多信息,请参见此处。
8)在 Kibana Discover 和仪表板中验证 VPC 流日志
Kibana -> Discover 。这将显示 VPC 流日志:
Kibana -> Dashboards。查找 VPC VPC Flow log Overview 表板:
更多仪表板!
如前所述,除了其他资产外,AWS 集成还提供预构建的仪表板。我们可以使用 Elastic 代理提取方法监控我们设置中涉及的 AWS 服务,我们将在本系列的第 2 部分中介绍该方法。这将有助于跟踪使用情况并有助于优化。
结论
Elasticsearch 提供了多种选项来将数据从 AWS S3 同步到 Elasticsearch 部署中。在本演练中,我们证明了实现 Elastic Serverless Forwarder (ESF) 提取选项以从 AWS S3 提取数据并利用 Elastic 业界领先的搜索和分析功能相对容易。
在本系列的第 2 部分中,我们将深入研究使用 Elastic Agent 作为提取 AWS S3 数据的另一种选择。
你可以使用来自任何来源的数据构建搜索。查看此网络研讨会以了解 Elasticsearch 支持的不同连接器和来源。
准备好自己尝试一下了吗?开始免费试用。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
情绪稳定!别再让Git合并冲突影响你工作了
大家好,我是陈哥,今天想和大家聊聊Git合并冲突解决~ 背景 前几天,我正好收到了一位读者的留言:又又又又遇到了Git合并冲突,解决冲突比写代码还费劲,突然想起SVN的好。该怎么避免Git冲突啊? 我想,比如这样? 在我看来,Git合并冲突是不可避免的。在本文,我想和大家简单分享一下遇到Git冲突该如何解决,希望对大家有所帮助。在此之前,我们先来了解一下Git的合并冲突是什么以及合并冲突的类型有哪些。或者如果只对如何解决Git合并冲突感兴趣,也可以滑至第三部分直接阅读。 一、如何识别Git中的合并冲突? 在Git中,我们在两个不同的分支对同一个文件进行更改,特别是在同一文件的同一行尽心更改,会容易产生合并冲突。在这种情况下,Git无法自动解决这些更改之间的不一致性,它会暂停合并过程,并将冲突标记出来,等待我们手动解决。 举一个简单的合并冲突的示例: 在分支main中工作,并修改了mytext.txt文件的第1行,如Hi world。 切换到分支new-feature,然后对mytext.txt的第二行进行修改,如Hello earth。 当我们准备尝试将new-feature分支合并到...
- 下一篇
Bitmap 和 布隆过滤器傻傻分不清?你这不应该啊
大家好,我是小富~ 有个兄弟私下跟我说,他在面试狗东时,有一道面试题没回答上来:Redis 的Bitmap和布隆过滤器啥区别与关系? 其实就是考小老弟对这两种工具的底层数据结构是否了解,不算太难的题。不过,bitmap和布隆过滤器在大数据量和高并发业务的使用频率不低,知识点应该掌握下,既然问了那咱们简单的梳理下它们的底层原理、应用场景以及它们之间的关联。 Bitmap Redis中的Bitmap(位图)是一种较为特殊数据类型,它以最小单位bit来存储数据,我们知道一个字节由 8个 bit 组成,和传统数据结构用字节存储相比,这使得它在处理大量二值状态(true、false 或 0、1等只有两种状态)数据时具有极高的空间效率。不过,它不是一种全新的数据类型,其底层实现仍是基于 String 类型。 便于理解,你可以将 Bitmap 的底层结构看成是由一系列 bit 位组成的数组,在此数组中,每个位都对应一个偏移量(类似数组的下标)。通过将特定偏移量上的位值设置为 0 或 1,来表示不同的状态。 比如我们要设计一个答题游戏系统。其规则为:若用户答对全部 7 道题,则可获得大奖。 每个答题用...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8编译安装MySQL8.0.19
- CentOS7,CentOS8安装Elasticsearch6.8.6