首页 文章 精选 留言 我的

精选列表

搜索[官方],共10007篇文章
优秀的个人博客,低调大师

Redisson官方文档 - 9. 分布式服务

9.1. 分布式远程服务(Remote Service) 基于Redis的Java分布式远程服务,可以用来通过共享接口执行存在于另一个Redisson实例里的对象方法。换句话说就是通过Redis实现了Java的远程过程调用(RPC)。分布式远程服务基于可以用POJO对象,方法的参数和返回类不受限制,可以是任何类型。 分布式远程服务(Remote Service)提供了两种类型的RRemoteService实例: 服务端(远端)实例 - 用来执行远程方法(工作者实例即worker instance).例如: RRemoteService remoteService = redisson.getRemoteService(); SomeServiceImpl someServiceImpl = new SomeServiceImpl(); // 在调用远程方法以前,应该首先注册远程服务 // 只注册了一个服务端工作者实例,只能同时执行一个并发调用 remoteService.register(SomeServiceInterface.class, someServiceImpl); // 注册了12个服务端工作者实例,可以同时执行12个并发调用 remoteService.register(SomeServiceInterface.class, someServiceImpl, 12); 客户端(本地)实例 - 用来请求远程方法.例如: RRemoteService remoteService = redisson.getRemoteService(); SomeServiceInterface service = remoteService.get(SomeServiceInterface.class); String result = service.doSomeStuff(1L, "secondParam", new AnyParam()); 客户端和服务端必须使用一样的共享接口,生成两者的Redisson实例必须采用相同的连接配置。客户端和服务端实例可以运行在同一个JVM里,也可以是不同的。客户端和服务端的数量不收限制。(注意:尽管Redisson不做任何限制,但是Redis的限制仍然有效。) 在服务端工作者可用实例数量 大于1 的时候,将并行执行并发调用的远程方法。 并行执行工作者数量计算方法如下:T = R * N T - 并行执行工作者总数R - Redisson服务端数量N - 注册服务端时指定的执行工作者数量 超过该数量的并发请求将在列队中等候执行。 在服务端工作者实例可用数量为 1 时,远程过程调用将会按 __顺序执行__。这种情况下,每次只有一个请求将会被执行,其他请求将在列队中等候执行。 9.1.1. 分布式远程服务工作流程 分布式远程服务为每个注册接口建立了两个列队。一个列队用于请求,由服务端监听,另一个列队用于应答回执和结果回复,由客户端监听。应答回执用于判定该请求是否已经被接受。如果在指定的超时时间内没有被执行工作者执行将会抛出RemoteServiceAckTimeoutException错误。 下图描述了每次发起远程过程调用请求的工作流程。 9.1.2. 发送即不管(Fire-and-Forget)模式和应答回执(Ack-Response)模式 分布式远程服务通过org.redisson.core.RemoteInvocationOptions类,为每个远程过程调用提供了一些可配置选项。这些选项可以用来指定和修改请求超时和选择跳过应答回执或结果的发送模式。例如: // 应答回执超时1秒钟,远程执行超时30秒钟 RemoteInvocationOptions options = RemoteInvocationOptions.defaults(); // 无需应答回执,远程执行超时30秒钟 RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck(); // 应答回执超时1秒钟,不等待执行结果 RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noResult(); // 应答回执超时1分钟,不等待执行结果 RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.MINUTES).noResult(); // 发送即不管(Fire-and-Forget)模式,无需应答回执,不等待结果 RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult(); RRemoteService remoteService = redisson.getRemoteService(); YourService service = remoteService.get(YourService.class, options); 9.1.3. 异步调用 远程过程调用也可以采用异步的方式执行。异步调用需要单独提交一个带有@RRemoteAsync注释(annotation)的异步接口类。异步接口方法签名必须与远程接口的方法签名相符。异步接口的返回类必须是org.redisson.api.RFuture对象或其子对象。在调用RRemoteService.get方法时将对异步接口的方法进行验证。异步接口无须包含所有的远程接口里的方法,只需要包含要求异步执行的方法即可。 // 远程接口 public interface RemoteInterface { Long someMethod1(Long param1, String param2); void someMethod2(MyObject param); MyObject someMethod3(); } // 匹配远程接口的异步接口 @RRemoteAsync(RemoteInterface.class) public interface RemoteInterfaceAsync { RFuture<Long> someMethod1(Long param1, String param2); RFuture<Void> someMethod2(MyObject param); } RRemoteService remoteService = redisson.getRemoteService(); RemoteInterfaceAsync asyncService = remoteService.get(RemoteInterfaceAsync.class); 9.1.4. 取消异步调用 通过调用Future.cancel()方法可以非常方便的取消一个异步调用。分布式远程服务允许在三个阶段中任何一个阶段取消异步调用: 远程调用请求在列队中排队阶段 远程调用请求已经被分布式远程服务接受,还未发送应答回执,执行尚未开始。 远程调用请求已经在执行阶段 想要正确的处理第三个阶段,在服务端代码里应该检查Thread.currentThread().isInterrupted()的返回状态。范例如下: // 远程接口 public interface MyRemoteInterface { Long myBusyMethod(Long param1, String param2); } // 匹配远程接口的异步接口 @RRemoteAsync(MyRemoteInterface.class) public interface MyRemoteInterfaceAsync { RFuture<Long> myBusyMethod(Long param1, String param2); } // 远程接口的实现 public class MyRemoteServiceImpl implements MyRemoteInterface { public Long myBusyMethod(Long param1, String param2) { for (long i = 0; i < Long.MAX_VALUE; i++) { iterations.incrementAndGet(); if (Thread.currentThread().isInterrupted()) { System.out.println("interrupted! " + i); return; } } } } RRemoteService remoteService = redisson.getRemoteService(); ExecutorService executor = Executors.newFixedThreadPool(5); // 注册远程服务的服务端的同时,通过单独指定的ExecutorService来配置执行线程池 MyRemoteInterface serviceImpl = new MyRemoteServiceImpl(); remoteService.register(MyRemoteInterface.class, serviceImpl, 5, executor); // 异步调用方法 MyRemoteInterfaceAsync asyncService = remoteService.get(MyRemoteInterfaceAsync.class); RFuture<Long> future = asyncService.myBusyMethod(1L, "someparam"); // 取消异步调用 future.cancel(true); 9.2. 分布式实时对象(Live Object)服务 9.2.1. 介绍 一个 分布式实时对象(Live Object) 可以被理解为一个功能强化后的Java对象。该对象不仅可以被一个JVM里的各个线程相引用,还可以被多个位于不同JVM里的线程同时引用。Wikipedia对这种特殊对象的概述是: Live distributed object (also abbreviated as live object) refers to a running instance of a distributed multi-party (or peer-to-peer) protocol, viewed from the object-oriented perspective, as an entity that has a distinct identity, may encapsulate internal state and threads of execution, and that exhibits a well-defined externally visible behavior. Redisson分布式实时对象(Redisson Live Object,简称RLO)运用即时生成的代理类(Proxy),将一个指定的普通Java类里的所有字段,以及针对这些字段的操作全部映射到一个Redis Hash的数据结构,实现这种理念。每个字段的get和set方法最终被转译为针对同一个Redis Hash的hget和hset命令,从而使所有连接到同一个Redis节点的所有可以客户端同时对一个指定的对象进行操作。众所周知,一个对象的状态是由其内部的字段所赋的值来体现的,通过将这些值保存在一个像Redis这样的远程共享的空间的过程,把这个对象强化成了一个分布式对象。这个分布式对象就叫做Redisson分布式实时对象(Redisson Live Object,简称RLO)。 通过使用RLO,运行在不同服务器里的多个程序之间,共享一个对象实例变得和在单机程序里共享一个对象实例一样了。同时还避免了针对任何一个字段操作都需要将整个对象序列化和反序列化的繁琐,进而降低了程序开发的复杂性和其数据模型的复杂性:从任何一个客户端修改一个字段的值,处在其他服务器上的客户端(几乎^)即刻便能查看到。而且实现代码与单机程序代码无异。(^连接到从节点的客户端仍然受Redis的最终一致性的特性限制) 鉴于Redis是一个单线程的程序,针对实时对象的所有的字段操作可以理解为全部是原子性操作,也就是说在读取一个字段的过程不会担心被其他线程所修改。 通过使用RLO,可以把Redis当作一个允许被多个JVM同时操作且不受GC影响的共享堆(Heap Space)。 9.2.2. 使用方法 要想获得RLO带来的所有便利,只需要为一个类添加一个@REntity注释,然后再为其中的一个字段添加一个@RId注释即可。 @REntity public class MyLiveObject { @RId private String name; //其他字段 ... ... //get和set方法 ... ... } 就这样简单两步,即可将一个普通的Java对象“升级”成了一个Redisson分布式实时对象。通过Redisson对象实例提供的RedissonLiveObjectService服务对象可以很方便的获取RLO实例: ... RLiveObjectService service = redisson.getLiveObjectService(); MyLiveObject myObject1 = new MyLiveObject(); myObject1.setName("myName"); MyLiveObject myObject1 = service.<MyLiveObject, String>persist(myObject1); //或者取得一个已经存在的RLO实例 MyLiveObject myObject1 = service.<MyLiveObject, String>get(MyLiveObject.class, "myName"); ... RLO的用法和普通Java对象的用法一样,以以下对象为例: @REntity public class MyObject { @RId private String name; private String value; public MyObject(String name) { this.name = name; } public MyObject() { } public String getName() { return name; } public String getValue() { return value; } public void setName(String name) { this.name = name; } public void setValue(String value) { this.value = value; } } 在作为普通对象操作时: //普通Java对象实例 MyObject standardObject1 = new MyObject(); standardObject1.setName("standard1"); //当然也可以使用非默认构造函数 MyObject standardObject2 = new MyObject("standard2"); 也可以作为RLO实例使用: //首先获取服务实例 RLiveObjectService service = redisson.getLiveObjectService(); //通过服务实例构造RLO实例 MyObject standardObject1 = new MyObject(); standardObject1.setName("liveObjectId"); MyObject liveObject1 = service.<MyObject, String>persist(standardObject1); //服务实例会首先通过单一参数为条件查找构造函数,如果能找到就尝试采用"liveObjectId"作为参数 //来构造实例,如果没有找到就采用默认构造函数,然后调用setName("liveObjectId")赋值,最后再 //返回对象。 在使用上,二者完全没有分别。 //为"value"字段赋值的方法也是一样: standardObject1.setValue("abc");//“abc”作为字段值,储存在JVM内存的堆里(Heap space) standardObject2.setValue("abc");//同上 liveObject1.setValue("abc"); //“abc”作为字段值,储存在Redis里,而不是在JVM内存堆里。(虽然它会在字符串池里出现,但是没有被 //对象引用,因此不会影响垃圾回收。) //提取"value"字段的值也是一样的 System.out.println(standardObject1.getValue()); //在控制台里输出"abc",这个值是从JVM的内存堆里获取出来的。 System.out.println(standardObject2.getValue());//同上 System.out.println(liveObject1.getValue()); //控制台输出内容和上面一样,但值是从Redis里获取出来的。 单从上面两段代码看,结果一模一样,但这其中还有一些细微的不同。这里将通过以下例子详细介绍: @REntity public class MyLiveObject { @RId private String name; private MyOtherObject value; public MyLiveObject(String name) { this.name = name; } public MyObject() { } public String getName() { return name; } public MyOtherObject getValue() { return value; } public void setName(String name) { this.name = name; } public void setValue(MyOtherObject value) { this.value = value; } } 和上面的例子不同的是,我们将“value”字段从一个不可变类String换成了一个可变类MyOtherObject,在一个普通的Java对象里,当你调用getValue()方法时,你得到的会是原MyOtherObject实例的一个引用。在RLO对象里,调用同样的方法,返回的将会是一个全新对象的引用。这就会产生以下的现象: //RLO对象: MyLiveObject myLiveObject = service.get(MyLiveObject.class, "1"); myLiveObject.setValue(new MyOtherObject()); System.out.println(myLiveObject.getValue() == myLiveObject.getValue()); //输出值为假(False) (除非在对象编码器里采用了对象池) //普通Java对象: MyLiveObject notLiveObject = new MyLiveObject(); notLiveObject.setValue(new MyOtherObject()); System.out.println(notLiveObject.getValue() == notLiveObject.getValue()); //输出值为真(True) 再比如: //RLO对象: MyLiveObject myLiveObject = service.get(MyLiveObject.class, "1"); MyOtherObject other = new MyOtherObject(); other.setOtherName("ABC"); myLiveObject.setValue(other); System.out.println(myLiveObject.getValue().getOtherName()); //输出是ABC other.setOtherName("BCD"); System.out.println(myLiveObject.getValue().getOtherName()); //还是输出ABC myLiveObject.setValue(other); System.out.println(myLiveObject.getValue().getOtherName()); //现在输出是BCD //普通Java对象: MyLiveObject myLiveObject = new MyLiveObject("1"); MyOtherObject other = new MyOtherObject(); other.setOtherName("ABC"); myLiveObject.setValue(other); System.out.println(myLiveObject.getValue().getOtherName()); //输出是ABC other.setOtherName("BCD"); System.out.println(myLiveObject.getValue().getOtherName()); //输出已经是BCD了 myLiveObject.setValue(other); System.out.println(myLiveObject.getValue().getOtherName()); //输出还是BCD 产生这个现象的原因是因为Redisson没有在JVM里保存MyOtherObject对象的状态,而是在每次调用set和get的时候,先将一个实例从Redis里序列化和反序列化出来,再赋值取值。这是和JPA里的修改脱管(detach)对象状态类似。这种现象通常情况下对不可变类来说不会有任何影响,比如说String,Double,Long等等。而在操作可变类是你反而可以利用它这种特性,正因为这个实例处于修改脱管状态,取得的实例与其本身脱离了联系,此时对脱管对象的读写操作可以理解为是处于一个具有一些ACID特性的事务状态。正确利用这样的特性将会获益匪浅。当然如果你仍然希望RLO的用法与普通Java对象完全一致,只需将MyOtherObject也转换成一个RLO对象即可。 //RLO套嵌RLO的情形 MyLiveObject myLiveObject = service.get(MyLiveObject.class, "1"); MyOtherObject other = service.get(MyOtherObject.class, "2"); other.setOtherName("ABC"); myLiveObject.setValue(other); System.out.println(myLiveObject.getValue().getOtherName()); //输出ABC other.setOtherName("BCD"); System.out.println(myLiveObject.getValue().getOtherName()); //现在输出已经是BCD了,和普通Java对象一样 myLiveObject.setValue(other); System.out.println(myLiveObject.getValue().getOtherName()); //还是输出BCD RLO的字段类型基本上无限制,可以是任何类型。比如Java util包里的集合类,Map类等,也可以是自定义的对象。只要指定的编码解码器能够对其进行编码和解码操作便可。关于编码解码器的详细信息请查阅高级使用方法章节。 尽管RLO的字段类型基本上无限制,个别类型还是受限。注释了RId的字段类型不能是数组类(Array),比如int[],long[],double[],byte[]等等。更多关于限制有关的介绍和原理解释请查阅使用限制 章节。 为了保证RLO的用法和普通Java对象的用法尽可能一直,Redisson分布式实时对象服务自动将以下普通Java对象转换成与之匹配的Redisson分布式对象RObject。 普通Java类 转换后的Redisson类 SortedSet.class RedissonSortedSet.class Set.class RedissonSet.class ConcurrentMap.class RedissonMap.class Map.class RedissonMap.class BlockingDeque.class RedissonBlockingDeque.class Deque.class RedissonDeque.class BlockingQueue.class RedissonBlockingQueue.class Queue.class RedissonQueue.class List.class RedissonList.class 类型转换将按照从上至下的顺序匹配类型,例如LinkedList类同时实现了Deque,List和Queue,由于Deque排在靠上的位置,因此它将会被转换成一个RedissonDeque类型。 Redisson的分布式对象也采用类似的方式,将自身的状态储存于Redis当中,(几乎^)所有的状态改变都直接映射到Redis里,不在本地JVM中保留任何赋值。(^本地缓存对象除外,比如RLocalCachedMap) 9.2.3. 高级使用方法 正如上述介绍,RLO类其实都是按需实时生成的代理(Proxy)类。生成的代理类和原类都一同缓存Redisson实例里。这个过程会消耗一些时间,在对耗时比较敏感的情况下,建议通过RedissonLiveObjectService提前注册所有的RLO类。这个服务也可以用来注销不再需要的RLO类,也可以用来查询一个类是否已经注册了。 RLiveObjectService service = redisson.getLiveObjectService(); service.registerClass(MyClass.class); service.unregisterClass(MyClass.class); Boolean registered = service.isClassRegistered(MyClass.class); 9.2.4. 注解(Annotation)使用方法 @REntity 通过指定@REntity的各个参数,可以详细的对每个RLO类实现特殊定制,以达到改变RLO对象的行为。 namingScheme - 命名方案。命名方案规定了每个实例在Redis中对应key的名称。它不仅被用来与已存在的RLO建立关联,还被用来储存新建的RLO实例。默认采用Redisson自带的DefaultNamingScheme对象。 codec - 编码解码器。在运行当中,Redisson用编码解码器来对RLO中的每个字段进行编码解码。Redisson内部采用了实例池管理不同类型的编码解码器实例。Redisson提供了多种不同的编码解码器,默认使用JsonJacksonCodec。 fieldTransformation - 字段转换模式。如上所述,为了尽可能的保证RLO的用法和普通Java对象一致,Redisson会自动将常用的普通Java对象转换成与其匹配的Redisson分布式对象。这是由于字段转换模式的默认值是ANNOTATION_BASED,修改为IMPLEMENTATION_BASED就可以不转换。 @RId @RId注释只能用在具备区分实例的字段上,这类字段可以理解为一个类的id字段或主键字段。这个字段的值将被命名方案namingScheme用来与事先存在的RLO建立引用。加了该注释的字段是唯一在本地JVM里同时保存赋值的字段。一个类只能有一个字段包含@RId注释。 可以通过指定一个生成器generator策略来实现自动生成这个字段的值。默认不提供生成器。 @RObjectField 当@REntity注释里字段转换模式transformationMode是默认值ANNOTATION_BASED时,可以为一个未包含@RId注释的字段添加该注释。这个注释可以用来特别指定该字段的命名方案namingScheme和编码解码器codec。二者可以与@REntity注释里的值不同。 您可能已经看出来了,命名方案namingScheme和编码解码器codec在Redisson分布式实时对象服务里的使用频率是很高的。为了避免重复构造冗余的实例,Redisson在默认情况下通过内置实例池管理重复使用这些实例。有需要可以在构造Redisson实例的同时,通过Config来指定您自己定制的提供者(Provider)对象。 9.2.5. 使用限制 如上所述,带有RId注释字段的类型不能使数组类,这是因为目前默认的命名方案类DefaultNamingScheme还不能正确地将数组类序列化和反序列化。在改善了DefaultNamingScheme类的不足以后会考虑取消这个限制。另外由于带有RId注释的字段是用来指定Redis中映射的key的名称,因此组建一个只含有唯一一个字段的RLO类是毫无意义的。选用RBucket会更适合这样的场景。 9.3. 分布式执行服务(Executor Service) 9.3.1. 分布式执行服务概述 Redisson的分布式执行服务实现了java.util.concurrent.ExecutorService接口,支持在不同的独立节点里执行基于java.util.concurrent.Callable接口或java.lang.Runnable接口的任务。这样的任务也可以通过使用Redisson实例,实现对储存在Redis里的数据进行操作。Redisson分布式执行服务是最快速和有效执行分布式运算的方法。 9.3.2. 任务 Redisson独立节点不要求任务的类在类路径里。他们会自动被Redisson独立节点的ClassLoader加载。因此每次执行一个新任务时,不需要重启Redisson独立节点。 采用Callable任务的范例: public class CallableTask implements Callable<Long> { @RInject private RedissonClient redissonClient; @Override public Long call() throws Exception { RMap<String, Integer> map = redissonClient.getMap("myMap"); Long result = 0; for (Integer value : map.values()) { result += value; } return result; } } RExecutorService executorService = redisson.getExecutorService("myExecutor"); Future<Long> future = executorService.submit(new CallableTask()); Long result = future.get(); 采用Runnable任务的范例: public class RunnableTask implements Runnable { @RInject private RedissonClient redissonClient; private long param; public RunnableTask() { } public RunnableTask(long param) { this.param = param; } @Override public void run() { RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic"); atomic.addAndGet(param); } } RExecutorService executorService = redisson.getExecutorService("myExecutor"); executorService.submit(new RunnableTask(123)); 可以通过@RInject注释来为任务实时注入Redisson实例依赖。 9.3.3. 取消任务 通过Future.cancel()方法可以很方便的取消所有已提交的任务。通过对Thread.currentThread().isInterrupted()方法的调用可以在已经处于运行状态的任务里实现任务中断: public class CallableTask implements Callable<Long> { @RInject private RedissonClient redissonClient; @Override public Long call() throws Exception { RMap<String, Integer> map = redissonClient.getMap("myMap"); Long result = 0; // map里包含了许多的元素 for (Integer value : map.values()) { if (Thread.currentThread().isInterrupted()) { // 任务被取消了 return null; } result += value; } return result; } } RExecutorService executorService = redisson.getExecutorService("myExecutor"); Future<Long> future = executorService.submit(new CallableTask()); // 或 RFuture<Long> future = executorService.submitAsync(new CallableTask()); // ... future.cancel(true); 9.4. 分布式调度任务服务(Scheduler Service) 9.4.1. 分布式调度任务服务概述 Redisson的分布式调度任务服务实现了java.util.concurrent.ScheduledExecutorService接口,支持在不同的独立节点里执行基于java.util.concurrent.Callable接口或java.lang.Runnable接口的任务。Redisson独立节点按顺序运行Redis列队里的任务。调度任务是一种需要在未来某个指定时间运行一次或多次的特殊任务。 9.4.2. 设定任务计划 Redisson独立节点不要求任务的类在类路径里。他们会自动被Redisson独立节点的ClassLoader加载。因此每次执行一个新任务时,不需要重启Redisson独立节点。 采用Callable任务的范例: public class CallableTask implements Callable<Long> { @RInject private RedissonClient redissonClient; @Override public Long call() throws Exception { RMap<String, Integer> map = redissonClient.getMap("myMap"); Long result = 0; for (Integer value : map.values()) { result += value; } return result; } } RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor"); ScheduledFuture<Long> future = executorService.schedule(new CallableTask(), 10, TimeUnit.MINUTES); Long result = future.get(); 采用Runnable任务的范例: public class RunnableTask implements Runnable { @RInject private RedissonClient redissonClient; private long param; public RunnableTask() { } public RunnableTask(long param) { this.param= param; } @Override public void run() { RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic"); atomic.addAndGet(param); } } RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor"); ScheduledFuture<?> future1 = executorService.schedule(new RunnableTask(123), 10, TimeUnit.HOURS); // ... ScheduledFuture<?> future2 = executorService.scheduleAtFixedRate(new RunnableTask(123), 10, 25, TimeUnit.HOURS); // ... ScheduledFuture<?> future3 = executorService.scheduleWithFixedDelay(new RunnableTask(123), 5, 10, TimeUnit.HOURS); 9.4.3. 通过CRON表达式设定任务计划 在分布式调度任务中,可以通过CRON表达式来为任务设定一个更复杂的计划。表达式与Quartz的CRON格式完全兼容。 例如: RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor"); executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?")); // ... executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5)); // ... executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY)); 9.4.4. 取消计划任务 分布式调度任务服务提供了两张取消任务的方式:通过调用ScheduledFuture.cancel()方法或调用RScheduledExecutorService.cancelScheduledTask方法。通过对Thread.currentThread().isInterrupted()方法的调用可以在已经处于运行状态的任务里实现任务中断: public class RunnableTask implements Callable<Long> { @RInject private RedissonClient redissonClient; @Override public Long call() throws Exception { RMap<String, Integer> map = redissonClient.getMap("myMap"); Long result = 0; // map里包含了许多的元素 for (Integer value : map.values()) { if (Thread.currentThread().isInterrupted()) { // 任务被取消了 return null; } result += value; } return result; } } RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor"); RScheduledFuture<Long> future = executorService.scheduleAsync(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5)); // ... future.cancel(true); // 或 String taskId = future.getTaskId(); // ... executorService.cancelScheduledTask(taskId); 9.5. 分布式映射归纳服务(MapReduce) 9.5.1 介绍 Redisson提供了通过映射归纳(MapReduce)编程模式来处理储存在Redis环境里的大量数据的服务。这个想法来至于其他的类似实现方式和谷歌发表的研究。所有 映射(Map) 和 归纳(Reduce) 阶段中的任务都是被分配到各个独立节点(Redisson Node)里并行执行的。以下所有接口均支持映射归纳(MapReduce)功能: RMap、 RMapCache、 RLocalCachedMap、 RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue 和 RPriorityDeque 映射归纳(MapReduce)的功能是通过RMapper、 RCollectionMapper、 RReducer 和 RCollator 这几个接口实现的。 1. RMapper 映射器接口适用于映射(Map)类,它用来把映射(Map)中的每个元素转换为另一个作为归纳(Reduce)处理用的键值对。 public interface RMapper<KIn, VIn, KOut, VOut> extends Serializable { void map(KIn key, VIn value, RCollector<KOut, VOut> collector); } 2. RCollectionMapper 映射器接口仅适用于集合(Collection)类型的对象,它用来把集合(Collection)中的元素转换成一组作为归纳(Reduce)处理用的键值对。 public interface RCollectionMapper<VIn, KOut, VOut> extends Serializable { void map(VIn value, RCollector<KOut, VOut> collector); } 3. RReducer 归纳器接口用来将上面这些,由映射器生成的键值对列表进行归纳整理。 public interface RReducer<K, V> extends Serializable { V reduce(K reducedKey, Iterator<V> values); } 4. RCollator 收集器接口用来把归纳整理以后的结果化简为单一一个对象。 public interface RCollator<K, V, R> extends Serializable { R collate(Map<K, V> resultMap); } 以上每个阶段的任务都可以用@RInject注解的方式来获取RedissonClient实例: public class WordMapper implements RMapper<String, String, String, Integer> { @RInject private RedissonClient redissonClient; @Override public void map(String key, String value, RCollector<String, Integer> collector) { // ... redissonClient.getAtomicLong("mapInvocations").incrementAndGet(); } } 9.5.2 映射(Map)类型的使用范例 Redisson提供的RMap、 RMapCache和RLocalCachedMap这三种映射(Map)类型的对象均可以使用这种分布式映射归纳(MapReduce)服务。 以下是在映射(Map)类型的基础上采用映射归纳(MapReduce)来实现字数统计的范例: public class WordMapper implements RMapper<String, String, String, Integer> { @Override public void map(String key, String value, RCollector<String, Integer> collector) { String[] words = value.split("[^a-zA-Z]"); for (String word : words) { collector.emit(word, 1); } } } public class WordReducer implements RReducer<String, Integer> { @Override public Integer reduce(String reducedKey, Iterator<Integer> iter) { int sum = 0; while (iter.hasNext()) { Integer i = (Integer) iter.next(); sum += i; } return sum; } } public class WordCollator implements RCollator<String, Integer, Integer> { @Override public Integer collate(Map<String, Integer> resultMap) { int result = 0; for (Integer count : resultMap.values()) { result += count; } return result; } } RMap<String, String> map = redisson.getMap("wordsMap"); map.put("line1", "Alice was beginning to get very tired"); map.put("line2", "of sitting by her sister on the bank and"); map.put("line3", "of having nothing to do once or twice she"); map.put("line4", "had peeped into the book her sister was reading"); map.put("line5", "but it had no pictures or conversations in it"); map.put("line6", "and what is the use of a book"); map.put("line7", "thought Alice without pictures or conversation"); RMapReduce<String, String, String, Integer> mapReduce = map.<String, Integer>mapReduce() .mapper(new WordMapper()) .reducer(new WordReducer()); // 统计词频 Map<String, Integer> mapToNumber = mapReduce.execute(); // 统计字数 Integer totalWordsAmount = mapReduce.execute(new WordCollator()); 9.5.3 集合(Collection)类型的使用范例 Redisson提供的RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue和RPriorityDeque这几种集合(Collection)类型的对象均可以使用这种分布式映射归纳(MapReduce)服务。 以下是在集合(Collection)类型的基础上采用映射归纳(MapReduce)来实现字数统计的范例: public class WordMapper implements RCollectionMapper<String, String, Integer> { @Override public void map(String value, RCollector<String, Integer> collector) { String[] words = value.split("[^a-zA-Z]"); for (String word : words) { collector.emit(word, 1); } } } public class WordReducer implements RReducer<String, Integer> { @Override public Integer reduce(String reducedKey, Iterator<Integer> iter) { int sum = 0; while (iter.hasNext()) { Integer i = (Integer) iter.next(); sum += i; } return sum; } } public class WordCollator implements RCollator<String, Integer, Integer> { @Override public Integer collate(Map<String, Integer> resultMap) { int result = 0; for (Integer count : resultMap.values()) { result += count; } return result; } } RList<String> list = redisson.getList("myList"); list.add("Alice was beginning to get very tired"); list.add("of sitting by her sister on the bank and"); list.add("of having nothing to do once or twice she"); list.add("had peeped into the book her sister was reading"); list.add("but it had no pictures or conversations in it"); list.add("and what is the use of a book"); list.add("thought Alice without pictures or conversation"); RCollectionMapReduce<String, String, Integer> mapReduce = list.<String, Integer>mapReduce() .mapper(new WordMapper()) .reducer(new WordReducer()); // 统计词频 Map<String, Integer> mapToNumber = mapReduce.execute(); // 统计字数 Integer totalWordsAmount = mapReduce.execute(new WordCollator());

