首页 文章 精选 留言 我的

精选列表

搜索[优化],共10000篇文章
优秀的个人博客,低调大师

Node.js 新版官网开启 Beta 测试:全新现代化 UI、优化交互

Node.js 新版官网已开启 Beta 测试,体验地址:https://beta-node-js-org.vercel.app/en。 Node.js 新版官网首页 当前官网首页 https://nodejs.org/en 可以看到,与当前版本相比,新版官网的视觉效果、页面布局、展现内容都有了很大的提升,整体上更大气、更现代化。而且首页关于 Node.js 的介绍也变得更突出、描述更全面。 新版官网最大的交互变化是在首页添加了「全局搜索」入口,方便用户随时检索文档、博客、下载等信息。 其他子页面一览:

优秀的个人博客,低调大师

MySQL高可用架构探秘:主从复制剖析、切换策略、延迟优化与架构选型

MySQL高可用的基石 在分布式系统中,单机节点在发生故障时无法提供服务,这可能导致长期的服务不可用,从而影响其他节点的运作,导致的后果非常严重 为了满足服务的高可用,往往是通过节点冗余(新增相同功能的从节点),当发生故障时进行主从切换,让从节点成为新的主节点来继续提供服务 比如:MySQL的主从、Redis的主从、MQ broker的主从...思想大体类似的 作为高可用的基石——主从架构功不可没,本篇文章就来聊聊MySQL的主从的一些细节 binlog binlog作为逻辑上恢复数据的日志,是主从数据同步、数据恢复的基础 binlog分为三种格式:statement、row、mixed statement :记录写操作的SQL,语句轻量、传输快,使用该格式可能会导致数据不一致(因为从机与主机所处的环境不同,比如从机时间与主机不同时,使用now()函数) row : 记录数据的修改,数据量大、传输慢,误操作时可以恢复数据(反向操作),主从同步时数据一致 mixed :结合statement、row的优点,自动混合选择格式 大多数情况下都是选择格式为row,因为数据一致并且可以恢复数据 主从复制 往期文章中说过当收到写操作需要修改数据时,为了满足数据的一致性,会写undo log(原子性)、redo log(持久性)、binlog等日志 当主节点接收到写操作更改数据时,也需要对从节点进行数据的修改以此来达到数据一致 在主从复制数据依靠的就是binlog,大致流程分为三个阶段: 主节点dump线程监听binlog变动通知从节点 从节点使用IO线程接收binlog并将其写入本地 relay log(中继日志) 从节点使用SQL线程根据relay log恢复数据 在单机中写完日志即可提交事务响应,而在主从中根据响应阶段的不同,主从复制的方式分为多种: 同步复制:所有从节点都响应(恢复完数据)主节点才响应,性能差、数据强一致 异步复制:主节点通知完从节点就立马响应,性能最好,存在延迟有数据一致性问题 半同步复制:只要有一个从节点响应主节点就响应,一主一从下与同步复制一致,网络超时退化为异步复制 增强半同步复制:在半同步复制的基础上,主节点收到响应后才提交事务,数据一致性会比半同步好,但性能稍差 延迟复制:从节点延迟一段时间恢复数据,这样即使发生误操作也可以进行回滚数据 主从切换 当主机发生故障时需要将从机切换为主机 不同策略 一般中间件的主从切换都只能在CAP理论中满足其二,即在分区容错(P)下只能满足可靠(C)或可用(A) binlog上会记录主节点写操作的时间,从节点会维护一个 seconds_behind_master 来记录主从延迟的时间 在可靠策略下,需要等到旧的从节点完成所有的数据恢复(即seconds_behind_master为0)才成为主节点,提供写服务 在此期间只提供读服务、无法提供写服务,因此可靠策略会损失一定的可用性,取决于主从延迟的时间 在可用策略下会立即将从节点设置为新的主节点提供读写服务,某些场景下可能导致数据不一致 假设id自增,记录格式为(id,name),新增数据a,b,c 主节点已经新增(1,a),(2,b),(3,c)时宕机 从节点可能只重做数据(1,a),(2,b) 而(3,c)还在中继日志中 此时旧的从节点成为新的主节点又继续提供写服务,需要新增d,新增完d后才将中继日志的数据进行恢复 如果使用的binlog格式为statement或mixed,则会新增为(3,d)和(4,c) 如果格式为row,则会主键冲突报错,新增(3,d)后中继日志为(3,c) 在可用策略下可能导致数据不一致,使用row会提前暴露数据不一致的问题 基于GTID的主从切换 GTID 全局事务ID 格式为 server_uuid:gno server_uuid 为节点标识 gno 为事务标识(事务提交时获得,全局自增) 在进行主从切换时,每个从节点同步数据的日志偏移量都不同,一般会找最新偏移量的从节点为新的主节点(这个偏移量是需要运维去定位的) 在GTID 全局事务ID出来后,binlog中每个事务有对应的GTID则可以通过GTID自动定位偏移量,不用手动定位 主从延迟 来源 默认情况下主从复制会使用异步复制,而在主从架构下一般会使用读写分离,主机服务写操作,从机服务读操作 由于使用异步复制,主从之间的数据一致性会存在一定的延迟,物理上主从会放在同一机房中,网络通信忽略不计,成本最大的就是从机SQL线程解析日志恢复数据的过程 如果恢复数据是一些大事务时会导致很长的延迟,比如在主机上执行批量操作耗时5s,在从机上执行时也会耗时那么久(资源大概一致) 可能写完操作就会进行读操作,如果此时从库还未重做数据就会导致写完查不到的数据不一致情况 先来看看哪些情况可能会导致主从延迟太长: 业务高峰期频繁读写(高TPS),从机不仅要同步数据,还要处理读操作 处理大事务,大事务导致延迟时间太久 从机硬件配置低,导致跟不上主机IO速度 主从机器可能参数不同(缓冲池、IO参数...) 从机本身就是延迟复制 ... 当主从延迟过长时可以考虑使用方案缩短延迟: 调整redo log\bin log刷盘策略,增强IO canal监听(通知改为监听) 从机并行复制 从机并行复制借助于redo log、bin log两阶段提交时,redo log prepare阶段不会有锁冲突,可以并行执行 并行复制就是基于两阶段提交中的组提交,可以调整以下两个参数拉长组提交的时间,减慢主机写,加快从机重做数据 binlog_group_commit_sync_delay 延迟多少微秒后才调用 fsync binlog_group_commit_sync_no_delay_count 累积多少次以后才调用 fsync 数据不一致解决方案 为了避免长时间的主从延迟,从机应该和主机有相同的参数、配置,并且要避免大事务 在业务高峰期还是可能存在主从延迟导致数据不一致,需要使用一些方案进行避免: 沟通业务:等待一段时间,比如用户修改完资料后进行审核状态 强一致性的读也走主库:这样就不存在主从延迟,使用方便,大量强一致性读操作就会导致主机压力大 等待从机没延迟(三种判断方式): 比较 seconds_behind_master 是否为0,为0说明没延迟 比较主从上的位点 Master_Log_File 和 Read_Master_Log_Pos(主库的最新位点)Relay_Master_Log_File 和 Exec_Master_Log_Pos(备库执行的最新位点)判断是否相同,相同则没延迟 比较从机上GTID集合 Retrieved_Gtid_Set 和 Executed_Gtid_Set (备库收到的所有日志的 GTID 集合 和 备库所有已经执行完成的 GTID 集合)是否相同,相同则没延迟 这个方案粒度大(实际上只需要判断事务是否重做,这里是一直判断是否有延迟),如果高峰期一直有延迟就会一直等待判断,不使用 修改主从复制方式为同步复制:数据强一致性,性能差 修改主从复制方式为半同步复制:一主一从下与同步复制相同,一主多从下查询不确定,需要判断该事务是否已重做 方案5需要做到细粒度的判断事务是否在从机上已经重做,有两种方式且实现较为复杂 判断偏移量 select master_pos_wait(file, pos,[timeout]) 用于判断当前偏移量是否已经超过该位置 file 为 binlog 文件,pos 为 偏移量,timeout为等待的时间 使用半同步复制时,一个从节点已经响应,其他从节点应该也是快要响应的状态,因此可以等待一段时间 50ms,100ms... 如果超时则可以在业务中再去查主机,要注意如果都超时就相当于又全打在主机上 通过该SQL能够以主库日志中偏移量的方式判断是否已执行该事务(已执行返回0): 写操作完成时顺便获取binlog文件和偏移量的信息 携带这两个参数加上超时时间使用该SQL判断是否已执行 如果返回0(已执行)则查从机,否则查主机(注意限流) 判断GTID 判断GTID的思路与上面相似 select wait_for_executed_gtid_set(gtid_set, [timeout]) SQL的作用是判断是否已经执行GTID集合 返回0,超时返回1 流程类似: 写操作时获取GTID集合 根据GTID集合判断从机是否已执行事务 已执行查询,否则查主库或限流 主从架构 由于binlog的数据复制,主从架构可以非常丰富,想怎么搭就怎么搭 一主一从:主负责写,从负责读,读写压力平分 一主多从:主负责写,从负责读,适合读多于写 双主热备:两个节点互为主从,读写压力平分,但存在循环同步的问题 当AB节点互为主从时,A收到写请求,要把bin log给B重做,B重做完(相当于写请求)又会把bin log给A重做,这样就会导致循环同步数据 在同步数据时携带节点的id(server id)解决循环同步问题 A收到写请求,binlog给B并携带自己的id,B重做完又把binlog给A,A发现binlog上server id是自己则不进行重做 总结 本篇文章以MySQL高可用为起点,聊到MySQL中的主从复制、切换、延迟、架构等 binlog的statement格式记录SQL,数据量小、传输快,但可能导致数据不一致 binlog的row格式记录修改数据,数据量大,传输慢,可以修复误操作数据 binlog的mixed混用statement、row的优点,在可用策略的主从切换还是会导致数据不一致 主从复制时主机dump线程监听binlog变更通知从机拉取,从机io线程将日志写入realy log中继日志,再由sql线程解析日志重做数据 同步复制需要所有从机响应,拥有强一致性,但性能最差 默认的异步复制性能最好,但可能延迟时会有数据一致性 半同步复制只需要一个从机响应,多从下性能好于同步复制,网络超时会使用异步复制 增强的半同步复制会在从机响应时才提交事务,相比于半同步复制一致性略好 延迟复制可以让从机延迟一段时间重做数据,误操作数据可以恢复 主从切换时只能满足CAP中其二,满足可靠会导致一段时间不可写,满足可用可能会出现数据不一致 把从机参数、配置调整为主机相同,避免使用大事务可以避免主从延迟太长 当主从延迟太长可以通过调整从机IO参数增强IO能力 发生主从延迟的数据一致性问题时: 沟通业务,能否使用审核等中间状态,等延迟过了再查看 强制走主机,注意压力可能太大 使用同步复制,性能差 使用半同步复制,一主多从下需要判断事务是否执行(偏移量/GTID),实现困难 常用的主从架构有:一主一从、一主多从、双主热备(通过server id解决循环同步问题)... 最后(不要白嫖,一键三连求求拉~) 本篇文章被收入专栏 MySQL进阶之路,感兴趣的同学可以持续关注喔 本篇文章笔记以及案例被收入 gitee-StudyJava、 github-StudyJava 感兴趣的同学可以stat下持续关注喔~ 有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~ 关注菜菜,分享更多干货,公众号:菜菜的后端私房菜 本文由博客一文多发平台 OpenWrite 发布!

