首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

实时音视频入门学习:开源工程WebRTC的技术原理和使用浅析

本文由ELab技术团队分享,原题“浅谈WebRTC技术原理与应用”,有修订和改动。 1、基本介绍 WebRTC(全称 Web Real-Time Communication),即网页即时通信。 是一个支持网页浏览器进行实时语音对话或视频对话的技术方案。从前端技术开发的视角来看,是一组可调用的API标准。 在WebRTC发布之前,开发实时音视频交互应用的成本是非常昂贵,需要考虑的技术问题很多,如音视频的编解码问题,数据传输问题,延时、丢包、抖动、回音的处理和消除等,如果要兼容浏览器端的实时音视频通信,还需要额外安装插件。 2010年5月:Google以6820万美元收购VoIP软件开发商Global IP Solutions的GIPS引擎,并改为名为“WebRTC”(见《了不起的WebRTC:生态日趋完善,或将实时音视频技术白菜化》)。旨在建立一个互联网浏览器间的实时通信的平台,让 WebRTC技术成为 H5标准之一。 2012年1月:谷歌已经把这款软件集成到Chrome浏览器中,Opera初步集成WebRTC。 2013年 6月:Mozilla Firefox[5]发布22.0版本正式集成及支持WebRTC。 2017年11月:W3C WebRTC 1.0 草案正式定稿。 2021年1月:WebRTC 被 W3C 和 IETF 发布为正式标准(见《WebRTC 1.0: Real-Time Communication Between Browsers》)。 2、重要意义 WebRTC的出现、发展和被业内标准组织(如W3C)等普遍认可,对于当下和未来大前端技术发展具有重要的意义。 降低在web端的音视频交互开发门槛: 1)以往的音视频交互开发对于Web开发者而言具有一定技术门槛; 2)现在借助于WebRTC,Web开发者通过调用JS接口,可快速的实现音视频交互应用。 避免依赖、插件造成的次生问题: 1)以往的音视频交互应用构建依赖于各种插件、软件和服务器等; 2)现在借助于主流浏览器即可形成端到端的音视频交互。 统一化和标准化对传统音视频交互环境差异性的规避: 1)以往音视频交互需要面对不同的 NAT 、防火墙对媒体 P2P 的建立带来了很大的挑战; 2)现在WebRTC 中有P2P 打洞的开源项目 libjingle ,支持 STUN,TURN 等协议。 更高效优化的算法、技术对于音视频交互性能的提升: 1)WebRTC 通过NACK、FEC技术,避免了经过服务端路由中转,减少了延迟和带宽消耗; 2)还有 TCC + SVC + PACER + JitterBuffer 等技术对于音视频流畅性进行了优化。 3、技术特征 WebRTC内容丰富,主要的技术特征包含以下几点。 1)实时通讯: WebRTC是一项实时通讯技术,允许网络应用或者站点,在不借助中间媒介的情况下,建立浏览器之间点对点(Peer-to-Peer)的连接,实现视频流和(或)音频流或者其他任意数据的传输。 2)无依赖/插件: WebRTC包含的这些标准使用户在无需安装任何插件或者第三方的软件的情况下,创建点对点(Peer-to-Peer)的数据分享和电话会议成为可能。 3)协议栈 众多: WebRTC并不是单一的协议,包含了媒体、加密、传输层等在内的多个协议标准以及一套基于 JavaScript的 API,它包括了音视频的采集、编解码、网络传输、显示等功能。通过简单易用的 JavaScript API ,在不安装任何插件的情况下,让浏览器拥有了 P2P音视频和数据分享的能力。 WebRTC依赖众多协议栈图: 同时WebRTC 并不是一个孤立的协议,它拥有灵活的信令,可以便捷的对接现有的SIP 和电话网络的系统。 4、兼容覆盖 目前大部分主流浏览器都正常兼容WebRTC: ▲ 上图引用自《WebRTC实时音视频技术的整体架构介绍》 更详细的浏览器及版本兼容情况,可以看看下图: ▲ 上图引用自《https://caniuse.com/rtcpeerconnection》 主流浏览器都支持 WebRTC 标准 API ,因此也让浏览器之间无插件化的音视频互通成为可能, 大大降低了音视频开发的门槛,开发者只需要调用 WebRTC API 即可快速构建出音视频应用。 5、技术框架 如下图所示:的技术框架描述了WebRTC的核心内容和面向不同开发者的API设计。 WebRTC技术框架图: ▲ 上图引用自《零基础入门:基于开源WebRTC,从0到1实现实时音视频聊天功能》 从图中可看到,WebRTC主要面向三类开发者的API设计: 1)对于Web开发者的API:框架包含了基于JavaScript 、 经过W3C认证了的一套API标准,使得web开发者可以基于这套API开发基于WebRTC的即时通讯应用; 2)对于浏览器厂商的API:框架同样包含了基于C++的底层WebRTC接口,对于浏览器厂商底层的接入十分友好; 3)浏览器厂商可自定义的部分:框架中还包含浏览器厂商可自定义的音视频截取等扩展部分。 6、技术核心 从上节框架中可以看到,WebRTC主要有音频、视频引擎和传输三部分组成,其中又包含众多的协议和方法等。 1)Voice Engine(音频引擎): a、Voice Engine包含iSAC/iLBC Codec(音频编解码器,前者是针对宽带和超宽带,后者是针对窄带); b、NetEQ for voice(处理网络抖动和语音包丢失); c、Echo Canceler(回声消除器)/ Noise Reduction(噪声抑制)。 2)Video Engine(视频引擎): a、VP8 Codec(视频图像编解码器); b、Video jitter buffer(视频抖动缓冲器,处理视频抖动和视频信息包丢失); c、Image enhancements(图像质量增强)。 3)Transport。 7、技术原理 7.1 基本情况 WebRTC主要的技术特征: 1)SRTP:安全的实时传输协议,用于音视频流传输; 2)Multiplexing:多路复用; 3)P2P:STUN+TURN+ICE,用于NAT网络和防火墙穿越; 4)DTLS:安全传输可能还会用到DTLS(数据报安全传输),用于加密传输和密钥协商; 5)UDP:整个WebRTC通信是基于UDP的。 限于篇幅,本文以下章节将不细致介绍音视频采集、编码和处理等内容,仅介绍实时通讯的建立过程原理的核心内容。 7.2 公网IP映射:明确网络定位信息 WebRTC是基于浏览器端到端的连接(P2P)实现的. 由于不需要服务器中转,所以获取连接对象的网络地址的方式,是借助于ICE、STUN、TURN等辅助内网穿透技术(NAT)得到对应主机的公网网络地址和端口等网络定位信息。 明确网络定位是建立端与端直接通讯的基础。 NAT穿透原理图: STUN服务器用于辅助内网穿透得到对应主机的公网网络地址和端口信息图: ▲ 上图引用自《WebRTC实时音视频技术的整体架构介绍》 7.3 信令服务器:网络协商与信息交换 信令服务器的作用是基于双工通信来中转信息。 中转信息包括公网IP映射后的网络定位信息,比如:公网IP、端口和媒体数据流等。 概念图: 信令服务器信息交互过程图: 7.4 会话描述协议SDP:统一的媒体协商方式 SDP的作用: 1)不同端/浏览器对于媒体流数据的编码格式各异,如VP8、VP9等,参与会话的各个成员的能力不对等、用户环境与配置不一致等; 2)WebRTC通讯还需要确定和交换本地和远程音频和视频媒体信息,例如分辨率和编解码器功能。交换媒体配置信息的信令通过使用会话描述协议 (SDP) 交换Offer和Anwser来进行; 3)SDP的交换一定是先于音视频流交换的。其内容包括会话基本信息、媒体信息描述等。 //SDP的结构体 Session description(会话级别描述) v= (protocol version) o= (originator and session identifier) s= (session name) c=* (connection information -- not required ifincluded inall media) One or moreTime descriptions ("t="and "r="lines; see below) a=* (zero or moresession attribute lines) Zero or moreMedia descriptions Time description t= (timethe session is active) Media description(媒体级别描述), ifpresent m= (media name and transport address) c=* (connection information -- optional ifincluded at session level) a=* (zero or moremedia attribute lines) 一个SDP例子如下: v=0 //代表版本,目前一般是`v=0`. o=- 3883943731 1 IN IP4 127.0.0.1 s= t=0 0 //会话处于活动状态的时间 a=group:BUNDLE audio video //:描述服务质量,传输层复用相关信息 m=audio 1 RTP/SAVPF103 104 0 8 106 105 13 126 //... a=ssrc:2223794119 label:H4fjnMzxy3dPIgQ7HxuCTLb4wLLLeRHnFxh81 7.5 一对一连接建立过程 以建立一对一的Web RTC连接过程为例来简要讲解。 一对一过程图: 简要过程图: 如上图所示,解释一下: 1)交换SDP,获取各自媒体配置信息; 2)STUN服务器交换网络地址和端口等网络信息; 3)Turn中转音视频媒体流数据。 工作流程图: 如上图所示,解释一下: 1)A和B双方先调用 getUserMedia 打开本地摄像头,作为本地待输出媒体流; 2)向信令服务器发送加入房间请求; 3)Peer B 接收到 Peer A 发送的 offer SDP 对象,并通过PeerConnection的SetLocalDescription方法保存 Answer SDP 对象并将它通过信令服务器发送给 Peer A; 4)在 SDP 信息的 offer/answer 流程中,Peer A 和 Peer B 已经根据 SDP 信息创建好相应的音频 Channel 和视频 Channel,并开启Candidate 数据的收集,Candidate数据(本地IP地址、公网IP地址、Relay服务端分配的地址); 5)当 Peer A 收集到 Candidate 信息后通过信令服务器发送给 Peer B。同样的过程 Peer B 对 Peer A 也会再发送一次。 7.6 多对多的建立 多对多建立点到点连接概念图,以三个用户点对点的连接为例: 7.7 WebRTC的主要JavaScrip接口 getUserMedia():访问数据流,例如来自用户的相机和麦克风 //请求媒体类型 const constraints = { video: true audio:true }; const video = document.querySelector('video'); //挂载流到相应dom展示本地媒体流 function handleSuccess(stream) { video.srcObject = stream; } function handleError(error) { console.error('getUserMedia error: ', error); } //利用摄像头捕获多媒体流 navigator.mediaDevices.getUserMedia(constraints). then(handleSuccess).catch(handleError); RTCPeerConnection:通过加密和带宽管理工具启用音频或视频通话 // 允许 RTC 服务器配置。 const server = { "iceServers": [{ "urls": "stun:stun.stunprotocol.org"}] }; // 创建本地连接 const localPeerConnection = newRTCPeerConnection(servers); // 收集Candidate 数据 localPeerConnection.onicecandidate=function(event){ ... } // 监听到媒体流接入时的操作 localPeerConnection.ontack=function(event){ ... } RTCDataChannel:支持通用数据的点对点通信,常用于数据点到点的传输 const pc = newRTCPeerConnection(); const dc = pc.createDataChannel("my channel"); //接受数据 dc.onmessage = function(event) { console.log("received: "+ event.data); }; //打开传输 dc.onopen = function() { console.log("datachannel open"); }; //关闭传输 dc.onclose = function() { console.log("datachannel close"); }; 8、应用案例 这里以WebRTC的多人视频案例为实践来大致演示一下。 8.1 设计框架 多人视频基本框架图: 8.2 关键代码 8.2.1)媒体捕获: 获取浏览器视频权限,捕获本地视频媒体流,在Video元素中附加媒体流,显示本地视频结果。代码如下。 //摄像头兼容性处理 navigator.getUserMedia = ( navigator.getUserMedia || navigator.webkitGetUserMedia || navigator.mozGetUserMedia || navigator.msGetUserMedia); // 获取本地音频和视频流 navigator.mediaDevices.getUserMedia({ "audio": false, "video": true }).then( (stream)=> { //显示自己的输出流,挂到页面Video元素上 document.getElementById("myVido").srcObject=stream }) 捕获本地视频媒体流的显示结果截图: 为每个新的客户端连接创建RTCPeerConnection对象: // stun和turn服务器 const iceServer = { "iceServers": [{ urls:"stun:stun.l.google.com:19302" }] }; //为点到点的连接创建RTCPeerConnection const peerRTCConn=newRTCPeerConnection(iceServer); 8.2.2)网络协商: 主要任务就是:创建对等连接,收集ICE候选,等待媒体流接入时挂载到dom。 交互式连通性建立(Interactive Connectivity Establishment — ICE)是一个允许实时对等端发现对方并且彼此连接的框架。此技术允许对等方发现有关彼此拓扑的足够信息,从而有可能在彼此之间找到一条或多条通信路径。ICE 代理负责:收集本地IP,端口元组候选、在同级之间执行连接检查和发送连接保持活动。(关于ICE的介绍,见《P2P技术之STUN、TURN、ICE详解》) // 发送ICE候选到其他客户端 peerRTCConn.onicecandidate = function(event){ if(event.candidate) { //向信令服务器转发收集到的ICE候选 socket.send(JSON.stringify({ "event": "relayICECandidate", "data": { 'iceCandidate': { 'sdpMLineIndex': event.candidate.sdpMLineIndex, 'candidate': event.candidate.candidate } }, "fromID":signalMsg['data']['peerId'] })); } } //有媒体流介入就挂载dom peerRTCConn.ontrack=function(event){ let v=document.createElement("video") v.autoplay=true v.style="width:200px" document.getElementById("peer").appendChild(v) v.srcObject=event.streams[0] } 8.1.3)媒体协商: 发起时创建Offer。peer利用setLocalDescription方法将会话信息加到RTCPeerConnection(),并由信令服务器中转。其他Peer会返回相应的Answer。SDP过程: //新加入节点发起offer if(canOffer){ peerRTCConn.createOffer( function(localDescription) { peerRTCConn.setLocalDescription(localDescription, function() { //发送描述信息给信令服务器 socket.send(JSON.stringify({ "event":"relaySessionDescription", "data":localDescription, "fromID":peerId })) }, function() { alert("offer failed"); } ); }, function(error) { console.log("error sending offer: ", error); } ) } 响应时创建Answer。会话描述包括音视频信息等内容,当发起者向响应者发出offer类型的描述后,响应者会返回answer类型的描述: //创建Answer会话 peer.createAnswer( function(_remoteDescription) { peer.setLocalDescription(_remoteDescription, function() { //发送描述信息给信令服务器 socket.send(JSON.stringify({ "event":"relaySessionDescription", "data":_remoteDescription, "callerID":signalMsg['fromId'], "fromID":signalMsg['fromId'] })) }, function() { alert("answer failed"); } ); }, function(error) { console.log("error creating answer: ", error); }); 当收到ICE候选共享后,会把ICE候选添加到远程对等点描述中: //对应的RTCPeerConnection const peer = peers[signalMsg["fromID"]]; //ICE候选添加到远程对等点描述 peer.addIceCandidate(newRTCIceCandidate(signalMsg["data"].iceCandidate)); 多人视频结果截图<本地模拟效果>: 8.2.4)信令中转: 信令服务部分关键代码: wss.on('connection', function(ws) { ws.on('message', function(message) { let meeageObj=JSON.parse(message) //交换ICE候选 if (meeageObj['event'] =='relayICECandidate') { wss.clients.forEach(function (client) { console.log("send iceCandidate") client.send(JSON.stringify({ "event": "iceCandidate", "data": meeageObj['data'], "fromID": meeageObj['fromID'] })); }); } //交换SDP if (meeageObj['event'] =='relaySessionDescription') { console.log(meeageObj["fromID"],meeageObj["data"].type) wss.clients.forEach(function(client) { if(client!=ws) { client.send(JSON.stringify({ "event": "sessionDescription", "fromId":meeageObj["fromID"], "data": meeageObj["data"], })); } }); } }) }) 9、小结一下 WebRTC的优点主要是: 1)方便:对于用户来说,在WebRTC出现之前想要进行实时通信就需要安装插件和客户端,但是对于很多用户来说,插件的下载、软件的安装和更新这些操作是复杂而且容易出现问题的,现在WebRTC技术内置于浏览器中,用户不需要使用任何插件或者软件就能通过浏览器来实现实时通信。对于开发者来说,在Google将WebRTC开源之前,浏览器之间实现通信的技术是掌握在大企业手中,这项技术的开发是一个很困难的任务,现在开发者使用简单的HTML标签和JavaScript API就能够实现Web音/视频通信的功能。 2)免费:虽然WebRTC技术已经较为成熟,其集成了最佳的音/视频引擎,十分先进的codec,但是Google对于这些技术不收取任何费用。 3)强大的打洞能力:WebRTC技术包含了使用STUN、ICE、TURN、RTP-over-TCP的关键NAT和防火墙穿透技术,并支持代理。 WebRTC的缺点主要是: 1)缺乏服务器方案的设计和部署。 2)传输质量难以保证。WebRTC的传输设计基于P2P,难以保障传输质量,优化手段也有限,只能做一些端到端的优化,难以应对复杂的互联网环境。比如对跨地区、跨运营商、低带宽、高丢包等场景下的传输质量基本是靠天吃饭,而这恰恰是国内互联网应用的典型场景。 3)WebRTC比较适合一对一的单聊,虽然功能上可以扩展实现群聊,但是没有针对群聊,特别是超大群聊进行任何优化。 4)设备端适配,如回声、录音失败等问题层出不穷。这一点在安卓设备上尤为突出。由于安卓设备厂商众多,每个厂商都会在标准的安卓框架上进行定制化,导致很多可用性问题(访问麦克风失败)和质量问题(如回声、啸叫)。 5)对Native开发支持不够。WebRTC顾名思义,主要面向Web应用,虽然也可以用于Native开发,但是由于涉及到的领域知识(音视频采集、处理、编解码、实时传输等)较多,整个框架设计比较复杂,API粒度也比较细,导致连工程项目的编译都不是一件容易的事。 10、参考资料 [1]开源实时音视频技术WebRTC的现状 [2]简述开源实时音视频技术WebRTC的优缺点 [3]访谈WebRTC标准之父:WebRTC的过去、现在和未来 [4]良心分享:WebRTC 零基础开发者教程(中文)[附件下载] [5]WebRTC实时音视频技术的整体架构介绍 [6]新手入门:到底什么是WebRTC服务器,以及它是如何联接通话的? [7]WebRTC实时音视频技术基础:基本架构和协议栈 [8]浅谈开发实时视频直播平台的技术要点 [9]基于开源WebRTC开发实时音视频靠谱吗?第3方SDK有哪些? [10]开源实时音视频技术WebRTC在Windows下的简明编译教程 [11]网页端实时音视频技术WebRTC:看起来很美,但离生产应用还有多少坑要填? [12]了不起的WebRTC:生态日趋完善,或将实时音视频技术白菜化 [13]零基础入门:基于开源WebRTC,从0到1实现实时音视频聊天功能 [14]P2P技术详解(一):NAT详解——详细原理、P2P简介 [15]P2P技术详解(二):P2P中的NAT穿越(打洞)方案详解(基本原理篇) (本文已同步发布于:http://www.52im.net/thread-3804-1-1.html)