优秀的个人博客,低调大师

Redisson官方文档 - 6. 分布式对象

每个Redisson对象实例都会有一个与之对应的Redis数据实例,可以通过调用getName方法来取得Redis数据实例的名称(key)。 RMap map = redisson.getMap("mymap"); map.getName(); // = mymap 所有与Redis key相关的操作都归纳在RKeys这个接口里: RKeys keys = redisson.getKeys(); Iterable<String> allKeys = keys.getKeys(); Iterable<String> foundedKeys = keys.getKeysByPattern('key*'); long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3"); long deletedKeysAmount = keys.deleteByPattern("test?"); String randomKey = keys.randomKey(); long keysAmount = keys.count(); 6.1. 通用对象桶(Object Bucket) Redisson的分布式RBucketJava对象是一种通用对象桶可以用来存放任类型的对象。 RBucket<AnyObject> bucket = redisson.getBucket("anyObject"); bucket.set(new AnyObject(1)); AnyObject obj = bucket.get(); bucket.trySet(new AnyObject(3)); bucket.compareAndSet(new AnyObject(4), new AnyObject(5)); bucket.getAndSet(new AnyObject(6)); 6.2. 二进制流(Binary Stream) Redisson的分布式RBinaryStream Java对象同时提供了InputStream接口和OutputStream接口的实现。流的最大容量受Redis主节点的内存大小限制。 RBinaryStream stream = redisson.getBinaryStream("anyStream"); byte[] content = ... stream.set(content); InputStream is = stream.getInputStream(); byte[] readBuffer = new byte[512]; is.read(readBuffer); OutputStream os = stream.getOuputStream(); byte[] contentToWrite = ... os.write(contentToWrite); 6.3. 地理空间对象桶(Geospatial Bucket) Redisson的分布式RGeoJava对象是一种专门用来储存与地理位置有关的对象桶。 RGeo<String> geo = redisson.getGeo("test"); geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"), new GeoEntry(15.087269, 37.502669, "Catania")); geo.addAsync(37.618423, 55.751244, "Moscow"); Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS); geo.hashAsync("Palermo", "Catania"); Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1"); List<String> cities = geo.radius(15, 37, 200, GeoUnit.KILOMETERS); Map<String, GeoPosition> citiesWithPositions = geo.radiusWithPosition(15, 37, 200, GeoUnit.KILOMETERS); 6.4. BitSet Redisson的分布式RBitSetJava对象采用了与java.util.BiteSet类似结构的设计风格。可以理解为它是一个分布式的可伸缩式位向量。需要注意的是RBitSet的大小受Redis限制,最大长度为4 294 967 295。 RBitSet set = redisson.getBitSet("simpleBitset"); set.set(0, true); set.set(1812, false); set.clear(0); set.addAsync("e"); set.xor("anotherBitset"); 6.4.1. BitSet数据分片(Sharding) 基于Redis的Redisson集群分布式BitSet通过RClusteredBitSet接口,为集群状态下的Redis环境提供了BitSet数据分片的功能。通过优化后更加有效的算法,突破了原有的BitSet大小限制,达到了集群物理内存容量大小。在这里可以获取更多的内部信息。 RClusteredBitSet set = redisson.getClusteredBitSet("simpleBitset"); set.set(0, true); set.set(1812, false); set.clear(0); set.addAsync("e"); set.xor("anotherBitset"); 该功能仅限于Redisson PRO版本。 6.5. 原子整长形(AtomicLong) Redisson的分布式整长形RAtomicLong对象和Java中的java.util.concurrent.atomic.AtomicLong对象类似。 RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong"); atomicLong.set(3); atomicLong.incrementAndGet(); atomicLong.get(); 6.6. 原子双精度浮点(AtomicDouble) Redisson还提供了分布式原子双精度浮点RAtomicDouble,弥补了Java自身的不足。 RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble"); atomicDouble.set(2.81); atomicDouble.addAndGet(4.11); atomicDouble.get(); 6.7. 话题(订阅分发) Redisson的分布式话题RTopic对象实现了发布、订阅的机制。 RTopic<SomeObject> topic = redisson.getTopic("anyTopic"); topic.addListener(new MessageListener<SomeObject>() { @Override public void onMessage(String channel, SomeObject message) { //... } }); // 在其他线程或JVM节点 RTopic<SomeObject> topic = redisson.getTopic("anyTopic"); long clientsReceivedMessage = topic.publish(new SomeObject()); 在Redis节点故障转移(主从切换)或断线重连以后,所有的话题监听器将自动完成话题的重新订阅。 6.7.1. 模糊话题 Redisson的模糊话题RPatternTopic对象可以通过正式表达式来订阅多个话题。 // 订阅所有满足`topic1.*`表达式的话题 RPatternTopic<Message> topic1 = redisson.getPatternTopic("topic1.*"); int listenerId = topic1.addListener(new PatternMessageListener<Message>() { @Override public void onMessage(String pattern, String channel, Message msg) { Assert.fail(); } }); 在Redis节点故障转移(主从切换)或断线重连以后,所有的模糊话题监听器将自动完成话题的重新订阅。 6.8. 布隆过滤器(Bloom Filter) Redisson利用Redis实现了Java分布式布隆过滤器(Bloom Filter)。 RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample"); // 初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03 bloomFilter.tryInit(55000000L, 0.03); bloomFilter.add(new SomeObject("field1Value", "field2Value")); bloomFilter.add(new SomeObject("field5Value", "field8Value")); bloomFilter.contains(new SomeObject("field1Value", "field8Value")); 6.8.1. 布隆过滤器数据分片(Sharding) 基于Redis的RedisOS你集群分布式布隆过滤器通过RClusteredBloomFilter接口,为集群状态下的Redis环境提供了布隆过滤器数据分片的功能。通过优化后更加有效的算法,突破了原有的BitSet大小限制,达到了集群物理内存容量大小。在这里可以获取更多的内部信息。 RClusteredBloomFilter<SomeObject> bloomFilter = redisson.getClusteredBloomFilter("sample"); // 采用以下参数创建布隆过滤器 // expectedInsertions = 255000000 // falseProbability = 0.03 bloomFilter.tryInit(255000000L, 0.03); bloomFilter.add(new SomeObject("field1Value", "field2Value")); bloomFilter.add(new SomeObject("field5Value", "field8Value")); bloomFilter.contains(new SomeObject("field1Value", "field8Value")); 该功能仅限于Redisson PRO版本。 6.9. 基数估计算法(HyperLogLog) Redisson利用Redis实现了Java分布式基数估计算法(HyperLogLog)对象。 RHyperLogLog<Integer> log = redisson.getHyperLogLog("log"); log.add(1); log.add(2); log.add(3); log.count(); 6.10. 整长型累加器(LongAdder) 基于Redis的Redisson分布式整长型累加器采用了与java.util.concurrent.atomic.LongAdder类似的接口。通过利用客户端内置的LongAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicLong对象快 12000 倍。完美适用于分布式统计计量场景。 RLongAdder atomicLong = redisson.getLongAdder("myLongAdder"); atomicLong.add(12); atomicLong.increment(); atomicLong.decrement(); atomicLong.sum(); 当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。 RLongAdder atomicLong = ... atomicLong.destroy(); 6.11. 双精度浮点累加器(DoubleAdder) 基于Redis的Redisson分布式双精度浮点累加器采用了与java.util.concurrent.atomic.DoubleAdder类似的接口。通过利用客户端内置的DoubleAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicDouble对象快 12000 倍。完美适用于分布式统计计量场景。 RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble"); atomicDouble.add(12); atomicDouble.increment(); atomicDouble.decrement(); atomicDouble.sum(); 当不再使用双精度浮点累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。 RLongDouble atomicDouble = ... atomicDouble.destroy(); 6.12. 限流器(RateLimiter) 基于Redis的分布式限流器可以用来在分布式环境下现在请求方的调用频率。既适用于不同Redisson实例下的多线程限流,也适用于相同Redisson实例下的多线程限流。该算法不保证公平性。 RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter"); // 初始化 // 最大流速 = 每1秒钟产生10个令牌 rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS); // 获取4个令牌 rateLimiter.tryAcquire(4); // 尝试获取4个令牌,尝试等待时间为2秒钟 rateLimiter.tryAcquire(4, 2, TimeUnit.SECONDS); rateLimiter.tryAcquireAsync(2, 2, TimeUnit.SECONDS); // 尝试获取1个令牌,等待时间不限 rateLimiter.acquire(); // 尝试获取1个令牌,等待时间不限 RFuture<Void> future = rateLimiter.acquireAsync(); 该功能仅限于Redisson PRO版本。

