首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

ES[7.6.x]学习笔记(七)IK中文分词器

在上一节中,我们给大家介绍了ES的分析器,我相信大家对ES的全文搜索已经有了深刻的印象。分析器包含3个部分:字符过滤器、分词器、分词过滤器。在上一节的例子,大家发现了,都是英文的例子,是吧?因为ES是外国人写的嘛,中国如果要在这方面赶上来,还是需要屏幕前的小伙伴们的~ 英文呢,我们可以按照空格将一句话、一篇文章进行分词,然后对分词进行过滤,最后留下有意义的词。但是中文怎么分呢?中文的一句话是没有空格的,这就要有一个强大的中文词库,当你的内容中出现这个词时,就会将这个词提炼出来。这里大家也不用重复造轮子,经过前辈的努力,这个中文的分词器已经有了,它就是今天要给大家介绍的IK中文分词器。 IK中文分词器的安装 ES默认是没有IK中文分词器的,我们要将IK中文分词器作为一个插件安装到ES中,安装的步骤也很简单: 从GitHub上下载适合自己ES版本的IK中文分词器,地址如下:https://github.com/medcl/elasticsearch-analysis-ik/releases。 在我们的ES的插件目录中(${ES_HOME}/plugins)创建ik目录, mkdir ik 将我们下载好的IK分词器解压到ik目录,这里我们安装unzip命令,进行解压。 重启我们所有的ES服务。 到这里,我们的IK中文分词器就安装完了。 IK中文分词器初探 在上一节我们访问了ES的分析器接口,指定了分析器和文本的内容,我们就可以看到分词的结果。那么既然我们已经安装了Ik中文分词器,当然要看一下效果了。在看效果之前,我们先要说一下,IK中文分词器插件给我们提供了两个分析器。 ik_max_word: 会将文本做最细粒度的拆分 ik_smart:会做最粗粒度的拆分 我们先看看ik_max_word的分析效果吧, POST _analyze { "analyzer": "ik_max_word", "text": "中华人民共和国国歌" } 我们指定分词器为ik_max_word,文本内容为中华人民共和国国歌。我们看一下分词的结果: { "tokens": [ { "token": "中华人民共和国", "start_offset": 0, "end_offset": 7, "type": "CN_WORD", "position": 0 }, { "token": "中华人民", "start_offset": 0, "end_offset": 4, "type": "CN_WORD", "position": 1 }, { "token": "中华", "start_offset": 0, "end_offset": 2, "type": "CN_WORD", "position": 2 }, { "token": "华人", "start_offset": 1, "end_offset": 3, "type": "CN_WORD", "position": 3 }, { "token": "人民共和国", "start_offset": 2, "end_offset": 7, "type": "CN_WORD", "position": 4 }, { "token": "人民", "start_offset": 2, "end_offset": 4, "type": "CN_WORD", "position": 5 }, { "token": "共和国", "start_offset": 4, "end_offset": 7, "type": "CN_WORD", "position": 6 }, { "token": "共和", "start_offset": 4, "end_offset": 6, "type": "CN_WORD", "position": 7 }, { "token": "国", "start_offset": 6, "end_offset": 7, "type": "CN_CHAR", "position": 8 }, { "token": "国歌", "start_offset": 7, "end_offset": 9, "type": "CN_WORD", "position": 9 } ] } 我们可以看到,分词分的非常细,我们在使用上面的这些进行搜索时,都可以搜索到中华人民共和国国歌这个文本。我们再看一下另外一个分析器ik_smart, POST _analyze { "analyzer": "ik_smart", "text": "中华人民共和国国歌" } 我们的文本内容同样是中华人民共和国国歌,看一下分词的效果, { "tokens": [ { "token": "中华人民共和国", "start_offset": 0, "end_offset": 7, "type": "CN_WORD", "position": 0 }, { "token": "国歌", "start_offset": 7, "end_offset": 9, "type": "CN_WORD", "position": 1 } ] } 同样的文本,使用ik_smart进行分词时,只分成了两个词,和ik_max_word分词器比少了很多。这就是两个分词器的区别,不过这两个分析器都是可以对中文进行分词的。 创建索引时指定IK分词器 既然我们安装了IK中文分词器的插件,那么我们在创建索引时就可以为text类型的字段指定IK中文分词器了。来看看下面的例子, PUT ik_index { "mappings": { "properties": { "id": { "type": "long" }, "title": { "type": "text", "analyzer": "ik_max_word" } } } } 我们创建了索引ik_index,并且为字段title指定了分词器ik_max_word。我们执行一下,创建成功。然后我们再通过GET请求看一下这个索引的映射情况。 GET ik_index/_mapping 返回的结果如下: { "ik_index": { "mappings": { "properties": { "id": { "type": "long" }, "title": { "type": "text", "analyzer": "ik_max_word" } } } } } 我们可以看到title字段的分析器是ik_max_word。 为索引指定默认IK分词器 在上一节中,我们已经给大家介绍了为索引指定默认分词器的方法,这里我们直接把分词器改为IK分词器就可以了,如下: PUT ik_index { "settings": { "analysis": { "analyzer": { "default": { "type": "ik_max_word" } } } } } 这样我们在索引中就不用创建每一个字段,可以通过动态字段映射,将String类型的字段映射为text类型,同时分词器指定为ik_max_word。我们试一下,向ik_index索引中添加一条记录。 POST ik_index/_doc/1 { "id": 1, "title": "大兴庞各庄的西瓜", "desc": "大兴庞各庄的西瓜真是好吃,脆沙瓤,甜掉牙" } 执行成功。我们再执行搜索试一下,如下: POST ik_index/_search { "query": { "match": { "title": "西瓜" } } } 我们搜索title字段匹配西瓜,执行结果如下: { "took": 2, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 0.2876821, "hits": [ { "_index": "ik_index", "_type": "_doc", "_id": "1", "_score": 0.2876821, "_source": { "id": 1, "title": "大兴庞各庄的西瓜", "desc": "大兴庞各庄的西瓜真是好吃,脆沙瓤,甜掉牙" } } ] } } 我们可以看到刚才插入的那条记录已经搜索出来了,看来我们的IK中文分词器起作用了,而且搜索的结果也符合我们的预期。我们再看看搜索西一个字的时候,能不能搜索到结果, POST ik_index/_search { "query": { "match": { "title": "西" } } } 执行结果如下: { "took": 4, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 0, "relation": "eq" }, "max_score": null, "hits": [] } } 并没有搜索出结果,说明在进行分词时,西瓜是作为一个词出现的,并没有拆分成每一个字,这也是符合我们预期的。 好了~ 这一节的IK中文分词器就给大家介绍到这里了~~

