Advanced Graph Algorithms in Spark Using GraphX Aggregated Messages And Collective Communication Techniques

GraphX

GraphX is a distributed graph processing framework on top of Apache Spark.

  • Ease of programming: because vertices and edges are first-class citizens, programmers can design their algorithms in terms of properties attached to vertices and edges.
  • Memory efficiency: most graph algorithms can be completed with O(V+E) space, distributed over the cluster. This reduces the memory burden on each compute node and scales with the size of the graph.
  • Efficiency on graph problems: GraphX runs iterative computations more efficiently than MapReduce because it keeps the dataset in memory rather than writing it to disk after every iteration. This matters because most graph algorithms are iterative. It also copes with the poor memory-access locality typical of graph algorithms by partitioning vertices across machines and passing messages between machines as necessary.
// With Syntactic sugar
/**
 * In-degree of every vertex that has at least one incoming edge, written with
 * named helper functions for the message and merge steps.
 * Assumes a `graph: Graph[_, _]` is in scope (it is not defined in this snippet).
 */
private def inDegreesRDD: VertexRDD[Int] = {
  // Each edge sends a count of 1 to its destination vertex.
  def mapper(ctx: EdgeContext[_, _, Int]): Unit = ctx.sendToDst(1)
  // Counts arriving at the same vertex are summed.
  def reducer(a: Int, b: Int): Int = a + b

  // No vertex attributes are read, so no triplet fields need to be shipped.
  graph.aggregateMessages[Int](ctx => mapper(ctx), reducer, TripletFields.None)
}
// Without syntactic sugar
/**
 * In-degree of every vertex that has at least one incoming edge (concise form).
 * Assumes a `graph` is in scope (it is not defined in this snippet).
 */
private def inDegreesRDD: VertexRDD[Int] =
  graph.aggregateMessages[Int](
    sendMsg = edge => edge.sendToDst(1),
    mergeMsg = (left, right) => left + right,
    tripletFields = TripletFields.None)
// With Syntactic sugar
/**
 * Out-degree of every vertex that has at least one outgoing edge, written with
 * named helper functions for the message and merge steps.
 * Assumes a `graph: Graph[_, _]` is in scope (it is not defined in this snippet).
 */
private def outDegreesRDD: VertexRDD[Int] = {
  // Each edge sends a count of 1 to its source vertex.
  def mapper(ctx: EdgeContext[_, _, Int]): Unit = ctx.sendToSrc(1)
  // Counts arriving at the same vertex are summed.
  def reducer(a: Int, b: Int): Int = a + b

  // No vertex attributes are read, so no triplet fields need to be shipped.
  graph.aggregateMessages[Int](ctx => mapper(ctx), reducer, TripletFields.None)
}
// Without syntactic sugar
/**
 * Out-degree of every vertex that has at least one outgoing edge (concise form).
 * Assumes a `graph` is in scope (it is not defined in this snippet).
 */
private def outDegreesRDD: VertexRDD[Int] =
  graph.aggregateMessages[Int](
    sendMsg = edge => edge.sendToSrc(1),
    mergeMsg = (left, right) => left + right,
    tripletFields = TripletFields.None)
// With Syntactic sugar
/**
 * Total degree (in + out) of every vertex with at least one incident edge,
 * written with named helper functions for the message and merge steps.
 * A self-loop contributes 2 to its vertex, since it sends to both endpoints.
 * Assumes a `graph: Graph[_, _]` is in scope (it is not defined in this snippet).
 */
private def degreesRDD: VertexRDD[Int] = {
  // Each edge sends a count of 1 to both of its endpoints.
  def mapper(ctx: EdgeContext[_, _, Int]): Unit = {
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  }
  // Counts arriving at the same vertex are summed.
  def reducer(a: Int, b: Int): Int = a + b

  // No vertex attributes are read, so no triplet fields need to be shipped.
  graph.aggregateMessages[Int](ctx => mapper(ctx), reducer, TripletFields.None)
}
// Without syntactic sugar
/**
 * Total degree (in + out) of every vertex with at least one incident edge
 * (concise form). A self-loop contributes 2 to its vertex.
 * Assumes a `graph` is in scope (it is not defined in this snippet).
 */