优秀的个人博客,低调大师

【Java技术探索】深入学习JIT编译器实现机制(原理篇)

前提概要 解释器 Java程序最初是通过解释器(Interpreter)进行解释执行的,当虚拟机发现某个方法或代码块的运行特别频繁的时候,就会把这些代码认定为“热点代码”(hotspot code)。正因为如此,我们的hotspot的虚拟机就是因此而得名。 解释器优点 (占用空间较少)解释执行占用更小的内存空间。 (启动和首次执行速度较快)当程序需要迅速启动的时候,解释器可以首先发挥作用,省去了编译的时间,立即执行。 (提高动态性和移植性)当处于程序的动态效果下,如果预先编译好所有相关的静态本地代码后,就无法实现动态化扩展,以及提高移植到其他计算机平台架构下的能力 编译器 为了提高热点代码的执行效率,在运行时,即时编译器(Just In Time Compiler,下文称 JIT编译器 )会把这些代码编译成与本地平台相关的机器码,并进行各种层次的优化。 编译器优点 (提高运行速度)在程序运行时,随着时间的推移,编译器逐渐发挥作用,把越来越多的代码编译成本地代码之后,可以获得更高的执行效率。 (逆转优化)同时,当编译器进行的激进优化失败的时候,还可以进行逆优化来恢复到解释执行的状态。 因此,整个虚拟机执行架构中,解释器与编译器经常配合工作,如下图所示。 解释器与编译器并存的架构(流程) 如果Java程序需要迅速启动和执行时,或者只是执行一次,解释器可首先发挥作用,省去编译时间,立即执行程序运行后,随着时间推移,JIT编译器逐渐发挥作用,把越来越多的代码编译成本地代码后,可获取更高执行效率。 程序运行环境中内存资源限制较大(如部分嵌入式系统中),可使用解释执行节约内存,反之可使用JIT编译执行提升效率 解释器还可作为JIT编译器激进优化时的一个“逃生门”,让编译器根据概率选择一些大多数时候都能提升运行速度的优化手段,当激进优化的假设不成立时可通过逆优化(Deoptimization)退回到解释状态继续执行 故,在整个虚拟机执行架构中解释器与编译器经常配合工作 Xint设置:用户可以使用参数 -Xint 强制虚拟机运行于 “解释模式”(Interpreted Mode),这时候编译器完全不介入工作。 -Xcomp设置:强制虚拟机运行于 “编译模式”(Compiled Mode),这时候将优先采用编译方式执行,但是解释器仍然要在编译无法进行的情况下接入执行过程。 -Xmixed设置:这种配合使用的方式称为“混合模式”(Mixed Mode), 通过虚拟机 -version 命令可以查看当前默认的运行模式。 即时编译器(JIT编译器) JIT编译器不是虚拟机的必需部分,但JIT编译器编译性能的好坏、代码优化程度的高低是衡量一款商用虚拟机优秀与否的最关键的指标之一,也是虚拟机中最核心且最能体现虚拟机技术水平的部分。 被编译对象和触发条件 在运行过程中会被即时编译的“热点代码”有两类,即: 编译的目标对象 被多次调用的方法 编译器会将整个方法作为编译对象,这也是标准的JIT 编译方式 被多次执行的循环体 由循环体出发的,但是编译器依然会以整个方法作为编译对象,因为发生在方法执行过程中,称为栈上替换。 判断热点代码 「判断一段代码是否是热点代码,是不是需要出发即时编译」,这样的行为称为热点探测(Hot Spot Detection),探测算法有两种,分别为。 基于采样的热点探测(Sample Based Hot Spot Detection) 虚拟机会周期的对各个线程栈顶进行检查,如果某些方法经常出现在栈顶,这个方法就是“热点方法”。 优点:实现简单、高效,很容易获取方法调用关系。 缺点:很难确认方法的reduce(衰减),容易受到线程阻塞或其他外因扰乱。 基于计数器的热点探测(Counter Based Hot Spot Detection) 为每个方法(甚至是代码块)建立计数器,执行次数超过阈值就认为是“热点方法”。 优点:统计结果精确严谨。 缺点:实现麻烦,不能直接获取方法的调用关系。 HotSpot使用的是第二种-基于技术其的热点探测,并且有两类计数器: 方法调用计数器(Invocation Counter ) 回边计数器(Back Edge Counter ) 两个即时编译器 从上面的解释器和编译器的协同合作架构图中,应该可以了解到,JVM虚拟机实现了两个不同的JIT编译器,分别称为 Client Compiler和 Server Compiler ,或者简称为 C1 编译器和 C2 编译器。 热点触发的阈值 这两个计数器都有一个确定的阈值,超过后便会触发JIT编译,具体细节和内容下面会详细讲述。 上面提到了一下两种热点探测的计数器: 方法调用计数器(Invocation Counter ) 首先是方法调用计数器: Client模式下默认阈值是1500 次。 Server 模式下是 10000次。 这个阈值可以通过 -XX:CompileThreshold 来人为设定。 如果不做任何设置,方法调用计数器统计的并不是方法被调用的绝对次数,而是一个相对的执行频率,即一段时间之内的方法被调用的次数。(可以理解为滑动窗口)。 当超过一定的时间限度,如果方法的调用次数仍然不足以让它提交给即时编译器编译,那么这个方法的调用计数器就会被减少一半,这个过程称为方法调用计数器热度的衰减(Counter Decay),而这段时间就成为此方法的统计的半衰期( Counter Half Life Time)。 进行热度衰减的动作是在虚拟机进行垃圾收集时顺便进行的,可以使用虚拟机参数 -XX:CounterHalfLifeTime 参数设置半衰周期的时间 (时间窗口秒),单位是秒。整个 JIT 编译的交互过程如下图。 回边计数器(Back Edge Counter ) 作用是统计一个方法中循环体代码执行的次数,在字节码中遇到控制流向后跳转的指令称为“回边”( Back Edge )。 显然,建立回边计数器统计的目的就是为了触发 OSR 编译。关于这个计数器的阈值, HotSpot 提供了 -XX:BackEdgeThreshold 供用户设置。 但是当前的虚拟机实际上使用了 -XX:OnStackReplacePercentage 来简介调整阈值,计算公式如下: Client模式, 公式为方法调用计数器阈值(CompileThreshold)X OSR 比率(OnStackReplacePercentage)/100 。其中OSR比率默认为933,那么,回边计数器的阈值为13995。 Server模式,公式为方法调用计数器阈值(Compile Threashold)X (OSR (OnStackReplacePercentage)- 解释器监控比率 (InterpreterProfilePercent))/100 其中onStackReplacePercentage 默认值为 140,InterpreterProfilePercentage 默认值为 33,如果都取默认值,那么 Server 模式虚拟机回边计数器阈值为 10700 。 编译过程 默认情况下,无论是方法调用产生的即时编译请求,还是OSR请求,虚拟机在代码编译器还未完成之前,都仍然将按照解释方式继续执行,而编译动作则在后台的编译线程中进行。 用户可以通过参数 -XX:-BackgroundCompilation来禁止后台编译,这样,一旦达到 JIT 的编译条件,执行线程向虚拟机提交便已请求之后便会一直等待,直到编译过程完成后再开始执行编译器输出的本地代码。 虚拟机运行模式 目前的HotSpot编译器默认的是解释器和其中一个即时编译器配合的方式工作,具体是哪一个编译器,取决于虚拟机运行的模式,HotSpot虚拟机会根据自身版本与计算机的硬件性能自动选择运行模式,用户也可以使用 -client 和 -server 参数强制指定虚拟机运行在 Client 模式或者 Server 模式。 Client Compiler(了解即可) : 它是一个简单快速的三段式编译器,主要关注点在于局部的优化,放弃了许多耗时较长的全局优化手段。 第一阶段,一个平台独立的前端将字节码构造成一种高级中间代码表示(High-Level Intermediate Representaion , HIR)。在此之前,编译器会在字节码上完成一部分基础优化,如 方法内联,常量传播等优化。 第二阶段,一个平台相关的后端从 HIR 中产生低级中间代码表示(Low-Level Intermediate Representation ,LIR),而在此之前会在 HIR 上完成另外一些优化,如空值检查消除,范围检查消除等,让HIR 更为高效。 第三阶段,在平台相关的后端使用线性扫描算法(Linear Scan Register Allocation)在 LIR 上分配寄存器,做窥孔(Peephole)优化,然后产生机器码。 Server Compiler(了解即可): 专门面向服务端典型应用并为服务端性能配置特别调整过的编译器也是一个充分优化过的高级编译器,几乎能达到GNU C++编译器使用-02参数时的优化强度会执行所有经典的优化动作。 无用代码消除(Dead Code Elimination)、 循环展开(LoopcUnrolling)、 循环表达式外提(Loop Expression Hoisting)、 消除公共子表达式(Common Subexpression Elimination)、 常量传播(Constant Propagation)、 基本块重排序(Basic Block Reordering)等 还会实施一些与Java语言特性密切相关的优化技术,如 范围检查消除(Range Check Elimination)、 空值检查消除(Null Check Elimination)等 还可能根据解释器或Client Compiler提供的性能监控信息,进行一些不稳定的激进优化,如 守护内联(Guarded Inlining)、 分支频率预测(Branch Frequency Prediction)等 Server Compiler的寄存器分配器是一个全局图着色分配器,它可充分利用某些处理器架构(如RISC)上的大寄存器集合 编译速度远超传统静态优化编译器,相对Client Compiler代码质量有所提高,可减少本地代码执行时间,从而抵消额外的编译时间开销 如何从外部观察即时编译器的编译过程和编译结果? -XX:+PrintCompilation 在即时编译时,打印被编译成本地代码的方法名称 -XX:+PrintInlining 在即时编译时,输出方法内联信息 -XX:+PrintAssembly 在即时编译时,打印被编译方法的汇编代码,虚拟机需安装反汇编适配器HSDIS插件,Product版虚拟机需加入参数-XX:+UnlockDiagnosticVMOptions打开虚拟机诊断模式 -XX:+PrintOptoAssembly 用于Server VM,输出比较接近最终结果的中间代码表示,不需HSDIS插件支持 -XX:+PrintLIR 用于Client VM,输出比较接近最终结果的中间代码表示,不需HSDIS插件支持 -XX:+PrintCFGToFile 用于Client Compiler,将编译过程中各阶段数据(如,字节码、HIR生成、LIR生成、寄存器分配过程、本地代码生成等)输出到文件中 -XX:PrintIdealGraphFile 用于Server Compiler,将编译过程中各阶段数据(如,字节码、HIR生成、LIR生成、寄存器分配过程、本地代码生成等)输出到文件中 注,要输出CFG或IdealGraph文件,需Debug或FastDebug版虚拟机支持,Product版的虚拟机无法输出这些文件

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习十二(物理计划的执行)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一章介绍了一个SQL是怎样从字符串转换到物理执行计划的,详情见: https://my.oschina.net/u/3374539/blog/5035628 这一章主要记录一下物理计划是怎样执行的。 在上一篇文章的末尾,我们展示了物理计划之中存储的数据,这些数据代表了当前整个数据库中,能够与用户输入的查询表相关联的所有数据。 对于一般数据库来讲,在物理计划中更应该是指向索引相关的信息,举例来说:select * from table1 ,在物理计划里,应该是要拿到table1的表描述、存储数据的文件路径、文件大小、等等,而不是拿到真实数据。在文章最末尾中,有一段省略的数据,为什么会出现数据呢?其实这是数据库设计的缓存,缓存的数据本来就没有落到磁盘上,所以直接在物理计划中也会持有RBChunk和MBChunk的数据引用。 对于一个过滤而言,会在物理计划中产生对应的信息,展示如下: select * from myMeasurement where fieldKey like 'value1'; input: FilterExec { predicate: BinaryExpr { left: Column { name: "fieldKey" }, op: Like, right: Literal { value: Utf8("value1") } } 接下来看物理计划的执行代码: pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> { match plan.output_partitioning().partition_count() { 0 => Ok(vec![]), //单一块的时候直接取出数据 1 => { let it = plan.execute(0).await?; common::collect(it).await } //多个数据块的时候就需要进行合并数据 _ => { let plan = MergeExec::new(plan.clone()); assert_eq!(1, plan.output_partitioning().partition_count()); //这里分为了两步execute 和 collect common::collect(plan.execute(0).await?).await } } } 接下来看plan.execute方法: async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { 。。。省略 tokio::spawn(async move { //这里的input就代表了上面展示的filter的input或者是数据的input let mut stream = match input.execute(part_i).await { Err(e) => { let arrow_error = ArrowError::ExternalError(Box::new(e)); sender.send(Err(arrow_error)).await.ok(); return; } Ok(stream) => stream, }; //计划执行完成之后返回一个stream,这里就是一直next获取完 while let Some(item) = stream.next().await { sender.send(item).await.ok(); } }); 。。。省略 } 上面的input代表了以下这么多东西: 上面展示的为datafusion框架里的Plan,也就是通用sql都需要实现的功能,下面是iox项目中实现的Plan是完成数据获取的。 Plan之间的关系是嵌套的,想象一下上一章的大图,比如coalesceBatchesExec里可能还会包含filter,主要就是描述整个sql语句中都出现了什么。所有出现的plan就会对数据进行一次全面的过滤。 姑且不看过滤的细节,只看获取数据的部分(ExecutionPlan for IOxReadFilterNode)。 async fn execute( &self, partition: usize, ) -> datafusion::error::Result<SendableRecordBatchStream> { //因为在前面物理计划中得到了所有列,这里拿出列的名字 let fields = self.schema.fields(); let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>(); //多个分区的时候可以根据分区号拿出chunk信息 let ChunkInfo { chunk, chunk_table_schema, } = &self.chunk_and_infos[partition]; //过滤出来列名字对应的arrow的filed,这里就存在不对应的问题,假如用户输入了ABC,但是chunk_table_schema中并不存在,这里就会是一个空 let selection_cols = restrict_selection(selection_cols, &chunk_table_schema); let selection = Selection::Some(&selection_cols); //使用predicate过滤一次,但是我调试的时候一直是空的,也就是查询出所有数据。 let stream = chunk .read_filter(&self.table_name, &self.predicate, selection) .map_err(|e| { DataFusionError::Execution(format!( "Error creating scan for table {} chunk {}: {}", self.table_name, chunk.id(), e )) })?; //这里使用SchemaAdapterStream的结构来填充空值列 let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema)) .map_err(|e| DataFusionError::Internal(e.to_string()))?; Ok(Box::pin(adapter)) } 这个SchemaAdapterStream在代码中给了一个特别形象的描述: /// /// ┌────────────────┐ ┌─────────────────────────┐ /// │ ┌─────┐┌─────┐ │ │ ┌─────┐┌──────┐┌─────┐ │ /// │ │ A ││ C │ │ │ │ A ││ B ││ C │ │ /// │ │ - ││ - │ │ │ │ - ││ - ││ - │ │ /// ┌──────────────┐ │ │ 1 ││ 10 │ │ ┌──────────────┐ │ │ 1 ││ NULL ││ 10 │ │ /// │ Input │ │ │ 2 ││ 20 │ │ │ Adapter │ │ │ 2 ││ NULL ││ 20 │ │ /// │ Stream ├────▶ │ │ 3 ││ 30 │ │────▶│ Stream ├───▶│ │ 3 ││ NULL ││ 30 │ │ /// └──────────────┘ │ │ 4 ││ 40 │ │ └──────────────┘ │ │ 4 ││ NULL ││ 40 │ │ /// │ └─────┘└─────┘ │ │ └─────┘└──────┘└─────┘ │ /// │ │ │ │ /// │ Record Batch │ │ Record Batch │ /// └────────────────┘ └─────────────────────────┘ /// 接下来看如何实现数据查找的: fn read_filter( &self, table_name: &str, predicate: &Predicate, selection: Selection<'_>, ) -> Result<SendableRecordBatchStream, Self::Error> { //chunk存在变体,这里就是先判断是什么chunk,有三种MB,RB,ParquetFile match self { //还是在写入阶段的buffer,暂时不支持查询条件 Self::MutableBuffer { chunk, .. } => { if !predicate.is_empty() { return InternalPredicateNotSupported { predicate: predicate.clone(), } .fail(); } let batch = chunk .read_filter(table_name, selection) .context(MutableBufferChunk)?; Ok(Box::pin(MemoryStream::new(vec![batch]))) } //不可写阶段的buffer,对数据进行过滤 Self::ReadBuffer { chunk, .. } => { let rb_predicate = to_read_buffer_predicate(&predicate).context(PredicateConversion)?; //读取数据并过滤 let read_results = chunk .read_filter(table_name, rb_predicate, selection) .context(ReadBufferChunkError { chunk_id: chunk.id(), })?; //读取schema信息并过滤 let schema = chunk .read_filter_table_schema(table_name, selection) .context(ReadBufferChunkError { chunk_id: chunk.id(), })?; //ReadFilterResultsStream是对不同的chunk类型实现的读取接口 Ok(Box::pin(ReadFilterResultsStream::new( read_results, schema.into(), ))) } //Parquet同理 Self::ParquetFile { chunk, .. } => chunk .read_filter(table_name, predicate, selection) .context(ParquetFileChunkError { chunk_id: chunk.id(), }), } } 数据到了这里就会按照你选择的表名、列名,将数据全部查询出来了。在代码中的predicate,一直是空的,暂时不确定是如何填充的,后面再看。 数据从这里全部查询出来之后,会返回给datafusion框架,继续按照开头写到的过滤器进行过滤,就是遍历一遍数据判断大于、小于或者like等等。 好了查询就先写到这里。 祝玩儿的开心!! 欢迎关注微信公众号: 或添加微信好友: liutaohua001

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习十(查询主流程)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一篇粗略的总结了写入的基本流程,详情见: https://my.oschina.net/u/3374539/blog/5033469 这一篇记录一下查询的主要流程。 在第六章中,写了一个查询示例,如下: let mut query = flight::Client::new(connection) .perform_query("databaseName", "select * from myMeasurement") .await .expect("query request should work"); 其中connection,代表的建立了一个Grpc的连接。perform_query代表执行查询,其中第一个参数是数据库名字,第二个参数是要执行查询的sql语句。这个perform_query是封装了一下调用协议,然后调用了服务器端的do_get方法,do_get方法在服务器的src/influxdb_ioxd/rpc/flight.rs:139行可以找到,如下: async fn do_get( &self, //这个Ticket里就是保存的perform_query方法中封装的json数据 request: Request<Ticket>, ) -> Result<Response<Self::DoGetStream>, tonic::Status> { //这里就是把json还原回来 let ticket = request.into_inner(); let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket { ticket: ticket.ticket, })?; //反序列化成了ReadInfo结构 let read_info: ReadInfo = serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?; //拿到客户端设置的数据库名字 let database = DatabaseName::new(&read_info.database_name).context(InvalidDatabaseName)?; //从内存中查找是否存在这个database名字,如果不存在就会报DatabaseNotFound错误回去 //这里就是创建数据库的时候写入到内存里的 //同时还应该记得iox的数据库必须一个节点创建一次。。hhhhha let db = self.server.db(&database).context(DatabaseNotFound { database_name: &read_info.database_name, })?; //这个是拿到之前创建数据库时候设置的线程池,可以回去参考第五章 let executor = db.executor(); //这里是创建出sql语句对应的physical_plan,后面再看 let physical_plan = Planner::new(Arc::clone(&executor)) .sql(db, &read_info.sql_query) .await .context(Planning)?; //使用线程异步的执行查询 let results = executor //复制一下执行时候需要用到的信息 .new_context() //真正的去执行 .collect(Arc::clone(&physical_plan)) .await .map_err(|e| Box::new(e) as _) .context(Query { database_name: &read_info.database_name, })?; //在写入的章节里应该知道了在RBChunk里面存储的是Arrow格式的。 //在这个方法中就是调用arrow_flight工具包的方法,先把schema序列化到flight_buffer中 let options = arrow::ipc::writer::IpcWriteOptions::default(); let schema = physical_plan.schema(); let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options); let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)]; //上面得到的结果集,这里进行遍历,封装为要返回的数据结构 let mut batches: Vec<Result<FlightData, tonic::Status>> = results .iter() //这个是为了给下面flight_data_from_arrow_batch这个方法打补丁用的 //因为这个方法即便对于切片类型的batch也是盲目的序列化所有数据 .map(optimize_record_batch) .collect::<Result<Vec<_>, Error>>()? .iter() //这里就是一条一条的把数据序列化到缓冲区里 .flat_map(|batch| { let (flight_dictionaries, flight_batch) = arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options); //把数据包装在Result中 flight_dictionaries .into_iter() .chain(std::iter::once(flight_batch)) .map(Ok) }) .collect(); //前面是schema,后面是数据 flights.append(&mut batches); //返回一个数据的异步stream,有可能调用一次next就会释放一次cpu? let output = futures::stream::iter(flights); //数据以flight形式发送到了客户端,客户端先读取schema再读取数据。 Ok(Response::new(Box::pin(output) as Self::DoGetStream)) } 这里基本上是整个查询的主逻辑: 异步的将sql转换为plan。 异步的去执行plan并返回结果和结果所对应的schema信息。 将返回的arrow数据封装到flights格式中。 通过Grpc返回 这一篇就到这里吧,下几章准备记录一下: sql是怎么被执行的 查询中都经历了什么 等等。。。 祝玩儿的开心。 欢迎关注微信公众号: 或添加微信好友: liutaohua001

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习五(创建数据库)

