8000 Update zipfile from CPython 3.12.2 · RustPython/RustPython@11f0735 · GitHub
[go: up one dir, main page]

Skip to content
Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 11f0735

Browse files
CPython Developersyouknowone
CPython Developers
authored andcommitted
Update zipfile from CPython 3.12.2
1 parent 8d58668 commit 11f0735

18 files changed

+2327
-784
lines changed

Lib/test/archiver_tests.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
"""Tests common to tarfile and zipfile."""
2+
3+
import os
4+
import sys
5+
6+
from test.support import os_helper
7+
8+
class OverwriteTests:
9+
10+
def setUp(self):
11+
os.makedirs(self.testdir)
12+
self.addCleanup(os_helper.rmtree, self.testdir)
13+
14+
def create_file(self, path, content=b''):
15+
with open(path, 'wb') as f:
16+
f.write(content)
17+
18+
def open(self, path):
19+
raise NotImplementedError
20+
21+
def extractall(self, ar):
22+
raise NotImplementedError
23+
24+
25+
def test_overwrite_file_as_file(self):
26+
target = os.path.join(self.testdir, 'test')
27+
self.create_file(target, b'content')
28+
with self.open(self.ar_with_file) as ar:
29+
self.extractall(ar)
30+
self.assertTrue(os.path.isfile(target))
31+
with open(target, 'rb') as f:
32+
self.assertEqual(f.read() A3E2 , b'newcontent')
33+
34+
def test_overwrite_dir_as_dir(self):
35+
target = os.path.join(self.testdir, 'test')
36+
os.mkdir(target)
37+
with self.open(self.ar_with_dir) as ar:
38+
self.extractall(ar)
39+
self.assertTrue(os.path.isdir(target))
40+
41+
def test_overwrite_dir_as_implicit_dir(self):
42+
target = os.path.join(self.testdir, 'test')
43+
os.mkdir(target)
44+
with self.open(self.ar_with_implicit_dir) as ar:
45+
self.extractall(ar)
46+
self.assertTrue(os.path.isdir(target))
47+
self.assertTrue(os.path.isfile(os.path.join(target, 'file')))
48+
with open(os.path.join(target, 'file'), 'rb') as f:
49+
self.assertEqual(f.read(), b'newcontent')
50+
51+
def test_overwrite_dir_as_file(self):
52+
target = os.path.join(self.testdir, 'test')
53+
os.mkdir(target)
54+
with self.open(self.ar_with_file) as ar:
55+
with self.assertRaises(PermissionError if sys.platform == 'win32'
56+
else IsADirectoryError):
57+
self.extractall(ar)
58+
self.assertTrue(os.path.isdir(target))
59+
60+
def test_overwrite_file_as_dir(self):
61+
target = os.path.join(self.testdir, 'test')
62+
self.create_file(target, b'content')
63+
with self.open(self.ar_with_dir) as ar:
64+
with self.assertRaises(FileExistsError):
65+
self.extractall(ar)
66+
self.assertTrue(os.path.isfile(target))
67+
with open(target, 'rb') as f:
68+
self.assertEqual(f.read(), b'content')
69+
70+
def test_overwrite_file_as_implicit_dir(self):
71+
target = os.path.join(self.testdir, 'test')
72+
self.create_file(target, b'content')
73+
with self.open(self.ar_with_implicit_dir) as ar:
74+
with self.assertRaises(FileNotFoundError if sys.platform == 'win32'
75+
else NotADirectoryError):
76+
self.extractall(ar)
77+
self.assertTrue(os.path.isfile(target))
78+
with open(target, 'rb') as f:
79+
self.assertEqual(f.read(), b'content')
80+
81+
@os_helper.skip_unless_symlink
82+
def test_overwrite_file_symlink_as_file(self):
83+
# XXX: It is potential security vulnerability.
84+
target = os.path.join(self.testdir, 'test')
85+
target2 = os.path.join(self.testdir, 'test2')
86+
self.create_file(target2, b'content')
87+
os.symlink('test2', target)
88+
with self.open(self.ar_with_file) as ar:
89+
self.extractall(ar)
90+
self.assertTrue(os.path.islink(target))
91+
self.assertTrue(os.path.isfile(target2))
92+
with open(target2, 'rb') as f:
93+
self.assertEqual(f.read(), b'newcontent')
94+
95+
@os_helper.skip_unless_symlink
96+
def test_overwrite_broken_file_symlink_as_file(self):
97+
# XXX: It is potential security vulnerability.
98+
target = os.path.join(self.testdir, 'test')
99+
target2 = os.path.join(self.testdir, 'test2')
100+
os.symlink('test2', target)
101+
with self.open(self.ar_with_file) as ar:
102+
self.extractall(ar)
103+
self.assertTrue(os.path.islink(target))
104+
self.assertTrue(os.path.isfile(target2))
105+
with open(target2, 'rb') as f:
106+
self.assertEqual(f.read(), b'newcontent')
107+
108+
@os_helper.skip_unless_symlink
109+
def test_overwrite_dir_symlink_as_dir(self):
110+
# XXX: It is potential security vulnerability.
111+
target = os.path.join(self.testdir, 'test')
112+
target2 = os.path.join(self.testdir, 'test2')
113+
os.mkdir(target2)
114+
os.symlink('test2', target, target_is_directory=True)
115+
with self.open(self.ar_with_dir) as ar:
116+
self.extractall(ar)
117+
self.assertTrue(os.path.islink(target))
118+
self.assertTrue(os.path.isdir(target2))
119+
120+
@os_helper.skip_unless_symlink
121+
def test_overwrite_dir_symlink_as_implicit_dir(self):
122+
# XXX: It is potential security vulnerability.
123+
target = os.path.join(self.testdir, 'test')
124+
target2 = os.path.join(self.testdir, 'test2')
125+
os.mkdir(target2)
126+
os.symlink('test2', target, target_is_directory=True)
127+
with self.open(self.ar_with_implicit_dir) as ar:
128+
self.extractall(ar)
129+
self.assertTrue(os.path.islink(target))
130+
self.assertTrue(os.path.isdir(target2))
131+
self.assertTrue(os.path.isfile(os.path.join(target2, 'file')))
132+
with open(os.path.join(target2, 'file'), 'rb') as f:
133+
self.assertEqual(f.read(), b'newcontent')
134+
135+
@os_helper.skip_unless_symlink
136+
def test_overwrite_broken_dir_symlink_as_dir(self):
137+
target = os.path.join(self.testdir, 'test')
138+
target2 = os.path.join(self.testdir, 'test2')
139+
os.symlink('test2', target, target_is_directory=True)
140+
with self.open(self.ar_with_dir) as ar:
141+
with self.assertRaises(FileExistsError):
142+
self.extractall(ar)
143+
self.assertTrue(os.path.islink(target))
144+
self.assertFalse(os.path.exists(target2))
145+
146+
@os_helper.skip_unless_symlink
147+
def test_overwrite_broken_dir_symlink_as_implicit_dir(self):
148+
target = os.path.join(self.testdir, 'test')
149+
target2 = os.path.join(self.testdir, 'test2')
150+
os.symlink('test2', target, target_is_directory=True)
151+
with self.open(self.ar_with_implicit_dir) as ar:
152+
with self.assertRaises(FileExistsError):
153+
self.extractall(ar)
154+
self.assertTrue(os.path.islink(target))
155+
self.assertFalse(os.path.exists(target2))

