|
23 | 23 |
|
24 | 24 | import unittest
|
25 | 25 | import os
|
| 26 | +import stat |
26 | 27 | from binascii import hexlify
|
27 | 28 | from hashlib import md5
|
28 | 29 |
|
|
36 | 37 | SSHException,
|
37 | 38 | )
|
38 | 39 | from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2
|
| 40 | +from paramiko.common import o600 |
39 | 41 |
|
40 | 42 | from cryptography.exceptions import UnsupportedAlgorithm
|
41 | 43 | from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers
|
42 |
| -from mock import patch |
| 44 | +from mock import patch, Mock |
43 | 45 | import pytest
|
44 | 46 |
|
45 | 47 | from .util import _support, is_low_entropy
|
@@ -696,3 +698,57 @@ def test_certificates(self):
|
696 | 698 | key1.load_certificate,
|
697 | 699 | _support("test_rsa.key-cert.pub"),
|
698 | 700 | )
|
| 701 | + |
| 702 | + @patch("paramiko.pkey.os") |
| 703 | + def _test_keyfile_race(self, os_, exists): |
| 704 | + # Re: CVE-2022-24302 |
| 705 | + password = "television" |
| 706 | + newpassword = "radio" |
| 707 | + source = _support("test_ecdsa_384.key") |
| 708 | + new = source + ".new" |
| 709 | + # Mock setup |
| 710 | + os_.path.exists.return_value = exists |
| 711 | + # Attach os flag values to mock |
| 712 | + for attr, value in vars(os).items(): |
| 713 | + if attr.startswith("O_"): |
| 714 | + setattr(os_, attr, value) |
| 715 | + # Load fixture key |
| 716 | + key = ECDSAKey(filename=source, password=password) |
| 717 | + key._write_private_key = Mock() |
| 718 | + # Write out in new location |
| 719 | + key.write_private_key_file(new, password=newpassword) |
| 720 | + # Expected open via os module |
| 721 | + os_.open.assert_called_once_with(new, flags=os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=o600) |
| 722 | + os_.fdopen.assert_called_once_with(os_.open.return_value, mode="w") |
| 723 | + # Old chmod still around for backwards compat |
| 724 | + os_.chmod.assert_called_once_with(new, o600) |
| 725 | + assert ( |
| 726 | + key._write_private_key.call_args[0][0] |
| 727 | + == os_.fdopen.return_value.__enter__.return_value |
| 728 | + ) |
| 729 | + |
| 730 | + def test_new_keyfiles_avoid_file_descriptor_race_on_chmod(self): |
| 731 | + self._test_keyfile_race(exists=False) |
| 732 | + |
| 733 | + def test_existing_keyfiles_still_work_ok(self): |
| 734 | + self._test_keyfile_race(exists=True) |
| 735 | + |
| 736 | + def test_new_keyfiles_avoid_descriptor_race_integration(self): |
| 737 | + # Integration-style version of above |
| 738 | + password = "television" |
| 739 | + newpassword = "radio" |
| 740 | + source = _support("test_ecdsa_384.key") |
| 741 | + new = source + ".new" |
| 742 | + # Load fixture key |
| 743 | + key = ECDSAKey(filename=source, password=password) |
| 744 | + try: |
| 745 | + # Write out in new location |
| 746 | + key.write_private_key_file(new, password=newpassword) |
| 747 | + # Test mode |
| 748 | + assert stat.S_IMODE(os.stat(new).st_mode) == o600 |
| 749 | + # Prove can open with new password |
| 750 | + reloaded = ECDSAKey(filename=new, password=newpassword) |
| 751 | + assert reloaded == key |
| 752 | + finally: |
| 753 | + if os.path.exists(new): |
| 754 | + os.unlink(new) |
0 commit comments