[DataPipe] Enforcing single valid iterator for IterDataPipes with single DataPipe as output #70479
Conversation
@NivekT has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator. |
…erDataPipe" Differential Revision: [D33344609](https://our.internmc.facebook.com/intern/diff/D33344609) [ghstack-poisoned]
…erDataPipe" Differential Revision: [D33344609](https://our.internmc.facebook.com/intern/diff/D33344609) [ghstack-poisoned]
…erDataPipe" Differential Revision: [D33344609](https://our.internmc.facebook.com/intern/diff/D33344609) [ghstack-poisoned]
…erDataPipe" Differential Revision: [D33344609](https://our.internmc.facebook.com/intern/diff/D33344609) [ghstack-poisoned]
…erDataPipe" Fixes part of pytorch/data#45 The expected behavior of single iterator per DataPipe is described in that linked issue. Please review the changes in `torch/utils/data/datapipes/_typing.py` to see if the implementation is sensible. Differential Revision: [D33344609](https://our.internmc.facebook.com/intern/diff/D33344609) [ghstack-poisoned]
@VitalyFedyunin @ejguan This PR and the next one in the stack are ready for you to have a look. I expect some CI breakage since we probably re-used some DataPipes somewhere in our tests. I am in the process of going through and fixing those. I am also going through the domain use cases to see if anything breaks.
test/test_profiler.py
Outdated

```python
it_dp2 = iter(dp2)  # This creates a 2nd iterator, hence invalidating `it_dp1` and `dp1`
self.assertEqual(5, it_dp2.get_value(5))  # type: ignore[attr-defined]
with self.assertRaisesRegex(RuntimeError, "A separate iterator has been created"):
    next(it_dp2)
with self.assertRaisesRegex(RuntimeError, "A separate iterator has been created"):
    self.assertEqual(list(range(10)), list(it_dp2))
```
The change to this test case is worth a closer look. The basic issue is that, within our constraint logic, we decided to increment the counter even when `__iter__` returns `self`, and this invalidates the source DataPipe `dp1`, such that calling `next` will trigger an exception.
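That invalidation mechanism can be sketched roughly as follows. This is a simplified stand-in, not the actual `torch/utils/data` code; `_SketchDataPipe` and `_SketchIterator` are hypothetical names, while `_valid_iterator_id` is the attribute name used elsewhere in this PR's discussion:

```python
# Sketch: every call to `iter()` bumps a counter on the DataPipe, and
# each iterator remembers the counter value it was created with.
class _SketchDataPipe:
    def __init__(self, iterable):
        self.source = iterable
        self._valid_iterator_id = None  # attribute name per the PR discussion

    def __iter__(self):
        # Incremented even when conceptually "returning self": this is
        # what invalidates any previously created iterator.
        self._valid_iterator_id = (self._valid_iterator_id or 0) + 1
        return _SketchIterator(self, self._valid_iterator_id, iter(self.source))

class _SketchIterator:
    def __init__(self, datapipe, iterator_id, inner):
        self.datapipe = datapipe
        self.iterator_id = iterator_id
        self.inner = inner

    def __iter__(self):
        return self

    def __next__(self):
        # Stale ID means another iterator was created after this one.
        if self.iterator_id != self.datapipe._valid_iterator_id:
            raise RuntimeError("This iterator has been invalidated")
        return next(self.inner)

dp1 = _SketchDataPipe(range(3))
it_a = iter(dp1)
assert next(it_a) == 0
it_b = iter(dp1)  # bumps the counter, invalidating `it_a`
raised = False
try:
    next(it_a)
except RuntimeError:
    raised = True
assert raised
assert next(it_b) == 0  # the newest iterator remains usable
```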
```python
    Given an instance of a DataPipe and an iterator ID, check if the IDs match, and if not, raise an exception.
    """
    if hasattr(datapipe, "_is_child_datapipe") and datapipe._is_child_datapipe is True:
        pass  # TODO: Add logic for multiple ChildDataPipe
```
This is added in the next PR within this stack
```python
# Note that if `__next__` and `__iter__` do something completely unrelated, it may cause issues,
# but the user would be violating the iterator protocol
```
If `__next__` and `__iter__` do something completely unrelated, that would violate the iterator protocol and the behavior here may be unexpected. But as long as the user sticks with the iterator protocol or returns an iterator, everything should be fine.
test/test_datapipe.py
Outdated

```python
# Functional Test: Check that every `__iter__` call returns the same object
source_dp = _CustomIterDP_Self(range(10))
it1 = iter(source_dp)
self.assertEqual(0, next(it1))
self.assertEqual(1, next(source_dp))
it2 = iter(source_dp)
```
As discussed offline, we are applying the single iterator constraint even when `__iter__` returns `self`. This means users can create at most one iterator from the object.
Since this PR introduces BC-breaking changes, could you please add a detailed description of the behavior changes to the PR summary? It will save time when we write the release notes.
…es with single DataPipe as output" This PR introduces the single iterator constraint for `IterDataPipe`, focusing on `IterDataPipe` that produces one single `IterDataPipe` as output. This constraint is necessary because having multiple iterators referencing the same `IterDataPipe` can lead to incoherent internal state (e.g. `shuffler` may not produce sensible outputs), such that we cannot save a snapshot of the DataPipe or have determinism mechanisms. Fixes part of pytorch/data#45. The expected behavior of a single iterator per DataPipe is described in detail within the linked issue. Please review the changes in `torch/utils/data/datapipes/_typing.py` to see if the implementation is sensible. The new test `TestIterDataPipeSingletonConstraint` in `test_datapipe.py` illustrates the behaviors that the constraint should impose. Please comment there if any behavior seems unexpected/unclear. Differential Revision: [D33344609](https://our.internmc.facebook.com/intern/diff/D33344609)
```python
def __iter__(self):
    return self

def __next__(self):
```
We also need a buggy case where `__iter__` returns a new object, but the class also has `__next__`.
I am adding a buggy case at the bottom. Let me know if it matches what you have in mind.
```python
it2 = iter(source_dp)
with self.assertRaisesRegex(RuntimeError, "This iterator has been invalidated"):
    next(it1)
with self.assertRaisesRegex(RuntimeError, "This iterator has been invalidated"):
```
Wait, why is it invalid?
Do we have at least one valid iterator after this?
In my original proposal, users are allowed to call `__iter__` as many times as they want. But after our offline discussion, we decided to apply the constraint on `__iter__` even when it may be returning `self`. This is because having multiple active iterators referencing the same underlying object can lead to confusion and bugs. Plus, the use case isn't really necessary when you can just reference `dp`. The code snippet below illustrates what the behavior would have been like if it were allowed:
```python
dp = CustomDP(range(10))
next(dp)        # returns 0
it1 = iter(dp)  # returns `self`
next(it1)       # returns 1
it2 = iter(dp)  # returns `self`
next(it2)       # returns 2
next(it1)       # returns 3
next(dp)        # returns 4
```
In the current implementation, calling `next(dp)` raises an error if more than one iterator has been created from that DataPipe. This is because the ID that tracks whether the iterator is valid (i.e. `dp._valid_iterator_id`) is tied to the DataPipe (and because `__iter__` returns `self`, the ID is tied to the iterator as well). I think it has to be this way; otherwise, when `next(dp)` is called, it has no idea which ID to check against.
```python
source_dp = _CustomIterDP_Self(range(10))
it1 = iter(source_dp)
self.assertEqual(0, next(it1))
self.assertEqual(1, next(source_dp))
it2 = iter(source_dp)
with self.assertRaisesRegex(RuntimeError, "This iterator has been invalidated"):
    next(it1)
with self.assertRaisesRegex(RuntimeError, "This iterator has been invalidated"):
    next(it2)
with self.assertRaisesRegex(RuntimeError, "This iterator has been invalidated"):
    next(source_dp)
# In this test case, there is no valid iterator at the end, because `iter(it1)` delegates to `iter(dp)` anyway.
```
The simplest workaround for users is to re-define `dp` via `dp = CustomDP()`, or to avoid creating more than one iterator from `dp` by keeping `dp` as the variable rather than calling `iter(dp)`.
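The re-creation workaround can be sketched like this, with a hypothetical `CustomDP` (a plain class standing in for a real DataPipe; in a real pipeline, re-creating the object gives you a fresh instance and therefore a fresh single-iterator allowance):

```python
# Hypothetical iterable-wrapping DataPipe used only for illustration.
class CustomDP:
    def __init__(self, iterable=range(10)):
        self.source = iterable

    def __iter__(self):
        return iter(self.source)

dp = CustomDP()
first = list(dp)
# Workaround: rebind the name to a brand-new object instead of calling
# `iter(dp)` a second time on the old one.
dp = CustomDP()
second = list(dp)
assert first == second == list(range(10))
```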
Here are the proposals you have:

- For a `DataPipe` returning `self` in `__iter__`: we disallow users from calling `iter` multiple times.
- For a `DataPipe` using a generator function as `__iter__`: we make sure a new iterator is created and the previous iterator is invalidated whenever `iter` is called.
As we are making the iterator of `DataPipe` a singleton (the same behavior), we are introducing a new difference in iterator behavior. I think this is not preferable. Since we have a `reset` function to help a `DataPipe` reset its iterator, why don't we rely on `reset` to clean up all buffers, etc., for a `DataPipe` returning `self` in `__iter__`? Then the `IteratorDecorator` and the `datapipe` itself should be able to track the `iterator_id` to invalidate the previous iterator.
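A rough sketch of this `reset`-based alternative (`_ResetPipe` and `_Decorator` are invented names standing in for the real DataPipe machinery and `IteratorDecorator`; only the idea is taken from the comment above):

```python
# Every `iter()` call resets buffers via `reset()` and bumps an id,
# so the previous iterator is invalidated while the new one starts fresh.
class _ResetPipe:
    def __init__(self, iterable):
        self.source = iterable
        self._valid_iterator_id = 0

    def reset(self):
        # Clean up buffers / internal state on re-iteration.
        self._inner = iter(self.source)

    def __iter__(self):
        self.reset()
        self._valid_iterator_id += 1
        return _Decorator(self, self._valid_iterator_id)

class _Decorator:  # stands in for IteratorDecorator
    def __init__(self, dp, iterator_id):
        self._dp, self._id = dp, iterator_id

    def __iter__(self):
        return self

    def __next__(self):
        if self._id != self._dp._valid_iterator_id:
            raise RuntimeError("This iterator has been invalidated")
        return next(self._dp._inner)

dp = _ResetPipe(range(4))
it1 = iter(dp)
assert next(it1) == 0
it2 = iter(dp)       # resets state and invalidates `it1`
assert next(it2) == 0  # it2 starts from the beginning
raised = False
try:
    next(it1)
except RuntimeError:
    raised = True
assert raised
```

Note that with this scheme, self-returning and generator-based DataPipes end up with the same observable singleton behavior.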
I can see how that can be confusing.

I think this proposal will be best, given a DataPipe `dp` with `__next__` and iterators `it1` and `it2`:

- Allow unlimited creation of iterators, but only the latest one is valid/active.
- Don't place a restriction on `next(dp)`, but check for invalidation when `next(it1)` is called.
  - We can't really place a restriction on `next(dp)`, because any call to `next(it2)` gets delegated to `next(dp)` within the `IteratorDecorator`.
```python
source_dp = _CustomIterDP_Self(range(10))
it1 = iter(source_dp)
self.assertEqual(0, next(it1))
self.assertEqual(1, next(source_dp))
# Only invalidates `it1`, and not `source_dp`, since methods of `it2` depend on `source_dp` remaining valid
it2 = iter(source_dp)
with self.assertRaisesRegex(RuntimeError, "This iterator has been invalidated"):
    next(it1)
self.assertEqual(0, next(it2))
self.assertEqual(1, next(source_dp))
```
Please examine this test for the full details:
Lines 2337 to 2390 in c77df3e
```python
def test_iterdatapipe_singleton_self_next(self):
    r"""
    Testing for the case where IterDataPipe's `__iter__` returns `self` and there is a `__next__` method.
    Note that the following DataPipe is singleton by default (because `__iter__` returns `self`).
    """
    class _CustomIterDP_Self(IterDataPipe):
        def __init__(self, iterable):
            self.source = iterable
            self.iterable = iter(iterable)

        def __iter__(self):
            self.reset()
            return self

        def __next__(self):
            return next(self.iterable)

        def reset(self):
            self.iterable = iter(self.source)

    # Functional Test: Check that every `__iter__` call returns the same object
    source_dp = _CustomIterDP_Self(range(10))
    res = list(source_dp)
    it = iter(source_dp)
    self.assertEqual(res, list(it))

    # Functional Test: Check if invalidation logic is correct
    source_dp = _CustomIterDP_Self(range(10))
    it1 = iter(source_dp)
    self.assertEqual(0, next(it1))
    self.assertEqual(1, next(source_dp))
    # Only invalidates `it1`, and not `source_dp`, since methods of `it2` depend on `source_dp` remaining valid
    it2 = iter(source_dp)
    with self.assertRaisesRegex(RuntimeError, "This iterator has been invalidated"):
        next(it1)
    self.assertEqual(0, next(it2))
    self.assertEqual(1, next(source_dp))

    # Functional Test: extend the test to a pipeline
    source_dp = _CustomIterDP_Self(dp.iter.IterableWrapper(range(10)).map(_fake_fn).filter(_fake_filter_fn))
    it1 = iter(source_dp)
    self.assertEqual(0, next(it1))
    self.assertEqual(1, next(source_dp))
    # Only invalidates `it1`, and not `source_dp`, since methods of `it2` depend on `source_dp` remaining valid
    it2 = iter(source_dp)
    with self.assertRaisesRegex(RuntimeError, "This iterator has been invalidated"):
        next(it1)
    self.assertEqual(0, next(it2))
    self.assertEqual(1, next(source_dp))

    # Functional Test: multiple simultaneous references to the same DataPipe fails
    with self.assertRaisesRegex(RuntimeError, "This iterator has been invalidated"):
        for _ in zip(source_dp, source_dp):
            pass
```
I suddenly realize there may potentially be another problem. When we invalidate the previous iterator, we don't destroy it. It means the iterator object is never released from memory except in the case where the …
The iterator object should get garbage collected if there are no longer any references to it, correct?

```python
dp = IterableWrapper(range(10))
it1 = iter(dp)
it2 = iter(dp)  # invalidates `it1`
# `it1` can now be gc'ed because there isn't any reference to it
```
If possible, we could use the …
@VitalyFedyunin @ejguan I have addressed the comments and added more test cases. Let's discuss these two related points if they remain unclear/problematic:
The behavior based on the test cases looks good to me. Could you please also update the docstring for `IterDataPipe` to indicate this singleton behavior?

And could you please import this PR to Phabricator? We don't need to land from internal, but it's good to validate that this won't break any internal logic.
I can definitely update the docstring. I tried importing yesterday and it didn't work, but I can try again.
@NivekT has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
@pytorchbot revert this, broke internal tests, escalated to Kevin Tse [ktse@fb.com]
Reverting PR 70479 failed due to a command error.
Raised by https://github.com/pytorch/pytorch/actions/runs/2353257470
Landing it internally and we will do a fix forward.

Thank you!
```python
msg = ("This iterator has been invalidated because another iterator has been created"
       f"from the same IterDataPipe: {_generate_iterdatapipe_msg(datapipe)}\n"
```
nit: There is a missing space between `created` and `from` in the error message. cc: @NivekT
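For context, the bug comes from Python's implicit concatenation of adjacent string literals: without a trailing space, the two pieces fuse together.

```python
# Adjacent string literals are concatenated with nothing in between.
msg = ("This iterator has been invalidated because another iterator has been created"
       "from the same IterDataPipe")
assert "createdfrom" in msg  # the missing-space bug

# Adding a trailing space to the first literal fixes the message.
fixed = ("This iterator has been invalidated because another iterator has been created "
         "from the same IterDataPipe")
assert "created from" in fixed
```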
Stack from ghstack:

- `_IterDataPipeMeta` #76776

This PR introduces the single iterator constraint for `IterDataPipe`, focusing on `IterDataPipe` that produces one single `IterDataPipe` as output. This constraint is necessary because having multiple iterators referencing the same `IterDataPipe` can lead to incoherent internal state (e.g. `shuffler` may not produce sensible outputs), such that we cannot save a snapshot of the DataPipe or have determinism mechanisms.

Fixes part of pytorch/data#45

The expected behavior of a single iterator per DataPipe is described in detail within the linked issue. Please review the changes in `torch/utils/data/datapipes/_typing.py` to see if the implementation is sensible. The new test `TestIterDataPipeSingletonConstraint` in `test_datapipe.py` illustrates the behaviors that the constraint should impose. Please comment there if any behavior seems unexpected/unclear.

BC-breaking Note:

1. For `IterDataPipe`, each `IterDataPipe` can only have one active iterator at a time. The most recently created iterator is considered to be the active one; all previous ones are invalid. Attempting to use an invalid DataPipe will result in an error. The code example below illustrates this behavior:

   1.11.0 or before:

   This PR:

2. Keeping multiple simultaneous references to the same `IterDataPipe` is not allowed.

   1.11.0 or before:

   This PR:

   To iterate over the same `IterDataPipe` more than once, use `.fork()`.

3. This change does not impact `MapDataPipe`, `Dataset`, or `IterableDataset`.

Differential Revision: D33344609
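For readers unfamiliar with `.fork()`, `itertools.tee` gives a rough picture of its semantics: several child iterators over one buffered source. This is only an analogy under that assumption, not the torchdata implementation:

```python
from itertools import tee

def fork(iterable, n=2):
    # `tee` returns n independent iterators over one underlying source,
    # buffering items as needed -- conceptually similar to `.fork()`,
    # which lets a pipeline consume the same IterDataPipe twice without
    # creating a second iterator on it.
    return tee(iterable, n)

dp1, dp2 = fork(range(5))
assert list(zip(dp1, dp2)) == [(i, i) for i in range(5)]
```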