Flink读取Kafka数据Sink到MySQL和HBase数据库
Flink将流数据Sink到数据库,一般需要自己自定义Sink的实现。下面示例,演示Sink到MySQL和HBase示例。
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
object KafkaToSinkStreaming {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val p = new Properties()
p.setProperty("bootstrap.servers", "localhost:9092")
p.setProperty("group.id", "test")
val input = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), p))
// 自定义MysqlSink类,将数据Sink到mysql
val sink = new MysqlSink("jdbc:mysql://localhost:3306/test", "root", "root")
input.addSink(sink)
// 自定义HBaseSink类,将数据Sink到HBase
val hBaseSink = new HBaseSink("student", "info")
input.addSink(hBaseSink)
env.execute("KafkaToSinkStreaming")
}
}
自定义MysqlSink类
import java.sql.{Connection, DriverManager}
import com.google.gson.Gson
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
class MysqlSink(url: String, user: String, pwd: String) extends RichSinkFunction[String] {
var conn: Connection = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection(url, user, pwd)
conn.setAutoCommit(false)
}
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
val g = new Gson()
val s = g.fromJson(value, classOf[Student])
println(value)
val p = conn.prepareStatement("replace into student(name,age,sex,sid) values(?,?,?,?)")
p.setString(1, s.name)
p.setString(2, s.age.toString)
p.setString(3, s.sex)
p.setString(4, s.sid)
p.execute()
conn.commit()
}
override def close(): Unit = {
super.close()
conn.close()
}
}
自定义HBaseSink类
import com.google.gson.Gson
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
class HBaseSink(tableName: String, family: String) extends RichSinkFunction[String] {
var conn: Connection = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val conf = HBaseConfiguration.create()
conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost")
conn = ConnectionFactory.createConnection(conf)
}
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
val g = new Gson()
val student = g.fromJson(value, classOf[Student])
println(value)
println(student)
val t: Table = conn.getTable(TableName.valueOf(tableName))
val put: Put = new Put(Bytes.toBytes(student.sid))
put.addColumn(Bytes.toBytes(family), Bytes.toBytes("name"), Bytes.toBytes(student.name))
put.addColumn(Bytes.toBytes(family), Bytes.toBytes("age"), Bytes.toBytes(student.age))
put.addColumn(Bytes.toBytes(family), Bytes.toBytes("sex"), Bytes.toBytes(student.sex))
t.put(put)
t.close()
}
override def close(): Unit = {
super.close()
conn.close()
}
}
Student类
case class Student(name: String, age: Int, sex: String, sid: String)
执行KafkaToSinkStreaming程序后,在kafka product端输入。
{"name":"zhangsan","age":"18","sex":"male","sid":"1001"}
{"name":"lisi","age":"20","sex":"male","sid":"1002"}
{"name":"laowang","age":"20","sex":"male","sid":"1003"}
{"name":"caocao","age":"28","sex":"male","sid":"1004"}
mysql的数据库结果:
HBase数据库结果:
项目源码:https://github.com/zhang3550545/flinkdemo
相差阅读推荐:https://www.roncoo.com/view/171
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
大学生防范黑客的8大网络安全提示
每年都有数百万的学生上大学,但是并不是所有人都准备好迎接第一次独自生活的挑战。虽然学生可能会从父母那里获得洗衣服或烹饪基本食物等基础课程,但许多人从未接受过网络安全方面的任何培训,而网络安全是现代大学生活中日益重要的方面。 不要仅仅将网络安全信任于大学。太多情况下,数据泄露给学生造成了问题,这证明了唯一可以信任网络安全的人就是自己。 为了帮助大学生制定强有力的网络安全计划,我们汇总了八个提示,以帮助保持安全。 开始使用新证书 是否看过电视节目中的小人因为密码是“密码”而可以破解英雄的密码?不要让类似的事情发生在你身上。当您到达校园时,很可能会收到新的网络凭证,包括学院或大学网站的用户名和密码。 许多学生从不费力更改默认密码。这是一个大错误。从右脚开始,将默认密码更改为更安全的密码。 这将使帐户更难被破解,但也将有助于阻止知道学校提供的临时密码的任何人。 限制在大学图书馆的连接 大多数大学图书馆都有公用计算机,因此学生可以在学习时工作,但是这些计算机通常可以包含间谍软件,病毒和其他可能危害数据的危险物品。 使用图书馆计算机时,请在任何Web浏览器中使用隐私模式,以避免计算机记录访问的站点...
- 下一篇
Wine 5.0-rc6 发布,5.0 正式版最后冲刺阶段
Wine 5.0-rc6发布了,此版本只是进行了一些 bug 修复,因为项目目前正处在代码冻结阶段,特性不会变化。 Wine(Wine Is Not an Emulator)是一个能够在多种兼容 POSIX 接口的操作系统(诸如 Linux、macOS 与BSD 等)上运行 Windows 应用的兼容层。它不是像虚拟机或者模拟器一样模仿内部的 Windows 逻辑,而是将Windows API 调用翻译成为动态的 POSIX 调用,免除了性能和其它一些行为的内存占用,让你能够干净地整合Windows 应用到你的桌面。 如果没有意外,那么这就是最终 5.0 版本之前的最后一个版本。 更新说明: https://www.winehq.org/news/2020011701
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境