首页 文章 精选 留言 我的

精选列表

搜索[API集成],共10000篇文章
优秀的个人博客,低调大师

MapReduce、Hbase接口API实践

读取hdfs中文件并做处理,取出卡号,通过卡号连接hbase查询出对应客户号,写入redis,因为不用输出,所以不调用context.write方法,整个操作在一个map中便可完成 protected HTable connect //setup方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高! protected void setup(Context context) throws IOExcption,InterruptedException{ super.setup(context) String jobName = context.getJobName(); //文件索引值 cartNoIndex = conf.get(jobName + "source.key","7"); //创建hbase连接,hbase-site.xml配置文件需要在jar包中 Configuration config = HBaseConfiguration.create(); connect = new HTable(config,"tableName") } protected void map(writable key,Text value,Context context){ if(value == null || value.toString().trim().isEmpty()){ //计数器,记录处理的条数 context.getCounter(....).increment(1); }else{ String[] values = Utils.split(value,separator,true); //业务逻辑处理 int i = Integer.parseInt(cartNoIndex); if(i<values.length){ cardNo = values[i]; }else{ logger.error("cardNo cannot find"); } //从hbase中查询出对应客户号 String rowkey = HTableManager.generatRowkey(cardNo); Get getResult = new Get(rowkey.getBytes()); Result rs = connect.get(getResult); String curNo = Bytes.toString(rs.getValue("f1".getBytes(),"column_name".getBtes());RedisClient.getRedisClient().zincrbyset("spending:rank",countNum,custNo);protected void cleanup(context context)throws IOException,InterruptedException{ super.cleanup(context); connect.close();} public static String[] split(String value,String separator,boolean trimSpace){ String[] rtn = split(value.separator); if(trimSpace && rtn != null){ for(int i=0;i<rtn.length;i++){ rtn[i] = rtn[i].trim(); } } return rtn; } public static String[] split(String value,String separator){ String[] rtn = null; if(value != null){ boolean endBlank = false; if(value.endsWith(separator)){ value +=" "; endBlank = true; } separator = escapeExprSpecialWord(deparator); if(endBlank){ rtn(rtn.length-1) = ""; } } return rtn; } public static String escapeExprSpecialWord(String keyWord){ if(keyword != null && !keyword.isEmpty()){ String[] fbsArr = {"\\","|","(",")"}; for(String key : fbsArr){ if(keyword.contains(key){ keyword = keyword.replace(key,"\\"+key); } } } return keyword; }

优秀的个人博客,低调大师

MapReduce API 基本概念