private def degreesRDD: VertexRDD[Int] = {
  // Placeholder syntax (`_`) cannot refer to the same parameter twice, so the
  // edge context is named explicitly and the two sends are separate statements.
  graph.aggregateMessages[Int](
    ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) },
    _ + _,
    TripletFields.None)
}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, VertexRDD, Edge => GXEdge}
import org.apache.spark.sql.types.{IntegerType, LongType}
import org.graphframes.GraphFrame

/**
 * HITS (hub and authority) scores for a GraphFrame, computed with GraphX
 * `aggregateMessages`.
 *
 * Created by greddy on 09/21/16.
 */
object Hits {

  /** Per-vertex state: the vertex id plus its current authority and hub scores. */
  case class VertexAttr(srcId: Long, authScore: Double, hubScore: Double)

  /** Edge endpoints copied out of the GraphFrame edge `Row`. */
  case class EdgeAttr(srcId: Long, dstId: Long)

  /** Partial authority/hub contributions sent along an edge. */
  case class HitsMsg(authScore: Double, hubScore: Double)

  /** Sums two partial contributions arriving at the same vertex. */
  def reducer(a: HitsMsg, b: HitsMsg): HitsMsg =
    HitsMsg(a.authScore + b.authScore, a.hubScore + b.hubScore)

  /**
   * Runs `maxIter` rounds of the HITS iteration on `g`.
   *
   * In each round, along every directed edge `src -> dst`, the authority of `dst`
   * accumulates the hub score of `src`, and the hub score of `src` accumulates the
   * authority of `dst`. Both updates use the previous round's scores. The new
   * authority and hub vectors are then each scaled to unit L2 norm, unless that
   * norm is 0. A vertex with no incoming edges gets authority 0, and a vertex with
   * no outgoing edges gets hub 0.
   *
   * @param g       input graph; vertex rows are read with `getLong` from an "id"
   *                column, edge rows from "src" and "dst" columns
   * @param maxIter number of update rounds; 0 (or less) returns the initial
   *                all-ones scores
   * @return the result of `GraphFrame.fromGraphX(g, ...)` with vertex columns
   *         "auth" and "hub"
   */
  def runHits(g: GraphFrame, maxIter: Int = 10): GraphFrame = {
    val gx0 = g.toGraphX

    val vColsMap = g.vertexColumnMap
    val eColsMap = g.edgeColumnMap

    // Convert edge attributes to a small case class.
    val extractEdgeAttr: GXEdge[Row] => EdgeAttr = { e =>
      EdgeAttr(e.attr.getLong(eColsMap("src")), e.attr.getLong(eColsMap("dst")))
    }

    // Every vertex starts with authority = hub = 1.0. Cached because each round
    // reads the graph twice (message sending and the join below).
    var gx: Graph[VertexAttr, EdgeAttr] = gx0
      .mapVertices { case (_, attr) =>
        VertexAttr(attr.getLong(vColsMap("id")), authScore = 1.0, hubScore = 1.0)
      }
      .mapEdges(extractEdgeAttr)
      .cache()

    for (_ <- 1 to maxIter) {
      // Edges are directed: the source's hub score flows into the destination's
      // authority, and the destination's authority flows back into the source's hub.
      val msgs: VertexRDD[HitsMsg] = gx.aggregateMessages[HitsMsg](
        ctx => {
          ctx.sendToDst(HitsMsg(authScore = ctx.srcAttr.hubScore, hubScore = 0.0))
          ctx.sendToSrc(HitsMsg(authScore = 0.0, hubScore = ctx.dstAttr.authScore))
        },
        reducer
      ).cache()

      // Squared L2 norms of the new score vectors. Vertices that receive no message
      // end up with score 0, so summing over the messages alone is sufficient.
      val (authSq, hubSq) = msgs
        .map { case (_, m) => (m.authScore * m.authScore, m.hubScore * m.hubScore) }
        .fold((0.0, 0.0)) { case ((a1, h1), (a2, h2)) => (a1 + a2, h1 + h2) }
      val authNorm = math.sqrt(authSq)
      val hubNorm = math.sqrt(hubSq)

      // Install the new, normalized scores; without normalization they grow
      // geometrically from round to round.
      val prev = gx
      gx = prev.outerJoinVertices(msgs) { (_, attr, optMsg) =>
        val msg = optMsg.getOrElse(HitsMsg(0.0, 0.0))
        VertexAttr(
          attr.srcId,
          authScore = if (authNorm > 0.0) msg.authScore / authNorm else msg.authScore,
          hubScore = if (hubNorm > 0.0) msg.hubScore / hubNorm else msg.hubScore)
      }.cache()

      // Materialize this round before releasing the previous round's cached data,
      // so later rounds do not recompute the whole lineage.
      gx.vertices.count()
      msgs.unpersist(blocking = false)
      prev.unpersistVertices(blocking = false)
    }

    // Convert back to a GraphFrame whose vertices gain "auth" and "hub" columns.
    val gxFinal: Graph[(Double, Double), Unit] = gx
      .mapVertices((_, attr) => (attr.authScore, attr.hubScore))
      .mapEdges(_ => ())

    GraphFrame.fromGraphX(g, gxFinal, vertexNames = Seq("auth", "hub"))
  }
}
/**
 * PageRank for a GraphFrame, computed with GraphX `aggregateMessages`.
 */
