首页 文章 精选 留言 我的

精选列表

搜索[快速入门],共10000篇文章
优秀的个人博客,低调大师

Svelte入门——Web Components实现跨框架组件复用

Svelte 是构建 Web 应用程序的一种新方法,推出后一直不温不火,没有继Angular、React和VUE成为第四大框架,但也没有失去热度,无人问津。造成这种情况很重要的一个原因是,Svelte 的核心思想在于【通过静态编译减少框架运行时的代码量】,它可以像React和VUE一样开发,但却没有虚拟DOM。,使它可以Svelte可以将代码编译为体积小、不依赖于框架的JS代码。 看起来满满优点,但因为过于灵活,导致大家无法写出高度一致的业务代码,以上优点并没有在实际的大项目中得到很好的体现。 Svelte 这款框架并不完美,却又没有在残酷的市场竞争中死掉,是因为它拥有一本特殊秘籍,一些使它成为其他框架无法替代的一员的功能。。 而对于 Svelte 来说,这本秘籍的名字就叫做——Web Components。 在多团队协同完成的大项目中,各个团队可能使用不同的框架版本,甚至不同的框架,这让不同项目之间的组件复用变得困难。"write one,run anywhere"就是一句空话。这种情况下Svelte就变成了沟通跨越框架鸿沟的桥梁,使用Svelte开发的无框架依赖的Web Components,可以在各个框架间复用。同时,Svelte的开发方式也不像写pure js那样繁琐。 下面以SpreadJS集成为例,介绍如何用Svelte开发一款spread-sheets Web Component供其他页面复用。 创建Svelte template工程。 svelte 官方提供了template 工程,只要clone或者下载项目即可。 https://github.com/sveltejs/component-template npx degit sveltejs/component-template my-new-component cd my-new-component npm install # or yarn 修改 rollup.config.js,添加 customElement: true 配置,输出为web component组件。 添加后的rollup.config.js如下。 import svelte from 'rollup-plugin-svelte'; import resolve from '@rollup/plugin-node-resolve'; import pkg from './package.json'; const name = pkg.name .replace(/^(@\S+\/)?(svelte-)?(\S+)/, '$3') .replace(/^\w/, m =&gt; m.toUpperCase()) .replace(/-\w/g, m =&gt; m[1].toUpperCase()); export default { input: 'src/index.js', output: [ { file: pkg.module, 'format': 'es' }, { file: pkg.main, 'format': 'umd', name } ], plugins: [ svelte({ customElement: true, }), resolve() ], }; 更新 src/Component.svelte,创建spread-sheets组件。 <svelte:options tag="spread-sheets" /> <script> import { createEventDispatcher, onMount } from 'svelte'; // Event handling const dispatch = createEventDispatcher(); export let value =""; $: valueChanged(value); function valueChanged(newValue) { console.log("value changed", newValue); if(spread){ let sheet = spread.getActiveSheet(); sheet.setValue(0, 0, value); } } let spreadHost; let spread; function dispatchEvent(name, e) { // dispatch(name, e); const event = new CustomEvent(name, { detail: e, bubbles: true, cancelable: true, composed: true, // this line allows the event to leave the Shadow DOM }); // console.log(event) spreadHost.dispatchEvent(event); } onMount(() => { spread = new GC.Spread.Sheets.Workbook(spreadHost); let sheet = spread.getActiveSheet(); sheet.setValue(0, 0, value); spread.bind(GC.Spread.Sheets.Events.ValueChanged, function(s, e){ e.evnetName = "ValueChanged"; dispatchEvent("changed", e); }); spread.bind(GC.Spread.Sheets.Events.RangeChanged, function(s, e){ e.evnetName = "RangeChanged"; dispatchEvent("changed", e); }); }); </script> <style> </style> <div bind:this="{" spreadhost} style="width: 100%; height:100%"></div> 这样我们的自定义组件就创建好了,只需要调用npm run build,就能编译出spread-sheets 组件了。 在页面引用组件。 创建index.html页面,并引用编译好的js文件。同时引入spreadjs相关资源。 直接使用spread-sheets标签添加SpreadJS。 <meta name="spreadjs culture" content="zh-cn"> <meta charset="utf-8"> <title>My Counter</title> <base href="/"> <meta name="viewport" content="width=device-width, initial-scale=1"> <link rel="stylesheet" type="text/css" href="https://demo.grapecity.com.cn/spreadjs/SpreadJSTutorial/zh/purejs/node_modules/@grapecity/spread-sheets/styles/gc.spread.sheets.excel2013white.css"> <!-- <spread-sheets-designer></spread-sheets-designer> --> <button onclick="getJSON()">GetJSON</button> <spread-sheets value="123" style="display:block; width: 80%; height: 400px;"></spread-sheets> <script src="https://demo.grapecity.com.cn/SpreadJS/WebDesigner/lib/spreadjs/scripts/gc.spread.sheets.all.14.1.3.min.js" type="text/javascript"></script> <script type="text/javascript" src="/dist/index.js"></script> <script type="text/javascript"> document.querySelector("spread-sheets").addEventListener("changed", function(){ console.log(arguments) }) window.onload = function(){ document.querySelector("spread-sheets").setAttribute("value", "234"); } </script> 添加后效果如下图。 总结 虽然看起来Web Component完美解决了组件之间的复用问题,但是用Svelte 开发的Web Component也存在一些限制:比如,只能传递string 属性;绑定的attribute是单向绑定,想要获取组件内部更新值,需要绑定event获取。 如果大家对Svelte 有更多兴趣,欢迎留言交流~

优秀的个人博客,低调大师

性能工具之Java调试工具BTrace入门

