首页 文章 精选 留言 我的

精选列表

搜索[linux],共10000篇文章
优秀的个人博客,低调大师

Linux 环境下,Apache DolphinScheduler 如何驱动 Flink 消费 Kafka 数据?

已经在虚拟机部署好Apache DolphinScheduler了,想尝试下在Flink新建一个Flink节点,然后用Flink消费Kafka数据。 Apache DolphinScheduler用的是单机部署,具体操作可以参考官方文档:DolphinScheduler | 文档中心(https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/installation/standalone). 前置条件:已经安装Java 11、DolphinScheduler 3.3.2、Flink 1.18.1、Kafka 3.6.0,Zookeeper用Kafka内置的。建议这些安装都下载二进制的安装包到虚拟机安装,用命令安装的不可控,我下载的二进制包如下: 配置好Flink的环境变量 1、编辑环境变量: sudo vim ~/.bashrc 增加Flink的路径 2、使环境变量生效: #使环境变量生效 source ~/.bashrc #查看环境变量 echo $Flink_HOME 修改Kafka、Flink以及DolphinScheduler的配置文件 因为用的是虚拟机,为了让外面的主机能够访问到虚拟机的网络,需要修改下配置文件 修改Kafka配置:找到Kafka安装包下的config文件夹,修改config下的server.properties文件,修改listeners是为了外面的主机能够访问到虚拟机的Kafka,还有把advertised.listeners改成虚拟机地址,写样例的时候能连上虚拟机的Kafka地址,不然默认连localhost broker.id=0 listeners=PLAINTEXT://0.0.0.0:9092 #192.168.146.132修改成虚拟机ip advertised.listeners=PLAINTEXT://192.168.146.132:9092 修改Flink配置:找到Flink安装包下的conf文件夹,修改conf下的Flink-conf.yaml文件,把里面所有的localhost地址全部改成0.0.0.0,以便主机能访问到虚拟机的Flink。还有增加jobmanager和taskmanager的内存 jobmanager.rpc.address: 0.0.0.0 jobmanager.bind-host: 0.0.0.0 jobmanager.cpu.cores: 1 jobmanager.memory.process.size: 1600m taskmanager.bind-host: 0.0.0.0 taskmanager.host: 0.0.0.0 taskmanager.memory.process.size: 2048m taskmanager.cpu.cores: 1 修改Apache DolphinScheduler的配置文件,从Apache DolphinScheduler的启动脚本文件dolphinscheduler-daemon.sh可以看出,配置环境变量用的是bin/env文件夹下的dolphinscheduler_env.sh。 查看dolphinscheduler-daemon.sh文件: 修改dolphinscheduler_env.sh文件,新增JAVA、Flink路径: #修改成自己的JAVA、Flink路径 export JAVA_HOME=/data/jdk-11.0.29 export Flink_HOME=/data/Flink-1.18.1 关闭防火墙,启动应用 启动应用,包括Zookeeper、Kafka、Flink以及Apache DolphinScheduler。 #关闭防火墙 sudo systemctl stop firewalld # 在 Flink 根目录下,执行以下命令启动 Flink 集群 bin/start-cluster.sh # 启动 ZooKeeper bin/zookeeper-server-start.sh config/zookeeper.properties & # 启动 Kafka 服务器 bin/Kafka-server-start.sh config/server.properties & #创建 Kafka 主题 bin/Kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 #使用命令行生产者发送消息 bin/Kafka-console-producer.sh --topic test --bootstrap-server localhost:9092 #消费 bin/Kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092 # 启动 Standalone Server 服务 bash ./bin/dolphinscheduler-daemon.sh start standalone-server 测试 测试Flink、Apache DolphinScheduler是否能访问成功。 Flink访问地址:http://localhost:8081/,localhost改成自己虚拟机地址 Apache DolphinScheduler访问地址:http://localhost:12345/dolphinscheduler/ui ,localhost改成自己虚拟机地址即可登录系统 UI。默认的用户名和密码是 admin/dolphinscheduler123 编写样例 用Flink消费Kafka数据,然后打包上传到Apache DolphinScheduler,启动Flink任务: 编写样例: pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>Flink-Kafka-demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <Flink.version>1.18.1</Flink.version> <scala.binary.version>2.12</scala.binary.version> <Kafka.version>3.6.0</Kafka.version> </properties> <dependencies> <!-- Flink核心依赖 --> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-java</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-streaming-java</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-clients</artifactId> <version>${Flink.version}</version> </dependency> <!-- 连接器基础依赖 --> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-connector-base</artifactId> <version>${Flink.version}</version> </dependency> <!-- Kafka连接器(关键修改点) --> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-connector-Kafka</artifactId> <version>3.1.0-1.18</version> </dependency> <dependency> <groupId>org.apache.Kafka</groupId> <artifactId>Kafka-clients</artifactId> <version>${Kafka.version}</version> </dependency> <!-- 日志依赖 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.36</version> <scope>runtime</scope> </dependency> </dependencies> <repositories> <repository> <id>aliyun</id> <url>https://maven.aliyun.com/repository/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> <repository> <id>apache-releases</id> <url>https://repository.apache.org/content/repositories/releases/</url> </repository> </repositories> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>org.apache.Flink:force-shading</exclude> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> </project> FlinkKafkaConsumerExample.java import org.apache.Flink.api.common.functions.FlatMapFunction; import org.apache.Flink.api.java.tuple.Tuple2; import org.apache.Flink.api.java.utils.ParameterTool; import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.Flink.streaming.api.datastream.DataStream; import org.apache.Flink.streaming.api.functions.ProcessFunction; import org.apache.Flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.Flink.util.Collector; import org.apache.Flink.streaming.connectors.Kafka.FlinkKafkaConsumer; import org.apache.Flink.api.common.serialization.SimpleStringSchema; import org.apache.Kafka.clients.consumer.ConsumerConfig; import org.apache.Kafka.common.serialization.StringDeserializer; import java.util.Properties; import java.util.concurrent.CompletableFuture; public class FlinkKafkaConsumerExample { private static volatile int messageCount = 0; private static volatile boolean shouldStop = false; public static void main(String[] args) throws Exception { // 设置执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Kafka 配置 Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker 地址 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组 properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 创建 Kafka 消费者 FlinkKafkaConsumer<String> KafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); KafkaConsumer.setStartFromEarliest(); // 从最早的消息开始消费 DataStream<String> stream = env.addSource(KafkaConsumer); // 处理数据:分词和计数 DataStream<Tuple2<String, Integer>> counts = stream .flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() { @Override public void invoke(Tuple2<String, Integer> value, Context context) { System.out.println(value); messageCount++; // 检查是否达到停止条件 if (messageCount >= 2 && !shouldStop) { System.out.println("Processed 2 messages, stopping job."); shouldStop = true; // 设置标志位,表示应该停止 } } }); // 执行作业并获取 JobClient CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { // 启动作业并获取 JobClient org.apache.Flink.core.execution.JobClient jobClient = env.executeAsync("Flink Kafka WordCount"); System.out.println("Job ID: " + jobClient.getJobID()); // 监测条件并取消作业 while (!shouldStop) { Thread.sleep(100); // 每100毫秒检查一次 } // 达到停止条件时取消作业 if (shouldStop) { System.out.println("Cancelling the job..."); jobClient.cancel().get(); // 取消作业 } } catch (Exception e) { e.printStackTrace(); } }); // 在主线程中等待作业结束 future.join(); // 等待作业完成 } // Tokenizer 类用于将输入字符串转化为单词 public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); } } } } } 打包上传到Apache DolphinScheduler 新建Flink节点,并启动 在Apache DolphinScheduler的任务实例看启动日志: 在虚拟机启动生产者,输出字符串,然后可以在Flink查看输出Kafka生产的消息: 原文链接:https://blog.csdn.net/Analyze_ing/article/details/156940553

