首页 文章 精选 留言 我的

精选列表

搜索[分布式调度],共10000篇文章
优秀的个人博客,低调大师

SpringBoot+kafka+ELK分布式日志收集

一、背景 随着业务复杂度的提升以及微服务的兴起,传统单一项目会被按照业务规则进行垂直拆分,另外为了防止单点故障我们也会将重要的服务模块进行集群部署,通过负载均衡进行服务的调用。那么随着节点的增多,各个服务的日志也会散落在各个服务器上。这对于我们进行日志分析带来了巨大的挑战,总不能一台一台的登录去下载日志吧。那么我们需要一种收集日志的工具将散落在各个服务器节点上的日志收集起来,进行统一的查询及管理统计。那么ELK就可以做到这一点。 ELK是ElasticSearch+Logstash+Kibana的简称,在这里我分别对如上几个组件做个简单的介绍: 1.1、ElasticSearch(简称ES) Elasticsearch是一个高度可扩展的开源全文搜索和分析引擎。它允许您快速、实时地存储、搜索和分析大量数据。它通常用作底层引擎/技术,为具有复杂搜索特性和需求的应用程序提供动力。我们可以借助如ElasticSearch完成诸如搜索,日志收集,反向搜索,智能分析等功能。ES设计的目标: 快速实时搜索 Elasticsearch是一个实时搜索平台。这意味着,从索引文档到可搜索文档,存在轻微的延迟(通常为一秒)。 集群 集群是一个或多个节点(服务器)的集合,这些节点(服务器)一起保存整个数据,并提供跨所有节点的联合索引和搜索功能。集群由一个惟一的名称来标识,默认情况下该名称为“elasticsearch”。这个名称很重要,因为节点只能是集群的一部分,如果节点被设置为通过其名称加入集群的话。确保不要在不同的环境中重用相同的集群名称,否则可能会导致节点加入错误的集群。例如,您可以使用logging-dev、logging-test和logging-prod开发、测试和生产集群。 节点 节点是单个服务器,它是集群的一部分,它用来存储数据,并参与集群的索引和搜索功能。与集群一样,节点的名称默认为在启动时分配给节点的随机惟一标识符(UUID)。如果不需要默认值,可以定义任何节点名称。这个名称对于管理非常重要,因为您想要确定网络中的哪些服务器对应于Elasticsearch集群中的哪些节点。 索引 索引是具有类似特征的文档的集合。例如,您可以有一个客户数据索引、另一个产品目录索引和另一个订单数据索引。索引由一个名称标识(必须是小写的),该名称用于在对其中的文档执行索引、搜索、更新和删除操作时引用索引。在单个集群中,可以定义任意数量的索引。 文档 文档是可以建立索引的基本信息单元。例如,可以为单个客户提供一个文档,为单个产品提供一个文档,为单个订单提供另一个文档。这个文档用JSON (JavaScript对象符号)表示。在索引中,可以存储任意数量的文档。请注意,尽管文档在物理上驻留在索引中,但实际上文档必须被索引/分配到索引中的类型中。 1.2、Logstash Logstash是一个开源数据收集引擎,具有实时流水线功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据规范化后(通过Filter过滤)传输到您选择的目标。 在这里inputs代表数据的输入通道,大家可以简单理解为来源。常见的可以从kafka,FileBeat, DB等获取日志数据,这些数据经过fliter过滤后(比如说:日志过滤,json格式解析等)通过outputs传输到指定的位置进行存储(Elasticsearch,Mogodb,Redis等) 简单的实例: cd logstash-6.4.1 bin/logstash -e 'input { stdin { } } output { stdout {} }' 1.3、Kibana kibana是用于Elasticsearch检索数据的开源分析和可视化平台。我们可以使用Kibana搜索、查看或者与存储在Elasticsearch索引中的数据交互。同时也可以轻松地执行高级数据分析并在各种图表、表和映射中可视化数据。基于浏览器的Kibana界面使您能够快速创建和共享动态仪表板,实时显示对Elasticsearch查询的更改。 1.4、处理方案 用户通过java应用程序的Slf4j写入日志,SpringBoot默认使用的是logback。我们通过实现自定义的Appender将日志写入kafka,同时logstash通过input插件操作kafka订阅其对应的主题。当有日志输出后被kafka的客户端logstash所收集,经过相关过滤操作后将日志写入Elasticsearch,此时用户可以通过kibana获取elasticsearch中的日志信息 二、SpringBoot中的配置 在SpringBoot当中,我们可以通过logback-srping.xml来扩展logback的配置。不过我们在此之前应当先添加logback对kafka的依赖,代码如下: compile group: 'com.github.danielwegener', name: 'logback-kafka-appender', version: '0.2.0-RC1' 添加好依赖之后我们需要在类路径下创建logback-spring.xml的配置文件并做如下配置(添加kafka的Appender): <configuration> <!-- springProfile用于指定当前激活的环境,如果spring.profile.active的值是哪个,就会激活对应节点下的配置 --> <springProfile name="default"> <!-- configuration to be enabled when the "staging" profile is active --> <springProperty scope="context" name="module" source="spring.application.name" defaultValue="undefinded"/> <!-- 该节点会读取Environment中配置的值,在这里我们读取application.yml中的值 --> <springProperty scope="context" name="bootstrapServers" source="spring.kafka.bootstrap-servers" defaultValue="localhost:9092"/> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> <pattern>%boldYellow(${module}) | %d | %highlight(%-5level)| %cyan(%logger{15}) - %msg %n</pattern> </encoder> </appender> <!-- kafka的appender配置 --> <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender"> <encoder> <pattern>${module} | %d | %-5level| %logger{15} - %msg</pattern> </encoder> <topic>logger-channel</topic> <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/> <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/> <!-- Optional parameter to use a fixed partition --> <!-- <partition>0</partition> --> <!-- Optional parameter to include log timestamps into the kafka message --> <!-- <appendTimestamp>true</appendTimestamp> --> <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) --> <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs --> <!-- bootstrap.servers is the only mandatory producerConfig --> <producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig> <!-- 如果kafka不可用则输出到控制台 --> <appender-ref ref="STDOUT"/> </appender> <!-- 指定项目中的logger --> <logger name="org.springframework.test" level="INFO" > <appender-ref ref="kafka" /> </logger> <root level="info"> <appender-ref ref="STDOUT" /> </root> </springProfile> </configuration> 在这里面我们主要注意以下几点: 日志输出的格式是为模块名 | 时间 | 日志级别 | 类的全名 | 日志内容 SpringProfile节点用于指定当前激活的环境,如果spring.profile.active的值是哪个,就会激活对应节点下的配置 springProperty可以读取Environment中的值 三、ELK搭建过程 3.1、检查环境 ElasticSearch需要jdk8,官方建议我们使用JDK的版本为1.8.0_131,原文如下: Elasticsearch requires at least Java 8. Specifically as of this writing, it is recommended that you use the Oracle JDK version 1.8.0_131 检查完毕后,我们可以分别在官网下载对应的组件 ElasticSearch Kibana Logstash kafka zookeeper 3.2、启动zookeeper 首先进入启动zookeeper的根目录下,将conf目录下的zoo_sample.cfg文件拷贝一份重新命名为zoo.cfg mv zoo_sample.cfg zoo.cfg 配置文件如下: # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=../zookeeper-data # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 紧接着我们进入bin目录启动zookeeper: ./zkServer.sh start 3.3、启动kafka 在kafka根目录下运行如下命令启动kafka: ./bin/kafka-server-start.sh config/server.properties 启动完毕后我们需要创建一个logger-channel主题: ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logger-channel 3.4、配置并启动logstash 进入logstash跟目录下的config目录,我们将logstash-sample.conf的配置文件拷贝到根目录下重新命名为core.conf,然后我们打开配置文件进行编辑: # Sample Logstash configuration for creating a simple # Beats -> Logstash -> Elasticsearch pipeline. input { kafka { id => "my_plugin_id" bootstrap_servers => "localhost:9092" topics => ["logger-channel"] auto_offset_reset => "latest" } } filter { grok { patterns_dir => ["./patterns"] match => { "message" => "%{WORD:module} \| %{LOGBACKTIME:timestamp} \| %{LOGLEVEL:level} \| %{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}" } } } output { stdout { codec => rubydebug } elasticsearch { hosts =>["localhost:9200"] } } 我们分别配置logstash的input,filter和output(懂ruby的童鞋们肯定对语法结构不陌生吧): 在input当中我们指定日志来源为kafka,具体含义可以参考官网:kafka-input-plugin 在filter中我们配置grok插件,该插件可以利用正则分析日志内容,其中patterns_dir属性用于指定自定义的分析规则,我们可以在该文件下建立文件配置验证的正则规则。举例子说明:55.3.244.1 GET /index.html 15824 0.043的 日志内容经过如下配置解析: grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } 解析过后会变成: client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043 这些属性都会在elasticsearch中存为对应的属性字段。更详细的介绍请参考官网:grok ,当然该插件已经帮我们定义好了好多种核心规则,我们可以在这里查看所有的规则。 在output当中我们将过滤过后的日志内容打印到控制台并传输到elasticsearch中,我们可以参考官网上关于该插件的属性说明:地址 另外我们在patterns文件夹中创建好自定义的规则文件logback,内容如下: # yyyy-MM-dd HH:mm:ss,SSS ZZZ eg: 2014-01-09 17:32:25,527 LOGBACKTIME 20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND}) 编辑好配置后我们运行如下命令启动logstash: bin/logstash -f first-pipeline.conf --config.reload.automatic 该命令会实时更新配置文件而不需启动 3.5、启动ElasticSearch 启动ElasticSearch很简单,我们可以运行如下命令: ./bin/elasticsearch 我们可以发送get请求来判断启动成功: GET http://localhost:9200 我们可以得到类似于如下的结果: { "name" : "Cp8oag6", "cluster_name" : "elasticsearch", "cluster_uuid" : "AT69_T_DTp-1qgIJlatQqA", "version" : { "number" : "6.4.0", "build_flavor" : "default", "build_type" : "zip", "build_hash" : "f27399d", "build_date" : "2016-03-30T09:51:41.449Z", "build_snapshot" : false, "lucene_version" : "7.4.0", "minimum_wire_compatibility_version" : "1.2.3", "minimum_index_compatibility_version" : "1.2.3" }, "tagline" : "You Know, for Search" } 3.5.1 配置IK分词器(可选) 我们可以在github上下载elasticsearch的IK分词器,地址如下:ik分词器,然后把它解压至your-es-root/plugins/ik的目录下,我们可以在{conf}/analysis-ik/config/IKAnalyzer.cfg.xmlor {plugins}/elasticsearch-analysis-ik-*/config/IKAnalyzer.cfg.xml 里配置自定义分词器: <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 扩展配置</comment> <!--用户可以在这里配置自己的扩展字典 --> <entry key="ext_dict">custom/mydict.dic;custom/single_word_low_freq.dic</entry> <!--用户可以在这里配置自己的扩展停止词字典--> <entry key="ext_stopwords">custom/ext_stopword.dic</entry> <!--用户可以在这里配置远程扩展字典 --> <entry key="remote_ext_dict">location</entry> <!--用户可以在这里配置远程扩展停止词字典--> <entry key="remote_ext_stopwords">http://xxx.com/xxx.dic</entry> </properties> 首先我们添加索引: curl -XPUT http://localhost:9200/my_index 我们可以把通过put请求来添加索引映射: PUT my_index { "mappings": { "doc": { "properties": { "title": { "type": "text" }, "name": { "type": "text" }, "age": { "type": "integer" }, "created": { "type": "date", "format": "strict_date_optional_time||epoch_millis" } "content": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" } } } } } 其中doc是映射名 my_index是索引名称 3.5.2 logstash与ElasticSearch logstash默认情况下会在ES中建立logstash-*的索引,*代表了yyyy-MM-dd的时间格式,根据上述logstash配置filter的示例,其会在ES中建立module ,logmessage,class,level等索引。(具体我们可以根据grok插件进行配置) 3.6 启动Kibana 在kibana的bin目录下运行./kibana即可启动。启动之后我们可以通过浏览器访问http://localhost:5601 来访问kibanaUI。我们可以看到如下界面:

