graphx初涉,结合源码学习一
Graphx中的重要概念
graph
1.graph成员变量有:vertices,edges,triplets.
2.在triplets中,同时记录着edge和vertex
成员函数
函数分成几大类
- 对所有顶点或边的操作,但不改变图结构本身,如mapEdges, mapVertices
- 子图,类似于集合操作中的filter subGraph
- 图的分割,即paritition操作,这个对于Spark计算来说,很关键,正是因为有了不同的Partition,才有了并行处理的可能, 不同的PartitionStrategy,其收益不同。最容易想到的就是利用Hash来将整个图分成多个区域。
- outerJoinVertices 顶点的外连接操作
图的运算和操作 GraphOps
图的常用算法是集中抽象到GraphOps这个类中,在Graph里作了隐式转换,将Graph转换为GraphOps,源码如下:
/*** Implicitly extracts the [[GraphOps]] member from a graph.** To improve modularity the Graph type only contains a small set of basic operations.* All the convenience operations are defined in the [[GraphOps]] class which may be* shared across multiple graph implementations.*/implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops
支持的操作如下
- collectNeighborIds
- collectNeighbors
- collectEdges
- joinVertices
- filter
- pickRandomVertex
- pregel
- pageRank
- staticPageRank
- connectedComponents
- triangleCount
- stronglyConnectedComponents
源码如下:
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.spark.graphximport scala.reflect.ClassTag
import scala.util.Randomimport org.apache.spark.SparkException
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDDimport org.apache.spark.graphx.lib._/*** Contains additional functionality for [[Graph]]. All operations are expressed in terms of the* efficient GraphX API. This class is implicitly constructed for each Graph object.** @tparam VD the vertex attribute type* @tparam ED the edge attribute type*/
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {/** The number of edges in the graph. */@transient lazy val numEdges: Long = graph.edges.count()/** The number of vertices in the graph. */@transient lazy val numVertices: Long = graph.vertices.count()/*** The in-degree of each vertex in the graph.* @note Vertices with no in-edges are not returned in the resulting RDD.*/@transient lazy val inDegrees: VertexRDD[Int] =degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")/*** The out-degree of each vertex in the graph.* @note Vertices with no out-edges are not returned in the resulting RDD.*/@transient lazy val outDegrees: VertexRDD[Int] =degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")/*** The degree of each vertex in the graph.* @note Vertices with no edges are not returned in the resulting RDD.*/@transient lazy val degrees: VertexRDD[Int] =degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")/*** Computes the neighboring vertex degrees.** @param edgeDirection the direction along which to collect neighboring vertex attributes*/private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {if (edgeDirection == EdgeDirection.In) {graph.aggregateMessages(_.sendToDst(1), _ + _, TripletFields.None)} else if (edgeDirection == EdgeDirection.Out) {graph.aggregateMessages(_.sendToSrc(1), _ + _, TripletFields.None)} else { // EdgeDirection.Eithergraph.aggregateMessages(ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) }, _ + _,TripletFields.None)}}/*** Collect the neighbor vertex ids for each vertex.** @param edgeDirection the direction along which to collect* neighboring vertices** @return the set of neighboring ids for each vertex*/def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {val nbrs =if (edgeDirection == EdgeDirection.Either) {graph.aggregateMessages[Array[VertexId]](ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },_ ++ _, TripletFields.None)} else if (edgeDirection == EdgeDirection.Out) {graph.aggregateMessages[Array[VertexId]](ctx => ctx.sendToSrc(Array(ctx.dstId)),_ ++ _, TripletFields.None)} else if (edgeDirection == EdgeDirection.In) {graph.aggregateMessages[Array[VertexId]](ctx => ctx.sendToDst(Array(ctx.srcId)),_ ++ _, TripletFields.None)} else {throw new SparkException("It doesn't make sense to collect neighbor ids without a " +"direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")}graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>nbrsOpt.getOrElse(Array.empty[VertexId])}} // end of collectNeighborIds/*** Collect the neighbor vertex attributes for each vertex.** @note This function could be highly inefficient on power-law* graphs where high degree vertices may force a large amount of* information to be collected to a single location.** @param edgeDirection the direction along which to collect* neighboring vertices** @return the vertex set of neighboring vertex attributes for each vertex*/def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {val nbrs = edgeDirection match {case EdgeDirection.Either =>graph.aggregateMessages[Array[(VertexId, VD)]](ctx => {ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))},(a, b) => a ++ b, TripletFields.All)case EdgeDirection.In =>graph.aggregateMessages[Array[(VertexId, VD)]](ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),(a, b) => a ++ b, TripletFields.Src)case EdgeDirection.Out =>graph.aggregateMessages[Array[(VertexId, VD)]](ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),(a, b) => a ++ b, TripletFields.Dst)case EdgeDirection.Both =>throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +"EdgeDirection.Either instead.")}graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])}} // end of collectNeighbor/*** Returns an RDD that contains for each vertex
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