Lib/test/test_zipfile/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import os
2+
from test.support import load_package_tests
3+
4+
def load_tests(*args):
5+
return load_package_tests(os.path.dirname(__file__), *args)

Lib/test/test_zipfile/__main__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import unittest
2+
3+
from . import load_tests # noqa: F401
4+
5+
6+
if __name__ == "__main__":
7+
unittest.main()

Lib/test/test_zipfile/_path/__init__.py

Whitespace-only changes.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import functools
2+
3+
4+
# from jaraco.functools 3.5.2
5+
def compose(*funcs):
6+
def compose_two(f1, f2):
7+
return lambda *args, **kwargs: f1(f2(*args, **kwargs))
8+
9+
return functools.reduce(compose_two, funcs)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import itertools
2+
from collections import deque
3+
from itertools import islice
4+
5+
6+
# from jaraco.itertools 6.3.0
7+
class Counter:
8+
"""
9+
Wrap an iterable in an object that stores the count of items
10+
that pass through it.
11+
12+
>>> items = Counter(range(20))
13+
>>> items.count
14+
0
15+
>>> values = list(items)
16+
>>> items.count
17+
20
18+
"""
19+
20+
def __init__(self, i):
21+
self.count = 0
22+
self.iter = zip(itertools.count(1), i)
23+
24+
def __iter__(self):
25+
return self
26+
27+
def __next__(self):
28+
self.count, result = next(self.iter)
29+
return result
30+
31+
32+
# from more_itertools v8.13.0
33+
def always_iterable(obj, base_type=(str, bytes)):
34+
if obj is None:
35+
return iter(())
36+
37+
if (base_type is not None) and isinstance(obj, base_type):
38+
return iter((obj,))
39+
40+
try:
41+
return iter(obj)
42+
except TypeError:
43+
return iter((obj,))
44+
45+
46+
# from more_itertools v9.0.0
47+
def consume(iterator, n=None):
48+
"""Advance *iterable* by *n* steps. If *n* is ``None``, consume it
49+
entirely.
50+
Efficiently exhausts an iterator without returning values. Defaults to
51+
consuming the whole iterator, but an optional second argument may be
52+
provided to limit consumption.
53+
>>> i = (x for x in range(10))
54+
>>> next(i)
55+
0
56+
>>> consume(i, 3)
57+
>>> next(i)
58+
4
59+
>>> consume(i)
60+
>>> next(i)
61+
Traceback (most recent call last):
62+
File "<stdin>", line 1, in <module>
63+
StopIteration
64+
If the iterator has fewer items remaining than the provided limit, the
65+
whole iterator will be consumed.
66+
>>> i = (x for x in range(3))
67+
>>> consume(i, 5)
68+
>>> next(i)
69+
Traceback (most recent call last):
70+
File "<stdin>", line 1, in <module>
71+
StopIteration
72+
"""
73+
# Use functions that consume iterators at C speed.
74+
if n is None:
75+
# feed the entire iterator into a zero-length deque
76+
deque(iterator, maxlen=0)
77+
else:
78+
# advance to the empty slice starting at position n
79+
next(islice(iterator, n, n), None)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import importlib
2+
import unittest
3+
4+
5+
def import_or_skip(name):
6+
try:
7+
return importlib.import_module(name)
8+
except ImportError: # pragma: no cover
9+
raise unittest.SkipTest(f'Unable to import {name}')
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import types
2+
import functools
3+
4+
from ._itertools import always_iterable
5+
6+
7+
def parameterize(names, value_groups):
8+
"""
9+
Decorate a test method to run it as a set of subtests.
10+
11+
Modeled after pytest.parametrize.
12+
"""
13+
14+
def decorator(func):
15+
@functools.wraps(func)
16+
def wrapped(self):
17+
for values in value_groups:
18+
resolved = map(Invoked.eval, always_iterable(values))
19+
params = dict(zip(always_iterable(names), resolved))
20+
with self.subTest(**params):
21+
func(self, **params)
22+
23+
return wrapped
24+
25+
return decorator
26+
27+
28+
class Invoked(types.SimpleNamespace):
29+
"""
30+
Wrap a function to be invoked for each usage.
31+
"""
32+
33+
@classmethod
34+
def wrap(cls, func):
35+
return cls(func=func)
36+
37+
@classmethod
38+
def eval(cls, cand):
39+
return cand.func() if isinstance(cand, cls) else cand

0 commit comments

Comments
 (0)
0