2 --> 引言 在我们对Java应用做问题分析的时候,往往采用log进行问题定位和分析,但是如果我们的log缺乏相关的信息呢?远程调试会影响应用的正常工作,修改代码重新部署应用,实时性和灵活性难以保证,有没有不影响正常应用运行,又灵活并无侵入性的方法呢? 答案是有,它就是Java中的神器-BTrace BTrace是什么? BTrace使用Java的Attach技术,可以让我们无缝的将我们BTrace脚本挂到JVM上,通过脚本你可以获取到任何你想拿到的数据,在侵入性和安全性都非常可靠,特别是定位线上问题的神器。 BTrace原理 BTrace是基于动态字节码修改技术(Hotswap)向目标程序的字节码注入追踪代码。 安装配置 关于BTrace的安装配置使用,此处就不再重复造轮子,网上有太多的教程。 官网地址:https://github.com/btraceio/btrace 注意事项 生产环境可以使用,但修改的字节码不会被还原,使用Btrace时,需要确保追踪的动作是只读的(即:追踪行为不能修改目标程序的状态)和有限的行为(即:追踪行为需要在有限的时间内终止),一个追踪行为需要满足以下的限制: 不能创建新的对象 不能创建新的数组 不能抛出异常 不能捕获异常 不能对实例或静态方法调用-只有从BTraceUtils中的public static方法中或在当前脚本中声明的方法,可以被BTrace调用 不能有外部,内部,嵌套或本地类 不能有同步块或同步方法 不能有循环(for,while,do..while) 不能继承抽象类(父类必须是java.lang.Object) 不能实现接口 不能有断言语句 不能有class保留字 以上的限制可以通过通过unsafe模式绕过。追踪脚本和引擎都必须设置为unsafe模式。脚本需要使用注解为@BTrace(unsafe=true),需要修改BTrace安装目录下bin中btrace脚本将-Dcom.sun.btrace.unsafe=false改为-Dcom.sun.btrace.unsafe=true。 注:关于unsafe的使用,如果你的程序一旦被btrace追踪过,那么unsafe的设置会一直伴随该进程的整个生命周期。如果你修改了unsafe的设置,只有通过重启目标进程,才能获得想要的结果。所以该用法不是很好使用,如果你的应用不能随便重启,那么你在第一次使用btrace最终目标进程之前,先想好到底使用那种模式来启动引擎。 使用示例 拦截一个普通方法 control方法 @GetMapping(value = "/arg1") public String arg1(@RequestParam("name") String name) throws InterruptedException { Thread.sleep(2000); return "7DGroup," + name; } BTrace脚本 /** * 拦截示例 */ @BTrace public class PrintArgSimple { @OnMethod( //类名 clazz = "com.techstar.monitordemo.controller.UserController", //方法名 method = "arg1", //拦截时刻:入口 location = @Location(Kind.ENTRY)) /** * 拦截类名和方法名 */ public static void anyRead(@ProbeClassName String pcn, @ProbeMethodName String pmn, AnyType[] args) { BTraceUtils.printArray(args); BTraceUtils.println(pcn + "," + pmn); BTraceUtils.println(); } } 拦截结果: 192:Btrace apple$ jps -l 369 5889 /Users/apple/Downloads/performance/apache-jmeter-4.0/bin/ApacheJMeter.jar 25922 sun.tools.jps.Jps 23011 org.jetbrains.idea.maven.server.RemoteMavenServer 25914 org.jetbrains.jps.cmdline.Launcher 25915 com.techstar.monitordemo.MonitordemoApplication 192:Btrace apple$ btrace 25915 PrintArgSimple.java [zuozewei, ] com.techstar.monitordemo.controller.UserController,arg1 [zee, ] com.techstar.monitordemo.controller.UserController,arg1 拦截构造函数 构造函数 @Data public class User { private int id; private String name; } control方法 @GetMapping(value = "/arg2") public User arg2(User user) { return user; } BTrace脚本 /** * 拦截构造函数 */ @BTrace public class PrintConstructor { @OnMethod(clazz = "com.techstar.monitordemo.domain.User", method = "<init>") public static void anyRead(@ProbeClassName String pcn, @ProbeMethodName String pmn, AnyType[] args) { BTraceUtils.println(pcn + "," + pmn); BTraceUtils.printArray(args); BTraceUtils.println(); } } 拦截结果 192:Btrace apple$ btrace 34119 PrintConstructor.java com.techstar.monitordemo.domain.User,<init> [1, zuozewei, ] 拦截同名函数,以参数区分 control方法 @GetMapping(value = "/same1") public String same(@RequestParam("name") String name) { return "7DGroup," + name; } @GetMapping(value = "/same2") public String same(@RequestParam("id") int id, @RequestParam("name") String name) { return "7DGroup," + name + "," + id; } BTrace脚本 /** * 拦截同名函数,通过输入的参数区分 */ @BTrace public class PrintSame { @OnMethod(clazz = "com.techstar.monitordemo.controller.UserController", method = "same") public static void anyRead(@ProbeClassName String pcn, @ProbeMethodName String pmn, String name) { BTraceUtils.println(pcn + "," + pmn + "," + name); BTraceUtils.println(); } } 拦截结果 192:Btrace apple$ jps -l 369 5889 /Users/apple/Downloads/performance/apache-jmeter-4.0/bin/ApacheJMeter.jar 34281 sun.tools.jps.Jps 34220 org.jetbrains.jps.cmdline.Launcher 34221 com.techstar.monitordemo.MonitordemoApplication 192:Btrace apple$ btrace 34221 PrintSame.java com.techstar.monitordemo.controller.UserController,same,zuozewei com.techstar.monitordemo.controller.UserController,same,zuozewei com.techstar.monitordemo.controller.UserController,same,zuozewei 拦截方法返回值 BTrace脚本 /** * 拦截返回值 */ @BTrace public class PrintReturn { @OnMethod(clazz = "com.techstar.monitordemo.controller.UserController", method = "arg1", //拦截时刻:返回值 location = @Location(Kind.RETURN)) public static void anyRead(@ProbeClassName String pcn, @ProbeMethodName String pmn, @Return AnyType result) { BTraceUtils.println(pcn + "," + pmn + "," + result); BTraceUtils.println(); } } 拦截结果 192:Btrace apple$ jps -l 34528 org.jetbrains.jps.cmdline.Launcher 34529 com.techstar.monitordemo.MonitordemoApplication 369 5889 /Users/apple/Downloads/performance/apache-jmeter-4.0/bin/ApacheJMeter.jar 34533 sun.tools.jps.Jps 192:Btrace apple$ btrace 34529 PrintReturn.java com.techstar.monitordemo.controller.UserController,arg1,7DGroup,zuozewei 异常分析 有时候开发人员对异常处理不合理,导致某些重要异常人为被吃掉,并且没有日志或者日志不详细,导致性能分析定位问题困难,我们可以使用BTrace来处理 control方法 @GetMapping(value = "/exception") public String exception() { try { System.out.println("start..."); System.out.println(1 / 0); //模拟异常 System.out.println("end..."); } catch (Exception e) {} return "successful..."; } BTrace脚本 /** * 有时候,有些异常被人为吃掉,日志又没有打印,这个时候可以用该类定位问题 * This example demonstrates printing stack trace * of an exception and thread local variables. This * trace script prints exception stack trace whenever * java.lang.Throwable's constructor returns. This way * you can trace all exceptions that may be caught and * "eaten" silently by the traced program. Note that the * assumption is that the exceptions are thrown soon after * creation [like in "throw new FooException();"] rather * that be stored and thrown later. */ @BTrace public class PrintOnThrow { // store current exception in a thread local // variable (@TLS annotation). Note that we can't // store it in a global variable! @TLS static Throwable currentException; // introduce probe into every constructor of java.lang.Throwable // class and store "this" in the thread local variable. @OnMethod(clazz = "java.lang.Throwable", method = "<init>") public static void onthrow(@Self Throwable self) { currentException = self; } @OnMethod(clazz = "java.lang.Throwable", method = "<init>") public static void onthrow1(@Self Throwable self, String s) { currentException = self; } @OnMethod(clazz = "java.lang.Throwable", method = "<init>") public static void onthrow1(@Self Throwable self, String s, Throwable cause) { currentException = self; } @OnMethod(clazz = "java.lang.Throwable", method = "<init>") public static void onthrow2(@Self Throwable self, Throwable cause) { currentException = self; } // when any constructor of java.lang.Throwable returns // print the currentException's stack trace. @OnMethod(clazz = "java.lang.Throwable", method = "<init>", location = @Location(Kind.RETURN)) public static void onthrowreturn() { if (currentException != null) { Threads.jstack(currentException); BTraceUtils.println("====================="); currentException = null; } } } 拦截结果 192:Btrace apple$ jps -l 369 5889 /Users/apple/Downloads/performance/apache-jmeter-4.0/bin/ApacheJMeter.jar 34727 sun.tools.jps.Jps 34666 org.jetbrains.jps.cmdline.Launcher 34667 com.techstar.monitordemo.MonitordemoApplication 192:Btrace apple$ btrace 34667 PrintOnThrow.java java.lang.ClassNotFoundException: org.apache.catalina.webresources.WarResourceSet java.net.URLClassLoader.findClass(URLClassLoader.java:381) java.lang.ClassLoader.loadClass(ClassLoader.java:424) java.lang.ClassLoader.loadClass(ClassLoader.java:411) sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) java.lang.ClassLoader.loadClass(ClassLoader.java:357) org.apache.catalina.webresources.StandardRoot.isPackedWarFile(StandardRoot.java:656) org.apache.catalina.webresources.CachedResource.validateResource(CachedResource.java:109) org.apache.catalina.webresources.Cache.getResource(Cache.java:69) org.apache.catalina.webresources.StandardRoot.getResource(StandardRoot.java:216) org.apache.catalina.webresources.StandardRoot.getResource(StandardRoot.java:206) org.apache.catalina.mapper.Mapper.internalMapWrapper(Mapper.java:1027) org.apache.catalina.mapper.Mapper.internalMap(Mapper.java:842) org.apache.catalina.mapper.Mapper.map(Mapper.java:698) org.apache.catalina.connector.CoyoteAdapter.postParseRequest(CoyoteAdapter.java:679) org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:336) org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:800) org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:800) org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1471) org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) java.lang.Thread.run(Thread.java:748) ===================== ... 定位某个超过阈值的函数 BTrace脚本 ** * 探测某个包路径下的方法执行时间是否超过某个阈值的程序,如果超过了该阀值,则打印当前线程的栈信息。 */ import com.sun.btrace.BTraceUtils; import com.sun.btrace.annotations.*; import static com.sun.btrace.BTraceUtils.*; @BTrace public class PrintDurationTracer { @OnMethod(clazz = "/com\\.techstar\\.monitordemo\\..*/", method = "/.*/", location = @Location(Kind.RETURN)) public static void trace(@ProbeClassName String pcn, @ProbeMethodName String pmn, @Duration long duration) { //duration的单位是纳秒 if (duration > 1000 * 1000 * 2) { BTraceUtils.println(Strings.strcat(Strings.strcat(pcn, "."), pmn)); BTraceUtils.print(" 耗时:"); BTraceUtils.print(duration); BTraceUtils.println("纳秒,堆栈信息如下"); jstack(); } } } 拦截结果 192:Btrace apple$ btrace 39644 PrintDurationTracer.java com.techstar.monitordemo.controller.Adder.execute 耗时:1715294657纳秒,堆栈信息如下 com.techstar.monitordemo.controller.Adder.execute(Adder.java:13) com.techstar.monitordemo.controller.Main.main(Main.java:10) com.techstar.monitordemo.controller.Adder.execute 耗时:893795666纳秒,堆栈信息如下 com.techstar.monitordemo.controller.Adder.execute(Adder.java:13) com.techstar.monitordemo.controller.Main.main(Main.java:10) com.techstar.monitordemo.controller.Adder.execute 耗时:1331363658纳秒,堆栈信息如下 com.techstar.monitordemo.controller.Adder.execute(Adder.java:13) 追踪方法执行时间 BTrace脚本 /** * 追踪某个方法的执行时间,实现原理同AOP一样。 */ @BTrace public class PrintExecuteTimeTracer { @TLS static long beginTime; @OnMethod(clazz = "com.techstar.monitordemo.controller.Adder", method = "execute") public static void traceExecuteBegin() { beginTime = timeMillis(); } @OnMethod(clazz = "com.techstar.monitordemo.controller.Adder", method = "execute", location = @Location(Kind.RETURN)) public static void traceExecute(int arg1, int arg2, @Return int result) { BTraceUtils.println(strcat(strcat("Adder.execute 耗时:", str(timeMillis() - beginTime)), "ms")); BTraceUtils.println(strcat("返回结果为:", str(result))); } } 拦截结果 192:Btrace apple$ btrace 40863 PrintExecuteTimeTracer.java Adder.execute 耗时:803ms 返回结果为:797 Adder.execute 耗时:1266ms 返回结果为:1261 Adder.execute 耗时:788ms 返回结果为:784 Adder.execute 耗时:1524ms 返回结果为:1521 Adder.execute 耗时:1775ms 性能分析 压测的时候经常发现某一个服务变慢了,但是由于这个服务有很多的业务逻辑和方法构成,这个时候就不好定位到底慢在哪个地方。BTrace可以解决这个问题,只需要大概定位问题可能存在的地方,通过包路径模糊匹配,就可以找到问题。 BTrace脚本 /** * * Description: * This script demonstrates new capabilities built into BTrace 1.2 * Shortened syntax - when omitting "public" identifier in the class * definition one can safely omit all other modifiers when declaring methods * and variables * Extended syntax for @ProbeMethodName annotation - you can use * parameter to request a fully qualified method name instead of * the short one * Profiling support - you can use {@linkplain Profiler} instance to gather * performance data with the smallest overhead possible */ @BTrace class Profiling { @Property Profiler profiler = BTraceUtils.Profiling.newProfiler(); @OnMethod(clazz = "/com\\.techstar\\..*/", method = "/.*/") void entry(@ProbeMethodName(fqn = true) String probeMethod) { BTraceUtils.Profiling.recordEntry(profiler, probeMethod); } @OnMethod(clazz = "/com\\.techstar\\..*/", method = "/.*/", location = @Location(value = Kind.RETURN)) void exit(@ProbeMethodName(fqn = true) String probeMethod, @Duration long duration) { BTraceUtils.Profiling.recordExit(profiler, probeMethod, duration); } @OnTimer(5000) void timer() { BTraceUtils.Profiling.printSnapshot("Performance profile", profiler); } 死锁排查 我们怀疑程序是否有死锁,可以通过以下的脚本扫描追踪,非常简单方便。 /** * This BTrace program demonstrates deadlocks * built-in function. This example prints * deadlocks (if any) once every 4 seconds. */ @BTrace public class PrintDeadlock { @OnTimer(4000) public static void print() { deadlocks(); } } 小结 BTrace是一个事后工具,所谓的事后工具就是在服务已经上线或者压测后,但是发现有问题的时候,可以使用BTrace动态跟踪分析。 比如哪些方法执行太慢,例如监控方法执行时间超过1秒的方法; 查看哪些方法调用了system.gc( ),调用栈是怎样的; 查看方法的参数和属性 哪些方法发生了异常 ..... 总之,这里只是将部分经常用的列举了下抛砖引玉,还有很多没有列举,大家可以参考官方的其他Sample去玩下。

