首页 文章 精选 留言 我的

精选列表

搜索[快速],共10000篇文章
优秀的个人博客,低调大师

用 Go + WebSocket 快速实现一个 chat 服务

前言 在 go-zero 开源之后,非常多的用户询问是否可以支持以及什么时候支持 websocket,终于在 v1.1.6 里面我们从框架层面让 websocket 的支持落地了,下面我们就以 chat 作为一个示例来讲解如何用 go-zero 来实现一个 websocket 服务。 整体设计 我们以 zero-example 中的 chat 聊天室为例来一步步一讲解 websocket 的实现,分为如下几个部分: 多客户端接入 消息广播 客户端的及时上线下线 全双工通信【客户端本身是发送端,也是接收端】 先放一张图,大致的数据传输: 中间有个 select loop 就是整个 chat 的 engine。首先要支撑双方通信: 得有一个交流数据的管道。客户端只管从 管道 读取/输送数据; 客户端在线情况。不能说你下线了,还往你那传输数据; 数据流 数据流是 engine 的主要功能,先不急着看代码,我们先想 client 怎么接入并被 engine 感知: 首先是从前端发 websocket 请求; 建立连接;准备接收/发送通道; 注册到 engine; //HTML操作{js}if(window["WebSocket"]){ conn=newWebSocket("ws://"+document.location.host+"/ws"); conn.onclose=function(evt){varitem=document.createElement("div"); item.innerHTML="<b>Connectionclosed.</b>"; appendLog(item); }; ... }//路由engine.AddRoute(rest.Route{ Method:http.MethodGet, Path:"/ws", Handler:func(whttp.ResponseWriter,r*http.Request){ internal.ServeWs(hub,w,r) }, })//接入逻辑funcServeWs(hub*Hub,whttp.ResponseWriter,r*http.Request){//将http请求升级为websocket conn,err:=upgrader.Upgrade(w,r,nil) ...//构建client:hub{engine},con{websockerconn},send{channelbuff} client:=&Client{ hub:hub, conn:conn, send:make(chan[]byte,bufSize), } client.hub.register<-client//开始客户端双工的通信,接收和写入数据 goclient.writePump() goclient.readPump() }复制代码 这样,新接入的 client 就被加入到 注册 通道中。 hub engine 发出了 注册 的动作,engine 会怎么处理呢? typeHubstruct{ clientsmap[*Client]bool//上线clients broadcastchan[]byte//客户端发送的消息->广播给其他的客户端 registerchan*Client //注册channel,接收注册msg unregisterchan*Client //下线channel}func(h*Hub)Run(){ for{ select{//注册channel:存放到注册表中,数据流也就在这些client中发生caseclient:=<-h.register: h.clients[client]=true//下线channel:从注册表里面删除caseclient:=<-h.unregister: if_,ok:=h.clients[client];ok{ delete(h.clients,client) close(client.send) }//广播消息:发送给注册表中的client中,send接收到并显示到client上casemessage:=<-h.broadcast: forclient:=rangeh.clients{ select{ caseclient.send<-message: default: close(client.send) delete(h.clients,client) } } } } }复制代码 接收注册消息 -> 加入全局注册表 如果 engine.broadcast 接收到,会将 msg 传递给 注册表 的 client.sendChan 这样从 HTML -> client -> hub -> other client 的整个数据流就清晰了。 广播数据 上面说到 engine.broadcast 接收到数据,那从页面开始,数据是怎么发送到这? func(c*Client)readPump(){ ... for{//1 _,message,err:=c.conn.ReadMessage() iferr!=nil{ ifwebsocket.IsUnexpectedCloseError(err,websocket.CloseGoingAway,websocket.CloseAbnormalClosure){ log.Printf("error:%v",err) } break } message=bytes.TrimSpace(bytes.Replace(message,newline,space,-1))//2. c.hub.broadcast<-message } }复制代码 从 conn 中不断读取 msg【页面点击后传递】 将 msg 传入 engine.broadcast,从而广播到其他的 client 当出现发送异常或者是超时,异常退出时,会触发下线 client 同时要知道,此时发送消息的 client 不止有一个,可能会有很多个。那发送到其他client,client 从自己的 send channel 中读取消息即可: func(c*Client)writePump(){//写超时控制 ticker:=time.NewTicker(pingPeriod) ... for{ select{ casemessage,ok:=<-c.send://当接收消息写入时,延长写超时时间。 c.conn.SetWriteDeadline(time.Now().Add(writeWait)) ... w,err:=c.conn.NextWriter(websocket.TextMessage) ... w.Write(message) //依次读取send中消息,并write n:=len(c.send) fori:=0;i<n;i++{ w.Write(newline) w.Write(<-c.send) } ... case<-ticker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) ... } } }复制代码 上面也说了,send 有来自各自客户端中发送的msg:所以当检测到 send 有数据,就不断接收消息并写入当前 client;同时当写超时,会检测websocket长连接是否还存活,存活则继续读 send chan,断开则直接返回。 完整示例代码 github.com/zeromicro/z… 总结 本篇文章从使用上介绍如何结合 go-zero 开始你的 websocket 项目,开发者可以按照自己的需求改造。 关于 go-zero 更多的设计和实现文章,可以持续关注我们。 github.com/tal-tech/go… 欢迎使用 go-zero 并 star 支持我们!