优秀的个人博客,低调大师

RabbitMQ学习:RabbitMQ的六种工作模式终结篇(四)

前言,在前面我讲到了RabbitMQ的六种工作模式中简单模式和工作模式 -- https://my.oschina.net/u/4115134/blog/3228182 ,这里呢,我就一次性将剩下的四种--发布订阅模式/路由模式/主题模式及Rpc异步调用模式,给大家进行分析,讲解一下,同时也给自己复习复习!!! 三、发布订阅模式 在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。 为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序接收它们。 在我们的日志系统中,接收程序的每个运行副本都将获得消息。这样,我们就可以运行一个消费者并将日志保存到磁盘; 同时我们可以运行另一个消费者在屏幕上打印日志。 最终, 消息会被广播到所有消息接受者。 Exchanges 交换机 RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。 相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列中吗?它应该添加到多个队列中吗?或者它应该被丢弃。这些规则由exchange的类型定义。 有几种可用的交换类型:direct、topic、header和fanout。我们将关注最后一个——fanout。让我们创建一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout"); fanout交换机非常简单。它只是将接收到的所有消息广播给它所知道的所有队列。这正是我们的日志系统所需要的。 我们前面使用的队列具有特定的名称(还记得hello和task_queue吗?)能够为队列命名对我们来说至关重要——我们需要将工作进程指向同一个队列,在生产者和消费者之间共享队列。 但日志记录案例不是这种情况。我们想要接收所有的日志消息,而不仅仅是其中的一部分。我们还只对当前的最新消息感兴趣,而不是旧消息。 要解决这个问题,我们需要两件事。首先,每当我们连接到Rabbitmq时,我们需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的方法是让服务器为我们选择一个随机队列名称。其次,一旦断开与使用者的连接,队列就会自动删除。在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有生成名称的、非持久的、独占的、自动删除队列 //自动生成队列名 //非持久,独占,自动删除 String queueName = ch.queueDeclare().getQueue(); 绑定Bindings 我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定。 //指定的队列,与指定的交换机关联起来 //成为绑定 -- binding //第三个参数时 routingKey, 由于是fanout交换机, 这里忽略 routingKey ch.queueBind(queueName, "logs", ""); 现在, logs交换机将会向我们指定的队列添加消息 列出绑定关系: rabbitmqctl list_bindings 完成代码实现 生产者 生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。 package rabbitmq.publishsubscribe; import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //定义名字为logs的交换机,交换机类型为fanout //这一步是必须的,因为禁止发布到不存在的交换。 ch.exchangeDeclare("logs", "fanout"); while (true) { System.out.print("输入消息: "); String msg = new Scanner(System.in).nextLine(); if ("exit".equals(msg)) { break; } //第一个参数,向指定的交换机发送消息 //第二个参数,不指定队列,由消费者向交换机绑定队列 //如果还没有队列绑定到交换器,消息就会丢失, //但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。 ch.basicPublish("logs", "", null, msg.getBytes("UTF-8")); System.out.println("消息已发送: "+msg); } c.close(); } } 消费者 如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。 package rabbitmq.publishsubscribe; import java.io.IOException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //定义名字为 logs 的交换机, 它的类型是 fanout ch.exchangeDeclare("logs", "fanout"); //自动生成对列名, //非持久,独占,自动删除 String queueName = ch.queueDeclare().getQueue(); //把该队列,绑定到 logs 交换机 //对于 fanout 类型的交换机, routingKey会被忽略,不允许null值 ch.queueBind(queueName, "logs", ""); System.out.println("等待接收数据"); //收到消息后用来处理消息的回调对象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println("收到: "+msg); } }; //消费者取消时的回调对象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume(queueName, true, callback, cancel); } } 四、路由模式 在上一小节,我们构建了一个简单的日志系统。我们能够向多个接收者广播日志消息。 在这一节,我们将向其添加一个特性—我们将只订阅所有消息中的一部分。例如,我们只接收关键错误消息并保存到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。 绑定 Bindings 在上一节,我们已经创建了队列与交换机的绑定。使用下面这样的代码: ch.queueBind(queueName, "logs", ""); 绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。 绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey。这是我们如何创建一个键绑定: ch.queueBind(queueName, EXCHANGE_NAME, "black"); bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。 直连交换机 Direct exchange 上一节中的日志系统向所有消费者广播所有消息。我们希望扩展它,允许根据消息的严重性过滤消息。例如,我们希望将日志消息写入磁盘的程序只接收关键error,而不是在warning或info日志消息上浪费磁盘空间。 前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。 我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。为了说明这一点,请考虑以下设置 其中我们可以看到直连交换机X,它绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定black,另一个绑定键green。 这样设置,使用路由键orange发布到交换器的消息将被路由到队列Q1。带有black或green路由键的消息将转到Q2。而所有其他消息都将被丢弃。 多重绑定 Multiple bindings 使用相同的bindingKey绑定多个队列是完全允许的。如图所示,可以使用binding key black将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。 发送日志 我们将在日志系统中使用这个模型。我们把消息发送到一个Direct交换机,而不是fanout。我们将提供日志级别作为routingKey。这样,接收程序将能够选择它希望接收的级别。让我们首先来看发出日志。 和前面一样,我们首先需要创建一个exchange: //参数1: 交换机名 //参数2: 交换机类型 ch.exchangeDeclare("direct_logs", "direct"); 接着来看发送消息的代码 //参数1: 交换机名 //参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning" //参数3: 其他配置属性 //参数4: 发布的消息数据 ch.basicPublish("direct_logs", "error", null, message.getBytes()); 订阅 接收消息的工作原理与前面章节一样,但有一个例外——我们将为感兴趣的每个日志级别创建一个新的绑定, 示例代码如下: ch.queueBind(queueName, "logs", "info"); ch.queueBind(queueName, "logs", "warning"); 最终代码实现 生产者 package rabbitmq.routing; import java.util.Random; import java.util.Scanner; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws Exception { String[] a = {"warning", "info", "error"}; ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //参数1: 交换机名 //参数2: 交换机类型 ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); while (true) { System.out.print("输入消息: "); String msg = new Scanner(System.in).nextLine(); if ("exit".equals(msg)) { break; } //随机产生日志级别 String level = a[new Random().nextInt(a.length)]; //参数1: 交换机名 //参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning" //参数3: 其他配置属性 //参数4: 发布的消息数据 ch.basicPublish("direct_logs", level, null, msg.getBytes()); System.out.println("消息已发送: "+level+" - "+msg); } c.close(); } } 消费者 package rabbitmq.routing; import java.io.IOException; import java.util.Scanner; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //定义名字为 direct_logs 的交换机, 它的类型是 "direct" ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); //自动生成对列名, //非持久,独占,自动删除 String queueName = ch.queueDeclare().getQueue(); System.out.println("输入接收的日志级别,用空格隔开:"); String[] a = new Scanner(System.in).nextLine().split("\\s"); //把该队列,绑定到 direct_logs 交换机 //允许使用多个 bindingKey for (String level : a) { ch.queueBind(queueName, "direct_logs", level); } System.out.println("等待接收数据"); //收到消息后用来处理消息的回调对象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); String routingKey = message.getEnvelope().getRoutingKey(); System.out.println("收到: "+routingKey+" - "+msg); } }; //消费者取消时的回调对象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume(queueName, true, callback, cancel); } } 五、主题模式 在上一小节,我们改进了日志系统。我们没有使用只能进行广播的fanout交换机,而是使用Direct交换机,从而可以选择性接收日志。 虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。 在我们的日志系统中,我们可能不仅希望根据级别订阅日志,还希望根据发出日志的源订阅日志。 这将给我们带来很大的灵活性——我们可能只想接收来自“cron”的关键错误,但也要接收来自“kern”的所有日志。 要在日志系统中实现这一点,我们需要了解更复杂的Topic交换机。 主题交换机 Topic exchange 发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。 bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点: * 可以通配单个单词。 # 可以通配零个或多个单词。 用一个例子来解释这个问题是最简单的 在本例中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词表示速度,第二个是颜色,第三个是物种:“<速度>.<颜色>.<物种>”。 我们创建三个绑定:Q1与bindingKey “.orange.” 绑定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。 这些绑定可概括为: Q1对所有橙色的动物感兴趣。 Q2想接收关于兔子和慢速动物的所有消息。 将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。 如果我们违反约定,发送一个或四个单词的信息,比如"orange“或”quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,并将丢失。 另外,"lazy.orange.male.rabbit",即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。 最终代码实现 生产者 package rabbitmq.topic; import java.util.Random; import java.util.Scanner; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //参数1: 交换机名 //参数2: 交换机类型 ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC); while (true) { System.out.print("输入消息: "); String msg = new Scanner(System.in).nextLine(); if ("exit".contentEquals(msg)) { break; } System.out.print("输入routingKey: "); String routingKey = new Scanner(System.in).nextLine(); //参数1: 交换机名 //参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning" //参数3: 其他配置属性 //参数4: 发布的消息数据 ch.basicPublish("topic_logs", routingKey, null, msg.getBytes()); System.out.println("消息已发送: "+routingKey+" - "+msg); } c.close(); } } 消费者 package rabbitmq.topic; import java.io.IOException; import java.util.Scanner; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC); //自动生成对列名, //非持久,独占,自动删除 String queueName = ch.queueDeclare().getQueue(); System.out.println("输入bindingKey,用空格隔开:"); String[] a = new Scanner(System.in).nextLine().split("\\s"); //把该队列,绑定到 topic_logs 交换机 //允许使用多个 bindingKey for (String bindingKey : a) { ch.queueBind(queueName, "topic_logs", bindingKey); } System.out.println("等待接收数据"); //收到消息后用来处理消息的回调对象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); String routingKey = message.getEnvelope().getRoutingKey(); System.out.println("收到: "+routingKey+" - "+msg); } }; //消费者取消时的回调对象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume(queueName, true, callback, cancel); } } 六、RPC模式 客户端 在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果 RPCClient client = new RPCClient(); String result = client.call("4"); System.out.println( "第四个斐波那契数是: " + result); 回调队列 Callback Queue 使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码: //定义回调队列, //自动生成对列名,非持久,独占,自动删除 callbackQueueName = ch.queueDeclare().getQueue(); //用来设置回调队列的参数对象 BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); //发送调用消息 ch.basicPublish("", "rpc_queue", props, message.getBytes()); 消息属性 Message Properties AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个: deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。 contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。 replyTo:通常用于指定回调队列。 correlationId:将RPC响应与请求关联起来非常有用。 关联id (correlationId): 在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。 这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。 最终实现代码 RPC的工作方式是这样的: 对于RPC请求,客户端发送一条带有两个属性的消息:replyTo,设置为仅为请求创建的匿名独占队列,和correlationId,设置为每个请求的惟一id值。 请求被发送到rpc_queue队列。 RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据。 服务器端 package rabbitmq.rpc; import java.io.IOException; import java.util.Random; import java.util.Scanner; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); /* * 定义队列 rpc_queue, 将从它接收请求信息 * * 参数: * 1. queue, 对列名 * 2. durable, 持久化 * 3. exclusive, 排他 * 4. autoDelete, 自动删除 * 5. arguments, 其他参数属性 */ ch.queueDeclare("rpc_queue",false,false,false,null); ch.queuePurge("rpc_queue");//清除队列中的内容 ch.basicQos(1);//一次只接收一条消息 //收到请求消息后的回调对象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //处理收到的数据(要求第几个斐波那契数) String msg = new String(message.getBody(), "UTF-8"); int n = Integer.parseInt(msg); //求出第n个斐波那契数 int r = fbnq(n); String response = String.valueOf(r); //设置发回响应的id, 与请求id一致, 这样客户端可以把该响应与它的请求进行对应 BasicProperties replyProps = new BasicProperties.Builder() .correlationId(message.getProperties().getCorrelationId()) .build(); /* * 发送响应消息 * 1. 默认交换机 * 2. 由客户端指定的,用来传递响应消息的队列名 * 3. 参数(关联id) * 4. 发回的响应消息 */ ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); //发送确认消息 ch.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; // CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //消费者开始接收消息, 等待从 rpc_queue接收请求消息, 不自动确认 ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback); } protected static int fbnq(int n) { if(n == 1 || n == 2) return 1; return fbnq(n-1)+fbnq(n-2); } } 客户端 package rabbitmq.rpc; import java.io.IOException; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCClient { Connection con; Channel ch; public RPCClient() throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); con = f.newConnection(); ch = con.createChannel(); } public String call(String msg) throws Exception { //自动生成对列名,非持久,独占,自动删除 String replyQueueName = ch.queueDeclare().getQueue(); //生成关联id String corrId = UUID.randomUUID().toString(); //设置两个参数: //1. 请求和响应的关联id //2. 传递响应数据的queue BasicProperties props = new BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); //向 rpc_queue 队列发送请求数据, 请求第n个斐波那契数 ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8")); //用来保存结果的阻塞集合,取数据时,没有数据会暂停等待 BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); //接收响应数据的回调对象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //如果响应消息的关联id,与请求的关联id相同,我们来处理这个响应数据 if (message.getProperties().getCorrelationId().contentEquals(corrId)) { //把收到的响应数据,放入阻塞集合 response.offer(new String(message.getBody(), "UTF-8")); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //开始从队列接收响应数据 ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback); //返回保存在集合中的响应数据 return response.take(); } public static void main(String[] args) throws Exception { RPCClient client = new RPCClient(); while (true) { System.out.print("求第几个斐波那契数:"); int n = new Scanner(System.in).nextInt(); String r = client.call(""+n); System.out.println(r); } } } 七、RabbitMQ六种工作模式总结: 如果觉得我写的可以,是否可以【穿着条纹睡衣的男孩】,更多精彩会等着你哦