优秀的个人博客,低调大师

Pytorch量化入门之超分量化(二)

2 --> 最近Happy在尝试进行图像超分的INT8量化,发现:pytorch量化里面的坑真多,远不如TensorFlow的量化好用。不过花了点时间终于还是用pytorch把图像超分模型完成了量化,以EDSR为例,模型大小73%,推理速度提升40%左右(PC端),视觉效果几乎无损,定量指标待补充。有感于网络上介绍量化的博客一堆,但真正有帮助的较少,所以Happy会尽量以图像超分为例提供一个完整的可复现的量化示例。 在前面的文章中,笔者对Pytorch的“Post Training Static Quantization,PTSQ”进行了原理性的介绍。接下来,我们将以EDSR这个图像超分网络为例进行说明。 准备工作 在真正开始量化之前,我们需要准备好要进行量化的模型,本文以EDSR-baseline模型为基础进行。所以大家可以直接下载官方预训练模型,EDSR的Pytorch官方实现code连接如下: github.com/thstkdgus35/EDSR-PyTorch EDSRx4-baseline预训练模型下载连接如下: https://cv.snu.ac.kr/research/EDSR/models/edsr_baseline_x4-6b446fab.pt 除了要准备上述预训练模型与code外,我们还需要准备校验数据,在这里笔者采用的DIV2K数据,该数据集下载链接如下: https://cv.snu.ac.kr/research/EDSR/DIV2K.tar 模型转换 正如上一篇文章所介绍的,在量化之前需要对模型进行op融合操作,而EDSR官方的实现code是对于融合操作是不太方便的,所以笔者对EDSR进行了一些实现上的调整。调整成如下形式(注:这里的实现code部分参数写成了固定参数): classResBlock(nn.Module):def__init__(self,channels=64):super(ResBlock,self).__init__()self.conv1=nn.Conv2d(channels,channels,3,1,1)self.relu=nn.ReLU(inplace=True)self.conv2=nn.Conv2d(channels,channels,3,1,1)defforward(self,x):identity=xconv1=self.conv1(x)relu=self.relu(conv1)conv2=self.conv2(relu)output=conv2+identityreturnoutputclassEDSR(nn.Module):def__init__(self,num_blocks=16,num_features=64,block=ResBlock):super(EDSR,self).__init__()self.head=nn.Conv2d(3,num_features,3,1,1)body=[block(num_features)for_inrange(num_blocks)]body.append(nn.Conv2d(num_features,num_features,3,1,1))self.body=nn.Sequential(*body)self.tail=nn.Sequential(nn.Conv2d(num_features,num_features*4,3,1,1),nn.PixelShuffle(upscale_factor=2),nn.Conv2d(num_features,num_features*4,3,1,1),nn.PixelShuffle(upscale_factor=2),nn.Conv2d(num_features,3,3,1,1))defforward(self,x,**kwargs):x=self.head(x)res=self.body(x)res+=xx=self.tail(res)returnx 也许有同学会说,模型转换后原始的预训练模型还能导入吗?直接导入肯定是不行的,checkpoint的key发生了变化,所以我们需要对下载的checkpoint进行一下简单的转换。checkpoint的转换code如下(注:这些转换可以都是写死的,已经确认过的): checkpoint=torch.load("edsr_baseline_x4-6b446fab.pt",map_location='cpu')newStateDict=OrderedDict()forkey,valincheckpoint.items():if'head'inkey:newStateDict[key.replace('.0.','.')]=valelif'mean'inkey:continue#newStateDict[key]=valelif'tail'inkey:if'.0.0.'inkey:newStateDict[key.replace('.0.0.','.0.')]=valelif'.0.2.'inkey:newStateDict[key.replace('.0.2.','.2.')]=valelse:newStateDict[key.replace('.1.','.4.')]=valelif'body'inkey:if'.body.0.'inkey:newStateDict[key.replace(".body.0.",'.conv1.')]=valelif'.body.2.'inkey:newStateDict[key.replace(".body.2.",'.conv2.')]=valelif"16"inkey:newStateDict[key]=valtorch.save(newStateDict,"edsr-baseline-fp32.pth.tar") 对比原始code的同学应该会发现:EDSR中的add_mean与sub_mean不见了。是的,笔者将add_mean与sub_mean移到了网络外面,不对其进行量化,具体为什么这样做,见后面的介绍。 除了上述操作外,我们还需要提供前述EDSR实现的量化版本模型,这个没太多需要介绍的,直接看code(主要体现在三点:插入量化节点(即QuantStub与DequantStub)、add转换(即FloatFunctional)、fuse_model模块(即fuse_model函数)): classQuantizableResBlock(ResBlock):def__init__(self,*args,**kwargs):super(QuantizableResBlock,self).__init__(*args,**kwargs)self.add=FloatFunctional()defforward(self,x):identity=xconv1=self.conv1(x)relu=self.relu(conv1)conv2=self.conv2(relu)output=self.add.add(identity,conv2)returnoutputdeffuse_model(self):fuse_modules(self,['conv1','relu'],inplace=True)classQuantizableEDSR(EDSR):def__init__(self,*args,**kwargs):super(QuantizableEDSR,self).__init__(*args,**kwargs)self.quant=QuantStub()self.dequant=DeQuantStub()self.add=FloatFunctional()defforward(self,x):x=self.quant(x)x=self.head(x)res=self.body(x)res=self.add.add(res,x)x=self.tail(res)x=self.dequant(x)returnxdeffuse_model(self):forminself.modules():iftype(m)==QuantizableResBlock:m.fuse_model() 模型量化 在上一篇文章中,我们也介绍了PTSQ的几个步骤(额外包含了模型的构建与保存)。 init: 模型的定义、预训练模型加载、inplace操作替换为非inplace操作; config:定义量化时的配置方式,这里以fbgemm为例,它的activation量化方式为Historam,weight量化方式为per_channel; fuse:模型中的op融合,比如相邻的Conv+ReLU融合,Conv+BN+ReLU融合等等; prepare: 量化前的准备工作,也就是对每个需要进行量化的op插入Observer; feed: 送入校验数据,前面插入的Observer会针对这些数据进行量化前的信息统计; convert:用于在将非量化op转换成量化op,比如将nn.Conv2d转换成nnq.Conv2d, 同时会根据Observer所观测的信息进行nnq.Conv2d中的量化参数的统计,包含scale、zero_point、qweight等; save:用于保存量化好的模型参数. Init 模型的创建与预训练模型,这个比较简单了,直接上code(注:PTSQ模式下模型应当是eval模式)。 checkpoint=torch.load("edsrx4-baseline-fp32.pth.tar")model=QuantizableEDSR(block=QuantizableResBlock)model.load_state_dict(checkpoint)_replace_relu(model)model.eval() config 这个步骤主要是为了指定与推理引擎搭配的一些量化方式,比如X86平台应该采用fbgemm方式进行量化,而ARM平台则应当采用qnnpack方式量化。本文主要是在PC端进行,所以选择了fbgemm进行,相关配置信息如下: backend='fbgemm'torch.backends.quantized.engine=backendmodel.qconfig=torch.quantization.QConfig(activation=default_histogram_observer,weight=default_per_channel_weight_observer) Fuse&Prepare Fuse与Prepare两个步骤的作用主要是 进行OP的融合,比如Conv+ReLU的融合,Conv+BN+ReLU的融合,这个可以见前述实现code中的'fuse_model',pytorch目前提供了几种类型的融合。我们只需知道就可以了,这块不用太过关心,两行code就可以完成: model.fuse_model()torch.quantization.prepare(model,inplace=True) 插入Observer,在每个需要进行量化的op中插入Observer,不同的量化方式会有不同的Observer,它将对喂入的校验数据进行统计,比如统计数据的最大值、最小值、直方图分布等等。 Feed 这个步骤需要采用校验数据喂入到上述准备好的模型中,这个就比较简单了,按照常规模型的测试方式处理就可以了,参考code如下: 注:笔者这里用了100张数据,这个用全部也可以,不过耗时会更长meanBGR=torch.FloatTensor((0.4488,0.4371,0.4040)).view(3,1,1)*255data_root="${DIV2K_train_LR_bicubic/X4}"forindexinrange(1,100):image_path=os.path.join(data_root,f"{index:04d}.png")inputs=preprocess(image_path)inputs-=meanBGRwithtorch.no_grad():output=model(inputs) Convert&Save 在完成前面几个步骤后,我们就可以将浮点类型的模型进行量化了,这个只需要一行code就可以。在转换过程中,它会将nn.Conv2d这类浮点类型op转换成量化版op:nnq.Conv2d。 torch.quantization.convert(model,inplace=True)torch.save(model.state_dict(),"edsrx4-baseline-qint8.pth.tar") 经过上面的几个步骤,我们就完成了EDSR模型的INT8量化,也将其进行了保存。也就是说完成了初步的量化工作,因为接下来的测试论证很关键,如果量化损失很严重也不行的。 量化模型测试 接下来,我们对上述量化好的模型进行一下测试看看效果。量化模型的调用code如下(与常规模型的调用有一点点的区别): deffp32edsr(block=ResBlock,pretrained=None):model=EDSR(block=block)ifpretrained:state_dict=torch.load(pretrained,map_location="cpu")model.load_state_dict(state_dict)returnmodeldefqint8edsr(block=QuantizableResBlock,pretrained=None,quantize=False):model=QuantizableEDSR(block=block)_replace_relu(model)ifquantize:backend='fbgemm'quantize_model(model,backend)else:assertpretrainedin[True,False]ifpretrained:state_dict=torch.load(pretrained,map_location="cpu")model.load_state_dict(state_dict)returnmodeldefquantize_model(model,backend):ifbackendnotintorch.backends.quantized.supported_engines:raiseRuntimeError("Quantizedbackendnotsupported")torch.backends.quantized.engine=backendmodel.eval()_dummy_input_data=torch.rand(1,3,64,64)#Makesurethatweightqconfigmatchesthatoftheserializedmodelsifbackend=='fbgemm':model.qconfig=torch.quantization.QConfig(activation=torch.quantization.default_histogram_observer,weight=torch.quantization.default_per_channel_weight_observer)elifbackend=='qnnpack':model.qconfig=torch.quantization.QConfig(activation=torch.quantization.default_histogram_observer,weight=torch.quantization.default_weight_observer)model.fuse_model()torch.quantization.prepare(model,inplace=True)model(_dummy_input_data)torch.quantization.convert(model,inplace=True) 从上面code可以看到:相比fp32模型,量化模型多了两步骤: replace=True的op替换为replace=False的op; 模型的最简单量化版本,完成初步的op替换。 结合上述code,我们就可以直接对DIV2K数据进行测试了,测试的部分code摘录如下: index=1image_path=os.path.join(data_root,f"{index:04d}.png")inputs=preprocess(image_path)inputs-=meanBGRwithtorch.no_grad():output1=model(inputs)output2=fmodel(inputs)output1+=meanBGRoutput2+=meanBGRshow1=post_process(output1)cv2.imwrite(f"results/{index:03d}-init8.png",show1)show2=post_process(output2)cv2.imwrite(f"results/{index:03d}-fp32.png",show2) image-20210203144114765 上图给出了DIV2K训练集中0016的两种模型的效果对比,左图为FP32模型的超分效果,右图为INT8量化模型的超分效果。可以看到:量化后模型在效果上是视觉无损的(就是说:量化损失导致的效果下降不可感知)。总而言之,量化前后模型的对比可以参考下表(PC端测试,测试数据为DIV2K,速度为平均速度)。 FP32 INT8 压缩/提速 ModelSize 5953K 1610K 73% Speed 5.94s 3.39s 43% 注意事项 为什么要将add_mean与sub_mean移到网络外面不参与量化呢? 从我们的量化对比来看,将其移到外面效果更佳。可能也跟add_mean与sub_mean中的参数有关,两者只是简单的均值处理, 这个地方的量化会导致weight值出现较大偏差,进而影响后续的量化精度。 在量化方式方面,该如何选择呢? 在量化方式方面,activation支持:HistogramObserver,MinMaxObserver,, weight支持:PerChannelMinMaxObserver,MinMaxObserver. 从我们的量化对比来看,Histogram+PerChannelMinMax这种组合要比MinMaxObserver+PerChannelMinMax更佳。下图给出了DIV2K训练集中0018数据采用第二种量化组合效果对比,可以感知到明显的量化损失。 image-20210203145236068