优秀的个人博客,低调大师

CakePHP 3.10.0-RC1 发布,PHP 快速开发框架

CakePHP 是一个运用了诸如 ActiveRecord、Association Data Mapping、Front Controller 和 MVC(model–view–controller) 等著名设计模式的开源 Web 框架。CakePHP 用 PHP 编写,以 Ruby on Rails 的概念为模型,并在 MIT 许可下进行分发。 CakePHP 团队宣布 CakePHP 3.10.0-RC1 马上就要发布了。这是 3.10.0 的第一个候选版本。它包含了一些新的功能。如果没有重大问题的报告,稳定版将在未来几周内打包。 3.10 包含了一些从 4.x 移植过来的功能。这些新功能和移植的目的是为了方便团队从 3.x 升级到 4.x。 扩展 3.x 的支持 此前 3.9 是 3.x 版本中计划的最后一个版本,而 bug 修复计划于 2021 年 6 月 15 日之后停止。由于人们对 3.x 的持续关注和使用,我们决定延长 3.x 的维护时间表,并发布 3.10。新的时间表是: 3.10 将继续获得 bug 修复,直到 2021 年 12 月 15 日; 3.10 也将继续获得安全修复,直到 2022 年 12 月 15 日。 更新到 RC 版本 你可以使用 composer 升级到 CakePHP 3.10.0 的测试版: php composer.phar require --update-with-dependencies "cakephp/cakephp:3.10.0-RC1" 3.10.0 中的新功能: 3.10中包含的功能简短清单如下。 改进了 API 文档; 从 4.x 版本向下移植并改进了 Validation::time(); EmailTrait::assertMailSentFrom() 现在可以接受一个包含地址和别名的数组。 更多详情可查看:https://github.com/cakephp/cakephp/releases/tag/3.10.0-RC1

优秀的个人博客,低调大师

5分钟快速梳理你的HTTP体系