优秀的个人博客,低调大师

IceWM 4.0.0 发布,Linux / BSD 轻量级窗口管理器

IceWM 4.0.0 现已发布,改进了 Alt+Tab 快速切换功能。Alt +Tab 窗口切换器现在可以处理水平和垂直模式下的大量应用程序窗口。在 Alt+Tab 中输入应用程序类名的首字母,即可选择该应用程序类的下一个实例窗口。 用户也可以按数字键选择应用程序。在水平模式下,还可以使用鼠标在 Alt+Tab 中选择应用程序。此外,还支持使用所有导航键来导航快速切换窗口。按下 Alt+Tab 上的菜单按钮即可打开系统菜单。新增QuickSwitchPreview预览模式,可在快速切换过程中实时更新应用程序预览效果。 Fixes 修复 OpenBSD 的键盘布局切换问题。 修复在 secondary screen 上拖动桌面迷你图标的问题。 当工作区名称从外部更改时,更新任务栏上的工作区名称。 修复任务窗格与工作区窗格重叠的问题。 防止停靠层窗口退出时可能发生的崩溃。 在 icewm-menu-fdo 中初始化用户的默认语言环境。 Changes Alpha blending 和 32 位 RGBA 已成为默认设置。 移除 DoubleBuffer 和 QuickSwitchMaxWidth 首选项。 在 WM_ICON_SIZE 中宣布支持高分辨率图标。 将图标大小标准化为 16、22、24、32、48、64、128 和 256。 通过在服务器端缓存图标图片来加快图标绘制速度。 当 clock led pixmap 缺失时,使用 clock font 代替。 将窗口标题限制为 128 字节,并去除尾随空格。 /proc/net/dev 上的 I/O 故障最多报告一次。 调整 HiDPI 显示器的子菜单指示器大小。 启动时从桌面读取额外的工作区名称。 将 getWorkspaceName 和 getWorkspaceNames 添加到 icesh。 为 icesh 的 loadicon 和 saveicon 函数添加诊断信息输出。 已更新的翻译:瑞典语、加泰罗尼亚语、德语、日语、荷兰语、斯洛伐克语、印尼语、葡萄牙语、巴西葡萄牙语、斯洛文尼亚语。 详情可查看 :https://github.com/ice-wm/icewm/releases/tag/4.0.0