欢迎关注公众号: 上篇介绍到:InfluxDB-IOx的Run命令启动过程,详情见:https://my.oschina.net/u/3374539/blog/5021654 这章记录一下Database create命令的执行过程。 在第三章命令行中介绍了,所有的子命令都有一个独立的参数或配置称为subcommand。 enum Command { Convert { // 省略 ...}, Meta {// 省略 ...}, Database(commands::database::Config), Run(Box<commands::run::Config>), Stats(commands::stats::Config), Server(commands::server::Config), Writer(commands::writer::Config), Operation(commands::operations::Config), } 这章我们打开看一眼commands::database下的config包含了什么。 pub struct Config { #[structopt(subcommand)] command: Command, } //见名知意,基本猜测一下就行了,慢慢使用到再回来看 enum Command { Create(Create), List(List), Get(Get), Write(Write), Query(Query), Chunk(chunk::Config), Partition(partition::Config), } 先来看一下create命令的执行。 Command::Create(command) => { //创建一个grpc的client let mut client = management::Client::new(connection); //设置基本的配置项 let rules = DatabaseRules { //数据库名字 name: command.name, //内存的各种配置,包含缓存大小,时间等等 lifecycle_rules: Some(LifecycleRules { //省略。。 }), //设置分区的策略 partition_template: Some(PartitionTemplate { //省略。。 }), //其它都填充default ..Default::default() }; //使用配置信息创建数据库,这里是生成了一个CreateDatabaseRequest去调用了远程服务器的方法 client.create_database(rules).await?; println!("Ok"); } 在上一章中提到了grpc的启动,这里就涉及到了之前提到的grpc的框架tonic,在tonic中使用#[tonic::async_trait]了标记一个服务器端的实现开始。我在ide中搜索,可以在src/influxdb_ioxd/rpc/management.rs:50行中找到ManagementService相关的实现。 有关tonic更多的资料请阅读:https://github.com/hyperium/tonic #[tonic::async_trait] impl<M> management_service_server::ManagementService for ManagementService<M> where M: ConnectionManager + Send + Sync + Debug + 'static, { //省略其它方法。。。 async fn create_database( &self, //这里就是接收CreateDatabaseRequest的请求 request: Request<CreateDatabaseRequest>, ) -> Result<Response<CreateDatabaseResponse>, Status> { //对数据进行一下校验,然后获得在上面配置的rules规则 let rules: DatabaseRules = request .into_inner() .rules .ok_or_else(|| FieldViolation::required("")) .and_then(TryInto::try_into) .map_err(|e| e.scope("rules"))?; //这里就是在第三章中提到的server_id,如果没配置就会报错了 let server_id = match self.server.require_id().ok() { Some(id) => id, None => return Err(NotFound::default().into()), }; //这里就是真正的去创建,在下面继续跟踪 match self.server.create_database(rules, server_id).await { Ok(_) => Ok(Response::new(CreateDatabaseResponse {})), Err(Error::DatabaseAlreadyExists { db_name }) => { return Err(AlreadyExists { resource_type: "database".to_string(), resource_name: db_name, ..Default::default() } .into()) } Err(e) => Err(default_server_error_handler(e)), } } } 接下来要继续查看数据库真正的被创建出来,我读到这里存在一个问题,文件格式是什么样子的? pub async fn create_database(&self, rules: DatabaseRules, server_id: NonZeroU32) -> Result<()> { //检查server_id self.require_id()?; //把数据库名字存储到内存中,最终保存到一个btreemap中 let db_reservation = self.config.create_db(rules)?; //对数据进行持久化保存 self.persist_database_rules(db_reservation.rules().clone()) .await?; //启动数据库后台线程,在内存中写入数据库状态 db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec)); Ok(()) } 来解答上面的疑问,文件是怎样持久化、格式是什么样子的。 pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> { //生成一个新的数据库路径 let location = object_store_path_for_database_config(&self.root_path()?, &rules.name); //序列化DatabaseRules这个pb到byte流 let mut data = BytesMut::new(); rules.encode(&mut data).context(ErrorSerializing)?; let len = data.len(); let stream_data = std::io::Result::Ok(data.freeze()); //将pb的内容进行存储 self.store .put( &location, futures::stream::once(async move { stream_data }), Some(len), ) .await .context(StoreError)?; Ok(()) } 这里调用了rules.encode()转换到pb的格式,这里是rust语言的一个方法,实现了From特性的,就得到了一个into的方法,如:impl From<DatabaseRules> for management::DatabaseRules. 到这里数据库的一个描述文件rules.pb就被写入到磁盘中了,路径是启动命令中指定的--data-dir参数路径 + --writer-id + 数据库名字。 例如,我的启动和创建命令为: ./influxdb_iox run --writer-id 1 --object-store file --data-dir ~/influxtest/ ./influxdb_iox database create test 那么得到的路径就为:~/influxtest/1/test/rules.pb. 之后可以运行一个pb的脚本来反查rules.pb中的数据内容,如下: $ ./scripts/prototxt decode influxdata.iox.management.v1.DatabaseRules \ < ~/influxtest/1/test/rules.pb influxdata/iox/management/v1/service.proto:6:1: warning: Import google/protobuf/field_mask.proto is unused. name: "test" partition_template { parts { time: "%Y-%m-%d %H:00:00" } } lifecycle_rules { mutable_linger_seconds: 300 mutable_size_threshold: 10485760 buffer_size_soft: 52428800 buffer_size_hard: 104857600 sort_order { order: ORDER_ASC created_at_time { } } } 看到这里已经知道整个生成过程及文件内容。 祝玩儿的开心。

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习四(Run命令的执行)

