首页 文章 精选 留言 我的

精选列表

搜索[读写分离],共10005篇文章
优秀的个人博客,低调大师

一招教你无阻塞读写Golang channel

无论是无缓冲通道,还是有缓冲通道,都存在阻塞的情况,教你一招再也不遇到channel阻塞的问题。 这篇文章会介绍,哪些情况会存在阻塞,以及如何使用select解决阻塞。 阻塞场景阻塞场景共4个,有缓存和无缓冲各2个。 无缓冲通道的特点是,发送的数据需要被读取后,发送才会完成,它阻塞场景: 通道中无数据,但执行读通道。 通道中无数据,向通道写数据,但无协程读取。 1// 场景1 2func ReadNoDataFromNoBufCh() { 3 noBufCh := make(chan int) 4 5 <-noBufCh 6 fmt.Println("read from no buffer channel success") 7 8 // Output: 9 // fatal error:

优秀的个人博客,低调大师

大数据查询——HBase读写设计与实践

背景介绍 本项目主要解决 check 和 opinion2 张历史数据表(历史数据是指当业务发生过程中的完整中间流程和结果数据)的在线查询。原实现基于 Oracle 提供存储查询服务,随着数据量的不断增加,在写入和读取过程中面临性能问题,且历史数据仅供业务查询参考,并不影响实际流程,从系统结构上来说,放在业务链条上游比较重。本项目将其置于下游数据处理 Hadoop 分布式平台来实现此需求。下面列一些具体的需求指标: 1、数据量:目前 check 表的累计数据量为 5000w+ 行,11GB;opinion 表的累计数据量为 3 亿 +,约 100GB。每日增量约为每张表 50 万 + 行,只做 insert,不做 update。 2、查询要求:check 表的主键为 id(Oracle 全局 id),查询键为 check_id,一个 check_id 对应多条记录,所以需返回对应记录的 list; opinion 表的主键也是 id,查询键是 bussiness_no 和 buss_type,同理返回 list。单笔查询返回 List 大小约 50 条以下,查询频率为 100 笔 / 天左右,查询响应时间 2s。 技术选型 从数据量及查询要求来看,分布式平台上具备大数据量存储,且提供实时查询能力的组件首选 HBase。根据需求做了初步的调研和评估后,大致确定 HBase 作为主要存储组件。将需求拆解为写入和读取 HBase 两部分。 读取 HBase 相对来说方案比较确定,基本根据需求设计 RowKey,然后根据 HBase 提供的丰富 API(get,scan 等)来读取数据,满足性能要求即可。 写入 HBase 的方法大致有以下几种: 1、Java 调用 HBase 原生 API,HTable.add(List(Put))。 2、MapReduce 作业,使用 TableOutputFormat 作为输出。 3、Bulk Load,先将数据按照 HBase 的内部数据格式生成持久化的 HFile 文件,然后复制到合适的位置并通知 RegionServer ,即完成海量数据的入库。其中生成 Hfile 这一步可以选择 MapReduce 或 Spark。 本文采用第 3 种方式,Spark + Bulk Load 写入 HBase。该方法相对其他 2 种方式有以下优势: 1、BulkLoad 不会写 WAL,也不会产生 flush 以及 split。 2、如果我们大量调用 PUT 接口插入数据,可能会导致大量的 GC 操作。除了影响性能之外,严重时甚至可能会对 HBase 节点的稳定性造成影响,采用 BulkLoad 无此顾虑。 3、过程中没有大量的接口调用消耗性能。 4、可以利用 Spark 强大的计算能力。 图示如下: 设计 环境信息 Hadoop2.5-2.7 HBase0.98.6 Spark2.0.0-2.1.1 Sqoop1.4.6 表设计 本段的重点在于讨论 HBase 表的设计,其中 RowKey 是最重要的部分。为了方便说明问题,我们先来看看数据格式。以下以 check 举例,opinion 同理。 check 表(原表字段有 18 个,为方便描述,本文截选 5 个字段示意) 如上图所示,主键为 id,32 位字母和数字随机组成,业务查询字段 check_id 为不定长字段(不超过 32 位),字母和数字组成,同一 check_id 可能对应多条记录,其他为相关业务字段。众所周知,HBase 是基于 RowKey 提供查询,且要求 RowKey 是唯一的。RowKey 的设计主要考虑的是数据将怎样被访问。初步来看,我们有 2 种设计方法。 1、拆成 2 张表,一张表 id 作为 RowKey,列为 check 表对应的各列;另一张表为索引表,RowKey 为 check_id,每一列对应一个 id。查询时,先找到 check_id 对应的 id list,然后根据 id 找到对应的记录。均为 HBase 的 get 操作。 2、将本需求可看成是一个范围查询,而不是单条查询。将 check_id 作为 RowKey 的前缀,后面跟 id。查询时设置 Scan 的 startRow 和 stopRow,找到对应的记录 list。 第一种方法优点是表结构简单,RowKey 容易设计,缺点为 1)数据写入时,一行原始数据需要写入到 2 张表,且索引表写入前需要先扫描该 RowKey 是否存在,如果存在,则加入一列,否则新建一行,2)读取的时候,即便是采用 List, 也至少需要读取 2 次表。第二种设计方法,RowKey 设计较为复杂,但是写入和读取都是一次性的。综合考虑,我们采用第二种设计方法。 RowKey 设计 热点问题 HBase 中的行是以 RowKey 的字典序排序的,其热点问题通常发生在大量的客户端直接访问集群的一个或极少数节点。默认情况下,在开始建表时,表只会有一个 region,并随着 region 增大而拆分成更多的 region,这些 region 才能分布在多个 regionserver 上从而使负载均分。对于我们的业务需求,存量数据已经较大,因此有必要在一开始就将 HBase 的负载均摊到每个 regionserver,即做 pre-split。常见的防治热点的方法为加盐,hash 散列,自增部分(如时间戳)翻转等。 RowKey 设计 Step1:确定预分区数目,创建 HBase Table 不同的业务场景及数据特点确定数目的方式不一样,我个人认为应该综合考虑数据量大小和集群大小等因素。比如 check 表大小约为 11G,测试集群大小为 10 台机器,hbase.hregion.max.filesize=3G(当 region 的大小超过这个数时,将拆分为 2 个),所以初始化时尽量使得一个 region 的大小为 1~2G(不会一上来就 split),region 数据分到 11G/2G=6 个,但为了充分利用集群资源,本文中 check 表划分为 10 个分区。如果数据量为 100G,且不断增长,集群情况不变,则 region 数目增大到 100G/2G=50 个左右较合适。Hbase check 表建表语句如下: create'tinawang:check', {NAME=>'f',COMPRESSION=>'SNAPPY',DATA_BLOCK_ENCODING=>'FAST_DIFF',BLOOMFILTER=>'ROW'}, {SPLITS=>['1','2','3','4','5','6','7','8','9']} 其中,Column Family =‘f’,越短越好。 COMPRESSION => 'SNAPPY',HBase 支持 3 种压缩 LZO, GZIP and Snappy。GZIP 压缩率高,但是耗 CPU。后两者差不多,Snappy 稍微胜出一点,cpu 消耗的比 GZIP 少。一般在 IO 和 CPU 均衡下,选择 Snappy。 DATA_BLOCK_ENCODING => 'FAST_DIFF',本案例中 RowKey 较为接近,通过以下命令查看 key 长度相对 value 较长。 ./hbaseorg.apache.hadoop.hbase.io.hfile.HFile-m-f/apps/hbase/data/data/tinawang/check/a661f0f95598662a53b3d8b1ae469fdf/f/a5fefc880f87492d908672e1634f2eed_SeqId_2_ Step2:RowKey 组成 Salt 让数据均衡的分布到各个 Region 上,结合 pre-split,我们对查询键即 check 表的 check_id 求 hashcode 值,然后 modulus(numRegions) 作为前缀,注意补齐数据。 StringUtils.leftPad(Integer.toString(Math.abs(check_id.hashCode()%numRegion)),1,’0’) 说明:如果数据量达上百 G 以上,则 numRegions 自然到 2 位数,则 salt 也为 2 位。 Hash 散列 因为 check_id 本身是不定长的字符数字串,为使数据散列化,方便 RowKey 查询和比较,我们对 check_id 采用 SHA1 散列化,并使之 32 位定长化。 MD5Hash.getMD5AsHex(Bytes.toBytes(check_id)) 唯一性 以上 salt+hash 作为 RowKey 前缀,加上 check 表的主键 id 来保障 RowKey 唯一性。综上,check 表的 RowKey 设计如下:(check_id=A208849559) 为增强可读性,中间还可以加上自定义的分割符,如’+’,’|’等。 7+7c9498b4a83974da56b252122b9752bf+56B63AB98C2E00B4E053C501380709AD 以上设计能保证对每次查询而言,其 salt+hash 前缀值是确定的,并且落在同一个 region 中。需要说明的是 HBase 中 check 表的各列同数据源 Oracle 中 check 表的各列存储。 WEB 查询设计 RowKey 设计与查询息息相关,查询方式决定 RowKey 设计,反之基于以上 RowKey 设计,查询时通过设置 Scan 的 [startRow,stopRow], 即可完成扫描。以查询 check_id=A208849559 为例,根据 RowKey 的设计原则,对其进行 salt+hash 计算,得前缀。 startRow=7+7c9498b4a83974da56b252122b9752bf stopRow=7+7c9498b4a83974da56b252122b9752bg 代码实现关键流程 Spark write to HBase Step0: prepare work 因为是从上游系统承接的业务数据,存量数据采用 sqoop 抽到 hdfs;增量数据每日以文件的形式从 ftp 站点获取。因为业务数据字段中包含一些换行符,且 sqoop1.4.6 目前只支持单字节,所以本文选择’0x01’作为列分隔符,’0x10’作为行分隔符。 Step1: Spark read hdfs text file SparkContext.textfile() 默认行分隔符为”\n”,此处我们用“0x10”,需要在 Configuration 中配置。应用配置,我们调用 newAPIHadoopFile 方法来读取 hdfs 文件,返回 JavaPairRDD,其中 LongWritable 和 Text 分别为 Hadoop 中的 Long 类型和 String 类型(所有 Hadoop 数据类型和 java 的数据类型都很相像,除了它们是针对网络序列化而做的特殊优化)。我们需要的数据文件放在 pairRDD 的 value 中,即 Text 指代。为后续处理方便,可将 JavaPairRDD转换为 JavaRDD< String >。 Step2: Transfer and sort RDD ① 将 avaRDD< String>转换成 JavaPairRDD<tuple2,String>,其中参数依次表示为,RowKey,col,value。做这样转换是因为 HBase 的基本原理是基于 RowKey 排序的,并且当采用 bulk load 方式将数据写入多个预分区(region)时,要求 Spark 各 partition 的数据是有序的,RowKey,column family(cf),col name 均需要有序。在本案例中因为只有一个列簇,所以将 RowKey 和 col name 组织出来为 Tuple2格式的 key。请注意原本数据库中的一行记录(n 个字段),此时会被拆成 n 行。 ② 基于 JavaPairRDD<tuple2,String>进行 RowKey,col 的二次排序。如果不做排序,会报以下异常: java.io.IOException:Addedakeynotlexicallylargerthanpreviouskey ③ 将数据组织成 HFile 要求的 JavaPairRDDhfileRDD。 Step3:create hfile and bulk load to HBase ①主要调用 saveAsNewAPIHadoopFile 方法: hfileRdd.saveAsNewAPIHadoopFile(hfilePath,ImmutableBytesWritable.class, KeyValue.class,HFileOutputFormat2.class,config); ② hfilebulk load to HBase finalJobjob=Job.getInstance(); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); HFileOutputFormat2.configureIncrementalLoad(job,htable); LoadIncrementalHFilesbulkLoader=newLoadIncrementalHFiles(config); bulkLoader.doBulkLoad(newPath(hfilePath),htable); 注:如果集群开启了 kerberos,step4 需要放置在 ugi.doAs()方法中,在进行如下验证后实现 UserGroupInformationugi=UserGroupInformation.loginUserFromKeytabAndReturnUGI(keyUser,keytabPath); UserGroupInformation.setLoginUser(ugi); 访问 HBase 集群的 60010 端口 web,可以看到 region 分布情况。 Read from HBase 本文基于 spring boot 框架来开发 web 端访问 HBase 内数据。 use connection pool(使用连接池) 创建连接是一个比较重的操作,在实际 HBase 工程中,我们引入连接池来共享 zk 连接,meta 信息缓存,region server 和 master 的连接。 HConnectionconnection=HConnectionManager.createConnection(config); HTableInterfacetable=connection.getTable("table1"); try{ //Usethetableasneeded,forasingleoperationandasinglethread }finally{ table.close(); } 也可以通过以下方法,覆盖默认线程池。 HConnectioncreateConnection(org.apache.hadoop.conf.Configurationconf,ExecutorServicepool); process query Step1: 根据查询条件,确定 RowKey 前缀 根据 3.3 RowKey 设计介绍,HBase 的写和读都遵循该设计规则。此处我们采用相同的方法,将 web 调用方传入的查询条件,转化成对应的 RowKey 前缀。例如,查询 check 表传递过来的 check_id=A208849559,生成前缀 7+7c9498b4a83974da56b252122b9752bf。 Step2:确定 scan 范围 A208849559 对应的查询结果数据即在 RowKey 前缀为 7+7c9498b4a83974da56b252122b9752bf 对应的 RowKey 及 value 中。 scan.setStartRow(Bytes.toBytes(rowkey_pre));//scan,7+7c9498b4a83974da56b252122b9752bf byte[]stopRow=Bytes.toBytes(rowkey_pre); stopRow[stopRow.length-1]++; scan.setStopRow(stopRow);//7+7c9498b4a83974da56b252122b9752bg Step3:查询结果组成返回对象 遍历 ResultScanner 对象,将每一行对应的数据封装成 table entity,组成 list 返回。 测试 从原始数据中随机抓取 1000 个 check_id,用于模拟测试,连续发起 3 次请求数为 2000(200 个线程并发,循环 10 次),平均响应时间为 51ms,错误率为 0。 如上图,经历 N 次累计测试后,各个 region 上的 Requests 数较为接近,符合负载均衡设计之初。 踩坑记录 1、kerberos 认证问题 如果集群开启了安全认证,那么在进行 Spark 提交作业以及访问 HBase 时,均需要进行 kerberos 认证。 本文采用 yarn cluster 模式,像提交普通作业一样,可能会报以下错误。 ERRORStartApp:jobfailure, java.lang.NullPointerException atcom.tinawang.spark.hbase.utils.HbaseKerberos.<init>(HbaseKerberos.java:18) atcom.tinawang.spark.hbase.job.SparkWriteHbaseJob.run(SparkWriteHbaseJob.java:60) 定位到 HbaseKerberos.java:18,代码如下: this.keytabPath=(Thread.currentThread().getContextClassLoader().getResource(prop.getProperty("hbase.keytab"))).getPath(); 这是因为 executor 在进行 HBase 连接时,需要重新认证,通过 --keytab 上传的 tina.keytab 并未被 HBase 认证程序块获取到,所以认证的 keytab 文件需要另外通过 --files 上传。示意如下 --keytab/path/tina.keytab\ --principaltina@GNUHPC.ORG\ --files"/path/tina.keytab.hbase" 其中 tina.keytab.hbase 是将 tina.keytab 复制并重命名而得。因为 Spark 不允许同一个文件重复上传。 2、序列化 org.apache.spark.SparkException:Tasknotserializable atorg.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) atorg.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) atorg.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) atorg.apache.spark.SparkContext.clean(SparkContext.scala:2101) atorg.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) atorg.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) ... org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) Causedby:java.io.NotSerializableException:org.apache.spark.api.java.JavaSparkContext Serializationstack: -objectnotserializable(class:org.apache.spark.api.java.JavaSparkContext,value:org.apache.spark.api.java.JavaSparkContext@24a16d8c) -field(class:com.tinawang.spark.hbase.processor.SparkReadFileRDD,name:sc,type:classorg.apache.spark.api.java.JavaSparkContext) ... 解决方法一: 如果 sc 作为类的成员变量,在方法中被引用,则加 transient 关键字,使其不被序列化。 privatetransientJavaSparkContextsc; 解决方法二: 将 sc 作为方法参数传递,同时使涉及 RDD 操作的类 implements Serializable。 代码中采用第二种方法。详见代码。 3、批量请求测试 Exceptioninthread"http-nio-8091-Acceptor-0"java.lang.NoClassDefFoundError:org/apache/tomcat/util/ExceptionUtils 或者 Exceptioninthread"http-nio-8091-exec-34"java.lang.NoClassDefFoundError:ch/qos/logback/classic/spi/ThrowableProxy 查看下面 issue 以及一次排查问题的过程,可能是 open file 超过限制。 https://github.com/spring-projects/spring-boot/issues/1106 http://mp.weixin.qq.com/s/34GVlaYDOdY1OQ9eZs-iXg 使用 ulimit-a 查看每个用户默认打开的文件数为 1024。 在系统文件 /etc/security/limits.conf 中修改这个数量限制,在文件中加入以下内容, 即可解决问题。 soft nofile 65536 hard nofile 65536 作者介绍 汪婷,中国民生银行大数据开发工程师,专注于 Spark 大规模数据处理和 Hbase 系统设计。 大数据参考学习:http://www.roncoo.com/course/list.html?tid1=&tid2=5824e5d9104b49229f7e15aabf81688f

优秀的个人博客,低调大师

Android系统中的任意文件读写方法

最近用了一个root工具,从中学习到从Android系统中上传、下载任意文件的方法: echooff cls adbshellmv/data/local/tmp/data/local/tmp.bak adbshellln-s/data/data/local/tmp adbreboot echoRebooting(1/3)-Continueoncedevicefinishesrebooting echo正在重启手机(第1次,共3次)-请等待重启完毕,之后按任意键继续 pause adbshellrm/data/local.prop>nul adbshell"echo\"ro.kernel.qemu=1\">/data/local.prop" adbreboot echoRebooting(2/3)-Continueoncedevicefinishesrebooting echo正在重启平板(第2次,共3次)-请等待重启完毕,之后按任意键继续 pause adbshellid echoIftheidis0/rootthencontinue,otherwisectrl+ctocancelandstartover echo如果上面显示的id为0或者root,按任意键继续;否则按Ctrl-C并回复Y来取消本次root尝试,然后重试 pause adbremount adbpushsu/system/bin/su adbshellchown0.0/system/bin/su adbshellchmod06755/system/bin/su adbpushbusybox/system/bin/busybox adbshellchown0.0/system/bin/busybox adbshellchmod0755/system/bin/busybox adbpushSuperuser.apk/system/app/Superuser.apk adbshellchown0.0/system/app/Superuser.apk adbshellchmod0644/system/app/Superuser.apk adbpushRootExplorer.apk/system/app/RootExplorer.apk adbshellchown0.0/system/app/RootExplorer.apk adbshellchmod0644/system/app/RootExplorer.apk echoRemovingchangesexceptROOT echo正在进行清理和恢复 adbshellrm/data/local.prop adbshellrm/data/local/tmp adbshellmv/data/local/tmp.bak/data/local/tmp adbreboot echoRebooting(3/3)-YoushouldnowbeRooted echo正在重启平板(第3次,共3次)-root成功 pause echoon 关键就是在/data/local.prop中添加"ro.kernel.qemu=1" 本文转自fatshi51CTO博客,原文链接:http://blog.51cto.com/duallay/1101494,如需转载请自行联系原作者

优秀的个人博客,低调大师

Hadoop 利用FileSystem API 执行hadoop文件读写操作

因为HDFS不同于一般的文件系统,所以Hadoop提供了强大的FileSystem API来操作HDFS. 核心类是FSDataInputStream和FSDataOutputStream 读操作: 我们用FSDataInputStream来读取HDFS中的指定文件(第一个实验),另外我们还演示了这个类的定位文件位置的能力,然后从指定位置开始读取文件(第二个实验)。 代码如下: /* */ packagecom.charles.hadoop.fs; importjava.net.URI; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.FSDataInputStream; importorg.apache.hadoop.fs.FileSystem; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IOUtils; /** * *Description:查看Hadoop文件系统中的文件,利用hadoopFileSystem接口中的FSDataInputStream *FSDataInputStream还具有流定位的能力,可以从文件的任意位置开始读取 * *@authorcharles.wang *@createdMay26,201212:28:49PM * */ publicclassReadFromHadoopFileSystem{ /** *@paramargs */ publicstaticvoidmain(String[]args)throwsException{ //TODOAuto-generatedmethodstub //第一个参数传递进来的是hadoop文件系统中的某个文件的URI,以hdfs://ip的theme开头 Stringuri=args[0]; //读取hadoop文件系统的配置 Configurationconf=newConfiguration(); conf.set("hadoop.job.ugi","hadoop-user,hadoop-user"); //FileSystem是用户操作HDFS的核心类,它获得URI对应的HDFS文件系统 FileSystemfs=FileSystem.get(URI.create(uri),conf); FSDataInputStreamin=null; try{ //实验一:输出全部文件内容 System.out.println("实验一:输出全部文件内容"); //让FileSystem打开一个uri对应的FSDataInputStream文件输入流,读取这个文件 in=fs.open(newPath(uri)); //用Hadoop的IOUtils工具方法来让这个文件的指定字节复制到标准输出流上 IOUtils.copyBytes(in,System.out,50,false); System.out.println(); //实验二:展示FSDataInputStream文件输入流的流定位能力,用seek进行定位 System.out.println("实验二:展示FSDataInputStream文件输入流的流定位能力,用seek进行定位"); //假如我们要吧文件输出3次 //第一次输入全部内容,第二次输入从第20个字符开始的内容,第3次输出从第40个字符开始的内容 for(inti=1;i<=3;i++){ in.seek(0+20*(i-1)); System.out.println("流定位第"+i+"次:"); IOUtils.copyBytes(in,System.out,4096,false); } }finally{ IOUtils.closeStream(in); } } } 我们传入的命令行参数为我们要读的HDFS文件系统中某文件的URI: hdfs://192.168.129.35:9000/user/hadoop-user/textfile.txt 最终输出结果为: 实验一:输出全部文件内容 Thisisatextfileeditedbycharlestotestthehadoopdistributedfilesystem'sfeatures. 实验二:展示FSDataInputStream文件输入流的流定位能力,用seek进行定位 流定位第1次: Thisisatextfileeditedbycharlestotestthehadoopdistributedfilesystem'sfeatures. 流定位第2次: editedbycharlestotestthehadoopdistributedfilesystem'sfeatures. 流定位第3次: 写操作: 我们用FSDataOutputStream来写文件到HDFS系统中,或者说从本地文件系统中复制文件到HDFS文件系统中。其中这个本地文件系统是相对于运行这段java代码的宿主系统。 代码如下: /* */ packagecom.charles.hadoop.fs; importjava.io.BufferedInputStream; importjava.io.FileInputStream; importjava.io.InputStream; importjava.io.OutputStream; importjava.net.URI; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.FileSystem; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IOUtils; importorg.apache.hadoop.util.Progressable; /** * *Description:这个类展示如何将一个文件从本地文件系统复制到HDFS * *@authorcharles.wang *@createdMay26,20121:00:39PM * */ publicclassWriteToHadoopFileSystem{ /** *@paramargs */ publicstaticvoidmain(String[]args)throwsException{ //TODOAuto-generatedmethodstub //两个参数分别是本地文件系统的的输入文件路径和HDFS中的输出文件位置 //如果这段代码最终运行在Hadoop所在的服务器上,那么本地文件系统是相对于那台服务器的本地文件系统 //如果这段代码运行在我们WindowsPC上,那么本地文件系统是这台WindowPC的文件系统 StringlocalSrc=args[0]; Stringdst=args[1]; //因为本地文件系统是基于java.io包的,所以我们创建一个本地文件输入流 InputStreamin=newBufferedInputStream(newFileInputStream(localSrc)); //读取hadoop文件系统的配置 Configurationconf=newConfiguration(); conf.set("hadoop.job.ugi","hadoop-user,hadoop-user"); //仍然用FileSystem和HDFS打交道 //获得一个对应HDFS目标文件的文件系统 FileSystemfs=FileSystem.get(URI.create(dst),conf); //创建一个指向HDFS目标文件的输出流 OutputStreamout=fs.create(newPath(dst)); //用IOUtils工具将文件从本地文件系统复制到HDFS目标文件中 IOUtils.copyBytes(in,out,4096,true); System.out.println("复制完成"); } } 我们传入2个命令行参数,一个是本地文件系统中被复制的文件路径,第二个要复制到的HDFS文件系统中的目标文件路径: copyMe.txt hdfs://192.168.129.35:9000/user/hadoop-user/copyMe.txt 我们去文件系统中去检查文件,果然文件被复制上去了: 打开这个目标文件,果然内容与预期一样: 本文转自 charles_wang888 51CTO博客,原文链接:http://blog.51cto.com/supercharles888/878921,如需转载请自行联系原作者

优秀的个人博客,低调大师

HanLP代码与词典分离方案与流程

之前在spark环境中一直用的是portable版本,词条数量不是很够,且有心想把jieba,swcs词典加进来, 其他像ik,ansi-seg等分词词典由于没有词性并没有加进来. 本次修改主要是采用jar包方包将词典目录 data与hanlp.properties合成一个data.jar文件. 1. pom.xml 过滤资源文件的配置 <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>${maven-jar-plugin.version}</version> <configuration> <excludes> <exclude>**/*.properties</exclude> </excludes> </configuration> </plugin> 这里把properties文件从jar包文件中去掉,因而结果文件是没有properties文件的. 可根据需要来确定是否把properties加入jar包中.由于我打算把hanlp.properties与词典目录写在一起 这里是要过滤掉hanlp.properties文件 2. 修改hanlp.properties文件 root= #将根目录置为空,或者注释掉root CustomDictionaryPath=data/dictionary/custom/CustomDictionary.txt; scws.txt; jieba.txt; 现代汉语补充词库.txt; 全国地名大全.txt ns; 人名词典.txt; 机构名词典.txt; 上海地名.txt ns;data/dictionary/person/nrf.txt nrf; #增加更多的配置文件,这里增加了结巴分词,scws分词 #IOAdapter=com.hankcs.hanlp.corpus.io.FileIOAdapter IOAdapter=com.hankcs.hanlp.corpus.io.JarIOAdapter #修改IOAdapter,以便使用jar包形式加载词典 3. 修改HanLP.java if ( root.length() != 0 && !root.endsWith("/")) root += "/"; 当root的长度为0时,不用在root字符串后面添加'/' 4. 增加处理词典jar包的代码文件: JarIOAdapter.java package com.hankcs.hanlp.corpus.io; import java.io.*; /** * 基于普通文件系统的IO适配器 * * @author hankcs */ public class JarIOAdapter implements IIOAdapter { @Override public InputStream open(String path) throws FileNotFoundException { /* 采用第一行的方式加载资料会在分布式环境报错 改用第二行的方式 */ //return ClassLoader.getSystemClassLoader().getResourceAsStream(path); return JarIOAdapter.class.getClassLoader().getResourceAsStream(path); } @Override public OutputStream create(String path) throws FileNotFoundException { return new FileOutputStream(path); } } 在跑DemoStopWord时,发现 java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoStopWord 报错,原因是接口不统一导致. 修改 DMAG.java如下: public MDAG(File dataFile) throws IOException { BufferedReader dataFileBufferedReader = new BufferedReader(new InputStreamReader(IOAdapter == null ? new FileInputStream(dataFile) : //IOAdapter.open(dataFile.getAbsolutePath()) IOAdapter.open(dataFile.getPath()) , "UTF-8")); 即可. 5. 如何将词典与配置文件打成一个jar包 最好是把txt格式的文件做成bin或dat格式的文件,然后做成jar包,否则打包运行后无法再写成bin或dat格式文件. 简单的办法是跑一下示例,即可生成相应的bin或dat格式文件. java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoAtFirstSight java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoChineseNameRecognition java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoJapaneseNameRecognition java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoPinyin java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoPlaceRecognition java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoOrganizationRecognition java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoTokenizerConfig #命名实体识别,包括上面的人名,地名等 java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoTraditionalChinese2SimplifiedChinese java -cp .:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoStopWord 或者用以下shell脚本完成 :>a;while read cl; do echo $cl; echo "=========="$cl"=======" >>a;java -cp .:test.jar:hanlp-1.3.2.jar $cl 1>> a 2>&1;done < <(jar tvf test.jar | awk '$(NF)~"Demo"{print $(NF)}' | sed 's/.class$//;s/\//./g') 我们把data目录与hanlp.properties文件放在一个目录,比如xxx目录 cd xxx jar cvf data.jar . 即可生成data.jar包 6. 如何运行 [dxp@Flyme-SearchTag-32-220 makeNewDict]$ ls data.jar hanlp-1.3.2.jar README.md test test.jar [dxp@Flyme-SearchTag-32-220 makeNewDict]$ java -cp data.jar:hanlp-1.3.2.jar:test.jar com.hankcs.demo.DemoAtFirstSight 7. 在spark中应用 IDE如(intellij idea)中maven项目 引入以下依赖: <dependency> <groupId>com.hankcs</groupId> <artifactId>hanlp</artifactId> <version>1.3.2</version> <scope>system</scope> <systemPath>${LocalPath}/hanlp-1.3.2.jar</systemPath> </dependency> spark-submit提交任务时增加 --jar hanlp-1.3.2.jar,data.jar 转载自cicido的个人空间

优秀的个人博客,低调大师

计算与存储分离实践—swift消息系统

1. 相关背景 搜索事业部与计算平台事业部目前使用消息队列主要有以下三种场景: 1. 每天有上万张表需要通过Build Service来构建索引。这些表主要来自主搜索,IGRAPH,Rank Service等业务,且每个表包含的文档数差别很大。总数据量为PB级别,总文档数达万亿级。文档的大小不一,小到几十Byte大到几百KB。在Build Service内部,文档处理与索引构建需要一个消息队列来传送消息。因此在build时,容易产生突发大流量(几百G/秒,几千万条/秒)持续消息写入与读取。 2. 搜索的在线服务如主搜索查询服务,RankService打分服务或IGRAPH服务需要毫秒级的实时文档更新。这些服务引擎基本上是多行多列结构,即每一行是一个完整的服务单元,由多台机器组成,多行提升服务

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。