1.序列化 序列化是指将结构化对象转为字节流以便于通过网络进行传输或写入持久存储的过程。反序列化指的是将字节流转为结构化对象的过程。 在 Hadoop MapReduce 中, 序列化的主 要 作用有两个: 永久存储和进程间通信。为了能够读取或者存储 Java 对象, MapReduce 编程模型要求用户输入和输出数据中的 key 和 value 必须是可序列化的。 在 Hadoop MapReduce 中 , 使一个 Java 对象可序列化的方法是让其对应的类实现 Writable 接口 。 但对于 key 而言,由于它是数据排序的关键字, 因此还需要提供比较两个 key 对象的方法。 为此,key对应类需实现WritableComparable 接口 , 它的类如图: 在package org.apache.hadoop.io 中的WritableComparable.java文件中定义: public interface WritableComparable<T> extends Writable, Comparable<T> { } 再来看看Writable接口的定义: public interface Writable { /** * Serialize the fields of this object to <code>out</code>. * * @param out <code>DataOuput</code> to serialize this object into. * @throws IOException */ void write(DataOutput out) throws IOException; /** * Deserialize the fields of this object from <code>in</code>. * * <p>For efficiency, implementations should attempt to re-use storage in the * existing object where possible.</p> * * @param in <code>DataInput</code> to deseriablize this object from. * @throws IOException */ void readFields(DataInput in) throws IOException; } 可以很明显的看出,write(DataOutput out)方法的作用是将指定对象的域序列化为out相同的类型;readFields(DataInput in)方法的作用是将in对象中的域反序列化,考虑效率因素,实现接口的时候应该使用已经存在的对象存储。 DataInput接口定义源代码如下: public interface DataInput { void readFully(byte b[]) throws IOException; void readFully(byte b[], int off, int len) throws IOException; int skipBytes(int n) throws IOException; boolean readBoolean() throws IOException; byte readByte() throws IOException; int readUnsignedByte() throws IOException; short readShort() throws IOException; int readUnsignedShort() throws IOException; char readChar() throws IOException; int readInt() throws IOException; long readLong() throws IOException; float readFloat() throws IOException; double readDouble() throws IOException; String readLine() throws IOException; String readUTF() throws IOException; } 每个方法的含义差不多,具体可参见java jdk源码 DataOutput接口定义源代码如下: public interface DataOutput { void write(int b) throws IOException; void write(byte b[]) throws IOException; void write(byte b[], int off, int len) throws IOException; void writeBoolean(boolean v) throws IOException; void writeByte(int v) throws IOException; void writeShort(int v) throws IOException; void writeChar(int v) throws IOException; void writeInt(int v) throws IOException; void writeLong(long v) throws IOException; void writeFloat(float v) throws IOException; void writeDouble(double v) throws IOException; void writeBytes(String s) throws IOException; void writeChars(String s) throws IOException; void writeUTF(String s) throws IOException; } WritableComparable可以用来比较,通常通过Comparator . 在hadoop的Map-Reduce框架中任何被用作key的类型都要实现这个接口。 看一个例子: public class MyWritableComparable implements WritableComparable { // Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public int compareTo(MyWritableComparable w) { int thisValue = this.value; int thatValue = ((IntWritable)o).value; return (thisValue &lt; thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); } } 2.Reporter 参数 Reporter 是 MapReduce 提供给应用程序的工具。 如图所示,应用程序可使用Reporter 中的方法报告完成进度(progress)、设定状态消息(setStatus 以及更新计数器( incrCounter)。 Reporter 是一个基础参数。 MapReduce 对外提供的大部分组件, 包括 InputFormat、Mapper 和 Reducer 等,均在其主要方法中添加了该参数。 3.回调机制 回调机制是一种常见的设计模式。它将工作流内的某个功能按照约定的接口暴露给外部使用者, 为外部使用者提供数据,或要求外部使用者提供数据。 Hadoop MapReduce 对外提供的 5 个组件( InputFormat、 Mapper、 Partitioner、 Reducer 和OutputFormat) 实际上全部属于回调接口 。 当用户按照约定实现这几个接口后, MapReduce运行时环境会自 动调用它们。如图所示,MapReduce 给用户暴露了接口 Mapper, 当用户按照自己的应用程序逻辑实现自己的 MyMapper 后,Hadoop MapReduce 运行时环境会将输入数据解析成 key/value 对, 并调用 map() 函数迭代处理。

优秀的个人博客,低调大师

Java 封装 HDFS API 操作

代码下载地址:点击下载 一:环境介绍 hadoop:2.6 Ubuntu:15.10 eclipse:3.8.1 二:操作包括 判断某个文件夹是否存在 isExist(folder); 创建文件夹 mkdir(folder); 删除文件夹 rmr(folder); 列出所有文件夹 ls(folder); 递归列出所有文件夹 lsr(folder); 上传文件 put(local, folder); 下载文件 get(folder,local1); 删除文件 rm(folder); 显示文件 cat(folder); 三:代码演示 package user_thing_tuijian; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class hdfsGYT { private static final String HDFS = "hdfs://127.0.0.1:9000/"; public hdfsGYT(String hdfs, Configuration conf ){ this.hdfsPath = hdfs; this.conf = conf; } public hdfsGYT() { // TODO Auto-generated constructor stub } private String hdfsPath; private Configuration conf = new Configuration() ; public static void main(String[] args) throws IOException, URISyntaxException{ hdfsGYT hdfsgyt = new hdfsGYT(); String folder = HDFS + "mr/groom_system/small2.csv"; String local = "/home/thinkgamer/Java/hadoop_shizhan/src/user_thing_tuijian/small2.csv"; String local1 = "/home/thinkgamer/Java/hadoop_shizhan/src/user_thing_tuijian"; //判断某个文件夹是否存在 //hdfsgyt.isExist(folder); //创建文件夹 //hdfsgyt.mkdir(folder); //删除文件夹 //hdfsgyt.rmr(folder); //列出所有文件夹 //hdfsgyt.ls(folder); //递归列出所有文件夹 //hdfsgyt.lsr(folder); //上传文件 //hdfsgyt.put(local, folder); //下载文件 //hdfsgyt.get(folder,local1); //删除文件 //hdfsgyt.rm(folder); //显示文件 //hdfsgyt.cat(folder); } //显示文件 private void cat(String folder) throws IOException, URISyntaxException { // 与hdfs建立联系 FileSystem fs = FileSystem.get(new URI(HDFS),new Configuration()); Path path = new Path(folder); FSDataInputStream fsdis = null; System.out.println("cat: " + folder); try { fsdis =fs.open(path); IOUtils.copyBytes(fsdis, System.out, 4096, false); } finally { IOUtils.closeStream(fsdis); fs.close(); } } //删除文件 private void rm(String folder) throws IOException, URISyntaxException { //与hdfs建立联系 FileSystem fs = FileSystem.get(new URI(HDFS),new Configuration()); Path path = new Path(folder); if(fs.deleteOnExit(path)){ fs.delete(path); System.out.println("delete:" + folder); }else{ System.out.println("The fiel is not exist!"); } fs.close(); } //下载文件 private void get(String remote, String local) throws IllegalArgumentException, IOException, URISyntaxException { // 建立联系 FileSystem fs = FileSystem.get(new URI(HDFS), new Configuration()); fs.copyToLocalFile(new Path(remote), new Path(local)); System.out.println("Get From : " + remote + " To :" + local); fs.close(); } //上传文件 private void put(String local, String remote) throws IOException, URISyntaxException { // 建立联系 FileSystem fs = FileSystem.get(new URI(HDFS), new Configuration()); fs.copyFromLocalFile(new Path(local), new Path(remote)); System.out.println("Put :" + local + " To : " + remote); fs.close(); } //递归列出所有文件夹 private void lsr(String folder) throws IOException, URISyntaxException { //与hdfs建立联系 FileSystem fs = FileSystem.get(new URI(HDFS),new Configuration()); Path path = new Path(folder); //得到该目录下的所有文件 FileStatus[] fileList = fs.listStatus(path); for (FileStatus f : fileList) { System.out.printf("name: %s | folder: %s | size: %d\n", f.getPath(), f.isDir() , f.getLen()); try{ FileStatus[] fileListR = fs.listStatus(f.getPath()); for(FileStatus fr:fileListR){ System.out.printf("name: %s | folder: %s | size: %d\n", fr.getPath(), fr.isDir() , fr.getLen()); } }finally{ continue; } } fs.close(); } //列出所有文件夹 private void ls(String folder) throws IOException, URISyntaxException { //与hdfs建立联系 FileSystem fs = FileSystem.get(new URI(HDFS),new Configuration()); Path path = new Path(folder); //得到该目录下的所有文件 FileStatus[] fileList = fs.listStatus(path); for (FileStatus f : fileList) { System.out.printf("name: %s | folder: %s | size: %d\n", f.getPath(), f.isDir() , f.getLen()); } fs.close(); } //删除文件夹 private void rmr(String folder) throws IOException, URISyntaxException { //与hdfs建立联系 FileSystem fs = FileSystem.get(new URI(HDFS),new Configuration()); Path path = new Path(folder); fs.delete(path); System.out.println("delete:" + folder); fs.close(); } //创建文件夹 public void mkdir(String folder) throws IOException, URISyntaxException { //与hdfs建立联系 FileSystem fs = FileSystem.get(new URI(HDFS),new Configuration()); Path path = new Path(folder); if (!fs.exists(path)) { fs.mkdirs(path); System.out.println("Create: " + folder); }else{ System.out.println("it is have exist:" + folder); } fs.close(); } //判断某个文件夹是否存在 private void isExist(String folder) throws IOException, URISyntaxException { //与hdfs建立联系 FileSystem fs = FileSystem.get(new URI(HDFS),new Configuration()); Path path = new Path(folder); if(fs.exists(path)){ System.out.println("it is have exist:" + folder); }else{ System.out.println("it is not exist:" + folder); } fs.close(); } }

优秀的个人博客,低调大师

Lazarus 3.0 发布,Pascal 集成开发环境

Lazarus是以Free Pascal编译器为基础的 Pascal 语言的整合开发环境(IDE),和Delphi高度兼容,被视作后者的开源替代品。目前已有中文界面。 3.0 版本已经发布,这个版本是用 Free Pascal 3.2.2 构建的,上一个版本 Lazarus2.2.6 也是用 Free Pascal 3.2.2 构建的,以下是 Lazarus 和 Free Pascal 的更改列表: Lazarus 3.0发布说明 Free Pascal 3.2.2发布详情 视频演示: https://video.getlazarus.org/tutorial-intro.mp4 选择属于自己的 CPU、操作系统、发行版,然后选择 “Lazarus 3.0” 目录,下载地址: https://www.lazarus-ide.org/index.php?page=checksums#3_0

资源下载

更多资源
腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册