HTTP 定义 HTTP(超文本传输协议) 是 客户端 与 服务端 之间信息交流的 桥梁。 在信息交流之前必须要做的就是 客户端通过连接TCP/IP协议 80 端口 ,以便 服务端侦听HTTP请求。3.HTTP 是 一种通用的 , 无状态的应用层协议,基于标准客户机/服务器模型。 HTTP 特点 1.采用 “请求/响应”的交互模式, 客户端发送请求,服务端接受请求,处理请求,并将处理结果返回给客户端。服务端不会主动发送请求。2.协议设计灵活,拓展性好,HTTP可以通过扩展新的请求方法实现新的功能。3.无状态:协议对于事务处理没有存储功能,意思就是如果上次响应的结果在该请求中需要用,那么是用不了的。缺点:每次连接的数量增大。优点:1.服务器处理速度快,效率高2.避免0了集群特点间状态同步的开销。4.持久连接:连接可以重复使用,提高了网络连接使用效率。持久连接 在HTTP1.1中已 经是默认选项。5.支持内容协商 HTTP 请求/响应交互模型 HTTP 常用请求方法 GET 方法 1.GET 方法 是 客户端 向服务端 获取资源时使用的,资源类型有图片,音频,HTML.....2.服务器在处理GET请求时,它会根据客户端发送过来的url上具体参数进行返回结果处理。3.当用GET请求获取数据量较大时,可能会出现传输过程中断情况,HTTP协议提供了断点续传机制,通过GET 方法获取资源时可以指定获取的起始点。 POST 方法 1.POST 方法主要是 客户端向服务端发送数据资源。2.POST 和 GET 方法区别:POST 请求会包含信息体,信息体中携带了要发送给服务端的数据。 HEAD 方法 HEAD 方法 和 GET 方法 POST方法类似 区别在于: GET方法返回的请求URL标识资源内容本身 HEAD方法仅仅返回相关响应头信息,不返回资源内容 3.HEAD 方法 主要用于 测试资源是否存在,是否被删除或修改 PUT 方法 PUT方法用请求有效载荷替换目标资源的所有当前表示。 DELETE DELETE方法删除指定的资源。 HTTP URI URI 1.定义 URI,通一资源标志符(Uniform Resource Identifier, URI),表示的是web上每一种可用的资源,如 HTML文档、图像、视频片段、程序等都由一个URI进行定位的。 2.URI的结构组成: ①访问资源的命名机制; ②存放资源的主机名; ③资源自身的名称。 3.实例 https://xxx.xxx.com/details/1 ①这是一个可以通过https协议访问的资源, ②位于主机 xxx.xxx.com上, ③通过“/details/1”可以对该资源进行唯一标识(注意,这个不一定是完整的路径) URI 构成 URL 统一资源定位符 统一资源名称 URL 1.定义 URL是URI的一个子集。它是Uniform Resource Locator的缩写,译为“统一资源定位 符”。 2.URL的一般格式为(带方括号[]的为可选项):protocol :// hostname[:port] / path / [;parameters][?query]#fragment 3.URL的格式由三部分组成:①第一部分是协议(或称为服务方式)。 ②第二部分是存有该资源的主机IP地址(有时也包括端口号)。 ③第三部分是主机资源的具体地址,如目录和文件名等。 第一部分和第二部分用“://”符号隔开, 第二部分和第三部分用“/”符号隔开。 第一部分和第二部分是不可缺少的,第三部分有时可以省略。 URL 和 URI 区别 URI:统一资源标志符(Uniform Resource Identifier) URL:统一资源定位符(uniform resource location) 说白了,URI与URL都是定位资源位置的,就是表示这个资源的位置信息,就像经纬度一样可以表示你在世界的哪个角落。URI是一种宽泛的含义更广的定义,而URL则是URI的一个子集,就是说URL是URI的一部分。换句话说,每个URL都是URI,但是不是每个URI都是URL的。 HTTP 发送请求 HTTP 响应请求 HTTP 状态码 100 Continue 继续。客户端应继续其请求 101 Switching Protocols 切换协议。服务器根据客户端的请求切换协议。只能切换到更高级的协议,例如,切换到HTTP的新版本协议 200 OK 请求成功。一般用于GET与POST请求 201 Created 已创建。成功请求并创建了新的资源 202 Accepted 已接受。已经接受请求,但未处理完成 203 Non-Authoritative Information 非授权信息。请求成功。但返回的meta信息不在原始的服务器,而是一个副本 204 No Content 无内容。服务器成功处理,但未返回内容。在未更新网页的情况下,可确保浏览器继续显示当前文档 205 Reset Content 重置内容。服务器处理成功,用户终端(例如:浏览器)应重置文档视图。可通过此返回码清除浏览器的表单域 206 Partial Content 部分内容。服务器成功处理了部分GET请求 300 Multiple Choices 多种选择。请求的资源可包括多个位置,相应可返回一个资源特征与地址的列表用于用户终端(例如:浏览器)选择 301 Moved Permanently 永久移动。请求的资源已被永久的移动到新URI,返回信息会包括新的URI,浏览器会自动定向到新URI。今后任何新的请求都应使用新的URI代替 302 Found 临时移动。与301类似。但资源只是临时被移动。客户端应继续使用原有URI 303 See Other 查看其它地址。与301类似。使用GET和POST请求查看 304 Not Modified 未修改。所请求的资源未修改,服务器返回此状态码时,不会返回任何资源。客户端通常会缓存访问过的资源,通过提供一个头信息指出客户端希望只返回在指定日期之后修改的资源 305 Use Proxy 使用代理。所请求的资源必须通过代理访问 306 Unused 已经被废弃的HTTP状态码 307 Temporary Redirect 临时重定向。与302类似。使用GET请求重定向 400 Bad Request 客户端请求的语法错误,服务器无法理解 401 Unauthorized 请求要求用户的身份认证 402 Payment Required 保留,将来使用 403 Forbidden 服务器理解请求客户端的请求,但是拒绝执行此请求 404 Not Found 服务器无法根据客户端的请求找到资源(网页)。通过此代码,网站设计人员可设置"您所请求的资源无法找到"的个性页面 405 Method Not Allowed 客户端请求中的方法被禁止 406 Not Acceptable 服务器无法根据客户端请求的内容特性完成请求 407 Proxy Authentication Required 请求要求代理的身份认证,与401类似,但请求者应当使用代理进行授权 408 Request Time-out 服务器等待客户端发送的请求时间过长,超时 409 Conflict 服务器完成客户端的 PUT 请求时可能返回此代码,服务器处理请求时发生了冲突 410 Gone 客户端请求的资源已经不存在。410不同于404,如果资源以前有现在被永久删除了可使用410代码,网站设计人员可通过301代码指定资 源的新位置 411 Length Required 服务器无法处理客户端发送的不带Content-Length的请求信息 412 Precondition Failed 客户端请求信息的先决条件错误 413 Request Entity Too Large 由于请求的实体过大,服务器无法处理,因此拒绝请求。为防止客户端的连续请求,服务器可能会关闭连接。如果只是服务器暂时无法处理,则会包含一个Retry-After的响应信息 414 Request-URI Too Large 请求的URI过长(URI通常为网址),服务器无法处理 415 Unsupported Media Type 服务器无法处理请求附带的媒体格式 416 Requested range not satisfiable 客户端请求的范围无效 417 Expectation Failed 服务器无法满足Expect的请求头信息 422 Conflict 表明由于所提供的的作为请求部分的数据非法,创建或修改操作不能被完成 429 TooManyRequests 表明超出了客户端访问频率的限制或者服务端接收到多于它能处理的请求。建议客户端读取相应的Retry-After 首部,然后等待该首部指出的时间后重试。 500 Internal Server Error 服务器内部错误,无法完成请求 501 Not Implemented 服务器不支持请求的功能,无法完成请求 502 Bad Gateway 作为网关或者代理工作的服务器尝试执行请求时,从远程服务器接收到了一个无效的响应 503 Service Unavailable 由于超载或系统维护,服务器暂时的无法处理客户端的请求。延时的长度可包含在服务器的Retry-After头信息中 504 Gateway Time-out 充当网关或代理的服务器,未及时从远端服务器获取请求 505 HTTP Version not supported 服务器不支持请求的HTTP协议的版本 HTTP 状态码分类 1** ------------------------------------> 信息,服务器收到请求,需要请求者继续执行 2** ------------------------------------> 成功,操作被成功接收并处理 3** ------------------------------------> 重定向,需要进一步的操作以完成请求 4** ------------------------------------> 客户端错误,请求包含语法错误或无法完成请求 5** ------------------------------------> 服务器错误,服务器在处理请求的过程中发生了错误 彩蛋环节 本文分享自微信公众号 - 前端自学社区(gh_ce69e7dba7b5)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

