From 67d0472a38c832ec3e435e95766db3b79e788a14 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Sat, 11 Oct 2025 10:26:39 +0200 Subject: [PATCH 1/3] reduce memory consumption of cycles detection --- .../main/scala/org/graphframes/GraphFrame.scala | 6 +----- .../org/graphframes/lib/DetectingCycles.scala | 7 +++++-- .../graphframes/lib/DetectingCyclesSuite.scala | 15 +++------------ 3 files changed, 9 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/graphframes/GraphFrame.scala b/core/src/main/scala/org/graphframes/GraphFrame.scala index da59a74f8..23781c641 100644 --- a/core/src/main/scala/org/graphframes/GraphFrame.scala +++ b/core/src/main/scala/org/graphframes/GraphFrame.scala @@ -672,11 +672,7 @@ class GraphFrame private ( * large-scale sparse graphs." Proceedings of Simpósio Brasileiro de Pesquisa Operacional * (SBPO’15) (2015): 1-11. * - * Returns a DataFrame with ID and cycles, ID are not unique if there are multiple cycles - * starting from this ID. For the case of cycle 1 -> 2 -> 3 -> 1 all the vertices will have the - * same cycle! E.g.: 1 -> [1, 2, 3, 1] 2 -> [2, 3, 1, 2] 3 -> [3, 1, 2, 3] - * - * Deduplication of cycles should be done by the user! + * Returns a DataFrame with unque cycles. * * @return * an instance of DetectingCycles initialized with the current context diff --git a/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala index 340ba81ba..c68884f30 100644 --- a/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala +++ b/core/src/main/scala/org/graphframes/lib/DetectingCycles.scala @@ -1,5 +1,6 @@ package org.graphframes.lib +import org.apache.spark.sql.Column import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.* import org.apache.spark.sql.types.ArrayType @@ -30,7 +31,6 @@ class DetectingCycles private[graphframes] (private val graph: GraphFrame) filter(col(foundSeqCol), x => size(x) > lit(0)).alias(foundSeqCol)) .filter(size(col(foundSeqCol)) > lit(0)) .select( - col(GraphFrame.ID), // from vid -> [[cycle1, cycle2, ...]] // to vid -> [cycle1], vid -> [cycle2], ... explode(col(foundSeqCol)).alias(foundSeqCol)) @@ -62,7 +62,10 @@ object DetectingCycles { // Each vertex stores all the found cycles val foundSequences = array().cast(ArrayType(ArrayType(vertexDT))) // Message is simply stored sequences - val sentMessages = when(size(Pregel.src(storedSeqCol)) =!= lit(0), Pregel.src(storedSeqCol)) + // Send only sequences if the starting vertex of them is less than the destination + val sentMessages = when( + size(Pregel.src(storedSeqCol)) =!= lit(0), + filter(Pregel.src(storedSeqCol), (x: Column) => x.getItem(0) <= Pregel.dst(GraphFrame.ID))) .otherwise(lit(null).cast(ArrayType(ArrayType(vertexDT)))) // If the sequence contains the current vertex ID somewhere in the middle, it is // a previously detected cycle and a sequence should be discarded. diff --git a/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala b/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala index c3bf55413..6a213cfae 100644 --- a/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala +++ b/core/src/test/scala/org/graphframes/lib/DetectingCyclesSuite.scala @@ -17,17 +17,13 @@ class DetectingCyclesSuite extends SparkFunSuite with GraphFrameTestSparkContext .createDataFrame(Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 4L), (2L, 5L))) .toDF("src", "dst")) val res = graph.detectingCycles.setUseLocalCheckpoints(true).run() - assert(res.count() == 3) + assert(res.count() == 1) @nowarn val collected = res - .sort(GraphFrame.ID) - .select(DetectingCycles.foundSeqCol) .collect() .map(r => r.getAs[mutable.WrappedArray[Long]](0)) assert(collected(0) == Seq(1, 2, 3, 1)) - assert(collected(1) == Seq(2, 3, 1, 2)) - assert(collected(2) == Seq(3, 1, 2, 3)) res.unpersist() } @@ -53,20 +49,15 @@ class DetectingCyclesSuite extends SparkFunSuite with GraphFrameTestSparkContext .createDataFrame(Seq((1L, 2L), (2L, 1L), (1L, 3L), (3L, 1L), (2L, 5L), (5L, 1L))) .toDF("src", "dst")) val res = graph.detectingCycles.setUseLocalCheckpoints(true).run() - assert(res.count() == 7) + assert(res.count() == 3) @nowarn val collected = res - .sort(GraphFrame.ID, DetectingCycles.foundSeqCol) - .select(DetectingCycles.foundSeqCol) + .sort(DetectingCycles.foundSeqCol) .collect() .map(r => r.getAs[mutable.WrappedArray[Long]](0)) assert(collected(0) == Seq(1, 2, 1)) assert(collected(1) == Seq(1, 2, 5, 1)) assert(collected(2) == Seq(1, 3, 1)) - assert(collected(3) == Seq(2, 1, 2)) - assert(collected(4) == Seq(2, 5, 1, 2)) - assert(collected(5) == Seq(3, 1, 3)) - assert(collected(6) == Seq(5, 1, 2, 5)) res.unpersist() } } From d20820e9513e85159924550b402cfc74546ab669 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Sat, 11 Oct 2025 14:59:22 +0200 Subject: [PATCH 2/3] pytests --- docs/src/04-user-guide/05-traversals.md | 20 ++++++-------------- python/graphframes/graphframe.py | 6 +----- python/tests/test_graphframes.py | 4 ++-- 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/docs/src/04-user-guide/05-traversals.md b/docs/src/04-user-guide/05-traversals.md index 5f0cd1d8c..600cd85dc 100644 --- a/docs/src/04-user-guide/05-traversals.md +++ b/docs/src/04-user-guide/05-traversals.md @@ -209,18 +209,10 @@ val res = graph.detectingCycles.setUseLocalCheckpoints(true).run() res.show(false) // Output: -// +----+--------------+ -// | id | found_cycles | -// +----+--------------+ -// |1 |[1, 3, 1] | -// |1 |[1, 2, 1] | -// |1 |[1, 2, 5, 1] | -// |2 |[2, 1, 2] | -// |2 |[2, 5, 1, 2] | -// |3 |[3, 1, 3] | -// |5 |[5, 1, 2, 5] | -// +----+--------------+ +// +--------------+ +// | found_cycles | +// +--------------+ +// |[1, 3, 1] | +// |[1, 2, 1] | +// |[1, 2, 5, 1] | ``` - -**WARNING:** This algorithm returns all the cycles, and users should handle deduplication of \[1, 2, 1\] and \[2, 1, 2\] ( -that is the same cycle)! diff --git a/python/graphframes/graphframe.py b/python/graphframes/graphframe.py index d2cae7b9e..6c72c20ed 100644 --- a/python/graphframes/graphframe.py +++ b/python/graphframes/graphframe.py @@ -301,11 +301,7 @@ def detectingCycles( large-scale sparse graphs." Proceedings of Simpósio Brasileiro de Pesquisa Operacional (SBPO’15) (2015): 1-11. - Returns a DataFrame with ID and cycles, ID are not unique if there are multiple cycles - starting from this ID. For the case of cycle 1 -> 2 -> 3 -> 1 all the vertices will have the - same cycle! E.g.: 1 -> [1, 2, 3, 1] 2 -> [2, 3, 1, 2] 3 -> [3, 1, 2, 3] - - Deduplication of cycles should be done by the user! + Returns a DataFrame with unique cycles. :param checkpoint_interval: Pregel checkpoint interval, default is 2 :param use_local_checkpoints: should local checkpoints be used instead of checkpointDir diff --git a/python/tests/test_graphframes.py b/python/tests/test_graphframes.py index f5aa0eace..41a74940e 100644 --- a/python/tests/test_graphframes.py +++ b/python/tests/test_graphframes.py @@ -481,9 +481,9 @@ def test_cycles_finding(spark: SparkSession, args: PregelArguments) -> None: use_local_checkpoints=args.use_local_checkpoints, storage_level=args.storage_level, ) - assert res.count() == 3 + assert res.count() == 1 collected = res.sort("id").select("found_cycles").collect() - assert [row[0] for row in collected] == [[1, 2, 3, 1], [2, 3, 1, 2], [3, 1, 2, 3]] + assert collected[0] == [1, 2, 3, 1] _ = res.unpersist() From bdae0ced45e5477c474546c04a7dc0088abe263b Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Sun, 12 Oct 2025 10:21:54 +0200 Subject: [PATCH 3/3] fix pytest --- python/tests/test_graphframes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/test_graphframes.py b/python/tests/test_graphframes.py index 41a74940e..0d4395602 100644 --- a/python/tests/test_graphframes.py +++ b/python/tests/test_graphframes.py @@ -483,7 +483,7 @@ def test_cycles_finding(spark: SparkSession, args: PregelArguments) -> None: ) assert res.count() == 1 collected = res.sort("id").select("found_cycles").collect() - assert collected[0] == [1, 2, 3, 1] + assert collected[0][0] == [1, 2, 3, 1] _ = res.unpersist()