优秀的个人博客,低调大师

Python机器学习小技巧:GroupBy 和 Shift实现 Pandas分组错位

使用Pandas进行数据操作的时候,有时需要分组将数据错位进行操作。 在数据分析中经常遇到需要分组使用a列的第n行数据与去b列的第n+1行数据进行对比或者计算的要求,下面是我使用pandas解决该问题的方法。首先要说的试这个问题可以通过操作Index来实现。不过Pandas针对这种情况已经提供了一种方法了,就是shift函数,用起来更加方便。shift函数定义如下: pandas.DataFrame.shift DataFrame.shift(self,periods=1,freq=None,axis=0,fill_value=None)[source] Shift index by desired number of periods with an optional time freq. When freq is not passed, shift the index without realigning the data. If freq is passed (in this case, the index must be date or datetime, or it will raise a NotImplementedError), the index will be increased using the periods and the freq. 比如我们要分析一个汽车的形式记录,需要对比每个位置的前一个点和后一个点的情况,如下代码即可: df1['x_pre']=df1.groupby('CARID')['x'].shift(1) df1['x_next']=df1.groupby('CARID')['x'].shift(-1) df1['y_pre']=df1.groupby('CARID')['y'].shift(1) df1['y_next']=df1.groupby('CARID')['y'].shift(-1)

