首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

Storm的数据处理编程单元:Bolt 学习整理

Bolt是Topology中的数据处理的单元,也是Storm针对处理过程的编程单元。Topology中所有的处理都是在这些Bolt中完成的,编程人员可以实现自定义的处理过程,例如,过滤、函数、聚集、连接等计算。如果是复杂的计算过程,往往需要多个步骤和使用多个Bolt。 Bolt可以将数据项发送至多个数据流(Stream)。编程人员首先可以使用OutputFieldsDeclarer类的declareStream()方法来声明多个流,指定数据将要发送到的流,然后使用SpoutOutputCollector的emit方法将数据发送。 当声明了一个Bolt的输入流后,可以从其他的组件中接收这些指定的流。当接收某个组件的所有流时,需要在程序中逐个声明接收的过程。InputDeclarer对象默认接收来自某组件默认的流。 //从名称为"1"的组件中接收默认的流。 declarer.shuffleGrouping("1") IBolt 和 IComponent接口 IBolt接口: //在组件的任务初被初始化时,由集群中的工作进程(worker)调用,prepare()用于实例化Bolt的已给运行时任务,被集群中的某一个进程调用,提供Bolt运行的环境。//sormConf对象维护Storm中针对该Bolt的配置信息。(来自Topology);context对象是一个上下文对象,用于获取该组件运行时任务的信息。(例如Topology中该Bolt所有任务的位置,包括任务的id、组件id和输入输出信息等)//collector对象用于从该Bolt发送数据项。数据项可以在任意时刻发送,包括调用open()和close()方法。 void prepare(java.util.Map stormConf,TopologyContext context,OutputCollector collector) //接收一个数据项并处理//该方法用来接收一个数据项(Tuple),并可以将处理的结果作为新的数据项发送(emit),是Bolt需要实现的最重要的方法。//参数imput是一个数据项对象,包含了众多的元数据(metadata),包括它来自的组件、流、任务等。数据项中的值,可以通过Tuple类的getValue()方法获得。 void execute(Tuple input) //在IBolt将关闭时调用 void cleanup() Tuple类的方法,这个类的对象作为execute()方法的输入。(方法举例: int size() ; int fieldIndex(java.lang.String field) ; ......) 方法众多,可以整理分为以下五类: 1、获取属性的方法。 (size()、fieldIndex()和contains()三个方法) 2、获取元数据的方法。(getMessageId()、getSourceComponent()、getSourceTask()、getSourceStreamId()和getSourceGlobalStreamid()方法) 其中MessageId是在数据项被创建时,通过一定的规则赋值的。 3、根据域获取值的方法。(getValue()和多个get具体数据类型的方法) 4、根据域的名称获取值的方法。(这一类包括getFields()、getValues()和select()方法) 5、获取Tuple的值或域列表的方法。(getFields()、getValues()和select()方法) 分别获取该数据项的所有域列表、值列表和值列表子集。 简单的案例: class SplitSentence implements IRichBolt { private OutputCollector collector; public void prepare(Map conf,TopologyContext context,OutputCollector collector){ this.collector = collector; } public void execute(Tuple tuple){ String sentence = tuple.getString(0); for(String word : sentence.split(" "){ collector.emit(new Values(word)); } } public void cleanup(){ } public void declareOutputFields(OutpuFieldsDeclarer declarer){ declarer.declare(new Fields("word")); } } 这里说下declareOutputFields()函数参数,声明了输出流的数据项的结构,也即Tuple的域。 结合上节给的Spout的示例,可以在Topology类的main函数中加入相关代码,增加Bolt。 Topology builder builder = new TopologyBuilder(); Builder.SetSpout ("SentenceGenSpout ",new TestWord Spout(),1); builder.setBolt("splitBoult",new SplitSentence(),2).shuffleGrouping("sentenceGenSpout");

优秀的个人博客,低调大师

storm的数据源编程单元Spout学习整理

