首页 文章 精选 留言 我的

精选列表

搜索[官方],共10003篇文章
优秀的个人博客,低调大师

谷歌官方Android应用架构库——LiveData

LiveData 是一个数据持有者类,它持有一个值并允许观察该值。不同于普通的可观察者,LiveData 遵守应用程序组件的生命周期,以便 Observer 可以指定一个其应该遵守的 Lifecycle。 如果 Observer 的 Lifecycle 处于 STARTED 或 RESUMED 状态,LiveData 会认为 Observer 处于活动状态。 publicclassLocationLiveDataextendsLiveData<Location>{ privateLocationManagerlocationManager; privateSimpleLocationListenerlistener=newSimpleLocationListener(){ @Override publicvoidonLocationChanged(Locationlocation){ setValue(location); } }; publicLocationLiveData(Contextcontext){ locationManager=(LocationManager)context.getSystemService( Context.LOCATION_SERVICE); } @Override protectedvoidonActive(){ locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER,0,0,listener); } @Override protectedvoidonInactive(){ locationManager.removeUpdates(listener); } } Location 监听的实现有 3 个重要部分: onActive():当 LiveData 有一个处于活动状态的观察者时该方法被调用,这意味着需要开始从设备观察位置更新。 vonInactive():当 LiveData 没有任何处于活动状态的观察者时该方法被调用。由于没有观察者在监听,所以没有理由保持与 LocationManager 的连接。这是非常重要的,因为保持连接会显著消耗电量并且没有任何好处。 setValue():调用该方法更新 LiveData 实例的值,并将此变更通知给处于活动状态的观察者。 可以像下面这样使用新的 LocationLiveData: publicclassMyFragmentextendsLifecycleFragment{ publicvoidonActivityCreated(BundlesavedInstanceState){ LiveData<Location>myLocationListener=...; Util.checkUserStatus(result->{ if(result){ myLocationListener.addObserver(this,location->{ //updateUI }); } }); } } 请注意,addObserver() 方法将 LifecycleOwner 作为第一个参数传递。这样做表示该观察者应该绑定到 Lifecycle,意思是: 如果 Lifecycle 不处于活动状态(STARTED 或 RESUMED),即使该值发生变化也不会调用观察者。 如果 Lifecycle 被销毁,那么自动移除观察者。 LiveData 是生命周期感知的事实给我们提供了一个新的可能:可以在多个 activity,fragment 等之间共享它。为了保持实例简单,可以将其作为单例,如下所示: publicclassLocationLiveDataextendsLiveData<Location>{ privatestaticLocationLiveDatasInstance; privateLocationManagerlocationManager; @MainThread publicstaticLocationLiveDataget(Contextcontext){ if(sInstance==null){ sInstance=newLocationLiveData(context.getApplicationContext()); } returnsInstance; } privateSimpleLocationListenerlistener=newSimpleLocationListener(){ @Override publicvoidonLocationChanged(Locationlocation){ setValue(location); } }; privateLocationLiveData(Contextcontext){ locationManager=(LocationManager)context.getSystemService( Context.LOCATION_SERVICE); } @Override protectedvoidonActive(){ locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER,0,0,listener); } @Override protectedvoidonInactive(){ locationManager.removeUpdates(listener); } } 现在 fragment 可以像下面这样使用它: publicclassMyFragmentextendsLifecycleFragment{ publicvoidonActivityCreated(BundlesavedInstanceState){ Util.checkUserStatus(result->{ if(result){ LocationLiveData.get(getActivity()).observe(this,location->{ //updateUI }); } }); } } 可能会有多个 fragment 和 activity 在观察 MyLocationListener 实例,LiveData 可以规范的管理它们,以便只有当它们中的任何一个可见(即处于活动状态)时才连接到系统服务。 LiveData 有以下优点: 没有内存泄漏:因为 Observer 被绑定到它们自己的 Lifecycle 对象上,所以,当它们的 Lifecycle 被销毁时,它们能自动的被清理。 不会因为 activity 停止而崩溃:如果 Observer 的 Lifecycle 处于闲置状态(例如:activity 在后台时),它们不会收到变更事件。 始终保持数据最新:如果 Lifecycle 重新启动(例如:activity 从后台返回到启动状态)将会收到最新的位置数据(除非还没有)。 正确处理配置更改:如果 activity 或 fragment 由于配置更改(如:设备旋转)重新创建,将会立即收到最新的有效位置数据。 资源共享:可以只保留一个 MyLocationListener 实例,只连接系统服务一次,并且能够正确的支持应用程序中的所有观察者。 不再手动管理生命周期:fragment 只是在需要的时候观察数据,不用担心被停止或者在停止之后启动观察。由于 fragment 在观察数据时提供了其 Lifecycle,所以 LiveData 会自动管理这一切。 LiveData 的转换 有时候可能会需要在将 LiveData 发送到观察者之前改变它的值,或者需要更具另一个 LiveData 返回一个不同的 LiveData 实例。 Lifecycle 包提供了一个 Transformations 类包含对这些操作的帮助方法。 ,%20android.arch.core.util.Function LiveData<User>userLiveData=...; LiveData<String>userName=Transformations.map(userLiveData,user->{ user.name+""+user.lastName }); ,%20android.arch.core.util.Function privateLiveData<User>getUser(Stringid){ ...; } LiveData<String>userId=...; LiveData<User>user=Transformations.switchMap(userId,id->getUser(id)); 使用这些转换允许在整个调用链中携带观察者的 Lifecycle 信息,以便只有在观察者观察到 LiveData 的返回时才运算这些转换。转换的这种惰性运算性质允许隐式的传递生命周期相关行为,而不必添加显式的调用或依赖。 每当你认为在 ViewModel 中需要一个 Lifecycle 类时,转换可能是解决方案。 例如:假设有一个 UI,用户输入一个地址然后会收到该地址的邮政编码。该 UI 简单的 ViewModel 可能像这样: classMyViewModelextendsViewModel{ privatefinalPostalCodeRepositoryrepository; publicMyViewModel(PostalCodeRepositoryrepository){ this.repository=repository; } privateLiveData<String>getPostalCode(Stringaddress){ //DON'TDOTHIS returnrepository.getPostCode(address); } } 如果是像这种实现,UI 需要先从之前的 LiveData 注销并且在每次调用 getPostalCode() 时重新注册到新的实例。此外,如果 UI 被重新创建,它将会触发新的 repository.getPostCode() 调用,而不是使用之前的调用结果。 不能使用那种方式,而应该实现将地址输入转换为邮政编码信息。 classMyViewModelextendsViewModel{ privatefinalPostalCodeRepositoryrepository; privatefinalMutableLiveData<String>addressInput=newMutableLiveData(); publicfinalLiveData<String>postalCode= Transformations.switchMap(addressInput,(address)->{ returnrepository.getPostCode(address); }); publicMyViewModel(PostalCodeRepositoryrepository){ this.repository=repository } privatevoidsetInput(Stringaddress){ addressInput.setValue(address); } } 请注意,我们甚至使 postalCode 字段为 public final,因为它永远不会改变。postalCode 被定义为 addressInput 的转换,所以当 addressInput 改变时,如果有处于活动状态的观察者,repository.getPostCode() 将会被调用。如果在调用时没有处于活动状态的观察者,在添加观察者之前不会进行任何运算。 该机制允许以较少的资源根据需要惰性运算来创建 LiveData。ViewModel 可以轻松获取到 LiveData 并在它们上面定义转换规则。 创建新的转换 在应用程序中可能会用到十几种不同的特定转换,但是默认是不提供的。可以使用 MediatorLiveData 实现自己的转换,MediatorLiveData 是为了用来正确的监听其它 LiveData 实例并处理它们发出的事件而特别创建的。MediatorLiveData 需要特别注意正确的向源 LiveData 传递其处于活动/闲置状态。有关详细信息,请参阅 Transformations 类。 本文作者:佚名 来源:51CTO

优秀的个人博客,低调大师

官方文档,才是正途-docker-compose

需要的ingress网络映射,还是host宿主机端口映射: https://docs.docker.com/compose/compose-file/#secrets ======================== docker service create --name web \ --publish mode=host,published=80,target=80 \ nginx ========================== ports Expose ports. SHORT SYNTAX Either specify both ports (HOST:CONTAINER), or just the container port (a random host port will be chosen). Note: When mapping ports in theHOST:CONTAINERformat, you may experience erroneous results when using a container port lower than 60, because YAML will parse numbers in the formatxx:yyas sexagesimal (base 60). For this reason, we recommend always explicitly specifying your port mappings as strings. ports: - "3000" - "3000-3005" - "8000:8000" - "9090-9091:8080-8081" - "49100:22" - "127.0.0.1:8001:8001" - "127.0.0.1:5000-5010:5000-5010" - "6060:6060/udp" LONG SYNTAX The long form syntax allows the configuration of additional fields that can’t be expressed in the short form. target: the port inside the container published: the publicly exposed port protocol: the port protocol (tcporudp) mode:hostfor publishing a host port on each node, oringressfor a swarm mode port which will be load balanced. ports: - target: 80 published: 8080 protocol: tcp mode: host Note:The long syntax is new in v3.2

优秀的个人博客,低调大师

Apache Storm 官方文档 —— 源码组织结构

Strom 的代码有三个层次: 第一,Storm 在一开始就是按照兼容多语言的目的来设计的。Nimbus 是一个 Thrift 服务,拓扑也被定义为 Thrift 架构。Thrift 的使用使得 Storm 可以用于任何一种语言。 第二,所有的 Storm 接口都设计为 Java 接口。所以,尽管 Storm 核心代码中有大量的 Clojure 实现,所有的访问都必须经过 Java API。这就意味着 Storm 的每个特性都可以通过 Java 来实现。 第三,Storm 的实现中大量使用了 Clojure。可以说,Storm 的代码结构大概是一半的 Java 代码加上一半的 Clojure 代码。但是由于 Clojure 更具有表现力,所以实际上 Storm 的核心逻辑大多是采用 Clojure 来实现的。 下面详细说明了每个层次的细节信息。 storm.thrift 要理解 Storm 的代码架构,首先需要了解storm.thrift文件。 Storm 使用这个fork版本的 Thrift(“storm” 分支)来生成代码。这个 “fork” 版本实际上就是 Thrift7,其中所有的 Java package 也都重命名成了org.apache.thrift7。在其他方面,它与 Thrift7 完全相同。这个 fork 主要是为了解决 Thrift 缺乏向后兼容的机制的问题,同时,也可以让用户在自己的 Storm 拓扑中使用其他版本的 Thrift。 拓扑中的每个 spout 或者 bolt 都有一个特定的标识,这个标识称为“组件 id”。组件 id 主要为了从拓扑中 spout 和 bolt 的输出流中选择一个或多个流作为某个 bolt 订阅的输入流。Storm 拓扑中就包含有一个组件 id 与每种类型的组件(spout 与 bolt)相关联的 map。 Spout 和 Bolt 有相同的 Thrift 定义。我们来看看Bolt 的 Thrift 定义。它包含一个ComponentObject结构和一个ComponentCommon结构。 ComponentObject定义了 bolt 的实现,这个实现可以是以下三种类型中的一种: 一个 Java 序列化对象(实现了IBolt接口的对象)。 一个用于表明其他语言的实现的ShellComponent对象。以这种方式指定一个 bolt 会让 Storm 实例化一个ShellBolt对象来处理基于 JVM 的 worker 进程与组件的非 JVM 实现之间的通信。 一个带有类名与构造器参数的 Java 对象结构,Storm 可以使用这个结构来实例化 bolt。如果你需要定义一个非 JVM 语言的拓扑这个类型会很有用。使用这种方式,你可以在不创建并且序列化一个 Java 对象的情况下使用基于 JVM 的 spout 与 bolt。 ComponentCommon定义了组件的其他方面特性,包括: 该组件的输出流以及每个流的 metadata(无论是一个直接流还是基于域定义的流); 该组件消费的输入流(使用流分组所定义的一个将组件 id 与流 id 相关联的 map 来指定); 该组件的并行度; 该组件的组件级配置。 注意,spout 的结构也有一个ComponentCommon域,所以理论上说 spout 也可以声明一个输入流。然而 Storm 的 Java API 并没有为 spout 提供消费其他的流的方法,并且如果你为 spout 声明了输入流,在提交拓扑的时候也会报错。这是因为 spout 的输入流声明不是为了用户的使用,而是为了 Storm 内部的使用。Storm 会为拓扑添加隐含的流与 bolt 来设置应答框架(acking framework)。这些隐含的流中就有两个流用于从 acker bolt 向拓扑中的每个 spout 发送消息。在发现 tuple 树完成或者失败之后,acker 就会通过这些隐含的流发送 “ack” 或者 “fail” 消息。将用户的拓扑转化为运行时拓扑的代码在这里。 Java 接口 Storm 的对外接口基本上为 Java 接口,主要的几个接口有: IRichBolt IRichSpout TopologyBuilder 大部分接口的策略为: 使用一个 Java 接口来定义接口; 实现一个具有适当的默认实现的 Base 类。 你可以从BaseRichSpout类中观察到这种策略的工作机制。 如上所述,Spout 和 Bolt 都已经根据拓扑的 Thrift 定义进行了序列化。 在这些接口中,IBolt、ISpout与IRichBolt、IRichSpout之间存在着一些细微的差别。其中最主要的区别是带有 “Rich” 的接口中增加了declareOutputFields方法。这种区别的原因主要在于每个输出流的输出域声明必须是 Thrift 结构的一部分(这样才能实现跨语言操作),而用户本身只需要将流声明为自己的类的一部分即可。TopologyBuilder在构造 Thrift 结构时所做的就是调用declareOutputFields方法来获取声明并将其转化为 Thrift 结构。这种转化过程可以在TopologyBuilder的源码中看到。 实现 通过 Java 接口来详细说明所有的功能可以确保 Storm 的每个特征都是有效的。更重要的是,关注 Java 接口可以让有 Java 使用经验的用户更易上手。 另一方面,Storm 的核心架构主要是通过 Clojure 实现的。尽管按照一般的计数规则来说代码库中 Java 与 Clojure 各占 50%,但是大部分逻辑实现还是基于 Clojure 的。不过也有两个例外,分别是 DRPC 和事务型拓扑的实现。这两个部分是完全使用 Java 实现的。这是为了说明在 Storm 中如何实现高级抽象。DRPC 和事务型拓扑的实现分别位于backtype.storm.coordination、backtype.storm.drpc和backtype.storm.transactional包中。 以下是主要的 Java 包和 Clojure 命名空间的总结。 Java packages backtype.storm.coordination: 实现了用于将批处理整合到 Storm 上层的功能,DRPC 和事务型拓扑都需要这个功能。CoordinatedBolt是其中最重要的类。 backtype.storm.drpc: DRPC 高级抽象的实现。 backtype.storm.generated: 为 Storm 生成的 Thrift 代码(使用了这个fork版本的 Thrift,其中仅仅将包名重命名为 org.apache.thrift7 来避免与其他 Thrift 版本的冲突)。 backtype.storm.grouping: 包含自定义流分组的接口。 backtype.storm.hooks: 用于在 Storm 中添加事件钩子的接口,这些事件包括任务发送 tuple、tuple 被 ack 等等。 backtype.storm.serialization: Storm 序列化/反序列化 tuple 的接口。这是在Kryo的基础上构建的。 backtype.storm.spout: Spout 与一些关联接口的定义(例如SpoutOutputCollector)。其中也包含有用于实现非 JVM 语言 spout 的协议的ShellSpout。 backtype.storm.task: Bolt 与关联接口的定义(例如OutputCollector)。其中也包含有用于实现非 JVM 语言 bolt 的协议的ShellBolt。最后,TopologyContext也是在这里定义的,该类可以用于在拓扑运行时为 spout 和 bolt 提供拓扑以及他们自身执行的相关信息。 backtype.storm.testing: 包含很多 bolt 测试类以及用于 Storm 单元测试的工具类。 backtype.storm.topology: 在 Thrift 结构上层的 Java 层,用于为 Storm 提供完全的 Java API(用户不必了解 Thrift)。TopologyBuilder和一些为不同的 spout 和 bolt 提供帮助的基础类都在这里。稍微高级一点的IBasicBolt接口也在这里,该接口是一种实现基本的 bolt 的简单方式。 backtype.storm.transactional: 事务型拓扑的实现。 backtype.storm.tuple: Storm tuple 数据模型的实现。 backtype.storm.utils: 整个代码库中通用的数据结构和各种工具类。 Clojure namespaces 译者注:Clojure 部分内容暂不提供翻译。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— 序列化

本文阐述了 Storm 0.6.0 以上版本的序列化机制。在低于 0.6.0 版本的 Storm 中使用了另一种序列化系统,详细信息可以参考Serialization (prior to 0.6.0)一文。 Storm 中的 tuple 可以包含任何类型的对象。由于 Storm 是一个分布式系统,所以在不同的任务之间传递消息时 Storm 必须知道怎样序列化、反序列化消息对象。 Storm 使用Kryo对对象进行序列化。Kryo 是一个生成小序列的灵活、快速的序列化库。 Storm 本身支持基础类型、字符串、字节数组、ArrayList、HashMap、HashSet 以及 Clojure 的集合类型的序列化。如果你需要在 tuple 中使用其他的对象类型,你就需要注册一个自定义的序列化器。 动态类型 在 tuple 中没有对各个域(field)的直接类型声明。你需要将对象放入对应的域中,然后 Storm 可以动态地实现对象的序列化。在学习序列化接口之前,我们先来了解一下为什么 Storm 的 tuple 是动态类型化的。 为 tuple 的 fields 增加静态类型会大幅增加 Storm 的 API 的复杂度。比如 Hadoop 就将它的 key 和 value 都静态化了,这就要求用户自己添加大量的注解。使用 Hadoop 的 API 非常繁琐,而相应的“类型安全”并不会为应用带来多大的好处。相对的,动态类型就非常易于使用。 更进一步而言,也不可能有什么合理的方法将 Storm 的 tuple 的类型静态化。假如一个 Bolt 订阅了多个 stream,从这些 stream 传入的 tuple 很可能都带有不同的类型。在 Bolt 的 execute 方法接收到一个 tuple 的时候,这个 tuple 可能来自任何一个 stream,也可能包含各种组合类型。也许你可以使用某种反射机制来为 bolt 订阅的每个 stream 声明一个方法类处理 tuple,但是 Storm 可以提供一种更简单、更直接的动态类型机制来解决这个问题。 最后要说明的是,Storm 使用动态类型定义的另一个原因就是为了支持 Clojure、JRuby 这样的动态类型语言。 定制序列化 前面已经提到,Storm 使用 Kryo 来处理序列化。如果要实现自定义的序列化生成器,你需要注册一个新的机遇 Kryo 的序列化生成器。强烈建议读者先仔细阅读Kryo 主页来理解它是怎样处理自定义的序列化的。 可以通过拓扑的topology.kryo.register属性来添加自定义序列化生成器。该属性接收一个注册列表,每个注册项都可以使用以下两种注册格式中的一种格式: 只有一个待注册的类的名称。在这种情况下,Storm 会使用 Kryo 的FieldsSerializer来序列化该类。这也许并不一定是该类的最优化方式 —— 可以查看 Kryo 的文档来了解更多细节内容。 一个包含待注册的类的名称和对应的序列化实现类名称,该序列化实现类实现了com.esotericsoftware.kryo.Serializer接口。 我们来看一个例子: topology.kryo.register: - com.mycompany.CustomType1 - com.mycompany.CustomType2: com.mycompany.serializer.CustomType2Serializer - com.mycompany.CustomType3 这里com.mycompany.CustomType1和com.mycompany.CustomType3会使用FieldsSerializer,而com.mycompany.CustomType2则会使用com.mycompany.serializer.CustomType2Serializer来实现序列化。 在拓扑的配置中,Storm 提供了用于注册序列化生成器的选项。Config类有一个registerSerialization方法可以将序列化生成器注册到配置中。 Config 中有一个更高级的配置项叫做Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS。如果你将该项设置为 true,Storm 会忽略掉所有已注册但是在拓扑的 classpath 中没有相应的代码的序列化器。如果不这么做,Storm 会在无法查找到序列化器的时候抛出错误。如果你在集群中运行有多个拓扑并且每个拓扑都有不同的序列化器,但是你又想要在storm.yaml中声明好所有的序列化器,在这种情况下这个配置项会有很大的帮助。 Java 序列化 如果 Storm 发现了一个没有注册序列化器的类型,它会使用 Java 自带的序列化器。如果这个对象无法被 Java 序列化器序列化,Storm 就会抛出异常。 注意,Java 自身的序列化机制非常耗费资源,而且不管在 CPU 的性能上还是在序列化对象的大小上都没有优势。强烈建议读者在生产环境中运行拓扑的时候注册一个自定义的序列化器。保留 Java 的序列化机制主要为了便于设计新拓扑的原型。 你可以通过将Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION配置为 false 的方式来将序列化器回退到 Java 的序列化机制。 特定组件的序列化注册 Storm 0.7.0 支持对特定组件的配置(详情请参阅Storm 的配置一文)。当然,如果某个组件定义了一个序列化器,这个序列化器也需要能够支持其他的 bolt —— 否则,后续的 bolt 将会无法接收来自该组件的消息! 在拓扑提交的时候,拓扑会选择一组序列化器用于在所有的组件间传递消息。这是通过将特定组件的序列化器注册信息与普通的序列化器信息融合在一起实现的。如果两个组件为同一个类定义了两个序列化器,Storm 会从中任意选择一个。 如果在两个组件的序列化器注册信息冲突的时候需要强制使用一个序列化器,可以在拓扑级的配置中定义你想要的序列化器。对于序列化器的注册信息,拓扑中配置的值是优先于具体组件的配置的。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— Storm 与 Kestrel

本文说明了如何使用 Storm 从 Kestrel 集群中消费数据。 前言 Storm 本教程中使用了storm-kestrel项目和storm-starter项目中的例子。建议读者将这几个项目 clone 到本地,并动手运行其中的例子。 Kestrel 本文假定读者可以如此项目所述在本地运行一个 Kestrel 集群。 Kestrel 服务器与队列 Kestrel 服务中包含有一组消息队列。Kestrel 队列是一种非常简单的消息队列,可以运行于 JVM 上,并使用 memcache 协议(以及一些扩展)与客户端交互。详情可以参考storm-kestrel项目中的KestrelThriftClient类的实现。 每个队列均严格遵循先入先出的规则。为了提高服务性能,数据都是缓存在系统内存中的;不过,只有开头的 128MB 是保存在内存中的。在服务停止的时候,队列的状态会保存到一个日志文件中。 请参阅此文了解更多详细信息。 Kestrel 具有 * 快速 * 小巧 * 持久 * 可靠 等特点。 例如,Twitter 就使用 Kestrel 作为消息系统的核心环节,此文中介绍了相关信息。 ** 向 Kestrel 中添加数据 首先,我们需要一个可以向 Kestrel 的队列添加数据的程序。下述方法使用了storm-kestrel项目中的KestrelClient的实现。该方法从一个包含 5 个句子的数组中随机选择一个句子添加到 Kestrel 的队列中。 private static void queueSentenceItems(KestrelClient kestrelClient, String queueName) throws ParseError, IOException { String[] sentences = new String[] { "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; Random _rand = new Random(); for(int i=1; i<=10; i++){ String sentence = sentences[_rand.nextInt(sentences.length)]; String val = "ID " + i + " " + sentence; boolean queueSucess = kestrelClient.queue(queueName, val); System.out.println("queueSucess=" +queueSucess+ " [" + val +"]"); } } 从 Kestrel 中移除数据 此方法从一个队列中取出一个数据,但并不把该数据从队列中删除: private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){ Item item = kestrelClient.dequeue(queueName); if(item==null){ System.out.println("The queue (" + queueName + ") contains no items."); } else { byte[] data = item._data; String receivedVal = new String(data); System.out.println("receivedItem=" + receivedVal); } } 此方法会从队列中取出并移除数据: private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){ Item item = kestrelClient.dequeue(queueName); if(item==null){ System.out.println("The queue (" + queueName + ") contains no items."); } else { int itemID = item._id; byte[] data = item._data; String receivedVal = new String(data); kestrelClient.ack(queueName, itemID); System.out.println("receivedItem=" + receivedVal); } } } 向 Kestrel 中连续添加数据 下面的程序可以向本地 Kestrel 服务的一个sentence_queue队列中连续添加句子,这也是我们的最后一个程序。 可以在命令行窗口中输入一个右中括号]并回车来停止程序。 import java.io.IOException; import java.io.InputStream; import java.util.Random; import backtype.storm.spout.KestrelClient; import backtype.storm.spout.KestrelClient.Item; import backtype.storm.spout.KestrelClient.ParseError; public class AddSentenceItemsToKestrel { /** * @param args */ public static void main(String[] args) { InputStream is = System.in; char closing_bracket = ']'; int val = closing_bracket; boolean aux = true; try { KestrelClient kestrelClient = null; String queueName = "sentence_queue"; while(aux){ kestrelClient = new KestrelClient("localhost",22133); queueSentenceItems(kestrelClient, queueName); kestrelClient.close(); Thread.sleep(1000); if(is.available()>0){ if(val==is.read()) aux=false; } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ParseError e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("end"); } } 使用 KestrelSpout 下面的拓扑使用KestrelSpout从一个 Kestrel 队列中读取句子,并将句子分割成若干个单词(Bolt:SplitSentence),然后输出每个单词出现的次数(Bolt:WordCount)。数据处理的细节可以参考消息的可靠性保证一文。 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme())); builder.setBolt("split", new SplitSentence(), 10) .shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 20) .fieldsGrouping("split", new Fields("word")); 运行 首先,以生产模式或者开发者模式启动你的本地 Kestrel 服务。 然后,等待大约 5 秒钟以防出现网络连接异常。 现在可以运行向队列中添加数据的程序,并启动 Storm 拓扑。程序启动的顺序并不重要。 如果你以 TOPOLOGY_DEBUG 模式运行拓扑你会观察到拓扑中 tuple 发送的细节信息。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— 配置开发环境

本文详细讲解了配置 Storm 开发环境的相关信息。简单地说,配置过程包含以下几个步骤: 下载Storm 发行版,将其解压缩并复制到你的PATH环境变量的bin目录中(也可以根据需要自定义安装目录 —— 译者注); 如果需要在远程集群中运行拓扑,则需要在~/.storm/storm.yaml文件中配置好集群的相关信息。 上述几步的详细内容如下。 什么是开发环境? Storm 包含两种操作模式:本地模式与远程模式(即集群模式 —— 译者注)。在本地模式下,你可以在本地机器上的一个进程中完成所有的开发、测试拓扑的工作。而在远程模式下,为了运行拓扑,你需要先向服务器集群提交该拓扑。 Storm 的开发环境已经为你准备好了一切,因此,你可以在本地模式下完成开发、测试拓扑的工作,将拓扑打包并提交到远程服务器,并在远程服务器集群上运行或者终止拓扑。 我们再来回顾一下本地机器与远程集群之间的关系。Storm 集群是由一个称为 “Nimbus” 的主节点管理的。本地机器通过与 Nimbus 通信来提交代码(代码已经打包为 jar 格式),这样代码文件中包含的拓扑就可以在集群中运行。Nimbus 会小心地维护着代码在集群中的分布式结构,并为待运行的拓扑分配 worker。本地机器可以使用一个称为storm的命令行客户端来与 Nimbus 进行通信。不过,storm客户端仅用于远程模式,不能用于本地模式下开发、测试拓扑。 在本地机器上安装 Storm 如果要从本地机器上直接向远程集群提交拓扑,你需要在本地机器上安装 Storm 程序。本地的 Storm 程序可以提供与远程集群交互的storm客户端。在安装本地 Storm 之前,你需要从这里下载一个 Storm 安装程序并将其解压到你的电脑的某个位置。然后将 Storm 的bin/目录添加到你的PATH环境变量中,确保bin/storm脚本可以直接运行。 在本地机器上安装的 Storm 仅能用于与远程集群的交互。对于本地模式下的开发、测试拓扑,推荐使用 Maven 来将 Storm 添加到你的项目的开发依赖中。关于 Maven 的使用请参考此文。 在远程集群上开始/终止拓扑的运行 在上一步中我们已经安装好了本地的storm客户端。接下来就需要告诉客户端需要连接哪一个 Storm 集群。这可以通过在~/.storm/storm.yaml文件中填写 Storm 集群的主节点的 host 地址来实现: nimbus.host: "123.45.678.890" 另外,如果你在 AWS 上应用storm-deploy项目来配置 Storm 集群,它会自动配置好你的~/.storm/storm.yaml文件。你也可以使用attach命令手动配置附属的 Storm 集群(或者在多个集群之间切换): lein run :deploy --attach --name mystormcluster 更多内容请参考 storm-deploy 项目的wiki。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— Trident API 概述

Trident 的核心数据模型是“流”(Stream),不过与普通的拓扑不同的是,这里的流是作为一连串batch来处理的。流是分布在集群中的不同节点上运行的,并且对流的操作也是在流的各个partition 上并行运行的。 Trident 中有 5 类操作: 针对每个小分区(partition)的本地操作,这类操作不会产生网络数据传输; 针对一个数据流的重新分区操作,这类操作不会改变数据流中的内容,但是会产生一定的网络传输; 通过网络数据传输进行的聚合操作; 针对数据流的分组操作; 融合与联结操作。 本地分区操作 本地分区操作是在每个分区块上独立运行的操作,其中不涉及网络数据传输。 函数 函数负责接收一个输入域的集合并选择输出或者不输出 tuple。输出 tuple 的域会被添加到原始数据流的输入域中。如果一个函数不输出 tuple,那么原始的输入 tuple 就会被直接过滤掉。否则,每个输出 tuple 都会复制一份输入 tuple 。假设你有下面这样的函数: public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { for(int i=0; i < tuple.getInteger(0); i++) { collector.emit(new Values(i)); } } } 再假设你有一个名为 “mystream” 的数据流,这个流中包含下面几个 tuple,每个 tuple 中包含有 “a”、“b”、“c” 三个域: [1, 2, 3] [4, 1, 6] [3, 0, 8] 如果你运行这段代码: mystream.each(new Fields("b"), new MyFunction(), new Fields("d"))) 那么最终输出的结果 tuple 就会包含有 “a”、“b”、“c”、“d” 4 个域,就像下面这样: [1, 2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0] 过滤器 过滤器负责判断输入的 tuple 是否需要保留。以下面的过滤器为例: public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2; } } 通过使用这段代码: mystream.each(new Fields("b", "a"), new MyFilter()) 就可以将下面这样带有 “a”、“b”、“c” 三个域的 tuple [1, 2, 3] [2, 1, 1] [2, 3, 4] 最终转化成这样的结果 tuple: [2, 1, 1] partitionAggregate partitionAggregate会在一批 tuple 的每个分区上执行一个指定的功能操作。与上面的函数不同,由partitionAggregate发送出的 tuple 会将输入 tuple 的域替换。以下面这段代码为例: mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) 假如输入流中包含有 “a”、“b” 两个域并且有以下几个 tuple 块: Partition 0: ["a", 1] ["b", 2] Partition 1: ["a", 3] ["c", 8] Partition 2: ["e", 1] ["d", 9] ["d", 10] 经过上面的代码之后,输出就会变成带有一个名为 “sum” 的域的数据流,其中的 tuple 就是这样的: Partition 0: [3] Partition 1: [11] Partition 2: [20] Storm 有三个用于定义聚合器的接口:CombinerAggregator,ReducerAggregator以及Aggregator。 这是CombinerAggregator接口: public interface CombinerAggregator<T> extends Serializable { T init(TridentTuple tuple); T combine(T val1, T val2); T zero(); } CombinerAggregator会将带有一个域的一个单独的 tuple 返回作为输出。CombinerAggregator会在每个输入 tuple 上运行初始化函数,然后使用组合函数来组合所有输入的值。如果在某个分区中没有 tuple,CombinerAggregator就会输出zero方法的结果。例如,下面是Count的实现代码: public class Count implements CombinerAggregator<Long> { public Long init(TridentTuple tuple) { return 1L; } public Long combine(Long val1, Long val2) { return val1 + val2; } public Long zero() { return 0L; } } 如果你使用 aggregate 方法来代替 partitionAggregate 方法,你就会发现CombinerAggregator的好处了。在这种情况下,Trident 会在发送 tuple 之前通过分区聚合操作来优化计算过程。 ReducerAggregator的接口实现是这样的: public interface ReducerAggregator<T> extends Serializable { T init(); T reduce(T curr, TridentTuple tuple); } ReducerAggregator会使用init方法来产生一个初始化的值,然后使用该值对每个输入 tuple 进行遍历,并最终生成并输出一个单独的 tuple,这个 tuple 中就包含有我们需要的计算结果值。例如,下面是将 Count 定义为ReducerAggregator的代码: public class Count implements ReducerAggregator<Long> { public Long init() { return 0L; } public Long reduce(Long curr, TridentTuple tuple) { return curr + 1; } } ReducerAggregator同样可以用于 persistentAggregate,你会在后面看到这一点。 最常用的聚合器接口还是下面的Aggregator接口: public interface Aggregator<T> extends Operation { T init(Object batchId, TridentCollector collector); void aggregate(T state, TridentTuple tuple, TridentCollector collector); void complete(T state, TridentCollector collector); } Aggregator聚合器可以生成任意数量的 tuple,这些 tuple 也可以带有任意数量的域。聚合器可以在执行过程中的任意一点输出tuple,他们的执行过程是这样的: 在处理一批数据之前先调用 init 方法。init 方法的返回值是一个代表着聚合状态的对象,这个对象接下来会被传入 aggregate 方法和 complete 方法中。 对于一个区块中的每个 tuple 都会调用 aggregate 方法。这个方法能够更新状态并且有选择地输出 tuple。 在区块中的所有 tuple 都被 aggregate 方法处理之后就会调用 complete 方法。 下面是使用 Count 作为聚合器的代码: public class CountAgg extends BaseAggregator<CountState> { static class CountState { long count = 0; } public CountState init(Object batchId, TridentCollector collector) { return new CountState(); } public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) { state.count+=1; } public void complete(CountState state, TridentCollector collector) { collector.emit(new Values(state.count)); } } 有时你可能会需要同时执行多个聚合操作。这个过程叫做链式处理,可以使用下面这样的代码来实现: mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd() 这段代码会在每个分区上分别执行 Count 和 Sum 聚合器,而输出中只会包含一个带有 [“count”, “sum”] 域的单独的 tuple。 stateQuery 与 partitionPersist stateQuery 与 partitionPersist 会分别查询、更新 state 数据源。你可以参考Trident State 文档来了解如何使用它们。 projection projection方法只会保留操作中指定的域。如果你有一个带有 [“a”, “b”, “c”, “d”] 域的数据流,通过执行这段代码: mystream.project(new Fields("b", "d")) 就会使得输出数据流中只包含有 [“b”, “d”] 域。 重分区操作 重分区操作会执行一个用来改变在不同的任务间分配 tuple 的方式的函数。在重分区的过程中分区的数量也可能会发生变化(例如,重分区之后的并行度就有可能会增大)。重分区会产生一定的网络数据传输。下面是重分区操作的几个函数: shuffle:通过随机轮询算法来重新分配目标区块的所有 tuple。 broadcast:每个 tuple 都会被复制到所有的目标区块中。这个函数在 DRPC 中很有用 —— 比如,你可以使用这个函数来获取每个区块数据的查询结果。 partitionBy:该函数会接收一组域作为参数,并根据这些域来进行分区操作。可以通过对这些域进行哈希化,并对目标分区的数量取模的方法来选取目标区块。partitionBy 函数能够保证来自同一组域的结果总会被发送到相同的目标区间。 global:这种方式下所有的 tuple 都会被发送到同一个目标分区中,而且数据流中的所有的块都会由这个分区处理。 batchGlobal:同一个 batch 块中的所有 tuple 会被发送到同一个区块中。当然,在数据流中的不同区块仍然会分配到不同的区块中。 partition:这个函数使用自定义的分区方法,该方法会实现backtype.storm.grouping.CustomStreamGrouping接口。 聚类操作 Trident 使用 aggregate 方法和 persistentAggregate 方法来对数据流进行聚类操作。其中,aggregate 方法会分别对数据流中的每个 batch 进行处理,而 persistentAggregate 方法则会对数据流中的所有 batch 执行聚类处理,并将结果存入某个 state 中。 在数据流上执行 aggregate 方法会执行一个全局的聚类操作。在你使用ReducerAggregator或者Aggregator时,数据流首先会被重新分区成一个单独的分区,然后聚类函数就会在该分区上执行操作。而在你使用CombinerAggregator时,Trident 首先会计算每个分区的部分聚类结果,然后将这些结果重分区到一个单独的分区中,最后在网络数据传输完成之后结束这个聚类过程。CombinerAggregator比其他的聚合器的运行效率更高,在聚类时应该尽可能使用CombinerAggregator。 下面是一个使用 aggregate 来获取一个 batch 的全局计数值的例子: mystream.aggregate(new Count(), new Fields("count")) 与 partitionAggregate 一样,aggregate 的聚合器也可以进行链式处理。然而,如果你在一个处理链中同时使用了CombinerAggregator和非CombinerAggregator,Trident 就不能对部分聚类操作进行优化了。 想要了解更多使用 persistentAggregate 的方法,可以参考Trident State 文档一文。 对分组数据流的操作 通过对指定的域执行 partitionBy 操作,groupBy 操作可以将数据流进行重分区,使得相同的域的 tuple 分组可以聚集在一起。例如,下面是一个 groupBy 操作的示例: 如果你在分组数据流上执行聚合操作,聚合器会在每个分组(而不是整个区块)上运行。persistentAggregate 同样可以在一个分组数据里上运行,这种情况下聚合结果会存储在MapState中,其中的 key 就是分组的域名。 和其他操作一样,对分组数据流的聚合操作也可以以链式的方式执行。 融合(Merge)与联结(join) Trident API 的最后一部分是联结不同的数据流的操作。联结数据流最简单的方式就是将所有的数据流融合到一个流中。你可以使用 TridentTopology 的 merge 方法实现该操作,比如这样: topology.merge(stream1, stream2, stream3); Trident 会将融合后的新数据流的域命名为为第一个数据流的输出域。 联结数据流的另外一种方法是使用 join。像 SQL 那样的标准 join 操作只能用于有限的输入数据集,对于无限的数据集就没有用武之地了。Trident 中的 join 只会应用于每个从 spout 中输出的小 batch。 下面是两个流的 join 操作的示例,其中一个流含有 [“key”, “val1″, “val2″] 域,另外一个流含有 [“x”, “val1″] 域: topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c")); 上面的例子会使用 “key” 和 “x” 作为 join 的域来联结 stream1 和 stream2。Trident 要求先定义好新流的输出域,因为输入流的域可能会覆盖新流的域名。从 join 中输出的 tuple 中会包含: join 域的列表。在这个例子里,输出的 “key” 域与 stream1 的 “key” 域以及 stream2 的 “x” 域对应。 来自所有流的非 join 域的列表。这个列表是按照传入 join 方法的流的顺序排列的。在这个例子里,“ a” 和 “b” 域与 stream1 的 “val1” 和 “val2” 域对应;而 “c” 域则与 stream2 的 “val1” 域相对应。 在对不同的 spout 发送出的流进行 join 时,这些 spout 上会按照他们发送 batch 的方式进行同步处理。也就是说,一个处理中的 batch 中含有每个 spout 发送出的 tuple。 到这里你大概仍然会对如何进行窗口 join 操作感到困惑。窗口操作(包括平滑窗口、滚动窗口等 —— 译者注)主要是指将当前的 tuple 与过去若干小时时间段内的 tuple 联结起来的过程。 你可以使用 partitionPersist 和 stateQuery 来实现这个过程。过去一段时间内的 tuple 会以 join 域为关键字被保存到一个 state 源中。然后就可以使用 stateQuery 查询 join 域来实现这个“联结”(join)的过程。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— 问题与解决

本文介绍了用户在使用 Storm 过程中遇到的问题与相应的解决方法。 Worker 进程在启动时挂掉而没有留下堆栈跟踪信息的问题 可能出现的现象: 拓扑在一个节点上运行正常,但是多个 worker 进程在多个节点上就会崩溃 解决方案: 你的网络配置可能有问题,导致每个节点无法根据 hostname 连接到其他的节点。ZeroMQ 有时会在不能识别 host 的时候挂掉 进程。如果是这种情况,有两种可行的解决方案: 在 /etc/hosts 文件中配置好 hostname 与 IP 的对应关系 设置一个局域网 DNS 服务器,使得节点可以根据 hostname 定位到其他节点 节点之间无法通信 可能出现的现象: 每个 spout tuple 的处理都不成功 拓扑中的处理过程不起作用 解决方案: Storm 不支持 ipv6,你可以在 supervisor 的 child-opts 配置中添加-Djava.net.preferIPv4Stack=true参数,然后重启 supervisor。 你的网络配置可能存在问题,请参考上个问题中的解决方案。 拓扑在一段时间后停止了 tuple 的处理过程 可能出现的现象: 拓扑正常运行一段时间后突然停止了数据处理过程,并且 spout 的 tuple 一起开始处理失败 解决方案: 这是 ZeroMQ 2.1.10 中的一个已经确认的问题,请将 ZMQ 降级到 2.1.7 版本。 Storm UI 中没有显示出所有的 supervisor 信息 可能出现的现象: Storm UI 中缺少部分 supervisor 的信息 在刷新 Storm UI 页面后 supervisor 列表会变化 解决方案: 确保 supervisor 的本地工作目录是相互独立的(也就是说不要出现在 NFS 中共享同一个目录的情况) 尝试删除 supervisor 的本地工作目录,然后重启 supervisor 后台进程。supervisor 启动时会为自己创建一个唯一的 id 并存储在本地目录中。如果这个 id 被复制到其他节点中,就会让 Storm 无法确定哪个 supervisor 正在运行(这种情况并不少见,如果需要扩展集群,就很容易出现直接将某个节点的 Storm 文件直接复制到新节点的情况 —— 译者注)。 “Multiple defaults.yaml found” 错误 可能出现的现象: 在使用storm jar命令部署拓扑时出现此错误 解决方案: 你很可能在拓扑的 jar 包中包含了 Storm 自身的 jar 包。注意,在打包拓扑时,请不要将 Storm 自身的 jar 包加入,因为 Storm 已经在它的 classpath 中提供了这些 jar 包。 运行 storm jar 命令时出现 “NoSuchMethorError” 可能出现的现象: 运行storm jar命令时出现奇怪的 “NoSuchMethodError” 解决方案: 这可能是由于你部署拓扑的 Storm 版本与你构建拓扑时使用的 Storm 版本不同。请确保你编译拓扑时使用的 Storm 版本与你运行拓扑的 Storm 客户端版本相同。 Kryo ConcurrentModificationException 可能出现的现象: 系统运行时出现如下的异常堆栈跟踪信息 java.lang.RuntimeException: java.util.ConcurrentModificationException at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56) at backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67) at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:679) Caused by: java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:390) at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:409) at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:408) at java.util.HashMap.writeObject(HashMap.java:1016) at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:616) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:959) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346) at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554) at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77) at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472) at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27) 解决方案: 这个信息表示你在将一个可变的对象作为 tuple 发送出去。你发送到 outputcollector 中的所有对象必须是非可变的。这个错误表明对象在被序列化并发送到网络中时你的 bolt 正在修改这个对象。 Storm 中的 NullPointerException 可能出现的现象: Storm 运行中出现了如下的 NullPointerException java.lang.RuntimeException: java.lang.NullPointerException at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56) at backtype.storm.disruptor$consume_loop_STAR_$fn__1596.invoke(disruptor.clj:67) at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:662) Caused by: java.lang.NullPointerException at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24) at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126$fn__4130.invoke(worker.clj:99) at backtype.storm.util$fast_list_map.invoke(util.clj:771) at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126.invoke(worker.clj:99) at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3904.invoke(executor.clj:205) at backtype.storm.disruptor$clojure_handler$reify__1584.onEvent(disruptor.clj:43) at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81) ... 6 more 解决方案: 这个问题是由于多个线程同时调用OutputCollector中的方法造成的。Storm 中所有的 emit、ack、fail 方法必须在同一个线程中运行。出现这个问题的一种场景是在一个IBasicBolt中创建了一个独立的线程。由于IBasicBolt会在execute方法调用之后自动调用ack,所以这就会出现多个线程同时使用OutputCollector的情况,进而抛出这个异常。也就是说,在使用IBasicBolt时,所有的消息发送操作必须在同一个线程的execute方法中执行。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》Spark安全性