优秀的个人博客,低调大师

ProBuilder快速原型开发技术 ---模型精细化操作

前面我们讲解了很多ProBuilder功能,但是对于制作一款复杂的模型来说,还远远不够。需要更多的对于模型细节的处理,这就是本篇文章要讲解的“模型精细化操作”技术。关于PB对于模型的精细化操作,笔者分四大方面进行讲解。 一: 模型的线面批量选择在对一个复杂模型进行操作时候,往往要面对很多个面进行批量操作。这时候PB的批量选择功能就可以派上用途。常用的有如下功能:1> Select Face Loop 面循环选择操作步骤: A: 定义面操作试图 B: 选择一个面 C: 选择 Select Face Loop 功能。2> Select Face Ring 环形选择 操作步骤: 1:定义面操作试图 2:选择一个面 3:选择 Select Face Ring 功能。3> Grow Selection 成长选择 操作步骤: 1:定义面操作试图 2:选择一个面 3:选择 Grow Selection 右边的“+”号。4:取消“Restrict to Angle” 默认勾选。5:点击底部“Grow Selection”按钮。4> Select by Material 材质选择 表示选择相同材质的模型面。5> Select by Colors 颜色选择表示选择相同颜色的模型面。二: 面平滑处理面平滑处理,是PB中采用通过改变模型法线的方式,从视觉上模拟一种连续平滑面的效果。操作步骤: 1:制作一个连续不光滑的立方体,且查看其不光滑的面。2:选择需要光滑处理的连续面。3:点击PB面板的“Smoothing”功能按钮,设置为一组。 4:查看平滑处理后的连续面。三: 变换模型坐标系“模型坐标系变换“这项功能,主要是针对解决模型在旋转后,再进行各种操作时候,可能存在由于坐标系的定义不同,而导致的操作失误。以下我们定义7个步骤,演示问题与解决方式:实验步骤:1:定义一个 5 5 10 的模型。2:设置模型本身为世界坐标系。3:对模型做凹陷操作,查看模型外观。4:复制一个相同模型,也为世界坐标系。5:对模型做x与y轴的45度变换。6:同样做凹陷操作,查看模型外观的不同点。7:后者模型可以通过“Orientation” 设置局部或者法线坐标系,把错误的凹陷模型修改正确显示。四:模型分割线对于模型的精细化操作,其任意分割模型是必须的,PB中有多项具体选项可供选择。1> 模型添加“任意分割线”选择“subdivide Object” 对模型增加分割线。做进一步处理。如果当前选择一个“面”,则插入一个十字分割线。2> 插入相互垂直的分割线在模型的“Edge Selection”模式下,可以点击PB操作面板的“Insert Edge Loop” ,插入一个当前选择线,相垂直的“包扎线”,可以移动到指定方位。3> 插入一条分割线在模型的“Edge Selection”模式下,点击模型的一个边线, 点击PB操作面板的“Insert Edge Loop” ,插入与当前边沿线垂直的一条线。4> 插入一条分割线的快捷方式模型的“Edge Selection”模式下,可以通过 Alt+U 快捷方式,插入一条线。(作用同上) 本章节四大功能的灵活掌握,可以让读者灵活的开发复杂模型,例如央视大楼模型、小区楼盘模型、大型泳池模型等等。今天关于ProBuilder的功能就介绍到这,下一篇笔者讲解ProBuilder的进阶功能,敬请期待。 大家有什么学习上的问题,可以留言,欢迎一起讨论,共同进步!

