Graphx处理janusGraph数据实现
声明:
此方案是在spark直接执行gremlinSQL方案实现受阻的情况下的备选方案,不涉及工作机密,不存在泄密可能,纯属个人思考,希望抛砖引玉
方案:
将gremlinSql的查询结果转化为StarGraph,然后转写到HDFS;Spark读取HDFS上的StarGraph JSON,构建GraphX可用的图,然后就可以调用GraphX丰富的图计算算法,从而实现GraphX操作JanusGraph的目的
1. gremlinSql的查询结果转换成starGraphJson
由于org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONWriter保存的GraphSON格式无法满足需求,所以将查询出的带path的点边数据自行转换成单点图(StarGraph)的JSON结构,转化方法如下,然后存到HDFS,存储方法不再赘述。
public StringBuilder generatorStarGraphJson(Vertex vertex, Edge edge, StringBuilder starGraphJson){ String inVId; String outVId; String VId; starGraphJson.append("{").append("\"id\":"+vertex.id()+","+"\"label\":\""+vertex.label()+"\","); //这种情况有outE和inE,outE中会有inV信息,inE会有一个inV inVId = edge.inVertex().id().toString(); outVId = edge.outVertex().id().toString(); VId = vertex.id().toString(); if(inVId.equalsIgnoreCase(VId)){ starGraphJson.append("\"outE\":{").append("\""+edge.label()+"\":[{").append("\"id\":\""+edge.id()+"\",") .append("\"inV\":"+edge.inVertex().id()+",").append("\"properties\":{"+concatEdgeProperties(edge)+"}}]},"); }else if(outVId.equalsIgnoreCase(VId)){ starGraphJson.append("\"inE\":{").append("\""+edge.label()+"\":[{").append("\"id\":\""+edge.id()+"\",") .append("\"outV\":"+edge.inVertex().id()+",").append("\"properties\":{"+concatEdgeProperties(edge)+"}}]},"); }else{ throw new Exception("点边不对应数据错误!!!"); } //拼接点的properties starGraphJson.append("\"properties\":{").append(concatVertexProperties(vertex)).append("}}"); return starGraphJson; }
2. spark读取指定路径的starGraph转成graph
/**
 * Loads star-graph GraphSON files through TinkerPop's Hadoop input format and converts them
 * into Spark GraphX vertex/edge RDDs, then runs GraphX algorithms on the resulting graph.
 *
 * Vertex ids are converted with `toString.toLong`, so the stored ids must be numeric;
 * non-numeric ids would need a separate String-to-Long mapping.
 */
class GraphSon2GraphXRDD() extends Serializable {

  /**
   * Builds the HadoopGraph configuration that reads GraphSON from the given location.
   *
   * @param HDFSFilePath input location, used for both the Gremlin input location and the
   *                     MapReduce input directory
   * @return the populated configuration
   */
  def getGraphConf(HDFSFilePath: String): BaseConfiguration = {
    val inputGraphConf = new BaseConfiguration
    inputGraphConf.setProperty("gremlin.graph", classOf[HadoopGraph].getName)
    inputGraphConf.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, classOf[GraphSONInputFormat].getName)
    inputGraphConf.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, HDFSFilePath)
    inputGraphConf.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, HDFSFilePath)
    inputGraphConf
  }

  /**
   * Creates the SparkContext.
   *
   * @param sparkHost master to use in remote mode when none is configured yet
   *                  (assumed to be a full master URL such as `spark://host:7077` - TODO confirm)
   * @param isRemote  `false` runs with master `local[*]`; `true` keeps any master / app name
   *                  already present in the Spark configuration and only fills in what is missing
   * @return a new SparkContext
   */
  def getSc(sparkHost: String, isRemote: Boolean): SparkContext = {
    val sparkConf = new SparkConf()
    if (isRemote) {
      // setIfMissing keeps externally supplied settings (e.g. from spark-submit) authoritative.
      Option(sparkHost).filter(_.nonEmpty).foreach(host => sparkConf.setIfMissing("spark.master", host))
      sparkConf.setIfMissing("spark.app.name", "GraphSon2GraphX")
    } else {
      sparkConf.setMaster("local[*]").setAppName("GraphSon2GraphX")
    }
    new SparkContext(sparkConf)
  }

  /**
   * Reads the graph described by `conf` as a pair RDD of vertex writables.
   *
   * @param conf graph configuration, e.g. from [[getGraphConf]]
   * @param sc   the Spark context to read with
   * @return the pair RDD returned by `InputFormatRDD.readGraphRDD`
   */
  def getJavaRDD(conf: BaseConfiguration, sc: SparkContext): JavaPairRDD[AnyRef, VertexWritable] = {
    val jsc = JavaSparkContext.fromSparkContext(sc)
    val graphRDDInput = new InputFormatRDD
    graphRDDInput.readGraphRDD(conf, jsc)
  }

  /**
   * Converts every star vertex into a GraphX vertex `(id, attributes)`.
   *
   * The attribute map holds the original id under the key `"originalID"` plus the entries of
   * the vertex's `valueMap()`.
   *
   * @param vertexWritableJavaPairRDD the RDD produced by [[getJavaRDD]]
   * @return one `(Long id, attribute map)` pair per input vertex
   */
  def getVertexRDD(vertexWritableJavaPairRDD: JavaPairRDD[AnyRef, VertexWritable]): RDD[(Long, util.HashMap[String, java.io.Serializable])] = {
    vertexWritableJavaPairRDD.rdd.map((tuple2: Tuple2[AnyRef, VertexWritable]) => {
      // The centre vertex of the star graph.
      val v = tuple2._2.get
      val g = StarGraph.of(v)
      // Throws NumberFormatException when the stored id is not numeric.
      val vid = v.id().toString.toLong
      // Carry the vertex properties into the GraphX vertex value and keep the original id.
      val graphxValueMap: util.HashMap[String, java.io.Serializable] = new util.HashMap[String, java.io.Serializable]()
      graphxValueMap.put("originalID", v.id().toString)
      graphxValueMap.putAll(g.traversal.V(v.id()).valueMap().next(1).get(0))
      (vid, graphxValueMap)
    })
  }

  /**
   * Converts the outgoing edges of every star vertex into GraphX edges.
   *
   * Each edge runs from the TinkerPop out-vertex (GraphX `srcId`) to the in-vertex (`dstId`).
   * Property values are stored as their `toString` form. The result is de-duplicated with
   * `distinct()`.
   *
   * @param vertexWritableJavaPairRDD the RDD produced by [[getJavaRDD]]
   * @return the distinct GraphX edges
   */
  def getEdgeRDD(vertexWritableJavaPairRDD: JavaPairRDD[AnyRef, VertexWritable]): RDD[graphx.Edge[util.HashMap[String, java.io.Serializable]]] = {
    val edge = vertexWritableJavaPairRDD.rdd.flatMap((tuple2: Tuple2[AnyRef, VertexWritable]) => {
      val v = tuple2._2.get
      val g = StarGraph.of(v)
      val edgelist: util.List[Edge] = g.traversal.V(v.id()).outE().toList
      // Collect all outgoing edges of the centre vertex.
      val list = new collection.mutable.ArrayBuffer[graphx.Edge[util.HashMap[String, java.io.Serializable]]]()
      for (x <- 0 until edgelist.size()) {
        val current = edgelist.get(x)
        // A TinkerPop edge points from its out-vertex to its in-vertex.
        val srcId = current.outVertex.id().toString.toLong
        val dstId = current.inVertex.id().toString.toLong
        val edgeAttr = new util.HashMap[String, java.io.Serializable]()
        // The explicit type argument keeps the value type from being inferred as Nothing.
        val properties: util.Iterator[Property[AnyRef]] = current.properties[AnyRef]()
        while (properties.hasNext) {
          val property = properties.next()
          edgeAttr.put(property.key(), property.value().toString)
        }
        list.append(graphx.Edge(srcId, dstId, edgeAttr))
      }
      list
    })
    edge.distinct()
  }

  /**
   * Runs GraphX label propagation and groups the vertices by the label they end up with.
   *
   * Every `(vertexId, label)` pair is printed, in ascending vertex-id order, before grouping.
   *
   * @param vertexWritableJavaPairRDD the RDD produced by [[getJavaRDD]]
   * @param iterationNum              value passed as the second argument of `LabelPropagation.run`
   * @return the communities, see [[getFinalCommunit]]
   */
  def doLAP(vertexWritableJavaPairRDD: JavaPairRDD[AnyRef, VertexWritable], iterationNum: Int): Array[Array[String]] = {
    val vertexRDD = getVertexRDD(vertexWritableJavaPairRDD)
    val edgeRDD = getEdgeRDD(vertexWritableJavaPairRDD)
    val graph = graphx.Graph[util.HashMap[String, java.io.Serializable], util.HashMap[String, java.io.Serializable]](vertexRDD, edgeRDD, new util.HashMap[String, java.io.Serializable]())
    val LVMRsult = lib.LabelPropagation.run(graph, iterationNum).vertices.collect.sortWith(_._1 < _._1).map(f => {
      println(f.toString())
      f
    })
    getFinalCommunit(LVMRsult)
  }

  /**
   * Groups `(vertexId, label)` pairs into communities: one array of vertex-id strings per label.
   *
   * Communities appear in the order in which their label first occurs in the input, and the
   * members of a community keep their input order (repeated ids are dropped). A vertex whose
   * label is shared with nobody forms a community of its own. Each community is printed as
   * `(id1,id2,...)`.
   *
   * @param LVMRsult `(vertexId, label)` pairs
   * @return the communities; empty when the input is empty, never containing `null`
   */
  def getFinalCommunit(LVMRsult: Array[(Long, Long)]): Array[Array[String]] = {
    // Insertion-ordered so that communities come out in first-occurrence order.
    val communities = scala.collection.mutable.LinkedHashMap.empty[Long, ArrayBuffer[String]]
    for (i <- 0 until LVMRsult.length) {
      val vertexId = LVMRsult(i)._1
      val label = LVMRsult(i)._2
      communities.getOrElseUpdate(label, new ArrayBuffer[String]()) += vertexId.toString
    }
    val result = communities.values.map(_.toArray.distinct).toArray
    result.foreach(members => println(members.mkString("(", ",", ")")))
    result
  }

  /**
   * Runs GraphX PageRank and returns the vertices ordered by descending rank.
   *
   * Vertices missing from the PageRank result get rank `0.0`. Each row is printed as
   * `vertexId:rank` after the results have been collected.
   *
   * @param vertexWritableJavaPairRDD the RDD produced by [[getJavaRDD]]
   * @param stopThreshold             value passed to `graph.pageRank`
   * @return one `Array(vertexId as String, rank as Double)` per vertex, highest rank first
   */
  def doPageRank(vertexWritableJavaPairRDD: JavaPairRDD[AnyRef, VertexWritable], stopThreshold: Double): Array[Array[Any]] = {
    val vertexRDD: RDD[(Long, util.HashMap[String, java.io.Serializable])] = getVertexRDD(vertexWritableJavaPairRDD)
    val edgeRDD = getEdgeRDD(vertexWritableJavaPairRDD)
    val graph = graphx.Graph[util.HashMap[String, java.io.Serializable], util.HashMap[String, java.io.Serializable]](vertexRDD, edgeRDD, new util.HashMap[String, java.io.Serializable]())
    val gpgraph = graph.pageRank(stopThreshold).cache()
    val titleAndPrGraph = graph.outerJoinVertices(gpgraph.vertices) {
      (v, title, rank) => (rank.getOrElse(0.0), title)
    }
    // Second sortBy argument: false = descending, true = ascending.
    val pageRank = titleAndPrGraph.vertices
      .sortBy(_._2._1, false)
      .map(f => Array[Any](f._1.toString, f._2._1))
      .collect()
    // Print after collect() so the lines appear in rank order where the method was called.
    pageRank.foreach(row => println(row(0) + ":" + row(1)))
    pageRank
  }
}
这样就贯通了janusgraph和graphx,调用graphx的丰富的图计算功能就畅通无阻,就是实现有点挫,希望抛砖引玉
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Caused by: com.ctc.wstx.exc.WstxParsingException: Received non-all-whi...
java client 调用webservice, 返回以下错误: 返回message: <env:Envelope xmlns:env="http://schemas.xmlsoap.org/soap/envelope/"><env:Header/><env:Body><env:Fault>java.lang.Exception: Invalid SOAP formart for xml! 日志: Caused by: com.ctc.wstx.exc.WstxParsingException: Received non-all-whitespace CHARACTERS or CDATA event in nextTag(). at [row,col {unknown-source}]: [3,528] 原因是发送的message中缺少了 <env:Header/> soap envelope schema定义中, header似乎是任意的, 可以没有.
- 下一篇
阿里云服务器Centos 7.4配置之安装JDK
准备工作:下载工具 下载ssh工具Xftp6 ,Xshell6工具,并运行SSH Xftp6下载jdk1.8(linux版)也可以自行百度,进行下载。至于使用 Xshell 6 连接阿里云服务器,可查看我另一篇文章:(二)使用SSH 工具 Xshell 6连接阿里云服务CentOS 7.4直接开始正文!第一步: 首先将下载好的JDK安装包通过xftp上传到服务器,可自定义一个目录存(/home/temp)放并进行解压,执行命令行如下:mkdir /home/temp使用 Xftp6 点击此处调用 Xshell 6运行下面的命令tar -zxvf jdk-8u191-linux-x64.tar.gz # 解压到本地文件夹命令为 tar -zxvf 【JDK版本】.tar.gz 第二步:将解压后的文件jdk1.8.0191 移到/usr/local(其他文件夹也可以),运行下面的命令:sudo mv jdk1.8.0191 /usr/local(建议不要在 /usr/local下面新建java文件夹,再把jdk放在java里面这样会导致安装败。我是在usr下新建的java文件夹)第三步: ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Hadoop3单机部署,实现最简伪集群
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Docker使用Oracle官方镜像安装(12C,18C,19C)