优秀的个人博客,低调大师

干货丨Orca 数据加载入门教程

本文介绍在Orca中加载数据的方法。 1 建立数据库连接 在Orca中通过connect函数连接到DolphinDB服务器: >>> import dolphindb.orca as orca >>> orca.connect(MY_HOST, MY_PORT, MY_USERNAME, MY_PASSWORD) 2 导入数据 下面的教程使用了一个数据文件:quotes.csv。 2.1 read_csv函数 Orca提供read_csv函数,用于导入数据集。需要说明的是,Orca的read_csv函数的engine参数的取值可以是{‘c’, ‘python’, ‘dolphindb’},且该参数默认取值为‘dolphindb’。当取值为‘dolphindb’时,read_csv函数会在DolphinDB服务器目录下寻找要导入的数据文件。当取值为‘python’或‘c’时,read_csv函数会在python客户端的目录下寻找要导入的数据文件。 请注意,当engine参数设置为‘python’或者‘c’时,Orca的 read_csv函数相当于调用了pandas的 read_csv函数进行导入。本节是基于engine参数取值为‘dolphindb’的前提下对Orca的 read_csv函数进行讲解。 当engine参数设置为‘dolphindb’时,Orca的read_csv函数目前支持的参数如下: path:文件路径 sep:分隔符 delimiter:分隔符 names:指定列名 index_col:指定作为index的列 engine:进行导入的引擎 usecols:指定要导入的列 squeeze:当数据文件只有一行或者一列时,是否将DataFrame压缩成Series prefix:给每列加上的前缀字符串 dtype:指定数据类型导入 partitioned:是否允许以分区的方式导入数据 db_handle:要导入的数据库路径 table_name:要导入的表名 partition_columns:进行分区的列名 下面详细介绍Orca与pandas实现有所不同的几个参数。 dtype参数 Orca在导入csv的时候会自动识别要导入文件的数据类型,支持各种通用时间格式。用户也可以通过dtype参数来强制指定数据类型。 需要说明的是,Orca的read_csv函数不仅支持指定各种numpy的数据类型(np.bool, np.int8, np.float32, etc.),还支持支持以字符串的方式指定DolphinDB的提供的所有数据类型,包括所有时间类型和字符串类型。 例如: dfcsv = orca.read_csv("DATA_DIR/quotes.csv", dtype={"TIME": "NANOTIMESTAMP", "Exchange": "SYMBOL", "SYMBOL": "SYMBOL", "Bid_Price": np.float64, "Bid_Size": np.int32, "Offer_Price": np.float64, "Offer_Size": np.int32}) partitioned参数 bool类型,默认为True。该参数为True时,在数据规模达到一定程度时,会将数据导入为分区内存表,如果设置为False,会直接将csv导入为未经分区的DolphinDB普通内存表。 请注意:Orca的分区表与Orca的内存表相比,在操作时也存在许多差异,具体见 Orca分区表的特殊差异。若您的数据量不是很大,且在使用Orca时对Orca与pandas的一致性要求更高,请尽量不要将数据以分区的方式导入。若您数据量极大,对性能要求极高,则建议您采用分区方式导入数据。 db_handle,table_name以及partition_columns参数 Orca的read_csv还支持db_handle,table_name和partition_columns这3个参数,这些参数用于在导入数据的时通过指定DolphinDB的数据库和表等相关信息,将数据导入到DolphinDB的分区表。 DolphinDB支持通过多种方式将数据导入DolphinDB数据库,Orca在调用read_csv函数时指定db_handle, table_name以及partition_columns参数,本质上是调用DolphinDB的loadTextEx函数,通过这种方式,我们可以直接将数据直接导入DolphinDB的分区表。 2.1.1 导入到内存表 导入为内存分区表 直接调用read_csv函数,数据会并行导入。由于采用并行导入,导入速度快,但是对内存占用是普通表的两倍。下面的例子中'DATA_DIR'为数据文件存放的路径。 >>> DATA_DIR = "dolphindb/database" # e.g. data_dir >>> df = orca.read_csv(DATA_DIR + "/quotes.csv") >>> df.head() # output time Exchange Symbol Bid_Price Bid_Size \ 0 2017-01-01 04:40:11.686699 T AAPL 0.00 0 1 2017-01-01 06:42:50.247631 P AAPL 26.70 10 2 2017-01-01 07:00:12.194786 P AAPL 26.75 5 3 2017-01-01 07:15:03.578071 P AAPL 26.70 10 4 2017-01-01 07:59:39.606882 K AAPL 26.90 1 Offer_Price Offer_Size 0 27.42 1 1 27.47 1 2 27.47 1 3 27.47 1 4 0.00 0 导入为普通内存表 partitioned参数取值为False,导入为普通内存表。导入对内存要求低,但是计算速度略低于上面的导入方式: df = orca.read_csv(DATA_DIR + "/quotes.csv", partitioned=False) 2.1.2 导入到磁盘表 DolphinDB的分区表可以保存在本地磁盘,也可以保存在dfs上,磁盘分区表与分布式表的区别就在于分布式表的数据库路径以"dfs://"开头,而磁盘分区表的数据库路径是本地路径。 示例 我们在DolphinDB服务端创建一个磁盘分区表,下面的脚本中,'YOUR_DIR'为保存磁盘数据库的路径: dbPath=YOUR_DIR + "/demoOnDiskPartitionedDB" login('admin', '123456') if(existsDatabase(dbPath)) dropDatabase(dbPath) db=database(dbPath, RANGE, datehour(2017.01.01 00:00:00+(0..24)*3600)) 请注意:以上两段脚本需要在DolphinDB服务端执行,在Python客户端中则可以通过DolphinDB Python API执行脚本。 在Python客户端中调用Orca的read_csv函数,指定数据库db_handle为磁盘分区数据库YOUR_DIR + "/demoOnDiskPartitionedDB",指定表名table_name为"quotes"和进行分区的列partition_columns为"time",将数据导入到DolphinDB的磁盘分区表,并返回一个表示DolphinDB数据表的对象给df,用于后续计算。 >>> df = orca.read_csv(path=DATA_DIR+"/quotes.csv", dtype={"Exchange": "SYMBOL", "SYMBOL": "SYMBOL"}, db_handle=YOUR_DIR + "/demoOnDiskPartitionedDB", table_name="quotes", partition_columns="time") >>> df # output <'dolphindb.orca.core.frame.DataFrame' object representing a column in a DolphinDB segmented table> >>> df.head() # output time Exchange Symbol Bid_Price Bid_Size \ 0 2017-01-01 04:40:11.686699 T AAPL 0.00 0 1 2017-01-01 06:42:50.247631 P AAPL 26.70 10 2 2017-01-01 07:00:12.194786 P AAPL 26.75 5 3 2017-01-01 07:15:03.578071 P AAPL 26.70 10 4 2017-01-01 07:59:39.606882 K AAPL 26.90 1 Offer_Price Offer_Size 0 27.42 1 1 27.47 1 2 27.47 1 3 27.47 1 4 0.00 0 将上述过程整合成的Python中可执行的脚本如下: >>> s = orca.default_session() >>> DATA_DIR = "/dolphindb/database" # e.g. data_dir >>> YOUR_DIR = "/dolphindb/database" # e.g. database_dir >>> create_onDiskPartitioned_database = """ dbPath="{YOUR_DIR}" + "/demoOnDiskPartitionedDB" login('admin', '123456') if(existsDatabase(dbPath)) dropDatabase(dbPath) db=database(dbPath, RANGE, datehour(2017.01.01 00:00:00+(0..24)*3600)) """.format(YOUR_DIR=YOUR_DIR) >>> s.run(create_onDiskPartitioned_database) >>> df = orca.read_csv(path=DATA_DIR+"/quotes.csv", dtype={"Exchange": "SYMBOL", "SYMBOL": "SYMBOL"}, db_handle=YOUR_DIR + "/demoOnDiskPartitionedDB", table_name="quotes", partition_columns="time") 上述脚本中,我们使用的defalut_session实际上就是通过orca.connect函数创建的会话,在Python端,我们可以通过这个会话与DolphinDB服务端进行交互。关于更多功能,请参见DolphinDB Python API。 请注意:在通过 read_csv函数指定数据库导入数据之前,需要确保在DolphinDB服务器上已经创建了对应的数据库。 read_csv函数根据指定的数据库,表名和分区字段导入数据到DolphinDB数据库中,若表存在则追加数据,若表不存在则创建表并且导入数据。 2.1.3 导入到分布式表 read_csv函数若指定db_handle参数为dfs数据库路径,则数据将直接导入到DolphinDB的dfs数据库中。 示例 请注意只有启用enableDFS=1的集群环境或者DolphinDB单例模式才能使用分布式表。 与磁盘分区表类似,首先需要在DolphinDB服务器上创建分布式表,只需要将数据库路径改为"dfs://"开头的字符串即可。 dbPath="dfs://demoDB" login('admin', '123456') if(existsDatabase(dbPath)) dropDatabase(dbPath) db=database(dbPath, RANGE, datehour(2017.01.01 00:00:00+(0..24)*3600)) 在Python客户端中调用Orca的read_csv函数,指定数据库db_handle为分布式数据库"dfs://demoDB",指定表名table_name为"quotes"和进行分区的列partition_columns为"time",将数据导入到DolphinDB的分布式表。 >>> df = orca.read_csv(path=DATA_DIR+"/quotes.csv", dtype={"Exchange": "SYMBOL", "SYMBOL": "SYMBOL"}, db_handle="dfs://demoDB", table_name="quotes", partition_columns="time") >>> df # output <'dolphindb.orca.core.frame.DataFrame' object representing a column in a DolphinDB segmented table> >>> df.head() # output time Exchange Symbol Bid_Price Bid_Size \ 0 2017-01-01 04:40:11.686699 T AAPL 0.00 0 1 2017-01-01 06:42:50.247631 P AAPL 26.70 10 2 2017-01-01 07:00:12.194786 P AAPL 26.75 5 3 2017-01-01 07:15:03.578071 P AAPL 26.70 10 4 2017-01-01 07:59:39.606882 K AAPL 26.90 1 Offer_Price Offer_Size 0 27.42 1 1 27.47 1 2 27.47 1 3 27.47 1 4 0.00 0 将上述过程整合成的Python中可执行的脚本如下: >>> s = orca.default_session() >>> DATA_DIR = "/dolphindb/database" # e.g. data_dir >>> create_dfs_database = """ dbPath="dfs://demoDB" login('admin', '123456') if(existsDatabase(dbPath)) dropDatabase(dbPath) db=database(dbPath, RANGE, datehour(2017.01.01 00:00:00+(0..24)*3600)) """ >>> s.run(create_dfs_database) >>> df = orca.read_csv(path=DATA_DIR+"/quotes.csv", dtype={"Exchange": "SYMBOL", "SYMBOL": "SYMBOL"}, db_handle="dfs://demoDB", table_name="quotes", partition_columns="time") 2.2 read_table函数 Orca提供read_table函数,通过该函数指定DolphinDB数据库和表名来加载DolphinDB数据表的数据,可以用于加载DolphinDB的磁盘表、磁盘分区表和分布式表。若您已在DolphinDB中创建了数据库和表,则可以直接在Orca中调用该函数加载存放在DolphinDB服务端中的数据,read_table函数支持的参数如下: database:数据库名称 table:表名 partition:需要导入的分区,可选参数 加载DolphinDB的磁盘表 read_table函数可以用于加载DolphinDB的磁盘表,首先在DolphinDB服务端创建一个本地磁盘表: >>> s = orca.default_session() >>> YOUR_DIR = "/dolphindb/database" # e.g. database_dir >>> create_onDisk_database=""" saveTable("{YOUR_DIR}"+"/demoOnDiskDB", table(2017.01.01..2017.01.10 as date, rand(10.0,10) as prices), "quotes") """.format(YOUR_DIR=YOUR_DIR) >>> s.run(create_onDisk_database) 通过read_table函数加载磁盘表: >>> df = orca.read_table(YOUR_DIR + "/demoOnDiskDB", "quotes") >>> df.head() # output date prices 0 2017-01-01 8.065677 1 2017-01-02 2.969041 2 2017-01-03 3.688191 3 2017-01-04 4.773723 4 2017-01-05 5.567130 请注意: read_table函数要求所要导入的数据库和表在DolphinDB服务器上已经存在,若只存在数据库和没有创建表,则不能将数据成功导入到Python中。 加载DolphinDB的磁盘分区表 对于已经在DolphinDB上创建的数据表,可以通过read_table函数直接加载。例如,加载2.1.2小节中创建的磁盘分区表: >>> df = orca.read_table(YOUR_DIR + "/demoOnDiskPartitionedDB", "quotes") 加载DolphinDB的分布式表 分布式表同样可以通过read_table函数加载。例如,加载2.1.3小节中创建的分布式表: >>> df = orca.read_table("dfs://demoDB", "quotes") 2.3 from_pandas函数 Orca提供from_pandas函数,该函数接受一个pandas的DataFrame作为参数,返回一个Orca的DataFrame,通过这个方式,Orca可以直接加载原先存放在pandas的DataFrame中的数据。 >>> import pandas as pd >>> import numpy as np >>> pdf = pd.DataFrame(np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), columns=['a', 'b', 'c']) >>> odf = orca.from_pandas(pdf) 3 对其它格式文件的支持 对于其它数据格式的导入,Orca也提供了与pandas类似的接口。这些方法包括:read_pickle,read_fwf,read_msgpack,read_clipboard,read_excel,read_json,json_normalize,build_table_schema,read_html,read_hdf,read_feather,read_parquet,read_sas,read_sql_table,read_sql_query,read_sql,read_gbq,read_stata。