优秀的个人博客,低调大师

Kafka基本原理、生产问题总结及性能优化实践 | 京东云技术团队

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。(分布式的流处理平台) Kafka的使用场景 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。 消息系统:解耦和生产者和消费者、缓存消息等。 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。 Kafka基本概念 kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是确并没有完全遵循JMS规范。 首先,让我们来看一下基础的消息(Message)相关术语: 名称 解释 Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群 Topic Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic Producer 消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端 ConsumerGroup 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息 Partition 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的 因此,从一个较高的层面上来看,producer通过网络发送消息到Kafka集群,然后consumer来进行消费,如下图:集群架构,broker就相当于服务端,处理消息的节点;producer将消息push,consumer将消息pull 依赖第三方的zk,类似nameserver;topic类似消息队列,最终存放的是在topic下面的partition里面。 服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。 kafka基本使用(2.4) https://kafka.apache.org/documentation/ 安装前的环境准备 由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK。 yum install java-1.8.0-openjdk* -y kafka依赖zookeeper,所以需要先安装zookeeper wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz cd apache-zookeeper-3.5.8-bin cp conf/zoo_sample.cfg conf/zoo.cfg # 启动zookeeper bin/zkServer.sh start bin/zkCli.sh ls / #查看zk的根目录相关节点 查看zk的目录树: 第一步:下载安装包 下载2.4.1 release版本,并解压: wgethttps://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz # 2.11是scala的版本,2.4.1是kafka的版本 tar-xvfkafka_2.11-2.4.1.tgz cdkafka_2.11-2.4.1 第二步:修改配置 修改配置文件config/server.properties: #broker.id属性在kafka集群中必须要是唯一 broker.id=0 #kafka部署的机器ip和提供服务的端口号 listeners=PLAINTEXT://192.168.65.60:9092 #kafka的消息存储文件 log.dir=/usr/local/data/kafka-logs #kafka连接zookeeper的地址 zookeeper.connect=192.168.65.60:2181 第三步:启动服务 现在来启动kafka服务: 启动脚本语法:kafka-server-start.sh [-daemon] server.properties 可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。(注意,在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里,用vim /etc/hosts) # 启动kafka,运行日志在logs目录的server.log文件里 bin/kafka-server-start.sh-daemonconfig/server.properties #后台启动,不会打印日志到控制台 或者用 bin/kafka-server-start.sh config/server.properties & # 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树 bin/zkCli.sh ls / #查看zk的根目录kafka相关节点 ls /brokers/ids #查看kafka节点 # 停止kafka bin/kafka-server-stop.sh kafka启动的时候,会把自己的信息注册到zk里面去 server.properties核心配置详解: Property Default Description broker.id 0 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。 log.dirs /tmp/kafka-logs kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。 listeners PLAINTEXT://192.168.65.60:9092 server接受客户端连接的端口,ip配置kafka本机ip即可 zookeeper.connect localhost:2181 zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3 log.retention.hours 168 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。 num.partitions 1 创建topic的默认分区数 default.replication.factor 1 自动创建topic的默认副本数量,建议设置为大于等于2 min.insync.replicas 1 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常 delete.topic.enable false 是否允许删除主题 第四步:创建主题 现在我们来创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1: sh bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic --partitions 3 --replication-factor 1 现在我们可以通过以下命令来查看kafka中目前存在的topic,主题的元数据在zk中,具体的消息在kafka中保存 bin/kafka-topics.sh --list --zookeeper 192.168.65.60:2181 除了我们通过手工的方式创建Topic,当producer发布一个消息到某个指定的Topic,这个Topic如果不存在,就自动创建。 删除主题 bin/kafka-topics.sh --delete --topic test --zookeeper 192.168.65.60:2181 第五步:发送消息 kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。 首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容: bin/kafka-console-producer.sh --broker-list 192.168.65.60:9092 --topic test >this is a msg >this is a another msg 第六步:消费消息 对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息: bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --topic test 如果想要消费之前的消息可以通过--from-beginning参数指定,如下命令: bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --from-beginning --topic test 如果你是通过不同的终端窗口来运行以上的命令,你将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。 以上所有的命令都有一些附加的选项;当我们不携带任何参数运行命令的时候,将会显示出这个命令的详细用法。 消费多主题 bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --whitelist "test|test-2 单播消费 一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可 分别在两个客户端执行如下消费命令,然后往主题里发送消息,结果只有一个客户端能收到消息 同一个消费组中,同一条消息,只能有一个消费者消费消息 bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --consumer-property group.id=testGroup --topic test 多播消费 一条消息能被多个消费者消费的模式,类似publish-subscribe模式,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。我们再增加一个消费者,该消费者属于testGroup-2消费组,结果两个客户端都能收到消息 bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --consumer-property group.id=testGroup-2 --topic test 查看消费组名 bin/kafka-consumer-groups.sh --bootstrap-server 192.168.65.60:9092 --list 查看消费组的消费偏移量 bin/kafka-consumer-groups.sh --bootstrap-server 192.168.65.60:9092 --describe --group testGroup current-offset:当前消费组的已消费偏移量 log-end-offset:主题对应分区消息的结束偏移量(HW) lag:当前消费组未消费的消息数 以组为单位保存消费的偏移量,组来决定消费情况 kafka消息消费完不会删除,消息是存储在文件里面,消息会根据offset来进行逐个消费,还可以指定偏移量进行消费。每一个消费者,会维护自己的偏移量。在某一个地方存上一次消费的偏移量。 消费情况和组绑定,不同的消费者只影响当前的组 主题Topic和消息日志Log 可以理解Topic是一个类别的名称,同类消息发送到同一个Topic下面。对于每一个Topic,下面可以有多个分区(Partition)日志文件: Partition是一个有序的message序列,这些message按顺序添加到一个叫做commit log的文件中。每个partition中的消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message。 为什么要多个分区? 真正大数据的系统,会有很多很多消息的,不会主动删除(最多可以保存7天),这里面会放很多的消息,如果一个分区,放在一起,存在一个磁盘同一台机器,肯定存不下,所以这个思想就是分布式存储,把一个主题划分很多分区,一个分区在一个文件里面,放在不同的机器上面去,达到分布式存储的效率。 不同的分区可以由不同的消费者消费,同时也提高了消费的能力。 每个partition,都对应一个commit log文件。一个partition中的message的offset都是唯一的,但是不同的partition中的message的offset可能是相同的。 kafka一般不会删除消息,不管这些消息有没有被消费。只会根据配置的日志保留时间(log.retention.hours)确认消息多久被删除,默认保留最近一周的日志消息。 针对业务场景,设置这个值。 kafka的性能与保留的消息数据量大小没有关系,因此保存大量的数据消息日志信息不会有什么影响。 每个consumer是基于自己在commit log中的消费进度(offset)来进行工作的。在kafka中,消费offset由consumer自己来维护;一般情况下我们按照顺序逐条消费commit log中的消息,当然我可以通过指定offset来重复消费某些消息,或者跳过某些消息。 这意味kafka中的consumer对集群的影响是非常小的,添加一个或者减少一个consumer,对于集群或者其他consumer来说,都是没有影响的,因为每个consumer维护各自的消费offset。 创建多个分区的主题: bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 1 --partitions 2 --topic test1 查看下topic的情况 bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:2181 --topic test1 以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。 leader节点负责给定partition的所有读写请求。 replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。 isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。 我们可以运行相同的命令查看之前创建的名称为”test“的topic bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:2181 --topic test 之前设置了topic的partition数量为1,备份因子为1,因此显示就如上所示了。 消息日志文件的存储 可以进入kafka的数据文件存储目录查看我们创建的主题的消息日志文件:cd /tmpkafka-logs/ 消息日志文件主要存放在分区文件夹里的以log结尾的日志文件里,如下是test_topic-0主题对应的分区0的消息日志: 分区的扩容 消费者消费肯定会发生变化,reblance机制 当然我们也可以通过如下命令增加topic的分区数量(目前kafka不支持减少分区): bin/kafka-topics.sh -alter --partitions 3 --zookeeper 192.168.65.60:2181 --topic test 那么分区扩容之后,消费者会不会发生变化呢?肯定会触发它rebalance机制。 可以这么来理解Topic,Partition和Broker 一个topic,代表逻辑上的一个业务数据集,比如按数据库里不同表的数据操作消息区分放入不同topic,订单相关操作消息放入订单topic,用户相关操作消息放入用户topic,对于大型网站来说,后端数据都是海量的,订单消息很可能是非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在topic内部划分多个partition来分片存储数据,不同的partition可以位于不同的机器上,每台机器上都运行一个Kafka的进程Broker。 为什么要对Topic下数据进行分区存储? 1、commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上,相当于对数据做了分布式存储,理论上一个topic可以处理任意数量的数据。 2、为了提高并行度。 kafka集群实战 对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。 所以kafka的集群,主要是根据分区来的,所以它单台机器也叫集群。 要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。为了有更好的理解,现在我们在一台机器上同时启动三个broker实例。注册到同一个zk上面。 首先,我们需要建立好其他2个broker的配置文件: cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties 配置文件的需要修改的内容分别如下: config/server-1.properties: #broker.id属性在kafka集群中必须要是唯一 broker.id=1 #kafka部署的机器ip和提供服务的端口号 listeners=PLAINTEXT://192.168.65.60:9093 log.dir=/usr/local/data/kafka-logs-1 #kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同 zookeeper.connect=192.168.65.60:2181 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://192.168.65.60:9094 log.dir=/usr/local/data/kafka-logs-2 zookeeper.connect=192.168.65.60:2181 目前我们已经有一个zookeeper实例和一个broker实例在运行了,现在我们只需要在启动2个broker实例即可: bin/kafka-server-start.sh -daemon config/server-1.properties bin/kafka-server-start.sh -daemon config/server-2.properties 查看zookeeper确认集群节点是否都注册成功: 现在我们创建一个新的topic,副本数设置为3,分区数设置为2: bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic 查看下topic的情况 bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:2181 --topic my-replicated-topic kafka一启动,都会注册到zk的对应的节点下面去 以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。 leader节点负责给定partition的所有读写请求,同一个主题不同分区leader副本一般不一样(为了容灾) leader处理收发数据,其他的节点是不可以写的,会同步给其他的从节点。 replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。 isr 是replicas的一个子集(已同步的副本集),它只列出当前还存活着的,并且已同步备份了该partition的节点。 容灾 把每一个分区放到不同的broker上去,如果有一个节点挂了,那么其他的还可以继续写数据。 写消息都是往leader上面写 副本被分配到三台机器上 现在我们向新建的 my-replicated-topic 中发送一些message,kafka集群可以加上所有kafka节点: bin/kafka-console-producer.sh --broker-list 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094 --topic my-replicated-topic >my test msg 1 >my test msg 2 现在开始消费: bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094 --from-beginning --topic my-replicated-topic my test msg 1 my test msg 2 现在我们来测试我们容错性,因为broker1目前是my-replicated-topic的分区0的leader,所以我们要将其kill ps -ef | grep server.properties kill 14776 现在再执行命令: bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:9092 --topic my-replicated-topic 我们可以看到,分区0的leader节点已经变成了broker 0。要注意的是,在Isr中,已经没有了1号节点。leader的选举也是从ISR(in-sync replica)中进行的。 此时,我们依然可以 消费新消息: bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094 --from-beginning --topic my-replicated-topic my test msg 1 my test msg 2 查看主题分区对应的leader信息: kafka将很多集群关键信息记录在zookeeper里,保证自己的无状态,从而在水平扩容时非常方便。 集群消费 log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。kafka集群支持配置一个partition备份的数量。 针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。 Producers 生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round-robin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。 Consumers 传统的消息传递模式有2种:队列( queue) 和(publish-subscribe) queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。 publish-subscribe模式:消息会被广播给所有的consumer。 Kafka基于这2种模式提供了一种consumer的抽象概念:consumer group。 queue模式:所有的consumer都位于同一个consumer group 下。 publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。 上图说明:由2个broker组成的kafka集群,某个主题总共有4个partition(P0-P3),分别位于不同的broker上。这个集群由2个Consumer Group消费, A有2个consumer instances ,B有4个。 通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者( logical subscriber )。每个consumer group由多个consumer instance组成,从而达到可扩展和容灾的功能。 消费顺序 一个partition同一个时刻在一个consumer group中只能有一个consumer instance在消费,从而保证消费顺序。 consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则,多出来的consumer消费不到消息。 Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。 如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的consumer instance数量也设置为1,但是这样会影响性能,所以kafka的顺序消费很少用。 Java客户端访问Kafka 引入maven依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> 消息发送端代码 package com.tuling.kafka.kafkaDemo; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class MsgProducer { private final static String TOPIC_NAME = "my-replicated-topic"; public static void main(String[] args) throws InterruptedException, ExecutionException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094"); /* 发出消息持久化机制参数 (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。 (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一 条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。 (3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证 只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。 */ /*props.put(ProducerConfig.ACKS_CONFIG, "1"); *//* 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在 接收者那边做好消息接收的幂等性处理 *//* props.put(ProducerConfig.RETRIES_CONFIG, 3); //重试间隔设置 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); //设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); *//* kafka本地线程会从缓冲区取数据,批量发送到broker, 设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去 *//* props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); *//* 默认值是0,意思就是消息必须立即被发送,但这样会影响性能 一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去 如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长 *//* props.put(ProducerConfig.LINGER_MS_CONFIG, 10);*/ //把发送的key从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<String, String> producer = new KafkaProducer<String, String>(props); int msgNum = 5; final CountDownLatch countDownLatch = new CountDownLatch(msgNum); for (int i = 1; i <= msgNum; i++) { Order order = new Order(i, 100 + i, 1, 1000.00); //指定发送分区 /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME , 0, order.getOrderId().toString(), JSON.toJSONString(order));*/ //未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME , order.getOrderId().toString(), JSON.toJSONString(order)); //等待消息发送成功的同步阻塞方法 /*RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());*/ //异步回调方式发送消息 producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("发送消息失败:" + exception.getStackTrace()); } if (metadata != null) { System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } countDownLatch.countDown(); } }); } countDownLatch.await(5, TimeUnit.SECONDS); producer.close(); } } 作者:京东零售 王雷 来源:京东云开发者社区 转载请注明来源

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册