优秀的个人博客,低调大师

分布式--ActiveMQ 消息中间件(一)

1. ActiveMQ 1). ActiveMQ ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。 2). Java Message Service(JMS) JMS支持两种消息发送和接收模型。 一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。 图1.png 另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。 图2.png 3). JMS术语 Provider/MessageProvider:生产者 Consumer/MessageConsumer:消费者 PTP:Point To Point,点对点通信消息模型 Pub/Sub:Publish/Subscribe,发布订阅消息模型 Queue:队列,目标类型之一,和PTP结合 Topic:主题,目标类型之一,和Pub/Sub结合 ConnectionFactory:连接工厂,JMS用它创建连接 Connnection:JMS Client到JMS Provider的连接 Destination:消息目的地,由Session创建 Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的 4). ActiveMQ下载 图3.png bin (windows下面的bat(分32、64位)和unix/linux下面的sh) conf (activeMQ配置目录,包含最基本的activeMQ配置文件) data (默认是空的) docs (index,replease版本里面没有文档,-.-b不知道为啥不带) example (几个例子) lib (activemMQ使用到的lib) webapps 注意ActiveMQ自带Jetty提供Web管控台 webapps-demo 示例 activemq-all-5.15.3.jar LICENSE.txt README.txt 5). 配置 Web控制台账号和密码(apache-activemq-5.15.3\conf) 图4.png 网络端口(apache-activemq-5.15.3\conf)--默认为8161 图5.png 6). 启动 \apache-activemq-5.15.3\bin\win64\目录下双击activemq.bat文件,在浏览器中输入http://localhost:8161/admin/, 用户名和密码输入admin即可 图6.png 7). 消息中间件(MOM:Message Orient middleware) 消息中间件有很多的用途和优点: 1 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块; 负责建立网络通信的通道,进行数据的可靠传送。 保证数据不重发,不丢失 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务 8).什么情况下使用ActiveMQ? 多个项目之间集成 (1) 跨平台 (2) 多语言 (3) 多项目 降低系统间模块的耦合度,解耦 (1) 软件扩展性 系统前后端隔离 (1) 前后端隔离,屏蔽高安全区 2. ActiveMQ 示例 1). P2P 示例 I. 导包--activemq-all-5.15.3.jar II. Producer /** * 定义消息的生产者 * @author mazaiting */ public class Producer { // 用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 链接 private static final String BROKENURL = ActiveMQConnection.DEFAULT_BROKER_URL; /** * 定义消息并发送,等待消息的接收者(消费者)消费此消息 * @param args * @throws JMSException */ public static void main(String[] args) throws JMSException { // 消息中间件的链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( USERNAME, PASSWORD, BROKENURL); // 连接 Connection connection = null; // 会话 Session session = null; // 消息的目的地 Destination destination = null; // 消息生产者 MessageProducer messageProducer = null; try { // 通过连接工厂获取链接 connection = connectionFactory.createConnection(); // 创建会话,进行消息的发送 // 参数一:是否启用事务 // 参数二:设置自动签收 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建消息队列 destination = session.createQueue("talkWithMo"); // 创建一个消息生产者 messageProducer = session.createProducer(destination); // 设置持久化/非持久化, 如果非持久化,MQ重启后可能后导致消息丢失 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 模拟发送消息 for (int i = 0; i < 5; i++) { TextMessage textMessage = session.createTextMessage("给妈妈发送的消息:"+i); System.out.println("textMessage: " + textMessage); messageProducer.send(textMessage); } // 如果设置了事务,会话就必须提交 session.commit(); } catch (JMSException e) { e.printStackTrace(); } finally { if (null != connection) { connection.close(); } } } } III. Consumer /** * 定义消息的消费者 * @author mazaiting */ public class Consumer { // 用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 链接 private static final String BROKENURL = ActiveMQConnection.DEFAULT_BROKER_URL; /** * 接收消息 * @param args * @throws JMSException */ public static void main(String[] args) throws JMSException { // 消息中间件的链接工厂 ConnectionFactory connectionFactory = null; // 链接 Connection connection = null; // 会话 Session session = null; // 消息的目的地 Destination destination = null; // 消息的消费者 MessageConsumer messageConsumer = null; // 实例化链接工厂,创建一个链接 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKENURL); try { // 通过工厂获取链接 connection = connectionFactory.createConnection(); // 启动链接 connection.start(); // 创建会话,进行消息的接收 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建消息队列 destination = session.createQueue("talkWithMo"); // 创建一个消息的消费者 messageConsumer = session.createConsumer(destination); // 模拟接收消息 while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(10000); if (null != textMessage) { System.out.println("收到消息: " + textMessage); } else { break; } } // 提交 session.commit(); } catch (JMSException e) { e.printStackTrace(); } finally { if (null != connection) { connection.close(); } } } } IV. 测试 先运行生产者Producer 图7.png ActiveMQ控制台 图8.png 再运行消费者Consumer 图9.png ActiveMQ控制台 图10.png V. 消息类型 StreamMessage Java原始值的数据流 MapMessage 一套名称-键值对 TextMessage 一个字符串对象 ObjectMessage 一个序列号的Java对象 BytesMessage 一个未解释字节的数据流 VI. 控制台 Queue Messages Enqueued:表示生产了多少条消息,记做P Messages Dequeued:表示消费了多少条消息,记做C Number Of Consumers:表示在该队列上还有多少消费者在等待接受消息 Number Of Pending Messages:表示还有多少条消息没有被消费,实际上是表示消息的积压程度,就是P-C VII. 签收 签收就是消费者接受到消息后,需要告诉消息服务器,我收到消息了。当消息服务器收到回执后,本条消息将失效。因此签收将对PTP模式产生很大影响。如果消费者收到消息后,并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉! AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收 CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收 DUPS_OK_ACKNOWLEDGE:签不签收无所谓了,只要消费者能够容忍重复的消息接受,当然这样会降低Session的开销 2). request/reply模型 I. 实现思路 图11.png Client的Producer发出一个JMS message形式的request,request上附加了一些额外的属性: correlation ID(用来和返回的correlation ID对比进行验证), JMSReplyTo属性(放置jms message的destination,这样worker的Consumer获得jms message就能得到destination) Worker的consumer收到requset,处理request并用producer发出reply,destination就从requset的JMSReplyTo属性中得到。 II. Server代码 public class Server implements MessageListener { // 经纪人链接 private static final String BROKER_URL = "tcp://localhost:61616"; // 请求队列 private static final String REQUEST_QUEUE = "requestQueue"; // 经纪人服务 private BrokerService brokerService; // 会话 private Session session; // 生产者 private MessageProducer producer; // 消费者 private MessageConsumer consumer; private void start() throws Exception { createBroker(); setUpConsumer(); } /** * 创建经纪人 * @throws Exception */ private void createBroker() throws Exception { // 创建经纪人服务 brokerService = new BrokerService(); // 设置是否持久化 brokerService.setPersistent(false); // 设置是否使用JMX brokerService.setUseJmx(false); // 添加链接 brokerService.addConnector(BROKER_URL); // 启动 brokerService.start(); } /** * 设置消费者 * @throws JMSException */ private void setUpConsumer() throws JMSException { // 创建连接工厂 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); // 创建连接 Connection connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列 Destination adminQueue = session.createQueue(REQUEST_QUEUE); // 创建生产者 producer = session.createProducer(null); // 设置持久化模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 创建消费者 consumer = session.createConsumer(adminQueue); // 消费者设置消息监听 consumer.setMessageListener(this); } public void stop() throws Exception { producer.close(); consumer.close(); session.close(); brokerService.stop(); } @Override public void onMessage(Message message) { try { // 创建新消息 TextMessage response = this.session.createTextMessage(); // 判断消息是否是文本消息 if (message instanceof TextMessage) { // 强转为文本消息 TextMessage textMessage = (TextMessage) message; // 获取消息内容 String text = textMessage.getText(); // 设置消息 response.setText(handleRequest(text)); } response.setJMSCorrelationID(message.getJMSCorrelationID()); producer.send(message.getJMSReplyTo(), response); } catch (JMSException e) { e.printStackTrace(); } } /** * 构建消息内容 * @param text 文本 * @return */ private String handleRequest(String text) { return "Response to '" + text + "'"; } public static void main(String[] args) throws Exception { Server server = new Server(); // 启动 server.start(); System.out.println(); System.out.println("Press any key to stop the server"); System.out.println(); System.in.read(); server.stop(); } } III. Client代码 public class Client implements MessageListener { // 经纪人链接 private static final String BROKER_URL = "tcp://localhost:61616"; // 请求队列 private static final String REQUEST_QUEUE = "requestQueue"; // 连接 private Connection connection; // 会话 private Session session; // 生产者 private MessageProducer producer; // 消费者 private MessageConsumer consumer; // 请求队列 private Queue tempDest; public void start() throws JMSException { // 连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKER_URL); // 创建连接 connection = activeMQConnectionFactory.createConnection(); // 开启连接 connection.start(); // 创建会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列 Destination adminQueue = session.createQueue(REQUEST_QUEUE); // 创建生产者 producer = session.createProducer(adminQueue); // 设置持久化模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 创建模板队列 tempDest = session.createTemporaryQueue(); // 创建消费者 consumer = session.createConsumer(tempDest); // 设置消息监听 consumer.setMessageListener(this); } /** * 停止 * @throws JMSException */ public void stop() throws JMSException { producer.close(); consumer.close(); session.close(); } /** * 请求 * @param request * @throws JMSException */ public void request(String request) throws JMSException { System.out.println("Request: " + request); // 创建文本消息 TextMessage textMessage = session.createTextMessage(); // 设置文本内容 textMessage.setText(request); // 设置回复 textMessage.setJMSReplyTo(tempDest); // 获取UUID String correlationId = UUID.randomUUID().toString(); // 设置JMS id textMessage.setJMSCorrelationID(correlationId); // 发送消息 this.producer.send(textMessage); } @Override public void onMessage(Message message) { try { System.out.println("Received response for: " + ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) throws JMSException, InterruptedException { Client client = new Client(); // 启动 client.start(); int i = 0; while(i++ < 10) { client.request("REQUEST- " + i); } Thread.sleep(3000); client.stop(); } } IV. 测试 启动Server 图12.png 启动Client 图13.png 代码下载

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册