优秀的个人博客,低调大师

鸿蒙应用开发入门(五):日志HiLog的使用

目录: 5.1 概述 5.2 HiLog使用说明 5.3. 写demo练习 5.1 概述 做一个Java攻城师, 我们除了关心系统的架构这种high level的问题,还需要了解一些语言的陷阱, 异常的处理, 以及日志的输出, 这些"鸡毛蒜皮"的细节。 我们需要通过打印一条条日志来掌握程序运行的状态,下面我们就来讲解鸿蒙系统中的HiLog日志工具的具体使用方法。 5.2 HiLog使用说明 1. 使用HiLog前必须在HiLog的一个辅助类HiLogLabel中定义日志类型、服务域和标记。一般我们把它定义为常量放在类的最上面: static final HiLogLabel label = new HiLogLabel(HiLog.LOG_APP, 0x00201, "MY_TAG"); 上面有三个参数: 1)日志类型,我们的应用一般取一个常量值:HiLog.LOG_APP,表示是第三方应用。 2)服务域,16进制整数形式,取值范围是0x0 ~ 0xFFFFF。一般情况下,我们建议把这5个16进制数分成两组,前面三个数表示应用中的模块编号,后面两个表示模块中的类的编号。 3)一个字符串常量,它标识方法调用的类或服务行为。一般情况下就写类的名字,一般我可用这个标记对日志进行过滤。 2. 日志的级别,和其他日志一样,HiLog也分成了几个日志级别,由上到下信息越严重: 1)debug:调试信息 2)info:普通信息 3)warn:警告信息 4)error:错误信息 5)fatal:致命错误信息 3. 使用 String url = "www.baidu.com"; int errno = 0; HiLog.warn(label, "Failed to visit %{private}s, reason:%{public}d.", url, errno); 按照上述格式就可用在控制台中输入日志信息了,里边还有点东西,需要进一步解释一下: %{private}s和%{public}d这两个符号我们可用理解为占位符,真正打印到控制台上的值是后面的变量: private:表示私有的,我们在开发阶段的日志中是看得见的,但是运行到手机上后,手机的控制台是隐藏的,看不见的。 public:表示共有的,哪里都看得见,不受限制。 s:表示字符串 d:表示数字 查看更多章节>>> 作者:zhonghongfa 想了解更多内容,请访问: 51CTO和华为官方战略合作共建的鸿蒙技术社区harmonyos.51cto.com

