首页 文章 精选 留言 我的

精选列表

搜索[hadoop],共8432篇文章
优秀的个人博客,低调大师

[Hadoop]Reducer总是能复用为Combiner?

Combiner函数是一个可选的中间函数,发生在Map阶段,Mapper执行完成后立即执行。使用Combiner有如下两个优势: Combiner可以用来减少发送到Reducer的数据量,从而提高网络效率。 Combiner可以用于减少发送到Reducer的数据量,这将提高Reduce端的效率,因为每个reduce函数将处理相比于未使用Combiner之前更少的记录。 Combiner与Reducer结构相同,因为Combiner和Reducer都对Mapper的输出进行处理。这给了我们一个复用Reducer作为Combiner的好机会。但问题是,复用Reducer作为Combiner总是可行的吗? 1. Reducer作为Combiner的适用场景 假设我们正在编写一个MapReduce程序来计算股票数据集中每个股票代码的最大收盘价。Mapper将数据集中每个股票记录的股票代码作为key和收盘价作为value。Reducer然后将循环遍历股票代码对应的所有收盘价,并从收盘价列表中计算最高收盘价。假设Mapper 1处理股票代码为ABC的3个记录,收盘价分别为50,60和111。让我们假设Mapper 2处理股票代码为ABC的2个记录,收盘价分别为100和31。那么Reducer将收到股票代码ABC五个收盘价---50,60,111,100和31。Reducer的工作非常简单,它将简单地循环遍历所有收盘价,并将计算最高收盘价为111。 我们可以在每个Mapper之后使用相同的Reducer作为Combiner。Mapper 1上的Combiner将处理3个收盘价格--50,60和111,并且仅输出111,因为它是3个收盘价的最大值。Mapper 2上的Combiner将处理2个收盘价格--100和31,并且仅输出100,因为它是2个收盘价的最大值。现在使用Combiner之后,Reducer仅处理股票代码ABC的2个收盘价(原先需要处理5个收盘价),即来自Mapper 1的111和来自Mapper 2的100,并且将从这两个值中计算出最大收盘价格为111。 正如我们看到的,使用Combiner情况下Reducer输出与没有使用Combiner的输出结果是相同的,因此在这种情况下复用Reducer作为Combiner是没有问题。 2. Reducer作为Combiner的不适用场景 假设我们正在编写一个MapReduce程序来计算股票数据集中每个股票代码的平均交易量(average volume for each symbol)。Mapper将数据集中每个股票记录的股票代码作为key和交易量(volume)作为value。Reducer然后将循环遍历股票代码对应的所有交易量,并从交易量列表中计算出平均交易量(average volume from the list of volumes for that symbol)。假设Mapper 1处理股票代码为ABC的3个记录,收盘价分别为50,60和111。让我们假设Mapper 2处理股票代码为ABC的2个记录,收盘价分别为100和31。那么Reducer将收到股票代码ABC五个收盘价---50,60,111,100和31。Reducer的工作非常简单,它将简单地循环遍历所有交易量,并将计算出平均交易量为70.4。 50 + 60 + 111 + 100 + 31 / 5 = 352 / 5 = 70.4 让我们看看如果我们在每个Mapper之后复用Reducer作为Combiner会发生什么。Mapper 1上的Combiner将处理3个交易量--50,60和111,并计算出三个交易量的平均交易量为73.66。Mapper 2上的Combiner将处理2个交易量--100和31,并计算出两个交易量的平均交易量为65.5。那么在复用Reducer作为Combiner的情况下,Reducer仅处理股票代码ABC的2个平均交易量,来自Mapper1的73.66和来自Mapper2的65.5,并计算股票代码ABC最终的平均交易量为69.58。 73.66 + 65.5 /2 = 69.58 这与我们不复用Reducer作为Combiner得出的结果不一样,因此复用Reducer作为Combiner得出平均交易量是不正确的。 所以我们可以看到Reducer不能总是被用于Combiner。所以,当你决定复用Reducer作为Combiner的时候,你需要问自己这样一个问题:使用Combiner与不使用Combiner的输出结果是否一样? 3. 区别 Combiner需要实现Reducer接口。Combiner只能用于特定情况。 与Reducer不同,Combiner有一个约束,Combiner输入/输出键和值类型必须与Mapper的输出键和值类型相匹配。而Reducer只是输入键和值类型与Mapper的输出键和值类型相匹配。 Combiner只能用于满足交换律(a.b = b.a)和结合律(a.(b.c)= (a.b).c)的情况。这也意味着Combiner可能只能用于键和值的一个子集或者可能不能使用。 Reducer可以从多个Mapper获取数据。Combiner只能从一个Mapper获取其输入。 原文:http://hadoopinrealworld.com/can-reducer-always-be-reused-for-combiner/

