基于Openresty+CEPH实现海量数据管理系统
「持续更新中,欢迎关注...」
1. 需求:
作为一家专注于三维高精度地图服务的公司,有海量(PB级)的原始数据、中间数据、成功数据,需要存储、管理、并定期归档。
- 按项目管理数据,数据分类航飞数据、控制点数据、中间数据、成果数据、其他数据。数据来源包括无人机数据、载荷数据、地面站数据、人工打点数据等。不同渠道汇集而来的数据。
- 采用类似百度网盘的形式,上传、下载,支持断点续传、进度跟踪。
- 支持细化到文件级别的权限控制,以及更多的文件(夹)属性。
2. 分析:
- 系统重点在于数据存储的选型,支持海量数据的存储,能够支持在复杂网络下的数据上传。选用CEPH作为数据存储,RGW对象存储,S3协议上传下载,完美支持分片和断点续传。
- 系统难点在于文件级别的业务权限控制,以及文件(夹)更多的属性支持。CEPH RGW本身支持权限控制,但是无法和业务权限做对接。对象存储本身没有文件夹的概念,无法对文件夹做分类、数量展示、大小展示。所以实现自定义索引服务,CEPH主要负责存储,自定义索引服务实现展示与查询。
3. 实现
系统重点在于海量数据上传的可靠性与海量数据索引的管理。
3.1 架构
- 上传助手就是类百度网盘的桌面端软件,采用Electron JS
)实现。主要实现功能:项目展示、上传、下载。 - 业务层包括网关服务、账号服务、项目服务、文件索引服务等。采用Java + Spring Boot + Spring Cloud技术栈。其中重点服务是文件索引服务Index Server,负责海量文件的索引维护和查询。
- 业务数据MySQL集群+Redis集群,海量文件存储使用CEPH对象存储,支持S3 API。
3.3 关键流程图
- 上传助手使用普通的Put Object请求上传文件,加上自定义的metadata字段(项目ID、用户ID等)即可完成数据的提交。
- Openresty使用proxy模式将文件请求转发到 CEPH RGW,由RGW完成后台数据存储处理。
- Openresty在RGW完成数据存储以后,调用log_by_lua_file将对应请求的用户自定义metadata和文件属性转发到后台Kafka。
- 文件索引服务(Index Server)从Kafka中消费任务,拿到每个文件的信息。
- 文件索引服务(Index Server)对文件数据按业务要求进行处理后,存入MySQL数据库。
3.4 示例代码
log_by_lua_file.lua:从Openresty获取文件信息,并发往Kafka
local cjson = require "cjson" local producer = require "resty.kafka.producer" local broker_list = { { host = "172.16.0.20", port = 9092 }, } function send_job_to_kafka() local log_json = {} local req_headers_ = ngx.req.get_headers() for k, v in pairs(req_headers_) do if k == "content-length" then log_json["contentLength"] = tostring(v) end if k == "u-id" then log_json["uId"] = tostring(v) end if k == "p-id" then log_json["pId"] = tostring(v) end end local resp_headers_ = ngx.resp.get_headers() for k, v in pairs(resp_headers_) do if k == "etag" then log_json["etag"] = string.gsub(v, "\"", "") break end end log_json["uri"] = ngx.var.uri log_json["host"] = ngx.var.host log_json["remoteAddr"] = ngx.var.remote_addr log_json["status"] = ngx.var.status local message = cjson.encode(log_json); ngx.log(ngx.ERR, "message is[", message, "]") return message end --local is_args = ngx.var.is_args local request_method = ngx.var.request_method local status_code = ngx.var.status -- 过滤Put Object成功的请求,记录相应的metadata及请求ID,并转发到kafka if request_method == "PUT" and status_code == "200" then local bp = producer:new(broker_list, { producer_type = "async" }) local ok, err = bp:send("ceph_lua_test", nil, send_job_to_kafka()) if not ok then ngx.log(ngx.ERR, "kafka send err:", err) return end ngx.log(ngx.ERR, "kafka send success:", ok) end
4. 总结
- 通过此架构方案,在海量文件归档过程中,将文件基本信息异步导入到业务数据库中,便于业务应用开发。
- 此架构一般也应用对象存储的多媒体文件处理,比如图片处理、视频处理、加水印、鉴黄、事件通知等。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
重磅!Apache Flink 1.11 功能前瞻抢先看!
整理 | 高赟、程鹤群Review | 王治江 Flink 1.11 版本即将正式宣告发布!为满足大家的好奇与期待,我们邀请 Flink 核心开发者对 1.11 版本的功能特性进行解读与分享。Flink 1.11 在 1.10 的基础上对许多方面进行了完善和改进,并致力于进一步提高 Flink 的可用性及性能。 本文将详细介绍 1.11 版本的新功能、改进、重要变化及未来的发展计划。更多信息可以参考相应的 FLIP 或 Jira 页面,并关注我们后续的专题直播。 集群部署与资源管理 在集群部署方面 1.[FLIP-85] Flink 支持 Application Mode 目前 Flink 是通过一个单独的客户端来创建 JobGraph 并提交作业的,在实际使用时,会产生下载作业 jar 包占用客户端机器大量带宽、需要启动单独进程(占用不受管理的资源)作为客户端等问题。为了解决这些问题,在 Flink-1.11 中提供了一种新的 Application 模式,它将 JobGraph 的生成以及作业的提交转移到 Master 节点进行。 用户可以通过 bin/flink run-appl...
- 下一篇
直播预约 | 如何为云原生应用带来稳定高效的部署能力?
作者 | 酒祝 阿里云技术专家、墨封阿里云开发工程师 本次直播为第 3 期 SIG Cloud-Provider-Alibaba 网研会,我们邀请了阿里云技术专家 酒祝和阿里云开发工程师 墨封 重点讲解《如何为云原生应用带来稳定高效的部署能力?》。 点击链接预约直播:https://yq.aliyun.com/live/2898 随着近年来 Kubernetes 逐渐成为事实标准和大量应用的云原生化,我们却往往发现 Kubernetes 的原生 workload 对大规模化应用的支持并不十分“友好”。我们已经不止一次听到来自用户和开发者们的抱怨: 发布升级会触发 Pod 删除、重建,带来的问题:1. 无法保留 IP、Volume(hostPath / emptyDir)等数据;2. 发布时间较长,一次 Pod 重建涉及重新调度、分
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Hadoop3单机部署,实现最简伪集群
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS8编译安装MySQL8.0.19
- Docker安装Oracle12C,快速搭建Oracle学习环境