优秀的个人博客,低调大师

超详细的RabbitMQ入门,看这篇就够了!

思维导图 一、什么是消息队列 消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。 “消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。 二、为什么使用消息队列 主要有三个作用: 解耦。如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可。 异步。如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。 削峰。如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。 三、RabbitMQ的特点 RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点,官网可查: 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。 四、RabbitMQ初の体验 4.1 安装RabbitMQ (Win10系统) 由于只是学习需要,所以安装在win10系统,就懒得开虚拟机。如果用Linux系统安装的话,我建议用Docker拉一个RabbitMQ的镜像下来,这样会方便一点。 4.1.1 安装erLang语言,配置环境变量 首先到erlang官网下载win10版安装包。 下载完之后,就得到这个东西: 接着双击安装,一直点next(下一步)就行了,安装完之后,配置环境变量。 使用cmd命令,输入 erl -version 验证: 4.1.2 安装RabbitMQ服务端 在RabbitMQ的gitHub项目中,下载window版本的服务端安装包。 下载后,就得到这个东西: 接着到双击安装,一直点下一步安装即可,安装完成后,找到安装目录: 在此目录下打开cmd命令,输入rabbitmq-plugins enable rabbitmq_management命令安装管理页面的插件: 然后双击rabbitmq-server.bat启动脚本,然后打开服务管理可以看到RabbitMQ正在运行: 这时,打开浏览器输入http://localhost:15672,账号密码默认是:guest/guest 到这一步,安装就大功告成了! 4.2 永远的Hello Word 服务端搭建好了之后肯定要用客户端去操作,接下来就用Java做一个简单的HelloWord演示。 因为我用的是SpringBoot,所以在生产者这边加入对应的starter依赖即可: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 一般需要创建一个公共项目common,共享一些配置,比如队列主题,交换机名称,路由匹配键名称等等。 首先在application.yml文件加上RabbitMQ的配置信息: spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest 然后再生产者这边,加上common包的maven依赖,然后创建一个Direct交换机以及队列的配置类: @Configuration public class DirectRabbitConfig { @Bean public Queue rabbitmqDemoDirectQueue() { /** * 1、name: 队列名称 * 2、durable: 是否持久化 * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。 * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。 * */ return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false); } @Bean public DirectExchange rabbitmqDemoDirectExchange() { //Direct交换机 return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false); } @Bean public Binding bindDirect() { //链式写法,绑定交换机和队列,并设置匹配键 return BindingBuilder //绑定队列 .bind(rabbitmqDemoDirectQueue()) //到交换机 .to(rabbitmqDemoDirectExchange()) //并设置匹配键 .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING); } } 然后再创建一个发送消息的Service类: @Service public class RabbitMQServiceImpl implements RabbitMQService { //日期格式化 private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Resource private RabbitTemplate rabbitTemplate; @Override public String sendMsg(String msg) throws Exception { try { String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32); String sendTime = sdf.format(new Date()); Map<String, Object> map = new HashMap<>(); map.put("msgId", msgId); map.put("sendTime", sendTime); map.put("msg", msg); rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map); return "ok"; } catch (Exception e) { e.printStackTrace(); return "error"; } } } 然后根据业务放在需要用的地方,比如定时任务,或者接口。我这里就简单一点使用Controller层进行发送: @RestController @RequestMapping("/mall/rabbitmq") public class RabbitMQController { @Resource private RabbitMQService rabbitMQService; /** * 发送消息 * @author java技术爱好者 */ @PostMapping("/sendMsg") public String sendMsg(@RequestParam(name = "msg") String msg) throws Exception { return rabbitMQService.sendMsg(msg); } } 生产者写完之后,就写消费者端的代码,消费者很简单。maven依赖,yml文件配置和生产者一样。只需要创建一个类,@RabbitListener注解写上监听队列的名称,如图所示: 这里有个小坑,一开始RabbitMQ服务器里还没有创建队列: 这时如果启动消费者,会报错: 要先启动生产者,发送一条消息: 最后再启动消费者,进行消费: 这时候就会持续监听队列的消息,只要生产者发送一条消息到MQ,消费者就消费一条。我这里尝试发送4条: 由于队列不存在,启动消费者报错的这个问题。最好的方法是生产者和消费者都尝试创建队列,怎么写呢,有很多方式,我这里用一个相对简单一点的: 生产者的配置类加点东西: //实现BeanPostProcessor类,使用Bean的生命周期函数 @Component public class DirectRabbitConfig implements BeanPostProcessor { //这是创建交换机和队列用的rabbitAdmin对象 @Resource private RabbitAdmin rabbitAdmin; //初始化rabbitAdmin对象 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类 rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } //实例化bean后,也就是Bean的后置处理器 @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //创建交换机 rabbitAdmin.declareExchange(rabbitmqDemoDirectExchange()); //创建队列 rabbitAdmin.declareQueue(rabbitmqDemoDirectQueue()); return null; } } 这样启动生产者就会自动创建交换机和队列,不用等到发送消息才创建。 消费者需要加一点代码: @Component //使用queuesToDeclare属性,如果不存在则会创建队列 @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC)) public class RabbitDemoConsumer { //...省略 } 这样,无论生产者还是消费者先启动都不会出现问题了~ 代码地址:https://github.com/yehongzhi/mall 五、RabbitMQ中的组成部分 从上面的HelloWord例子中,我们大概也能体验到一些,就是RabbitMQ的组成,它是有这几部分: Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。 Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。 Queue:消息队列,存储消息的队列。 Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。 Consumer:消息消费者。消费队列中存储的消息。 这些组成部分是如何协同工作的呢,大概的流程如下,请看下图: 消息生产者连接到RabbitMQ Broker,创建connection,开启channel。 生产者声明交换机类型、名称、是否持久化等。 生产者发送消息,并指定消息是否持久化等属性和routing key。 exchange收到消息之后,根据routing key路由到跟当前交换机绑定的相匹配的队列里面。 消费者监听接收到消息之后开始业务处理。 六、Exchange的四种类型以及用法 从上面的工作流程可以看出,实际上有个关键的组件Exchange,因为消息发送到RabbitMQ后首先要经过Exchange路由才能找到对应的Queue。 实际上Exchange类型有四种,根据不同的类型工作的方式也有所不同。在HelloWord例子中,我们就使用了比较简单的Direct Exchange,翻译就是直连交换机。其余三种分别是:Fanout exchange、Topic exchange、Headers exchange。 6.1 Direct Exchange 见文知意,直连交换机意思是此交换机需要绑定一个队列,要求该消息与一个特定的路由键完全匹配。简单点说就是一对一的,点对点的发送。 完整的代码就是上面的HelloWord的例子,不再重复代码。 6.2 Fanout exchange 这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。 代码怎么写呢,演示一下: 首先要先配置交换机和队列的名称: public class RabbitMQConfig { /** * RabbitMQ的FANOUT_EXCHANG交换机类型的队列 A 的名称 */ public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_A = "fanout.A"; /** * RabbitMQ的FANOUT_EXCHANG交换机类型的队列 B 的名称 */ public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_B = "fanout.B"; /** * RabbitMQ的FANOUT_EXCHANG交换机类型的名称 */ public static final String FANOUT_EXCHANGE_DEMO_NAME = "fanout.exchange.demo.name"; } 再配置FanoutExchange类型的交换机和A、B两个队列,并且绑定。这种类型不需要配置routing key: @Component public class DirectRabbitConfig implements BeanPostProcessor { @Resource private RabbitAdmin rabbitAdmin; @Bean public Queue fanoutExchangeQueueA() { //队列A return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false); } @Bean public Queue fanoutExchangeQueueB() { //队列B return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false); } @Bean public FanoutExchange rabbitmqDemoFanoutExchange() { //创建FanoutExchange类型交换机 return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false); } @Bean public Binding bindFanoutA() { //队列A绑定到FanoutExchange交换机 return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange()); } @Bean public Binding bindFanoutB() { //队列B绑定到FanoutExchange交换机 return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange()); } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //启动项目即创建交换机和队列 rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange()); rabbitAdmin.declareQueue(fanoutExchangeQueueB()); rabbitAdmin.declareQueue(fanoutExchangeQueueA()); return null; } } 创建service发布消息的方法: @Service public class RabbitMQServiceImpl implements RabbitMQService { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Resource private RabbitTemplate rabbitTemplate; //发布消息 @Override public String sendMsgByFanoutExchange(String msg) throws Exception { Map<String, Object> message = getMessage(msg); try { rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message); return "ok"; } catch (Exception e) { e.printStackTrace(); return "error"; } } //组装消息体 private Map<String, Object> getMessage(String msg) { String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32); String sendTime = sdf.format(new Date()); Map<String, Object> map = new HashMap<>(); map.put("msgId", msgId); map.put("sendTime", sendTime); map.put("msg", msg); return map; } } Controller接口: @RestController @RequestMapping("/mall/rabbitmq") public class RabbitMQController { /** * 发布消息 * * @author java技术爱好者 */ @PostMapping("/publish") public String publish(@RequestParam(name = "msg") String msg) throws Exception { return rabbitMQService.sendMsgByFanoutExchange(msg); } } 接着在消费者项目这边,创建两个队列的监听类,监听队列进行消费: @Component @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A)) public class FanoutExchangeConsumerA { @RabbitHandler public void process(Map<String, Object> map) { System.out.println("队列A收到消息:" + map.toString()); } } @Component @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B)) public class FanoutExchangeConsumerB { @RabbitHandler public void process(Map<String, Object> map) { System.out.println("队列B收到消息:" + map.toString()); } } 然后启动生产者和消费者两个项目,可以看到管理界面创建了一个FanoutExchange交换机和两个队列,并且绑定了: 使用POSTMAN进行发送消息,测试: 然后可以看到控制台,两个队列同时都收到了相同的消息,形成了发布订阅的效果: 6.3 Topic Exchange 直接翻译的话叫做主题交换机,如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:"*" 、 "#"。需要注意的是通配符前面必须要加上"."符号。 * 符号:有且只匹配一个词。比如 a.*可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。 # 符号:匹配一个或多个词。比如"rabbit.#"既可以匹配到"rabbit.a.b"、"rabbit.a",也可以匹配到"rabbit.a.b.c"。 废话不多说,代码演示一下: 依然是配置TopicExchange名称和三个队列的名称: /** * RabbitMQ的TOPIC_EXCHANGE交换机名称 */ public static final String TOPIC_EXCHANGE_DEMO_NAME = "topic.exchange.demo.name"; /** * RabbitMQ的TOPIC_EXCHANGE交换机的队列A的名称 */ public static final String TOPIC_EXCHANGE_QUEUE_A = "topic.queue.a"; /** * RabbitMQ的TOPIC_EXCHANGE交换机的队列B的名称 */ public static final String TOPIC_EXCHANGE_QUEUE_B = "topic.queue.b"; /** * RabbitMQ的TOPIC_EXCHANGE交换机的队列C的名称 */ public static final String TOPIC_EXCHANGE_QUEUE_C = "topic.queue.c"; 然后还是老配方,配置交换机和队列,然后绑定,创建: @Component public class DirectRabbitConfig implements BeanPostProcessor { //省略... @Bean public TopicExchange rabbitmqDemoTopicExchange() { //配置TopicExchange交换机 return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false); } @Bean public Queue topicExchangeQueueA() { //创建队列1 return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false); } @Bean public Queue topicExchangeQueueB() { //创建队列2 return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false); } @Bean public Queue topicExchangeQueueC() { //创建队列3 return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false); } @Bean public Binding bindTopicA() { //队列A绑定到FanoutExchange交换机 return BindingBuilder.bind(topicExchangeQueueB()) .to(rabbitmqDemoTopicExchange()) .with("a.*"); } @Bean public Binding bindTopicB() { //队列A绑定到FanoutExchange交换机 return BindingBuilder.bind(topicExchangeQueueC()) .to(rabbitmqDemoTopicExchange()) .with("a.*"); } @Bean public Binding bindTopicC() { //队列A绑定到FanoutExchange交换机 return BindingBuilder.bind(topicExchangeQueueA()) .to(rabbitmqDemoTopicExchange()) .with("rabbit.#"); } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { rabbitAdmin.declareExchange(rabbitmqDemoTopicExchange()); rabbitAdmin.declareQueue(topicExchangeQueueA()); rabbitAdmin.declareQueue(topicExchangeQueueB()); rabbitAdmin.declareQueue(topicExchangeQueueC()); return null; } } 然后写一个发送消息的service方法: @Service public class RabbitMQServiceImpl implements RabbitMQService { @Override public String sendMsgByTopicExchange(String msg, String routingKey) throws Exception { Map<String, Object> message = getMessage(msg); try { //发送消息 rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message); return "ok"; } catch (Exception e) { e.printStackTrace(); return "error"; } } } 写一个Controller接口: @RestController @RequestMapping("/mall/rabbitmq") public class RabbitMQController { @Resource private RabbitMQService rabbitMQService; /** * 通配符交换机发送消息 * * @author java技术爱好者 */ @PostMapping("/topicSend") public String topicSend(@RequestParam(name = "msg") String msg, @RequestParam(name = "routingKey") String routingKey) throws Exception { return rabbitMQService.sendMsgByTopicExchange(msg, routingKey); } } 生产者这边写完,就写消费端,消费端比较简单,写三个监听类: @Component @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A)) public class TopicExchangeConsumerA { @RabbitHandler public void process(Map<String, Object> map) { System.out.println("队列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A + "]收到消息:" + map.toString()); } } @Component @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B)) public class TopicExchangeConsumerB { @RabbitHandler public void process(Map<String, Object> map) { System.out.println("队列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B+ "]收到消息:" + map.toString()); } } @Component @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C)) public class TopicExchangeConsumerC { @RabbitHandler public void process(Map<String, Object> map) { System.out.println("队列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C + "]收到消息:" + map.toString()); } } 大功告成,然后启动项目开始调试。启动成功后可以看到队列和路由键绑定的关系: 通过POSTMAN进行测试,测试一下 rabbit.# 的路由键是否能够匹配成功: 测试成功,队列A消费到消息: 接着测试 a.* 路由键,发送 routingKey = a.b : 比较常用的就是以上三种:直连(DirectExchange),发布订阅(FanoutExchange),通配符(TopicExchange)。熟练运用这三种交换机类型,基本上可以解决大部分的业务场景。 实际上稍微思考一下,可以发现通配符(TopicExchange)这种模式其实是可以达到直连(DirectExchange)和发布订阅(FanoutExchange)这两种的效果的。 FanoutExchange不需要绑定routingKey,所以性能相对TopicExchange会好一点。 6.4 Headers Exchange 这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。如图所示: 创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。如上图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。代码怎么实现呢,往下看演示代码: 首先还是需要定义交换机名称,队列名称: /** * HEADERS_EXCHANGE交换机名称 */ public static final String HEADERS_EXCHANGE_DEMO_NAME = "headers.exchange.demo.name"; /** * RabbitMQ的HEADERS_EXCHANGE交换机的队列A的名称 */ public static final String HEADERS_EXCHANGE_QUEUE_A = "headers.queue.a"; /** * RabbitMQ的HEADERS_EXCHANGE交换机的队列B的名称 */ public static final String HEADERS_EXCHANGE_QUEUE_B = "headers.queue.b"; 然后设置交换机,队列,进行绑定: @Component public class DirectRabbitConfig implements BeanPostProcessor { @Bean public Queue headersQueueA() { return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false); } @Bean public Queue headersQueueB() { return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B, true, false, false); } @Bean public HeadersExchange rabbitmqDemoHeadersExchange() { return new HeadersExchange(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, true, false); } @Bean public Binding bindHeadersA() { Map<String, Object> map = new HashMap<>(); map.put("key_one", "java"); map.put("key_two", "rabbit"); //全匹配 return BindingBuilder.bind(headersQueueA()) .to(rabbitmqDemoHeadersExchange()) .whereAll(map).match(); } @Bean public Binding bindHeadersB() { Map<String, Object> map = new HashMap<>(); map.put("headers_A", "coke"); map.put("headers_B", "sky"); //部分匹配 return BindingBuilder.bind(headersQueueB()) .to(rabbitmqDemoHeadersExchange()) .whereAny(map).match(); } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { rabbitAdmin.declareExchange(rabbitmqDemoHeadersExchange()); rabbitAdmin.declareQueue(headersQueueA()); rabbitAdmin.declareQueue(headersQueueB()); return null; } } 再写一个Service方法发送消息: @Service public class RabbitMQServiceImpl implements RabbitMQService { @Resource private RabbitTemplate rabbitTemplate; @Override public String sendMsgByHeadersExchange(String msg, Map<String, Object> map) throws Exception { try { MessageProperties messageProperties = new MessageProperties(); //消息持久化 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setContentType("UTF-8"); //添加消息 messageProperties.getHeaders().putAll(map); Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message); return "ok"; } catch (Exception e) { e.printStackTrace(); return "error"; } } } 再写一个Controller接口: @RestController @RequestMapping("/mall/rabbitmq") public class RabbitMQController { @Resource private RabbitMQService rabbitMQService; @PostMapping("/headersSend") @SuppressWarnings("unchecked") public String headersSend(@RequestParam(name = "msg") String msg, @RequestParam(name = "json") String json) throws Exception { ObjectMapper mapper = new ObjectMapper(); Map<String, Object> map = mapper.readValue(json, Map.class); return rabbitMQService.sendMsgByHeadersExchange(msg, map); } } 生产者这边写完了,再写两个队列的监听类进行消费: @Component public class HeadersExchangeConsumerA { @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A)) public void process(Message message) throws Exception { MessageProperties messageProperties = message.getMessageProperties(); String contentType = messageProperties.getContentType(); System.out.println("队列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A + "]收到消息:" + new String(message.getBody(), contentType)); } } @Component public class HeadersExchangeConsumerB { @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B)) public void process(Message message) throws Exception { MessageProperties messageProperties = message.getMessageProperties(); String contentType = messageProperties.getContentType(); System.out.println("队列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B + "]收到消息:" + new String(message.getBody(), contentType)); } } 大功告成~启动项目,打开管理界面,我们可以看到交换机绑定队列的信息: 跟上面示意图一样~证明没有问题,一切尽在掌握之中。使用POSTMAN发送,测试全匹配的队列A: 再测试部分匹配的队列B: 总结 这篇文章就先写到这里了。回顾一下学了哪些: 什么是消息队列?为什么使用消息队列? RabbitMQ的特点、组成部分、工作流程 安装RabbitMQ,以及完成一个HelloWord小案例 RabbitMQ交换机的四种类型的特点,以及使用方法 实际上RabbitMQ还有事务机制和负载均衡这些还没讲,因为篇幅实在有点长了,差不多5千字了。所以放在下期讲吧,尽请期待一下。 上面所有例子的代码都上传github了: https://github.com/yehongzhi/mall 如果你觉得这篇文章对你有用,点个赞吧~ 你的点赞是我创作的最大动力~ 想第一时间看到我更新的文章,可以微信搜索公众号「java技术爱好者」,拒绝做一条咸鱼,我是一个努力让大家记住的程序员。我们下期再见!!! 能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!

