使用Nginx做页面采集, Kafka收集到对应Topic
0.架构简介
模拟线上的实时流,比如用户的操作日志,采集到数据后,进行处理,暂时只考虑数据的采集,使用Html+Jquery+Nginx+Ngx_kafka_module+Kafka
来实现,其中Ngx_kafka_module 是开源的专门用来对接Nginx和Kafka
的一个组件。
1.需求描述
1.1 用html
和jquery
模拟用户请求日志
其中包括下面下面几项:
用户id:user_id, 访问时间:act_time, 操作: (action,包括click,job_collect,cv_send,cv_upload)
企业编码job_code
1.2 用Nginx接受1.1中的请求
1.3 接受完请求后,使用ngx_kafka_module将数据发送到Kafka的主题tp_individual 中。
1.4 在kafka中使用一个消费者消费该主题,观察
2.搭建步骤
2.1 Kafka安装
由于使用现成的已安装好的docker-kafka镜像,所以直接启动即可.
2.2 安装Nginx,并启动
$ cd /usr/local/src $ git clone git@github.com:edenhill/librdkafka.git # 进入到librdkafka,然后进行编译 $ cd librdkafka $ yum install -y gcc gcc-c++ pcre-devel zlib-devel $ ./configure $ make && make install $ yum -y install make zlib-devel gcc-c++ libtool openssl openssl-devel $ cd /opt/hoult/software # 1.下载 $ wget http://nginx.org/download/nginx-1.18.0.tar.gz # 2.解压 $ tar -zxf nginx-1.18.0.tar.gz -C /opt/hoult/servers # 3. 下载模块源码 $ cd /opt/hoult/software $ git clone git@github.com:brg-liuwei/ngx_kafka_module.git # 4. 编译 $ cd /opt/hoult/servers/nginx-1.18.0 $ ./configure --add-module=/opt/hoult/software/ngx_kafka_module/ $ make && make install # 5.删除Nginx安装包 $ rm /opt/hoult/software/nginx-1.18.0.tar.gz # 6.启动nginx $ cd /opt/hoult/servers/nginx-1.18.0 $ nginx
3.相关配置
3.1 nginx配置nginx.conf
#pid logs/nginx.pid; events { worker_connections 1024; } http { include mime.types; default_type application/octet-stream; #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' # '$status $body_bytes_sent "$http_referer" ' # '"$http_user_agent" "$http_x_forwarded_for"'; #access_log logs/access.log main; sendfile on; #tcp_nopush on; #keepalive_timeout 0; keepalive_timeout 65; #gzip on; kafka; kafka_broker_list linux121:9092; server { listen 9090; server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; #------------kafka相关配置开始------------ location = /kafka/log { #跨域相关配置 add_header 'Access-Control-Allow-Origin' $http_origin; add_header 'Access-Control-Allow-Credentials' 'true'; add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS'; kafka_topic tp_individual; } #error_page 404 /404.html; } }
3.2 启动kafka 生产者和消费者
# 创建topic kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic tp_individual --partitions 1 --replication-factor 1 # 创建消费者 kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic tp_individual --from-beginning # 创建生产者测试 kafka-console-producer.sh --broker-list linux121:9092 --topic tp_individual
3.3 编写Html + Jquery代码
<!DOCTYPE html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1,shrink-to-fit=no"> <title>index</title> <!-- jquery cdn, 可换其他 --> <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.1/jquery.js"></script> </head> <body> <input id="click" type="button" value="点击" onclick="operate('click')" /> <input id="collect" type="button" value="收藏" onclick="operate('job_collect')" /> <input id="send" type="button" value="投简历" onclick="operate('cv_send')" /> <input id="upload" type="button" value="上传简历" onclick="operate('cv_upload')" /> </body> <script> function operate(action) { var json = {'user_id': 'u_donald', 'act_time': current().toString(), 'action': action, 'job_code': 'donald'}; $.ajax({ url:"http://linux121:8437/kafka/log", type:"POST" , crossDomain: true, data: JSON.stringify(json), // 下面这句话允许跨域的cookie访问 xhrFields: { withCredentials: true }, success:function (data, status, xhr) { // console.log("操作成功:'" + action) }, error:function (err) { // console.log(err.responseText); } }); }; function current() { var d = new Date(), str = ''; str += d.getFullYear() + '-'; str += d.getMonth() + 1 + '-'; str += d.getDate() + ' '; str += d.getHours() + ':'; str += d.getMinutes() + ':'; str += d.getSeconds(); return str; } </script> </html>
将a.html
放在nginx的目录下,浏览器访问192.168.18.128:9090
4.演示
4.1 首先启动zk集群,kafka集群
4.2 然后创建topic, 创建消费者,创建生产者,测试topic
4.3 启动nginx访问页面,进行点击,观察消费者状态
整个过程如下图:
大数据开发,更多关注查看个人资料

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
算法-一个经典sql 题和一个Java算法题
1.sql题描述 话说有一个日志表,只有两列,分别是连续id和num 至于啥意思,把它当金额把。现在想知道连续次数3次及以上的num,数据如下 id num 1 1 2 1 3 1 4 2 5 3 6 4 7 4 8 4 那么结果只有1,4满足条件,问这个sql该怎么写? 2.思路和解法 分析:题目简单,没有歧义,能看得懂,像连续几次的这种问题一定是用到窗口函数,首先想到的是排名row_number 然后lag 怎么体现连续呢,肯定是需要用到一个排序的id,由于题目给了id是连续递增的,可以省去row_number了 所以第一步,上lag,结果就是如下: id num lagid 1 1 null 2 1 0 3 1 0 4 2 1 5 3 1 6 4 1 7 4 0 8 4 0 得到lagid后,连续怎么用呢,首先只有为0的才满足条件,所以可以做一个筛选,结果就如下表去掉xxx的,下面观察0的行,怎么区分3 行的 0 和 7行的 0呢,想到使用新分组,rid 这样就把lagid 相同,num相同的排序,最后再加一列,id-rid 相同的分为一组 id num lagid rid gi...
- 下一篇
Hadoop和spark为什么要对key进行排序
1.思考 只要对hadoop中mapreduce的原理清楚的都熟知下面的整个流程运行原理,其中涉及到至少三次排序,分别是溢写快速排序,溢写归并排序,reduce拉取归并排序,而且排序是默认的,即天然排序的,那么为什么要这么做的,设计原因是什么。先给个结论,为了整体更稳定,输出满足多数需求,前者体现在不是采用hashShuffle而是sortShuffle ,后者体现在预计算,要知道排序后的数据,在后续数据使用时的会方便很多,比如体现索引的地方,如reduce拉取数据时候。 2.MapReduce原理分析 在分析设计原因之前,先理解一下整个过程,在map阶段,根据预先定义的partition规则进行分区,map首先将输出写到缓存中,当缓存内容达到阈值时,将结果spill到硬盘,每一次spill都会在硬盘产生一个spill文件,因此一个map task可能会产生多个spill文件,其中在每次spill的时候会对key进行排序。接下来进入shuffle阶段,当map写出最后一个输出,需要在map端进行一次merge操作,按照partition和partition内的key进行归并排序(合并+...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS关闭SELinux安全模块
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Hadoop3单机部署,实现最简伪集群