优秀的个人博客,低调大师

Redisson官方文档 - 7. 分布式集合

7.1. 映射(Map) 基于Redis的Redisson的分布式映射结构的RMap Java对象实现了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。同时还保持了元素的插入顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。 在特定的场景下,映射缓存(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,可以使用Redisson提供的带有本地缓存功能的映射。 RMap<String, SomeObject> map = redisson.getMap("anyMap"); SomeObject prevObject = map.put("123", new SomeObject()); SomeObject currentObject = map.putIfAbsent("323", new SomeObject()); SomeObject obj = map.remove("123"); map.fastPut("321", new SomeObject()); map.fastRemove("321"); RFuture<SomeObject> putAsyncFuture = map.putAsync("321"); RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321"); map.fastPutAsync("321", new SomeObject()); map.fastRemoveAsync("321"); 映射的字段锁的用法: RMap<MyKey, MyValue> map = redisson.getMap("anyMap"); MyKey k = new MyKey(); RLock keyLock = map.getLock(k); keyLock.lock(); try { MyValue v = map.get(k); // 其他业务逻辑 } finally { keyLock.unlock(); } RReadWriteLock rwLock = map.getReadWriteLock(k); rwLock.readLock().lock(); try { MyValue v = map.get(k); // 其他业务逻辑 } finally { keyLock.readLock().unlock(); } 7.1.1. 映射(Map)的元素淘汰(Eviction),本地缓存(LocalCache)和数据分片(Sharding) Redisson提供了一系列的映射类型的数据结构,这些结构按特性主要分为三大类: 元素淘汰(Eviction) 类 -- 带有元素淘汰(Eviction)机制的映射类允许针对一个映射中每个元素单独设定 有效时间 和 最长闲置时间 。 本地缓存(LocalCache) 类 -- 本地缓存(Local Cache)也叫就近缓存(Near Cache)。这类映射的使用主要用于在特定的场景下,映射缓存(MapCache)上的高度频繁的读取操作,使网络通信都被视为瓶颈的情况。Redisson与Redis通信的同时,还将部分数据保存在本地内存里。这样的设计的好处是它能将读取速度提高最多 45倍 。 数据分片(Sharding) 类 -- 数据分片(Sharding)类仅适用于Redis集群环境下,因此带有数据分片(Sharding)功能的映射也叫集群分布式映射。它利用分库的原理,将单一一个映射结构切分为若干个小的映射,并均匀的分布在集群中的各个槽里。这样的设计能使一个单一映射结构突破Redis自身的容量限制,让其容量随集群的扩大而增长。在扩容的同时,还能够使读写性能和元素淘汰处理能力随之成线性增长。 以下列表是Redisson提供的所有映射的名称及其特性: 接口名称 中文名称 RedissonClient 对应的构造方法 本地缓存功能 Local Cache 数据分片功能 Sharding 元素淘汰功能 Eviction RMap映射 getMap() No No No RMapCache映射缓存 getMapCache() No No Yes RLocalCachedMap本地缓存映射 getLocalCachedMap() Yes No No RLocalCachedMapCache本地缓存映射缓存仅限于Redisson PRO版本 getLocalCachedMapCache() Yes No Yes RClusteredMap集群分布式映射存仅限于Redisson PRO版本 getClusteredMap() No Yes No RClusteredMapCache集群分布式映射缓存存仅限于Redisson PRO版本 getClusteredMapCache() No Yes Yes RClusteredLocalCachedMap 集群分布式本地缓存映射存仅限于Redisson PRO版本 getClusteredLocalCachedMap() Yes Yes No RClusteredLocalCachedMapCache 集群分布式本地缓存映射缓存存仅限于Redisson PRO版本 getClusteredLocalCachedMapCache() Yes Yes Yes 除此以外,Redisson还提供了Spring Cache和JCache的实现。 元素淘汰功能(Eviction) Redisson的分布式的RMapCache Java对象在基于RMap的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache是基于RMap实现的,使它同时继承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于这样的功能来实现的。 目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。 RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap"); // 有效时间 ttl = 10分钟 map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES); // 有效时间 ttl = 10分钟, 最长闲置时间 maxIdleTime = 10秒钟 map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS); // 有效时间 = 3 秒钟 map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS); // 有效时间 ttl = 40秒钟, 最长闲置时间 maxIdleTime = 10秒钟 map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS); 本地缓存功能(Local Cache) 在特定的场景下,映射(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,使用Redisson提供的带有本地缓存功能的分布式本地缓存映射RLocalCachedMapJava对象会是一个很好的选择。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。本地缓存功能充分的利用了JVM的自身内存空间,对部分常用的元素实行就地缓存,这样的设计让读取操作的性能较分布式映射相比提高最多 45倍 。以下配置参数可以用来创建这个实例: LocalCachedMapOptions options = LocalCachedMapOptions.defaults() // 用于淘汰清除本地缓存内的元素 // 共有以下几种选择: // LFU - 统计元素的使用频率,淘汰用得最少(最不常用)的。 // LRU - 按元素使用时间排序比较,淘汰最早(最久远)的。 // SOFT - 元素用Java的WeakReference来保存,缓存元素通过GC过程清除。 // WEAK - 元素用Java的SoftReference来保存, 缓存元素通过GC过程清除。 // NONE - 永不淘汰清除缓存元素。 .evictionPolicy(EvictionPolicy.NONE) // 如果缓存容量值为0表示不限制本地缓存容量大小 .cacheSize(1000) // 以下选项适用于断线原因造成了未收到本地缓存更新消息的情况。 // 断线重连的策略有以下几种: // CLEAR - 如果断线一段时间以后则在重新建立连接以后清空本地缓存 // LOAD - 在服务端保存一份10分钟的作废日志 // 如果10分钟内重新建立连接,则按照作废日志内的记录清空本地缓存的元素 // 如果断线时间超过了这个时间,则将清空本地缓存中所有的内容 // NONE - 默认值。断线重连时不做处理。 .reconnectionStrategy(ReconnectionStrategy.NONE) // 以下选项适用于不同本地缓存之间相互保持同步的情况 // 缓存同步策略有以下几种: // INVALIDATE - 默认值。当本地缓存映射的某条元素发生变动时,同时驱逐所有相同本地缓存映射内的该元素 // UPDATE - 当本地缓存映射的某条元素发生变动时,同时更新所有相同本地缓存映射内的该元素 // NONE - 不做任何同步处理 .syncStrategy(SyncStrategy.INVALIDATE) // 每个Map本地缓存里元素的有效时间,默认毫秒为单位 .timeToLive(10000) // 或者 .timeToLive(10, TimeUnit.SECONDS) // 每个Map本地缓存里元素的最长闲置时间,默认毫秒为单位 .maxIdle(10000) // 或者 .maxIdle(10, TimeUnit.SECONDS); RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options); String prevObject = map.put("123", 1); String currentObject = map.putIfAbsent("323", 2); String obj = map.remove("123"); // 在不需要旧值的情况下可以使用fast为前缀的类似方法 map.fastPut("a", 1); map.fastPutIfAbsent("d", 32); map.fastRemove("b"); RFuture<String> putAsyncFuture = map.putAsync("321"); RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321"); map.fastPutAsync("321", new SomeObject()); map.fastRemoveAsync("321"); 当不再使用Map本地缓存对象的时候应该手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。 RLocalCachedMap<String, Integer> map = ... map.destroy(); 如何通过加载数据的方式来降低过期淘汰事件发布信息对网络的影响 代码范例: public void loadData(String cacheName, Map<String, String> data) { RLocalCachedMap<String, String> clearMap = redisson.getLocalCachedMap(cacheName, LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.INVALIDATE)); RLocalCachedMap<String, String> loadMap = redisson.getLocalCachedMap(cacheName, LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.NONE)); loadMap.putAll(data); clearMap.clearLocalCache(); } 数据分片功能(Sharding) Map数据分片是Redis集群模式下的一个功能。Redisson提供的分布式集群映射RClusteredMap Java对象也是基于RMap实现的。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。在这里可以获取更多的内部信息。 RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap"); SomeObject prevObject = map.put("123", new SomeObject()); SomeObject currentObject = map.putIfAbsent("323", new SomeObject()); SomeObject obj = map.remove("123"); map.fastPut("321", new SomeObject()); map.fastRemove("321"); 7.1.2. 映射持久化方式(缓存策略) Redisson供了将映射中的数据持久化到外部储存服务的功能。主要场景有一下几种: 将Redisson的分布式映射类型作为业务和外部储存媒介之间的缓存。 或是用来增加Redisson映射类型中数据的持久性,或是用来增加已被驱逐的数据的寿命。 或是用来缓存数据库,Web服务或其他数据源的数据。 Read-through策略 通俗的讲,如果一个被请求的数据不存在于Redisson的映射中的时候,Redisson将通过预先配置好的MapLoader对象加载数据。 Write-through(数据同步写入)策略 在遇到映射中某条数据被更改时,Redisson会首先通过预先配置好的MapWriter对象写入到外部储存系统,然后再更新Redis内的数据。 Write-behind(数据异步写入)策略 对映射的数据的更改会首先写入到Redis,然后再使用异步的方式,通过MapWriter对象写入到外部储存系统。在并发环境下可以通过writeBehindThreads参数来控制写入线程的数量,已达到对外部储存系统写入并发量的控制。 以上策略适用于所有实现了RMap、RMapCache、RLocalCachedMap和RLocalCachedMapCache接口的对象。 配置范例: MapOptions<K, V> options = MapOptions.<K, V>defaults() .writer(myWriter) .loader(myLoader); RMap<K, V> map = redisson.getMap("test", options); // 或 RMapCache<K, V> map = redisson.getMapCache("test", options); // 或 RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options); // 或 RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options); 7.1.3. 映射监听器(Map Listener) Redisson为所有实现了RMapCache或RLocalCachedMapCache接口的对象提供了监听以下事件的监听器: 事件 | 监听器元素 添加 事件 | org.redisson.api.map.event.EntryCreatedListener 元素 过期 事件 | org.redisson.api.map.event.EntryExpiredListener 元素 删除 事件 | org.redisson.api.map.event.EntryRemovedListener 元素 更新 事件 | org.redisson.api.map.event.EntryUpdatedListener 使用范例: RMapCache<String, Integer> map = redisson.getMapCache("myMap"); // 或 RLocalCachedMapCache<String, Integer> map = redisson.getLocalCachedMapCache("myMap", options); int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() { @Override public void onUpdated(EntryEvent<Integer, Integer> event) { event.getKey(); // 字段名 event.getValue() // 新值 event.getOldValue() // 旧值 // ... } }); int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() { @Override public void onCreated(EntryEvent<Integer, Integer> event) { event.getKey(); // 字段名 event.getValue() // 值 // ... } }); int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() { @Override public void onExpired(EntryEvent<Integer, Integer> event) { event.getKey(); // 字段名 event.getValue() // 值 // ... } }); int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() { @Override public void onRemoved(EntryEvent<Integer, Integer> event) { event.getKey(); // 字段名 event.getValue() // 值 // ... } }); map.removeListener(updateListener); map.removeListener(createListener); map.removeListener(expireListener); map.removeListener(removeListener); 7.1.4. LRU有界映射 Redisson提供了基于Redis的以LRU为驱逐策略的分布式LRU有界映射对象。顾名思义,分布式LRU有界映射允许通过对其中元素按使用时间排序处理的方式,主动移除超过规定容量限制的元素。 RMapCache<String, String> map = redisson.getMapCache("map"); // 尝试将该映射的最大容量限制设定为10 map.trySetMaxSize(10); // 将该映射的最大容量限制设定或更改为10 map.setMaxSize(10); map.put("1", "2"); map.put("3", "3", 1, TimeUnit.SECONDS); 7.2. 多值映射(Multimap) 基于Redis的Redisson的分布式RMultimap Java对象允许Map中的一个字段值包含多个元素。字段总数受Redis限制,每个Multimap最多允许有4 294 967 295个不同字段。 7.2.1. 基于集(Set)的多值映射(Multimap) 基于Set的Multimap不允许一个字段值包含有重复的元素。 RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap"); map.put(new SimpleKey("0"), new SimpleValue("1")); map.put(new SimpleKey("0"), new SimpleValue("2")); map.put(new SimpleKey("3"), new SimpleValue("4")); Set<SimpleValue> allValues = map.get(new SimpleKey("0")); List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5")); Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues); Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0")); 7.2.2. 基于列表(List)的多值映射(Multimap) 基于List的Multimap在保持插入顺序的同时允许一个字段下包含重复的元素。 RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1"); map.put(new SimpleKey("0"), new SimpleValue("1")); map.put(new SimpleKey("0"), new SimpleValue("2")); map.put(new SimpleKey("0"), new SimpleValue("1")); map.put(new SimpleKey("3"), new SimpleValue("4")); List<SimpleValue> allValues = map.get(new SimpleKey("0")); Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5")); List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues); List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0")); 7.2.3. 多值映射(Multimap)淘汰机制(Eviction) Multimap对象的淘汰机制是通过不同的接口来实现的。它们是RSetMultimapCache接口和RListMultimapCache接口,分别对应的是Set和List的Multimaps。 所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。 RSetMultimapCache的使用范例: RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap"); multimap.put("1", "a"); multimap.put("1", "b"); multimap.put("1", "c"); multimap.put("2", "e"); multimap.put("2", "f"); multimap.expireKey("2", 10, TimeUnit.MINUTES); 7.3. 集(Set) 基于Redis的Redisson的分布式Set结构的RSet Java对象实现了java.util.Set接口。通过元素的相互状态比较保证了每个元素的唯一性。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。 RSet<SomeObject> set = redisson.getSet("anySet"); set.add(new SomeObject()); set.remove(new SomeObject()); Redisson PRO版本中的Set对象还可以在Redis集群环境下支持单集合数据分片。 7.3.1. 集(Set)淘汰机制(Eviction) 基于Redis的Redisson的分布式RSetCache Java对象在基于RSet的前提下实现了针对单个元素的淘汰机制。由于RSetCache是基于RSet实现的,使它还集成了java.util.Set接口。 目前的Redis自身并不支持Set当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。 RSetCache<SomeObject> set = redisson.getSetCache("anySet"); // ttl = 10 seconds set.add(new SomeObject(), 10, TimeUnit.SECONDS); 7.3.2. 集(Set)数据分片(Sharding) Set数据分片是Redis集群模式下的一个功能。Redisson提供的分布式RClusteredSet Java对象也是基于RSet实现的。在这里可以获取更多的信息。 RClusteredSet<SomeObject> set = redisson.getClusteredSet("anySet"); set.add(new SomeObject()); set.remove(new SomeObject()); 除了RClusteredSet以外,Redisson还提供了另一种集群模式下的分布式集(Set),它不仅提供了透明的数据分片功能,还为每个元素提供了淘汰机制。RClusteredSetCache 类分别同时提供了RClusteredSet 和RSetCache 这两个接口的实现。当然这些都是基于java.util.Set的接口实现上的。 该功能仅限于Redisson PRO版本。 7.4. 有序集(SortedSet) 基于Redis的Redisson的分布式RSortedSet Java对象实现了java.util.SortedSet接口。在保证元素唯一性的前提下,通过比较器(Comparator)接口实现了对元素的排序。 RSortedSet<Integer> set = redisson.getSortedSet("anySet"); set.trySetComparator(new MyComparator()); // 配置元素比较器 set.add(3); set.add(1); set.add(2); set.removeAsync(0); set.addAsync(5); 7.5. 计分排序集(ScoredSortedSet) 基于Redis的Redisson的分布式RScoredSortedSet Java对象是一个可以按插入时指定的元素评分排序的集合。它同时还保证了元素的唯一性。 RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple"); set.add(0.13, new SomeObject(a, b)); set.addAsync(0.251, new SomeObject(c, d)); set.add(0.302, new SomeObject(g, d)); set.pollFirst(); set.pollLast(); int index = set.rank(new SomeObject(g, d)); // 获取元素在集合中的位置 Double score = set.getScore(new SomeObject(g, d)); // 获取元素的评分 7.6. 字典排序集(LexSortedSet) 基于Redis的Redisson的分布式RLexSortedSet Java对象在实现了java.util.Set<String>接口的同时,将其中的所有字符串元素按照字典顺序排列。它公式还保证了字符串元素的唯一性。 RLexSortedSet set = redisson.getLexSortedSet("simple"); set.add("d"); set.addAsync("e"); set.add("f"); set.lexRangeTail("d", false); set.lexCountHead("e"); set.lexRange("d", true, "z", false); 7.7. 列表(List) 基于Redis的Redisson分布式列表(List)结构的RList Java对象在实现了java.util.List接口的同时,确保了元素插入时的顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。 RList<SomeObject> list = redisson.getList("anyList"); list.add(new SomeObject()); list.get(0); list.remove(new SomeObject()); 7.8. 队列(Queue) 基于Redis的Redisson分布式无界队列(Queue)结构的RQueue Java对象实现了java.util.Queue接口。尽管RQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。 RQueue<SomeObject> queue = redisson.getQueue("anyQueue"); queue.add(new SomeObject()); SomeObject obj = queue.peek(); SomeObject someObj = queue.poll(); 7.9. 双端队列(Deque) 基于Redis的Redisson分布式无界双端队列(Deque)结构的RDeque Java对象实现了java.util.Deque接口。尽管RDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。 RDeque<SomeObject> queue = redisson.getDeque("anyDeque"); queue.addFirst(new SomeObject()); queue.addLast(new SomeObject()); SomeObject obj = queue.removeFirst(); SomeObject someObj = queue.removeLast(); 7.10. 阻塞队列(Blocking Queue) 基于Redis的Redisson分布式无界阻塞队列(Blocking Queue)结构的RBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。尽管RBlockingQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。 RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue"); queue.offer(new SomeObject()); SomeObject obj = queue.peek(); SomeObject someObj = queue.poll(); SomeObject ob = queue.poll(10, TimeUnit.MINUTES); poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。 7.11. 有界阻塞队列(Bounded Blocking Queue) 基于Redis的Redisson分布式有界阻塞队列(Bounded Blocking Queue)结构的RBoundedBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。队列的初始容量(边界)必须在使用前设定好。 RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue"); // 如果初始容量(边界)设定成功则返回`真(true)`, // 如果初始容量(边界)已近存在则返回`假(false)`。 queue.trySetCapacity(2); queue.offer(new SomeObject(1)); queue.offer(new SomeObject(2)); // 此时容量已满,下面代码将会被阻塞,直到有空闲为止。 queue.put(new SomeObject()); SomeObject obj = queue.peek(); SomeObject someObj = queue.poll(); SomeObject ob = queue.poll(10, TimeUnit.MINUTES); poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。 7.12. 阻塞双端队列(Blocking Deque) 基于Redis的Redisson分布式无界阻塞双端队列(Blocking Deque)结构的RBlockingDeque Java对象实现了java.util.concurrent.BlockingDeque接口。尽管RBlockingDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。 RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque"); deque.putFirst(1); deque.putLast(2); Integer firstValue = queue.takeFirst(); Integer lastValue = queue.takeLast(); Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES); Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES); poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。 7.13. 阻塞公平队列(Blocking Fair Queue) 基于Redis的Redisson分布式无界阻塞公平队列(Blocking Fair Queue)结构的RBlockingFairQueue Java对象在实现Redisson分布式无界阻塞队列(Blocking Queue)结构RBlockingQueue接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。 以分布式无界阻塞队列为基础,采用公平获取消息的机制,不仅保证了poll、pollFromAny、pollLastAndOfferFirstTo和take方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。 RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue"); queue.offer(new SomeObject()); SomeObject obj = queue.peek(); SomeObject someObj = queue.poll(); SomeObject ob = queue.poll(10, TimeUnit.MINUTES); 该功能仅限于Redisson PRO版本。 7.14. 阻塞公平双端队列(Blocking Fair Deque) 基于Redis的Redisson分布式无界阻塞公平双端队列(Blocking Fair Deque)结构的RBlockingFairDeque Java对象在实现Redisson分布式无界阻塞双端队列(Blocking Deque)结构RBlockingDeque接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。 以分布式无界阻塞双端队列为基础,采用公平获取消息的机制,不仅保证了poll、take、pollFirst、takeFirst、pollLast和takeLast方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。 RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque"); deque.offer(new SomeObject()); SomeObject firstElement = queue.peekFirst(); SomeObject firstElement = queue.pollFirst(); SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES); SomeObject firstElement = queue.takeFirst(); SomeObject lastElement = queue.peekLast(); SomeObject lastElement = queue.pollLast(); SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES); SomeObject lastElement = queue.takeLast(); 该功能仅限于Redisson PRO版本。 7.15. 延迟队列(Delayed Queue) 基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。 RQueue<String> distinationQueue = ... RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue); // 10秒钟以后将消息发送到指定队列 delayedQueue.offer("msg1", 10, TimeUnit.SECONDS); // 一分钟以后将消息发送到指定队列 delayedQueue.offer("msg2", 1, TimeUnit.MINUTES); 在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。 RDelayedQueue<String> delayedQueue = ... delayedQueue.destroy(); 7.16. 优先队列(Priority Queue) 基于Redis的Redisson分布式优先队列(Priority Queue)Java对象实现了java.util.Queue的接口。可以通过比较器(Comparator)接口来对元素排序。 RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue"); queue.trySetComparator(new MyComparator()); // 指定对象比较器 queue.add(3); queue.add(1); queue.add(2); queue.removeAsync(0); queue.addAsync(5); queue.poll(); 7.17. 优先双端队列(Priority Deque) 基于Redis的Redisson分布式优先双端队列(Priority Deque)Java对象实现了java.util.Deque的接口。可以通过比较器(Comparator)接口来对元素排序。 RPriorityDeque<Integer> queue = redisson.getPriorityDeque("anyQueue"); queue.trySetComparator(new MyComparator()); // 指定对象比较器 queue.addLast(3); queue.addFirst(1); queue.add(2); queue.removeAsync(0); queue.addAsync(5); queue.pollFirst(); queue.pollLast(); 7.18. 优先阻塞队列(Priority Blocking Queue) 基于Redis的分布式无界优先阻塞队列(Priority Blocking Queue)Java对象的结构与java.util.concurrent.PriorityBlockingQueue类似。可以通过比较器(Comparator)接口来对元素排序。PriorityBlockingQueue的最大容量是4 294 967 295个元素。 RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue"); queue.trySetComparator(new MyComparator()); // 指定对象比较器 queue.add(3); queue.add(1); queue.add(2); queue.removeAsync(0); queue.addAsync(5); queue.take(); 当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。 7.19. 优先阻塞双端队列(Priority Blocking Deque) 基于Redis的分布式无界优先阻塞双端队列(Priority Blocking Deque)Java对象实现了java.util.Deque的接口。addLast、 addFirst、push方法不能再这个对里使用。PriorityBlockingDeque的最大容量是4 294 967 295个元素。 RPriorityBlockingDeque<Integer> queue = redisson.getPriorityBlockingDeque("anyQueue"); queue.trySetComparator(new MyComparator()); // 指定对象比较器 queue.add(2); queue.removeAsync(0); queue.addAsync(5); queue.pollFirst(); queue.pollLast(); queue.takeFirst(); queue.takeLast(); 当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。