Spout呢,是Topology中数据流的源头,也是Storm针对数据源的编程单元。一般数据的来源,是通过外部数据源来读取数据项(Tuple),并读取的数据项传输至作业的其他组件。编程人员一般可通过OutputFieldsDeclarer类的declareStream()方法来声明多个流,指定数据将要发送的流,然后使用SpoutOutputCollector的emit方法将数据发送。 这里整理了下ISpout和IComponent接口。 ISpout声明了Spout的核心方法,用于向Topology供给数据项。对于每一个发出的数据项,Storm通过Spout,可以追踪它经历处理过程的有向无环图(竟然也是DAG)。 void open (java,util.Map conf,TopologyContext context,SpoutOutputCollector collector) 用于实例化Spout的一个运行时任务,被急群众的某一进程调用 (conf对象维护Storm中针对该Spout的配置信息,context是一个上下文对象,可用于获取该组件运行时任务的信息,collector用于从该Spout发送数据项) void close() 用于停止一个Spout void activate() 在Spout从非激活状态转换为激活状态时被调用 void deactivate() 在Spout的非激活状态被调用void ack(java.lang.Object msgId)Storm用于确认该Spout发送的这个数据项已经被完整处理 void fail(java.lang.Object msgId) Storm用于确认该Spout发送的这个数据项已经失败 void nextTuple() 当这个方法被调用时,Storm要求Spout发送一个数据项至output collector(nextTuple是Spout向Topology中发送一个数据项,是Spout需要实现的最重要的方法。在可靠的Spout的一个任务中,nextTuple()、ack()、fail()三个方法的调用在一个单独线程中循环。当不存在数据项需要发送时,nextTuple()将会休眠一小段间隔,确保不会浪费过多的CPU资源) IComponent接口,声明了Topology组件的通用方法。使用JAVA语言的Spout和Bolt都必须实现这个接口。 void declareOutputFields(OutputFieldsDeclarer declarer) 声明指定输出流的数据项结构。(这里指定了输出流的数据项结构(schema)。参数declarer被用来声明输出流(stream)的id,域。 java.util.Map getComponentConfiguration() 获取组件的配置信息 以Storm官网的WordCount来说明就是: public class WordCount extends BaseRichSpout { public static Logger log = logger.getLogger(backtype/storm/testing/WordCount); boolean_isDistributed; SpoutOutputCollector_collector; public WordCount(){ this(true); } public WordCount(boolean isDistributed){ _isDistributed = isDistributed; } public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){ _collector = collector; } public void close(){ } public void nextTuple(){ Utils.sleep(100L); String words[] = { "nathan","mike","jackson","golda","bertels" }; public void ack(Object obj){ } public void fail(Object obj){ } public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declarer(new Fields(new String[] { "word" })); } public Map getComponentConfiguration(){ if(!_isDistributed_) { Map ret = new HashMap(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM,Integer.valueOf(1)); }else{ return null; } } } 1、类中有对WordCount的两个重载的构造函数,其中_isDistributed指明了Spout的并行度,若_isDistributed=false,则意味着这个Spout运行时仅有一份任务实例。 2、open()函数的实现,将传入的collector赋值给局部变量,使之后通过该局部变量来操作数据项的发送。 3、declareOutputFields()函数,生命了输出流的数据项结构。 4、nextTuple函数,让一只执行的线程休眠100毫秒,再继续执行下述函数体,通过线程的休眠,控制nextTuple()产生数据项的周期为0.1秒。并且在维护字符串数组中,随机挑选一个字符串,作为"word"的域,交给变量collector作为一个Tuple发送。 (ack的作用是确认数据项是否被完整处理,这里没做处理) 5、getComponentConfiguration()函数则返回组建的配置信息(这个实例中只有在_isDistributed=false时,才返回包含该配置项的Map数据结构。 6、其他重载函数都为空实现。 那么在Topology实现类的main函数使其作为一个spout: TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentenceGenSpout",new WordCount());

优秀的个人博客,低调大师

Hadoop学习之HBase的伪分布式安装

HBase的伪分布式安装 1.HBase单台机器上的安装 a)使用winscp将win下的HBase软件(hbase-0.94.7-security.tar.gz)包复制到linux下的/usr/local/下 b)解压文件hbase-0.94.7-security.tar.gz #tar -xzvf hbase-0.94.7-security.tar.gz 更名为hbase #mv hbase-0.94.7-security hbase c)增加hbase的环境变量 #vim /etc/profile #source /etc/profile d)修改配置文件 Hbase-env.sh和hbase-site.xml、regionservers(可选) 修改hbase-env.sh 修改30行为自己的JAVA_HOME地址 将118行前面的注释去掉(如图) 修改hbase-site.xml 在configuration标签中添加如下内容: <property> <name>hbase.rootdir</name> <value>hdfs://hadoop0:9000/hbase</value>##hadoop0修改为自己的主机名 </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>hadoop0</value> </property> <property> <name>dfs.replication</name> <value>1</value> </property> 注意:$HBASE_HOME/conf/hbase-site.xml的hbase.rootdir的主机和端口号与$HADOOP_HOME/conf/core-site.xml的fs.default.name的主机和端口号一致 e)启动HBase,执行start-hbase.sh …启动hbase之前先确定hadoop 是运行正常的,并且不在安全模式中 f)验证: 执行jps,如果发现有HMaster、HRegionServer、QuorumPeerMain则证明HBase运行成功。 或者在浏览器中输入hadoop1:60010 ,首先浏览器所在的主机的hosts文件中有hadoop1和IP地址的绑定,如果能正常显示网页也证明成功

