diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala
index 9121aa657..f847e4c3c 100644
--- a/core/src/main/scala/org/graphframes/GraphFrame.scala
+++ b/core/src/main/scala/org/graphframes/GraphFrame.scala
@@ -310,6 +310,112 @@ class GraphFrame private (
       .agg(count("*").cast("int").as("degree"))
   }
 
+  /**
+   * The out-degree of each vertex per edge type, returned as a DataFrame with two columns:
+   *   - [[GraphFrame.ID]] the ID of the vertex
+   *   - "outDegrees" a struct with a field for each edge type, storing the out-degree count
+   *
+   * Vertices without outgoing edges are not returned. For a returned vertex, an edge type it
+   * has no outgoing edge of gets a count of 0.
+   *
+   * @param edgeTypeCol
+   *   Name of the column in edges DataFrame that contains edge types
+   * @param edgeTypes
+   *   Optional sequence of edge type values. If None, edge types will be discovered
+   *   automatically.
+   * @group degree
+   */
+  def typeOutDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = {
+    val pivotDF = edgeTypes match {
+      case Some(types) =>
+        edges.groupBy(col(SRC).as(ID)).pivot(edgeTypeCol, types)
+      case None =>
+        edges.groupBy(col(SRC).as(ID)).pivot(edgeTypeCol)
+    }
+    val countDF = pivotDF.agg(count(lit(1))).na.fill(0)
+    val structCols = countDF.columns
+      .filter(_ != ID)
+      .map { colName =>
+        // Pivot columns are named after the edge type values. Backtick-quote the name so a
+        // value such as "a.b" is read as one column rather than field "b" of column "a".
+        col("`" + colName.replace("`", "``") + "`").cast("int").as(colName)
+      }
+      .toSeq
+    countDF.select(col(ID), struct(structCols: _*).as("outDegrees"))
+  }
+
+  /**
+   * The in-degree of each vertex per edge type, returned as a DataFrame with two columns:
+   *   - [[GraphFrame.ID]] the ID of the vertex
+   *   - "inDegrees" a struct with a field for each edge type, storing the in-degree count
+   *
+   * Vertices without incoming edges are not returned. For a returned vertex, an edge type it
+   * has no incoming edge of gets a count of 0.
+   *
+   * @param edgeTypeCol
+   *   Name of the column in edges DataFrame that contains edge types
+   * @param edgeTypes
+   *   Optional sequence of edge type values. If None, edge types will be discovered
+   *   automatically.
+   * @group degree
+   */
+  def typeInDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = {
+    val pivotDF = edgeTypes match {
+      case Some(types) =>
+        edges.groupBy(col(DST).as(ID)).pivot(edgeTypeCol, types)
+      case None =>
+        edges.groupBy(col(DST).as(ID)).pivot(edgeTypeCol)
+    }
+    val countDF = pivotDF.agg(count(lit(1))).na.fill(0)
+    val structCols = countDF.columns
+      .filter(_ != ID)
+      .map { colName =>
+        // Pivot columns are named after the edge type values. Backtick-quote the name so a
+        // value such as "a.b" is read as one column rather than field "b" of column "a".
+        col("`" + colName.replace("`", "``") + "`").cast("int").as(colName)
+      }
+      .toSeq
+    countDF.select(col(ID), struct(structCols: _*).as("inDegrees"))
+  }
+
+  /**
+   * The total degree of each vertex per edge type (both in and out), returned as a DataFrame with
+   * two columns:
+   *   - [[GraphFrame.ID]] the ID of the vertex
+   *   - "degrees" a struct with a field for each edge type, storing the total degree count
+   *
+   * Vertices without any edges are not returned. For a returned vertex, an edge type it has no
+   * edge of gets a count of 0.
+   *
+   * @param edgeTypeCol
+   *   Name of the column in edges DataFrame that contains edge types
+   * @param edgeTypes
+   *   Optional sequence of edge type values. If None, edge types will be discovered
+   *   automatically.
+   * @group degree
+   */
+  def typeDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = {
+    val explodedEdges = edges.select(explode(array(col(SRC), col(DST))).as(ID), col(edgeTypeCol))
+
+    val pivotDF = edgeTypes match {
+      case Some(types) =>
+        explodedEdges.groupBy(ID).pivot(edgeTypeCol, types)
+      case None =>
+        explodedEdges.groupBy(ID).pivot(edgeTypeCol)
+    }
+    val countDF = pivotDF.agg(count(lit(1))).na.fill(0)
+    val structCols = countDF.columns
+      .filter(_ != ID)
+      .map { colName =>
+        // Pivot columns are named after the edge type values. Backtick-quote the name so a
+        // value such as "a.b" is read as one column rather than field "b" of column "a".
+        col("`" + colName.replace("`", "``") + "`").cast("int").as(colName)
+      }
+      .toSeq
+
+    countDF.select(col(ID), struct(structCols: _*).as("degrees"))
+  }
+
   // ============================ Motif finding ========================================
 
   /**
diff --git a/core/src/test/scala/org/graphframes/GraphFrameSuite.scala b/core/src/test/scala/org/graphframes/GraphFrameSuite.scala
index ac86e3993..acc58998e 100644
--- a/core/src/test/scala/org/graphframes/GraphFrameSuite.scala
+++ b/core/src/test/scala/org/graphframes/GraphFrameSuite.scala
@@ -334,6 +334,142 @@ class GraphFrameSuite extends SparkFunSuite with GraphFrameTestSparkContext {
     assert(degrees === Map(1L -> 2, 2L -> 3, 3L -> 1))
   }
 
+  test("type degree metrics") {
+    val g = GraphFrame(vertices, edges)
+
+    assert(g.typeOutDegree("action").columns === Seq("id", "outDegrees"))
+    val typeOutDegrees = g.typeOutDegree("action").collect()
+
+    val outDegreesSchema =
+      g.typeOutDegree("action").schema("outDegrees").dataType.asInstanceOf[StructType]
+    val outDegreesFieldNames = outDegreesSchema.fields.map(_.name).toSet
+    assert(outDegreesFieldNames === Set("love", "hate", "follow"))
+
+    val typeOutDegMap = typeOutDegrees.map { row =>
+      val id = row.getLong(0)
+      val degrees = row.getStruct(1)
+      (id, degrees)
+    }.toMap
+
+    assert(typeOutDegMap(1L).getAs[Int]("love") === 1)
+    assert(typeOutDegMap(1L).getAs[Int]("hate") === 0)
+    assert(typeOutDegMap(1L).getAs[Int]("follow") === 0)
+
+    assert(typeOutDegMap(2L).getAs[Int]("love") === 0)
+    assert(typeOutDegMap(2L).getAs[Int]("hate") === 1)
+    assert(typeOutDegMap(2L).getAs[Int]("follow") === 1)
+
+    assert(g.typeInDegree("action").columns === Seq("id", "inDegrees"))
+    val typeInDegrees = g.typeInDegree("action").collect()
+
+    val inDegreesSchema =
+      g.typeInDegree("action").schema("inDegrees").dataType.asInstanceOf[StructType]
+    val inDegreesFieldNames = inDegreesSchema.fields.map(_.name).toSet
+    assert(inDegreesFieldNames === Set("love", "hate", "follow"))
+
+    val typeInDegMap = typeInDegrees.map { row =>
+      val id = row.getLong(0)
+      val degrees = row.getStruct(1)
+      (id, degrees)
+    }.toMap
+
+    assert(typeInDegMap(1L).getAs[Int]("love") === 0)
+    assert(typeInDegMap(1L).getAs[Int]("hate") === 1)
+    assert(typeInDegMap(1L).getAs[Int]("follow") === 0)
+
+    assert(typeInDegMap(2L).getAs[Int]("love") === 1)
+    assert(typeInDegMap(2L).getAs[Int]("hate") === 0)
+    assert(typeInDegMap(2L).getAs[Int]("follow") === 0)
+
+    assert(typeInDegMap(3L).getAs[Int]("love") === 0)
+    assert(typeInDegMap(3L).getAs[Int]("hate") === 0)
+    assert(typeInDegMap(3L).getAs[Int]("follow") === 1)
+
+    assert(g.typeDegree("action").columns === Seq("id", "degrees"))
+    val typeDegrees = g.typeDegree("action").collect()
+
+    val degreesSchema = g.typeDegree("action").schema("degrees").dataType.asInstanceOf[StructType]
+    val degreesFieldNames = degreesSchema.fields.map(_.name).toSet
+    assert(degreesFieldNames === Set("love", "hate", "follow"))
+
+    val typeDegMap = typeDegrees.map { row =>
+      val id = row.getLong(0)
+      val degrees = row.getStruct(1)
+      (id, degrees)
+    }.toMap
+
+    assert(typeDegMap(1L).getAs[Int]("love") === 1)
+    assert(typeDegMap(1L).getAs[Int]("hate") === 1)
+    assert(typeDegMap(1L).getAs[Int]("follow") === 0)
+
+    assert(typeDegMap(2L).getAs[Int]("love") === 1)
+    assert(typeDegMap(2L).getAs[Int]("hate") === 1)
+    assert(typeDegMap(2L).getAs[Int]("follow") === 1)
+
+    assert(typeDegMap(3L).getAs[Int]("love") === 0)
+    assert(typeDegMap(3L).getAs[Int]("hate") === 0)
+    assert(typeDegMap(3L).getAs[Int]("follow") === 1)
+  }
+
+  test("type degree metrics with explicit edge types") {
+    val g = GraphFrame(vertices, edges)
+    val edgeTypes = Seq("love", "hate", "follow")
+
+    val typeOutDegrees = g.typeOutDegree("action", Some(edgeTypes)).collect()
+
+    val typeOutDegMap = typeOutDegrees.map { row =>
+      val id = row.getLong(0)
+      val degrees = row.getStruct(1)
+      (id, degrees)
+    }.toMap
+
+    assert(typeOutDegMap(1L).getAs[Int]("love") === 1)
+    assert(typeOutDegMap(1L).getAs[Int]("hate") === 0)
+    assert(typeOutDegMap(1L).getAs[Int]("follow") === 0)
+
+    assert(typeOutDegMap(2L).getAs[Int]("love") === 0)
+    assert(typeOutDegMap(2L).getAs[Int]("hate") === 1)
+    assert(typeOutDegMap(2L).getAs[Int]("follow") === 1)
+
+    val typeInDegrees = g.typeInDegree("action", Some(edgeTypes)).collect()
+    val typeInDegMap = typeInDegrees.map { row =>
+      val id = row.getLong(0)
+      val degrees = row.getStruct(1)
+      (id, degrees)
+    }.toMap
+
+    assert(typeInDegMap(1L).getAs[Int]("love") === 0)
+    assert(typeInDegMap(1L).getAs[Int]("hate") === 1)
+    assert(typeInDegMap(1L).getAs[Int]("follow") === 0)
+
+    assert(typeInDegMap(2L).getAs[Int]("love") === 1)
+    assert(typeInDegMap(2L).getAs[Int]("hate") === 0)
+    assert(typeInDegMap(2L).getAs[Int]("follow") === 0)
+
+    assert(typeInDegMap(3L).getAs[Int]("love") === 0)
+    assert(typeInDegMap(3L).getAs[Int]("hate") === 0)
+    assert(typeInDegMap(3L).getAs[Int]("follow") === 1)
+
+    val typeDegrees = g.typeDegree("action", Some(edgeTypes)).collect()
+    val typeDegMap = typeDegrees.map { row =>
+      val id = row.getLong(0)
+      val degrees = row.getStruct(1)
+      (id, degrees)
+    }.toMap
+
+    assert(typeDegMap(1L).getAs[Int]("love") === 1)
+    assert(typeDegMap(1L).getAs[Int]("hate") === 1)
+    assert(typeDegMap(1L).getAs[Int]("follow") === 0)
+
+    assert(typeDegMap(2L).getAs[Int]("love") === 1)
+    assert(typeDegMap(2L).getAs[Int]("hate") === 1)
+    assert(typeDegMap(2L).getAs[Int]("follow") === 1)
+
+    assert(typeDegMap(3L).getAs[Int]("love") === 0)
+    assert(typeDegMap(3L).getAs[Int]("hate") === 0)
+    assert(typeDegMap(3L).getAs[Int]("follow") === 1)
+  }
+
   test("cache") {
     val g = GraphFrame(vertices, edges)
 
diff --git a/python/graphframes/graphframe.py b/python/graphframes/graphframe.py
index d2cae7b9e..5164a53c4 100644
--- a/python/graphframes/graphframe.py
+++ b/python/graphframes/graphframe.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import warnings
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, List, Optional
 
 from pyspark.sql import functions as F
 from pyspark.storagelevel import StorageLevel
@@ -225,6 +225,113 @@ def degrees(self) -> DataFrame:
             .agg(F.count("*").alias("degree"))
         )
 
+    def type_out_degree(
+        self, edge_type_col: str, edge_types: Optional[List[Any]] = None
+    ) -> DataFrame:
+        """
+        The out-degree of each vertex per edge type, returned as a DataFrame with two columns:
+        - "id": the ID of the vertex
+        - "outDegrees": a struct with a field for each edge type, storing the out-degree count
+
+        Vertices without outgoing edges are not returned. For a returned vertex, an edge type
+        it has no outgoing edge of gets a count of 0.
+
+        :param edge_type_col: Name of the column in edges DataFrame that contains edge types
+        :param edge_types: Optional list of edge type values. If None, edge types will be
+            discovered automatically.
+        :return: DataFrame with columns "id" and "outDegrees" (struct type)
+        """
+        if edge_types is not None:
+            pivot_df = self._impl._edges.groupBy(F.col(self.SRC).alias(self.ID)).pivot(
+                edge_type_col, edge_types
+            )
+        else:
+            pivot_df = self._impl._edges.groupBy(F.col(self.SRC).alias(self.ID)).pivot(
+                edge_type_col
+            )
+
+        count_df = pivot_df.agg(F.count(F.lit(1))).na.fill(0)
+        # Pivot columns are named after the edge type values. Backtick-quote the name so a
+        # value such as "a.b" is read as one column rather than field "b" of column "a".
+        struct_cols = [
+            F.col("`" + col_name.replace("`", "``") + "`").cast("int").alias(col_name)
+            for col_name in count_df.columns
+            if col_name != self.ID
+        ]
+
+        return count_df.select(F.col(self.ID), F.struct(*struct_cols).alias("outDegrees"))
+
+    def type_in_degree(
+        self, edge_type_col: str, edge_types: Optional[List[Any]] = None
+    ) -> DataFrame:
+        """
+        The in-degree of each vertex per edge type, returned as a DataFrame with two columns:
+        - "id": the ID of the vertex
+        - "inDegrees": a struct with a field for each edge type, storing the in-degree count
+
+        Vertices without incoming edges are not returned. For a returned vertex, an edge type
+        it has no incoming edge of gets a count of 0.
+
+        :param edge_type_col: Name of the column in edges DataFrame that contains edge types
+        :param edge_types: Optional list of edge type values. If None, edge types will be
+            discovered automatically.
+        :return: DataFrame with columns "id" and "inDegrees" (struct type)
+        """
+        if edge_types is not None:
+            pivot_df = self._impl._edges.groupBy(F.col(self.DST).alias(self.ID)).pivot(
+                edge_type_col, edge_types
+            )
+        else:
+            pivot_df = self._impl._edges.groupBy(F.col(self.DST).alias(self.ID)).pivot(
+                edge_type_col
+            )
+
+        count_df = pivot_df.agg(F.count(F.lit(1))).na.fill(0)
+        # Pivot columns are named after the edge type values. Backtick-quote the name so a
+        # value such as "a.b" is read as one column rather than field "b" of column "a".
+        struct_cols = [
+            F.col("`" + col_name.replace("`", "``") + "`").cast("int").alias(col_name)
+            for col_name in count_df.columns
+            if col_name != self.ID
+        ]
+        return count_df.select(F.col(self.ID), F.struct(*struct_cols).alias("inDegrees"))
+
+    def type_degree(self, edge_type_col: str, edge_types: Optional[List[Any]] = None) -> DataFrame:
+        """
+        The total degree of each vertex per edge type (both in and out), returned as a DataFrame
+        with two columns:
+        - "id": the ID of the vertex
+        - "degrees": a struct with a field for each edge type, storing the total degree count
+
+        Vertices without any edges are not returned. For a returned vertex, an edge type it
+        has no edge of gets a count of 0.
+
+        :param edge_type_col: Name of the column in edges DataFrame that contains edge types
+        :param edge_types: Optional list of edge type values. If None, edge types will be
+            discovered automatically.
+        :return: DataFrame with columns "id" and "degrees" (struct type)
+        """
+        exploded_edges = self._impl._edges.select(
+            F.explode(F.array(F.col(self.SRC), F.col(self.DST))).alias(self.ID),
+            F.col(edge_type_col),
+        )
+
+        if edge_types is not None:
+            pivot_df = exploded_edges.groupBy(self.ID).pivot(edge_type_col, edge_types)
+        else:
+            pivot_df = exploded_edges.groupBy(self.ID).pivot(edge_type_col)
+
+        count_df = pivot_df.agg(F.count(F.lit(1))).na.fill(0)
+        # Pivot columns are named after the edge type values. Backtick-quote the name so a
+        # value such as "a.b" is read as one column rather than field "b" of column "a".
+        struct_cols = [
+            F.col("`" + col_name.replace("`", "``") + "`").cast("int").alias(col_name)
+            for col_name in count_df.columns
+            if col_name != self.ID
+        ]
+
+        return count_df.select(F.col(self.ID), F.struct(*struct_cols).alias("degrees"))
+
     @property
     def triplets(self) -> DataFrame:
         """
diff --git a/python/tests/conftest.py b/python/tests/conftest.py
index 782664186..39ae039ec 100644
--- a/python/tests/conftest.py
+++ b/python/tests/conftest.py
@@ -5,8 +5,8 @@
 import tempfile
 import warnings
 
-from py4j.java_gateway import JavaObject
 import pytest
+from py4j.java_gateway import JavaObject
 from pyspark.sql import SparkSession
 from pyspark.version import __version__
 
diff --git a/python/tests/test_graphframes.py b/python/tests/test_graphframes.py
index f5aa0eace..2e88d8ad9 100644
--- a/python/tests/test_graphframes.py
+++ b/python/tests/test_graphframes.py
@@ -17,16 +17,17 @@
 
 
 from dataclasses import dataclass
-from pyspark.storagelevel import StorageLevel
+
 import pytest
-from pyspark.sql import DataFrame, SparkSession, functions as sqlfunctions
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import functions as sqlfunctions
+from pyspark.sql.utils import is_remote
+from pyspark.storagelevel import StorageLevel
 
 from graphframes.classic.graphframe import _from_java_gf
 from graphframes.examples import BeliefPropagation, Graphs
 from graphframes.graphframe import GraphFrame
 
-from pyspark.sql.utils import is_remote
-
 
 @dataclass
 class PregelArguments:
@@ -93,9 +94,7 @@ def test_validate(spark: SparkSession) -> None:
     good_g.validate()  # no exception should be thrown
 
     not_distinct_vertices = GraphFrame(
-        spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (1, "d")]).toDF(
-            "id", "attr"
-        ),
+        spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (1, "d")]).toDF("id", "attr"),
         spark.createDataFrame([(1, 2), (2, 1), (2, 3)]).toDF("src", "dst"),
     )
     with pytest.raises(ValueError):
@@ -157,6 +156,105 @@ def test_degrees(local_g: GraphFrame) -> None:
     assert set(deg.columns) == {"id", "degree"}
 
 
+def test_type_degrees(local_g: GraphFrame) -> None:
+    type_out_degree = local_g.type_out_degree("action")
+    assert set(type_out_degree.columns) == {"id", "outDegrees"}
+
+    schema = type_out_degree.schema["outDegrees"].dataType
+    field_names = {field.name for field in schema.fields}
+    assert field_names == {"love", "hate", "follow"}
+
+    results = {row.id: row.outDegrees for row in type_out_degree.collect()}
+    assert results[1].love == 1
+    assert results[1].hate == 0
+    assert results[1].follow == 0
+    assert results[2].love == 0
+    assert results[2].hate == 1
+    assert results[2].follow == 1
+
+    type_in_degree = local_g.type_in_degree("action")
+    assert set(type_in_degree.columns) == {"id", "inDegrees"}
+
+    schema = type_in_degree.schema["inDegrees"].dataType
+    field_names = {field.name for field in schema.fields}
+    assert field_names == {"love", "hate", "follow"}
+
+    results = {row.id: row.inDegrees for row in type_in_degree.collect()}
+    assert results[1].love == 0
+    assert results[1].hate == 1
+    assert results[1].follow == 0
+    assert results[2].love == 1
+    assert results[2].hate == 0
+    assert results[2].follow == 0
+    assert results[3].love == 0
+    assert results[3].hate == 0
+    assert results[3].follow == 1
+
+    type_degree = local_g.type_degree("action")
+    assert set(type_degree.columns) == {"id", "degrees"}
+
+    schema = type_degree.schema["degrees"].dataType
+    field_names = {field.name for field in schema.fields}
+    assert field_names == {"love", "hate", "follow"}
+
+    results = {row.id: row.degrees for row in type_degree.collect()}
+    assert results[1].love == 1
+    assert results[1].hate == 1
+    assert results[1].follow == 0
+    assert results[2].love == 1
+    assert results[2].hate == 1
+    assert results[2].follow == 1
+    assert results[3].love == 0
+    assert results[3].hate == 0
+    assert results[3].follow == 1
+
+
+def test_type_degrees_with_explicit_types(local_g: GraphFrame) -> None:
+    edge_types = ["love", "hate", "follow"]
+    type_out_degree = local_g.type_out_degree("action", edge_types)
+    assert set(type_out_degree.columns) == {"id", "outDegrees"}
+
+    schema = type_out_degree.schema["outDegrees"].dataType
+    field_names = {field.name for field in schema.fields}
+    assert field_names == {"love", "hate", "follow"}
+
+    results = {row.id: row.outDegrees for row in type_out_degree.collect()}
+    assert results[1].love == 1
+    assert results[1].hate == 0
+    assert results[1].follow == 0
+    assert results[2].love == 0
+    assert results[2].hate == 1
+    assert results[2].follow == 1
+
+    type_in_degree = local_g.type_in_degree("action", edge_types)
+    assert set(type_in_degree.columns) == {"id", "inDegrees"}
+
+    results = {row.id: row.inDegrees for row in type_in_degree.collect()}
+    assert results[1].love == 0
+    assert results[1].hate == 1
+    assert results[1].follow == 0
+    assert results[2].love == 1
+    assert results[2].hate == 0
+    assert results[2].follow == 0
+    assert results[3].love == 0
+    assert results[3].hate == 0
+    assert results[3].follow == 1
+
+    type_degree = local_g.type_degree("action", edge_types)
+    assert set(type_degree.columns) == {"id", "degrees"}
+
+    results = {row.id: row.degrees for row in type_degree.collect()}
+    assert results[1].love == 1
+    assert results[1].hate == 1
+    assert results[1].follow == 0
+    assert results[2].love == 1
+    assert results[2].hate == 1
+    assert results[2].follow == 1
+    assert results[3].love == 0
+    assert results[3].hate == 0
+    assert results[3].follow == 1
+
+
 def test_motif_finding(local_g: GraphFrame) -> None:
     motifs = local_g.find("(a)-[e]->(b)")
     assert motifs.count() == 3
@@ -448,9 +546,9 @@ def test_strongly_connected_components(spark: SparkSession) -> None:
     g = GraphFrame(vertices, edges)
     c = g.stronglyConnectedComponents(5)
     for row in c.collect():
-        assert row.id == row.component, (
-            f"Vertex {row.id} not equal to its component {row.component}"
-        )
+        assert (
+            row.id == row.component
+        ), f"Vertex {row.id} not equal to its component {row.component}"
     _ = c.unpersist()
 
 
@@ -461,9 +559,7 @@ def test_triangle_counts(spark: SparkSession, storage_level: StorageLevel) -> No
     g = GraphFrame(vertices, edges)
     c = g.triangleCount(storage_level=storage_level)
     for row in c.select("id", "count").collect():
-        assert row.asDict()["count"] == 1, (
-            f"Triangle count for vertex {row.id} is not 1"
-        )
+        assert row.asDict()["count"] == 1, f"Triangle count for vertex {row.id} is not 1"
     _ = c.unpersist()
 
 
@@ -472,9 +568,7 @@ def test_cycles_finding(spark: SparkSession, args: PregelArguments) -> None:
     vertices = spark.createDataFrame(
         [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")], ["id", "attr"]
     )
-    edges = spark.createDataFrame(
-        [(1, 2), (2, 3), (3, 1), (1, 4), (2, 5)], ["src", "dst"]
-    )
+    edges = spark.createDataFrame([(1, 2), (2, 3), (3, 1), (1, 4), (2, 5)], ["src", "dst"])
     graph = GraphFrame(vertices, edges)
     res = graph.detectingCycles(
         checkpoint_interval=args.checkpoint_interval,
@@ -529,9 +623,7 @@ def test_belief_propagation(spark: SparkSession):
     # Check that each belief is a valid probability in [0, 1].
     for row in results.vertices.select("belief").collect():
         belief = row["belief"]
-        assert 0 <= belief <= 1, (
-            f"Expected belief to be probability in [0,1], but found {belief}"
-        )
+        assert 0 <= belief <= 1, f"Expected belief to be probability in [0,1], but found {belief}"
 
 
 @pytest.mark.skipif(is_remote(), reason="DISABLE FOR CONNECT")