From 3d6623e8bf40709517463c4f4e69e5fe7b68e159 Mon Sep 17 00:00:00 2001 From: joelrobin18 Date: Tue, 21 Oct 2025 09:17:29 +0530 Subject: [PATCH 1/4] Add Graphframe Type Degree Methods Signed-off-by: joelrobin18 --- .../scala/org/graphframes/GraphFrame.scala | 71 ++++++++++++++++++ .../org/graphframes/GraphFrameSuite.scala | 75 +++++++++++++++++++ python/graphframes/graphframe.py | 73 ++++++++++++++++++ python/tests/test_graphframes.py | 52 +++++++++++++ 4 files changed, 271 insertions(+) diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala index 9121aa657..f4836a857 100644 --- a/core/src/main/scala/org/graphframes/GraphFrame.scala +++ b/core/src/main/scala/org/graphframes/GraphFrame.scala @@ -310,6 +310,77 @@ class GraphFrame private ( .agg(count("*").cast("int").as("degree")) } + /** + * The out-degree of each vertex per edge type, returned as a DataFrame with two columns: + * - [[GraphFrame.ID]] the ID of the vertex + * - "outDegrees" a struct with a field for each edge type, storing the out-degree count + * + * @param edgeTypeCol + * Name of the column in edges DataFrame that contains edge types + * @group degree + */ + def typeOutDegree(edgeTypeCol: String): DataFrame = { + import org.apache.spark.sql.functions.{sum, when} + + val edgeTypes = edges.select(edgeTypeCol).distinct().collect().map(_.get(0)) + + val aggExprs = edgeTypes.map { edgeType => + sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)).cast("int").as(edgeType.toString) + } + + edges + .groupBy(col(SRC).as(ID)) + .agg(struct(aggExprs.toSeq: _*).as("outDegrees")) + } + + /** + * The in-degree of each vertex per edge type, returned as a DataFrame with two columns: + * - [[GraphFrame.ID]] the ID of the vertex + * - "inDegrees" a struct with a field for each edge type, storing the in-degree count + * + * @param edgeTypeCol + * Name of the column in edges DataFrame that contains edge types + * @group degree + */ + def typeInDegree(edgeTypeCol: String): DataFrame = { + import org.apache.spark.sql.functions.{sum, when} + + val edgeTypes = edges.select(edgeTypeCol).distinct().collect().map(_.get(0)) + + val aggExprs = edgeTypes.map { edgeType => + sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)).cast("int").as(edgeType.toString) + } + + edges + .groupBy(col(DST).as(ID)) + .agg(struct(aggExprs.toSeq: _*).as("inDegrees")) + } + + /** + * The total degree of each vertex per edge type (both in and out), returned as a DataFrame with + * two columns: + * - [[GraphFrame.ID]] the ID of the vertex + * - "degrees" a struct with a field for each edge type, storing the total degree count + * + * @param edgeTypeCol + * Name of the column in edges DataFrame that contains edge types + * @group degree + */ + def typeDegree(edgeTypeCol: String): DataFrame = { + import org.apache.spark.sql.functions.{sum, when} + + val edgeTypes = edges.select(edgeTypeCol).distinct().collect().map(_.get(0)) + + val aggExprs = edgeTypes.map { edgeType => + sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)).cast("int").as(edgeType.toString) + } + + edges + .select(explode(array(col(SRC), col(DST))).as(ID), col(edgeTypeCol)) + .groupBy(ID) + .agg(struct(aggExprs.toSeq: _*).as("degrees")) + } + // ============================ Motif finding ======================================== /** diff --git a/core/src/test/scala/org/graphframes/GraphFrameSuite.scala b/core/src/test/scala/org/graphframes/GraphFrameSuite.scala index ac86e3993..4de1c801e 100644 --- a/core/src/test/scala/org/graphframes/GraphFrameSuite.scala +++ b/core/src/test/scala/org/graphframes/GraphFrameSuite.scala @@ -334,6 +334,81 @@ class GraphFrameSuite extends SparkFunSuite with GraphFrameTestSparkContext { assert(degrees === Map(1L -> 2, 2L -> 3, 3L -> 1)) } + test("type degree metrics") { + val g = GraphFrame(vertices, edges) + + assert(g.typeOutDegree("action").columns === Seq("id", "outDegrees")) + val typeOutDegrees = g.typeOutDegree("action").collect() + + val outDegreesSchema = g.typeOutDegree("action").schema("outDegrees").dataType.asInstanceOf[StructType] + val outDegreesFieldNames = outDegreesSchema.fields.map(_.name).toSet + assert(outDegreesFieldNames === Set("love", "hate", "follow")) + + val typeOutDegMap = typeOutDegrees.map { row => + val id = row.getLong(0) + val degrees = row.getStruct(1) + (id, degrees) + }.toMap + + assert(typeOutDegMap(1L).getAs[Int]("love") === 1) + assert(typeOutDegMap(1L).getAs[Int]("hate") === 0) + assert(typeOutDegMap(1L).getAs[Int]("follow") === 0) + + assert(typeOutDegMap(2L).getAs[Int]("love") === 0) + assert(typeOutDegMap(2L).getAs[Int]("hate") === 1) + assert(typeOutDegMap(2L).getAs[Int]("follow") === 1) + + assert(g.typeInDegree("action").columns === Seq("id", "inDegrees")) + val typeInDegrees = g.typeInDegree("action").collect() + + val inDegreesSchema = g.typeInDegree("action").schema("inDegrees").dataType.asInstanceOf[StructType] + val inDegreesFieldNames = inDegreesSchema.fields.map(_.name).toSet + assert(inDegreesFieldNames === Set("love", "hate", "follow")) + + val typeInDegMap = typeInDegrees.map { row => + val id = row.getLong(0) + val degrees = row.getStruct(1) + (id, degrees) + }.toMap + + assert(typeInDegMap(1L).getAs[Int]("love") === 0) + assert(typeInDegMap(1L).getAs[Int]("hate") === 1) + assert(typeInDegMap(1L).getAs[Int]("follow") === 0) + + assert(typeInDegMap(2L).getAs[Int]("love") === 1) + assert(typeInDegMap(2L).getAs[Int]("hate") === 0) + assert(typeInDegMap(2L).getAs[Int]("follow") === 0) + + assert(typeInDegMap(3L).getAs[Int]("love") === 0) + assert(typeInDegMap(3L).getAs[Int]("hate") === 0) + assert(typeInDegMap(3L).getAs[Int]("follow") === 1) + + assert(g.typeDegree("action").columns === Seq("id", "degrees")) + val typeDegrees = g.typeDegree("action").collect() + + val degreesSchema = g.typeDegree("action").schema("degrees").dataType.asInstanceOf[StructType] + val degreesFieldNames = degreesSchema.fields.map(_.name).toSet + assert(degreesFieldNames === Set("love", "hate", "follow")) + + val typeDegMap = typeDegrees.map { row => + val id = row.getLong(0) + val degrees = row.getStruct(1) + (id, degrees) + }.toMap + + assert(typeDegMap(1L).getAs[Int]("love") === 1) + assert(typeDegMap(1L).getAs[Int]("hate") === 1) + assert(typeDegMap(1L).getAs[Int]("follow") === 0) + + assert(typeDegMap(2L).getAs[Int]("love") === 1) + assert(typeDegMap(2L).getAs[Int]("hate") === 1) + assert(typeDegMap(2L).getAs[Int]("follow") === 1) + + assert(typeDegMap(3L).getAs[Int]("love") === 0) + assert(typeDegMap(3L).getAs[Int]("hate") === 0) + assert(typeDegMap(3L).getAs[Int]("follow") === 1) + } + test("cache") { val g = GraphFrame(vertices, edges) diff --git a/python/graphframes/graphframe.py b/python/graphframes/graphframe.py index d2cae7b9e..db7ef5509 100644 --- a/python/graphframes/graphframe.py +++ b/python/graphframes/graphframe.py @@ -225,6 +225,79 @@ def degrees(self) -> DataFrame: .agg(F.count("*").alias("degree")) ) + + def typeOutDegree(self, edgeTypeCol: str) -> DataFrame: + """ + The out-degree of each vertex per edge type, returned as a DataFrame with two columns: + - "id": the ID of the vertex + - "outDegrees": a struct with a field for each edge type, storing the out-degree count + + :param edgeTypeCol: Name of the column in edges DataFrame that contains edge types + :return: DataFrame with columns "id" and "outDegrees" (struct type) + """ + edge_types = [row[0] for row in self._impl._edges.select(edgeTypeCol).distinct().collect()] + + agg_exprs = [ + F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)).cast("int").alias(edge_type) + for edge_type in edge_types + ] + + result = self._impl._edges.groupBy(F.col(self.SRC).alias(self.ID)).agg( + F.struct(*agg_exprs).alias("outDegrees") + ) + + return result + + def typeInDegree(self, edgeTypeCol: str) -> DataFrame: + """ + The in-degree of each vertex per edge type, returned as a DataFrame with two columns: + - "id": the ID of the vertex + - "inDegrees": a struct with a field for each edge type, storing the in-degree count + + :param edgeTypeCol: Name of the column in edges DataFrame that contains edge types + :return: DataFrame with columns "id" and "inDegrees" (struct type) + """ + edge_types = [row[0] for row in self._impl._edges.select(edgeTypeCol).distinct().collect()] + + agg_exprs = [ + F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)).cast("int").alias(edge_type) + for edge_type in edge_types + ] + + result = self._impl._edges.groupBy(F.col(self.DST).alias(self.ID)).agg( + F.struct(*agg_exprs).alias("inDegrees") + ) + + return result + + def typeDegree(self, edgeTypeCol: str) -> DataFrame: + """ + The total degree of each vertex per edge type (both in and out), returned as a DataFrame with two columns: + - "id": the ID of the vertex + - "degrees": a struct with a field for each edge type, storing the total degree count + + :param edgeTypeCol: Name of the column in edges DataFrame that contains edge types + :return: DataFrame with columns "id" and "degrees" (struct type) + """ + edge_types = [row[0] for row in self._impl._edges.select(edgeTypeCol).distinct().collect()] + + agg_exprs = [ + F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)).cast("int").alias(edge_type) + for edge_type in edge_types + ] + + result = ( + self._impl._edges.select( + F.explode(F.array(F.col(self.SRC), F.col(self.DST))).alias(self.ID), + F.col(edgeTypeCol) + ) + .groupBy(self.ID) + .agg(F.struct(*agg_exprs).alias("degrees")) + ) + + return result + + @property def triplets(self) -> DataFrame: """ diff --git a/python/tests/test_graphframes.py b/python/tests/test_graphframes.py index f5aa0eace..68e3b1df0 100644 --- a/python/tests/test_graphframes.py +++ b/python/tests/test_graphframes.py @@ -157,6 +157,58 @@ def test_degrees(local_g: GraphFrame) -> None: assert set(deg.columns) == {"id", "degree"} +def test_type_degrees(local_g: GraphFrame) -> None: + typeOutDeg = local_g.typeOutDegree("action") + assert set(typeOutDeg.columns) == {"id", "outDegrees"} + + schema = typeOutDeg.schema["outDegrees"].dataType + field_names = {field.name for field in schema.fields} + assert field_names == {"love", "hate", "follow"} + + results = {row.id: row.outDegrees for row in typeOutDeg.collect()} + assert results[1].love == 1 + assert results[1].hate == 0 + assert results[1].follow == 0 + assert results[2].love == 0 + assert results[2].hate == 1 + assert results[2].follow == 1 + + typeInDeg = local_g.typeInDegree("action") + assert set(typeInDeg.columns) == {"id", "inDegrees"} + + schema = typeInDeg.schema["inDegrees"].dataType + field_names = {field.name for field in schema.fields} + assert field_names == {"love", "hate", "follow"} + + results = {row.id: row.inDegrees for row in typeInDeg.collect()} + assert results[1].love == 0 + assert results[1].hate == 1 + assert results[1].follow == 0 + assert results[2].love == 1 + assert results[2].hate == 0 + assert results[2].follow == 0 + assert results[3].love == 0 + assert results[3].hate == 0 + assert results[3].follow == 1 + + typeDeg = local_g.typeDegree("action") + assert set(typeDeg.columns) == {"id", "degrees"} + + schema = typeDeg.schema["degrees"].dataType + field_names = {field.name for field in schema.fields} + assert field_names == {"love", "hate", "follow"} + + results = {row.id: row.degrees for row in typeDeg.collect()} + assert results[1].love == 1 + assert results[1].hate == 1 + assert results[1].follow == 0 + assert results[2].love == 1 + assert results[2].hate == 1 + assert results[2].follow == 1 + assert results[3].love == 0 + assert results[3].hate == 0 + assert results[3].follow == 1 + def test_motif_finding(local_g: GraphFrame) -> None: motifs = local_g.find("(a)-[e]->(b)") assert motifs.count() == 3 From 29b9b29e281c950f6c2e669898dd3f6fbbadcd8d Mon Sep 17 00:00:00 2001 From: joelrobin18 Date: Tue, 21 Oct 2025 10:06:21 +0530 Subject: [PATCH 2/4] Lint Signed-off-by: joelrobin18 --- .../scala/org/graphframes/GraphFrame.scala | 14 ++++-- .../org/graphframes/GraphFrameSuite.scala | 8 ++-- python/graphframes/graphframe.py | 19 +++++--- python/tests/conftest.py | 2 +- python/tests/test_graphframes.py | 44 ++++++++----------- 5 files changed, 47 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala index f4836a857..59291c1fe 100644 --- a/core/src/main/scala/org/graphframes/GraphFrame.scala +++ b/core/src/main/scala/org/graphframes/GraphFrame.scala @@ -310,7 +310,7 @@ class GraphFrame private ( .agg(count("*").cast("int").as("degree")) } - /** + /** * The out-degree of each vertex per edge type, returned as a DataFrame with two columns: * - [[GraphFrame.ID]] the ID of the vertex * - "outDegrees" a struct with a field for each edge type, storing the out-degree count @@ -325,7 +325,9 @@ class GraphFrame private ( val edgeTypes = edges.select(edgeTypeCol).distinct().collect().map(_.get(0)) val aggExprs = edgeTypes.map { edgeType => - sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)).cast("int").as(edgeType.toString) + sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)) + .cast("int") + .as(edgeType.toString) } edges @@ -348,7 +350,9 @@ class GraphFrame private ( val edgeTypes = edges.select(edgeTypeCol).distinct().collect().map(_.get(0)) val aggExprs = edgeTypes.map { edgeType => - sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)).cast("int").as(edgeType.toString) + sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)) + .cast("int") + .as(edgeType.toString) } edges @@ -372,7 +376,9 @@ class GraphFrame private ( val edgeTypes = edges.select(edgeTypeCol).distinct().collect().map(_.get(0)) val aggExprs = edgeTypes.map { edgeType => - sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)).cast("int").as(edgeType.toString) + sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)) + .cast("int") + .as(edgeType.toString) } edges diff --git a/core/src/test/scala/org/graphframes/GraphFrameSuite.scala b/core/src/test/scala/org/graphframes/GraphFrameSuite.scala index 4de1c801e..fb5c8ebc6 100644 --- a/core/src/test/scala/org/graphframes/GraphFrameSuite.scala +++ b/core/src/test/scala/org/graphframes/GraphFrameSuite.scala @@ -340,7 +340,8 @@ class GraphFrameSuite extends SparkFunSuite with GraphFrameTestSparkContext { assert(g.typeOutDegree("action").columns === Seq("id", "outDegrees")) val typeOutDegrees = g.typeOutDegree("action").collect() - val outDegreesSchema = g.typeOutDegree("action").schema("outDegrees").dataType.asInstanceOf[StructType] + val outDegreesSchema = + g.typeOutDegree("action").schema("outDegrees").dataType.asInstanceOf[StructType] val outDegreesFieldNames = outDegreesSchema.fields.map(_.name).toSet assert(outDegreesFieldNames === Set("love", "hate", "follow")) @@ -361,7 +362,8 @@ class GraphFrameSuite extends SparkFunSuite with GraphFrameTestSparkContext { assert(g.typeInDegree("action").columns === Seq("id", "inDegrees")) val typeInDegrees = g.typeInDegree("action").collect() - val inDegreesSchema = g.typeInDegree("action").schema("inDegrees").dataType.asInstanceOf[StructType] + val inDegreesSchema = + g.typeInDegree("action").schema("inDegrees").dataType.asInstanceOf[StructType] val inDegreesFieldNames = inDegreesSchema.fields.map(_.name).toSet assert(inDegreesFieldNames === Set("love", "hate", "follow")) @@ -408,7 +410,7 @@ class GraphFrameSuite extends SparkFunSuite with GraphFrameTestSparkContext { assert(typeDegMap(3L).getAs[Int]("hate") === 0) assert(typeDegMap(3L).getAs[Int]("follow") === 1) } - + test("cache") { val g = GraphFrame(vertices, edges) diff --git a/python/graphframes/graphframe.py b/python/graphframes/graphframe.py index db7ef5509..76fe3ab1d 100644 --- a/python/graphframes/graphframe.py +++ b/python/graphframes/graphframe.py @@ -225,7 +225,6 @@ def degrees(self) -> DataFrame: .agg(F.count("*").alias("degree")) ) - def typeOutDegree(self, edgeTypeCol: str) -> DataFrame: """ The out-degree of each vertex per edge type, returned as a DataFrame with two columns: @@ -238,7 +237,9 @@ def typeOutDegree(self, edgeTypeCol: str) -> DataFrame: edge_types = [row[0] for row in self._impl._edges.select(edgeTypeCol).distinct().collect()] agg_exprs = [ - F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)).cast("int").alias(edge_type) + F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)) + .cast("int") + .alias(edge_type) for edge_type in edge_types ] @@ -260,7 +261,9 @@ def typeInDegree(self, edgeTypeCol: str) -> DataFrame: edge_types = [row[0] for row in self._impl._edges.select(edgeTypeCol).distinct().collect()] agg_exprs = [ - F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)).cast("int").alias(edge_type) + F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)) + .cast("int") + .alias(edge_type) for edge_type in edge_types ] @@ -272,7 +275,8 @@ def typeInDegree(self, edgeTypeCol: str) -> DataFrame: def typeDegree(self, edgeTypeCol: str) -> DataFrame: """ - The total degree of each vertex per edge type (both in and out), returned as a DataFrame with two columns: + The total degree of each vertex per edge type (both in and out), returned as a DataFrame + with two columns: - "id": the ID of the vertex - "degrees": a struct with a field for each edge type, storing the total degree count @@ -282,14 +286,16 @@ def typeDegree(self, edgeTypeCol: str) -> DataFrame: edge_types = [row[0] for row in self._impl._edges.select(edgeTypeCol).distinct().collect()] agg_exprs = [ - F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)).cast("int").alias(edge_type) + F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)) + .cast("int") + .alias(edge_type) for edge_type in edge_types ] result = ( self._impl._edges.select( F.explode(F.array(F.col(self.SRC), F.col(self.DST))).alias(self.ID), - F.col(edgeTypeCol) + F.col(edgeTypeCol), ) .groupBy(self.ID) .agg(F.struct(*agg_exprs).alias("degrees")) @@ -297,7 +303,6 @@ def typeDegree(self, edgeTypeCol: str) -> DataFrame: return result - @property def triplets(self) -> DataFrame: """ diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 782664186..39ae039ec 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -5,8 +5,8 @@ import tempfile import warnings -from py4j.java_gateway import JavaObject import pytest +from py4j.java_gateway import JavaObject from pyspark.sql import SparkSession from pyspark.version import __version__ diff --git a/python/tests/test_graphframes.py b/python/tests/test_graphframes.py index 68e3b1df0..29c143bee 100644 --- a/python/tests/test_graphframes.py +++ b/python/tests/test_graphframes.py @@ -17,16 +17,17 @@ from dataclasses import dataclass -from pyspark.storagelevel import StorageLevel + import pytest -from pyspark.sql import DataFrame, SparkSession, functions as sqlfunctions +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql import functions as sqlfunctions +from pyspark.sql.utils import is_remote +from pyspark.storagelevel import StorageLevel from graphframes.classic.graphframe import _from_java_gf from graphframes.examples import BeliefPropagation, Graphs from graphframes.graphframe import GraphFrame -from pyspark.sql.utils import is_remote - @dataclass class PregelArguments: @@ -93,9 +94,7 @@ def test_validate(spark: SparkSession) -> None: good_g.validate() # no exception should be thrown not_distinct_vertices = GraphFrame( - spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (1, "d")]).toDF( - "id", "attr" - ), + spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (1, "d")]).toDF("id", "attr"), spark.createDataFrame([(1, 2), (2, 1), (2, 3)]).toDF("src", "dst"), ) with pytest.raises(ValueError): @@ -199,15 +198,16 @@ def test_type_degrees(local_g: GraphFrame) -> None: assert field_names == {"love", "hate", "follow"} results = {row.id: row.degrees for row in typeDeg.collect()} - assert results[1].love == 1 - assert results[1].hate == 1 + assert results[1].love == 1 + assert results[1].hate == 1 assert results[1].follow == 0 - assert results[2].love == 1 - assert results[2].hate == 1 - assert results[2].follow == 1 + assert results[2].love == 1 + assert results[2].hate == 1 + assert results[2].follow == 1 assert results[3].love == 0 assert results[3].hate == 0 - assert results[3].follow == 1 + assert results[3].follow == 1 + def test_motif_finding(local_g: GraphFrame) -> None: motifs = local_g.find("(a)-[e]->(b)") @@ -500,9 +500,9 @@ def test_strongly_connected_components(spark: SparkSession) -> None: g = GraphFrame(vertices, edges) c = g.stronglyConnectedComponents(5) for row in c.collect(): - assert row.id == row.component, ( - f"Vertex {row.id} not equal to its component {row.component}" - ) + assert ( + row.id == row.component + ), f"Vertex {row.id} not equal to its component {row.component}" _ = c.unpersist() @@ -513,9 +513,7 @@ def test_triangle_counts(spark: SparkSession, storage_level: StorageLevel) -> No g = GraphFrame(vertices, edges) c = g.triangleCount(storage_level=storage_level) for row in c.select("id", "count").collect(): - assert row.asDict()["count"] == 1, ( - f"Triangle count for vertex {row.id} is not 1" - ) + assert row.asDict()["count"] == 1, f"Triangle count for vertex {row.id} is not 1" _ = c.unpersist() @@ -524,9 +522,7 @@ def test_cycles_finding(spark: SparkSession, args: PregelArguments) -> None: vertices = spark.createDataFrame( [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")], ["id", "attr"] ) - edges = spark.createDataFrame( - [(1, 2), (2, 3), (3, 1), (1, 4), (2, 5)], ["src", "dst"] - ) + edges = spark.createDataFrame([(1, 2), (2, 3), (3, 1), (1, 4), (2, 5)], ["src", "dst"]) graph = GraphFrame(vertices, edges) res = graph.detectingCycles( checkpoint_interval=args.checkpoint_interval, @@ -581,9 +577,7 @@ def test_belief_propagation(spark: SparkSession): # Check that each belief is a valid probability in [0, 1]. for row in results.vertices.select("belief").collect(): belief = row["belief"] - assert 0 <= belief <= 1, ( - f"Expected belief to be probability in [0,1], but found {belief}" - ) + assert 0 <= belief <= 1, f"Expected belief to be probability in [0,1], but found {belief}" @pytest.mark.skipif(is_remote(), reason="DISABLE FOR CONNECT") From d4feff10a922b9ff22f032b1027c25be4d58b870 Mon Sep 17 00:00:00 2001 From: joelrobin18 Date: Wed, 22 Oct 2025 21:38:15 +0530 Subject: [PATCH 3/4] Perf improve Signed-off-by: joelrobin18 --- .../scala/org/graphframes/GraphFrame.scala | 82 +++++++------ .../org/graphframes/GraphFrameSuite.scala | 59 ++++++++++ python/graphframes/graphframe.py | 109 ++++++++++-------- python/tests/test_graphframes.py | 70 +++++++++-- 4 files changed, 224 insertions(+), 96 deletions(-) diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala index 59291c1fe..02225babe 100644 --- a/core/src/main/scala/org/graphframes/GraphFrame.scala +++ b/core/src/main/scala/org/graphframes/GraphFrame.scala @@ -317,22 +317,25 @@ class GraphFrame private ( * * @param edgeTypeCol * Name of the column in edges DataFrame that contains edge types + * @param edgeTypes + * Optional sequence of edge type values. If provided, avoids a collect() operation. If None, + * edge types will be discovered automatically. * @group degree */ - def typeOutDegree(edgeTypeCol: String): DataFrame = { - import org.apache.spark.sql.functions.{sum, when} + def typeOutDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = { + import org.apache.spark.sql.functions.coalesce - val edgeTypes = edges.select(edgeTypeCol).distinct().collect().map(_.get(0)) - - val aggExprs = edgeTypes.map { edgeType => - sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)) - .cast("int") - .as(edgeType.toString) + val pivotDF = edgeTypes match { + case Some(types) => + edges.groupBy(col(SRC).as(ID)).pivot(edgeTypeCol, types) + case None => + edges.groupBy(col(SRC).as(ID)).pivot(edgeTypeCol) } - - edges - .groupBy(col(SRC).as(ID)) - .agg(struct(aggExprs.toSeq: _*).as("outDegrees")) + val countDF = pivotDF.agg(count("*")) + val structCols = countDF.columns.filter(_ != ID).map { colName => + coalesce(col(colName), lit(0)).cast("int").as(colName) + } + countDF.select(col(ID), struct(structCols: _*).as("outDegrees")) } /** @@ -342,22 +345,25 @@ class GraphFrame private ( * * @param edgeTypeCol * Name of the column in edges DataFrame that contains edge types + * @param edgeTypes + * Optional sequence of edge type values. If provided, avoids a collect() operation. If None, + * edge types will be discovered automatically. * @group degree */ - def typeInDegree(edgeTypeCol: String): DataFrame = { - import org.apache.spark.sql.functions.{sum, when} - - val edgeTypes = edges.select(edgeTypeCol).distinct().collect().map(_.get(0)) + def typeInDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = { + import org.apache.spark.sql.functions.coalesce - val aggExprs = edgeTypes.map { edgeType => - sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)) - .cast("int") - .as(edgeType.toString) + val pivotDF = edgeTypes match { + case Some(types) => + edges.groupBy(col(DST).as(ID)).pivot(edgeTypeCol, types) + case None => + edges.groupBy(col(DST).as(ID)).pivot(edgeTypeCol) } - - edges - .groupBy(col(DST).as(ID)) - .agg(struct(aggExprs.toSeq: _*).as("inDegrees")) + val countDF = pivotDF.agg(count("*")) + val structCols = countDF.columns.filter(_ != ID).map { colName => + coalesce(col(colName), lit(0)).cast("int").as(colName) + } + countDF.select(col(ID), struct(structCols: _*).as("inDegrees")) } /** @@ -368,23 +374,27 @@ class GraphFrame private ( * * @param edgeTypeCol * Name of the column in edges DataFrame that contains edge types + * @param edgeTypes + * Optional sequence of edge type values. If provided, avoids a collect() operation. If None, + * edge types will be discovered automatically. * @group degree */ - def typeDegree(edgeTypeCol: String): DataFrame = { - import org.apache.spark.sql.functions.{sum, when} + def typeDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = { + import org.apache.spark.sql.functions.coalesce + val explodedEdges = edges.select(explode(array(col(SRC), col(DST))).as(ID), col(edgeTypeCol)) - val edgeTypes = edges.select(edgeTypeCol).distinct().collect().map(_.get(0)) - - val aggExprs = edgeTypes.map { edgeType => - sum(when(col(edgeTypeCol) === lit(edgeType), 1).otherwise(0)) - .cast("int") - .as(edgeType.toString) + val pivotDF = edgeTypes match { + case Some(types) => + explodedEdges.groupBy(ID).pivot(edgeTypeCol, types) + case None => + explodedEdges.groupBy(ID).pivot(edgeTypeCol) + } + val countDF = pivotDF.agg(count("*")) + val structCols = countDF.columns.filter(_ != ID).map { colName => + coalesce(col(colName), lit(0)).cast("int").as(colName) } - edges - .select(explode(array(col(SRC), col(DST))).as(ID), col(edgeTypeCol)) - .groupBy(ID) - .agg(struct(aggExprs.toSeq: _*).as("degrees")) + countDF.select(col(ID), struct(structCols: _*).as("degrees")) } // ============================ Motif finding ======================================== diff --git a/core/src/test/scala/org/graphframes/GraphFrameSuite.scala b/core/src/test/scala/org/graphframes/GraphFrameSuite.scala index fb5c8ebc6..acc58998e 100644 --- a/core/src/test/scala/org/graphframes/GraphFrameSuite.scala +++ b/core/src/test/scala/org/graphframes/GraphFrameSuite.scala @@ -411,6 +411,65 @@ class GraphFrameSuite extends SparkFunSuite with GraphFrameTestSparkContext { assert(typeDegMap(3L).getAs[Int]("follow") === 1) } + test("type degree metrics with explicit edge types") { + val g = GraphFrame(vertices, edges) + val edgeTypes = Seq("love", "hate", "follow") + + val typeOutDegrees = g.typeOutDegree("action", Some(edgeTypes)).collect() + + val typeOutDegMap = typeOutDegrees.map { row => + val id = row.getLong(0) + val degrees = row.getStruct(1) + (id, degrees) + }.toMap + + assert(typeOutDegMap(1L).getAs[Int]("love") === 1) + assert(typeOutDegMap(1L).getAs[Int]("hate") === 0) + assert(typeOutDegMap(1L).getAs[Int]("follow") === 0) + + assert(typeOutDegMap(2L).getAs[Int]("love") === 0) + assert(typeOutDegMap(2L).getAs[Int]("hate") === 1) + assert(typeOutDegMap(2L).getAs[Int]("follow") === 1) + + val typeInDegrees = g.typeInDegree("action", Some(edgeTypes)).collect() + val typeInDegMap = typeInDegrees.map { row => + val id = row.getLong(0) + val degrees = row.getStruct(1) + (id, degrees) + }.toMap + + assert(typeInDegMap(1L).getAs[Int]("love") === 0) + assert(typeInDegMap(1L).getAs[Int]("hate") === 1) + assert(typeInDegMap(1L).getAs[Int]("follow") === 0) + + assert(typeInDegMap(2L).getAs[Int]("love") === 1) + assert(typeInDegMap(2L).getAs[Int]("hate") === 0) + assert(typeInDegMap(2L).getAs[Int]("follow") === 0) + + assert(typeInDegMap(3L).getAs[Int]("love") === 0) + assert(typeInDegMap(3L).getAs[Int]("hate") === 0) + assert(typeInDegMap(3L).getAs[Int]("follow") === 1) + + val typeDegrees = g.typeDegree("action", Some(edgeTypes)).collect() + val typeDegMap = typeDegrees.map { row => + val id = row.getLong(0) + val degrees = row.getStruct(1) + (id, degrees) + }.toMap + + assert(typeDegMap(1L).getAs[Int]("love") === 1) + assert(typeDegMap(1L).getAs[Int]("hate") === 1) + assert(typeDegMap(1L).getAs[Int]("follow") === 0) + + assert(typeDegMap(2L).getAs[Int]("love") === 1) + assert(typeDegMap(2L).getAs[Int]("hate") === 1) + assert(typeDegMap(2L).getAs[Int]("follow") === 1) + + assert(typeDegMap(3L).getAs[Int]("love") === 0) + assert(typeDegMap(3L).getAs[Int]("hate") === 0) + assert(typeDegMap(3L).getAs[Int]("follow") === 1) + } + test("cache") { val g = GraphFrame(vertices, edges) diff --git a/python/graphframes/graphframe.py b/python/graphframes/graphframe.py index 76fe3ab1d..49ff69965 100644 --- a/python/graphframes/graphframe.py +++ b/python/graphframes/graphframe.py @@ -18,7 +18,7 @@ from __future__ import annotations import warnings -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, List, Optional from pyspark.sql import functions as F from pyspark.storagelevel import StorageLevel @@ -225,83 +225,96 @@ def degrees(self) -> DataFrame: .agg(F.count("*").alias("degree")) ) - def typeOutDegree(self, edgeTypeCol: str) -> DataFrame: + def type_out_degree( + self, edge_type_col: str, edge_types: Optional[List[Any]] = None + ) -> DataFrame: """ The out-degree of each vertex per edge type, returned as a DataFrame with two columns: - "id": the ID of the vertex - "outDegrees": a struct with a field for each edge type, storing the out-degree count - :param edgeTypeCol: Name of the column in edges DataFrame that contains edge types + :param edge_type_col: Name of the column in edges DataFrame that contains edge types + :param edge_types: Optional list of edge type values. If provided, avoids a collect() operation. + If None, edge types will be discovered automatically. :return: DataFrame with columns "id" and "outDegrees" (struct type) """ - edge_types = [row[0] for row in self._impl._edges.select(edgeTypeCol).distinct().collect()] + if edge_types is not None: + pivot_df = self._impl._edges.groupBy(F.col(self.SRC).alias(self.ID)).pivot( + edge_type_col, edge_types + ) + else: + pivot_df = self._impl._edges.groupBy(F.col(self.SRC).alias(self.ID)).pivot( + edge_type_col + ) - agg_exprs = [ - F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)) - .cast("int") - .alias(edge_type) - for edge_type in edge_types + count_df = pivot_df.agg(F.count("*")) + struct_cols = [ + F.coalesce(F.col(col_name), F.lit(0)).cast("int").alias(col_name) + for col_name in count_df.columns + if col_name != self.ID ] - result = self._impl._edges.groupBy(F.col(self.SRC).alias(self.ID)).agg( - F.struct(*agg_exprs).alias("outDegrees") - ) - - return result + return count_df.select(F.col(self.ID), F.struct(*struct_cols).alias("outDegrees")) - def typeInDegree(self, edgeTypeCol: str) -> DataFrame: + def type_in_degree( + self, edge_type_col: str, edge_types: Optional[List[Any]] = None + ) -> DataFrame: """ The in-degree of each vertex per edge type, returned as a DataFrame with two columns: - "id": the ID of the vertex - "inDegrees": a struct with a field for each edge type, storing the in-degree count - :param edgeTypeCol: Name of the column in edges DataFrame that contains edge types + :param edge_type_col: Name of the column in edges DataFrame that contains edge types + :param edge_types: Optional list of edge type values. If provided, avoids a collect() operation. + If None, edge types will be discovered automatically. :return: DataFrame with columns "id" and "inDegrees" (struct type) """ - edge_types = [row[0] for row in self._impl._edges.select(edgeTypeCol).distinct().collect()] + if edge_types is not None: + pivot_df = self._impl._edges.groupBy(F.col(self.DST).alias(self.ID)).pivot( + edge_type_col, edge_types + ) + else: + pivot_df = self._impl._edges.groupBy(F.col(self.DST).alias(self.ID)).pivot( + edge_type_col + ) - agg_exprs = [ - F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)) - .cast("int") - .alias(edge_type) - for edge_type in edge_types + count_df = pivot_df.agg(F.count("*")) + struct_cols = [ + F.coalesce(F.col(col_name), F.lit(0)).cast("int").alias(col_name) + for col_name in count_df.columns + if col_name != self.ID ] + return count_df.select(F.col(self.ID), F.struct(*struct_cols).alias("inDegrees")) - result = self._impl._edges.groupBy(F.col(self.DST).alias(self.ID)).agg( - F.struct(*agg_exprs).alias("inDegrees") - ) - - return result - - def typeDegree(self, edgeTypeCol: str) -> DataFrame: + def type_degree(self, edge_type_col: str, edge_types: Optional[List[Any]] = None) -> DataFrame: """ - The total degree of each vertex per edge type (both in and out), returned as a DataFrame - with two columns: + The total degree of each vertex per edge type (both in and out), returned as a DataFrame with two columns: - "id": the ID of the vertex - "degrees": a struct with a field for each edge type, storing the total degree count - :param edgeTypeCol: Name of the column in edges DataFrame that contains edge types + :param edge_type_col: Name of the column in edges DataFrame that contains edge types + :param edge_types: Optional list of edge type values. If provided, avoids a collect() operation. + If None, edge types will be discovered automatically. :return: DataFrame with columns "id" and "degrees" (struct type) """ - edge_types = [row[0] for row in self._impl._edges.select(edgeTypeCol).distinct().collect()] + exploded_edges = self._impl._edges.select( + F.explode(F.array(F.col(self.SRC), F.col(self.DST))).alias(self.ID), + F.col(edge_type_col), + ) - agg_exprs = [ - F.sum(F.when(F.col(edgeTypeCol) == edge_type, 1).otherwise(0)) - .cast("int") - .alias(edge_type) - for edge_type in edge_types - ] + if edge_types is not None: + pivot_df = exploded_edges.groupBy(self.ID).pivot(edge_type_col, edge_types) + else: + pivot_df = exploded_edges.groupBy(self.ID).pivot(edge_type_col) - result = ( - self._impl._edges.select( - F.explode(F.array(F.col(self.SRC), F.col(self.DST))).alias(self.ID), - F.col(edgeTypeCol), - ) - .groupBy(self.ID) - .agg(F.struct(*agg_exprs).alias("degrees")) - ) + count_df = pivot_df.agg(F.count("*")) + struct_cols = [ + F.coalesce(F.col(col_name), F.lit(0)).cast("int").alias(col_name) + for col_name in count_df.columns + if col_name != self.ID + ] - return result + return count_df.select(F.col(self.ID), F.struct(*struct_cols).alias("degrees")) @property def triplets(self) -> DataFrame: diff --git a/python/tests/test_graphframes.py b/python/tests/test_graphframes.py index 29c143bee..2e88d8ad9 100644 --- a/python/tests/test_graphframes.py +++ b/python/tests/test_graphframes.py @@ -157,14 +157,14 @@ def test_degrees(local_g: GraphFrame) -> None: def test_type_degrees(local_g: GraphFrame) -> None: - typeOutDeg = local_g.typeOutDegree("action") - assert set(typeOutDeg.columns) == {"id", "outDegrees"} + type_out_degree = local_g.type_out_degree("action") + assert set(type_out_degree.columns) == {"id", "outDegrees"} - schema = typeOutDeg.schema["outDegrees"].dataType + schema = type_out_degree.schema["outDegrees"].dataType field_names = {field.name for field in schema.fields} assert field_names == {"love", "hate", "follow"} - results = {row.id: row.outDegrees for row in typeOutDeg.collect()} + results = {row.id: row.outDegrees for row in type_out_degree.collect()} assert results[1].love == 1 assert results[1].hate == 0 assert results[1].follow == 0 @@ -172,14 +172,14 @@ def test_type_degrees(local_g: GraphFrame) -> None: assert results[2].hate == 1 assert results[2].follow == 1 - typeInDeg = local_g.typeInDegree("action") - assert set(typeInDeg.columns) == {"id", "inDegrees"} + type_in_degree = local_g.type_in_degree("action") + assert set(type_in_degree.columns) == {"id", "inDegrees"} - schema = typeInDeg.schema["inDegrees"].dataType + schema = type_in_degree.schema["inDegrees"].dataType field_names = {field.name for field in schema.fields} assert field_names == {"love", "hate", "follow"} - results = {row.id: row.inDegrees for row in typeInDeg.collect()} + results = {row.id: row.inDegrees for row in type_in_degree.collect()} assert results[1].love == 0 assert results[1].hate == 1 assert results[1].follow == 0 @@ -190,14 +190,60 @@ def test_type_degrees(local_g: GraphFrame) -> None: assert results[3].hate == 0 assert results[3].follow == 1 - typeDeg = local_g.typeDegree("action") - assert set(typeDeg.columns) == {"id", "degrees"} + type_degree = local_g.type_degree("action") + assert set(type_degree.columns) == {"id", "degrees"} - schema = typeDeg.schema["degrees"].dataType + schema = type_degree.schema["degrees"].dataType field_names = {field.name for field in schema.fields} assert field_names == {"love", "hate", "follow"} - results = {row.id: row.degrees for row in typeDeg.collect()} + results = {row.id: row.degrees for row in type_degree.collect()} + assert results[1].love == 1 + assert results[1].hate == 1 + assert results[1].follow == 0 + assert results[2].love == 1 + assert results[2].hate == 1 + assert results[2].follow == 1 + assert results[3].love == 0 + assert results[3].hate == 0 + assert results[3].follow == 1 + + +def test_type_degrees_with_explicit_types(local_g: GraphFrame) -> None: + edge_types = ["love", "hate", "follow"] + type_out_degree = local_g.type_out_degree("action", edge_types) + assert set(type_out_degree.columns) == {"id", "outDegrees"} + + schema = type_out_degree.schema["outDegrees"].dataType + field_names = {field.name for field in schema.fields} + assert field_names == {"love", "hate", "follow"} + + results = {row.id: row.outDegrees for row in type_out_degree.collect()} + assert results[1].love == 1 + assert results[1].hate == 0 + assert results[1].follow == 0 + assert results[2].love == 0 + assert results[2].hate == 1 + assert results[2].follow == 1 + + type_in_degree = local_g.type_in_degree("action", edge_types) + assert set(type_in_degree.columns) == {"id", "inDegrees"} + + results = {row.id: row.inDegrees for row in type_in_degree.collect()} + assert results[1].love == 0 + assert results[1].hate == 1 + assert results[1].follow == 0 + assert results[2].love == 1 + assert results[2].hate == 0 + assert results[2].follow == 0 + assert results[3].love == 0 + assert results[3].hate == 0 + assert results[3].follow == 1 + + type_degree = local_g.type_degree("action", edge_types) + assert set(type_degree.columns) == {"id", "degrees"} + + results = {row.id: row.degrees for row in type_degree.collect()} assert results[1].love == 1 assert results[1].hate == 1 assert results[1].follow == 0 From 638c549a71510d97a9a8bd3e97f7d1c874dc73cf Mon Sep 17 00:00:00 2001 From: joelrobin18 Date: Thu, 23 Oct 2025 06:56:17 +0530 Subject: [PATCH 4/4] Perf improve Signed-off-by: joelrobin18 --- .../scala/org/graphframes/GraphFrame.scala | 50 ++++++++++--------- python/graphframes/graphframe.py | 27 +++++----- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala index 02225babe..f847e4c3c 100644 --- a/core/src/main/scala/org/graphframes/GraphFrame.scala +++ b/core/src/main/scala/org/graphframes/GraphFrame.scala @@ -318,23 +318,24 @@ class GraphFrame private ( * @param edgeTypeCol * Name of the column in edges DataFrame that contains edge types * @param edgeTypes - * Optional sequence of edge type values. If provided, avoids a collect() operation. If None, - * edge types will be discovered automatically. + * Optional sequence of edge type values. If None, edge types will be discovered + * automatically. * @group degree */ def typeOutDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = { - import org.apache.spark.sql.functions.coalesce - val pivotDF = edgeTypes match { case Some(types) => edges.groupBy(col(SRC).as(ID)).pivot(edgeTypeCol, types) case None => edges.groupBy(col(SRC).as(ID)).pivot(edgeTypeCol) } - val countDF = pivotDF.agg(count("*")) - val structCols = countDF.columns.filter(_ != ID).map { colName => - coalesce(col(colName), lit(0)).cast("int").as(colName) - } + val countDF = pivotDF.agg(count(lit(1))).na.fill(0) + val structCols = countDF.columns + .filter(_ != ID) + .map { colName => + col(colName).cast("int").as(colName) + } + .toSeq countDF.select(col(ID), struct(structCols: _*).as("outDegrees")) } @@ -346,23 +347,24 @@ class GraphFrame private ( * @param edgeTypeCol * Name of the column in edges DataFrame that contains edge types * @param edgeTypes - * Optional sequence of edge type values. If provided, avoids a collect() operation. If None, - * edge types will be discovered automatically. + * Optional sequence of edge type values. If None, edge types will be discovered + * automatically. * @group degree */ def typeInDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = { - import org.apache.spark.sql.functions.coalesce - val pivotDF = edgeTypes match { case Some(types) => edges.groupBy(col(DST).as(ID)).pivot(edgeTypeCol, types) case None => edges.groupBy(col(DST).as(ID)).pivot(edgeTypeCol) } - val countDF = pivotDF.agg(count("*")) - val structCols = countDF.columns.filter(_ != ID).map { colName => - coalesce(col(colName), lit(0)).cast("int").as(colName) - } + val countDF = pivotDF.agg(count(lit(1))).na.fill(0) + val structCols = countDF.columns + .filter(_ != ID) + .map { colName => + col(colName).cast("int").as(colName) + } + .toSeq countDF.select(col(ID), struct(structCols: _*).as("inDegrees")) } @@ -375,12 +377,11 @@ class GraphFrame private ( * @param edgeTypeCol * Name of the column in edges DataFrame that contains edge types * @param edgeTypes - * Optional sequence of edge type values. If provided, avoids a collect() operation. If None, - * edge types will be discovered automatically. + * Optional sequence of edge type values. If None, edge types will be discovered + * automatically. * @group degree */ def typeDegree(edgeTypeCol: String, edgeTypes: Option[Seq[Any]] = None): DataFrame = { - import org.apache.spark.sql.functions.coalesce val explodedEdges = edges.select(explode(array(col(SRC), col(DST))).as(ID), col(edgeTypeCol)) val pivotDF = edgeTypes match { @@ -389,10 +390,13 @@ class GraphFrame private ( case None => explodedEdges.groupBy(ID).pivot(edgeTypeCol) } - val countDF = pivotDF.agg(count("*")) - val structCols = countDF.columns.filter(_ != ID).map { colName => - coalesce(col(colName), lit(0)).cast("int").as(colName) - } + val countDF = pivotDF.agg(count(lit(1))).na.fill(0) + val structCols = countDF.columns + .filter(_ != ID) + .map { colName => + col(colName).cast("int").as(colName) + } + .toSeq countDF.select(col(ID), struct(structCols: _*).as("degrees")) } diff --git a/python/graphframes/graphframe.py b/python/graphframes/graphframe.py index 49ff69965..5164a53c4 100644 --- a/python/graphframes/graphframe.py +++ b/python/graphframes/graphframe.py @@ -234,8 +234,8 @@ def type_out_degree( - "outDegrees": a struct with a field for each edge type, storing the out-degree count :param edge_type_col: Name of the column in edges DataFrame that contains edge types - :param edge_types: Optional list of edge type values. If provided, avoids a collect() operation. - If None, edge types will be discovered automatically. + :param edge_types: Optional list of edge type values. If None, edge types will be + discovered automatically. :return: DataFrame with columns "id" and "outDegrees" (struct type) """ if edge_types is not None: @@ -247,9 +247,9 @@ def type_out_degree( edge_type_col ) - count_df = pivot_df.agg(F.count("*")) + count_df = pivot_df.agg(F.count(F.lit(1))).na.fill(0) struct_cols = [ - F.coalesce(F.col(col_name), F.lit(0)).cast("int").alias(col_name) + F.col(col_name).cast("int").alias(col_name) for col_name in count_df.columns if col_name != self.ID ] @@ -265,8 +265,8 @@ def type_in_degree( - "inDegrees": a struct with a field for each edge type, storing the in-degree count :param edge_type_col: Name of the column in edges DataFrame that contains edge types - :param edge_types: Optional list of edge type values. If provided, avoids a collect() operation. - If None, edge types will be discovered automatically. + :param edge_types: Optional list of edge type values. If None, edge types will be + discovered automatically. :return: DataFrame with columns "id" and "inDegrees" (struct type) """ if edge_types is not None: @@ -278,9 +278,9 @@ def type_in_degree( edge_type_col ) - count_df = pivot_df.agg(F.count("*")) + count_df = pivot_df.agg(F.count(F.lit(1))).na.fill(0) struct_cols = [ - F.coalesce(F.col(col_name), F.lit(0)).cast("int").alias(col_name) + F.col(col_name).cast("int").alias(col_name) for col_name in count_df.columns if col_name != self.ID ] @@ -288,13 +288,14 @@ def type_in_degree( def type_degree(self, edge_type_col: str, edge_types: Optional[List[Any]] = None) -> DataFrame: """ - The total degree of each vertex per edge type (both in and out), returned as a DataFrame with two columns: + The total degree of each vertex per edge type (both in and out), returned as a DataFrame + with two columns: - "id": the ID of the vertex - "degrees": a struct with a field for each edge type, storing the total degree count :param edge_type_col: Name of the column in edges DataFrame that contains edge types - :param edge_types: Optional list of edge type values. If provided, avoids a collect() operation. - If None, edge types will be discovered automatically. + :param edge_types: Optional list of edge type values. If None, edge types will be + discovered automatically. :return: DataFrame with columns "id" and "degrees" (struct type) """ exploded_edges = self._impl._edges.select( @@ -307,9 +308,9 @@ def type_degree(self, edge_type_col: str, edge_types: Optional[List[Any]] = None else: pivot_df = exploded_edges.groupBy(self.ID).pivot(edge_type_col) - count_df = pivot_df.agg(F.count("*")) + count_df = pivot_df.agg(F.count(F.lit(1))).na.fill(0) struct_cols = [ - F.coalesce(F.col(col_name), F.lit(0)).cast("int").alias(col_name) + F.col(col_name).cast("int").alias(col_name) for col_name in count_df.columns if col_name != self.ID ]