优秀的个人博客,低调大师

Android 触摸手势基础 官方文档概览2

触摸手势检测基础 手势检测一般包含两个阶段: 1.获取touch事件数据 2.解析这些数据,看它们是否满足你的应用所支持的某种手势。 相关API: MotionEvent 兼容版的: MotionEventCompat (Note thatMotionEventCompatis not a replacement for the MotionEvent class. Rather, it provides static utility methods to which you pass your MotionEvent object in order to receive the desired action associated with that event.) 一般的Activity或View中的touch事件处理 Activity或View类的onTouchEvent()回调函数会接收到touch事件。 为了截获touch事件,你需要覆写Activity或View的onTouchEvent方法。 View中还可以使用setOnTouchListener()方法添加点击事件的View.OnTouchListener监听对象。这样就可以不继承View而处理点击事件。 但是如果需要处理双击、长按、fling(快滑)等手势,你需要利用GestureDetector类。 onTouchEvent方法的返回值 onTouchEvent方法的返回值,如果返回true,意味着你已经处理过了touch事件;如果返回false,将会继续通过view stack传递事件,直到事件被处理。 这里需要注意ACTION_DOWN事件,如果返回false,则该listener将不会被告知后面的一系列ACTION_MOVE和ACTION_UP事件。 检测手势 Android提供了GestureDetector类来检测一般的手势。 基本使用: 1.生成GestureDetector对象(或GestureDetectorCompat对象),构造时的参数传入监听器对象。 监听器对象实现GestureDetector.OnGestureListener接口。 如果你仅仅是想利用其中的一些手势而不是全部,那么你可以选择继承GestureDetector.SimpleOnGestureListener类,这是一个适配器模式,即这个类实现了GestureDetector.OnGestureListener接口,为其中所有的方法提供了空实现(返回值都是false),当继承GestureDetector.SimpleOnGestureListener类时,子类只需要覆写感兴趣的方法,其他方法是空实现。 2.为了让GestureDetector对象接收到事件,需要覆写View或Activity中的onTouchEvent()方法,将事件传递给detector对象。 一个例子: 根据官网上说: 关于onDown()方法的返回值,最好是返回true,因为所有的手势都是从onDown()信息开始的。 如果像GestureDetector.SimpleOnGestureListener默认实现一样返回false,系统就会认为你想要忽略之后的其他手势,然后GestureDetector.OnGestureListener的其他方法就不会被调用。 但是实际程序验证的时候,发现返回true还是false好像没有什么影响。(??) 跟踪运动 速度 有很多不同的方法来记录手势中的运动,比如pointer的起始位置和终止位置;pointer运动的方向;手势的历史(通过getHistorySize()方法得到);还有pointer的运动速度。 Android提供了VelocityTracker类和VelocityTrackerCompat类,来记录touch事件的速度。 代码例子: 滚动手势 如果一个标准的布局有可能会超出它的容器的边界,可以把它嵌套在一个ScrollView中,这样就会得到一个可以滚动的布局,由framewok处理。 实现一个自定义的scroller应该只在一些特殊情况下需要。 Scroller用来随时间制造滚动动画,使用平台标准的滚动物理参数(摩擦力、速度等)。 Scroller自己本身实际上并不绘制任何东西。 Scroller记录滚动的偏移值,但是它并不会将这些位置应用到你的View,你需要自己动手。 详见:http://developer.android.com/training/gestures/scroll.html 多点触摸手势 当多个pointer同时触摸屏幕,系统会生成如下事件: ACTION_DOWN—For the first pointer that touches the screen. This starts the gesture. The pointer data for this pointer is always at index 0 in theMotionEvent. ACTION_POINTER_DOWN—For extra pointers that enter the screen beyond the first. The pointer data for this pointer is at the index returned bygetActionIndex(). ACTION_MOVE—A change has happened during a press gesture. ACTION_POINTER_UP—Sent when a non-primary pointer goes up. ACTION_UP—Sent when the last pointer leaves the screen. 你可以依靠每一个pointer的index和ID来追踪每一个pointer: Index:MotionEvent会把每一个pointer的信息放在一个数组里,index即是这个数组索引。大多数你用的MotionEvent方法是以这个index作为参数的。 ID:每一个pointer还有一个ID映射,在touch事件中保持恒定一致(persistent),这样就可以在整个手势中跟踪一个单独的pointer。 pointer在一个motion event中出现的顺序是未定的,所以pointer的index在不同的事件中是可变的,但是只要pointer保持active,它的ID是保持不变的。 通过getPointerId()获得ID,这样就可以在多个motion event中追踪pointer。然后对于连续的motion event,可以使用findPointerIndex()方法来获得指定ID的pointer在当前事件中的index。 比如: private int mActivePointerId; public boolean onTouchEvent(MotionEvent event) { .... // Get the pointer ID mActivePointerId = event.getPointerId(0); // ... Many touch events later... // Use the pointer ID to find the index of the active pointer // and fetch its position int pointerIndex = event.findPointerIndex(mActivePointerId); // Get the pointer's current position float x = event.getX(pointerIndex); float y = event.getY(pointerIndex); } 获取MotionEvent的动作应该使用getActionMasked()方法(或者是兼容版的MotionEventCompat.getActionMasked())。 与旧版的getAction()不同,getActionMasked()方法是被设计为可以多个pointer工作的。 它会返回带掩模的动作,不带pointer用于index的那些位。 你可以使用getActionIndex()来得到index。 拖动和缩放 拖动一个对象: 如果是Android 3.0以上,可以使用View的新接口View.OnDragListener参见:Drag and Drop。 其他参见:http://developer.android.com/training/gestures/scale.html 缩放可以使用ScaleGestureDetector。 ScaleGestureDetector可以和GestureDetector一起使用。 ViewGroup中的Touch事件处理 处理ViewGroup的touch事件要麻烦一些,因为很可能各种touch事件的目标不是ViewGroup而是它的child。 为了确保每一个child正确地接收到touch events,需要覆写ViewGroup的onInterceptTouchEvent()方法。 如果onInterceptTouchEvent()方法返回true,说明MotionEvent被截获了,它将不会被传递给child,而是传递给parent的onTouchEvent()方法。 如果你在parent的onInterceptTouchEvent()方法中返回了true,先前还在处理touch event的child view将会接收到一个ACTION_CANCEL,之后的事件就会全传递到parent的onTouchEvent中。 如果onInterceptTouchEvent()方法返回false,则事件继续顺着view结构向下传递,parent不会截获事件,也不会调用parent的onTouchEvent()方法。 另: ViewConfiguration提供一些常量。 TouchDelegate类可以用来设置View的触摸区域。 用法见:http://developer.android.com/training/gestures/viewgroup.html 本文转自莫水千流博客园博客,原文链接:http://www.cnblogs.com/zhoug2020/p/6106863.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Storm官方提供的trident单词计数的例子