Spark安全性 Spark目前已经支持以共享秘钥的方式进行身份认证。开启身份认证配置参数为 spark.authenticate 。这个配置参数决定了Spark通讯协议是否使用共享秘钥做身份验证。验证过程就是一个基本的握手过程,确保通讯双方都有相同的秘钥并且可以互相通信。如果共享秘钥不同,双方是不允许通信的。共享秘钥可用以下方式创建: 对于以YARN方式部署的Spark,将 spark.authenticate 设为true可以自动生成并分发共享秘钥。每个Spark应用会使用唯一的共享秘钥。 而对于其他部署类型,需要在每个节点上设置 spark.authenticate.secret 参数。这个秘钥将会在由所有 Master/Workers以及各个Spark应用共享。 Web UI Spark UI 也可以通过配置 spark.ui.filters 来使用javax servlet filters确保安全性,因为某些用户可能希望web UI上的某些数据应该保密,并对其他用户不可见。用户可以自定义 javax servlet filter 来对登陆用户进行认证,Spark会根据用户的ACL(访问控制列表)来确保该登陆用户有权限访问某个Spark应用的web UI。Spark ACL的行为可由 spark.acls.enable 和 spark.ui.view.acls共同控制。注意,启动Spark应用的用户总是会有权限访问该应用对应的UI。而在YARN模式下,Spark web UI会使用YARN的web应用代理机制,通过Hadoop过滤器进行认证。 Spark还支持修改运行中的Spark应用对应的ACL以便更改其权限控制,不过这可能会导致杀死应用或者杀死任务的动作。这一特性由 spark.acls.enable 和 spark.modify.acls 共同控制。注意,如果你需要web UI的认证,比如为了能够在web UI上使用杀死应用的按钮,那么就需要将用户同时添加到modify ACL和view ACL中。在YARN上,控制用户修改权限的modify ACL是通过YARN接口传进来的。 Spark可以允许多个管理员共同管理ACL,而且这些管理员总是能够访问和修改所有的Spark应用。而管理员本身是由 spark.admin.acls 配置的。在一个共享的多用户集群中,你可能需要配置多个管理员,另外,该配置也可以用于支持开发人员调试Spark应用。 事件日志 如果你启用了事件日志(event logging),那么这些日志对应的目录(spark.eventLog.dir)需要事先手动创建并设置好权限。如果你要限制这些日志文件的权限,首先还是要将日志目录的权限设为 drwxrwxrwxt,目录owner应该是启动history server的超级用户,对应的组限制为该超级用户所在组。这样其他所有用户就只能往该目录下写日志,但同时又不能删除或改名。事件日志文件只有在用户或者用户组具有读写权限时才能写入。 加密 Spark对Akka和HTTP协议(广播和文件服务器中使用)支持SSL。同时,对数据块传输服务(block transfer service)支持SASL加密。Spark web UI暂时还不支持任何加密。 Spark的临时数据存储,如:混洗中间文件、缓存数据或其他应用相关的临时文件,目前也没有加密。如果需要加密这些数据,只能通过配置集群管理器将数据存储到加密磁盘上。 SSL配置 Spark SSL相关配置是层级式的。用户可以配置SSL的默认配置,以便支持所有的相关通讯协议,当然,如果设置了针对某个具体协议配置值,其值将会覆盖默认配置对应的值。这种方式主要是为了方便用户,用户可以轻松地为所有协议都配置好默认值,同时又不会影响针对某一个具体协议的特殊配置需求。通用的SSL默认配置在 spark.ssl 这一配置命名空间下,对于Akka的SSL配置,在spark.ssl.akka下,而对用于广播和文件服务器中的HTTP协议,其配置在 spark.ssl.fs 下。详细的清单见配置指南(configuration page)。 SSL必须在每个节点上都配置好,并且包括各个使用特定通讯协议的相关模块。 YARN模式 key-store 文件可以由客户端准备好,然后作为Spark应用的一部分分发到各个执行器(executor)上使用。用户也可以在Spark应用启动前,通过配置 spark.yarn.dist.files 或者 spark.yarn.dist.archives 来部署key-store文件。这些文件传输过程的加密是由YARN本身负责的,和Spark就没什么关系了。 对于一些长期运行并且可以写HDFS的Spark应用,如:Spark Streaming 上的应用,可以用 spark-submit 的 –principal 和 –keytab 参数分别设置principal 和 keytab 信息。keytab文件将会通过 Hadoop Distributed Cache(如果YARN配置了 SSL并且HDFS启用了加密,那么分布式缓存的传输也会被加密) 复制到Application Master所在机器上。Kerberos的登陆信息将会被principal和keytab周期性地刷新,同时HDFS所需的代理token也会被周期性的刷新,这样Spark应用就能持续地写入HDFS了。 独立模式 独立模式下,用户需要为master和worker分别提供key-store和相关配置选项。这些配置可以通过在SPARK_MASTER_OPTS 和 SPARK_WORKER_OPTS,或者 SPARK_DEAMON_JAVA_OPTS环境变量中添加相应的java系统属性来设置。独立模式下,用户可以通过worker的SSL设置来改变执行器(executor)的配置,因为这些执行器进程都是worker的子进程。不过需要注意的是,执行器如果需要启用本地SSL配置值(如:从worker进程继承而来的环境变量),而不是用户在客户端设置的值,就需要将 spark.ssl.userNodeLocalConf 设为 true。 准备key-stores key-stores 文件可以由 keytool 程序生成。keytool 相关参考文档见这里:here。独立模式下,配置key-store和trust-store至少有这么几个基本步骤: 为每个节点生成一个秘钥对 导出各节点秘钥对中的公匙(public key)到一个文件 将所有这些公匙导入到一个 trust-store 文件中 将该trust-store文件发布到所有节点 配置SASL加密 启用认证后(spark.authenticate),数据块传输服务(block transfer service)可以支持SASL加密。如需启用SASL加密的话,还需要在 Spark 应用中设置 spark.authenticate.enableSaslEncryption 为 true。 如果是开启了外部混洗服务(external shuffle service),那么只需要将 spark.network.sasl.serverAlwaysEncrypt 设为true即可禁止非加密的网络连接。因为这个配置一旦启用,所有未使用 SASL加密的Spark应用都无法连接到外部混洗服务上。 配置网络安全端口 Spark计算过程中大量使用网络通信,而有些环境中对网络防火墙的设置要求很严格。下表列出来Spark用于通讯的一些主要端口,以及如何配置这些端口。 仅独立部署适用 访问源 访问目标 默认值 用途 配置 注意 Browser Standalone Master 8080 Web UI spark.master.ui.port / SPARK_MASTER_WEBUI_PORT 基于Jetty。仅独立模式有效。 Browser Standalone Worker 8081 Web UI spark.worker.ui.port / SPARK_WORKER_WEBUI_PORT 基于Jetty。仅独立模式有效。 Driver / Standalone Worker Standalone Master 7077 提交作业 / 合并集群 SPARK_MASTER_PORT 基于Akka。设为0表示随机。仅独立模式有效。 Standalone Master Standalone Worker (random) 调度执行器 SPARK_WORKER_PORT 基于Akka。设为0表示随机。仅独立模式有效。 所有集群管理器适用 访问源 访问目标 默认值 用途 配置 注意 Browser Application 4040 Web UI spark.ui.port 基于Jetty Browser History Server 18080 Web UI spark.history.ui.port 基于Jetty Executor / Standalone Master Driver (random) 连接Spark应用 / 执行器状态变化通知 spark.driver.port 基于Akka。设为0表示随机。 Driver Executor (random) 任务调度 spark.executor.port 基于Akka。设为0表示随机。 Executor Driver (random) 传输文件和jar包的文件服务器 spark.fileserver.port 基于Jetty Executor Driver (random) HTTP广播 spark.broadcast.port 基于Jetty。TorrentBroadcast通过block manager发送数据,所以不会用这个端口。 Executor Driver (random) 类文件服务器 spark.replClassServer.port 基于Jetty。仅Spark shell使用。 Executor / Driver Executor / Driver (random) 数据块管理器端口 spark.blockManager.port 基于ServerSocketChannel使用原始socket通信。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

GitHub 官方发布的编程语言趋势

【大咖・来了 第7期】10月24日晚8点观看《智能导购对话机器人实践》 最近,我们统计了各编程语言在Github.com上近些年的活跃度。 下图清晰地展示出,自2008年Github创建以来,各编程语言的排名发生了不少变化。 图中引用的语言包括了在公共和私人的repository中所使用的编程语言(fork过来的除外),而语言的排名则由Linguist检测得出。 须注意,本图表所展现的各语言在GitHub上的活跃度是相对的。例如,Ruby on Rails自2008年出现在GitHub上,因此它在早期阶段具有相对较高的活跃度。 在2008和2015年之间,GitHub上活跃度变化***的是Java,它从第七名一直升到第二名。究其原因,可能是安卓设备兴起和企业对于版本控制平台需求增加。 【责任编辑: chenqingxiang TEL:(010)68476606】

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Apache Tomcat

Apache Tomcat

Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。