8000 Add Graphframe Type Degree Methods by joelrobin18 · Pull Request #742 · graphframes/graphframes · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions core/src/main/scala/org/graphframes/GraphFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,97 @@ class GraphFrame private (
.agg(count("*").cast("int").as("degree"))
}

/**
* The out-degree of each vertex per edge type, returned as a DataFrame with two columns:
* - [[GraphFrame.ID]] the ID of the vertex
* - "outDegrees" a struct with a field for each edge type, storing the out-degree count
*
* @param edgeTypeCol
* Name of the column in edges DataFrame that contains edge types
* @param edgeTypes
* Optional sequence of edge type values. If None, edge types will be discovered
* automatically.
* @group degree
*/
def typeOutDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = {
val pivotDF = edgeTypes match {
case Some(types) =>
edges.groupBy(col(SRC).as(ID)).pivot(edgeTypeCol, types)
case None =>
edges.groupBy(col(SRC).as(ID)).pivot(edgeTypeCol)
}
val countDF = pivotDF.agg(count(lit(1))).na.fill(0)
val structCols = countDF.columns
.filter(_ != ID)
.map { colName =>
col(colName).cast("int").as(colName)
}
.toSeq
countDF.select(col(ID), struct(structCols: _*).as("outDegrees"))
}

/**
* The in-degree of each vertex per edge type, returned as a DataFrame with two columns:
* - [[GraphFrame.ID]] the ID of the vertex
* - "inDegrees" a struct with a field for each edge type, storing the in-degree count
*
* @param edgeTypeCol
* Name of the column in edges DataFrame that contains edge types
* @param edgeTypes
* Optional sequence of edge type values. If None, edge types will be discovered
* automatically.
* @group degree
*/
def typeInDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = {
val pivotDF = edgeTypes match {
case Some(types) =>
edges.groupBy(col(DST).as(ID)).pivot(edgeTypeCol, types)
case None =>
edges.groupBy(col(DST).as(ID)).pivot(edgeTypeCol)
}
val countDF = pivotDF.agg(count(lit(1))).na.fill(0)
val structCols = countDF.columns
.filter(_ != ID)
.map { colName =>
col(colName).cast("int").as(colName)
}
.toSeq
countDF.select(col(ID), struct(structCols: _*).as("inDegrees"))
}

/**
* The total degree of each vertex per edge type (both in and out), returned as a DataFrame with
* two columns:
* - [[GraphFrame.ID]] the ID of the vertex
* - "degrees" a struct with a field for each edge type, storing the total degree count
*
* @param edgeTypeCol
* Name of the column in edges DataFrame that contains edge types
* @param edgeTypes
* Optional sequence of edge type values. If None, edge types will be discovered
* automatically.
* @group degree
*/
def typeDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = {
val explodedEdges = edges.select(explode(array(col(SRC), col(DST))).as(ID), col(edgeTypeCol))

val pivotDF = edgeTypes match {
case Some(types) =>
explodedEdges.groupBy(ID).pivot(edgeTypeCol, types)
case None =>
explodedEdges.groupBy(ID).pivot(edgeTypeCol)
}
val countDF = pivotDF.agg(count(lit(1))).na.fill(0)
val structCols = countDF.columns
.filter(_ != ID)
.map { colName =>
col(colName).cast("int").as(colName)
}
.toSeq

countDF.select(col(ID), struct(structCols: _*).as("degrees"))
}

// ============================ Motif finding ========================================