优秀的个人博客,低调大师

Java学习进阶之MySQL数据库结构和引擎比对

MySQL是最流行的关系型数据库管理系统,有很多Java程序员初学MySQL时都会被其繁杂的操作以及知识结构吓到,接下来好程序员Java培训小编就给大家简单分析MySQL的结构与引擎对比。 MySQL数据库结构 1、最上层的服务并不是MySQL独有的,大多数基于网络的客户端/服务器的工具或者服务都有类似的架构。 2、第二层架构中,大多数的MySQL的核心服务功能都在这一层,包括查询解析、分析、优化、缓存以及所有的内置函数(日期时间等),所有跨存储引擎的功能都在这一层实现:存储过程、触发器、视图等。 3、第三层包含了存储引擎。存储引擎负责MySQL中的数据的存储和提取。MySQL支持各种不同的存储引擎,每个存储引擎都有它的优势和劣势。服务器通过API和存储引擎进行通信。这些API接口屏蔽了不同存储引擎之间的差异,使得这些差异对上层的查询过程透明。存储引擎不会解析SQL(InnoDB是个例外,它会解析外键定义),不同的存储引擎之间也不能互相通信,而只是简单的响应上层服务器的请求! 执行流程 数据库存储引擎是数据库底层软件组件,数据库管理系统使用数据引擎进行创建、查询、更新和删除数据操作。 不同的存储引擎提供不同的存储机制、索引技巧、锁定水平等功能,使用不同的存储引擎还可以获得特定的功能。 现在许多数据库管理系统都支持多种不同的存储引擎。MySQL的核心就是存储引擎。 MyISAM:拥有较高的插入,查询速度,但不支持事务。 InnoDB :5.5版本后MySQL的默认数据库,事务型数据库的首选引擎,支持ACID事务,支持行级锁定。 BDB:源自Berkeley DB,事务型数据库的另一种选择,支持COMMIT和ROLLBACK等其他事务特性。 Memory:所有数据置于内存的存储引擎,拥有极高的插入,更新和查询效率。但是会占用和数据量成正比的内存空间。并且其内容会在Mysql重新启动时丢失。 Merge:将一定数量的MyISAM表联合而成一个整体,在超大规模数据存储时很有用。 Archive:非常适合存储大量的独立的,作为历史记录的数据。因为它们不经常被读取。Archive拥有高效的插入速度,但其对查询的支持相对较差。 Federated:将不同的Mysql服务器联合起来,逻辑上组成一个完整的数据库。非常适合分布式应用。 Cluster/NDB:高冗余的存储引擎,用多台数据机器联合提供服务以提高整体性能和安全性。适合数据量大,安全和性能要求高的应用。 CSV:逻辑上由逗号分割数据的存储引擎。它会在数据库子目录里为每个数据表创建一个.CSV文件。这是一种普通文本文件,每个数据行占用一个文本行。CSV存储引擎不支持索引。 BlackHole:黑洞引擎,写入的任何数据都会消失,一般用于记录binlog做复制的中继 另外,MySQL的存储引擎接口定义良好。有兴趣的开发者通过阅读文档编写自己的存储引擎。 想了解更多MySQL数据库知识,你可以关注“好程序员”微信公众号,定期发布技术文章和行业趋势分析,助力你快速入行,轻松上岗。

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

用户登录
用户注册