优秀的个人博客,低调大师

干货 | 五千字长文带你快速入门FlinkSQL

一、前言 最近几天因为工作比较忙,已经几天没有及时更新文章了,在这里先给小伙伴们说声抱歉…临近周末,再忙再累,我也要开始发力了。接下来的几天,菌哥将为大家带来关于FlinkSQL的教程,之后还会更新一些大数据实时数仓的内容,和一些热门的组件使用!希望小伙伴们能点个关注,第一时间关注技术干货! 二、FlinkSQL出现的背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。 Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。 在这个背景下,毫无疑问,SQL 就成了我们最佳选择,之所以选择将 SQL 作为核心 API,是因为其具有几个非常重要的特点: SQL 属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法; SQL 可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划; SQL 易于理解,不同行业和领域的人都懂,学习成本较低; SQL 非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少; 流与批的统一,Flink 底层 Runtime 本身就是一个流与批统一的引擎,而 SQL 可以做到 API 层的流与批统一。 三、整体介绍 3.1 什么是 Table API 和 Flink SQL? Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。 Table API是一套内嵌在Java和Scala语言中的查询API,它允许我们以非常直观的方式,组合来自一些关系运算符的查询(比如select、filter和join)。而对于Flink SQL,就是直接可以在代码中写SQL,来实现一些查询(Query)操作。Flink的SQL支持,基于实现了SQL标准的Apache Calcite(Apache开源SQL解析工具)。 无论输入是批输入还是流式输入,在这两套API中,指定的查询都具有相同的语义,得到相同的结果。 3.2 需要引入的依赖 Table API 和 SQL 需要引入的依赖有两个:planner 和 bridge <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>1.10.0</version> </dependency> 其中: flink-table-planner:planner计划器,是table API最主要的部分,提供了运行时环境和生成程序执行计划的planner; flink-table-api-scala-bridge:bridge桥接器,主要负责table API和 DataStream/DataSet API的连接支持,按照语言分java和scala; 这里的两个依赖,是IDE环境下运行需要添加的;如果是生产环境,lib目录下默认已经有了planner,就只需要有bridge就可以了。 当然,如果想使用用户自定义函数,或是跟 kafka 做连接,需要有一个SQL client,这个包含在 flink-table-common 里。 3.3 两种planner(old & blink)的区别 1、批流统一:Blink将批处理作业,视为流式处理的特殊情况。所以,blink不支持表和DataSet之间的转换,批处理作业将不转换为DataSet应用程序,而是跟流处理一样,转换为DataStream程序来处理。 2、因为批流统一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替。 3、Blink planner只支持全新的目录,不支持已弃用的ExternalCatalog。 4、旧 planner 和 Blink planner 的FilterableTableSource实现不兼容。旧的planner会把PlannerExpressions下推到filterableTableSource中,而blink planner则会把Expressions下推。 5、基于字符串的键值配置选项仅适用于Blink planner。 6、PlannerConfig在两个planner中的实现不同。 7、Blink planner会将多个sink优化在一个DAG中(仅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而旧 planner 的优化总是将每一个sink放在一个新的DAG中,其中所有DAG彼此独立。 8、旧的planner不支持目录统计,而Blink planner支持。 四、API 调用 4.1 基本程序结构 Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么几步:首先创建执行环境,然后定义source、transform和sink。 具体操作流程如下: val tableEnv = ... // 创建表的执行环境 // 创建一张表,用于读取数据 tableEnv.connect(...).createTemporaryTable("inputTable") // 注册一张表,用于把计算结果输出 tableEnv.connect(...).createTemporaryTable("outputTable") // 通过 Table API 查询算子,得到一张结果表 val result = tableEnv.from("inputTable").select(...) // 通过 SQL查询语句,得到一张结果表 val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...") // 将结果表写入输出表中 result.insertInto("outputTable") 4.2 创建表环境 创建表环境最简单的方式,就是基于流处理执行环境,调create方法直接创建: val tableEnv = StreamTableEnvironment.create(env) 表环境(TableEnvironment)是flink中集成 Table API & SQL 的核心概念。它负责: 注册catalog 在内部 catalog 中注册表 执行 SQL 查询 注册用户自定义函数 将 DataStream 或 DataSet 转换为表 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用 在创建TableEnv的时候,可以多传入一个EnvironmentSettings 或者 TableConfig 参数,可以用来配置 TableEnvironment 的一些特性。 比如,配置老版本的流式查询(Flink-Streaming-Query): val settings = EnvironmentSettings.newInstance() .useOldPlanner() // 使用老版本planner .inStreamingMode() // 流处理模式 .build() val tableEnv = StreamTableEnvironment.create(env, settings) 基于老版本的批处理环境(Flink-Batch-Query): val batchEnv = ExecutionEnvironment.getExecutionEnvironment val batchTableEnv = BatchTableEnvironment.create(batchEnv) 基于 blink 版本的流处理环境(Blink-Streaming-Query): val bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) 基于blink版本的批处理环境(Blink-Batch-Query): val bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings) 4.3 在Catalog中注册表 4.3.1 表(Table)的概念 TableEnvironment 可以注册目录 Catalog ,并可以基于Catalog注册表。它会维护一个 Catalog-Table 表之间的map。 表(Table)是由一个“标识符”来指定的,由3部分组成:Catalog名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。 表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来。视图可以从现有的表中创建,通常是 table API 或者SQL查询的一个结果。 4.3.2 连接到文件系统(Csv格式) 连接外部系统在Catalog中注册表,直接调用 tableEnv.connect() 就可以,里面参数要传入一个 ConnectorDescriptor ,也就是connector描述器。对于文件系统的 connector 而言,flink内部已经提供了,就叫做FileSystem()。 代码如下: tableEnv .connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接 .withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法 .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) // 定义表结构 .createTemporaryTable("inputTable") // 创建临时表 这是旧版本的csv格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被弃用,以后会被一个符合RFC-4180标准的新format描述器取代。新的描述器就叫Csv(),但flink没有直接提供,需要引入依赖flink-csv: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.10.0</version> </dependency> 代码非常类似,只需要把 withFormat 里的 OldCsv 改成Csv就可以了。 4.3.3 连接到Kafka kafka的连接器 flink-kafka-connector 中,1.10 版本的已经提供了 Table API 的支持。我们可以在 connect方法中直接传入一个叫做Kafka的类,这就是kafka连接器的描述器ConnectorDescriptor。 tableEnv.connect( new Kafka() .version("0.11") // 定义kafka的版本 .topic("sensor") // 定义主题 .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaInputTable") 当然也可以连接到 ElasticSearch、MySql、HBase、Hive等外部系统,实现方式基本上是类似的。感兴趣的 小伙伴可以自行去研究,这里就不详细赘述了。 4.4 表的查询 通过上面的学习,我们已经利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。 Flink给我们提供了两种查询方式:Table API和 SQL。 4.4.1 Table API的调用 Table API是集成在Scala和Java语言内的查询API。与SQL不同,Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的。 Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select(…).filter(…),其中 select(…)表示选择表中指定的字段,filter(…)表示筛选条件。 代码中的实现如下: val sensorTable: Table = tableEnv.from("inputTable") val resultTable: Table = senorTable .select("id, temperature") .filter("id ='sensor_1'") 4.4.2 SQL查询 Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。 代码实现如下: val resultSqlTable: Table = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'") 或者: val resultSqlTable: Table = tableEnv.sqlQuery( """ |select id, temperature |from inputTable |where id = 'sensor_1' """.stripMargin) 当然,也可以加上聚合操作,比如我们统计每个sensor温度数据出现的个数,做个count统计: val aggResultTable = sensorTable .groupBy('id) .select('id, 'id.count as 'count) SQL的实现: val aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id") 这里Table API里指定的字段,前面加了一个单引号’,这是Table API中定义的Expression类型的写法,可以很方便地表示一个表中的字段。 字段可以直接全部用双引号引起来,也可以用半边单引号+字段名的方式。以后的代码中,一般都用后一种形式。 4.5 将DataStream 转换成表 Flink允许我们把Table和DataStream做转换:我们可以基于一个DataStream,先流式地读取数据源,然后map成样例类,再把它转成Table。Table的列字段(column fields),就是样例类里的字段,这样就不用再麻烦地定义schema了。 4.5.1 代码表达 代码中实现非常简单,直接用 tableEnv.fromDataStream() 就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。 这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次map操作(或者Table API的 select操作)。 代码具体如下: val inputStream: DataStream[String] = env.readTextFile("sensor.txt") val dataStream: DataStream[SensorReading] = inputStream .map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) val sensorTable: Table = tableEnv.fromDataStreama(datStream) val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'timestamp as 'ts) 4.5.2 数据类型与 Table schema的对应 在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。 另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。 基于名称的对应: val sensorTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id as 'myId, 'temperature) 基于位置的对应: val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts) Flink的 DataStream 和 DataSet API 支持多种类型。 组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。 元组类型和原子类型,一般用位置对应会好一些;如果非要用名称对应,也是可以的:元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。 4.6 创建临时视图(Temporary View) 创建临时视图的第一种方式,就是直接从DataStream转换而来。同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段。 代码如下: tableEnv.createTemporaryView("sensorView", dataStream) tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature, 'timestamp as 'ts) 另外,当然还可以基于Table创建视图: tableEnv.createTemporaryView("sensorView", sensorTable) View和Table的Schema完全相同。事实上,在Table API中,可以认为View 和 Table 是等价的。 4.7 输出表 表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。 具体实现,输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中。 4.7.1 输出到文件 代码如下: // 注册输出表 tableEnv.connect( new FileSystem().path("…\\resources\\out.txt") ) // 定义到文件系统的连接 .withFormat(new Csv()) // 定义格式化方法,Csv格式 .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("temp", DataTypes.DOUBLE()) ) // 定义表结构 .createTemporaryTable("outputTable") // 创建临时表 resultSqlTable.insertInto("outputTable") 4.7.2 更新模式(Update Mode) 在流处理过程中,表的处理并不像传统定义的那样简单。 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行转换。与外部系统交换的消息类型,由更新模式(update mode)指定。 Flink Table API中的更新模式有以下三种: 追加模式(Append Mode) 在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。 撤回模式(Retract Mode) 在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。 其中: 插入(Insert)会被编码为添加消息; 删除(Delete)则编码为撤回消息; 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)的添加消息。 在此模式下,不能定义key,这一点跟upsert模式完全不同。 Upsert(更新插入)模式 在Upsert模式下,动态表和外部连接器交换Upsert和Delete消息。 这个模式需要一个唯一的key,通过这个key可以传递更新消息。为了正确应用消息,外部连接器需要知道这个唯一key的属性。 插入(Insert)和更新(Update)都被编码为Upsert消息; 删除(Delete)编码为Delete信息 这种模式和 Retract 模式的主要区别在于,Update操作是用单个消息编码的,所以效率会更高。 4.7.3 输出到Kafka 除了输出到文件,也可以输出到Kafka。我们可以结合前面Kafka作为输入数据,构建数据管道,kafka进,kafka出。 代码如下: // 输出到 kafka tableEnv.connect( new Kafka() .version("0.11") .topic("sinkTest") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) .withFormat( new Csv() ) .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("temp", DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaOutputTable") resultTable.insertInto("kafkaOutputTable") 4.7.4 输出到ElasticSearch ElasticSearch的connector可以在upsert(update+insert,更新插入)模式下操作,这样就可以使用Query定义的键(key)与外部系统交换UPSERT/DELETE消息。 另外,对于“仅追加”(append-only)的查询,connector还可以在 append 模式下操作,这样就可以与外部系统只交换 insert 消息。 es目前支持的数据格式,只有Json,而 flink 本身并没有对应的支持,所以还需要引入依赖: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.10.0</version> </dependency> 代码实现如下: // 输出到es tableEnv.connect( new Elasticsearch() .version("6") .host("localhost", 9200, "http") .index("sensor") .documentType("temp") ) .inUpsertMode() // 指定是 Upsert 模式 .withFormat(new Json()) .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("count", DataTypes.BIGINT()) ) .createTemporaryTable("esOutputTable") aggResultTable.insertInto("esOutputTable") 4.7.5 输出到MySql Flink专门为Table API的jdbc连接提供了flink-jdbc连接器,我们需要先引入依赖: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.11</artifactId> <version>1.10.0</version> </dependency> jdbc连接的代码实现比较特殊,因为没有对应的java/scala类实现 ConnectorDescriptor,所以不能直接 tableEnv.connect()。不过Flink SQL留下了执行DDL的接口:tableEnv.sqlUpdate() 对于jdbc的创建表操作,天生就适合直接写DDL来实现,所以我们的代码可以这样写: // 输出到 Mysql val sinkDDL: String = """ |create table jdbcOutputTable ( | id varchar(20) not null, | cnt bigint not null |) with ( | 'connector.type' = 'jdbc', | 'connector.url' = 'jdbc:mysql://localhost:3306/test', | 'connector.table' = 'sensor_count', | 'connector.driver' = 'com.mysql.jdbc.Driver', | 'connector.username' = 'root', | 'connector.password' = '123456' |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) aggResultSqlTable.insertInto("jdbcOutputTable") 4.7.6 将表转换成DataStream 表可以转换为DataStream或DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API或SQL查询的结果上运行了。 将表转换为DataStream或DataSet时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是Row。当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示。 表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样需要对表的更新操作进行编码,进而有不同的转换模式。 Table API 中表到 DataStream 有两种模式: 追加模式(Append Mode) 用于表只会被插入(Insert)操作更改的场景 撤回模式(Retract Mode) 用于任何场景。有些类似于更新模式中Retract模式,它只有 Insert 和 Delete 两类操作。 得到的数据会增加一个Boolean类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据, Delete)。 代码实现如下: val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable) val aggResultStream: DataStream[(Boolean, (String, Long))] = tableEnv.toRetractStream[(String, Long)](aggResultTable) resultStream.print("result") aggResultStream.print("aggResult") 所以,没有经过groupby之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream。 4.7.7 Query的解释和执行 Table API提供了一种机制来解释(Explain)计算表的逻辑和优化查询计划。这是通过TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。 explain方法会返回一个字符串,描述三个计划: 未优化的逻辑查询计划 优化后的逻辑查询计划 实际执行计划 我们可以在代码中查看执行计划: val explaination: String = tableEnv.explain(resultTable) println(explaination) Query的解释和执行过程,老planner和 blink planner 大体是一致的,又有所不同。整体来讲,Query都会表示成一个逻辑查询计划,然后分两步解释: 优化查询计划 解释成 DataStream 或者 DataSet程序 而 Blink 版本是批流统一的,所以所有的Query,只会被解释成DataStream程序;另外在批处理环境 TableEnvironment 下,Blink版本要到 tableEnv.execute() 执行调用才开始解释。 巨人的肩膀 1、http://www.atguigu.com/ 2、https://www.bilibili.com/video/BV12k4y1z7LM?from=search&seid=953051020130358915 3、https://blog.csdn.net/u013411339/article/details/93267838 小结 本篇文章主要用五千多字,为大家带来迅速入门并掌握 FlinkSQL 的技巧,包含FlinkSQL出现的背景介绍以及与 Table API 的区别,API调用方式更是介绍的非常详细全面,希望小伙伴们在看了之后能够及时复习总结,尤其是初学者。好了,本篇文章 over,大家看了之后有任何的疑惑都可以私信作者,我看到都会一一解答。下一篇我会在本篇的基础上为大家介绍一些流处理中的特殊概念,敬请期待|ू・ω・` ),你知道的越多,你不知道的也越多,我是Alice,我们下一期见! 文章持续更新,可以微信搜一搜「 猿人菌 」第一时间阅读,思维导图,大数据书籍,大数据高频面试题,海量一线大厂面经…关注这个在大数据领域冉冉升起的新星! 本文同步分享在 博客“Alice菌”(CSDN)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

用户登录
用户注册