[WIP] feat: random walks by SemyonSinchenko · Pull Request #752 · graphframes/graphframes

Conversation

@SemyonSinchenko
Collaborator

What changes were proposed in this pull request?

  • RandomWalks Base
  • RandomWalks with Restart Impl
  • Edges Sampling API

Why are the changes needed?

Close #726
Close #324

@SemyonSinchenko
Collaborator Author

This is still a work in progress. I'm still thinking about the best way to implement RW: what belongs in the abstraction, what belongs in the implementations, which configuration options should be provided, etc.

Current idea:

  • limit the number of collected neighbors (Reservoir Sampling would be nice to have, cc: @SauronShepherd)
  • run in batches
  • each batch generates short walks and saves them to parquet (partitioning???)
  • at the end, join all the batches on the initially generated RW UUID

I think this should allow generating really long walks with quite limited resources... Not sure about performance.
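The batching idea above can be sketched in plain Python. This is only an illustration under stated assumptions, not the PR's Spark code: `run_batch`, `stitch`, and `generate_walks` are hypothetical names, an in-memory list stands in for the parquet files, and a dictionary merge stands in for the final join on the walk UUID.

```python
import random
import uuid


def run_batch(graph, current, batch_len, rng):
    """Extend every walk by up to batch_len steps from its current vertex."""
    segments, new_current = {}, {}
    for walk_id, vertex in current.items():
        segment = []
        for _ in range(batch_len):
            nbrs = graph.get(vertex, [])
            if not nbrs:  # dead end: this walk cannot be extended further
                break
            vertex = rng.choice(nbrs)
            segment.append(vertex)
        segments[walk_id] = segment      # short walk produced by this batch
        new_current[walk_id] = vertex    # where the next batch resumes
    return segments, new_current


def stitch(starts_by_id, batches):
    """Join the per-batch segments on the walk id, in batch order."""
    walks = {walk_id: [v] for walk_id, v in starts_by_id.items()}
    for segments in batches:
        for walk_id, segment in segments.items():
            walks[walk_id].extend(segment)
    return walks


def generate_walks(graph, starts, num_batches, batch_len, seed=0):
    rng = random.Random(seed)
    # One id per walk, generated once up front (the "RW UUID" of the thread).
    starts_by_id = {uuid.uuid4().hex: v for v in starts}
    current = dict(starts_by_id)
    batches = []  # stand-in for batches persisted to storage
    for _ in range(num_batches):
        segments, current = run_batch(graph, current, batch_len, rng)
        batches.append(segments)
    return stitch(starts_by_id, batches)
```

Only the current vertex of each walk is carried between batches, which is why the memory needed per batch does not grow with the total walk length.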

@rjurney
Collaborator
rjurney commented Nov 20, 2025

@SemyonSinchenko How could this work? Like a Pregel thing? Maybe nodes carry their paths with them? I've long been confused here... joins don't seem to scale. I'm not sure but would love to do so... can you explain? I mean I have read other implementations like the one I shared in the other thread: https://github.com/data61/stellar-random-walk

@SemyonSinchenko
Collaborator Author

> @SemyonSinchenko How could this work? Like a Pregel thing? Maybe nodes carry their paths with them? I've long been confused here... joins don't seem to scale. I'm not sure but would love to do so... can you explain? I mean I have read other implementations like the one I shared in the other thread: https://github.com/data61/stellar-random-walk

Change my mind, but pure second-order RW is not scalable. To see why, imagine two nodes, each with degree 1000 (a common case in power-law graphs). You need to collect two neighborhood sets of size 1000. It scales even worse than GF triangleCount, which suffers from the same problem.
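A small sketch of why a second-order step needs both neighborhoods at once. It assumes "second order" means node2vec-style walks with return parameter `p` and in-out parameter `q`, which the thread does not spell out; `second_order_weights` is a hypothetical helper, not part of GraphFrames.

```python
def second_order_weights(graph, prev, curr, p=1.0, q=1.0):
    """Unnormalized transition weights for a walk that moved prev -> curr."""
    prev_nbrs = set(graph[prev])  # the whole neighborhood of the previous vertex
    weights = {}
    for candidate in graph[curr]:  # the whole neighborhood of the current vertex
        if candidate == prev:
            weights[candidate] = 1.0 / p   # step back to where we came from
        elif candidate in prev_nbrs:
            weights[candidate] = 1.0       # candidate is also adjacent to prev
        else:
            weights[candidate] = 1.0 / q   # candidate moves away from prev
    return weights
```

Each step touches the full neighbor collections of two vertices, so two hubs of degree 1000 mean two collections of 1000 ids for a single transition.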

@SemyonSinchenko
Collaborator Author

Yes, it is doing joins. Joins are slow, but they are scalable. I see no other options, tbh. To avoid huge neighborhoods, I'm using a limit: in each batch, take only part of each vertex's neighbors.

@SauronShepherd
Contributor

I'm afraid I'm not knowledgeable enough to give a strong opinion here, but after a short chat with ChatGPT I wondered: what about integrating an existing system like ThunderRW with GraphFrames?

@SemyonSinchenko
Collaborator Author

I'm starting to think that we do not need RWs at all. @SauronShepherd, writing it from scratch in Spark is not a problem. My question to you was mostly about a reservoir sampling aggregation function, in case you are interested in implementing it.

@rjurney
Collaborator
rjurney commented Nov 21, 2025

Various embedding algorithms require random walks and I've seen implementations out there, but maybe they're not top priority?

@SemyonSinchenko
Collaborator Author
SemyonSinchenko commented Nov 21, 2025

> Various embedding algorithms require random walks and I've seen implementations out there, but maybe they're not top priority?

This PR is a WIP implementation, so if you have any suggestions or comments, feel free to share them. I'm trying to make it scalable, and at first glance it is.

- Added Scala-style docstrings to all classes, traits, methods, and fields
- Improved documentation for random walk algorithms and configurations
- Correct element_at index from 0 to 1 for 1-based Spark SQL arrays
- Fix walk array construction by appending nextNode instead of currVisitingVertex
- Add null handling for nodes with no outgoing neighbors in restart logic
- Add comprehensive Scala docstrings to RandomWalkBase and RandomWalkWithRestart
- Create RWExample.scala demonstrating RandomWalkWithRestart on LDBC datasets
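Two of the fixes listed above (append the next node rather than the node just left, and handle vertices without outgoing neighbors) can be illustrated with a single-machine sketch of a random walk with restart. This is not the PR's Spark code: `walk_with_restart` is a hypothetical function, and jumping back to the start vertex on a dead end is an assumption about what the null handling does.

```python
import random


def walk_with_restart(graph, start, length, restart_prob, rng):
    """Random walk of `length` steps that restarts at `start` with restart_prob."""
    walk = [start]
    current = start
    for _ in range(length):
        nbrs = graph.get(current) or []  # a missing or empty entry means no out-neighbors
        if not nbrs or rng.random() < restart_prob:
            next_node = start            # restart; also the assumed dead-end fallback
        else:
            next_node = rng.choice(nbrs)
        walk.append(next_node)           # append the node we moved to, not `current`
        current = next_node
    return walk
```

The `element_at` fix is specific to Spark SQL, where array indices start at 1; Python lists are 0-based, so it has no direct counterpart in this sketch.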

...
This commit introduces a new Word2Vec-based embedding method using the hashing trick to handle large vocabularies efficiently in graph frames, particularly for random walk sequences. It includes configurable parameters like number of hashing functions, max features, and standard W2V settings, with comprehensive Scaladoc for public APIs.

- Added core/src/main/scala/org/graphframes/embeddings/Word2VecHashingTrick.scala: New class implementing hashing trick by applying multiple Murmur3 hash functions and modulo to map features to a fixed-size space, reducing collisions and memory usage. It trains a W2V model on expanded sequences and provides a companion model class for vector retrieval via averaging hashed embeddings. Setters include docstrings explaining trade-offs (e.g., more hashes improve quality but multiply dataset size).
- Modified core/src/main/scala/org/graphframes/examples/RWExample.scala: Updated main method to accept a single file path argument for edge loading instead of downloading LDBC datasets, simplifying usage for local files. Replaced vertex loading with direct derivation from edges for consistency and reduced I/O.
- Modified core/src/main/scala/org/graphframes/exceptions.scala: Added GraphFramesW2VException class to handle W2V-specific errors, such as unsupported input types in hashing.
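The hashing trick described in this commit message can be sketched as follows. It is an illustration of the idea only: the names are hypothetical, MD5 with a seed prefix stands in for the seeded Murmur3 functions, and the embedding table is passed in instead of being trained by Word2Vec. A later commit in this PR deletes the W2V + hashing implementation as wrong, so this should not be read as the final design.

```python
import hashlib


def hashed_ids(token, num_hashes, max_features):
    """Map one token to num_hashes bucket ids in [0, max_features)."""
    ids = []
    for seed in range(num_hashes):
        digest = hashlib.md5(f"{seed}:{token}".encode("utf-8")).digest()
        ids.append(int.from_bytes(digest[:8], "big") % max_features)
    return ids


def expand_sequence(sequence, num_hashes, max_features):
    """Replace every token by its bucket ids; the output is num_hashes times longer."""
    expanded = []
    for token in sequence:
        expanded.extend(hashed_ids(token, num_hashes, max_features))
    return expanded


def average_vector(token, table, num_hashes, max_features):
    """Vector for a token: the mean of the vectors of its hashed buckets."""
    vectors = [table[i] for i in hashed_ids(token, num_hashes, max_features)]
    dim = len(vectors[0])
    return [sum(v[d] for v in vectors) / len(vectors) for d in range(dim)]
```

The trade-off mentioned in the setters shows up in `expand_sequence`: each extra hash function multiplies the length of the training sequences.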

Replace collect_set + shuffle + slice with ReservoirSamplingAgg UDAF for
efficient sampling of up to maxNbrs neighbors per vertex. This improves
performance by avoiding full neighbor list aggregation and shuffling,
especially beneficial for high-degree vertices.

- Add ReservoirSamplingAgg trait: generic aggregator using reservoir
  sampling algorithm, supporting merge operations for distributed
  computation.
- Handle various vertex ID types (String, Short, Byte, Int, Long) with
  appropriate encoders.
- Raise GraphFramesUnsupportedVertexTypeException for unsupported types.
- Add comprehensive test suite covering reduce, merge, and finish
  operations with edge cases and fixed seeds for determinism.
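A mergeable reservoir sampler of the kind this commit describes can be sketched in plain Python. The function names mirror the reduce, merge, and finish operations mentioned above, but the code is a hypothetical illustration (Algorithm R plus a count-weighted merge), not the ReservoirSamplingAgg source.

```python
import random


class Reservoir:
    """Up to k sampled items plus the number of items seen so far."""

    def __init__(self, k):
        self.k = k
        self.items = []
        self.seen = 0


def reduce(res, value, rng):
    """Fold one value into the reservoir (Algorithm R)."""
    res.seen += 1
    if len(res.items) < res.k:
        res.items.append(value)
    else:
        j = rng.randrange(res.seen)  # uniform in [0, seen)
        if j < res.k:
            res.items[j] = value
    return res


def merge(a, b, rng):
    """Combine two partial reservoirs (same k) into one for the union of inputs."""
    out = Reservoir(a.k)
    out.seen = a.seen + b.seen
    if out.seen <= a.k:              # neither side ever overflowed: keep everything
        out.items = a.items + b.items
        return out
    pool_a, pool_b = a.items[:], b.items[:]
    rng.shuffle(pool_a)
    rng.shuffle(pool_b)
    rem_a, rem_b = a.seen, b.seen    # unsampled population left on each side
    for _ in range(a.k):
        # Choose a side in proportion to how many inputs it still represents.
        if rng.randrange(rem_a + rem_b) < rem_a:
            out.items.append(pool_a.pop())
            rem_a -= 1
        else:
            out.items.append(pool_b.pop())
            rem_b -= 1
    return out


def finish(res):
    """Return the sampled neighbors."""
    return list(res.items)
```

Because each partial state holds at most k items, memory per vertex stays bounded even for very high-degree vertices, unlike collecting the full neighbor set first.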

Modified files:
- .gitignore: Ignore Emacs temp files for cleaner diffs.
- core/src/main/scala/org/graphframes/exceptions.scala: New exception class.
- core/src/main/scala/org/graphframes/rw/RandomWalkBase.scala: Integrate
  ReservoirSamplingAgg in prepareGraph method.

New files:
- core/src/main/scala/org/apache/spark/sql/graphframes/expressions/ReservoirSamplingAgg.scala
- core/src/test/scala/org/apache/spark/sql/graphframes/expressions/ReservoirSamplingAggSuite.scala

Delete wrong implementation of W2V + hashing

Development

Successfully merging this pull request may close these issues.

  • feat: support for generating random walks
  • feat: sampling API and strategies