优秀的个人博客,低调大师

Linux 桌面环境 LXQt 2.2 发布,提供更好的 Wayland 支持

LXQt 2.2 已正式发布,成为这款轻量级开源 Qt 桌面环境的最新稳定更新。 LXQt 2.2 以之前 LXQt 版本中已实现的 Wayland 支持为基础。LXQt 2.2 不仅提升了 Wayland 的多屏支持,还进行了其他一些功能改进,以便在 X11 继续正常运行的同时,更好地支持 Wayland 环境。 LXQt Wayland 会话也将继续与所有可与其配对的不同 Wayland 合成器的最新稳定版本完美兼容。LXQt 2.2 还为 PCManFM-Qt 带来了终端自定义选项支持、QTerminal 文本渲染修复以及 LXQt 电源管理中的 PPD 电源配置文件支持。 LXQt Archiver 现在默认使用 7-Zip 构建,并依赖 7z 处理 RAR 压缩包,libqtxdg 也已针对 Qt 6.9 头文件进行了更新。LibFM-Qt 也能够更好地处理 LXQt 2.2 桌面版的拖放操作。 更多有关 LXQt 2.2 桌面版的详细信息,请访问GitHub。

优秀的个人博客,低调大师

Tails 6.13 发布,隐私性极高的 Linux 发行版

Tails 6.13 现已发布,具体更新内容包括: 新功能 检测 Wi-Fi 硬件问题 为了帮助解决 Wi-Fi 接口的硬件兼容性问题,Tor Connection助手现在会在未检测到 Wi-Fi 硬件时进行报告。 Changes and updates 将Tor Browser更新至14.0.7。 将Torclient更新至 0.4.8.14。 已修复的问题 首次启动 Tails 时也检测分区错误。(#20797) 这解决了在新的 Tails USB 记忆棒上创建持久存储时出现的一些故障。 修复安装其他软件失败时通知中的配置和显示日志按钮。(#20781) 更多详情可查看changelog。

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册