A First Look at GraphX: Learning from the Source Code (Part 1)

Key Concepts in GraphX

Graph

1. A Graph exposes three member variables: vertices, edges, and triplets.

2. Each entry in triplets records an edge together with the attributes of its source and destination vertices.
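To make the three views concrete, here is a minimal sketch that builds a tiny graph and renders each triplet. The object name `TripletSketch`, the vertex names, and the "follows" edge label are all made up for illustration, and a local SparkContext is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object TripletSketch {
  // Builds a tiny property graph; all names and attributes are illustrative.
  def build(sc: SparkContext): Graph[String, String] = {
    val vertices: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
    val edges: RDD[Edge[String]] =
      sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
    Graph(vertices, edges)
  }

  // A triplet carries the edge attribute plus both endpoint attributes,
  // so each one can be rendered as "srcAttr -[edgeAttr]-> dstAttr".
  def describe(graph: Graph[String, String]): Seq[String] =
    graph.triplets
      .map(t => s"${t.srcAttr} -[${t.attr}]-> ${t.dstAttr}")
      .collect()
      .sorted
      .toSeq

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TripletSketch")
    val sc = new SparkContext(conf)
    try describe(build(sc)).foreach(println)
    finally sc.stop()
  }
}
```

Mapping each triplet to a string before collecting is deliberate: it avoids holding on to the EdgeTriplet objects themselves outside the RDD operation.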

Member functions

The functions fall into several broad categories:

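  1. Operations over all vertices or all edges that leave the graph structure unchanged, such as mapEdges and mapVertices.
  2. Subgraph extraction, similar to filter on a collection: subgraph.
  3. Graph partitioning (partitionBy). This is critical for Spark computation: parallel processing is possible only because the graph is split into partitions. Different PartitionStrategy choices bring different benefits; the most obvious approach is to hash the graph into multiple regions.
  4. outerJoinVertices, an outer join on the vertices.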
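The four categories above can be sketched on a toy graph as follows. This is only an illustration under assumptions: the object name `GraphOperatorSketch`, the helper names, and the integer scores and weights are hypothetical, and `EdgePartition2D` is just one of the available strategies.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}

object GraphOperatorSketch {
  // Hypothetical toy graph: vertex attributes are scores, edge attributes are weights.
  def build(sc: SparkContext): Graph[Int, Int] = {
    val vertices = sc.parallelize(Seq[(VertexId, Int)]((1L, 10), (2L, 20), (3L, 30)))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 5), Edge(3L, 1L, 9)))
    Graph(vertices, edges)
  }

  // Category 1: transform attributes, keep the structure.
  def rescale(g: Graph[Int, Int]): Graph[Int, Int] =
    g.mapVertices((_, score) => score * 2).mapEdges((e: Edge[Int]) => e.attr + 1)

  // Category 2: keep only vertices passing the predicate, and the edges between them.
  def highScores(g: Graph[Int, Int]): Graph[Int, Int] =
    g.subgraph(vpred = (_, score) => score >= 20)

  // Category 3: redistribute the edges across partitions with a chosen strategy.
  def partitioned(g: Graph[Int, Int]): Graph[Int, Int] =
    g.partitionBy(PartitionStrategy.EdgePartition2D)

  // Category 4: outer-join the out-degree onto each vertex; vertices with
  // no out-edges are missing from outDegrees, hence the Option.
  def withOutDegree(g: Graph[Int, Int]): Graph[(Int, Int), Int] =
    g.outerJoinVertices(g.outDegrees) { (_, score, deg) => (score, deg.getOrElse(0)) }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("GraphOperatorSketch")
    val sc = new SparkContext(conf)
    try {
      val g = build(sc)
      rescale(g).vertices.collect().foreach(println)
      highScores(g).edges.collect().foreach(println)
      withOutDegree(partitioned(g)).vertices.collect().foreach(println)
    } finally sc.stop()
  }
}
```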

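Graph computations and operations: GraphOps

The commonly used graph algorithms are gathered into the GraphOps class. Graph defines an implicit conversion that turns a Graph into a GraphOps; the source is: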

    /**
     * Implicitly extracts the [[GraphOps]] member from a graph.
     *
     * To improve modularity the Graph type only contains a small set of basic operations.
     * All the convenience operations are defined in the [[GraphOps]] class which may be
     * shared across multiple graph implementations.
     */
    implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag]
        (g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops
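The same pattern can be reproduced without Spark. The sketch below is not GraphX code: `MiniGraph`, `MiniGraphOps`, and `miniGraphToOps` are hypothetical stand-ins that only mirror the shape of Graph, GraphOps, and graphToGraphOps, to show why convenience methods appear to live on the core type.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for Graph: a deliberately small core API.
class MiniGraph(val edges: Seq[(Long, Long)]) {
  // Mirrors `g.ops` in the quoted source: the ops object is a member of the graph.
  lazy val ops: MiniGraphOps = new MiniGraphOps(this)
}

object MiniGraph {
  // Defined in the companion object, so the conversion is in implicit scope
  // wherever a MiniGraph is used, with no extra import.
  implicit def miniGraphToOps(g: MiniGraph): MiniGraphOps = g.ops
}

// Hypothetical stand-in for GraphOps: convenience operations built on the core API.
class MiniGraphOps(graph: MiniGraph) {
  def numEdges: Int = graph.edges.size

  // Out-degree per source vertex; vertices without out-edges are absent.
  def outDegrees: Map[Long, Int] =
    graph.edges.groupBy(_._1).map { case (src, es) => src -> es.size }
}

object ImplicitOpsSketch {
  def main(args: Array[String]): Unit = {
    val g = new MiniGraph(Seq((1L, 2L), (1L, 3L), (2L, 3L)))
    // MiniGraph has no numEdges or outDegrees; the compiler inserts miniGraphToOps(g).
    println(g.numEdges)
    println(g.outDegrees)
  }
}
```

The design choice this illustrates is the one stated in the quoted comment: the core type stays small, and the convenience layer can be shared by any implementation of it.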

The supported operations include:

  1. collectNeighborIds
  2. collectNeighbors
  3. collectEdges
  4. joinVertices
  5. filter
  6. pickRandomVertex
  7. pregel
  8. pageRank
  9. staticPageRank
  10. connectedComponents
  11. triangleCount
  12. stronglyConnectedComponents
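Because of the implicit conversion, these operations are called directly on a Graph. Below is a hedged sketch of three of them; the object name `GraphOpsSketch`, the helper names, the toy graph, and the PageRank tolerance of 0.001 are assumptions chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}

object GraphOpsSketch {
  // Hypothetical toy graph: a directed 3-cycle (1 -> 2 -> 3 -> 1) plus an isolated vertex 4.
  def build(sc: SparkContext): Graph[String, Int] = {
    val vertices =
      sc.parallelize(Seq[(VertexId, String)]((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))
    Graph(vertices, edges)
  }

  // collectNeighborIds: ids reachable over out-edges; vertices with none get an empty array.
  def outNeighbors(g: Graph[String, Int]): Map[VertexId, Seq[VertexId]] =
    g.collectNeighborIds(EdgeDirection.Out)
      .collect()
      .map { case (id, nbrs) => id -> nbrs.sorted.toSeq }
      .toMap

  // connectedComponents: each vertex is labelled with the lowest vertex id in its component.
  def components(g: Graph[String, Int]): Map[VertexId, VertexId] =
    g.connectedComponents().vertices.collect().toMap

  // pageRank: runs until the ranks change by less than the given tolerance.
  def ranks(g: Graph[String, Int]): Map[VertexId, Double] =
    g.pageRank(0.001).vertices.collect().toMap

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("GraphOpsSketch")
    val sc = new SparkContext(conf)
    try {
      val g = build(sc)
      println(outNeighbors(g))
      println(components(g))
      println(ranks(g))
    } finally sc.stop()
  }
}
```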

The source is as follows:

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.spark.graphx

    import scala.reflect.ClassTag
    import scala.util.Random

    import org.apache.spark.SparkException
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    import org.apache.spark.graphx.lib._

    /**
     * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
     * efficient GraphX API. This class is implicitly constructed for each Graph object.
     *
     * @tparam VD the vertex attribute type
     * @tparam ED the edge attribute type
     */
    class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {

      /** The number of edges in the graph. */
      @transient lazy val numEdges: Long = graph.edges.count()

      /** The number of vertices in the graph. */
      @transient lazy val numVertices: Long = graph.vertices.count()

      /**
       * The in-degree of each vertex in the graph.
       * @note Vertices with no in-edges are not returned in the resulting RDD.
       */
      @transient lazy val inDegrees: VertexRDD[Int] =
        degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")

      /**
       * The out-degree of each vertex in the graph.
       * @note Vertices with no out-edges are not returned in the resulting RDD.
       */
      @transient lazy val outDegrees: VertexRDD[Int] =
        degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")

      /**
       * The degree of each vertex in the graph.
       * @note Vertices with no edges are not returned in the resulting RDD.
       */
      @transient lazy val degrees: VertexRDD[Int] =
        degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")

      /**
       * Computes the neighboring vertex degrees.
       *
       * @param edgeDirection the direction along which to collect neighboring vertex attributes
       */
      private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {
        if (edgeDirection == EdgeDirection.In) {
          graph.aggregateMessages(_.sendToDst(1), _ + _, TripletFields.None)
        } else if (edgeDirection == EdgeDirection.Out) {
          graph.aggregateMessages(_.sendToSrc(1), _ + _, TripletFields.None)
        } else { // EdgeDirection.Either
          graph.aggregateMessages(ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) }, _ + _,
            TripletFields.None)
        }
      }

      /**
       * Collect the neighbor vertex ids for each vertex.
       *
       * @param edgeDirection the direction along which to collect
       * neighboring vertices
       *
       * @return the set of neighboring ids for each vertex
       */
      def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
        val nbrs =
          if (edgeDirection == EdgeDirection.Either) {
            graph.aggregateMessages[Array[VertexId]](
              ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
              _ ++ _, TripletFields.None)
          } else if (edgeDirection == EdgeDirection.Out) {
            graph.aggregateMessages[Array[VertexId]](
              ctx => ctx.sendToSrc(Array(ctx.dstId)),
              _ ++ _, TripletFields.None)
          } else if (edgeDirection == EdgeDirection.In) {
            graph.aggregateMessages[Array[VertexId]](
              ctx => ctx.sendToDst(Array(ctx.srcId)),
              _ ++ _, TripletFields.None)
          } else {
            throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
              "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
          }
        graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
          nbrsOpt.getOrElse(Array.empty[VertexId])
        }
      } // end of collectNeighborIds

      /**
       * Collect the neighbor vertex attributes for each vertex.
       *
       * @note This function could be highly inefficient on power-law
       * graphs where high degree vertices may force a large amount of
       * information to be collected to a single location.
       *
       * @param edgeDirection the direction along which to collect
       * neighboring vertices
       *
       * @return the vertex set of neighboring vertex attributes for each vertex
       */
      def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
        val nbrs = edgeDirection match {
          case EdgeDirection.Either =>
            graph.aggregateMessages[Array[(VertexId, VD)]](
              ctx => {
                ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
                ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
              },
              (a, b) => a ++ b, TripletFields.All)
          case EdgeDirection.In =>
            graph.aggregateMessages[Array[(VertexId, VD)]](
              ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
              (a, b) => a ++ b, TripletFields.Src)
          case EdgeDirection.Out =>
            graph.aggregateMessages[Array[(VertexId, VD)]](
              ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
              (a, b) => a ++ b, TripletFields.Dst)
          case EdgeDirection.Both =>
            throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
              "EdgeDirection.Either instead.")
        }
        graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
          nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
        }
      } // end of collectNeighbor

      /**
       * Returns an RDD that contains for each vertex

