diff --git a/Lib/codecs.py b/Lib/codecs.py index e6ad6e3a05..82f23983e7 100644 --- a/Lib/codecs.py +++ b/Lib/codecs.py @@ -414,6 +414,9 @@ def __enter__(self): def __exit__(self, type, value, tb): self.stream.close() + def __reduce_ex__(self, proto): + raise TypeError("can't serialize %s" % self.__class__.__name__) + ### class StreamReader(Codec): @@ -663,6 +666,9 @@ def __enter__(self): def __exit__(self, type, value, tb): self.stream.close() + def __reduce_ex__(self, proto): + raise TypeError("can't serialize %s" % self.__class__.__name__) + ### class StreamReaderWriter: @@ -750,6 +756,9 @@ def __enter__(self): def __exit__(self, type, value, tb): self.stream.close() + def __reduce_ex__(self, proto): + raise TypeError("can't serialize %s" % self.__class__.__name__) + ### class StreamRecoder: @@ -866,6 +875,9 @@ def __enter__(self): def __exit__(self, type, value, tb): self.stream.close() + def __reduce_ex__(self, proto): + raise TypeError("can't serialize %s" % self.__class__.__name__) + ### Shortcuts def open(filename, mode='r', encoding=None, errors='strict', buffering=-1): @@ -878,7 +890,8 @@ def open(filename, mode='r', encoding=None, errors='strict', buffering=-1): codecs. Output is also codec dependent and will usually be Unicode as well. - Underlying encoded files are always opened in binary mode. + If encoding is not None, then the + underlying encoded files are always opened in binary mode. The default file mode is 'r', meaning to open the file in read mode. encoding specifies the encoding which is to be used for the @@ -1114,13 +1127,3 @@ def make_encoding_map(decoding_map): _false = 0 if _false: import encodings - -### Tests - -if __name__ == '__main__': - - # Make stdout translate Latin-1 output into UTF-8 output - sys.stdout = EncodedFile(sys.stdout, 'latin-1', 'utf-8') - - # Have stdin translate Latin-1 input into UTF-8 input - sys.stdin = EncodedFile(sys.stdin, 'utf-8', 'latin-1') diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 0b972a58a5..085b800b6d 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1,7 +1,9 @@ import codecs import contextlib +import copy import io import locale +import pickle import sys import unittest import encodings @@ -9,12 +11,15 @@ from test import support from test.support import os_helper -from test.support import warnings_helper try: import _testcapi except ImportError: _testcapi = None +try: + import _testinternalcapi +except ImportError: + _testinternalcapi = None try: import ctypes @@ -845,11 +850,12 @@ def test_decoder_state(self): "spamspam", self.spamle) self.check_state_handling_decode(self.encoding, "spamspam", self.spambe) - - # TODO: RUSTPYTHON + + # TODO: RUSTPYTHON - ValueError: invalid mode 'Ub' @unittest.expectedFailure def test_bug691291(self): - # Files are always opened in binary mode, even if no binary mode was + # If encoding is not None, then + # files are always opened in binary mode, even if no binary mode was # specified. This means that no automatic conversion of '\n' is done # on reading and writing. s1 = 'Hello\r\nworld\r\n' @@ -1378,8 +1384,11 @@ def test_escape(self): check(br"\9", b"\\9") with self.assertWarns(DeprecationWarning): check(b"\\\xfa", b"\\\xfa") - - # TODO: RUSTPYTHON + for i in range(0o400, 0o1000): + with self.assertWarns(DeprecationWarning): + check(rb'\%o' % i, bytes([i & 0o377])) + + # TODO: RUSTPYTHON - ValueError: not raised by escape_decode @unittest.expectedFailure def test_errors(self): decode = codecs.escape_decode @@ -1723,6 +1732,12 @@ def test_builtin_encode(self): self.assertEqual("pyth\xf6n.org".encode("idna"), b"xn--pythn-mua.org") self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.") + def test_builtin_decode_length_limit(self): + with self.assertRaisesRegex(UnicodeError, "way too long"): + (b"xn--016c"+b"a"*1100).decode("idna") + with self.assertRaisesRegex(UnicodeError, "too long"): + (b"xn--016c"+b"a"*70).decode("idna") + def test_stream(self): r = codecs.getreader("idna")(io.BytesIO(b"abc")) r.read(3) @@ -1933,6 +1948,7 @@ def test_file_closes_if_lookup_error_raised(self): file().close.assert_called() + class StreamReaderTest(unittest.TestCase): def setUp(self): @@ -1943,6 +1959,61 @@ def test_readlines(self): f = self.reader(self.stream) self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00']) + def test_copy(self): + f = self.reader(Queue(b'\xed\x95\x9c\n\xea\xb8\x80')) + with self.assertRaisesRegex(TypeError, 'StreamReader'): + copy.copy(f) + with self.assertRaisesRegex(TypeError, 'StreamReader'): + copy.deepcopy(f) + + def test_pickle(self): + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=proto): + f = self.reader(Queue(b'\xed\x95\x9c\n\xea\xb8\x80')) + with self.assertRaisesRegex(TypeError, 'StreamReader'): + pickle.dumps(f, proto) + + +class StreamWriterTest(unittest.TestCase): + + def setUp(self): + self.writer = codecs.getwriter('utf-8') + + def test_copy(self): + f = self.writer(Queue(b'')) + with self.assertRaisesRegex(TypeError, 'StreamWriter'): + copy.copy(f) + with self.assertRaisesRegex(TypeError, 'StreamWriter'): + copy.deepcopy(f) + + def test_pickle(self): + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=proto): + f = self.writer(Queue(b'')) + with self.assertRaisesRegex(TypeError, 'StreamWriter'): + pickle.dumps(f, proto) + + +class StreamReaderWriterTest(unittest.TestCase): + + def setUp(self): + self.reader = codecs.getreader('latin1') + self.writer = codecs.getwriter('utf-8') + + def test_copy(self): + f = codecs.StreamReaderWriter(Queue(b''), self.reader, self.writer) + with self.assertRaisesRegex(TypeError, 'StreamReaderWriter'): + copy.copy(f) + with self.assertRaisesRegex(TypeError, 'StreamReaderWriter'): + copy.deepcopy(f) + + def test_pickle(self): + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=proto): + f = codecs.StreamReaderWriter(Queue(b''), self.reader, self.writer) + with self.assertRaisesRegex(TypeError, 'StreamReaderWriter'): + pickle.dumps(f, proto) + class EncodedFileTest(unittest.TestCase): @@ -2086,7 +2157,10 @@ def test_basics(self): name += "_codec" elif encoding == "latin_1": name = "latin_1" - self.assertEqual(encoding.replace("_", "-"), name.replace("_", "-")) + # Skip the mbcs alias on Windows + if name != "mbcs": + self.assertEqual(encoding.replace("_", "-"), + name.replace("_", "-")) (b, size) = codecs.getencoder(encoding)(s) self.assertEqual(size, len(s), "encoding=%r" % encoding) @@ -2156,6 +2230,7 @@ def test_basics(self): "encoding=%r" % encoding) @support.cpython_only + @unittest.skipIf(_testcapi is None, 'need _testcapi module') def test_basics_capi(self): s = "abc123" # all codecs should be able to encode these for encoding in all_unicode_encodings: @@ -2601,6 +2676,8 @@ def test_escape_encode(self): check('\u20ac', br'\u20ac') check('\U0001d120', br'\U0001d120') + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_escape_decode(self): decode = codecs.unicode_escape_decode check = coding_checker(self, decode) @@ -2639,6 +2716,9 @@ def test_escape_decode(self): check(br"\9", "\\9") with self.assertWarns(DeprecationWarning): check(b"\\\xfa", "\\\xfa") + for i in range(0o400, 0o1000): + with self.assertWarns(DeprecationWarning): + check(rb'\%o' % i, chr(i)) def test_decode_errors(self): decode = codecs.unicode_escape_decode @@ -3037,29 +3117,26 @@ def test_binary_to_text_denylists_text_transforms(self): bad_input.decode("rot_13") self.assertIsNone(failure.exception.__cause__) - # TODO: RUSTPYTHON + + # @unittest.skipUnless(zlib, "Requires zlib support") + # TODO: RUSTPYTHON, ^ restore once test passes @unittest.expectedFailure - @unittest.skipUnless(zlib, "Requires zlib support") - def test_custom_zlib_error_is_wrapped(self): + def test_custom_zlib_error_is_noted(self): # Check zlib codec gives a good error for malformed input - msg = "^decoding with 'zlib_codec' codec failed" - with self.assertRaisesRegex(Exception, msg) as failure: + msg = "decoding with 'zlib_codec' codec failed" + with self.assertRaises(zlib.error) as failure: codecs.decode(b"hello", "zlib_codec") - self.assertIsInstance(failure.exception.__cause__, - type(failure.exception)) + self.assertEqual(msg, failure.exception.__notes__[0]) - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_custom_hex_error_is_wrapped(self): + # TODO: RUSTPYTHON - AttributeError: 'Error' object has no attribute '__notes__' + @unittest.expectedFailure + def test_custom_hex_error_is_noted(self): # Check hex codec gives a good error for malformed input - msg = "^decoding with 'hex_codec' codec failed" - with self.assertRaisesRegex(Exception, msg) as failure: + import binascii + msg = "decoding with 'hex_codec' codec failed" + with self.assertRaises(binascii.Error) as failure: codecs.decode(b"hello", "hex_codec") - self.assertIsInstance(failure.exception.__cause__, - type(failure.exception)) - - # Unfortunately, the bz2 module throws OSError, which the codec - # machinery currently can't wrap :( + self.assertEqual(msg, failure.exception.__notes__[0]) # Ensure codec aliases from http://bugs.python.org/issue7475 work def test_aliases(self): @@ -3083,11 +3160,8 @@ def test_uu_invalid(self): self.assertRaises(ValueError, codecs.decode, b"", "uu-codec") -# The codec system tries to wrap exceptions in order to ensure the error -# mentions the operation being performed and the codec involved. We -# currently *only* want this to happen for relatively stateless -# exceptions, where the only significant information they contain is their -# type and a single str argument. +# The codec system tries to add notes to exceptions in order to ensure +# the error mentions the operation being performed and the codec involved. # Use a local codec registry to avoid appearing to leak objects when # registering multiple search functions @@ -3097,10 +3171,10 @@ def _get_test_codec(codec_name): return _TEST_CODECS.get(codec_name) -class ExceptionChainingTest(unittest.TestCase): +class ExceptionNotesTest(unittest.TestCase): def setUp(self): - self.codec_name = 'exception_chaining_test' + self.codec_name = 'exception_notes_test' codecs.register(_get_test_codec) self.addCleanup(codecs.unregister, _get_test_codec) @@ -3124,105 +3198,98 @@ def set_codec(self, encode, decode): _TEST_CODECS[self.codec_name] = codec_info @contextlib.contextmanager - def assertWrapped(self, operation, exc_type, msg): - full_msg = r"{} with {!r} codec failed \({}: {}\)".format( - operation, self.codec_name, exc_type.__name__, msg) - with self.assertRaisesRegex(exc_type, full_msg) as caught: + def assertNoted(self, operation, exc_type, msg): + full_msg = r"{} with {!r} codec failed".format( + operation, self.codec_name) + with self.assertRaises(exc_type) as caught: yield caught - self.assertIsInstance(caught.exception.__cause__, exc_type) - self.assertIsNotNone(caught.exception.__cause__.__traceback__) + self.assertIn(full_msg, caught.exception.__notes__[0]) + caught.exception.__notes__.clear() def raise_obj(self, *args, **kwds): # Helper to dynamically change the object raised by a test codec raise self.obj_to_raise - - def check_wrapped(self, obj_to_raise, msg, exc_type=RuntimeError): + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def check_note(self, obj_to_raise, msg, exc_type=RuntimeError): self.obj_to_raise = obj_to_raise self.set_codec(self.raise_obj, self.raise_obj) - with self.assertWrapped("encoding", exc_type, msg): + with self.assertNoted("encoding", exc_type, msg): "str_input".encode(self.codec_name) - with self.assertWrapped("encoding", exc_type, msg): + with self.assertNoted("encoding", exc_type, msg): codecs.encode("str_input", self.codec_name) - with self.assertWrapped("decoding", exc_type, msg): + with self.assertNoted("decoding", exc_type, msg): b"bytes input".decode(self.codec_name) - with self.assertWrapped("decoding", exc_type, msg): + with self.assertNoted("decoding", exc_type, msg): codecs.decode(b"bytes input", self.codec_name) - + # TODO: RUSTPYTHON @unittest.expectedFailure def test_raise_by_type(self): - self.check_wrapped(RuntimeError, "") - + self.check_note(RuntimeError, "") + # TODO: RUSTPYTHON @unittest.expectedFailure def test_raise_by_value(self): - msg = "This should be wrapped" - self.check_wrapped(RuntimeError(msg), msg) - + msg = "This should be noted" + self.check_note(RuntimeError(msg), msg) + # TODO: RUSTPYTHON @unittest.expectedFailure def test_raise_grandchild_subclass_exact_size(self): - msg = "This should be wrapped" + msg = "This should be noted" class MyRuntimeError(RuntimeError): __slots__ = () - self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError) + self.check_note(MyRuntimeError(msg), msg, MyRuntimeError) # TODO: RUSTPYTHON @unittest.expectedFailure def test_raise_subclass_with_weakref_support(self): - msg = "This should be wrapped" + msg = "This should be noted" class MyRuntimeError(RuntimeError): pass - self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError) + self.check_note(MyRuntimeError(msg), msg, MyRuntimeError) - def check_not_wrapped(self, obj_to_raise, msg): - def raise_obj(*args, **kwds): - raise obj_to_raise - self.set_codec(raise_obj, raise_obj) - with self.assertRaisesRegex(RuntimeError, msg): - "str input".encode(self.codec_name) - with self.assertRaisesRegex(RuntimeError, msg): - codecs.encode("str input", self.codec_name) - with self.assertRaisesRegex(RuntimeError, msg): - b"bytes input".decode(self.codec_name) - with self.assertRaisesRegex(RuntimeError, msg): - codecs.decode(b"bytes input", self.codec_name) - - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") - def test_init_override_is_not_wrapped(self): + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_init_override(self): class CustomInit(RuntimeError): def __init__(self): pass - self.check_not_wrapped(CustomInit, "") - - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") - def test_new_override_is_not_wrapped(self): + self.check_note(CustomInit, "") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_new_override(self): class CustomNew(RuntimeError): def __new__(cls): return super().__new__(cls) - self.check_not_wrapped(CustomNew, "") + self.check_note(CustomNew, "") - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") - def test_instance_attribute_is_not_wrapped(self): - msg = "This should NOT be wrapped" + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_instance_attribute(self): + msg = "This should be noted" exc = RuntimeError(msg) exc.attr = 1 - self.check_not_wrapped(exc, "^{}$".format(msg)) + self.check_note(exc, "^{}$".format(msg)) - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") - def test_non_str_arg_is_not_wrapped(self): - self.check_not_wrapped(RuntimeError(1), "1") - - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") - def test_multiple_args_is_not_wrapped(self): + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_non_str_arg(self): + self.check_note(RuntimeError(1), "1") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_multiple_args(self): msg_re = r"^\('a', 'b', 'c'\)$" - self.check_not_wrapped(RuntimeError('a', 'b', 'c'), msg_re) + self.check_note(RuntimeError('a', 'b', 'c'), msg_re) - @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") # http://bugs.python.org/issue19609 - def test_codec_lookup_failure_not_wrapped(self): + @unittest.expectedFailureIfWindows("TODO: RUSTPYTHON") + def test_codec_lookup_failure(self): msg = "^unknown encoding: {}$".format(self.codec_name) - # The initial codec lookup should not be wrapped with self.assertRaisesRegex(LookupError, msg): "str input".encode(self.codec_name) with self.assertRaisesRegex(LookupError, msg): @@ -3232,7 +3299,7 @@ def test_codec_lookup_failure_not_wrapped(self): with self.assertRaisesRegex(LookupError, msg): codecs.decode(b"bytes input", self.codec_name) - # TODO: RUSTPYTHON + @unittest.expectedFailure def test_unflagged_non_text_codec_handling(self): # The stdlib non-text codecs are now marked so they're @@ -3461,9 +3528,14 @@ def test_incremental(self): def test_mbcs_alias(self): # Check that looking up our 'default' codepage will return # mbcs when we don't have a more specific one available - with mock.patch('_winapi.GetACP', return_value=123): - codec = codecs.lookup('cp123') - self.assertEqual(codec.name, 'mbcs') + code_page = 99_999 + name = f'cp{code_page}' + with mock.patch('_winapi.GetACP', return_value=code_page): + try: + codec = codecs.lookup(name) + self.assertEqual(codec.name, 'mbcs') + finally: + codecs.unregister(name) @support.bigmemtest(size=2**31, memuse=7, dry_run=False) def test_large_input(self, size): @@ -3624,9 +3696,31 @@ def test_seeking_write(self): self.assertEqual(sr.readline(), b'1\n') self.assertEqual(sr.readline(), b'abc\n') self.assertEqual(sr.readline(), b'789\n') + + def test_copy(self): + bio = io.BytesIO() + codec = codecs.lookup('ascii') + sr = codecs.StreamRecoder(bio, codec.encode, codec.decode, + encodings.ascii.StreamReader, encodings.ascii.StreamWriter) + + with self.assertRaisesRegex(TypeError, 'StreamRecoder'): + copy.copy(sr) + with self.assertRaisesRegex(TypeError, 'StreamRecoder'): + copy.deepcopy(sr) + + def test_pickle(self): + q = Queue(b'') + codec = codecs.lookup('ascii') + sr = codecs.StreamRecoder(q, codec.encode, codec.decode, + encodings.ascii.StreamReader, encodings.ascii.StreamWriter) + + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=proto): + with self.assertRaisesRegex(TypeError, 'StreamRecoder'): + pickle.dumps(sr, proto) -@unittest.skipIf(_testcapi is None, 'need _testcapi module') +@unittest.skipIf(_testinternalcapi is None, 'need _testinternalcapi module') class LocaleCodecTest(unittest.TestCase): """ Test indirectly _Py_DecodeUTF8Ex() and _Py_EncodeUTF8Ex(). @@ -3640,7 +3734,7 @@ class LocaleCodecTest(unittest.TestCase): SURROGATES = "\uDC80\uDCFF" def encode(self, text, errors="strict"): - return _testcapi.EncodeLocaleEx(text, 0, errors) + return _testinternalcapi.EncodeLocaleEx(text, 0, errors) def check_encode_strings(self, errors): for text in self.STRINGS: @@ -3680,7 +3774,7 @@ def test_encode_unsupported_error_handler(self): self.assertEqual(str(cm.exception), 'unsupported error handler') def decode(self, encoded, errors="strict"): - return _testcapi.DecodeLocaleEx(encoded, 0, errors) + return _testinternalcapi.DecodeLocaleEx(encoded, 0, errors) def check_decode_strings(self, errors): is_utf8 = (self.ENCODING == "utf-8") @@ -3767,9 +3861,10 @@ class Rot13UtilTest(unittest.TestCase): $ echo "Hello World" | python -m encodings.rot_13 """ def test_rot13_func(self): + from encodings.rot_13 import rot13 infile = io.StringIO('Gb or, be abg gb or, gung vf gur dhrfgvba') outfile = io.StringIO() - encodings.rot_13.rot13(infile, outfile) + rot13(infile, outfile) outfile.seek(0) plain_text = outfile.read() self.assertEqual(