首页 文章 精选 留言 我的

精选列表

搜索[快速入门],共10000篇文章
优秀的个人博客,低调大师

【Java入门提高篇】Day25 史上最详细的HashMap红黑树解析

当当当当当当当,好久不见,最近又是换工作,又是换房子,忙的不可开交,断更了一小段时间,最重要的一篇迟迟出不来,每次都犹抱琵琶半遮面,想要把它用通俗易懂的方式进行说明,确实有一定的难度,可愁煞我也,但自己挖的坑,哭着也要把它补上。请允许我当一回标题党。 好了,言归正传,本篇主要内容便是介绍HashMap的男二号——TreeNode(男一号还是给Node吧,毕竟是TreeNode的爷爷,而且普通节点一般来说也比TreeNode要多),本篇主要从以下几个方面介绍: 1.红黑树介绍 2.TreeNode结构 3.树化的过程 4.红黑树的左旋和右旋 5.TreeNode的左旋和右旋 6.红黑树的插入 7.TreeNode的插入 8.红黑树的删除 9.TreeNode的删除 讲解红黑树的部分算是理论部分,讲解TreeNode的部分则是代码实践部分,配合服用效果更加。 保守估计,仔细食用本篇大约需要半小时,请各位细细品尝。 红黑树介绍 什么是红黑树?嗯,首先,它是一颗树,所谓的树,便是长的像这样的东西 不像树?emmmm,你把它想象成一颗倒过来的树就好了,A~H都是树的节点,每个节点有零个或者多个子节点,或者说多个孩子,但除了根节点以外,每个节点都只有一个父节点,也称只有一个父亲(老王嘿嘿一笑)。最上面的A是根节点,最下面的D、H、F、G是叶子节点。每一个非根节点有且只有一个父节点;树是具有一层一层的层次结构,这里A位于第一层,B、C位于第二层,依次类推。将左边的B节点部分(包括BDEH)拿出来,则又是一颗树,称为树的子树。 好了,知道树是什么东西了,那么红黑树是什么样的呢? 红黑树,本质上来说是一颗二叉搜索树。嗯,还是先说说这个二叉搜索树吧。二叉代表它的节点最多有两个子节点,而且左右有顺序,不能颠倒,分别叫左孩子和右孩子,这两个节点互为兄弟节点,嗯,其实叫法根现实里的叫法差不多,以下图为例,4、9互为兄弟,7是他们的父亲,9是2的叔叔,8是2的堂兄弟,很简单吧。说完了称谓,再来说说用途,既然叫做搜索树表示它的用途是为了更快的搜索和查找而设计的,所以这棵树本身满足一定的排序规则,即树中的任何节点的值大于它的左孩子,且小于它的右孩子。任意节点的左、右子树也分别为二叉查找树。嗯,结合下图意会一下: 而红黑树,就跟它的名字一样,又红又黑,红黑并进,理实交融,节点是非红即黑的,看起来就像这样: 红黑树的主要特性: (1)每个节点要么是黑色,要么是红色。(节点非黑即红) (2)根节点是黑色。 (3)每个叶子节点(NIL)是黑色。 (4)如果一个节点是红色的,则它的子节点必须是黑色的。(也就是说父子节点不能同时为红色) (5)从一个节点到该节点的子孙节点的所有路径上包含相同数目的黑节点。(这一点是平衡的关键) 说简单也简单,其实就是一颗比较平衡的又红又黑的二叉树嘛。 TreeNode结构 既然我们已经知道红黑树长什么样了,那么我们再来看看HashMap中的TreeNode代码里是如何表示的: /** * 用于Tree bins 的Entry。 扩展LinkedHashMap.Entry(进而扩展Node),因此可以用作常规节点或链接节点的扩展。 */ static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> { TreeNode<K,V> parent; // 红黑树父节点 TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // 删除后需要取消链接 boolean red; TreeNode(int hash, K key, V val, Node<K,V> next) { super(hash, key, val, next); } //省略后续代码 TreeNode继承自LinkedHashMap中的内部类——LinkedHashMap.Entry,而这个内部类又继承自Node,所以算是Node的孙子辈了。我们再来看看它的几个属性,parent用来指向它的父节点,left指向左孩子,right指向右孩子,prev则指向前一个节点(原链表中的前一个节点),注意,这些字段跟Entry,Node中的字段一样,是使用默认访问权限的,所以子类可以直接使用父类的属性。 树化的过程 在前几篇中已经有所介绍,当HashMap桶中的元素个数超过一定数量时,就会树化,也就是将链表转化为红黑树的结构。 public V put(K key, V value) { return putVal(hash(key), key, value, false, true); } final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { ...省略部分代码... else { for (int binCount = 0; ; ++binCount) { if ((e = p.next) == null) { p.next = newNode(hash, key, value, null); //当桶中元素个数超过阈值(8)时就进行树化 if (binCount >= TREEIFY_THRESHOLD - 1) treeifyBin(tab, hash); break; } ...省略部分代码... } final void treeifyBin(Node<K,V>[] tab, int hash) { int n, index; Node<K,V> e; if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY) resize(); else if ((e = tab[index = (n - 1) & hash]) != null) { TreeNode<K,V> hd = null, tl = null; do { //将节点替换为TreeNode TreeNode<K,V> p = replacementTreeNode(e, null); if (tl == null) //hd指向头结点 hd = p; else { //这里其实是将单链表转化成了双向链表,tl是p的前驱,每次循环更新指向双链表的最后一个元素,用来和p相连,p是当前节点 p.prev = tl; tl.next = p; } tl = p; } while ((e = e.next) != null); if ((tab[index] = hd) != null) //将链表进行树化 hd.treeify(tab); } } 从代码中可以看到,在treeifyBin函数中,先将所有节点替换为TreeNode,然后再将单链表转为双链表,方便之后的遍历和移动操作。而最终的操作,实际上是调用TreeNode的方法treeify进行的。 final void treeify(Node<K,V>[] tab) { //树的根节点 TreeNode<K,V> root = null; //x是当前节点,next是后继 for (TreeNode<K,V> x = this, next; x != null; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null; //如果根节点为null,把当前节点设置为根节点 if (root == null) { x.parent = null; x.red = false; root = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null; //这里循环遍历,进行二叉搜索树的插入 for (TreeNode<K,V> p = root;;) { //p指向遍历中的当前节点,x为待插入节点,k是x的key,h是x的hash值,ph是p的hash值,dir用来指示x节点与p的比较,-1表示比p小,1表示比p大,不存在相等情况,因为HashMap中是不存在两个key完全一致的情况。 int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; //如果hash值相等,那么判断k是否实现了comparable接口,如果实现了comparable接口就使用compareTo进行进行比较,如果仍旧相等或者没有实现comparable接口,则在tieBreakOrder中比较 else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; //进行插入平衡处理 root = balanceInsertion(root, x); break; } } } } //确保给定节点是桶中的第一个元素 moveRootToFront(tab, root); } //这里不是为了整体排序,而是为了在插入中保持一致的顺序 static int tieBreakOrder(Object a, Object b) { int d; //用两者的类名进行比较,如果相同则使用对象默认的hashcode进行比较 if (a == null || b == null || (d = a.getClass().getName(). compareTo(b.getClass().getName())) == 0) d = (System.identityHashCode(a) <= System.identityHashCode(b) ? -1 : 1); return d; } 这里的逻辑其实不复杂,仅仅是循环遍历当前树,然后找到可以该节点可以插入的位置,依次和遍历节点比较,比它大则跟其右孩子比较,小则与其左孩子比较,依次遍历,直到找到左孩子或者右孩子为null的位置进行插入。 真正复杂一点的地方在于balanceInsertion函数,这个函数中,将红黑树进行插入平衡处理,保证插入节点后仍保持红黑树的性质。这个函数稍后在TreeNode的插入中进行介绍,这里先看看moveRootToFront,这个函数是将root节点移动到桶中的第一个元素,也就是链表的首节点,这样做是因为在判断桶中元素类型的时候会对链表进行遍历,将根节点移动到链表前端可以确保类型判断时不会出现错误。 /** * 把给定节点设为桶中的第一个元素 */ static <K,V> void moveRootToFront(Node<K,V>[] tab, TreeNode<K,V> root) { int n; if (root != null && tab != null && (n = tab.length) > 0) { int index = (n - 1) & root.hash; //first指向链表第一个节点 TreeNode<K,V> first = (TreeNode<K,V>)tab[index]; if (root != first) { //如果root不是第一个节点,则将root放到第一个首节点位置 Node<K,V> rn; tab[index] = root; TreeNode<K,V> rp = root.prev; if ((rn = root.next) != null) ((TreeNode<K,V>)rn).prev = rp; if (rp != null) rp.next = rn; if (first != null) first.prev = root; root.next = first; root.prev = null; } //这里是防御性编程,校验更改后的结构是否满足红黑树和双链表的特性 //因为HashMap并没有做并发安全处理,可能在并发场景中意外破坏了结构 assert checkInvariants(root); } } 红黑树的左旋和右旋 左旋和右旋,顾名思义嘛,就是将节点以某个节点为中心向左或者向右进行旋转操作以保持二叉树的平衡,让我们看图说话: 图画的有点大。将就着看一下吧,左旋相当于以要旋转的节点为中心,将子树整体向左旋转,该节点变成子树的根节点,原来的父节点A变成了左孩子,如果右孩子C有左孩子D,则将其变为A的右孩子。说起来好像有点绕,可以联系图进行形象化的理解,当节点A向左旋转之后,C的左孩子D可以理解为因为重力作用掉到A的右孩子位置,嗯,就是这样。右旋也是类似理解即可。 TreeNode的左旋和右旋 了解了左旋和右旋,让我们看看代码里是怎样实现的: /** * 左旋 */ static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p) { //这里的p即上图的A节点,r指向右孩子即C,rl指向右孩子的左孩子即D,pp为p的父节点 TreeNode<K,V> r, pp, rl; if (p != null && (r = p.right) != null) { if ((rl = p.right = r.left) != null) rl.parent = p; //将p的父节点的孩子节点指向r if ((pp = r.parent = p.parent) == null) (root = r).red = false; else if (pp.left == p) pp.left = r; else pp.right = r; //将p置为r的左节点 r.left = p; p.parent = r; } return root; } /** * 右旋 */ static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p) { //这里的p即上图的A节点,l指向左孩子即C,lr指向左孩子的右孩子即E,pp为p的父节点 TreeNode<K,V> l, pp, lr; if (p != null && (l = p.left) != null) { if ((lr = p.left = l.right) != null) lr.parent = p; if ((pp = l.parent = p.parent) == null) (root = l).red = false; else if (pp.right == p) pp.right = l; else pp.left = l; l.right = p; p.parent = l; } return root; } 其实,也很简单嘛。23333 红黑树的插入 现在来看看一个比较麻烦一点的操作,红黑树的插入,首先找到这个节点要插入的位置,即一层一层比较,大的放右边,小的放左边,直到找到为null的节点放入即可,但是如何在插入的过程保持红黑树的特性呢,想想好像比较头疼,但是再仔细想想其实就会发现,其实只有这么几种情况: 1.插入的为根节点,则直接把颜色改成黑色即可。 2.插入的节点的父节点是黑色节点,则不需要调整,因为插入的节点会初始化为红色节点,红色节点是不会影响树的平衡的。 3.插入的节点的祖父节点为null,即插入的节点的父节点是根节点,直接插入即可(因为根节点肯定是黑色)。 4.插入的节点父节点和祖父节点都存在,并且其父节点是祖父节点的左节点。这种情况稍微麻烦一点,又分两种子情况: i.插入节点的叔叔节点是红色,则将父亲节点和叔叔节点都改成黑色,然后祖父节点改成红色即可。 ii.插入节点的叔叔节点是黑色或不存在: a.若插入节点是其父节点的右孩子,则将其父节点左旋, b.若为左孩子,则将其父节点变成黑色节点,将其祖父节点变成红色节点,然后将其祖父节点右旋。 5.插入的节点父节点和祖父节点都存在,并且其父节点是祖父节点的右节点。这种情况跟上面是类似的,分两种子情况: i.插入节点的叔叔节点是红色,则将父亲节点和叔叔节点都改成黑色,然后祖父节点改成红色即可。 ii.插入节点的叔叔节点是黑色或不存在: a.若插入节点是其父节点的左孩子,则将其父节点右旋 b.若为右孩子,则将其父节点变成黑色节点,将其祖父节点变成红色节点,然后将其祖父节点左旋。 然后重复进行上述操作,直到变成1或2情况时则结束变换。说半天,可能还是云里雾里,一图胜千言,让我们从无到有构建一颗红黑树,假设插入的顺序为:10,5,9,3,6,7,19,32,24,17(数字是我拍脑袋瞎想的。) 先来插个10,为情景1,直接改成黑色即可,再插入5,为情景2,比10小,放到10的左孩子位置,插入9,比10小,但是比5大,放到5的右孩子位置,此时,为情景4iia,左旋后变成了情景4iib,变色右旋即可完成转化。插入3后为情景4i,将父节点和叔叔节点同时变色即可,插入6不需要调整,插入7后为情景5i,变色即可。插入19不需要调整,插入32,变成了5iib,左旋变色即可,插入24,变成5iia,右旋后变成5i,变色即可,最后插入17,完美。 看图说话是不是就简单明了了,看在我画图这么辛苦的份上,点个关注给个赞可好(滑稽)。 TreeNode的插入 了解了红黑树的删除之后,我们再来看下TreeNode中是怎样用代码实现的: static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) { x.red = true; for (TreeNode<K,V> xp, xpp, xppl, xppr;;) { //情景1:父节点为null if ((xp = x.parent) == null) { x.red = false; return x; } //情景2,3:父节点是黑色节点或者祖父节点为null else if (!xp.red || (xpp = xp.parent) == null) return root; //情景4:插入的节点父节点和祖父节点都存在,并且其父节点是祖父节点的左节点 if (xp == (xppl = xpp.left)) { //情景4i:插入节点的叔叔节点是红色 if ((xppr = xpp.right) != null && xppr.red) { xppr.red = false; xp.red = false; xpp.red = true; x = xpp; } //情景4ii:插入节点的叔叔节点是黑色或不存在 else { //情景4iia:插入节点是其父节点的右孩子 if (x == xp.right) { root = rotateLeft(root, x = xp); xpp = (xp = x.parent) == null ? null : xp.parent; } //情景4iib:插入节点是其父节点的左孩子 if (xp != null) { xp.red = false; if (xpp != null) { xpp.red = true; root = rotateRight(root, xpp); } } } } //情景5:插入的节点父节点和祖父节点都存在,并且其父节点是祖父节点的右节点 else { //情景5i:插入节点的叔叔节点是红色 if (xppl != null && xppl.red) { xppl.red = false; xp.red = false; xpp.red = true; x = xpp; } //情景5ii:插入节点的叔叔节点是黑色或不存在 else {· //情景5iia:插入节点是其父节点的左孩子 if (x == xp.left) { root = rotateRight(root, x = xp); xpp = (xp = x.parent) == null ? null : xp.parent; } //情景5iib:插入节点是其父节点的右孩子 if (xp != null) { xp.red = false; if (xpp != null) { xpp.red = true; root = rotateLeft(root, xpp); } } } } } } 其实就是一毛一样的,对号入座即可。 红黑树的删除 讲完插入,接下来我们来说说删除,删除的话,比插入还要复杂一点,请各位看官先深呼吸,做好阅读准备。 之前已经说过,红黑树是一颗特殊的二叉搜索树,所以进行删除操作时,其实是先进行二叉搜索树的删除,然后再进行调整。所以,其实这里分为两部分内容:1.二叉搜索树的删除,2.红黑树的删除调整。 二叉搜索树的删除主要有这么几种情景: 情景1:待删除的节点无左右孩子。 情景2:待删除的节点只有左孩子或者右孩子。 情景3:待删除的节点既有左孩子又有右孩子。 对于情景1,直接删除即可,情景2,则直接把该节点的父节点指向它的左孩子或者右孩子即可,情景3稍微复杂一点,需要先找到其右子树的最左孩子(或者左子树的最右孩子),即左(右)子树中序遍历时的第一个节点,然后将其与待删除的节点互换,最后再删除该节点(如果有右子树,则右子树上位)。总之,就是先找到它的替代者,找到之后替换这个要删除的节点,然后再把这个节点真正删除掉。 其实二叉搜索树的删除总体来说还是比较简单的,删除完之后,如果替代者是红色节点,则不需要调整,如果是黑色节点,则会导致左子树和右子树路径中黑色节点数量不一致,需要进行红黑树的调整,跟上面一样,替代节点为其父节点的左孩子与右孩子的情况类似,所以这里只说其为左孩子的情景(PS:上一步的寻找替换节点使用的是右子树的最左节点,所以该节点如果有孩子,只能是右孩子): 情景1:只有右孩子且为红色,直接用右孩子替换该节点然后变成黑色即可。 (D代表替代节点,即要被删除的节点,之前在经过二叉搜索树的删除后,D节点其实已经被删除了,这里为了方便理解这个变化过程,所以把这个节点也画出来了,所以当前的初始状态是待删除节点与其替换节点互换位置与颜色之后的状态) 情景2:只有右孩子且为黑色,那么删除该节点会导致父节点的左子树路径上黑色节点减一,此时只能去借助右子树,从右子树中借一个红色节点过来即可,具体取决于右子树的情况,这里又分成两种: i.兄弟节点是红色,则此时父节点是黑色,且兄弟节点肯定有两个孩子,且兄弟节点的左右子树路径上均有两个黑色节点,此时只需将兄弟节点与父节点颜色互换,然后将父节点左旋,左旋后,兄弟节点的左子树SL挂到了父节点p的右孩子位置,这时会导致p的右子树路径上的黑色节点比左子树多一,此时再SL置为红色即可。 ii.兄弟节点是黑色,那么就只能打它孩子的主意了,这里主要关注远侄子(兄弟节点的右孩子,即SR)的颜色情况,这里分成两种情况: a.远侄子SR是黑色,近侄子任意(白色代表颜色可为任意颜色),则先将S转为红色,然后右旋,再将SL换成P节点颜色,P涂成黑色,S也涂成黑色,再进行左旋即可。其实简单说就是SL上位,替换父节点位置。 b.远侄子SR为红色,近侄子任意(该子树路径中有且仅有一个黑色节点),则先将兄弟节点与父节点颜色互换,将SR涂成黑色,再将父节点左旋即可。 emmmm...好像也不是很麻烦嘛(逃)。 TreeNode的删除节点 TreeNode删除节点其实也是两步走,先进行二叉搜索树的删除,然后再进行红黑树的调整,跟之前的情况分析是一致的。 final void removeTreeNode(HashMap<K,V> map, Node<K,V>[] tab, boolean movable) { ...... //p是待删除节点,replacement用于后续的红黑树调整,指向的是p或者p的继承者。 //如果p是叶子节点,p==replacement,否则replacement为p的右子树中最左节点 if (replacement != p) { //若p不是叶子节点,则让replacement的父节点指向p的父节点 TreeNode<K,V> pp = replacement.parent = p.parent; if (pp == null) root = replacement; else if (p == pp.left) pp.left = replacement; else pp.right = replacement; p.left = p.right = p.parent = null; } //若待删除的节点p时红色的,则树平衡未被破坏,无需进行调整。 //否则删除节点后需要进行调整 TreeNode<K,V> r = p.red ? root : balanceDeletion(root, replacement); //p为叶子节点,则直接将p从树中清除 if (replacement == p) { // detach TreeNode<K,V> pp = p.parent; p.parent = null; if (pp != null) { if (p == pp.left) pp.left = null; else if (p == pp.right) pp.right = null; } } } 麻烦的地方就在删除节点后的调整了,所有逻辑都在balanceDeletion函数里,两个参数分别表示根节点和删除节点的继承者,来看看它的具体实现: static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root, TreeNode<K,V> x) { for (TreeNode<K,V> xp, xpl, xpr;;) { //x为空或x为根节点,直接返回 if (x == null || x == root) return root; //x为根节点,染成黑色,直接返回(因为调整过后,root并不一定指向删除操作过后的根节点,如果之前删除的是root节点,则x将成为新的根节点) else if ((xp = x.parent) == null) { x.red = false; return x; } //如果x为红色,则无需调整,返回 else if (x.red) { x.red = false; return root; } //x为其父节点的左孩子 else if ((xpl = xp.left) == x) { //如果它有红色的兄弟节点xpr,那么它的父亲节点xp一定是黑色节点 if ((xpr = xp.right) != null && xpr.red) { xpr.red = false; xp.red = true; //对父节点xp做左旋转 root = rotateLeft(root, xp); //重新将xp指向x的父节点,xpr指向xp新的右孩子 xpr = (xp = x.parent) == null ? null : xp.right; } //如果xpr为空,则向上继续调整,将x的父节点xp作为新的x继续循环 if (xpr == null) x = xp; else { //sl和sr分别为其近侄子和远侄子 TreeNode<K,V> sl = xpr.left, sr = xpr.right; if ((sr == null || !sr.red) && (sl == null || !sl.red)) { xpr.red = true; //若sl和sr都为黑色或者不存在,即xpr没有红色孩子,则将xpr染红 x = xp; //本轮结束,继续向上循环 } else { //否则的话,就需要进一步调整 if (sr == null || !sr.red) { if (sl != null) //若左孩子为红,右孩子不存在或为黑 sl.red = false; //左孩子染黑 xpr.red = true; //将xpr染红 root = rotateRight(root, xpr); //右旋 xpr = (xp = x.parent) == null ? null : xp.right; //右旋后,xpr指向xp的新右孩子,即上一步中的sl } if (xpr != null) { xpr.red = (xp == null) ? false : xp.red; //xpr染成跟父节点一致的颜色,为后面父节点xp的左旋做准备 if ((sr = xpr.right) != null) sr.red = false; //xpr新的右孩子染黑,防止出现两个红色相连 } if (xp != null) { xp.red = false; //将xp染黑,并对其左旋,这样就能保证被删除的X所在的路径又多了一个黑色节点,从而达到恢复平衡的目的 root = rotateLeft(root, xp); } //到此调整已经完毕,进入下一次循环后将直接退出 x = root; } } } //x为其父节点的右孩子,跟上面类似 else { // symmetric if (xpl != null && xpl.red) { xpl.red = false; xp.red = true; root = rotateRight(root, xp); xpl = (xp = x.parent) == null ? null : xp.left; } if (xpl == null) x = xp; else { TreeNode<K,V> sl = xpl.left, sr = xpl.right; if ((sl == null || !sl.red) && (sr == null || !sr.red)) { xpl.red = true; x = xp; } else { if (sl == null || !sl.red) { if (sr != null) sr.red = false; xpl.red = true; root = rotateLeft(root, xpl); xpl = (xp = x.parent) == null ? null : xp.left; } if (xpl != null) { xpl.red = (xp == null) ? false : xp.red; if ((sl = xpl.left) != null) sl.red = false; } if (xp != null) { xp.red = false; root = rotateRight(root, xp); } x = root; } } } } } 呼。。。终于。。酝酿了好多天的一篇文章总算是写完了,为了尽量确认转换的准确性,找了很多资料进行参考,过程中花了不少时间,曾多次准备放弃。。。不过总算是没有死在娘胎里,也算是完成了一桩心事,开心。. 之后还会继续更新,欢迎大家继续关注。也欢迎大家前来打脸真正重要的东西,用眼睛是看不见的。

优秀的个人博客,低调大师

【Java入门提高篇】Day24 Java容器类详解(七)HashMap源码分析(下)

前两篇对HashMap这家伙的主要方法,主要算法做了一个详细的介绍,本篇主要介绍HashMap中默默无闻地工作着的集合们,包括KeySet,values,EntrySet,以及对应的迭代器:HashIterator,KeyIterator,ValueIterator,EntryIterator和fast-fail 机制。会介绍三个集合的作用以及它们中隐藏的惊人秘密。 KeySet 我们先来看看KeySet,HashMap中的成员变量keySet保存了所有的Key集合,事实上,这是继承自它的父类AbstractMap的成员变量: transient Set<K> keySet; 而keySet方法,也是覆盖了父类的方法: //AbstractMap 中的keySet方法 public Set<K> keySet() { Set<K> ks = keySet; if (ks == null) { ks = new AbstractSet<K>() { public Iterator<K> iterator() { return new Iterator<K>() { private Iterator<Entry<K,V>> i = entrySet().iterator(); public boolean hasNext() { return i.hasNext(); } public K next() { return i.next().getKey(); } public void remove() { i.remove(); } }; } public int size() { return AbstractMap.this.size(); } public boolean isEmpty() { return AbstractMap.this.isEmpty(); } public void clear() { AbstractMap.this.clear(); } public boolean contains(Object k) { return AbstractMap.this.containsKey(k); } }; keySet = ks; } return ks; } //HashMap 中的keySet方法 /** * 返回一个键值的集合视图,该集合由map支持,因此对map的更改会反映在集合中,反之亦然。 * 如果在对集合进行迭代的过程中修改了map中的映射(除了通过迭代器的删除操作),迭代的结果是未定义的。 * 该集合支持元素删除,通过Iterator.remove,Set.remove,removeAll,retainAll和clear操作 * 从映射中删除相应的映射。 它不支持add或addAll操作。 */ public Set<K> keySet() { Set<K> ks = keySet; if (ks == null) { ks = new KeySet(); keySet = ks; } return ks; } 可以看到,AbstractMap中keySet是一个AbstractSet类型,而覆盖后的keySet方法中,keySet被赋值为KeySet类型。翻翻构造器可以发现,在构造器中并没有初始化keySet,而是在KeySet方法中对keySet进行的初始化(HashMap中都是使用类似的懒加载机制),KeySet是HashMap中的一个内部类,让我们再来看看这个KeySet类型的全貌: final class KeySet extends AbstractSet<K> { public final int size() { return size; } public final void clear() { this.clear(); } public final Iterator<K> iterator() { return new KeyIterator(); } public final boolean contains(Object o) { return containsKey(o); } public final boolean remove(Object key) { return removeNode(hash(key), key, null, false, true) != null; } public final Spliterator<K> spliterator() { return new KeySpliterator<>(HashMap.this, 0, -1, 0, 0); } public final void forEach(Consumer<? super K> action) { Node<K,V>[] tab; if (action == null) throw new NullPointerException(); if (size > 0 && (tab = table) != null) { int mc = modCount; for (int i = 0; i < tab.length; ++i) { for (Node<K,V> e = tab[i]; e != null; e = e.next) action.accept(e.key); } if (modCount != mc) throw new ConcurrentModificationException(); } } } 其实KeySet就是继承自AbstractSet,并覆盖了其中的大部分方法,遍历KeySet时,会使用其中的KeyIterator,至于Spliterator,是为并行遍历设计的,一般是用于Stream的并行操作。forEach方法则是用于遍历操作,将函数式接口操作action应用于每一个元素,我们来看一个小栗子: public class Test { public static void main(String[] args) { Map<String, Integer> map = new HashMap(); map.put("小明", 66); map.put("小李", 77); map.put("小红", 88); map.put("小刚", 89); map.put("小力", 90); map.put("小王", 91); map.put("小黄", 92); map.put("小青", 93); map.put("小绿", 94); map.put("小黑", 95); map.put("小蓝", 96); map.put("小紫", 97); map.put("小橙", 98); map.put("小赤", 99); map.put("Frank", 100); Set<String> ks = map.keySet(); System.out.printf("keySet:%s,keySet的大小:%d,keySet中是否包含Frank:%s", ks, ks.size(), ks.contains("Frank")); System.out.println(); ks.forEach((item) -> System.out.println(item)); } } 输出如下: keySet:[小刚, 小橙, 小蓝, 小力, 小青, 小黑, 小明, 小李, 小王, 小紫, 小红, 小绿, Frank, 小黄, 小赤],keySet的大小:15,keySet中是否包含Frank:true 小刚 小橙 小蓝 小力 小青 小黑 小明 小李 小王 小紫 小红 小绿 Frank 小黄 小赤 如果不记得这个AbstractMap和AbstractSet在容器框架中是什么地位,可以往前翻翻这系列文章的第一篇,看看容器家族的族谱。 但是说了这么多,这个keySet。里面的元素是什么时候放进去的呢?我们自然会想到,大概就是调用put方法往里添加元素的时候,顺便把key放进keySet中,完美!让我们再回顾一下putVal方法,来看看是不是这样的: final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { Node<K,V>[] tab; Node<K,V> p; int n, i; //如果当前table未初始化,则先重新调整大小至初始容量 if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; //(n-1)& hash 这个地方即根据hash求序号,想了解更多散列相关内容可以查看下一篇 if ((p = tab[i = (n - 1) & hash]) == null) //不存在,则新建节点 tab[i] = newNode(hash, key, value, null); else { Node<K,V> e; K k; //先找到对应的node if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; else if (p instanceof TreeNode) //如果是树节点,则调用相应的putVal方法,这部分放在第三篇内容里 //todo putTreeVal e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); else { //如果是链表则之间遍历查找 for (int binCount = 0; ; ++binCount) { if ((e = p.next) == null) { //如果没有找到则在该链表新建一个节点挂在最后 p.next = newNode(hash, key, value, null); if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st //如果链表长度达到树化的最大长度,则进行树化,该函数内容也放在第三篇 //todo treeifyBin treeifyBin(tab, hash); break; } if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; p = e; } } //如果已存在该key的映射,则将值进行替换 if (e != null) { // existing mapping for key V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } //修改次数加一 ++modCount; if (++size > threshold) resize(); afterNodeInsertion(evict); return null; } emmmmm,好像没找到?你也许会想,会不会是在TreeNode的putTreeVal方法或者在treeifyBin方法中对key进行插入?好了好了,不要再翻了,其实这个奥秘隐藏在KeySet的迭代器中,再回头看看,它的迭代器返回的是一个KeyIterator,而KeyIterator也是HashMap中的一个内部类,继承自HashMap中的另一个内部类HashIterator。 HashIterator 让我们带着这个疑问,来看看这个HashIterator类里到底有什么玄机: abstract class HashIterator { //指向下一个节点 Node<K,V> next; //当前节点 Node<K,V> current; //为实现 fast-fail 机制而设置的期望修改数 int expectedModCount; //当前遍历到的序号 int index; HashIterator() { expectedModCount = modCount; Node<K,V>[] t = table; current = next = null; index = 0; if (t != null && size > 0) { // 移动到第一个非null节点 do {} while (index < t.length && (next = t[index++]) == null); } } public final boolean hasNext() { return next != null; } final Node<K,V> nextNode() { Node<K,V>[] t; Node<K,V> e = next; // fast-fail 机制的实现 即在迭代器往后遍历时,每次都检测expectedModCount是否和modCount相等 // 不相等则抛出ConcurrentModificationException异常 if (modCount != expectedModCount) throw new ConcurrentModificationException(); //如果遍历越界,则抛出NoSuchElementException异常 if (e == null) throw new NoSuchElementException(); if ((next = (current = e).next) == null && (t = table) != null) { //如果遍历到末尾,则跳到table中下一个不为null的节点处 do {} while (index < t.length && (next = t[index++]) == null); } return e; } public final void remove() { Node<K,V> p = current; if (p == null) throw new IllegalStateException(); if (modCount != expectedModCount) throw new ConcurrentModificationException(); current = null; K key = p.key; //移除节点 removeNode(hash(key), key, null, false, false); expectedModCount = modCount; } } 可以发现,在迭代器中,使用nextNode进行遍历时,先把next引用赋值给current,然后把next.next赋值给next,再获取了外部类HashMap中的table引用(t = table),这样就直接通过遍历table的方式来实现对key,value和entry的读取。 if ((next = (current = e).next) == null && (t = table) != null) { //如果遍历到末尾,则跳到table中下一个不为null的节点处 do {} while (index < t.length && (next = t[index++]) == null); } KeyIterator,ValueIterator,EntryIterator都是HashIterator的子类,实现也很简单,仅仅修改了泛型类型: final class KeyIterator extends HashIterator implements Iterator<K> { public final K next() { return nextNode().key; } } final class ValueIterator extends HashIterator implements Iterator<V> { public final V next() { return nextNode().value; } } final class EntryIterator extends HashIterator implements Iterator<Map.Entry<K,V>> { public final Map.Entry<K,V> next() { return nextNode(); } } 这样keySet在遍历的时候,就可以通过它的迭代器去遍历访问外部类HashMap中的table,类似的,values和entrySet也是使用相似的方式进行遍历。 public Collection<V> values() { Collection<V> vs = values; if (vs == null) { vs = new Values(); values = vs; } return vs; } final class Values extends AbstractCollection<V> { public final int size() { return size; } public final void clear() { this.clear(); } public final Iterator<V> iterator() { return new ValueIterator(); } public final boolean contains(Object o) { return containsValue(o); } public final Spliterator<V> spliterator() { return new ValueSpliterator<>(HashMap.this, 0, -1, 0, 0); } public final void forEach(Consumer<? super V> action) { Node<K,V>[] tab; if (action == null) throw new NullPointerException(); if (size > 0 && (tab = table) != null) { int mc = modCount; for (int i = 0; i < tab.length; ++i) { for (Node<K,V> e = tab[i]; e != null; e = e.next) action.accept(e.value); } if (modCount != mc) throw new ConcurrentModificationException(); } } } public Set<Map.Entry<K,V>> entrySet() { Set<Map.Entry<K,V>> es; return (es = entrySet) == null ? (entrySet = new EntrySet()) : es; } final class EntrySet extends AbstractSet<Map.Entry<K,V>> { public final int size() { return size; } public final void clear() { this.clear(); } public final Iterator<Map.Entry<K,V>> iterator() { return new EntryIterator(); } public final boolean contains(Object o) { if (!(o instanceof Map.Entry)) return false; Map.Entry<?,?> e = (Map.Entry<?,?>) o; Object key = e.getKey(); Node<K,V> candidate = getNode(hash(key), key); return candidate != null && candidate.equals(e); } public final boolean remove(Object o) { if (o instanceof Map.Entry) { Map.Entry<?,?> e = (Map.Entry<?,?>) o; Object key = e.getKey(); Object value = e.getValue(); return removeNode(hash(key), key, value, true, true) != null; } return false; } public final Spliterator<Map.Entry<K,V>> spliterator() { return new EntrySpliterator<K,V>(HashMap.this, 0, -1, 0, 0); } public final void forEach(Consumer<? super Map.Entry<K,V>> action) { Node<K,V>[] tab; if (action == null) throw new NullPointerException(); if (size > 0 && (tab = table) != null) { int mc = modCount; for (int i = 0; i < tab.length; ++i) { for (Node<K,V> e = tab[i]; e != null; e = e.next) action.accept(e); } if (modCount != mc) throw new ConcurrentModificationException(); } } } 至此,这个未解之谜算是告一段落了。 transient 但是,细心的同学可能会发现,HashMap中的table,entrySet,keySet,value等成员变量,都是用transient修饰的,为什么要这样做呢? 首先,我们还是先说说这个transient是干嘛用的,这就要涉及Java中的序列化了,序列化是什么东西呢? Java中对象的序列化指的是将对象转换成以字节序列的形式来表示,这些字节序列包含了对象的数据和信息。一个序列化后的对象可以被写到数据库或文件中,也可用于网络传输,一般当我们使用缓存cache(内存空间不够有可能会本地存储到硬盘)或远程调用rpc(网络传输)的时候,经常需要让我们的实体类实现Serializable接口,目的就是为了让其可序列化。 当然,就像数据存储是为了读取那样,序列化后的最终目的是为了恢复成原先的Java对象,要不然序列化后干嘛呢,这个过程就叫做反序列化。 当我们使用实现Serializable接口的方式来进行序列化时,所有字段都会被序列化,那如果不想让某个字段被序列化(比如出于安全考虑,不将敏感字段序列化传输),便可以使用transient关键字来标志,表示不想让这个字段被序列化。 那么问题来了,存储节点信息的table用transient修饰了,那么序列化和反序列化的时候,数据还怎么传输??? emmmm,这又涉及到一个蛋疼的操作,序列化并没有那么简单,实现了Serializable接口后,在序列化时,会先检测这个类是否存在writeObject和readObject方法,如果存在,则调用相应的方法: /** * 将HashMap的实例状态保存到一个流中 */ private void writeObject(java.io.ObjectOutputStream s) throws IOException { int buckets = capacity(); // 写出threshold,loadfactor和所有隐藏的成员 s.defaultWriteObject(); s.writeInt(buckets); s.writeInt(size); internalWriteEntries(s); } /** * 从流中重构HashMap实例 */ private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException { // 读取threshold,loadfactor和所有隐藏的成员 s.defaultReadObject(); reinitialize(); if (loadFactor <= 0 || Float.isNaN(loadFactor)) throw new InvalidObjectException("Illegal load factor: " + loadFactor); // 读取并忽略桶的数量 s.readInt(); // 读取映射的数量 int mappings = s.readInt(); if (mappings < 0) throw new InvalidObjectException("Illegal mappings count: " + mappings); else if (mappings > 0) { // (如果是0,则使用默认值) // Size the table using given load factor only if within // range of 0.25...4.0 float lf = Math.min(Math.max(0.25f, loadFactor), 4.0f); float fc = (float)mappings / lf + 1.0f; int cap = ((fc < DEFAULT_INITIAL_CAPACITY) ? DEFAULT_INITIAL_CAPACITY : (fc >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)fc)); float ft = (float)cap * lf; threshold = ((cap < MAXIMUM_CAPACITY && ft < MAXIMUM_CAPACITY) ? (int)ft : Integer.MAX_VALUE); SharedSecrets.getJavaOISAccess().checkArray(s, Map.Entry[].class, cap); @SuppressWarnings({"rawtypes","unchecked"}) Node<K,V>[] tab = (Node<K,V>[])new Node[cap]; table = tab; // 读取键值对信息,然后把映射插入HashMap实例中 for (int i = 0; i < mappings; i++) { @SuppressWarnings("unchecked") K key = (K) s.readObject(); @SuppressWarnings("unchecked") V value = (V) s.readObject(); putVal(hash(key), key, value, false, false); } } } 这确实是一个极其糟糕的设计。。。而且这里还是一个private方法。 那么直接使用默认的序列化不好吗?非要大费周章的骚操作一波?一部分原因是为了解决效率问题,因为HashMap中很多桶是空的,将其序列化没有任何意义,所以需要手动使用 writeObject() 方法,只序列化实际存储元素的数组。另一个很重要的原因便是,HashMap的存储是依赖于对象的hashCode的,而Object.hashCode()方法是依赖于具体虚拟机的,所以同一个对象,在不同虚拟机中的HashCode可能不同,那这样映射到的HashMap中的位置也不一样,这样序列化和反序列化的对象就不一样了。引用大神的一段话: For example, consider the case of a hash table. The physical representation is a sequence of hash buckets containing key-value entries. The bucket that an entry resides in is a function of the hash code of its key, which is not, in general, guaranteed to be the same from JVM implementation to JVM implementation. In fact, it isn't even guaranteed to be the same from run to run. Therefore, accepting the default serialized form for a hash table would constitute a serious bug. Serializing and deserializing the hash table could yield an object whose invariants were seriously corrupt. 蹩脚翻译一下: 例如,考虑散列表的情况。 它的物理存储是一系列包含键值条目的散列桶。 条目驻留的存储区是其密钥的哈希码的函数,通常,JVM的实现不保证相同。 事实上,它甚至不能保证每次运行都是一样的。 因此,接受哈希表的默认序列化形式将构成严重的错误。 对哈希表进行序列化和反序列化可能会产生不变性被严重损毁的对象。 好了,到此为止,这部分内容算是over了,后面会继续介绍HashMap中最麻烦的一部分,TreeNode让我们师母已呆 记得动动小手点个赞或者点个关注哦,如果觉得不错的话,也欢迎分享给你的朋友,让bug传播的更远一些,呸,说错了,让知识传播的更远一些如果写的有误的地方,欢迎大家及时指出,我会第一时间予以修正,也欢迎提出改进建议,之后还会继续更新,欢迎继续关注! 真正重要的东西,用眼睛是看不见的。

优秀的个人博客,低调大师

【Java入门提高篇】Day23 Java容器类详解(六)HashMap源码分析(中)

上一篇中对HashMap中的基本内容做了详细的介绍,解析了其中的get和put方法,想必大家对于HashMap也有了更好的认识,本篇将从了算法的角度,来分析HashMap中的那些函数。 HashCode 先来说说HashMap中HashCode的算法,在上一篇里,我们看到了HashMap中的put方法是这样的: public V put(K key, V value) { return putVal(hash(key), key, value, false, true); } 那这个hash函数又是什么呢?让我们来看看它的真面目: /** * 将高位与低位进行与运算来计算哈希值。因为在hashmap中使用2的整数幂来作为掩码,所以只在当前掩码之上的位上发生 * 变化的散列总是会发生冲突。(在已知的例子中,Float键的集合在小表中保持连续的整数)因此,我们应用一个位运算 * 来向下转移高位的影响。 这是在综合考虑了运算速度,效用和质量之后的权衡。因为许多常见的散列集合已经合理分布 * (所以不能从扩散中受益),并且因为我们使用树来处理bin中发生的大量碰撞的情况,所以我们尽可能以代价最低的方式 * 对一些位移进行异或运算以减少系统损失, 以及合并由于hashmap容量边界而不会被用于散列运算的最高位的影响。 * * todo 扰动函数 */ static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); } 可以看出,这里并不是简单的使用了key的hashCode,而是将它的高16位与低16位做了一个异或操作。(“>>>”是无符号右移的意思,即右移的时候左边空出的部分用0填充)这是一个扰动函数,具体效果后面会说明。接下来再看看之前的putval方法: 1 final V putVal(int hash, K key, V value, boolean onlyIfAbsent, 2 boolean evict) { 3 Node<K,V>[] tab; Node<K,V> p; int n, i; 4 //如果当前table未初始化,则先重新调整大小至初始容量 5 if ((tab = table) == null || (n = tab.length) == 0) 6 n = (tab = resize()).length; 7 //(n-1)& hash 这个地方即根据hash求序号,想了解更多散列相关内容可以查看下一篇 8 if ((p = tab[i = (n - 1) & hash]) == null) 9 //不存在,则新建节点 10 tab[i] = newNode(hash, key, value, null); 11 else { 12 Node<K,V> e; K k; 13 //先找到对应的node 14 if (p.hash == hash && 15 ((k = p.key) == key || (key != null && key.equals(k)))) 16 e = p; 17 else if (p instanceof TreeNode) 18 //如果是树节点,则调用相应的putVal方法,这部分放在第三篇内容里 19 //todo putTreeVal 20 e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); 21 else { 22 //如果是链表则之间遍历查找 23 for (int binCount = 0; ; ++binCount) { 24 if ((e = p.next) == null) { 25 //如果没有找到则在该链表新建一个节点挂在最后 26 p.next = newNode(hash, key, value, null); 27 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st 28 //如果链表长度达到树化的最大长度,则进行树化,该函数内容也放在第三篇 29 //todo treeifyBin 30 treeifyBin(tab, hash); 31 break; 32 } 33 if (e.hash == hash && 34 ((k = e.key) == key || (key != null && key.equals(k)))) 35 break; 36 p = e; 37 } 38 } 39 //如果已存在该key的映射,则将值进行替换 40 if (e != null) { // existing mapping for key 41 V oldValue = e.value; 42 if (!onlyIfAbsent || oldValue == null) 43 e.value = value; 44 afterNodeAccess(e); 45 return oldValue; 46 } 47 } 48 //修改次数加一 49 ++modCount; 50 if (++size > threshold) 51 resize(); 52 afterNodeInsertion(evict); 53 return null; 54 } 注意看第八行的代码: tab[i = (n - 1) & hash] (n - 1) & hash即通过key的hash值来取对应的数组下标,并非是对table的size进行取余操作。 那么,为什么要这样做呢?首先,扰动函数的目的就是为了扩大高位的影响,使得计算出来的数值包含了高 16 位和第 16 位的特性,让 hash 值更加深不可测来降低碰撞的概率。从hash方法的注释中,我们也可以找到答案,一般的散列,其实都是做取余处理,但是HashMap中的table大小是2的整数次幂,也就是说,肯定不是质数,那么在取余的时候,偶数的映射范围势必就要小了一半,这样效果显然就差很多,而且,除法和取余其实是很慢的操作,所以在JDK8中,使用了一种很巧妙的方式来进行散列。首先,table的大小size设置成了2的整数次幂,这样使用size-1就变成了掩码,下面是我找的一张图,能很好的解释这个过程: n是table的大小,默认是16,二进制即为10000,n - 1对应的二进制则为1111,这样再与hash值做“与”操作时,就变成了掩码,除了最后四位全部被置为0,而最后四位的范围肯定会落在(0~n-1)之间,正好是数组的大小范围,散列函数的妙处就在于此了。简直不能更稳,一波操作猛如虎。 那么我们继续上一篇的栗子,我们来一步一步分析一下,小明和小李的hash值的映射过程: 小明的hash值是756692,转换为二进制为10111000101111010100,table的大小是32,n-1=31,对应的二进制为:11111,做“与”运算之后,得到的结果是10100,即为20。 小李的hash值是757012,转换为二进制为10111000110100010100,与11111做与运算后,得到的结果也是10100,即20,于是就与小明发生了冲突,但还是要先来后到,于是小李就挂在了小明后面。 散列函数看完了,我们接下来再看看扩容函数。 扩容函数 扩容函数其实之前也已经见过了,就在上面的putVal方法里,往上面翻一翻,第六行可以看到resize函数,这就是扩容函数,让我们来看看它的庐山真面目: 1 /** 2 * 初始化或将table的大小进行扩容。 如果table为null,则按照字段threshold中的初始容量目标进行分配。 3 * 否则,因为我们使用2次幂进行扩容,所以在新表中,来自每个bin中的元素必须保持在相同的索引处,或者以原偏移量的2次幂进行移动。 4 */ 5 final Node<K,V>[] resize() { 6 Node<K,V>[] oldTab = table; 7 int oldCap = (oldTab == null) ? 0 : oldTab.length; 8 int oldThr = threshold; 9 int newCap, newThr = 0; 10 if (oldCap > 0) { 11 if (oldCap >= MAXIMUM_CAPACITY) { 12 threshold = Integer.MAX_VALUE; 13 return oldTab; 14 } 15 //新的容量扩展成原来的两倍 16 else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && 17 oldCap >= DEFAULT_INITIAL_CAPACITY) 18 //阈值也调整为原来的两倍 19 newThr = oldThr << 1; // double threshold 20 } 21 else if (oldThr > 0) // initial capacity was placed in threshold 22 newCap = oldThr; 23 else { // zero initial threshold signifies using defaults 24 newCap = DEFAULT_INITIAL_CAPACITY; 25 newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); 26 } 27 if (newThr == 0) { 28 float ft = (float)newCap * loadFactor; 29 newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? 30 (int)ft : Integer.MAX_VALUE); 31 } 32 threshold = newThr; 33 @SuppressWarnings({"rawtypes","unchecked"}) 34 Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; 35 table = newTab; 36 //将旧数组中的node重新散列到新数组中 37 if (oldTab != null) { 38 for (int j = 0; j < oldCap; ++j) { 39 Node<K,V> e; 40 if ((e = oldTab[j]) != null) { 41 oldTab[j] = null; 42 if (e.next == null) 43 newTab[e.hash & (newCap - 1)] = e; 44 else if (e instanceof TreeNode) 45 ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); 46 else { // preserve order 47 Node<K,V> loHead = null, loTail = null; 48 Node<K,V> hiHead = null, hiTail = null; 49 Node<K,V> next; 50 do { 51 next = e.next; 52 if ((e.hash & oldCap) == 0) { 53 if (loTail == null) 54 loHead = e; 55 else 56 loTail.next = e; 57 loTail = e; 58 } 59 else { 60 if (hiTail == null) 61 hiHead = e; 62 else 63 hiTail.next = e; 64 hiTail = e; 65 } 66 } while ((e = next) != null); 67 if (loTail != null) { 68 loTail.next = null; 69 newTab[j] = loHead; 70 } 71 if (hiTail != null) { 72 hiTail.next = null; 73 newTab[j + oldCap] = hiHead; 74 } 75 } 76 } 77 } 78 } 79 return newTab; 80 } 这里可以看到,如果原来的table还未被初始化的话,调用该函数后就会被扩容到默认大小(16),上一篇中也已经说过,HashMap也是使用了懒加载的方式,在构造函数中并没有初始化table,而是在延迟到了第一次插入元素之后。 当使用put插入元素的时候,如果发现目前的bins占用程度已经超过了Load Factor所设置的比例,那么就会发生resize,简单来说就是把原来的容量和阈值都调整为原来的2倍,之后重新计算index,把节点再放到新的bin中。因为index值的计算与table数组的大小有关,所以扩容后,元素的位置有可能会调整: 以上图为例,如果对应的hash值第五位是0,那么做与操作后,得到的序号不会变,那么它的位置就不会改变,相反,如果是1,那么它的新序号就会变成原来的序号+16,。 好像也不是很多嘛,嗯,算法部分就先介绍到这里了,之后的一篇再来说说HashMap中的EntrySet,KeySet和values(如果时间够的话顺便把迭代器也说一说)。 好了,本篇就此愉快的结束了,最后祝大家端午节快乐!如果觉得内容还不错的话记得动动小手点关注哦,你的支持就是我最大的动力! 真正重要的东西,用眼睛是看不见的。

优秀的个人博客,低调大师

10小时大数据入门实战(三)-分布式文件系统HDFS

HDFS 环境搭建 HDFS 伪分布式环境搭建 CentOS 环境安装步骤 MacOS安装环境 安装jdk jdk安装路径 /usr/libexec/java_home -V:列出所有版本的JAVA_HOME 设置 JAVA_HOME 添加java_home到.bash_profile文件中 export JAVA_HOME=$(/usr/libexec/java_home) export PATH=$JAVA_HOME/bin:$PATH export CLASS_PATH=$JAVA_HOME/lib Mac OS X ssh设置 输入命令ssh localhost,可能遇到如下问题 原因是没打开远程登录,进入系统设置->共享->远程登录打开就好 这时你再ssh localhost一下 下载 Hadoop 解压到soft目录 官方指南 编辑 hadoop-env.sh 文件 Hadoop也可以在伪分布模式下的单节点上运行,其中每个Hadoop守护进程都在单独的Java进程中运行 具体更改 新建一个临时文件目录 编辑 hdfs/core-site.xml 文件 配置 datanode 节点数 启动 hdfs 查看进程 访问 http://localhost:50070/ 表示HDFS已经安装成功 存活节点 步骤小结 关闭 HDFS Shell 操作 官网指南 先启动 HDFS 配置 hadoop 环境变量 成功 指令集 dfs fs 无差异 上传一个 txt 文件 创建文件夹 多层次文件夹 遍历所有文件夹 删除文件/文件夹 所上传的文件 Java 操作 HDFS 开发环境搭建 pom 文件 JavaAPI 操作 HDFS文件系统 测试通过 测试创建文件方法 查看 HDFS 文件的内容 上传文件到 HDFS 上传文件到 HDFS(带进度条) 测试通过 下载文件到本地 测试通过 查看某个目录下的所有文件 测试通过 删除文件/文件夹

优秀的个人博客,低调大师

一个简单的网络爬虫入门python(包括开发环境搭建和pycharm激活)

基本任务: I 搭建python开发环境 II 写一个简单的网络爬虫,在某一个网站将一部小说各章节(一般是一个章节一个网页)粘贴到一个文本文件内。 1 首先了解几个概念 1.1 网络爬虫 网络爬虫是一个自动提取网页的程序,它为搜索引擎从万维网上下载网页,是搜索引擎的重要组成。 网络爬虫可以模拟浏览器浏览网页,自动批量下载网络资源(能够访问到的,放在网络服务器的文件)。 传统爬虫从一个或若干初始网页的URL开始,获得初始网页上的URL,在抓取网页的过程中,不断从当前页面上抽取新的URL放入队列,直到满足系统的一定停止条件。聚焦爬虫的工作流程较为复杂,需要根据一定的网页分析算法过滤与主题无关的链接,保留有用的链接并将其放入等待抓取的URL队列。然后,它将根据一定的搜索策略从队列中选择下一步要抓取的网页URL,并重复上述过程,直到达到系统的某一条件时停止。另外,所有被爬虫抓取的网页将会被系统存贮,进行一定的分析、过滤,并建立索引,以便之后的查询和检索;对于聚焦爬虫来说,这一过程所得到的分析结果还可能对以后的抓取过程给出反馈和指导。 1.2 小说网站的基本结构 首页(总目录)→分类→小说目录页→小说各章节; 1.3 与网站的交互 通常都是用户通过浏览器(当IE)访问网站(网络上的服务器)。 浏览器:网址(href)、请求(requests)→网站服务器:响应(response)给浏览器→浏览器:缓存并呈现回复的内容。 1.4 开发环境 用高级语言写的代码需要解释或编码到机器代码,才能被计算机执行。 所以开发一个程序,最基本的需求是一个文本编辑器(写)和解释器或编译器。 开发复杂的程序,需要调试查错、需要引入第三方库,需要边结各文件,所以,一般来说,一个简单的文本编辑器加一个解释器或编译器是不够的,需要一个支持某种高级语言的调试器的较复杂的编辑器。这样的编辑器同时还支持可以方便写代码(如颜色分类显示、代码提示)的插件加入。 2 搭建python开发环境 2.1 下载和安装解释器python3.6 想学习可以加Python学习(q-u-n )-二二七,四三五,四五零 即可获取,内附:开发工具和安装包,以及视频资料系统学习路线图 2.1.2 关于环境变量path,就是可以保证执行python命令时,不需要输入安装路径。在安装时如果有直接勾选了"add python to path"则在安装时即自动配置了path的安装路径; path变量中有了安装路径后,即可直接在CMD中输入python,可以看到相关反馈相信,即表示安装和环境变量配置OK了; 如果不安装代码编辑器或专用IDE,也可以在CMD中输入python,进行交互式开发; 2.2 下载和安装代码编辑器加调试器:pycharm4 PyCharm 由著名的JetBrains公司开发,带有一整套可以帮助用户在使用Python语言开发时提高其效率的工 具,比如调试、语法高亮、Project管理、代码跳转、智能提示、自动完成、单元测试、版本控制。此外,该IDE提供了一些高级功能,以用于支持Django框架下的专业Web开发。 2.2.2进入激活界面,选择第二个License server,之后在License server address中随意输入下面两个注册码中的任意一个即可,http://idea.liyang.io或http://xidea.online。 激活后的状态: 3 添加第三方模块 第三方模块添加工具pip必须在安装python3.5时有勾选安装。 编辑或新建C:Userswwuhnwu01pippip.ini: [global] respect-virtualenv = true download-cache = f:/Python/pip/cache log-file = f:/Python/pip/pip.log [install] timeout = 30 find-links = http://pypi.douban.com find-links = http://pypi.douban.com/simple pycharm→文件→setting→project interpreter→pip→右边加号+→manage repositories→右边加号+→添加第三方模块的镜像服务器http://pypi.douban.com/和http://pypi.douban.com/simple 如果还是连接不上第三方模块服务器,有时需要重启一下pycharm。 4 一个简单的网络爬虫 3.1 新建项目 3.2 设置项目编译器 基本的思路: 进入一个小说的目录页,请求到目录(包括各章节的href和章节标题)的内容,提取到全部的href,再通过各href请求到各网页的内容,经过数据清洗和适当的回到,写入到一个文本文件 代码: 以上代码运行后,即可把整部小说的内容写入文本文件。 不同的网站,内面内容的写法会有差别,在提取数据和清洗数据时要做相应调整变化。 一些网站会禁止爬虫,当你尝试爬取网站资源时,一些网站会有反爬策略,将你的IP加入黑名单,当你爬取或访问时,页面会响应为:“HTTP Error 403: Forbidden错误”。 附源代码: import requests import re # 1 获取小说目录页 url = 'http://www.xiaoshuotxt.org/wuxia/1617/' response = requests.get(url) response.encoding = 'utf-8' html = response.text title = re.findall(r'<meta name="keywords" content="(.*?),',html)[0] # 4 数据持久化,写入txt) fb = open('%s.txt'%title, 'w', encoding='utf-8'); # 2 提取章节 menu = re.findall(r'正文(.*?)</table>',html)[0] chapter_info_list = re.findall(r'<a href="(.*?)" title=".*?">(.*?)</a>',menu) # 3 循环访问章节,并获取内容 for chapter_info in chapter_info_list: chapter_url = chapter_info[0] chapter_title = chapter_info[1] if 'http' not in chapter_url: chapter_url = 'http://www.xiaoshuotxt.org%s' % chapter_url chapter_response = requests.get(chapter_url) chapter_response.encoding = 'utf-8' chapter_html = chapter_response.text # 数据提取 chapter_content = re.findall(r'<div class="panel-body" id="htmlContent">(.*?)</div>',chapter_html) # 数据清洗(按页面规律) chapter_content = chapter_content.replace(' ','') chapter_content = chapter_content.replace('<br /><br /><br>','') # 数据持久化(写入txt),先要在前面新建文件 fb.write(chapter_title) fb.write(' ') fb.write(chapter_content) fb.write(' ') #用以下语句可以看到动态过程 print(chapter_url) -End-

优秀的个人博客,低调大师

深度学习入门该用PyTorch还是Keras?热门公开课换框架背后的学问

本文来自AI新媒体量子位(QbitAI) 你知道fast.ai么? 他们以提供零基础的深度学习课程而闻名,宣称:只要你有高中数学基础、一年的编程经验,就能通过七周的学习,具备一流的深度学习实践能力。 刚刚,他们宣布了一件事。 下一个fast.ai的课程,将完全基于一个使用PyTorch开发的框架,抛弃原来的TensorFlow和Keras框架。这是为什么? △Jeremy Howard 且听创始人Jeremy Howard详解缘由,也相当于分析了初学者该选择什么样的框架。量子位节选编译如下: 我们为什么开始尝试PyTorch? 当我们开发第二门课《面向程序员的前沿深度学习》的时候,原来选的TensorFlow和Keras框架开始让我们处处碰壁。 比如说,现在自然语言处理中最重要的技术,大概是attention模型。可是我们发现,当时在Keras上

优秀的个人博客,低调大师

来自小姐姐的入门推荐:7个基本机器学习算法Python实现

本文来自AI新媒体量子位(QbitAI) 有位美女小姐姐,刚刚在GitHub上放出一份福利。 嗯,正经的福利。 她总结了一份基本的机器学习算法,全部以纯Python(版本3.6+)实现。其中包括线性回归等七套算法,具体地址在此: https://github.com/zotroneneis/machine_learning_basics 这7个算法,及代码实现地址: 线性回归,http://t.cn/REk46x1 逻辑回归,http://t.cn/REk4Kpp 感知器,http://t.cn/REk40Y5 K近邻,http://t.cn/REk4H9I k平均聚类,http://t.cn/REk4rqI 只有一个隐层的简单神经网络,http://t.cn/REk4kHb 多类别逻辑回归,http://t.cn/REkbxOe 所有的算法都是从0开始实

优秀的个人博客,低调大师

echarts入门,5分钟上手写ECharts的第一个图表

1.新建一个echarts.html文件,为ECharts准备一个具备大小(宽高)的Dom。 <!DOCTYPE html> <head> <meta charset="utf-8"> <title>ECharts</title> </head> <body> <!-- 为ECharts准备一个具备大小(宽高)的Dom --> <div id="main" style="height:400px"></div> </body> 2.新建script标签引入模块化单文件echarts.js <!DOCTYPE html> <head> <meta charset="utf-8"> <title>ECharts</title> </head> <body> <!-- 为ECharts准备一个具备大小(宽高)的Dom --> <div id="main" style="height:400px"></div> <!-- ECharts单文件引入 --> <script src="http://echarts.baidu.com/build/dist/echarts.js"></script> </body> 3.新建script标签中为模块加载器配置echarts和所需图表的路径 <!DOCTYPE html> <head> <meta charset="utf-8"> <title>ECharts</title> </head> <body> <!-- 为ECharts准备一个具备大小(宽高)的Dom --> <div id="main" style="height:400px"></div> <!-- ECharts单文件引入 --> <script src="http://echarts.baidu.com/build/dist/echarts.js"></script> <script type="text/javascript"> // 路径配置 require.config({ paths: { echarts: 'http://echarts.baidu.com/build/dist' } }); </script> </body> 4.script标签内动态加载echarts和所需图表,回调函数中可以初始化图表并驱动图表的生成 <!DOCTYPE html> <head> <meta charset="utf-8"> <title>ECharts</title> </head> <body> <!-- 为ECharts准备一个具备大小(宽高)的Dom --> <div id="main" style="height:400px"></div> <!-- ECharts单文件引入 --> <script src="http://echarts.baidu.com/build/dist/echarts.js"></script> <script type="text/javascript"> // 路径配置 require.config({ paths: { echarts: 'http://echarts.baidu.com/build/dist' } }); // 使用 require( [ 'echarts', 'echarts/chart/bar' // 使用柱状图就加载bar模块,按需加载 ], function (ec) { // 基于准备好的dom,初始化echarts图表 var myChart = ec.init(document.getElementById('main')); var option = { tooltip: { show: true }, legend: { data:['销量'] }, xAxis : [ { type : 'category', data : ["衬衫","羊毛衫","雪纺衫","裤子","高跟鞋","袜子"] } ], yAxis : [ { type : 'value' } ], series : [ { "name":"销量", "type":"bar", "data":[5, 20, 40, 10, 10, 20] } ] }; // 为echarts对象加载数据 myChart.setOption(option); } ); </script> </body> 5.查看效果 参考上述内容+官方相关案例,调整修改,应用到自己的项目中 <!DOCTYPE html> <head> <meta charset="utf-8"> <title>支付方式统计</title> <style> #main { margin: 30px; } </style> </head> <body> <!-- 为ECharts准备一个具备大小(宽高)的Dom --> <div id="main" style="height:500px"></div> <!-- ECharts单文件引入 --> <script src="http://echarts.baidu.com/build/dist/echarts.js"></script> <script type="text/javascript"> // 路径配置 require.config({ paths: { echarts: 'http://echarts.baidu.com/build/dist' } }); // 使用 require( [ 'echarts', 'echarts/chart/pie' // 使用柱状图就加载bar模块,按需加载 ], function (ec) { // 基于准备好的dom,初始化echarts图表 var myChart = ec.init(document.getElementById('main')); option = { tooltip: { trigger: 'item', formatter: "{a} <br/>{b}: {c} ({d}%)" }, legend: { orient: 'vertical', x: 'left', data:['微信','支付宝','积分','其他'] }, series: [ { name:'支付方式', type:'pie', radius: ['50%', '70%'], avoidLabelOverlap: false, label: { normal: { show: false, position: 'center' }, emphasis: { show: true, textStyle: { fontSize: '30', fontWeight: 'bold' } } }, labelLine: { normal: { show: false } }, data:[ {value:'{sh:$payment.weixin}', name:'微信'}, {value:'{sh:$payment.alipay}', name:'支付宝'}, {value:'{sh:$payment.integral}', name:'积分'}, {value:'{sh:$payment.other}', name:'其他'} ] } ] }; // 为echarts对象加载数据 myChart.setOption(option); } ); </script> </body> 获取相关数据 <?php /** * 支付方式管理 */ class PaymentAction extends AgentAction{ protected function _initialize() { parent::_initialize(); } // 付款方式统计 public function index(){ // 统计订单的支付方式 $orderModel = D('Order'); $payment['weixin'] = $orderModel->getPaytypeOrderNum('weixin'); $payment['alipay'] = $orderModel->getPaytypeOrderNum('alipay'); $payment['integral'] = $orderModel->getPaytypeOrderNum('integral'); $payment['other'] = $orderModel->getPaytypeOrderNum(); $this->assign('payment',$payment); $this->display(); } } ?> 本文转自TBHacker博客园博客,原文链接:http://www.cnblogs.com/jiqing9006/p/5736135.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

分布式协调服务中间件ZooKeeper 入门(1)-ZK的介绍与特性

一、Zookeeper简介 Zookeeper是一个服务,是一个分布式协调技术,他提供高性能,分布式的协调服务。主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成“脏数据”的后果。它也提供了其他简单的功能,这样分布式系统可以基于它来实现更好的服务,比如同步,配置管理,集群等等。他使用文件系统目录树作为数据模型。服务端可以跑在java程序上,他提供java和C的客户端api。 什么是分布式系统? 1.由多台计算机组成一个整体2.计算机之间可以互相通信(rest/rpc)3.用户的一次请求可能由多台计算机共同计算得出结果 二、分布式系统所存在的瓶颈: ZK通过协调服务来对各个系统进行有序的管理三大特性:一致性、可用性、容错性

优秀的个人博客,低调大师

RHEL6入门系列之二十六,利用rpm进行软件包管理

在上篇博文中介绍的yum是目前在RHEL系统中安装软件的首选方式,传统的rpm则主要是用作查询,如查询系统中是否已经安装了某个软件等。今天我们就一起来了解一下rpm的一些常用用法。 一、rpm软件包 RPM软件包是将程序源代码经过编译和封装以后形成的包文件,在软件包里会封装软件的程序、配置文件、帮助手册等组件。 使用 RPM机制封装的软件包文件拥有约定俗成的命名格式,一般使用“软件名-软件版本-发布号.硬件平台类型.rpm”的文件名形式。 如上图中的“vsftpd-2.2.2-11.el6.x86_64.rpm”软件包,软件名称是“vsftpd”,版本号是“2.2.2”,发布号是“11.el6”(更新发布号主要是对软件存在的bug或漏洞进行了修补,在软件功能上则并没有增强,el6是指在rhel6系统中发布),硬件平台是“x86_64”(“x86_64”是指64位的PC架构,另外还有“i386”或“i686”等都是指32位的PC架构,noarch是指不区分硬件架构)。 RHEL6系统中所有的内置软件全都是以 RPM软件包的形式存储在系统光盘中。将RHEL6的系统光盘挂载到/mnt/cdrom,进入挂载目录,在Packages子目录中存放了所有的rpm软件包。 二、安装\卸载软件包 利用 rpm命令安装软件首先必须进入存放rpm软件包的目录,安装软件包所使用的命令是“rpm –ivh”。 选项的含义: -i 安装软件包 -v 显示安装过程 -h 显示安装进度, rpm每执行了2%就会显示一个#号。 如,利用 rpm安装vsftpd程序(在输入软件包名字时可以用Tab键补全)。 使用“rpm –e”命令可以删除一个已经安装过的软件,如将刚才安装的vsftpd删除:[root@localhost ~]# rpm -e vsftpd 三、查询软件包 rpm命令现在主要用来进行软件查询,用到的相关选项是“ -q”(query,查询)。 1、“rpm –q”,查询是否安装了某个软件 例:查询系统中是否已经安装了 httpd和vsftpd软件。 [root@localhost ~]# rpm -q httpd httpd-2.2.15-15.el6_2.1.x86_64 ‘表明已经安装 [root@localhost ~]# rpm -q vsftpd package vsftpd is not installed ‘表明尚未安装 在用“ rpm –q”命令查询时必须指定软件的完整名字,否则将无法查询出正确结果。 如查询系统中是否安装了逻辑卷 lvm的图形化管理工具,输入软件的完整名字“system-config-lvm”可以正确查询,只输入“lvm”则无法查询到结果 2、“rpm –qa”,查询系统中已经安装的所有rpm软件包 例:统计系统中已经安装的 rpm软件包的个数。 [root@localhost ~]# rpm -qa | wc -l 1147 如果只能记住软件的部分名称,就可以使用“ rpm –qa”结合管道符和grep命令来进行查找。 例:查找系统中已经安装的所有跟“ lvm”有关的软件包。 3、“rpm –qi”,查看某个已经安装的软件包的详细信息。 例:查看 httpd软件的详细信息。 4、“rpm –ql”,查看某个软件包将会安装哪些程序文件,并把文件安装到系统的哪个位置。 在Linux中安装软件不像Windows那样可以由用户指定软件安装目录,由于Linux默认的目录结构是固定的,每个默认目录都有专门的分工,所以在Linux中安装软件时,会自动分门别类地向相应的目录中复制对应的程序文件,并进行相关设置。 在 Linux系统中,典型的应用程序通常由以下几部分组成: 普通的可执行程序文件,一般保存在“/usr/bin”目录中,普通用户即可执行。 服务器程序、管理程序文件,一般保存在“/usr/sbin”目录中,需要管理员才能执行。 配置文件,一般保存在“/etc”目录中,配置文件较多时会建立相应的子目录。 日志文件,一般保存在“/var/log”目录中。 关于应用程序的参考文档等数据,一般保存在“/usr/share/doc”目录中。 执行文件及配置文件的man手册,一般保存在“/usr/share/man”目录中。 例:查询 httpd软件在系统的什么位置安装了文件。 [root@localhost ~]# rpm -ql httpd | more 5、“rpm -qf”命令,查询系统中的某个文件是由哪个软件包提供的 当系统中的某个程序文件损坏或丢失时,通过“ rpm -qf”的查询结果,可以重新安装相应的软件包。 下面以一个实例来进行说明。我们首先将 mkdir命令对应的程序文件删除,这样就无法正常执行mkdir命令。然后通过“rpm -qf”命令查询出mkdir程序文件对应的软件包,重新安装相应的软件包,生成mkdir程序文件。 首先用 which命令查找mkdir命令对应的程序文件: [root@localhost ~]# which mkdir /bin/mkdir 查询提供 /bin/mkdir文件的软件包: [root@localhost ~]# rpm -qf /bin/mkdir coreutils-8.4-19.el6.x86_64 将程序文件删除, mkdir命令无法执行: [root@localhost ~]# rm -f /bin/mkdir [root@localhost ~]# mkdir test -bash: mkdir: command not found 安装 coreutils软件包,重新生成/bin/mkdir文件: 注意,在执行“rpm -ivh”命令安装coreutils软件包时,系统提示coreutils已经安装了,所以此时需要加上“—force”选项强制将软件重新安装一遍。 coreutils软件包装完之后,会重新生成程序文件“/bin/mkdir”,mkdir命令就可以正常使用了。 本文转自 yttitan 51CTO博客,原文链接:http://blog.51cto.com/yttitan/1128166

优秀的个人博客,低调大师

Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceB...

声明: 大数据中,最重要的算子操作是:join !!! 典型的transformation和action val nums = sc.parallelize(1 to 10) //根据集合创建RDD map适用于 package com.zhouls.spark.cores import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/9/27. */ object Transformations { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Transformations").setMaster("local") val sc = new SparkContext(conf) val nums = sc.parallelize(1 to 10) //根据集合创建RDD val mapped = nums.map(item => 2 + item) mapped.collect.foreach(println) } }map源码 /** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) } filter适用于 package com.zhouls.spark.cores import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/9/27. */ object Transformations { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Transformations").setMaster("local") val sc = new SparkContext(conf) val nums = sc.parallelize(1 to 10) //根据集合创建RDD val mapped = nums.map(item => 2 + item) val filtered = nums.filter(item => item%2 == 0) filtered.collect.foreach(println) } } filter源码 /** * Return a new RDD containing only the elements that satisfy a predicate. */ def filter(f: T => Boolean): RDD[T] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[T, T]( this, (context, pid, iter) => iter.filter(cleanF), preservesPartitioning = true) } flatMap适用于 package com.zhouls.spark.cores import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/9/27. */ object Transformations { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Transformations").setMaster("local") val sc = new SparkContext(conf) val nums = sc.parallelize(1 to 10) //根据集合创建RDD val mapped = nums.map(item => 2 + item) // mapped.collect.foreach(println) val filtered = nums.filter(item => item%2 == 0) // filtered.collect.foreach(println) val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon") val bigDataString = sc.parallelize(bigData) val words = bigDataString.flatMap(line => line.split(" ")) words.collect.foreach(println) sc.stop() } } flatMap源码 /** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) } 成为大牛,必写的写法 -> groupByKey适用于 package com.zhouls.spark.cores import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/9/27. */ object Transformations { def main(args: Array[String]) { val sc = sparkContext("Transformations Operations") //创建SparkContext // mapTransformation(sc)//map案例 // filterTransformation(sc)//filter案例 // flatMapTransformation(sc)//flatMap案例 groupByKeyTransformation(sc) sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源 } def sparkContext(name:String)={ val conf = new SparkConf().setAppName("Transformations").setMaster("local") val sc = new SparkContext(conf) sc } def mapTransformation(sc:SparkContext){ val nums = sc.parallelize(1 to 10) //根据集合创建RDD val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理 mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def filterTransformation(sc:SparkContext){ val nums = sc.parallelize(1 to 20) //根据集合创建RDD val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。 filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def flatMapTransformation(sc:SparkContext){ val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon} words.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def groupByKeyTransformation(sc:SparkContext){ val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase")) val dataRDD = sc.parallelize(data) val grouped = dataRDD.groupByKey() grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印 } } groupByKey源码 ** * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. * The ordering of elements within each group is not guaranteed, and may even differ * each time the resulting RDD is evaluated. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. * * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKey[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] } /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with into `numPartitions` partitions. The ordering of elements within * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. * * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope { groupByKey(new HashPartitioner(numPartitions)) } reduceByKey适用于 package com.zhouls.spark.cores import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/9/27. */ object Transformations { def main(args: Array[String]) { val sc = sparkContext("Transformations Operations") //创建SparkContext // mapTransformation(sc)//map案例 // filterTransformation(sc)//filter案例 // flatMapTransformation(sc)//flatMap案例 // groupByKeyTransformation(sc)//groupByKey案例 reduceByKeyTransformation(sc)//reduceByKey案例 sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源 } def sparkContext(name:String)={ val conf = new SparkConf().setAppName("Transformations").setMaster("local") val sc = new SparkContext(conf) sc } def mapTransformation(sc:SparkContext){ val nums = sc.parallelize(1 to 10) //根据集合创建RDD val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理 mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def filterTransformation(sc:SparkContext){ val nums = sc.parallelize(1 to 20) //根据集合创建RDD val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。 filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def flatMapTransformation(sc:SparkContext){ val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon} words.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def groupByKeyTransformation(sc:SparkContext){ val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))//准备数据 val dataRDD = sc.parallelize(data)//根据集合创建RDD val grouped = dataRDD.groupByKey()//按照相同的key对value进行分组,分组后的value是一个集合 grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def reduceByKeyTransformation(sc:SparkContext){ val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md") val words = lines.flatMap{ line => line.split(" ")} val pairs = words.map { word => (word,1) } val wordCountsOdered = pairs.reduceByKey(_+_)//对相同的key,进行value的累计(包括local和reducer级别同时reduce) wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))//收集计算结果并通过foreach循环打印 } } reduceByKey源码 /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { reduceByKey(defaultPartitioner(self), func) } join适用于 package com.zhouls.spark.cores import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/9/27. */ object Transformations { def main(args: Array[String]) { val sc = sparkContext("Transformations Operations") //创建SparkContext // mapTransformation(sc)//map案例 // filterTransformation(sc)//filter案例 // flatMapTransformation(sc)//flatMap案例 // groupByKeyTransformation(sc)//groupByKey案例 // reduceByKeyTransformation(sc)//reduceByKey案例 joinTransformation(sc)//join案例 sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源 } def sparkContext(name:String)={ val conf = new SparkConf().setAppName("Transformations").setMaster("local") val sc = new SparkContext(conf) sc } def mapTransformation(sc:SparkContext){ val nums = sc.parallelize(1 to 10) //根据集合创建RDD val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理 mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def filterTransformation(sc:SparkContext){ val nums = sc.parallelize(1 to 20) //根据集合创建RDD val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。 filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def flatMapTransformation(sc:SparkContext){ val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon} words.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def groupByKeyTransformation(sc:SparkContext){ val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))//准备数据 val dataRDD = sc.parallelize(data)//根据集合创建RDD val grouped = dataRDD.groupByKey()//按照相同的key对value进行分组,分组后的value是一个集合 grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def reduceByKeyTransformation(sc:SparkContext){ val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md") val words = lines.flatMap{ line => line.split(" ")} val pairs = words.map { word => (word,1) } val wordCountsOdered = pairs.reduceByKey(_+_)//对相同的key,进行value的累计(包括local和reducer级别同时reduce) wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))//收集计算结果并通过foreach循环打印 } def joinTransformation(sc:SparkContext){ val studentNames = Array(Tuple2(1,"Spark"),Tuple2(2,"Tachyon"),Tuple2(3,"Hadoop")) val studentScores = Array(Tuple2(1,100),Tuple2(2,95),Tuple2(3,65)) val names = sc.parallelize(studentNames) val scores = sc.parallelize(studentScores) val studentNamesAndScores = names.join(scores) studentNamesAndScores.collect.foreach(println)//收集计算结果并通过foreach循环打印 } } join源码 /** * Cartesian join with another [[DataFrame]]. * * Note that cartesian joins are very expensive without an extra filter that can be pushed down. * * @param right Right side of the join operation. * @group dfops * @since 1.3.0 */ def join(right: DataFrame): DataFrame = { Join(logicalPlan, right.logicalPlan, joinType = Inner, None) } /** * Inner equi-join with another [[DataFrame]] using the given column. * * Different from other join functions, the join column will only appear once in the output, * i.e. similar to SQL's `JOIN USING` syntax. * * {{{ * // Joining df1 and df2 using the column "user_id" * df1.join(df2, "user_id") * }}} * * Note that if you perform a self-join using this function without aliasing the input * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since * there is no way to disambiguate which side of the join you would like to reference. * * @param right Right side of the join operation. * @param usingColumn Name of the column to join on. This column must exist on both sides. * @group dfops * @since 1.4.0 */ def join(right: DataFrame, usingColumn: String): DataFrame = { join(right, Seq(usingColumn)) } /** * Inner equi-join with another [[DataFrame]] using the given columns. * * Different from other join functions, the join columns will only appear once in the output, * i.e. similar to SQL's `JOIN USING` syntax. * * {{{ * // Joining df1 and df2 using the columns "user_id" and "user_name" * df1.join(df2, Seq("user_id", "user_name")) * }}} * * Note that if you perform a self-join using this function without aliasing the input * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since * there is no way to disambiguate which side of the join you would like to reference. * * @param right Right side of the join operation. * @param usingColumns Names of the columns to join on. This columns must exist on both sides. * @group dfops * @since 1.4.0 */ def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = { // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right // by creating a new instance for one of the branch. val joined = sqlContext.executePlan( Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join] // Project only one of the join columns. val joinedCols = usingColumns.map(col => joined.right.resolve(col)) val condition = usingColumns.map { col => catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col)) }.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) => catalyst.expressions.And(cond, eqTo) } Project( joined.output.filterNot(joinedCols.contains(_)), Join( joined.left, joined.right, joinType = Inner, condition) ) } /** * Inner join with another [[DataFrame]], using the given join expression. * * {{{ * // The following two are equivalent: * df1.join(df2, $"df1Key" === $"df2Key") * df1.join(df2).where($"df1Key" === $"df2Key") * }}} * @group dfops * @since 1.3.0 */ def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner") /** * Join with another [[DataFrame]], using the given join expression. The following performs * a full outer join between `df1` and `df2`. * * {{{ * // Scala: * import org.apache.spark.sql.functions._ * df1.join(df2, $"df1Key" === $"df2Key", "outer") * * // Java: * import static org.apache.spark.sql.functions.*; * df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer"); * }}} * * @param right Right side of the join. * @param joinExprs Join expression. * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. * @group dfops * @since 1.3.0 */ def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = { // Note that in this function, we introduce a hack in the case of self-join to automatically // resolve ambiguous join conditions into ones that might make sense [SPARK-6231]. // Consider this case: df.join(df, df("key") === df("key")) // Since df("key") === df("key") is a trivially true condition, this actually becomes a // cartesian join. However, most likely users expect to perform a self join using "key". // With that assumption, this hack turns the trivially true condition into equality on join // keys that are resolved to both sides. // Trigger analysis so in the case of self-join, the analyzer will clone the plan. // After the cloning, left and right side will have distinct expression ids. val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)) .queryExecution.analyzed.asInstanceOf[Join] // If auto self join alias is disabled, return the plan. if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity) { return plan } // If left/right have no output set intersection, return the plan. val lanalyzed = this.logicalPlan.queryExecution.analyzed val ranalyzed = right.logicalPlan.queryExecution.analyzed if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) { return plan } // Otherwise, find the trivially true predicates and automatically resolves them to both sides. // By the time we get here, since we have already run analysis, all attributes should've been // resolved and become AttributeReference. val cond = plan.condition.map { _.transform { case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) => catalyst.expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name)) }} plan.copy(condition = cond) } cogroup的scala版,适用于 package com.zhouls.spark.cores import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/9/27. */ object Transformations { def main(args: Array[String]) { val sc = sparkContext("Transformations Operations") //创建SparkContext // mapTransformation(sc)//map案例 // filterTransformation(sc)//filter案例 // flatMapTransformation(sc)//flatMap案例 // groupByKeyTransformation(sc)//groupByKey案例 // reduceByKeyTransformation(sc)//reduceByKey案例 // joinTransformation(sc)//join案例 cogroupTransformation(sc)//cogroup案例 sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源 } def sparkContext(name:String)={ val conf = new SparkConf().setAppName("Transformations").setMaster("local") val sc = new SparkContext(conf) sc } def mapTransformation(sc:SparkContext){ val nums = sc.parallelize(1 to 10) //根据集合创建RDD val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理 mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def filterTransformation(sc:SparkContext){ val nums = sc.parallelize(1 to 20) //根据集合创建RDD val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。 filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def flatMapTransformation(sc:SparkContext){ val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon} words.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def groupByKeyTransformation(sc:SparkContext){ val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))//准备数据 val dataRDD = sc.parallelize(data)//根据集合创建RDD val grouped = dataRDD.groupByKey()//按照相同的key对value进行分组,分组后的value是一个集合 grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def reduceByKeyTransformation(sc:SparkContext){ val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md") val words = lines.flatMap{ line => line.split(" ")} val pairs = words.map { word => (word,1) } val wordCountsOdered = pairs.reduceByKey(_+_)//对相同的key,进行value的累计(包括local和reducer级别同时reduce) wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))//收集计算结果并通过foreach循环打印 } def joinTransformation(sc:SparkContext){ val studentNames = Array(Tuple2(1,"Spark"),Tuple2(2,"Tachyon"),Tuple2(3,"Hadoop")) val studentScores = Array(Tuple2(1,100),Tuple2(2,95),Tuple2(3,65)) val names = sc.parallelize(studentNames) val scores = sc.parallelize(studentScores) val studentNamesAndScores = names.join(scores) studentNamesAndScores.collect.foreach(println)//收集计算结果并通过foreach循环打印 } def cogroupTransformation(sc:SparkContext){ val namesLists = Array(Tuple2(1,"xiaoming"),Tuple2(2,"xiaozhou"),Tuple2(3,"xiaoliu")) val scoresLists = Array(Tuple2(1,100),Tuple2(2,95),Tuple2(3,85),Tuple2(1,75),Tuple2(2,65),Tuple2(3,55)) val names = sc.parallelize(namesLists) val scores = sc.parallelize(scoresLists) val namesListsAndScores = names.cogroup(scores) namesListsAndScores.collect.foreach(println)//收集计算结果并通过foreach循环打印 } } cogroup源码 /** * For each key k in `this` or `other1` or `other2` or `other3`, * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner) cg.mapValues { case Array(vs, w1s, w2s, w3s) => (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W1]], w2s.asInstanceOf[Iterable[W2]], w3s.asInstanceOf[Iterable[W3]]) } } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) cg.mapValues { case Array(vs, w1s) => (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]]) } } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) cg.mapValues { case Array(vs, w1s, w2s) => (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W1]], w2s.asInstanceOf[Iterable[W2]]) } } /** * For each key k in `this` or `other1` or `other2` or `other3`, * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3)) } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { cogroup(other, defaultPartitioner(self, other)) } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ def cogroup[W]( other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { cogroup(other, new HashPartitioner(numPartitions)) } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { cogroup(other1, other2, new HashPartitioner(numPartitions)) } /** * For each key k in `this` or `other1` or `other2` or `other3`, * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], numPartitions: Int) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { cogroup(other1, other2, other3, new HashPartitioner(numPartitions)) } 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5913846.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Storm编程入门API系列之Storm的可靠性的ACK消息确认机制

什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理。即若没被处理的,得让我们知道。 public void nextTuple() { num++; System.out.println("spout:"+num); int messageid = num; //开启消息确认机制,就是在发送数据的时候发送一个messageid,一般情况下,messageid可以理解为mysql数据里面的主键id字段 //要保证messageid和tuple之间有一个唯一的对应关系,这个关系需要程序员自己维护 this.collector.emit(new Values(num),messageid); Utils.sleep(1000); } 编写代码 StormTopologyAcker.java package zhouls.bigdata.stormDemo; import java.util.Map; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; public class StormTopologyAcker { public static class MySpout extends BaseRichSpout{ private Map conf; private TopologyContext context; private SpoutOutputCollector collector; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.collector = collector; this.context = context; } int num = 0; public void nextTuple() { num++; System.out.println("spout:"+num); int messageid = num; //开启消息确认机制,就是在发送数据的时候发送一个messageid,一般情况下,messageid可以理解为mysql数据里面的主键id字段 //要保证messageid和tuple之间有一个唯一的对应关系,这个关系需要程序员自己维护 this.collector.emit(new Values(num),messageid); Utils.sleep(1000); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("num")); } @Override public void ack(Object msgId) { System.out.println("处理成功!"+msgId); } @Override public void fail(Object msgId) { System.out.println("处理失败!"+msgId); //TODO 可以吧这个数据单独记录下来 } } public static class MyBolt extends BaseRichBolt{ private Map stormConf; private TopologyContext context; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.stormConf = stormConf; this.context = context; this.collector = collector; } int sum = 0; public void execute(Tuple input) { try{ Integer num = input.getIntegerByField("num"); sum += num; System.out.println("sum="+sum); this.collector.ack(input); }catch(Exception e){ this.collector.fail(input); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { } } public static void main(String[] args) { TopologyBuilder topologyBuilder = new TopologyBuilder(); String spout_id = MySpout.class.getSimpleName(); String bolt_id = MyBolt.class.getSimpleName(); topologyBuilder.setSpout(spout_id, new MySpout()); topologyBuilder.setBolt(bolt_id, new MyBolt()).shuffleGrouping(spout_id); Config config = new Config(); config.setMaxSpoutPending(1000);//如果设置了这个参数,必须要保证开启了acker机制才有效 String topology_name = StormTopologyAcker.class.getSimpleName(); if(args.length==0){ //在本地运行 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology()); }else{ //在集群运行 try { StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { e.printStackTrace(); } } } } 停掉,我们复制粘贴来分析分析 16244 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:host.name=WIN-BQOBV63OBNM 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.version=1.8.0_66 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.vendor=Oracle Corporation 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.home=C:\Program Files\Java\jre1.8.0_66 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.class.path=D:\Code\eclipseMarsCode\stormDemo\target\classes;D:\SoftWare\maven\repository\org\apache\storm\storm-core\1.0.2\storm-core-1.0.2.jar;D:\SoftWare\maven\repository\com\esotericsoftware\kryo\3.0.3\kryo-3.0.3.jar;D:\SoftWare\maven\repository\com\esotericsoftware\reflectasm\1.10.1\reflectasm-1.10.1.jar;D:\SoftWare\maven\repository\org\ow2\asm\asm\5.0.3\asm-5.0.3.jar;D:\SoftWare\maven\repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;D:\SoftWare\maven\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;D:\SoftWare\maven\repository\org\clojure\clojure\1.7.0\clojure-1.7.0.jar;D:\SoftWare\maven\repository\com\lmax\disruptor\3.3.2\disruptor-3.3.2.jar;D:\SoftWare\maven\repository\org\apache\logging\log4j\log4j-api\2.1\log4j-api-2.1.jar;D:\SoftWare\maven\repository\org\apache\logging\log4j\log4j-core\2.1\log4j-core-2.1.jar;D:\SoftWare\maven\repository\org\apache\logging\log4j\log4j-slf4j-impl\2.1\log4j-slf4j-impl-2.1.jar;D:\SoftWare\maven\repository\org\slf4j\log4j-over-slf4j\1.6.6\log4j-over-slf4j-1.6.6.jar;D:\SoftWare\maven\repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;D:\SoftWare\maven\repository\org\slf4j\slf4j-api\1.7.7\slf4j-api-1.7.7.jar 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.library.path=C:\Program Files\Java\jre1.8.0_66\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program Files/Java/jre1.8.0_66/bin/server;C:/Program Files/Java/jre1.8.0_66/bin;C:/Program Files/Java/jre1.8.0_66/lib/amd64;%WEKA39_HOME%\lib\mysql-connector-java-5.1.21-bin.jar;%WEKA37_HOME%\lib\mysql-connector-java-5.1.21-bin.jar;C:\Program Files\Java\jdk1.8.0_66\jre\lib\ext\mysql-connector-java-5.1.21-bin.jar;C:\ProgramData\Oracle\Java\javapath;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;D:\SoftWare\MATLAB R2013a\runtime\win64;D:\SoftWare\MATLAB R2013a\bin;C:\Program Files (x86)\IDM Computer Solutions\UltraCompare;C:\Program Files\Java\jdk1.8.0_66\bin;C:\Program Files\Java\jdk1.8.0_66\jre\bin;D:\SoftWare\apache-ant-1.9.0\bin;HADOOP_HOME\bin;D:\SoftWare\apache-maven-3.3.9\bin;D:\SoftWare\Scala\bin;D:\SoftWare\Scala\jre\bin;%MYSQL_HOME\bin;D:\SoftWare\MySQL\mysql-5.7.11-winx64;;D:\SoftWare\apache-tomcat-7.0.69\bin;%C:\Windows\System32;%C:\Windows\SysWOW64;;D:\SoftWare\apache-maven-3.3.9\bin;D:\SoftWare\apache-tomcat-7.0.69\bin;D:\SoftWare\apache-tomcat-7.0.69\bin;D:\SoftWare\Anaconda2;D:\SoftWare\Anaconda2\Scripts;D:\SoftWare\Anaconda2\Library\bin;D:\SoftWare\MySQL Server\MySQL Server 5.0\bin;D:\SoftWare\Python\Python36\Scripts\;D:\SoftWare\Python\Python36\;D:\SoftWare\SSH Secure Shell;D:\SoftWare\eclipse;;. 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.io.tmpdir=C:\Users\ADMINI~1\AppData\Local\Temp\ 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.compiler=<NA> 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.name=Windows 7 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.arch=amd64 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.version=6.1 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.name=Administrator 16246 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.home=C:\Users\Administrator 16247 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.dir=D:\Code\eclipseMarsCode\stormDemo 16320 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir C:\Users\ADMINI~1\AppData\Local\Temp\0d2c165c-61e3-45ea-957f-8ecaeea3b694\version-2 snapdir C:\Users\ADMINI~1\AppData\Local\Temp\0d2c165c-61e3-45ea-957f-8ecaeea3b694\version-2 16626 [main] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:2000 16630 [main] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:2001 16637 [main] INFO o.a.s.zookeeper - Starting inprocess zookeeper at port 2001 and dir C:\Users\ADMINI~1\AppData\Local\Temp\0d2c165c-61e3-45ea-957f-8ecaeea3b694 16848 [main] INFO o.a.s.d.nimbus - Starting Nimbus with conf {"topology.builtin.metrics.bucket.size.secs" 60, "nimbus.childopts" "-Xmx1024m", "ui.filter.params" nil, "storm.cluster.mode" "local", "storm.messaging.netty.client_worker_threads" 1, "logviewer.max.per.worker.logs.size.mb" 2048, "supervisor.run.worker.as.user" false, "topology.max.task.parallelism" nil, "topology.priority" 29, "zmq.threads" 1, "storm.group.mapping.service" "org.apache.storm.security.auth.ShellBasedGroupsMapping", "transactional.zookeeper.root" "/transactional", "topology.sleep.spout.wait.strategy.time.ms" 1, "scheduler.display.resource" false, "topology.max.replication.wait.time.sec" 60, "drpc.invocations.port" 3773, "supervisor.localizer.cache.target.size.mb" 10240, "topology.multilang.serializer" "org.apache.storm.multilang.JsonSerializer", "storm.messaging.netty.server_worker_threads" 1, "nimbus.blobstore.class" "org.apache.storm.blobstore.LocalFsBlobStore", "resource.aware.scheduler.eviction.strategy" "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy", "topology.max.error.report.per.interval" 5, "storm.thrift.transport" "org.apache.storm.security.auth.SimpleTransportPlugin", "zmq.hwm" 0, "storm.group.mapping.service.params" nil, "worker.profiler.enabled" false, "storm.principal.tolocal" "org.apache.storm.security.auth.DefaultPrincipalToLocal", "supervisor.worker.shutdown.sleep.secs" 1, "pacemaker.host" "localhost", "storm.zookeeper.retry.times" 5, "ui.actions.enabled" true, "zmq.linger.millis" 0, "supervisor.enable" true, "topology.stats.sample.rate" 0.05, "storm.messaging.netty.min_wait_ms" 100, "worker.log.level.reset.poll.secs" 30, "storm.zookeeper.port" 2001, "supervisor.heartbeat.frequency.secs" 5, "topology.enable.message.timeouts" true, "supervisor.cpu.capacity" 400.0, "drpc.worker.threads" 64, "supervisor.blobstore.download.thread.count" 5, "drpc.queue.size" 128, "topology.backpressure.enable" false, "supervisor.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "storm.blobstore.inputstream.buffer.size.bytes" 65536, "topology.shellbolt.max.pending" 100, "drpc.https.keystore.password" "", "nimbus.code.sync.freq.secs" 120, "logviewer.port" 8000, "topology.scheduler.strategy" "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy", "topology.executor.send.buffer.size" 1024, "resource.aware.scheduler.priority.strategy" "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy", "pacemaker.auth.method" "NONE", "storm.daemon.metrics.reporter.plugins" ["org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"], "topology.worker.logwriter.childopts" "-Xmx64m", "topology.spout.wait.strategy" "org.apache.storm.spout.SleepSpoutWaitStrategy", "ui.host" "0.0.0.0", "storm.nimbus.retry.interval.millis" 2000, "nimbus.inbox.jar.expiration.secs" 3600, "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.acker.executors" nil, "topology.fall.back.on.java.serialization" true, "topology.eventlogger.executors" 0, "supervisor.localizer.cleanup.interval.ms" 600000, "storm.zookeeper.servers" ["localhost"], "nimbus.thrift.threads" 64, "logviewer.cleanup.age.mins" 10080, "topology.worker.childopts" nil, "topology.classpath" nil, "supervisor.monitor.frequency.secs" 3, "nimbus.credential.renewers.freq.secs" 600, "topology.skip.missing.kryo.registrations" true, "drpc.authorizer.acl.filename" "drpc-auth-acl.yaml", "pacemaker.kerberos.users" [], "storm.group.mapping.service.cache.duration.secs" 120, "topology.testing.always.try.serialize" false, "nimbus.monitor.freq.secs" 10, "storm.health.check.timeout.ms" 5000, "supervisor.supervisors" [], "topology.tasks" nil, "topology.bolts.outgoing.overflow.buffer.enable" false, "storm.messaging.netty.socket.backlog" 500, "topology.workers" 1, "pacemaker.base.threads" 10, "storm.local.dir" "C:\\Users\\ADMINI~1\\AppData\\Local\\Temp\\d7cdc68c-f54c-4677-ac0e-d73c3b2effb3", "topology.disable.loadaware" false, "worker.childopts" "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump", "storm.auth.simple-white-list.users" [], "topology.disruptor.batch.timeout.millis" 1, "topology.message.timeout.secs" 30, "topology.state.synchronization.timeout.secs" 60, "topology.tuple.serializer" "org.apache.storm.serialization.types.ListDelegateSerializer", "supervisor.supervisors.commands" [], "nimbus.blobstore.expiration.secs" 600, "logviewer.childopts" "-Xmx128m", "topology.environment" nil, "topology.debug" false, "topology.disruptor.batch.size" 100, "storm.messaging.netty.max_retries" 300, "ui.childopts" "-Xmx768m", "storm.network.topography.plugin" "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping", "storm.zookeeper.session.timeout" 20000, "drpc.childopts" "-Xmx768m", "drpc.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.connection.timeout" 15000, "storm.zookeeper.auth.user" nil, "storm.meta.serialization.delegate" "org.apache.storm.serialization.GzipThriftSerializationDelegate", "topology.max.spout.pending" nil, "storm.codedistributor.class" "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor", "nimbus.supervisor.timeout.secs" 60, "nimbus.task.timeout.secs" 30, "drpc.port" 3772, "pacemaker.max.threads" 50, "storm.zookeeper.retry.intervalceiling.millis" 30000, "nimbus.thrift.port" 6627, "storm.auth.simple-acl.admins" [], "topology.component.cpu.pcore.percent" 10.0, "supervisor.memory.capacity.mb" 3072.0, "storm.nimbus.retry.times" 5, "supervisor.worker.start.timeout.secs" 120, "storm.zookeeper.retry.interval" 1000, "logs.users" nil, "worker.profiler.command" "flight.bash", "transactional.zookeeper.port" nil, "drpc.max_buffer_size" 1048576, "pacemaker.thread.timeout" 10, "task.credentials.poll.secs" 30, "blobstore.superuser" "Administrator", "drpc.https.keystore.type" "JKS", "topology.worker.receiver.thread.count" 1, "topology.state.checkpoint.interval.ms" 1000, "supervisor.slots.ports" [6700 6701 6702 6703], "topology.transfer.buffer.size" 1024, "storm.health.check.dir" "healthchecks", "topology.worker.shared.thread.pool.size" 4, "drpc.authorizer.acl.strict" false, "nimbus.file.copy.expiration.secs" 600, "worker.profiler.childopts" "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder", "topology.executor.receive.buffer.size" 1024, "backpressure.disruptor.low.watermark" 0.4, "nimbus.task.launch.secs" 120, "storm.local.mode.zmq" false, "storm.messaging.netty.buffer_size" 5242880, "storm.cluster.state.store" "org.apache.storm.cluster_state.zookeeper_state_factory", "worker.heartbeat.frequency.secs" 1, "storm.log4j2.conf.dir" "log4j2", "ui.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.root" "/storm", "topology.tick.tuple.freq.secs" nil, "drpc.https.port" -1, "storm.workers.artifacts.dir" "workers-artifacts", "supervisor.blobstore.download.max_retries" 3, "task.refresh.poll.secs" 10, "storm.exhibitor.port" 8080, "task.heartbeat.frequency.secs" 3, "pacemaker.port" 6699, "storm.messaging.netty.max_wait_ms" 1000, "topology.component.resources.offheap.memory.mb" 0.0, "drpc.http.port" 3774, "topology.error.throttle.interval.secs" 10, "storm.messaging.transport" "org.apache.storm.messaging.netty.Context", "storm.messaging.netty.authentication" false, "topology.component.resources.onheap.memory.mb" 128.0, "topology.kryo.factory" "org.apache.storm.serialization.DefaultKryoFactory", "worker.gc.childopts" "", "nimbus.topology.validator" "org.apache.storm.nimbus.DefaultTopologyValidator", "nimbus.seeds" ["localhost"], "nimbus.queue.size" 100000, "nimbus.cleanup.inbox.freq.secs" 600, "storm.blobstore.replication.factor" 3, "worker.heap.memory.mb" 768, "logviewer.max.sum.worker.logs.size.mb" 4096, "pacemaker.childopts" "-Xmx1024m", "ui.users" nil, "transactional.zookeeper.servers" nil, "supervisor.worker.timeout.secs" 30, "storm.zookeeper.auth.password" nil, "storm.blobstore.acl.validation.enabled" false, "client.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "supervisor.childopts" "-Xmx256m", "topology.worker.max.heap.size.mb" 768.0, "ui.http.x-frame-options" "DENY", "backpressure.disruptor.high.watermark" 0.9, "ui.filter" nil, "ui.header.buffer.bytes" 4096, "topology.min.replication.count" 1, "topology.disruptor.wait.timeout.millis" 1000, "storm.nimbus.retry.intervalceiling.millis" 60000, "topology.trident.batch.emit.interval.millis" 50, "storm.auth.simple-acl.users" [], "drpc.invocations.threads" 64, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "ui.port" 8080, "storm.exhibitor.poll.uripath" "/exhibitor/v1/cluster/list", "storm.messaging.netty.transfer.batch.size" 262144, "logviewer.appender.name" "A1", "nimbus.thrift.max_buffer_size" 1048576, "storm.auth.simple-acl.users.commands" [], "drpc.request.timeout.secs" 600} 17556 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 17612 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 17613 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:host.name=WIN-BQOBV63OBNM 17613 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:java.version=1.8.0_66 17613 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:java.vendor=Oracle Corporation 17613 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:java.home=C:\Program Files\Java\jre1.8.0_66 17614 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:java.class.path=D:\Code\eclipseMarsCode\stormDemo\target\classes;D:\SoftWare\maven\repository\org\apache\storm\storm-core\1.0.2\storm-core-1.0.2.jar;D:\SoftWare\maven\repository\com\esotericsoftware\kryo\3.0.3\kryo-3.0.3.jar;D:\SoftWare\maven\repository\com\esotericsoftware\reflectasm\1.10.1\reflectasm-1.10.1.jar;D:\SoftWare\maven\repository\org\ow2\asm\asm\5.0.3\asm-5.0.3.jar;D:\SoftWare\maven\repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;D:\SoftWare\maven\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;D:\SoftWare\maven\repository\org\clojure\clojure\1.7.0\clojure-1.7.0.jar;D:\SoftWare\maven\repository\com\lmax\disruptor\3.3.2\disruptor-3.3.2.jar;D:\SoftWare\maven\repository\org\apache\logging\log4j\log4j-api\2.1\log4j-api-2.1.jar;D:\SoftWare\maven\repository\org\apache\logging\log4j\log4j-core\2.1\log4j-core-2.1.jar;D:\SoftWare\maven\repository\org\apache\logging\log4j\log4j-slf4j-impl\2.1\log4j-slf4j-impl-2.1.jar;D:\SoftWare\maven\repository\org\slf4j\log4j-over-slf4j\1.6.6\log4j-over-slf4j-1.6.6.jar;D:\SoftWare\maven\repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;D:\SoftWare\maven\repository\org\slf4j\slf4j-api\1.7.7\slf4j-api-1.7.7.jar 17614 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:java.library.path=C:\Program Files\Java\jre1.8.0_66\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program Files/Java/jre1.8.0_66/bin/server;C:/Program Files/Java/jre1.8.0_66/bin;C:/Program Files/Java/jre1.8.0_66/lib/amd64;%WEKA39_HOME%\lib\mysql-connector-java-5.1.21-bin.jar;%WEKA37_HOME%\lib\mysql-connector-java-5.1.21-bin.jar;C:\Program Files\Java\jdk1.8.0_66\jre\lib\ext\mysql-connector-java-5.1.21-bin.jar;C:\ProgramData\Oracle\Java\javapath;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;D:\SoftWare\MATLAB R2013a\runtime\win64;D:\SoftWare\MATLAB R2013a\bin;C:\Program Files (x86)\IDM Computer Solutions\UltraCompare;C:\Program Files\Java\jdk1.8.0_66\bin;C:\Program Files\Java\jdk1.8.0_66\jre\bin;D:\SoftWare\apache-ant-1.9.0\bin;HADOOP_HOME\bin;D:\SoftWare\apache-maven-3.3.9\bin;D:\SoftWare\Scala\bin;D:\SoftWare\Scala\jre\bin;%MYSQL_HOME\bin;D:\SoftWare\MySQL\mysql-5.7.11-winx64;;D:\SoftWare\apache-tomcat-7.0.69\bin;%C:\Windows\System32;%C:\Windows\SysWOW64;;D:\SoftWare\apache-maven-3.3.9\bin;D:\SoftWare\apache-tomcat-7.0.69\bin;D:\SoftWare\apache-tomcat-7.0.69\bin;D:\SoftWare\Anaconda2;D:\SoftWare\Anaconda2\Scripts;D:\SoftWare\Anaconda2\Library\bin;D:\SoftWare\MySQL Server\MySQL Server 5.0\bin;D:\SoftWare\Python\Python36\Scripts\;D:\SoftWare\Python\Python36\;D:\SoftWare\SSH Secure Shell;D:\SoftWare\eclipse;;. 17614 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:java.io.tmpdir=C:\Users\ADMINI~1\AppData\Local\Temp\ 17614 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:java.compiler=<NA> 17614 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:os.name=Windows 7 17614 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:os.arch=amd64 17615 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:os.version=6.1 17615 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:user.name=Administrator 17615 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:user.home=C:\Users\Administrator 17615 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Client environment:user.dir=D:\Code\eclipseMarsCode\stormDemo 17620 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@6824b913 17786 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 17796 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60042 17801 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 17915 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60042 17938 [SyncThread:0] INFO o.a.s.s.o.a.z.s.p.FileTxnLog - Creating new log file: log.1 17963 [main] INFO o.a.s.b.FileBlobStoreImpl - Creating new blob store based in C:\Users\ADMINI~1\AppData\Local\Temp\d7cdc68c-f54c-4677-ac0e-d73c3b2effb3\blobs 18047 [main] INFO o.a.s.d.nimbus - Using default scheduler 18138 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 18140 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820000 with negotiated timeout 20000 for client /127.0.0.1:60042 18141 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820000, negotiated timeout = 20000 18187 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@21a9a705 18279 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 18282 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60045 18282 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 18283 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60045 18327 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 18388 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820001 with negotiated timeout 20000 for client /127.0.0.1:60045 18389 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820001, negotiated timeout = 20000 18393 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 18395 [main-EventThread] INFO o.a.s.zookeeper - Zookeeper state update: :connected:none 18459 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 18460 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@753fd7a1 18536 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 18538 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60048 18538 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 18540 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60048 18653 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820002 with negotiated timeout 20000 for client /127.0.0.1:60048 18653 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820002, negotiated timeout = 20000 18654 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 18655 [main-EventThread] INFO o.a.s.zookeeper - Zookeeper state update: :connected:none 19007 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 19035 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d87b848820002 19078 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d87b848820002 closed 19078 [main-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 19082 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:60048 which had sessionid 0x15d87b848820002 19084 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 19085 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@70025b99 19091 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 19092 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 19093 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60051 19093 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60051 19095 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 19096 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@3ba3d4b6 19103 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 19105 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 19105 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60054 19106 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60054 19108 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820003 with negotiated timeout 20000 for client /127.0.0.1:60051 19111 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820003, negotiated timeout = 20000 19112 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 19180 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820004 with negotiated timeout 20000 for client /127.0.0.1:60054 19180 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820004, negotiated timeout = 20000 19182 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 19990 [main] INFO o.a.s.zookeeper - Queued up for leader lock. 20060 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x15d87b848820001 type:create cxid:0x1 zxid:0x12 txntype:-1 reqpath:n/a Error Path:/storm/leader-lock Error:KeeperErrorCode = NoNode for /storm/leader-lock 20249 [Curator-Framework-0] WARN o.a.s.s.o.a.c.u.ZKPaths - The version of ZooKeeper being used doesn't support Container nodes. CreateMode.PERSISTENT will be used instead. 20504 [main] INFO o.a.s.d.m.MetricsUtils - Using statistics reporter plugin:org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter 20508 [main] INFO o.a.s.d.m.r.JmxPreparableReporter - Preparing... 20616 [main-EventThread] INFO o.a.s.zookeeper - WIN-BQOBV63OBNM gained leadership, checking if it has all the topology code locally. 20621 [main] INFO o.a.s.d.common - Started statistics report plugin... 20693 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 20695 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@1b9d9a2b 20760 [main-EventThread] INFO o.a.s.zookeeper - active-topology-ids [] local-topology-ids [] diff-topology [] 20760 [main-EventThread] INFO o.a.s.zookeeper - Accepting leadership, all active topology found localy. 20769 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 20771 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 20771 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60057 20773 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60057 20857 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820005 with negotiated timeout 20000 for client /127.0.0.1:60057 20858 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820005, negotiated timeout = 20000 20858 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 20859 [main-EventThread] INFO o.a.s.zookeeper - Zookeeper state update: :connected:none 20867 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 20870 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d87b848820005 20903 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:60057 which had sessionid 0x15d87b848820005 20907 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d87b848820005 closed 20909 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 20911 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@7c281eb8 20921 [main-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 20934 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 20941 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@4a8ffd75 20994 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 20995 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 20995 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60062 20996 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60062 20998 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 21002 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60063 21003 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 21005 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60063 21065 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820006, negotiated timeout = 20000 21065 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820006 with negotiated timeout 20000 for client /127.0.0.1:60062 21070 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 21086 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820007, negotiated timeout = 20000 21087 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820007 with negotiated timeout 20000 for client /127.0.0.1:60063 21089 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 21089 [main-EventThread] INFO o.a.s.zookeeper - Zookeeper state update: :connected:none 21100 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 21107 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d87b848820007 21131 [Curator-ConnectionStateManager-0] WARN o.a.s.s.o.a.c.f.s.ConnectionStateManager - There are no ConnectionStateListeners registered. 21151 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d87b848820007 closed 21154 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 21158 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@3e05586b 21153 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] WARN o.a.s.s.o.a.z.s.NIOServerCnxn - caught end of stream exception org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x15d87b848820007, likely client has closed socket at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.0.2.jar:1.0.2] at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.0.2.jar:1.0.2] at java.lang.Thread.run(Unknown Source) [?:1.8.0_66] 21180 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:60063 which had sessionid 0x15d87b848820007 21184 [main-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 21203 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 21205 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 21205 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60066 21206 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60066 21261 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820008, negotiated timeout = 20000 21261 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820008 with negotiated timeout 20000 for client /127.0.0.1:60066 21272 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 21408 [main] INFO o.a.s.d.supervisor - Starting Supervisor with conf {"topology.builtin.metrics.bucket.size.secs" 60, "nimbus.childopts" "-Xmx1024m", "ui.filter.params" nil, "storm.cluster.mode" "local", "storm.messaging.netty.client_worker_threads" 1, "logviewer.max.per.worker.logs.size.mb" 2048, "supervisor.run.worker.as.user" false, "topology.max.task.parallelism" nil, "topology.priority" 29, "zmq.threads" 1, "storm.group.mapping.service" "org.apache.storm.security.auth.ShellBasedGroupsMapping", "transactional.zookeeper.root" "/transactional", "topology.sleep.spout.wait.strategy.time.ms" 1, "scheduler.display.resource" false, "topology.max.replication.wait.time.sec" 60, "drpc.invocations.port" 3773, "supervisor.localizer.cache.target.size.mb" 10240, "topology.multilang.serializer" "org.apache.storm.multilang.JsonSerializer", "storm.messaging.netty.server_worker_threads" 1, "nimbus.blobstore.class" "org.apache.storm.blobstore.LocalFsBlobStore", "resource.aware.scheduler.eviction.strategy" "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy", "topology.max.error.report.per.interval" 5, "storm.thrift.transport" "org.apache.storm.security.auth.SimpleTransportPlugin", "zmq.hwm" 0, "storm.group.mapping.service.params" nil, "worker.profiler.enabled" false, "storm.principal.tolocal" "org.apache.storm.security.auth.DefaultPrincipalToLocal", "supervisor.worker.shutdown.sleep.secs" 1, "pacemaker.host" "localhost", "storm.zookeeper.retry.times" 5, "ui.actions.enabled" true, "zmq.linger.millis" 0, "supervisor.enable" true, "topology.stats.sample.rate" 0.05, "storm.messaging.netty.min_wait_ms" 100, "worker.log.level.reset.poll.secs" 30, "storm.zookeeper.port" 2001, "supervisor.heartbeat.frequency.secs" 5, "topology.enable.message.timeouts" true, "supervisor.cpu.capacity" 400.0, "drpc.worker.threads" 64, "supervisor.blobstore.download.thread.count" 5, "drpc.queue.size" 128, "topology.backpressure.enable" false, "supervisor.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "storm.blobstore.inputstream.buffer.size.bytes" 65536, "topology.shellbolt.max.pending" 100, "drpc.https.keystore.password" "", "nimbus.code.sync.freq.secs" 120, "logviewer.port" 8000, "topology.scheduler.strategy" "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy", "topology.executor.send.buffer.size" 1024, "resource.aware.scheduler.priority.strategy" "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy", "pacemaker.auth.method" "NONE", "storm.daemon.metrics.reporter.plugins" ["org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"], "topology.worker.logwriter.childopts" "-Xmx64m", "topology.spout.wait.strategy" "org.apache.storm.spout.SleepSpoutWaitStrategy", "ui.host" "0.0.0.0", "storm.nimbus.retry.interval.millis" 2000, "nimbus.inbox.jar.expiration.secs" 3600, "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.acker.executors" nil, "topology.fall.back.on.java.serialization" true, "topology.eventlogger.executors" 0, "supervisor.localizer.cleanup.interval.ms" 600000, "storm.zookeeper.servers" ["localhost"], "nimbus.thrift.threads" 64, "logviewer.cleanup.age.mins" 10080, "topology.worker.childopts" nil, "topology.classpath" nil, "supervisor.monitor.frequency.secs" 3, "nimbus.credential.renewers.freq.secs" 600, "topology.skip.missing.kryo.registrations" true, "drpc.authorizer.acl.filename" "drpc-auth-acl.yaml", "pacemaker.kerberos.users" [], "storm.group.mapping.service.cache.duration.secs" 120, "topology.testing.always.try.serialize" false, "nimbus.monitor.freq.secs" 10, "storm.health.check.timeout.ms" 5000, "supervisor.supervisors" [], "topology.tasks" nil, "topology.bolts.outgoing.overflow.buffer.enable" false, "storm.messaging.netty.socket.backlog" 500, "topology.workers" 1, "pacemaker.base.threads" 10, "storm.local.dir" "C:\\Users\\ADMINI~1\\AppData\\Local\\Temp\\fc1c162d-e299-4a9e-8599-3ca874fdcb76", "topology.disable.loadaware" false, "worker.childopts" "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump", "storm.auth.simple-white-list.users" [], "topology.disruptor.batch.timeout.millis" 1, "topology.message.timeout.secs" 30, "topology.state.synchronization.timeout.secs" 60, "topology.tuple.serializer" "org.apache.storm.serialization.types.ListDelegateSerializer", "supervisor.supervisors.commands" [], "nimbus.blobstore.expiration.secs" 600, "logviewer.childopts" "-Xmx128m", "topology.environment" nil, "topology.debug" false, "topology.disruptor.batch.size" 100, "storm.messaging.netty.max_retries" 300, "ui.childopts" "-Xmx768m", "storm.network.topography.plugin" "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping", "storm.zookeeper.session.timeout" 20000, "drpc.childopts" "-Xmx768m", "drpc.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.connection.timeout" 15000, "storm.zookeeper.auth.user" nil, "storm.meta.serialization.delegate" "org.apache.storm.serialization.GzipThriftSerializationDelegate", "topology.max.spout.pending" nil, "storm.codedistributor.class" "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor", "nimbus.supervisor.timeout.secs" 60, "nimbus.task.timeout.secs" 30, "drpc.port" 3772, "pacemaker.max.threads" 50, "storm.zookeeper.retry.intervalceiling.millis" 30000, "nimbus.thrift.port" 6627, "storm.auth.simple-acl.admins" [], "topology.component.cpu.pcore.percent" 10.0, "supervisor.memory.capacity.mb" 3072.0, "storm.nimbus.retry.times" 5, "supervisor.worker.start.timeout.secs" 120, "storm.zookeeper.retry.interval" 1000, "logs.users" nil, "worker.profiler.command" "flight.bash", "transactional.zookeeper.port" nil, "drpc.max_buffer_size" 1048576, "pacemaker.thread.timeout" 10, "task.credentials.poll.secs" 30, "blobstore.superuser" "Administrator", "drpc.https.keystore.type" "JKS", "topology.worker.receiver.thread.count" 1, "topology.state.checkpoint.interval.ms" 1000, "supervisor.slots.ports" (1024 1025 1026), "topology.transfer.buffer.size" 1024, "storm.health.check.dir" "healthchecks", "topology.worker.shared.thread.pool.size" 4, "drpc.authorizer.acl.strict" false, "nimbus.file.copy.expiration.secs" 600, "worker.profiler.childopts" "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder", "topology.executor.receive.buffer.size" 1024, "backpressure.disruptor.low.watermark" 0.4, "nimbus.task.launch.secs" 120, "storm.local.mode.zmq" false, "storm.messaging.netty.buffer_size" 5242880, "storm.cluster.state.store" "org.apache.storm.cluster_state.zookeeper_state_factory", "worker.heartbeat.frequency.secs" 1, "storm.log4j2.conf.dir" "log4j2", "ui.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.root" "/storm", "topology.tick.tuple.freq.secs" nil, "drpc.https.port" -1, "storm.workers.artifacts.dir" "workers-artifacts", "supervisor.blobstore.download.max_retries" 3, "task.refresh.poll.secs" 10, "storm.exhibitor.port" 8080, "task.heartbeat.frequency.secs" 3, "pacemaker.port" 6699, "storm.messaging.netty.max_wait_ms" 1000, "topology.component.resources.offheap.memory.mb" 0.0, "drpc.http.port" 3774, "topology.error.throttle.interval.secs" 10, "storm.messaging.transport" "org.apache.storm.messaging.netty.Context", "storm.messaging.netty.authentication" false, "topology.component.resources.onheap.memory.mb" 128.0, "topology.kryo.factory" "org.apache.storm.serialization.DefaultKryoFactory", "worker.gc.childopts" "", "nimbus.topology.validator" "org.apache.storm.nimbus.DefaultTopologyValidator", "nimbus.seeds" ["localhost"], "nimbus.queue.size" 100000, "nimbus.cleanup.inbox.freq.secs" 600, "storm.blobstore.replication.factor" 3, "worker.heap.memory.mb" 768, "logviewer.max.sum.worker.logs.size.mb" 4096, "pacemaker.childopts" "-Xmx1024m", "ui.users" nil, "transactional.zookeeper.servers" nil, "supervisor.worker.timeout.secs" 30, "storm.zookeeper.auth.password" nil, "storm.blobstore.acl.validation.enabled" false, "client.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "supervisor.childopts" "-Xmx256m", "topology.worker.max.heap.size.mb" 768.0, "ui.http.x-frame-options" "DENY", "backpressure.disruptor.high.watermark" 0.9, "ui.filter" nil, "ui.header.buffer.bytes" 4096, "topology.min.replication.count" 1, "topology.disruptor.wait.timeout.millis" 1000, "storm.nimbus.retry.intervalceiling.millis" 60000, "topology.trident.batch.emit.interval.millis" 50, "storm.auth.simple-acl.users" [], "drpc.invocations.threads" 64, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "ui.port" 8080, "storm.exhibitor.poll.uripath" "/exhibitor/v1/cluster/list", "storm.messaging.netty.transfer.batch.size" 262144, "logviewer.appender.name" "A1", "nimbus.thrift.max_buffer_size" 1048576, "storm.auth.simple-acl.users.commands" [], "drpc.request.timeout.secs" 600} 21952 [main] INFO o.a.s.l.Localizer - Reconstruct localized resource: C:\Users\ADMINI~1\AppData\Local\Temp\fc1c162d-e299-4a9e-8599-3ca874fdcb76\supervisor\usercache 21952 [main] WARN o.a.s.l.Localizer - No left over resources found for any user during reconstructing of local resources at: C:\Users\ADMINI~1\AppData\Local\Temp\fc1c162d-e299-4a9e-8599-3ca874fdcb76\supervisor\usercache 21963 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 21964 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@294aba23 21977 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 21978 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 21979 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60069 21980 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60069 22009 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820009 with negotiated timeout 20000 for client /127.0.0.1:60069 22009 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820009, negotiated timeout = 20000 22010 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 22011 [main-EventThread] INFO o.a.s.zookeeper - Zookeeper state update: :connected:none 22016 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 22018 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d87b848820009 22054 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d87b848820009 closed 22054 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:60069 which had sessionid 0x15d87b848820009 22055 [main-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 22056 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 22056 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@5f5827d0 22064 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 22065 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 22065 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60072 22066 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60072 22084 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b84882000a with negotiated timeout 20000 for client /127.0.0.1:60072 22084 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b84882000a, negotiated timeout = 20000 22084 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 22217 [main] INFO o.a.s.d.supervisor - Starting supervisor with id a8e2250d-ca68-48bb-bb91-53a2e7ba9080 at host WIN-BQOBV63OBNM 22224 [main] INFO o.a.s.d.supervisor - Starting Supervisor with conf {"topology.builtin.metrics.bucket.size.secs" 60, "nimbus.childopts" "-Xmx1024m", "ui.filter.params" nil, "storm.cluster.mode" "local", "storm.messaging.netty.client_worker_threads" 1, "logviewer.max.per.worker.logs.size.mb" 2048, "supervisor.run.worker.as.user" false, "topology.max.task.parallelism" nil, "topology.priority" 29, "zmq.threads" 1, "storm.group.mapping.service" "org.apache.storm.security.auth.ShellBasedGroupsMapping", "transactional.zookeeper.root" "/transactional", "topology.sleep.spout.wait.strategy.time.ms" 1, "scheduler.display.resource" false, "topology.max.replication.wait.time.sec" 60, "drpc.invocations.port" 3773, "supervisor.localizer.cache.target.size.mb" 10240, "topology.multilang.serializer" "org.apache.storm.multilang.JsonSerializer", "storm.messaging.netty.server_worker_threads" 1, "nimbus.blobstore.class" "org.apache.storm.blobstore.LocalFsBlobStore", "resource.aware.scheduler.eviction.strategy" "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy", "topology.max.error.report.per.interval" 5, "storm.thrift.transport" "org.apache.storm.security.auth.SimpleTransportPlugin", "zmq.hwm" 0, "storm.group.mapping.service.params" nil, "worker.profiler.enabled" false, "storm.principal.tolocal" "org.apache.storm.security.auth.DefaultPrincipalToLocal", "supervisor.worker.shutdown.sleep.secs" 1, "pacemaker.host" "localhost", "storm.zookeeper.retry.times" 5, "ui.actions.enabled" true, "zmq.linger.millis" 0, "supervisor.enable" true, "topology.stats.sample.rate" 0.05, "storm.messaging.netty.min_wait_ms" 100, "worker.log.level.reset.poll.secs" 30, "storm.zookeeper.port" 2001, "supervisor.heartbeat.frequency.secs" 5, "topology.enable.message.timeouts" true, "supervisor.cpu.capacity" 400.0, "drpc.worker.threads" 64, "supervisor.blobstore.download.thread.count" 5, "drpc.queue.size" 128, "topology.backpressure.enable" false, "supervisor.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "storm.blobstore.inputstream.buffer.size.bytes" 65536, "topology.shellbolt.max.pending" 100, "drpc.https.keystore.password" "", "nimbus.code.sync.freq.secs" 120, "logviewer.port" 8000, "topology.scheduler.strategy" "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy", "topology.executor.send.buffer.size" 1024, "resource.aware.scheduler.priority.strategy" "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy", "pacemaker.auth.method" "NONE", "storm.daemon.metrics.reporter.plugins" ["org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"], "topology.worker.logwriter.childopts" "-Xmx64m", "topology.spout.wait.strategy" "org.apache.storm.spout.SleepSpoutWaitStrategy", "ui.host" "0.0.0.0", "storm.nimbus.retry.interval.millis" 2000, "nimbus.inbox.jar.expiration.secs" 3600, "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.acker.executors" nil, "topology.fall.back.on.java.serialization" true, "topology.eventlogger.executors" 0, "supervisor.localizer.cleanup.interval.ms" 600000, "storm.zookeeper.servers" ["localhost"], "nimbus.thrift.threads" 64, "logviewer.cleanup.age.mins" 10080, "topology.worker.childopts" nil, "topology.classpath" nil, "supervisor.monitor.frequency.secs" 3, "nimbus.credential.renewers.freq.secs" 600, "topology.skip.missing.kryo.registrations" true, "drpc.authorizer.acl.filename" "drpc-auth-acl.yaml", "pacemaker.kerberos.users" [], "storm.group.mapping.service.cache.duration.secs" 120, "topology.testing.always.try.serialize" false, "nimbus.monitor.freq.secs" 10, "storm.health.check.timeout.ms" 5000, "supervisor.supervisors" [], "topology.tasks" nil, "topology.bolts.outgoing.overflow.buffer.enable" false, "storm.messaging.netty.socket.backlog" 500, "topology.workers" 1, "pacemaker.base.threads" 10, "storm.local.dir" "C:\\Users\\ADMINI~1\\AppData\\Local\\Temp\\afb39ee7-3111-41c1-9c53-fc75cec05822", "topology.disable.loadaware" false, "worker.childopts" "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump", "storm.auth.simple-white-list.users" [], "topology.disruptor.batch.timeout.millis" 1, "topology.message.timeout.secs" 30, "topology.state.synchronization.timeout.secs" 60, "topology.tuple.serializer" "org.apache.storm.serialization.types.ListDelegateSerializer", "supervisor.supervisors.commands" [], "nimbus.blobstore.expiration.secs" 600, "logviewer.childopts" "-Xmx128m", "topology.environment" nil, "topology.debug" false, "topology.disruptor.batch.size" 100, "storm.messaging.netty.max_retries" 300, "ui.childopts" "-Xmx768m", "storm.network.topography.plugin" "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping", "storm.zookeeper.session.timeout" 20000, "drpc.childopts" "-Xmx768m", "drpc.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.connection.timeout" 15000, "storm.zookeeper.auth.user" nil, "storm.meta.serialization.delegate" "org.apache.storm.serialization.GzipThriftSerializationDelegate", "topology.max.spout.pending" nil, "storm.codedistributor.class" "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor", "nimbus.supervisor.timeout.secs" 60, "nimbus.task.timeout.secs" 30, "drpc.port" 3772, "pacemaker.max.threads" 50, "storm.zookeeper.retry.intervalceiling.millis" 30000, "nimbus.thrift.port" 6627, "storm.auth.simple-acl.admins" [], "topology.component.cpu.pcore.percent" 10.0, "supervisor.memory.capacity.mb" 3072.0, "storm.nimbus.retry.times" 5, "supervisor.worker.start.timeout.secs" 120, "storm.zookeeper.retry.interval" 1000, "logs.users" nil, "worker.profiler.command" "flight.bash", "transactional.zookeeper.port" nil, "drpc.max_buffer_size" 1048576, "pacemaker.thread.timeout" 10, "task.credentials.poll.secs" 30, "blobstore.superuser" "Administrator", "drpc.https.keystore.type" "JKS", "topology.worker.receiver.thread.count" 1, "topology.state.checkpoint.interval.ms" 1000, "supervisor.slots.ports" (1027 1028 1029), "topology.transfer.buffer.size" 1024, "storm.health.check.dir" "healthchecks", "topology.worker.shared.thread.pool.size" 4, "drpc.authorizer.acl.strict" false, "nimbus.file.copy.expiration.secs" 600, "worker.profiler.childopts" "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder", "topology.executor.receive.buffer.size" 1024, "backpressure.disruptor.low.watermark" 0.4, "nimbus.task.launch.secs" 120, "storm.local.mode.zmq" false, "storm.messaging.netty.buffer_size" 5242880, "storm.cluster.state.store" "org.apache.storm.cluster_state.zookeeper_state_factory", "worker.heartbeat.frequency.secs" 1, "storm.log4j2.conf.dir" "log4j2", "ui.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.root" "/storm", "topology.tick.tuple.freq.secs" nil, "drpc.https.port" -1, "storm.workers.artifacts.dir" "workers-artifacts", "supervisor.blobstore.download.max_retries" 3, "task.refresh.poll.secs" 10, "storm.exhibitor.port" 8080, "task.heartbeat.frequency.secs" 3, "pacemaker.port" 6699, "storm.messaging.netty.max_wait_ms" 1000, "topology.component.resources.offheap.memory.mb" 0.0, "drpc.http.port" 3774, "topology.error.throttle.interval.secs" 10, "storm.messaging.transport" "org.apache.storm.messaging.netty.Context", "storm.messaging.netty.authentication" false, "topology.component.resources.onheap.memory.mb" 128.0, "topology.kryo.factory" "org.apache.storm.serialization.DefaultKryoFactory", "worker.gc.childopts" "", "nimbus.topology.validator" "org.apache.storm.nimbus.DefaultTopologyValidator", "nimbus.seeds" ["localhost"], "nimbus.queue.size" 100000, "nimbus.cleanup.inbox.freq.secs" 600, "storm.blobstore.replication.factor" 3, "worker.heap.memory.mb" 768, "logviewer.max.sum.worker.logs.size.mb" 4096, "pacemaker.childopts" "-Xmx1024m", "ui.users" nil, "transactional.zookeeper.servers" nil, "supervisor.worker.timeout.secs" 30, "storm.zookeeper.auth.password" nil, "storm.blobstore.acl.validation.enabled" false, "client.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "supervisor.childopts" "-Xmx256m", "topology.worker.max.heap.size.mb" 768.0, "ui.http.x-frame-options" "DENY", "backpressure.disruptor.high.watermark" 0.9, "ui.filter" nil, "ui.header.buffer.bytes" 4096, "topology.min.replication.count" 1, "topology.disruptor.wait.timeout.millis" 1000, "storm.nimbus.retry.intervalceiling.millis" 60000, "topology.trident.batch.emit.interval.millis" 50, "storm.auth.simple-acl.users" [], "drpc.invocations.threads" 64, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "ui.port" 8080, "storm.exhibitor.poll.uripath" "/exhibitor/v1/cluster/list", "storm.messaging.netty.transfer.batch.size" 262144, "logviewer.appender.name" "A1", "nimbus.thrift.max_buffer_size" 1048576, "storm.auth.simple-acl.users.commands" [], "drpc.request.timeout.secs" 600} 22233 [main] INFO o.a.s.l.Localizer - Reconstruct localized resource: C:\Users\ADMINI~1\AppData\Local\Temp\afb39ee7-3111-41c1-9c53-fc75cec05822\supervisor\usercache 22234 [main] WARN o.a.s.l.Localizer - No left over resources found for any user during reconstructing of local resources at: C:\Users\ADMINI~1\AppData\Local\Temp\afb39ee7-3111-41c1-9c53-fc75cec05822\supervisor\usercache 22236 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 22237 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@c689973 22242 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 22243 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 22243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60075 22243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60075 22321 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b84882000b with negotiated timeout 20000 for client /127.0.0.1:60075 22321 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b84882000b, negotiated timeout = 20000 22322 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 22322 [main-EventThread] INFO o.a.s.zookeeper - Zookeeper state update: :connected:none 22325 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 22327 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d87b84882000b 22355 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d87b84882000b closed 22357 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 22357 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:60075 which had sessionid 0x15d87b84882000b 22358 [main-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 22389 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@2b148329 22407 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 22409 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 22410 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60078 22411 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60078 22445 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b84882000c, negotiated timeout = 20000 22445 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 22445 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b84882000c with negotiated timeout 20000 for client /127.0.0.1:60078 22592 [main] INFO o.a.s.d.supervisor - Starting supervisor with id 22a78b3c-3d5f-4bac-9d9f-392742596655 at host WIN-BQOBV63OBNM 22757 [main] INFO o.a.s.l.ThriftAccessLogger - Request ID: 1 access from: principal: operation: submitTopology 23083 [main] INFO o.a.s.d.nimbus - Received topology submission for StormTopologyAcker with conf {"topology.max.task.parallelism" nil, "topology.submitter.principal" "", "topology.acker.executors" nil, "topology.eventlogger.executors" 0, "topology.max.spout.pending" 1000, "storm.zookeeper.superACL" nil, "topology.users" (), "topology.submitter.user" "Administrator", "topology.kryo.register" nil, "topology.kryo.decorators" (), "storm.id" "StormTopologyAcker-1-1501220593", "topology.name" "StormTopologyAcker"} 23345 [main] INFO o.a.s.d.nimbus - uploadedJar 23412 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 23413 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@460b50df 23421 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 23422 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 23422 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60081 23423 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60081 23455 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b84882000d with negotiated timeout 20000 for client /127.0.0.1:60081 23456 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b84882000d, negotiated timeout = 20000 23456 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 23461 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x15d87b84882000d type:create cxid:0x2 zxid:0x27 txntype:-1 reqpath:n/a Error Path:/storm/blobstoremaxkeysequencenumber Error:KeeperErrorCode = NoNode for /storm/blobstoremaxkeysequencenumber 23584 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 23587 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d87b84882000d 23636 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d87b84882000d closed 23636 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:60081 which had sessionid 0x15d87b84882000d 23636 [main-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 23638 [main] INFO o.a.s.cluster - setup-path/blobstore/StormTopologyAcker-1-1501220593-stormconf.ser/WIN-BQOBV63OBNM:6627-1 23806 [main] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 23807 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@2cd388f5 23816 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 23818 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 23818 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60084 23818 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60084 23880 [main-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b84882000e, negotiated timeout = 20000 23880 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b84882000e with negotiated timeout 20000 for client /127.0.0.1:60084 23881 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 23947 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 23950 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d87b84882000e 23990 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d87b84882000e closed 23991 [main-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 23993 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] WARN o.a.s.s.o.a.z.s.NIOServerCnxn - caught end of stream exception org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x15d87b84882000e, likely client has closed socket at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.0.2.jar:1.0.2] at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.0.2.jar:1.0.2] at java.lang.Thread.run(Unknown Source) [?:1.8.0_66] 23993 [main] INFO o.a.s.cluster - setup-path/blobstore/StormTopologyAcker-1-1501220593-stormcode.ser/WIN-BQOBV63OBNM:6627-1 23994 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:60084 which had sessionid 0x15d87b84882000e 24096 [main] INFO o.a.s.d.nimbus - desired replication count 1 achieved, current-replication-count for conf key = 1, current-replication-count for code key = 1, current-replication-count for jar key = 1 24396 [main] INFO o.a.s.d.nimbus - Activating StormTopologyAcker: StormTopologyAcker-1-1501220593 32768 [timer] INFO o.a.s.s.EvenScheduler - Available slots: (["a8e2250d-ca68-48bb-bb91-53a2e7ba9080" 1024] ["a8e2250d-ca68-48bb-bb91-53a2e7ba9080" 1025] ["a8e2250d-ca68-48bb-bb91-53a2e7ba9080" 1026] ["22a78b3c-3d5f-4bac-9d9f-392742596655" 1027] ["22a78b3c-3d5f-4bac-9d9f-392742596655" 1028] ["22a78b3c-3d5f-4bac-9d9f-392742596655" 1029]) 32878 [timer] INFO o.a.s.d.nimbus - Setting new assignment for topology id StormTopologyAcker-1-1501220593: #org.apache.storm.daemon.common.Assignment{:master-code-dir "C:\\Users\\ADMINI~1\\AppData\\Local\\Temp\\d7cdc68c-f54c-4677-ac0e-d73c3b2effb3", :node->host {"a8e2250d-ca68-48bb-bb91-53a2e7ba9080" "WIN-BQOBV63OBNM"}, :executor->node+port {[2 2] ["a8e2250d-ca68-48bb-bb91-53a2e7ba9080" 1024], [1 1] ["a8e2250d-ca68-48bb-bb91-53a2e7ba9080" 1024], [3 3] ["a8e2250d-ca68-48bb-bb91-53a2e7ba9080" 1024]}, :executor->start-time-secs {[1 1] 1501220603, [2 2] 1501220603, [3 3] 1501220603}, :worker->resources {["a8e2250d-ca68-48bb-bb91-53a2e7ba9080" 1024] [0.0 0.0 0.0]}} 33108 [Thread-7] INFO o.a.s.d.supervisor - Downloading code for storm id StormTopologyAcker-1-1501220593 33116 [Thread-7] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 33117 [Thread-7] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@284a8f8c 33127 [Thread-7] INFO o.a.s.b.FileBlobStoreImpl - Creating new blob store based in C:\Users\ADMINI~1\AppData\Local\Temp\d7cdc68c-f54c-4677-ac0e-d73c3b2effb3\blobs 33143 [Thread-7-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 33145 [Thread-7-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 33145 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60087 33146 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60087 33202 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b84882000f with negotiated timeout 20000 for client /127.0.0.1:60087 33202 [Thread-7-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b84882000f, negotiated timeout = 20000 33203 [Thread-7-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 33261 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 33266 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d87b84882000f 33322 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] WARN o.a.s.s.o.a.z.s.NIOServerCnxn - caught end of stream exception org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x15d87b84882000f, likely client has closed socket at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.0.2.jar:1.0.2] at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.0.2.jar:1.0.2] at java.lang.Thread.run(Unknown Source) [?:1.8.0_66] 33323 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:60087 which had sessionid 0x15d87b84882000f 33325 [Thread-7] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d87b84882000f closed 33344 [Thread-7-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 34504 [Thread-7] INFO o.a.s.d.supervisor - Finished downloading code for storm id StormTopologyAcker-1-1501220593 34591 [Thread-8] INFO o.a.s.d.supervisor - Launching worker with assignment {:storm-id "StormTopologyAcker-1-1501220593", :executors [[2 2] [1 1] [3 3]], :resources #object[org.apache.storm.generated.WorkerResources 0x53f66d42 "WorkerResources(mem_on_heap:0.0, mem_off_heap:0.0, cpu:0.0)"]} for this supervisor a8e2250d-ca68-48bb-bb91-53a2e7ba9080 on port 1024 with id 715c933e-5f3a-4a1c-b2f3-b4ace693b7fc 34611 [Thread-8] INFO o.a.s.d.worker - Launching worker for StormTopologyAcker-1-1501220593 on a8e2250d-ca68-48bb-bb91-53a2e7ba9080:1024 with id 715c933e-5f3a-4a1c-b2f3-b4ace693b7fc and conf {"topology.builtin.metrics.bucket.size.secs" 60, "nimbus.childopts" "-Xmx1024m", "ui.filter.params" nil, "storm.cluster.mode" "local", "storm.messaging.netty.client_worker_threads" 1, "logviewer.max.per.worker.logs.size.mb" 2048, "supervisor.run.worker.as.user" false, "topology.max.task.parallelism" nil, "topology.priority" 29, "zmq.threads" 1, "storm.group.mapping.service" "org.apache.storm.security.auth.ShellBasedGroupsMapping", "transactional.zookeeper.root" "/transactional", "topology.sleep.spout.wait.strategy.time.ms" 1, "scheduler.display.resource" false, "topology.max.replication.wait.time.sec" 60, "drpc.invocations.port" 3773, "supervisor.localizer.cache.target.size.mb" 10240, "topology.multilang.serializer" "org.apache.storm.multilang.JsonSerializer", "storm.messaging.netty.server_worker_threads" 1, "nimbus.blobstore.class" "org.apache.storm.blobstore.LocalFsBlobStore", "resource.aware.scheduler.eviction.strategy" "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy", "topology.max.error.report.per.interval" 5, "storm.thrift.transport" "org.apache.storm.security.auth.SimpleTransportPlugin", "zmq.hwm" 0, "storm.group.mapping.service.params" nil, "worker.profiler.enabled" false, "storm.principal.tolocal" "org.apache.storm.security.auth.DefaultPrincipalToLocal", "supervisor.worker.shutdown.sleep.secs" 1, "pacemaker.host" "localhost", "storm.zookeeper.retry.times" 5, "ui.actions.enabled" true, "zmq.linger.millis" 0, "supervisor.enable" true, "topology.stats.sample.rate" 0.05, "storm.messaging.netty.min_wait_ms" 100, "worker.log.level.reset.poll.secs" 30, "storm.zookeeper.port" 2001, "supervisor.heartbeat.frequency.secs" 5, "topology.enable.message.timeouts" true, "supervisor.cpu.capacity" 400.0, "drpc.worker.threads" 64, "supervisor.blobstore.download.thread.count" 5, "drpc.queue.size" 128, "topology.backpressure.enable" false, "supervisor.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "storm.blobstore.inputstream.buffer.size.bytes" 65536, "topology.shellbolt.max.pending" 100, "drpc.https.keystore.password" "", "nimbus.code.sync.freq.secs" 120, "logviewer.port" 8000, "topology.scheduler.strategy" "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy", "topology.executor.send.buffer.size" 1024, "resource.aware.scheduler.priority.strategy" "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy", "pacemaker.auth.method" "NONE", "storm.daemon.metrics.reporter.plugins" ["org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"], "topology.worker.logwriter.childopts" "-Xmx64m", "topology.spout.wait.strategy" "org.apache.storm.spout.SleepSpoutWaitStrategy", "ui.host" "0.0.0.0", "storm.nimbus.retry.interval.millis" 2000, "nimbus.inbox.jar.expiration.secs" 3600, "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.acker.executors" nil, "topology.fall.back.on.java.serialization" true, "topology.eventlogger.executors" 0, "supervisor.localizer.cleanup.interval.ms" 600000, "storm.zookeeper.servers" ["localhost"], "nimbus.thrift.threads" 64, "logviewer.cleanup.age.mins" 10080, "topology.worker.childopts" nil, "topology.classpath" nil, "supervisor.monitor.frequency.secs" 3, "nimbus.credential.renewers.freq.secs" 600, "topology.skip.missing.kryo.registrations" true, "drpc.authorizer.acl.filename" "drpc-auth-acl.yaml", "pacemaker.kerberos.users" [], "storm.group.mapping.service.cache.duration.secs" 120, "topology.testing.always.try.serialize" false, "nimbus.monitor.freq.secs" 10, "storm.health.check.timeout.ms" 5000, "supervisor.supervisors" [], "topology.tasks" nil, "topology.bolts.outgoing.overflow.buffer.enable" false, "storm.messaging.netty.socket.backlog" 500, "topology.workers" 1, "pacemaker.base.threads" 10, "storm.local.dir" "C:\\Users\\ADMINI~1\\AppData\\Local\\Temp\\fc1c162d-e299-4a9e-8599-3ca874fdcb76", "topology.disable.loadaware" false, "worker.childopts" "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump", "storm.auth.simple-white-list.users" [], "topology.disruptor.batch.timeout.millis" 1, "topology.message.timeout.secs" 30, "topology.state.synchronization.timeout.secs" 60, "topology.tuple.serializer" "org.apache.storm.serialization.types.ListDelegateSerializer", "supervisor.supervisors.commands" [], "nimbus.blobstore.expiration.secs" 600, "logviewer.childopts" "-Xmx128m", "topology.environment" nil, "topology.debug" false, "topology.disruptor.batch.size" 100, "storm.messaging.netty.max_retries" 300, "ui.childopts" "-Xmx768m", "storm.network.topography.plugin" "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping", "storm.zookeeper.session.timeout" 20000, "drpc.childopts" "-Xmx768m", "drpc.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.connection.timeout" 15000, "storm.zookeeper.auth.user" nil, "storm.meta.serialization.delegate" "org.apache.storm.serialization.GzipThriftSerializationDelegate", "topology.max.spout.pending" nil, "storm.codedistributor.class" "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor", "nimbus.supervisor.timeout.secs" 60, "nimbus.task.timeout.secs" 30, "drpc.port" 3772, "pacemaker.max.threads" 50, "storm.zookeeper.retry.intervalceiling.millis" 30000, "nimbus.thrift.port" 6627, "storm.auth.simple-acl.admins" [], "topology.component.cpu.pcore.percent" 10.0, "supervisor.memory.capacity.mb" 3072.0, "storm.nimbus.retry.times" 5, "supervisor.worker.start.timeout.secs" 120, "storm.zookeeper.retry.interval" 1000, "logs.users" nil, "worker.profiler.command" "flight.bash", "transactional.zookeeper.port" nil, "drpc.max_buffer_size" 1048576, "pacemaker.thread.timeout" 10, "task.credentials.poll.secs" 30, "blobstore.superuser" "Administrator", "drpc.https.keystore.type" "JKS", "topology.worker.receiver.thread.count" 1, "topology.state.checkpoint.interval.ms" 1000, "supervisor.slots.ports" (1024 1025 1026), "topology.transfer.buffer.size" 1024, "storm.health.check.dir" "healthchecks", "topology.worker.shared.thread.pool.size" 4, "drpc.authorizer.acl.strict" false, "nimbus.file.copy.expiration.secs" 600, "worker.profiler.childopts" "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder", "topology.executor.receive.buffer.size" 1024, "backpressure.disruptor.low.watermark" 0.4, "nimbus.task.launch.secs" 120, "storm.local.mode.zmq" false, "storm.messaging.netty.buffer_size" 5242880, "storm.cluster.state.store" "org.apache.storm.cluster_state.zookeeper_state_factory", "worker.heartbeat.frequency.secs" 1, "storm.log4j2.conf.dir" "log4j2", "ui.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.root" "/storm", "topology.tick.tuple.freq.secs" nil, "drpc.https.port" -1, "storm.workers.artifacts.dir" "workers-artifacts", "supervisor.blobstore.download.max_retries" 3, "task.refresh.poll.secs" 10, "storm.exhibitor.port" 8080, "task.heartbeat.frequency.secs" 3, "pacemaker.port" 6699, "storm.messaging.netty.max_wait_ms" 1000, "topology.component.resources.offheap.memory.mb" 0.0, "drpc.http.port" 3774, "topology.error.throttle.interval.secs" 10, "storm.messaging.transport" "org.apache.storm.messaging.netty.Context", "storm.messaging.netty.authentication" false, "topology.component.resources.onheap.memory.mb" 128.0, "topology.kryo.factory" "org.apache.storm.serialization.DefaultKryoFactory", "worker.gc.childopts" "", "nimbus.topology.validator" "org.apache.storm.nimbus.DefaultTopologyValidator", "nimbus.seeds" ["localhost"], "nimbus.queue.size" 100000, "nimbus.cleanup.inbox.freq.secs" 600, "storm.blobstore.replication.factor" 3, "worker.heap.memory.mb" 768, "logviewer.max.sum.worker.logs.size.mb" 4096, "pacemaker.childopts" "-Xmx1024m", "ui.users" nil, "transactional.zookeeper.servers" nil, "supervisor.worker.timeout.secs" 30, "storm.zookeeper.auth.password" nil, "storm.blobstore.acl.validation.enabled" false, "client.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "supervisor.childopts" "-Xmx256m", "topology.worker.max.heap.size.mb" 768.0, "ui.http.x-frame-options" "DENY", "backpressure.disruptor.high.watermark" 0.9, "ui.filter" nil, "ui.header.buffer.bytes" 4096, "topology.min.replication.count" 1, "topology.disruptor.wait.timeout.millis" 1000, "storm.nimbus.retry.intervalceiling.millis" 60000, "topology.trident.batch.emit.interval.millis" 50, "storm.auth.simple-acl.users" [], "drpc.invocations.threads" 64, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "ui.port" 8080, "storm.exhibitor.poll.uripath" "/exhibitor/v1/cluster/list", "storm.messaging.netty.transfer.batch.size" 262144, "logviewer.appender.name" "A1", "nimbus.thrift.max_buffer_size" 1048576, "storm.auth.simple-acl.users.commands" [], "drpc.request.timeout.secs" 600} 34626 [Thread-8] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 34629 [Thread-8] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@fe8f4c9 34651 [Thread-8-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 34652 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60090 34653 [Thread-8-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 34653 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60090 34789 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820010 with negotiated timeout 20000 for client /127.0.0.1:60090 34790 [Thread-8-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820010, negotiated timeout = 20000 34790 [Thread-8-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 34791 [Thread-8-EventThread] INFO o.a.s.zookeeper - Zookeeper state update: :connected:none 34793 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting 34795 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d87b848820010 34935 [Thread-8] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d87b848820010 closed 34935 [Thread-8-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down 34936 [Thread-8] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting 34936 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] WARN o.a.s.s.o.a.z.s.NIOServerCnxn - caught end of stream exception org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x15d87b848820010, likely client has closed socket at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.0.2.jar:1.0.2] at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.0.2.jar:1.0.2] at java.lang.Thread.run(Unknown Source) [?:1.8.0_66] 34936 [Thread-8] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2001/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@12401e6d 34937 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:60090 which had sessionid 0x15d87b848820010 34940 [Thread-8-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2001. Will not attempt to authenticate using SASL (unknown error) 34941 [Thread-8-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2001, initiating session 34941 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60093 34942 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60093 34965 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x15d87b848820011 with negotiated timeout 20000 for client /127.0.0.1:60093 34965 [Thread-8-SendThread(127.0.0.1:2001)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2001, sessionid = 0x15d87b848820011, negotiated timeout = 20000 34965 [Thread-8-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED 34974 [Thread-8] INFO o.a.s.s.a.AuthUtils - Got AutoCreds [] 34978 [Thread-8] INFO o.a.s.d.worker - Reading Assignments. 35151 [Thread-8] INFO o.a.s.d.worker - Registering IConnectionCallbacks for a8e2250d-ca68-48bb-bb91-53a2e7ba9080:1024 35230 [Thread-8] INFO o.a.s.d.executor - Loading executor MySpout:[2 2] 35300 [Thread-8] INFO o.a.s.d.executor - Loaded executor tasks MySpout:[2 2] 35774 [Thread-8] INFO o.a.s.d.executor - Finished loading executor MySpout:[2 2] 35800 [Thread-8] INFO o.a.s.d.executor - Loading executor __acker:[3 3] 35805 [Thread-8] INFO o.a.s.d.executor - Loaded executor tasks __acker:[3 3] 35846 [Thread-8] INFO o.a.s.d.executor - Timeouts disabled for executor __acker:[3 3] 35846 [Thread-8] INFO o.a.s.d.executor - Finished loading executor __acker:[3 3] 35855 [Thread-8] INFO o.a.s.d.executor - Loading executor MyBolt:[1 1] 35856 [Thread-8] INFO o.a.s.d.executor - Loaded executor tasks MyBolt:[1 1] 35859 [Thread-8] INFO o.a.s.d.executor - Finished loading executor MyBolt:[1 1] 35872 [Thread-8] INFO o.a.s.d.executor - Loading executor __system:[-1 -1] 35874 [Thread-8] INFO o.a.s.d.executor - Loaded executor tasks __system:[-1 -1] 35879 [Thread-8] INFO o.a.s.d.executor - Finished loading executor __system:[-1 -1] 35899 [Thread-8] INFO o.a.s.d.worker - Started with log levels: {"" #object[org.apache.logging.log4j.Level 0x2c9c9097 "INFO"], "org.apache.zookeeper" #object[org.apache.logging.log4j.Level 0xd4f4340 "WARN"]} 35912 [Thread-8] INFO o.a.s.d.worker - Worker has topology config {"topology.builtin.metrics.bucket.size.secs" 60, "nimbus.childopts" "-Xmx1024m", "ui.filter.params" nil, "storm.cluster.mode" "local", "storm.messaging.netty.client_worker_threads" 1, "logviewer.max.per.worker.logs.size.mb" 2048, "supervisor.run.worker.as.user" false, "topology.max.task.parallelism" nil, "topology.priority" 29, "zmq.threads" 1, "storm.group.mapping.service" "org.apache.storm.security.auth.ShellBasedGroupsMapping", "transactional.zookeeper.root" "/transactional", "topology.sleep.spout.wait.strategy.time.ms" 1, "scheduler.display.resource" false, "topology.max.replication.wait.time.sec" 60, "drpc.invocations.port" 3773, "supervisor.localizer.cache.target.size.mb" 10240, "topology.multilang.serializer" "org.apache.storm.multilang.JsonSerializer", "storm.messaging.netty.server_worker_threads" 1, "nimbus.blobstore.class" "org.apache.storm.blobstore.LocalFsBlobStore", "resource.aware.scheduler.eviction.strategy" "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy", "topology.max.error.report.per.interval" 5, "storm.thrift.transport" "org.apache.storm.security.auth.SimpleTransportPlugin", "zmq.hwm" 0, "storm.group.mapping.service.params" nil, "worker.profiler.enabled" false, "storm.principal.tolocal" "org.apache.storm.security.auth.DefaultPrincipalToLocal", "supervisor.worker.shutdown.sleep.secs" 1, "pacemaker.host" "localhost", "storm.zookeeper.retry.times" 5, "ui.actions.enabled" true, "zmq.linger.millis" 0, "supervisor.enable" true, "topology.stats.sample.rate" 0.05, "storm.messaging.netty.min_wait_ms" 100, "worker.log.level.reset.poll.secs" 30, "storm.zookeeper.port" 2001, "supervisor.heartbeat.frequency.secs" 5, "topology.enable.message.timeouts" true, "supervisor.cpu.capacity" 400.0, "drpc.worker.threads" 64, "supervisor.blobstore.download.thread.count" 5, "drpc.queue.size" 128, "topology.backpressure.enable" false, "supervisor.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "storm.blobstore.inputstream.buffer.size.bytes" 65536, "topology.shellbolt.max.pending" 100, "drpc.https.keystore.password" "", "nimbus.code.sync.freq.secs" 120, "logviewer.port" 8000, "topology.scheduler.strategy" "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy", "topology.executor.send.buffer.size" 1024, "resource.aware.scheduler.priority.strategy" "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy", "pacemaker.auth.method" "NONE", "storm.daemon.metrics.reporter.plugins" ["org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"], "topology.worker.logwriter.childopts" "-Xmx64m", "topology.spout.wait.strategy" "org.apache.storm.spout.SleepSpoutWaitStrategy", "ui.host" "0.0.0.0", "topology.submitter.principal" "", "storm.nimbus.retry.interval.millis" 2000, "nimbus.inbox.jar.expiration.secs" 3600, "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.acker.executors" nil, "topology.fall.back.on.java.serialization" true, "topology.eventlogger.executors" 0, "supervisor.localizer.cleanup.interval.ms" 600000, "storm.zookeeper.servers" ["localhost"], "nimbus.thrift.threads" 64, "logviewer.cleanup.age.mins" 10080, "topology.worker.childopts" nil, "topology.classpath" nil, "supervisor.monitor.frequency.secs" 3, "nimbus.credential.renewers.freq.secs" 600, "topology.skip.missing.kryo.registrations" true, "drpc.authorizer.acl.filename" "drpc-auth-acl.yaml", "pacemaker.kerberos.users" [], "storm.group.mapping.service.cache.duration.secs" 120, "topology.testing.always.try.serialize" false, "nimbus.monitor.freq.secs" 10, "storm.health.check.timeout.ms" 5000, "supervisor.supervisors" [], "topology.tasks" nil, "topology.bolts.outgoing.overflow.buffer.enable" false, "storm.messaging.netty.socket.backlog" 500, "topology.workers" 1, "pacemaker.base.threads" 10, "storm.local.dir" "C:\\Users\\ADMINI~1\\AppData\\Local\\Temp\\d7cdc68c-f54c-4677-ac0e-d73c3b2effb3", "topology.disable.loadaware" false, "worker.childopts" "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump", "storm.auth.simple-white-list.users" [], "topology.disruptor.batch.timeout.millis" 1, "topology.message.timeout.secs" 30, "topology.state.synchronization.timeout.secs" 60, "topology.tuple.serializer" "org.apache.storm.serialization.types.ListDelegateSerializer", "supervisor.supervisors.commands" [], "nimbus.blobstore.expiration.secs" 600, "logviewer.childopts" "-Xmx128m", "topology.environment" nil, "topology.debug" false, "topology.disruptor.batch.size" 100, "storm.messaging.netty.max_retries" 300, "ui.childopts" "-Xmx768m", "storm.network.topography.plugin" "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping", "storm.zookeeper.session.timeout" 20000, "drpc.childopts" "-Xmx768m", "drpc.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.connection.timeout" 15000, "storm.zookeeper.auth.user" nil, "storm.meta.serialization.delegate" "org.apache.storm.serialization.GzipThriftSerializationDelegate", "topology.max.spout.pending" 1000, "storm.codedistributor.class" "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor", "nimbus.supervisor.timeout.secs" 60, "nimbus.task.timeout.secs" 30, "storm.zookeeper.superACL" nil, "drpc.port" 3772, "pacemaker.max.threads" 50, "storm.zookeeper.retry.intervalceiling.millis" 30000, "nimbus.thrift.port" 6627, "storm.auth.simple-acl.admins" [], "topology.component.cpu.pcore.percent" 10.0, "supervisor.memory.capacity.mb" 3072.0, "storm.nimbus.retry.times" 5, "supervisor.worker.start.timeout.secs" 120, "storm.zookeeper.retry.interval" 1000, "logs.users" nil, "worker.profiler.command" "flight.bash", "transactional.zookeeper.port" nil, "drpc.max_buffer_size" 1048576, "pacemaker.thread.timeout" 10, "task.credentials.poll.secs" 30, "blobstore.superuser" "Administrator", "drpc.https.keystore.type" "JKS", "topology.worker.receiver.thread.count" 1, "topology.state.checkpoint.interval.ms" 1000, "supervisor.slots.ports" [6700 6701 6702 6703], "topology.transfer.buffer.size" 1024, "storm.health.check.dir" "healthchecks", "topology.worker.shared.thread.pool.size" 4, "drpc.authorizer.acl.strict" false, "nimbus.file.copy.expiration.secs" 600, "worker.profiler.childopts" "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder", "topology.executor.receive.buffer.size" 1024, "backpressure.disruptor.low.watermark" 0.4, "topology.users" [], "nimbus.task.launch.secs" 120, "storm.local.mode.zmq" false, "storm.messaging.netty.buffer_size" 5242880, "storm.cluster.state.store" "org.apache.storm.cluster_state.zookeeper_state_factory", "worker.heartbeat.frequency.secs" 1, "storm.log4j2.conf.dir" "log4j2", "ui.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.root" "/storm", "topology.submitter.user" "Administrator", "topology.tick.tuple.freq.secs" nil, "drpc.https.port" -1, "storm.workers.artifacts.dir" "workers-artifacts", "supervisor.blobstore.download.max_retries" 3, "task.refresh.poll.secs" 10, "storm.exhibitor.port" 8080, "task.heartbeat.frequency.secs" 3, "pacemaker.port" 6699, "storm.messaging.netty.max_wait_ms" 1000, "topology.component.resources.offheap.memory.mb" 0.0, "drpc.http.port" 3774, "topology.error.throttle.interval.secs" 10, "storm.messaging.transport" "org.apache.storm.messaging.netty.Context", "storm.messaging.netty.authentication" false, "topology.component.resources.onheap.memory.mb" 128.0, "topology.kryo.factory" "org.apache.storm.serialization.DefaultKryoFactory", "topology.kryo.register" nil, "worker.gc.childopts" "", "nimbus.topology.validator" "org.apache.storm.nimbus.DefaultTopologyValidator", "nimbus.seeds" ["localhost"], "nimbus.queue.size" 100000, "nimbus.cleanup.inbox.freq.secs" 600, "storm.blobstore.replication.factor" 3, "worker.heap.memory.mb" 768, "logviewer.max.sum.worker.logs.size.mb" 4096, "pacemaker.childopts" "-Xmx1024m", "ui.users" nil, "transactional.zookeeper.servers" nil, "supervisor.worker.timeout.secs" 30, "storm.zookeeper.auth.password" nil, "storm.blobstore.acl.validation.enabled" false, "client.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "supervisor.childopts" "-Xmx256m", "topology.worker.max.heap.size.mb" 768.0, "ui.http.x-frame-options" "DENY", "backpressure.disruptor.high.watermark" 0.9, "ui.filter" nil, "ui.header.buffer.bytes" 4096, "topology.min.replication.count" 1, "topology.disruptor.wait.timeout.millis" 1000, "storm.nimbus.retry.intervalceiling.millis" 60000, "topology.trident.batch.emit.interval.millis" 50, "storm.auth.simple-acl.users" [], "drpc.invocations.threads" 64, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "ui.port" 8080, "topology.kryo.decorators" [], "storm.id" "StormTopologyAcker-1-1501220593", "topology.name" "StormTopologyAcker", "storm.exhibitor.poll.uripath" "/exhibitor/v1/cluster/list", "storm.messaging.netty.transfer.batch.size" 262144, "logviewer.appender.name" "A1", "nimbus.thrift.max_buffer_size" 1048576, "storm.auth.simple-acl.users.commands" [], "drpc.request.timeout.secs" 600} 35912 [Thread-8] INFO o.a.s.d.worker - Worker 715c933e-5f3a-4a1c-b2f3-b4ace693b7fc for storm StormTopologyAcker-1-1501220593 on a8e2250d-ca68-48bb-bb91-53a2e7ba9080:1024 has finished loading 35913 [Thread-8] INFO o.a.s.config - SET worker-user 715c933e-5f3a-4a1c-b2f3-b4ace693b7fc 36092 [refresh-active-timer] INFO o.a.s.d.worker - All connections are ready for worker a8e2250d-ca68-48bb-bb91-53a2e7ba9080:1024 with id 715c933e-5f3a-4a1c-b2f3-b4ace693b7fc 36199 [Thread-14-MySpout-executor[2 2]] INFO o.a.s.d.executor - Opening spout MySpout:(2) 36206 [Thread-14-MySpout-executor[2 2]] INFO o.a.s.d.executor - Opened spout MySpout:(2) 36211 [Thread-14-MySpout-executor[2 2]] INFO o.a.s.d.executor - Activating spout MySpout:(2) spout:1 36215 [Thread-18-MyBolt-executor[1 1]] INFO o.a.s.d.executor - Preparing bolt MyBolt:(1) 36221 [Thread-18-MyBolt-executor[1 1]] INFO o.a.s.d.executor - Prepared bolt MyBolt:(1) 36315 [Thread-20-__system-executor[-1 -1]] INFO o.a.s.d.executor - Preparing bolt __system:(-1) 36334 [Thread-20-__system-executor[-1 -1]] INFO o.a.s.d.executor - Prepared bolt __system:(-1) 36335 [Thread-16-__acker-executor[3 3]] INFO o.a.s.d.executor - Preparing bolt __acker:(3) 36337 [Thread-16-__acker-executor[3 3]] INFO o.a.s.d.executor - Prepared bolt __acker:(3) sum=1 处理成功!1 spout:2 sum=3 处理成功!2 spout:3 sum=6 处理成功!3 spout:4 sum=10 处理成功!4 spout:5 sum=15 处理成功!5 spout:6 sum=21 处理成功!6 spout:7 sum=28 处理成功!7 spout:8 sum=36 处理成功!8 spout:9 sum=45 处理成功!9 spout:10 sum=55 处理成功!10 spout:11 sum=66 处理成功!11 spout:12 sum=78 处理成功!12 spout:13 sum=91 处理成功!13 spout:14 sum=105 处理成功!14 spout:15 sum=120 处理成功!15 spout:16 sum=136 处理成功!16 spout:17 sum=153 处理成功!17 spout:18 sum=171 处理成功!18 spout:19 sum=190 处理成功!19 spout:20 sum=210 处理成功!20 spout:21 sum=231 处理成功!21 spout:22 sum=253 处理成功!22 spout:23 sum=276 处理成功!23 spout:24 sum=300 处理成功!24 spout:25 sum=325 处理成功!25 spout:26 sum=351 处理成功!26 spout:27 sum=378 处理成功!27 spout:28 sum=406 处理成功!28 spout:29 sum=435 处理成功!29 spout:30 sum=465 处理成功!30 spout:31 sum=496 处理成功!31 spout:32 sum=528 处理成功!32 spout:33 sum=561 处理成功!33 spout:34 sum=595 处理成功!34 spout:35 sum=630 处理成功!35 spout:36 sum=666 处理成功!36 spout:37 sum=703 处理成功!37 spout:38 sum=741 处理成功!38 spout:39 sum=780 处理成功!39 spout:40 sum=820 处理成功!40 spout:41 sum=861 处理成功!41 spout:42 sum=903 处理成功!42 spout:43 sum=946 处理成功!43 spout:44 sum=990 处理成功!44 spout:45 sum=1035 处理成功!45 spout:46 sum=1081 处理成功!46 spout:47 sum=1128 处理成功!47 spout:48 sum=1176 处理成功!48 spout:49 sum=1225 处理成功!49 spout:50 sum=1275 处理成功!50 spout:51 sum=1326 处理成功!51 spout:52 sum=1378 处理成功!52 spout:53 sum=1431 处理成功!53 spout:54 sum=1485 处理成功!54 spout:55 sum=1540 处理成功!55 spout:56 sum=1596 处理成功!56 spout:57 sum=1653

优秀的个人博客,低调大师

SQL Server 2008入门系列之设置数据库服务器的访问权限

1、SQL Server的身份验证模式 在SQL Server中身份验证的方式分为两种,及Windows身份验证和混合身份验证。 Windows身份验证模式:Windows身份验证模式是通过Windows用户验证连接SQL Server服务器的,Windows用户或组被映射到SQL Server的登录账户。 混合身份验证模式:混合身份验证模式也称SQL Server和Windows身份验证模式,它允许用户使用Windows身份验证或SQL Server身份验证进行连接SQL Server数据库。 我们可以在SQL Server Management Studio中修改SQL Server的身份验证模式,右击SQL Server实例,选择属性,在安全性选项中可以选择SQL Server的身份验证模式。 2、创建SQL Server的登录用户。 在SQL Server Management Studio中展开“安全性”——“登录名”右击选择新建登录名即可创建SQL Server的登录用户,创建用户时可以选择SQL Server的身份验证模式以及密码的强制策略。 在右边的“服务器角色”中可以选择用户映射的服务器角色,服务器角色拥有大多数常用管理功能,使管理员可以很方便地给用户授予权限,它们作用于整个服务器,而不是单独的哪个数据库。 固定服务器角色及其描述如下所示:(注意:服务器角色不可以添加或删除) 在右边的“用户映射中”可以为登录用户创建一个隶属于某个数据库的数据库用户。 3、创建数据库用户 实际上,数据库用户是映射到登录账户上的,如下:查看刚才我们创建登录用户时创建的数据库用户。 我们也可以为一个数据库创建多个数据库用户,但是需要映射多个登录名,如下创建一个数据库用户。在指定的数据库中展开“安全性”——“用户名”右击新建用户即可,创建时需要指定用户的架构以及数据库的角色。 固定数据库角色是一组SQL Server预定义的数据库角色,具有数据库级别的管理权力,用以完成常用的数据库任务。如下是固定数据库角色的解释说明。 4、设置数据库对象访问权限 我们可以把SQL Server想象为一栋大楼,要进入到这栋大楼需要一把钥匙(无论是金属钥匙还是电子卡),这就是登录账户,而这把钥匙的类型主要取决于锁(即身份验证模式)的类型。数据库则可以想象为大楼里的房间,进入大楼之后却不可以访问每个房间,要访问那个房间还需要那个房间的钥匙,这就是数据库用户。最后,每个房间里都包含一些对象(如桌子,椅子等),我们可以想象为数据库中的表,并不是每个进入房间的人都有权限使用这些对象,因此,可以分配用户权限去访问这些对象。 例如:给某个表设定指定的用户权限,展开数据库下的表,在指定表上右击,选择属性,在权限选项中可以为指定的用户设置指定的权限。 5、为数据库设置权限 数据库本身也是对象,为数据库授权时,可以为其授予在数据库中创建其他各种对象的权限。 右击指定的数据库,选择属性,在权限选项中,可以为指定的用户选择权限。 本文转自yun5277 51CTO博客,原文链接:http://blog.51cto.com/dengqi/1221371,如需转载请自行联系原作者

资源下载

更多资源
腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

WebStorm

WebStorm

WebStorm 是jetbrains公司旗下一款JavaScript 开发工具。目前已经被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。与IntelliJ IDEA同源,继承了IntelliJ IDEA强大的JS部分的功能。

用户登录
用户注册