优秀的个人博客,低调大师

一文入门DNS?从访问GitHub开始

前言 大家都是做开发的,都有GitHub的账号,在日常使用中肯定会遇到这种情况,在不修改任何配置的情况下,有时可以正常访问GitHub,有时又直接未响应,来一起捋捋到底是为啥。 GitHub访问的千层套路 以我家里的电脑为例,在不修改任何配置,不启用什么代理工具的情况下,访问GitHub会得到以下结果: 😂😂😂 虽然很戳心,但好歹能展示一部分。 从网上搜了一堆乱七八糟的攻略,知道了可以通过修改电脑的Hosts文件达到正常访问的能力,于是胡搜了一通, 步骤:百度经验 效果如下: 访问效果依然很感人,最近活动数据不显示,整个界面加载都快接近2分钟了,有什么办法没有咧~ 站长工具 PING PING PING 都是搞开发的,都会用F12看看网络或者资源请求的地址是什么,以上面耗时最慢的地址为例,域名为:github.githubassets.com 打开站长工具的PING功能,地址为:http://ping.chinaz.com/github.githubassets.com 结果如下: 我发现 185.199.108.154 这个IP地址速度快的一批,于是立马更换 Hosts中该域名对应的IP地址 再次访问,效果如下: 那句话怎么说的来着?如什么什么般丝滑,我感觉这就非常丝滑~ 🤪 GitHub项目定时发布最新Hosts 当然了,如果每次访问都得折腾一次,那滋味,简直不要太难受,所以网上已经有人开源了相关的项目,会定时发布最新的 GitHub IP地址,链接:https://github.com/521xueweihan/GitHub520 本文撰写时的Hosts # github 185.199.108.154 github.githubassets.com 199.232.68.133 camo.githubusercontent.com 52.168.24.190 github.map.fastly.net 199.232.69.194 github.global.ssl.fastly.net 140.82.112.4 github.com 140.82.112.5 api.github.com 199.232.68.133 raw.githubusercontent.com 199.232.68.133 user-images.githubusercontent.com 199.232.68.133 favicons.githubusercontent.com 199.232.68.133 avatars5.githubusercontent.com 199.232.68.133 avatars4.githubusercontent.com 199.232.68.133 avatars3.githubusercontent.com 199.232.68.133 avatars2.githubusercontent.com 199.232.68.133 avatars1.githubusercontent.com 199.232.68.133 avatars0.githubusercontent.com 该项目会自动发布在指定的地址上,结合软件使用,可以完全自动化,无需持续更新 当然也可以自行手动更改 为什么改了Hosts就能访问GitHub 平常都是百度 + 谷歌,今天非要探究一下原理!咱们一步一步来,首先大家都需要明确一点,在网络的世界中 域名 只是为了便于记忆和识别而存在的一个唯一地址,真正工作的仍然是 IP Hosts文件是干吗的 简单来说,Hosts文件是存储本机网址域名与其对应的IP地址的一个文件,在网络请求阶段发挥作用 为什么改了Hosts就能生效 这就涉及到了域名解析,因为Hosts文件存放的就是 域名 和 IP 的对应关系,因此它可以在域名解析阶段发挥作用,为什么呢?因为在域名解析的流程中 本机Hosts 解析处于顺序二 即:浏览器解析 -》本机解析 -》XXXX(后面的稍后再提) 所以有时候我们白嫖软件,都会改一下 Hosts,因为需要把它在线验证的域名指向错误的地址去,另外可能存在一定的浏览器缓存或者本机缓存,可以通过重开浏览器或者 PING 域名来检查更改是否生效。 DNS解析到底是什么玩意? 上文中多次提到解析,其实说的就是 DNS解析 同时上文也提到过,在网络世界中真正发挥作用的是 IP,而一般情况下我们访问的都是 域名,为什么能实现这种效果,就是因为域名与IP地址的对应关系存储在一个叫做 DNS(Domain Name System) 的系统里。DNS是一个全球化的分布式数据库,它所提供的服务就是将域名转换为互联网IP地址。 DNS解析的全部流程 网上的关于流程的图很多,我从中借鉴了一副,如下所示: 浏览器缓存:一次请求会首先通过浏览器缓存信息寻找域名映射的IP地址,这也是为什么有时候我们改了本机hosts,需要关闭再打开浏览器才能正常使用,如果找到则返回,没找到则继续到下一级 本机系统缓存:即上文中提到的,通过 hosts 文件来映射域名和IP,在上古时期有很多垃圾软件会悄咪咪的修改系统的 hosts文件,达到 DNS劫持 的目的,即把淘宝域名指向另一个 IP,然后部署一个高仿的淘宝商城,静静等你输入账号,密码,然后凉凉... 本地域名解析服务系统:本地域名系统LDNS一般都是本地区的域名服务器。离你的位置都比较近,Windows系统使用命令ipconfig 就可以查看,在Linux和Mac系统下,直接使用命令 cat /etc/resolv.conf 来查看LDNS服务地址。 LDNS一般都缓存了大部分的域名解析的结果,大部分的解析工作到这里就差不多已经结束了 以下即是所谓的 递归解析 根域名解析:本地域名解析服务系统无法解析时,会向 13根 发起域名解析请求 <font color="red">说明:</font> 所谓的 13根,指的是根域名服务器,是架构因特网所必须的基础设施。根服务器主要用来管理互联网的主目录,由于DNS解析中采用的是UDP协议,仅能传递512字节的有效报文,因此只能构建出A-M 13个根服务器,而真正工作运行肯定不止13台服务器,而是包含很多服务器镜像的 根域名解析服务器返回 gTLD (Generic top-level domain) 给本地解析服务器,即该域名所属的顶级域及其所在的服务器,顶级域名即如:.com .cn等等 本地解析服务器已知顶级域名服务器地址后,发起解析请求 顶级域名解析服务器返回 权限域名服务器 信息给本地解析服务器,权限域名服务器 即如:taobao.com 本地解析服务器已知权限域名服务器地址后,发起解析请求 权限域名服务器返回域名对应的IP地址给本地解析服务器 本地解析服务器缓存相关信息,并返回给用户 是不是有点绕?咱们来整个图吧,递归解析 如下所示: 再问一遍为什么改Hosts就可以访问GitHub 了解了上文之后,对于这个问题就更好回答了,因为GitHub毕竟为外国的网站,咱们访问时有一层 DNS污染,即把对应的域名指向了不可达的IP上,或者禁止访问的IP上,因此很多时候无法使用 修改Hosts文件后即避免了DNS污染,直达目标IP,即可正常访问了,当然了,这种方法是全部通用吗? 答案:肯定不是,因为刚才也提到了,网关层是可以控制某些IP禁止访问的 整一个工具来验证一下猜想,顺便看看我们的整个请求流程: 软件名:BestTrace 我请求的域名是 github.githubassets.com,最终请求接收方IP和我Hosts配置的IP一致,那我换一个 facebook.com 可以看到,当请求到达 221.183.46.249 这个IP时,整个请求就被拦截下来了,因此这并不是万能的办法 除了访问GitHub,还有什么时候可能用到呢? 比如下载 IDEA插件时,如果发现老是刷新不出来插件库,或者下载失败,就可以通过PING工具去配置最佳IP,方便下载~ DNS除了解析还能做什么 智能DNS 网络请求交由域名解析服务器来处理,分配到最佳的服务器IP上 例如:请求的源头是电信还是联通等,如果是电信则将解析的IP分流到电信对应的IP上,或者返回距离最近的服务器IP地址 反向代理水平扩展 典型的互联网架构中,可以通过增加web-server来扩充web层的性能,但反向代理nginx仍是整个系统的唯一入口 如果系统吞吐超过nginx的性能极限,那么将难以扩容,此时就需要dns-server来配合水平扩展。 即DNS解析服务器有序的把域名解析到不同的网关层,每次DNS解析请求,轮询返回不同的ip,这样就能实现nginx的水平扩展,这个方法叫 “DNS轮询” 最后 参考资料: 博客园 DNS解析全过程分析 除了解析域名,DNS还能干吗 如果觉得对你有用的话,不要忘记点个赞啊~ 也可以扫描二维码关注我,一起朝着技术人的顶峰前进!

优秀的个人博客,低调大师

课程发布 | 从入门到实践,彻底搞懂 Serverless!

众所周知,我们在开发应用程序并将其部署在服务器上的过程细节上往往要花费很多精力,有没有一种简单的架构模型能够帮助我们解决这个问题呢?答案就是今天软件架构世界中新鲜但是很热门的一个话题——Serverless(无服务器)架构。 Serverless 兴起于 2017 年,在最近两年伴随着云原生概念的推广愈发火爆,很多开发者非常看好,并认为它一定是未来云计算发展的方向。其实,虽然说是 Serverless,但 Server(服务器)是不可能真正消失的,Serverless 里这个 less 更确切的说是开发不用关心的意思。 Serverless 带来的技术红利众多,其中两个最大的贡献即是在一定程度上大大提高了研发交付的速度和降低了运营成本。 高速度研发交付:在无服务器时代,研发人眼无需对服务器进行监控、配置、更新等运维操作,只需要将代码上传

资源下载

更多资源
腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册