上代码: 1 public class TridentWordCount { 2 public static class Split extends BaseFunction { 3 @Override 4 public void execute(TridentTuple tuple, TridentCollector collector) { 5 String sentence = tuple.getString(0); 6 for (String word : sentence.split(" ")) { 7 collector.emit(new Values(word)); 8 } 9 } 10 } 11 12 public static StormTopology buildTopology(LocalDRPC drpc) { 13 //这个是一个batch Spout 一次发3个.. 14 FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), 15 new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), 16 new Values("how many apples can you eat"), new Values("to be or not to be the person")); 17 spout.setCycle(true);//Spout是否循环发送 18 19 TridentTopology topology = new TridentTopology(); 20 TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)//类似于setSpout 21 .each(new Fields("sentence"),new Split(), new Fields("word"))//setbolt 22 .groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count")).parallelismHint(16); 23 24 topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields( 25 "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), 26 new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum")); 27 return topology.build(); 28 } 29 30 public static void main(String[] args) throws Exception { 31 Config conf = new Config(); 32 conf.setMaxSpoutPending(20); 33 if (args.length == 0) { 34 LocalDRPC drpc = new LocalDRPC(); 35 LocalCluster cluster = new LocalCluster(); 36 cluster.submitTopology("wordCounter", conf, buildTopology(drpc)); 37 for (int i = 0; i < 100; i++) { 38 System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped")); 39 Thread.sleep(1000); 40 } 41 } 42 else { 43 conf.setNumWorkers(3); 44 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null)); 45 } 46 } 47 } 本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6675985.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Apache Storm 官方文档 —— 多语言接口协议

本文描述了 Storm (0.7.1 版本以上)的多语言接口协议。 Storm 多语言协议 Shell 组件 Storm 的多语言支持主要通过 ShellBolt,ShellSpout 和 ShellProcess 类来实现。这些类实现了 IBolt 接口、ISpout 接口,并通过使用 Java 的 ProcessBuilder 类调用 shell 进程实现了执行脚本的接口协议。 输出域 输出域是拓扑的 Thrift 定义的一部分。也就是说,如果你在 Java 中使用了多语言接口,那么你就需要创建一个继承自 ShellBolt 并实现 IRichBolt 接口的 bolt,这个 bolt 还需要在declareOutputFields方法中声明输出域(ShellSpout 也有类似的问题)。 你可以在基础概念一文中了解更多相关信息。 协议报头 最简单的协议是通过执行脚本或程序的标准输入输出(STDIN/STDOUT)来实现的。在这个过程中传输的数据都是以 JSON 格式编码的,这样可以支持很多种语言。 打包 为了在集群上运行壳组件,执行的外壳脚本必须和待提交的 jar 包一起置于resources/目录下。 但是,在本地开发测试时,resources 目录只需要保持在 classpath 中即可。 协议 注意: 输入输出协议的结尾都使用行读机制,所以,必须要修剪掉输入中的新行并将他们添加到输出中。 所有的 JSON 输入输出都由一个包含 “end” 的行结束标志。注意,这个定界符并不是 JSON 的一部分。 下面的几个标题就是从脚本作者的 STDIN 与 STDOUT 的角度出发的。 初始握手 两种类型壳组件的初始握手过程都是相同的: STDIN: 设置信息。这是一个包含 Storm 配置、PID 目录、拓扑上下文的 JSON 对象: { "conf": { "topology.message.timeout.secs": 3, // etc }, "pidDir": "...", "context": { "task->component": { "1": "example-spout", "2": "__acker", "3": "example-bolt1", "4": "example-bolt2" }, "taskid": 3, // 以下内容仅支持 Storm 0.10.0 以上版本 "componentid": "example-bolt" "stream->target->grouping": { "default": { "example-bolt2": { "type": "SHUFFLE"}}}, "streams": ["default"], "stream->outputfields": {"default": ["word"]}, "source->stream->grouping": { "example-spout": { "default": { "type": "FIELDS", "fields": ["word"] } } } "source->stream->fields": { "example-spout": { "default": ["word"] } } } } 你的脚本应该在这个目录下创建一个以 PID 命名的空文件。比如,PID 是 1234 的时候,在目录中创建一个名为 1234 的空文件。这个文件可以让 supervisor 了解到进程的 PID,这样,supervisor 在需要的时候就可以关闭该进程。 Storm 0.10.0 加强了发送到壳组件的上下文的功能,现在的上下文中包含了兼容 JVM 组件的拓扑上下文中的所有内容。新增的一个关键因素是确定拓扑中某个壳组件的源与目标(也就是输入与输出)的功能,这是通过stream->target->grouping和source->stream->grouping字典实现的。在这些关联字典的底层,分组是以字典的形式表示的,至少包含有一个type键,并且也可以含有一个fields键,该键可以用于指定在FIELDS分组中所涉及的域。 STDOUT: 你的 PID,以 JSON 对象的形式展现,比如{"pid": 1234}。这个壳组件将会把 PID 记录到它自己的日志中。 接下来怎么做就要取决于组件的具体类型了。 Spouts Shell Spouts 都是同步的。以下内容是在一个 while(true) 循环中实现的: STDIN: 一个 next、ack 或者 fail 命令。 “next” 与 ISpout 的nextTuple等价,可以这样定义 “next”: {"command": "next"} 可以这样定义 “ack”: {"command": "ack", "id": "1231231"} 可以这样定义 “fail”: {"command": "fail", "id": "1231231"} STDOUT: 前面的命令对你的 spout 作用产生的结果。这个结果可以是一组 emits 和 logs。 emit 大概是这样的: { "command": "emit", // tuple 的 id,如果是不可靠 emit 可以省略此值,该 id 可以为字符串或者数字 "id": "1231231", // tuple 将要发送到的流 id,如果发送到默认流,将该值留空 "stream": "1", // 如果是一个直接型 emit,需要定义 tuple 将要发送到的任务 id "task": 9, // 这个 tuple 中的所有值 "tuple": ["field1", 2, 3] } 如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。 “log” 会将消息记录到 worker log 中,“log” 大概是这样的: { "command": "log", // 待记录的消息 "msg": "hello world!" } STDOUT: “sync” 命令会结束 emits 与 logs 的队列,“sync” 是这样使用的: {"command": "sync"} 在 sync 之后, ShellSpout 不会继续读取你的输出,直到它发送出新的 next,ack 或者 fail。 注意,与 ISpout 类似,worker 中的所有 spouts 都会在调用 next,ack 或者 fail 之后锁定,直到你调用 sync。同样,如果没有需要发送的 tuple,你也应该在 sync 之前 sleep 一小段时间。ShellSpout 不会自动 sleep。 Bolts Shell Bolts 的协议是异步的。你会在有 tuple 可用时立即从 STDIN 中获取到 tuple,同时你需要像下面的示例这样调用 emit,ack,fail,log 等操作写入 STDOUT: STDIN: 就是一个 tuple!这是一个 JSON 编码的结构: { // tuple 的 id,为了兼容缺少 64 位数据类型的语言,这里使用了字符串 "id": "-6955786537413359385", // 创建该 tuple 的 id "comp": "1", // tuple 将要发往的流 id "stream": "1", // 创建该 tuple 的任务 "task": 9, // tuple 中的所有值 "tuple": ["snow white and the seven dwarfs", "field2", 3] } STDOUT: 一个 ack,fail,emit 或者 log。例如,emit 是这样的: { "command": "emit", // 标记这个输出 tuple 的 tuples 的 ids "anchors": ["1231231", "-234234234"], // tuple 将要发送到的流 id,如果发送到默认流,将该值留空 "stream": "1", // 如果是一个直接型 emit,需要定义 tuple 将要发送到的任务 id "task": 9, // 这个 tuple 中的所有值 "tuple": ["field1", 2, 3] } 如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。注意,由于 shell bolt 协议的异步特性,如果你在 emit 之后立即接收数据,有可能不会收到对应的任务 id,而是收到上一个 emit 的任务 id,或者是一个待处理的新 tuple。然而,最终接收到的任务 id 序列仍然是和 emit 的顺序完全一致的。 ack 是这样的: { "command": "ack", // 待 ack 的 tuple "id": "123123" } fail 是这样的: { "command": "fail", // 待 fail 的 tuple "id": "123123" } “log” 会将消息记录到 worker log 中,“log” 是这样的: { "command": "log", // 待记录的消息 "msg": "hello world!" } 注意:对于 0.7.1 版本,shell bolt 不再需要进行“同步”。 处理心跳(0.9.3 及以上版本适用) Storm 0.9.3 通过在 ShellSpout/ShellBolt 与他们的多语言子进程之间使用心跳来检测子进程是否处于挂起或僵死状态。所有通过多语言接口与 Storm 交互的库都必须使用以下步骤来…… Spout Shell Spouts 是同步的,所有子进程会在next()的结尾发送sync命令。因此,你不需要为 spouts 做过多的处理。也就是说,在next()过程中不能够让子进程的 sleep 时间超过 worker 的延时时间。 Bolt Shell Bolts 是异步的,所以 ShellBolt 会定期向它的子进程发送心跳 tuple。心跳 tuple 是这样的: { "id": "-6955786537413359385", "comp": "1", "stream": "__heartbeat", // 这个 shell bolt 的系统任务 id "task": -1, "tuple": [] } 在子进程收到心跳 tuple 之后,它必须向 ShellBolt 发送一个sync命令。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— 使用非 JVM 语言开发

两个部分:创建拓扑,以及使用其他语言实现 spouts 与 bolts 由于 Storm 的拓扑都是基于 thrift 结构的,所以使用其他语言创建拓扑也是一件很容易的事情 使用其他语言实现的 spouts 与 bolts 称为“多语言组件”(multilang components)或者“脱壳”(shelling) 这是具体的实现协议:多语言接口协议 thrift 结构允许你定义以一个程序和脚本的方式定义多语言组件(例如,可以使用 python 程序和文件实现 bolt) 在 Java 中,需要覆写 ShellBolt 或者 ShellSpout 来创建多语言组件 注意,输出域是在 thrift 结构中声明的,所以在 Java 中你需要这样创建多语言组件: 在 Java 中声明域,并通过在 shellbolt 的构造器中指定输出域来处理其他语言的代码 多语言组件在 STDIN/STDOUT 中使用 JSON 消息来和子进程通信 已经实现了 Ruby,Python 等语言的相关协议,例如,python 支持 emit、anchor、ack 与 log等操作 “storm shell” 命令简化了构造 jar 包与向 nimbus 上传文件的过程 构建 jar 文件并将其上传 使用 nimbus 的 host/port 以及 jar 文件的 id 来调用你的程序 以非 JVM 语言实现 DSL 的相关说明 译者注:由于本文部分内容与另一篇文档定义 Storm 的非 JVM 语言 DSL重复,这里不再罗列,详情请参阅该文档。 转载自并发编程网 - ifeve.com

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。