/**
Expand Down
136 changes: 136 additions & 0 deletions core/src/test/scala/org/graphframes/GraphFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,142 @@ class GraphFrameSuite extends SparkFunSuite with GraphFrameTestSparkContext {
assert(degrees === Map(1L -> 2, 2L -> 3, 3L -> 1))
}

test("type degree metrics") {
val g = GraphFrame(vertices, edges)

assert(g.typeOutDegree("action").columns === Seq("id", "outDegrees"))
val typeOutDegrees = g.typeOutDegree("action").collect()

val outDegreesSchema =
g.typeOutDegree("action").schema("outDegrees").dataType.asInstanceOf[StructType]
val outDegreesFieldNames = outDegreesSchema.fields.map(_.name).toSet
assert(outDegreesFieldNames === Set("love", "hate", "follow"))

val typeOutDegMap = typeOutDegrees.map { row =>
val id = row.getLong(0)
val degrees = row.getStruct(1)
(id, degrees)
}.toMap

assert(typeOutDegMap(1L).getAs[Int]("love") === 1)
assert(typeOutDegMap(1L).getAs[Int]("hate") === 0)
assert(typeOutDegMap(1L).getAs[Int]("follow") === 0)

assert(typeOutDegMap(2L).getAs[Int]("love") === 0)
assert(typeOutDegMap(2L).getAs[Int]("hate") === 1)
assert(typeOutDegMap(2L).getAs[Int]("follow") === 1)

assert(g.typeInDegree("action").columns === Seq("id", "inDegrees"))
val typeInDegrees = g.typeInDegree("action").collect()

val inDegreesSchema =
g.typeInDegree("action").schema("inDegrees").dataType.asInstanceOf[StructType]
val inDegreesFieldNames = inDegreesSchema.fields.map(_.name).toSet
assert(inDegreesFieldNames === Set("love", "hate", "follow"))

val typeInDegMap = typeInDegrees.map { row =>
val id = row.getLong(0)
val degrees = row.getStruct(1)
(id, degrees)
}.toMap

assert(typeInDegMap(1L).getAs[Int]("love") === 0)
assert(typeInDegMap(1L).getAs[Int]("hate") === 1)
assert(typeInDegMap(1L).getAs[Int]("follow") === 0)

assert(typeInDegMap(2L).getAs[Int]("love") === 1)
assert(typeInDegMap(2L).getAs[Int]("hate") === 0)
assert(typeInDegMap(2L).getAs[Int]("follow") === 0)

assert(typeInDegMap(3L).getAs[Int]("love") === 0)
assert(typeInDegMap(3L).getAs[Int]("hate") === 0)
assert(typeInDegMap(3L).getAs[Int]("follow") === 1)

assert(g.typeDegree("action").columns === Seq("id", "degrees"))
val typeDegrees = g.typeDegree("action").collect()

val degreesSchema = g.typeDegree("action").schema("degrees").dataType.asInstanceOf[StructType]
val degreesFieldNames = degreesSchema.fields.map(_.name).toSet
assert(degreesFieldNames === Set("love", "hate", "follow"))

val typeDegMap = typeDegrees.map { row =>
val id = row.getLong(0)
val degrees = row.getStruct(1)
(id, degrees)
}.toMap

assert(typeDegMap(1L).getAs[Int]("love") === 1)
assert(typeDegMap(1L).getAs[Int]("hate") === 1)
assert(typeDegMap(1L).getAs[Int]("follow") === 0)

assert(typeDegMap(2L).getAs[Int]("love") === 1)
assert(typeDegMap(2L).getAs[Int]("hate") === 1)
assert(typeDegMap(2L).getAs[Int]("follow") === 1)

assert(typeDegMap(3L).getAs[Int]("love") === 0)
assert(typeDegMap(3L).getAs[Int]("hate") === 0)
assert(typeDegMap(3L).getAs[Int]("follow") === 1)
}

test("type degree metrics with explicit edge types") {
val g = GraphFrame(vertices, edges)
val edgeTypes = Seq("love", "hate", "follow")

val typeOutDegrees = g.typeOutDegree("action", Some(edgeTypes)).collect()

val typeOutDegMap = typeOutDegrees.map { row =>
val id = row.getLong(0)
val degrees = row.getStruct(1)
(id, degrees)
}.toMap

assert(typeOutDegMap(1L).getAs[Int]("love") === 1)
assert(typeOutDegMap(1L).getAs[Int]("hate") === 0)
assert(typeOutDegMap(1L).getAs[Int]("follow") === 0)

assert(typeOutDegMap(2L).getAs[Int]("love") === 0)
assert(typeOutDegMap(2L).getAs[Int]("hate") === 1)
assert(typeOutDegMap(2L).getAs[Int]("follow") === 1)

val typeInDegrees = g.typeInDegree("action", Some(edgeTypes)).collect()
val typeInDegMap = typeInDegrees.map { row =>
val id = row.getLong(0)
val degrees = row.getStruct(1)
(id, degrees)
}.toMap

assert(typeInDegMap(1L).getAs[Int]("love") === 0)
assert(typeInDegMap(1L).getAs[Int]("hate") === 1)
assert(typeInDegMap(1L).getAs[Int]("follow") === 0)

assert(typeInDegMap(2L).getAs[Int]("love") === 1)
assert(typeInDegMap(2L).getAs[Int]("hate") === 0)
assert(typeInDegMap(2L).getAs[Int]("follow") === 0)

assert(typeInDegMap(3L).getAs[Int]("love") === 0)
assert(typeInDegMap(3L).getAs[Int]("hate") === 0)
assert(typeInDegMap(3L).getAs[Int]("follow") === 1)

val typeDegrees = g.typeDegree("action", Some(edgeTypes)).collect()
val typeDegMap = typeDegrees.map { row =>
val id = row.getLong(0)
val degrees = row.getStruct(1)
(id, degrees)
}.toMap

assert(typeDegMap(1L).getAs[Int]("love") === 1)
assert(typeDegMap(1L).getAs[Int]("hate") === 1)
assert(typeDegMap(1L).getAs[Int]("follow") === 0)

assert(typeDegMap(2L).getAs[Int]("love") === 1)
assert(typeDegMap(2L).getAs[Int]("hate") === 1)
assert(typeDegMap(2L).getAs[Int]("follow") === 1)

assert(typeDegMap(3L).getAs[Int]("love") === 0)
assert(typeDegMap(3L).getAs[Int]("hate") === 0)
assert(typeDegMap(3L).getAs[Int]("follow") === 1)
}

test("cache") {
val g = GraphFrame(vertices, edges)

Expand Down
94 changes: 93 additions & 1 deletion python/graphframes/graphframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, List, Optional

from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
Expand Down Expand Up @@ -225,6 +225,98 @@ def degrees(self) -> DataFrame:
.agg(F.count("*").alias("degree"))
)

def type_out_degree(
self, edge_type_col: str, edge_types: Optional[List[Any]] = None
) -> DataFrame:
"""
The out-degree of each vertex per edge type, returned as a DataFrame with two columns:
- "id": the I 9E81 D of the vertex
- "outDegrees": a struct with a field for each edge type, storing the out-degree count

:param edge_type_col: Name of the column in edges DataFrame that contains edge types
:param edge_types: Optional list of edge type values. If None, edge types will be
discovered automatically.
:return: DataFrame with columns "id" and "outDegrees" (struct type)
"""
if edge_types is not None:
pivot_df = self._impl._edges.groupBy(F.col(self.SRC).alias(self.ID)).pivot(
edge_type_col, edge_types
)
else:
pivot_df = self._impl._edges.groupBy(F.col(self.SRC).alias(self.ID)).pivot(
edge_type_col
)

count_df = pivot_df.agg(F.count(F.lit(1))).na.fill(0)
struct_cols = [
F.col(col_name).cast("int").alias(col_name)
for col_name in count_df.columns
if col_name != self.ID
]

return count_df.select(F.col(self.ID), F.struct(*struct_cols).alias("outDegrees"))

def type_in_degree(
self, edge_type_col: str, edge_types: Optional[List[Any]] = None
) -> DataFrame:
"""
The in-degree of each vertex per edge type, returned as a DataFrame with two columns:
- "id": the ID of the vertex
- "inDegrees": a struct with a field for each edge type, storing the in-degree count

:param edge_type_col: Name of the column in edges DataFrame that contains edge types
:param edge_types: Optional list of edge type values. If None, edge types will be
discovered automatically.
:return: DataFrame with columns "id" and "inDegrees" (struct type)
"""
if edge_types is not None:
pivot_df = self._impl._edges.groupBy(F.col(self.DST).alias(self.ID)).pivot(
edge_type_col, edge_types
)
else:
pivot_df = self._impl._edges.groupBy(F.col(self.DST).alias(self.ID)).pivot(
edge_type_col
)

count_df = pivot_df.agg(F.count(F.lit(1))).na.fill(0)
struct_cols = [
F.col(col_name).cast("int").alias(col_name)
for col_name in count_df.columns
if col_name != self.ID
]
return count_df.select(F.col(self.ID), F.struct(*struct_cols).alias("inDegrees"))

def type_degree(self, edge_type_col: str, edge_types: Optional[List[Any]] = None) -> DataFrame:
"""
The total degree of each vertex per edge type (both in and out), returned as a DataFrame
with two columns:
- "id": the ID of the vertex
- "degrees": a struct with a field for each edge type, storing the total degree count

:param edge_type_col: Name of the column in edges DataFrame that contains edge types
:param edge_types: Optional list of edge type values. If None, edge types will be
discovered automatically.
:return: DataFrame with columns "id" and "degrees" (struct type)
"""
exploded_edges = self._impl._edges.select(
F.explode(F.array(F.col(self.SRC), F.col(self.DST))).alias(self.ID),
F.col(edge_type_col),
)

if edge_types is not None:
pivot_df = exploded_edges.groupBy(self.ID).pivot(edge_type_col, edge_types)
else:
pivot_df = exploded_edges.groupBy(self.ID).pivot(edge_type_col)

count_df = pivot_df.agg(F.count(F.lit(1))).na.fill(0)
struct_cols = [
F.col(col_name).cast("int").alias(col_name)
for col_name in count_df.columns
if col_name != self.ID
]

return count_df.select(F.col(self.ID), F.struct(*struct_cols).alias("degrees"))

@property
def triplets(self) -> DataFrame:
"""
Expand Down
2 changes: 1 addition & 1 deletion python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import tempfile
import warnings

from py4j.java_gateway import JavaObject
import pytest
from py4j.java_gateway import JavaObject
from pyspark.sql import SparkSession
from pyspark.version import __version__

Expand Down
Loading
0