优秀的个人博客,低调大师

Solon AI 开发学习 3 - chat - 模型配置与请求选项

1、聊天模型配置(ChatConfig) ChatConfig,聊天模型配置 属性 要求 描述 apiUrl:String 必须 模型服务接口地址(是完整地址) apiKey:String 接口令牌 provider:String 服务提供者(如ollama、dashscope、openai),默认为 openai model:String 必须 模型名字 headers:Map<String, String> 头信息(其它配置,都以头信息形式添加) timeout:Duration 请求超时(默认:60s) proxy:ProxyDesc 网络代理 defaultTools:Map 默认工具(每次请求,都会附上) defaultToolsContext:Map 默认工具上下文(每次请求,都会附上) defaultInterceptors:List 默认拦截器(每次请求,都会附上) defaultOptions:Map 默认选项(每次请求,都会附上) 关于 model 配置: 如果是 ollama ,运行什么模型即填什么(如:ollama run deepseek-r1:7b,则填:deepseek-r1:7b) 如果是其它服务平台,请按平台的接口说明填写 更多可参考:《模型实例的构建和简单调用》 示例: //用 ChatConfig 构建聊天模型(ChatConfig 一般用于接收配置注入) public void case1(@Inject("${xxx.yyy}") ChatConfig config) { ChatModel chatModel = ChatModel.of(config).build(); } //直接构建聊天模型(ChatModel.Builder 内置 ChatConfig) public void case2() { ChatModel chatModel = ChatModel.of(apiUrl).apiKey(apiKey).model(model).build(); } 2、聊天模型请求选项(ChatOptions) ChatOptions,聊天请求选项(不同模型,支持情况不同) 方法 描述 toolsContext():Map 获取工具上下文(附加参数)。v3.4.0 后支持 toolsContext(Map):self 设置工具上下文(附加参数)。v3.4.0 后支持 tools():FunctionTool[] 获取所有函数工具(内部构建请求时用) tool(name):FunctionTool 获取函数工具 toolsAdd(FunctionTool):self 添加函数工具 toolsAdd(Iterable<FunctionTool>):self 添加函数工具 toolsAdd(ToolProvider):self 添加函数工具 toolsAdd(Object):self 添加函数工具(@ToolMappingobject) toolsAdd(String, Consumer<FunctionToolDesc>):self 添加函数工具(构建模式) interceptors():ChatInterceptor[] 获取所有聊天拦截器(内部构建请求时用) interceptorAdd(ChatInterceptor):self 添加聊天拦截器 interceptorAdd(int, ChatInterceptor):self 添加聊天拦截器 options():Map<String, Object> 获取所有选项(内部构建请求时用) option(key):Object 获取选项 optionAdd(key,val):self 添加选项(常用选项之外的选项) tool_choice(choiceOrName):self 常用选项:工具选择(可选:none,auto,required,或 tool-name) max_tokens(val):self 常用选项:最大提示语令牌数限制 max_completion_tokens(val):self 常用选项:最大完成令牌数限制 temperature(val):self 常用选项:temperature 采样 top_p(val):self 常用选项:top_p 采样 top_k(val):self 常用选项:top_k 采样 frequency_penalty(val):self 常用选项:频率惩罚 presence_penalty(val):self 常用选项:存在惩罚 response_format(map):self 常用选项:响应格式 user(user):self 常用选项:用户 示例: public void case1(ChatConfig config) { ChatModel chatModel = ChatModel.of(config).build(); //使用聊天配置 chatModel.prompt("hello") .options(o->o.max_tokens(500)) //使用聊天选项 .call(); } public void case2(ChatConfig config, String user) { ChatModel chatModel = ChatModel.of(config).build(); //使用聊天配置 chatModel.prompt("hello") .options(o->o.toolsAdd(new WeatherTool()).toolsContext(Utils.asMap("user", user))) //使用聊天选项 .call(); } //user 参数不加 @Param(即不要求 llm 输出),由 toolsContext 传入(扩展参数)! public class WeatherTool { @ToolMapping(description = "获取指定城市的天气情况") public String get_weather(@Param(description = "根据用户提到的地点推测城市") String location, String user) { return "晴,24度"; //可使用 “数据库” 或 “网络” 接口根据 location 查询合适数据; } } 3、关于 response_format (模型的响应格式) 每个模型可能不同,一般:默认为 md 格式。其它格式参考(具体要看模型的说明): chatModel.prompt("hello") .options(o->o.response_format(Utils.asMap("type", "json_object"))) //使用聊天选项 .call(); chatModel.prompt("hello") .options(o->o.response_format(Utils.asMap("type", "json_schema", "json_schema", Utils.asMap("type","object","properties",Utils.asMap()), "strict", true))) //使用聊天选项 .call();

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册