object PageRank {

  /** Per-vertex state: vertex id, out-degree in the graph, and current score. */
  case class VertexAttr(srcId: Long, outDegree: Int, pageScore: Double)

  /** Rank mass sent along an edge. */
  case class PageMsg(pageScore: Double)

  /** Sums rank mass arriving at the same vertex. */
  def reducer(a: PageMsg, b: PageMsg): PageMsg = PageMsg(a.pageScore + b.pageScore)

  /**
   * Runs `maxIter` rounds of (non-normalized) PageRank on `g`.
   *
   * Every vertex starts at `resetProb`. In each round, every vertex splits its
   * score evenly over its outgoing edges, and each vertex's new score is
   * `resetProb + (1 - resetProb) * (sum of incoming shares)`. Vertices without
   * outgoing edges send nothing.
   *
   * @param g         input graph; vertex rows are read with `getLong` from an
   *                  "id" column. Out-degrees are computed from the graph's edges.
   * @param resetProb random-reset probability, must lie in [0, 1]
   * @param maxIter   number of update rounds; 0 (or less) returns the initial scores
   * @return the result of `GraphFrame.fromGraphX(g, ...)` with a "pageRank"
   *         vertex column
   * @throws IllegalArgumentException if `resetProb` is outside [0, 1]
   */
  def runPageRank(g: GraphFrame, resetProb: Double = 0.2, maxIter: Int = 10)
      : GraphFrame = {
    require(resetProb >= 0.0 && resetProb <= 1.0,
      s"resetProb must be in [0, 1], got $resetProb")

    val gx0 = g.toGraphX
    val vColsMap = g.vertexColumnMap

    // Out-degrees come from the graph itself, so the vertex DataFrame does not need
    // a precomputed "outDegree" column; vertices with no outgoing edges get 0.
    // Edge attributes are never read, so they are dropped. Cached because each
    // round reads the graph twice (message sending and the join below).
    var gx: Graph[VertexAttr, Unit] = gx0
      .outerJoinVertices(gx0.outDegrees) { (_, attr, deg) =>
        VertexAttr(attr.getLong(vColsMap("id")), deg.getOrElse(0), resetProb)
      }
      .mapEdges(_ => ())
      .cache()

    for (_ <- 1 to maxIter) {
      // Every edge carries an equal share of its source's score. A source with an
      // edge has outDegree >= 1; max(..., 1) only guards the division.
      val msgs: VertexRDD[PageMsg] = gx.aggregateMessages[PageMsg](
        ctx => ctx.sendToDst(
          PageMsg(ctx.srcAttr.pageScore / math.max(ctx.srcAttr.outDegree, 1))),
        reducer
      ).cache()

      // Update the page rank score of each vertex.
      val prev = gx
      gx = prev.outerJoinVertices(msgs) { (_, attr, optMsg) =>
        val incoming = optMsg.map(_.pageScore).getOrElse(0.0)
        attr.copy(pageScore = resetProb + (1.0 - resetProb) * incoming)
      }.cache()

      // Materialize this round before releasing the previous round's cached data,
      // so later rounds do not recompute the whole lineage.
      gx.vertices.count()
      msgs.unpersist(blocking = false)
      prev.unpersistVertices(blocking = false)
    }

    // Convert back to a GraphFrame whose vertices gain a "pageRank" column.
    val gxFinal: Graph[Double, Unit] = gx.mapVertices((_, attr) => attr.pageScore)

    GraphFrame.fromGraphX(g, gxFinal, vertexNames = Seq("pageRank"))
  }
}
If you like my work, buy me a coffee.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store