优秀的个人博客,低调大师

Hadoop YARN 的工作流程简述

1、Client 向 YARN 提交应用程序,其中包括 ApplicationMaster 程序及启动 ApplicationMaster 命令2、ResourceManager 为该 ApplicationMaster 分配第一个 Container,并与对应的 NodeManager 通信,要求它在这个 Container 中启动应用程序的 ApplicationMaster3、ApplicationMaster 向ResourceManager 注册4、ApplicationMaster 为 Application 的任务申请并领取资源5、获取到资源后,要求对应的 NodeManager 在 Container 中启动任务6、NodeManager 收到 ApplicationMaster 的请求后,为任务设置好运行环境(包括环境变量、JAR 包等),将任务启动脚本写到一个脚本中,并通过运行该脚本启动任务7、各个任务通过 RPC 协议向 ApplicationMaster 汇报自己的状态和进度,以让 ApplicationMaster 随时掌握各个任务的运行状态,从而可以在失败时重启任务8、应用程序完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己实际中,集群可能并没有那么多资源来满足 ApplicationMaster 的资源请求,这是 ApplicationMaster 会采用轮循的方式不断申请资源,直到申请到资源或 Application 结束

优秀的个人博客,低调大师

Hadoop Yarn事件处理框架源码分析

由于想在项目中使用类似yarn的事件处理机制,就看了实现。主要是由Dispatcher.java,EventHandler.java,Service.java这3个类撑起来的。 在事件处理之前,先注册相应的事件处理handler,收到事件event后,由派发事件的Dispatcher进行派发,默认采用异步事件处理方式将事件放到事件队列(LinkedBlockingQueue)中,消费者会循环从队列中取出事件进行处理。 要使用事件处理,首先需要创建Dispatcher,示例代码如下: dispatcher = new AsyncDispatcher();//创建 addIfService(dispatcher);// 由于继承AbstractService,可以方便完成服务统一管理,比如初始化和资源释放等操作 dispatcher.register(EventType.class,new EventHandler());//注册对应的事件处理方法 然后通过AsyncDispatcher调用getEventHandler()返回的EventHandler的处理对应事件,AsyncDispatcher类的getEventHandler()方法如下: @Override public EventHandler getEventHandler() { if (handlerInstance == null) { handlerInstance = new GenericEventHandler();//如果没有注册生产事件处理,就走通用事件处理 } return handlerInstance; } class GenericEventHandler implements EventHandler<Event> { public void handle(Event event) { if (blockNewEvents) { return; } /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); if (qSize !=0 && qSize %1000 == 0) { LOG.info("Size of event-queue is " + qSize); } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { LOG.warn("Very low remaining capacity in the event-queue: " + remCapacity); } try { eventQueue.put(event);//放进队列 drained = false; } catch (InterruptedException e) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", e); } throw new RuntimeException(e); } }; } 上述完成生产,再看消费如下实现的: @Override protected void serviceStart() throws Exception { //start all the components super.serviceStart(); eventHandlingThread = new Thread(createThread()); // 调用创建消费eventQueue队列中事件的线程 eventHandlingThread.setName("AsyncDispatcher event handler"); eventHandlingThread.start(); } 查看createThread()方法,如下所示: Runnable createThread() { return new Runnable() { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { drained = eventQueue.isEmpty(); // blockNewEvents is only set when dispatcher is draining to stop, // adding this check is to avoid the overhead of acquiring the lock // and calling notify every time in the normal run of the loop. if (blockNewEvents) { synchronized (waitForDrained) { if (drained) { waitForDrained.notify(); } } } Event event; try { event = eventQueue.take(); } catch(InterruptedException ie) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", ie); } return; } if (event != null) { dispatch(event);//分发事件 } } } }; } 从eventQueue队列中取出Event,然后调用dispatch(event);来处理事件,看dispatch(event)方法,如下所示: protected void dispatch(Event event) { //all events go thru this loop if (LOG.isDebugEnabled()) { LOG.debug("Dispatching the event " + event.getClass().getName() + "." + event.toString()); } Class<? extends Enum> type = event.getType().getDeclaringClass(); try{ EventHandler handler = eventDispatchers.get(type); //通过event获取事件类型,根据事件类型得到注册的EventHandler if(handler != null) { handler.handle(event); //EventHandler处理事件event } else { throw new Exception("No handler for registered for " + type); } } catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread", t); // If serviceStop is called, we should exit this thread gracefully. if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false && stopped == false) { LOG.info("Exiting, bbye.."); System.exit(-1); } } }整个过程使用生产--消费者模型,异步事件处理,整体实现起来还是很简单的!

资源下载

更多资源
腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

用户登录
用户注册