欢迎关注公众号: 上篇介绍到:InfluxDB-IOx的命令行及配置,详情见:https://my.oschina.net/u/3374539/blog/5017858 这章记录一下Run命令的执行过程。 //根据用户在命令行配置的num_threads参数 //来选择创建一个多线程的模型,还是current_thread的模型 //后面有时间深入研究tokio的时候再来分析有什么异同 let tokio_runtime = get_runtime(config.num_threads)?; //block_on会让线程一直等待方法里的future执行完成 //这是让闭包中的方法占有了io driver 和 timer context tokio_runtime.block_on(async move { let host = config.host; match config.command { // 省略其它command ... Command::Run(config) => { //具体去子类型里执行,然后await一个结果 if let Err(e) = commands::run::command(logging_level, *config).await { eprintln!("Server command failed: {}", e); std::process::exit(ReturnCode::Failure as _) } } } }); 在influxdb_ioxd::main方法中,忽略一些不太需要重点关注的,分别是初始化log的管理、PanicsTracing、CancellationToken等。 //初始化对象存储 let object_store = ObjectStore::try_from(&config)?; //可以看到,目前已经支持了 //1.内存(在container环境运行时候使用) //2.Google //3.S3 //4.Azure //5.File 本地文件,方便开发者调试运行在云上时候的文件变化 fn try_from(config: &Config) -> Result<Self, Self::Error> { match config.object_store { Some(ObjStoreOpt::Memory) | None => { //创建一个btreemap用来缓存或者搜索 Ok(Self::new_in_memory(object_store::memory::InMemory::new())) } Some(ObjStoreOpt::Google) => { // 省略 } Some(ObjStoreOpt::S3) => { // 省略 } Some(ObjStoreOpt::Azure) => { // 省略 } Some(ObjStoreOpt::File) => match config.database_directory.as_ref() { Some(db_dir) => { //去递归创建这个配置路径中的文件夹 //context也是使用的snafu来处理错误的 fs::create_dir_all(db_dir) .context(CreatingDatabaseDirectory { path: db_dir })?; //都创建完成,并且没出错误,把路径保存起来 Ok(Self::new_file(object_store::disk::File::new(&db_dir))) } // 如果database_directory这个参数没有配置的时候 //使用snafu这个crate来返回一个错误 None => MissingObjectStoreConfig { object_store: ObjStoreOpt::File, missing: "data-dir", } .fail(), }, } } 关于错误处理的代码: #[snafu(display("Unable to create database directory {:?}: {}", path, source))] CreatingDatabaseDirectory { path: PathBuf, source: std::io::Error, }, #[snafu(display( "Specified {} for the object store, required configuration missing for {}", object_store, missing ))] MissingObjectStoreConfig { object_store: ObjStoreOpt, missing: String, }, 我们来测试一下错误的场景,来看看是否符合代码的预期。 // 不传入路径 cargo run run --object-store file Finished dev [unoptimized + debuginfo] target(s) in 0.42s Running `./influxdb_iox run --object-store file` Apr 15 13:38:34.352 INFO influxdb_iox::influxdb_ioxd: Using File for object storage Server command failed: Run: Specified File for the object store, required configuration missing for data-dir //传入一个创建不了的路径 cargo run run --object-store file --data-dir /root/1/1 Finished dev [unoptimized + debuginfo] target(s) in 0.47s Running `./influxdb_iox run --object-store file --data-dir /root/1/1` Apr 15 13:45:26.664 INFO influxdb_iox::influxdb_ioxd: Using File for object storage Server command failed: Run: Unable to create database directory "/root/1/1": Read-only file system (os error 30) 可以看到是符合预期的,bingo //创建一个空的结构体 let connection_manager = ConnectionManager {}; //创建AppServer结构体用来保存基本的信息 //server_config里就是保存的对象存储的信息及线程配置 //如果num_worker_threads没有填写,默认就使用cpu数量 let app_server = Arc::new(AppServer::new(connection_manager, server_config)); //不设置这个writer_id能启动,但是不能做任何操作 if let Some(id) = config.writer_id { //compare and set 一个非0的数值,错误就打印一个指定的panic app_server.set_id(id).expect("writer id already set"); //校验所有的配置 if let Err(e) = app_server.load_database_configs().await { error!( "unable to load database configurations from object storage: {}", e ) } } else { warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data."); } 接下来进入load_database_configs方法看看, let list_result = self .store //把write_id和配置的文件路径组合一下,作为一个目录 //遍历文件夹中的所有东西,用一个BTreeSet存所有子文件夹 //用Vec存下所有的文件信息,包括路径、修改时间、大小等 .list_with_delimiter(&self.root_path()?) .await .context(StoreError)?; //拿到配置的server的write_id let server_id = self.require_id()?; let handles: Vec<_> = list_result //配置的文件夹下的所有文件夹 .common_prefixes .into_iter() //全部进行map转换 .map(|mut path| { let store = Arc::clone(&self.store); let config = Arc::clone(&self.config); let exec = Arc::clone(&self.exec); //先找database的相关信息文件,名字叫rules.pb path.set_file_name(DB_RULES_FILE_NAME); //感觉是需要io来读取文件内容,所以开一个异步 tokio::task::spawn(async move { let mut res = get_store_bytes(&path, &store).await; //省略错误处理。。 let res = res.unwrap().freeze(); //解析文件内容,根据文件名可以看出是个pb文件。 match DatabaseRules::decode(res) { Err(e) => { //省略错误。。 } //根据解析出来的文件内容,在内存中恢复回来db的相关信息 Ok(rules) => match config.create_db(rules) { Err(e) => error!("error adding database to config: {}", e), //提交一个后台任务,用来不断的检测chunks的状态 //比如达到了某个大小,然后写入到存储等 Ok(handle) => handle.commit(server_id, store, exec), }, } }) }) .collect(); //等待所有任务完成 futures::future::join_all(handles).await; 这里就启动完成了一个基本的服务,创建了存储路径、初始化数据库的基本配置、启动了一个用来刷盘、整理chunk的后台任务。 接下来就是启动连接相关的了。 //从启动命令行中读取grpc的地址 let grpc_bind_addr = config.grpc_bind_address; //绑定这个地址 let socket = tokio::net::TcpListener::bind(grpc_bind_addr) .await .context(StartListeningGrpc { grpc_bind_addr })?; //真正的协议启动 let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); //同样的启动http相关的服务,使用的hyper库 let bind_addr = config.http_bind_address; let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?; let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); //省略后面的停止流程。。。 然后看grpc的启动的服务 //启动起来健康检查的服务 let stream = TcpListenerStream::new(socket); let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); //标识相对应的服务已经是可以提供服务的状态了 let services = [ generated_types::STORAGE_SERVICE, generated_types::IOX_TESTING_SERVICE, generated_types::ARROW_SERVICE, ]; for service in &services { health_reporter .set_service_status(service, tonic_health::ServingStatus::Serving) .await; } //增加一堆使用grpc的服务,并启动起来 tonic::transport::Server::builder() .add_service(health_service) .add_service(testing::make_server()) .add_service(storage::make_server(Arc::clone(&server))) .add_service(flight::make_server(Arc::clone(&server))) .add_service(write::make_server(Arc::clone(&server))) .add_service(management::make_server(Arc::clone(&server))) .add_service(operations::make_server(server)) .serve_with_incoming_shutdown(stream, shutdown.cancelled()) .await 然后是http相关的启动 pub async fn serve<M>( addr: AddrIncoming, server: Arc<AppServer<M>>, shutdown: CancellationToken, ) -> Result<(), hyper::Error> where M: ConnectionManager + Send + Sync + Debug + 'static, { //初始化路由相关的信息 let router = router(server); let service = RouterService::new(router).unwrap(); //启动服务 hyper::Server::builder(addr) .serve(service) .with_graceful_shutdown(shutdown.cancelled()) .await } 顺便看一下都提供了哪些地址可以被访问的: Router::builder() .data(server) //写了一个拦截,打印请求参数和返回结果 .middleware(Middleware::pre(|req| async move { debug!(request = ?req, "Processing request"); Ok(req) })) .middleware(Middleware::post(|res| async move { debug!(response = ?res, "Successfully processed request"); Ok(res) })) // this endpoint is for API backward compatibility with InfluxDB 2.x .post("/api/v2/write", write::<M>) .get("/health", health) .get("/metrics", handle_metrics) .get("/iox/api/v1/databases/:name/query", query::<M>) .get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>) .get("/api/v1/partitions", list_partitions::<M>) .post("/api/v1/snapshot", snapshot_partition::<M>) //错误的时候调用的处理拦截 .err_handler_with_info(error_handler) .build() .unwrap() 做一个/health的测试: curl localhost:8080/health OK% 可以看到成功返回了值。 到这里基本启动就完成了,后面再用到的时候会继续对启动里的细节做研究,比如Panics,Log等等吧,欢迎持续关注。 祝玩儿的开心

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册