|
15 | 15 | except ImportError:
|
16 | 16 | ssl = None
|
17 | 17 |
|
| 18 | +from contextlib import closing |
18 | 19 | from unittest import TestCase, SkipTest, skipUnless
|
19 | 20 | from test import test_support
|
20 | 21 | from test.test_support import HOST, HOSTv6
|
@@ -67,6 +68,10 @@ def __init__(self, conn):
|
67 | 68 | self.rest = None
|
68 | 69 | self.next_retr_data = RETR_DATA
|
69 | 70 | self.push('220 welcome')
|
| 71 | + # We use this as the string IPv4 address to direct the client |
| 72 | + # to in response to a PASV command. To test security behavior. |
| 73 | + # https://bugs.python.org/issue43285/. |
| 74 | + self.fake_pasv_server_ip = '252.253.254.255' |
70 | 75 |
|
71 | 76 | def collect_incoming_data(self, data):
|
72 | 77 | self.in_buffer.append(data)
|
@@ -109,13 +114,13 @@ def cmd_pasv(self, arg):
|
109 | 114 | sock.bind((self.socket.getsockname()[0], 0))
|
110 | 115 | sock.listen(5)
|
111 | 116 | sock.settimeout(10)
|
112 |
| - ip, port = sock.getsockname()[:2] |
113 |
| - ip = ip.replace('.', ',') |
114 |
| - p1, p2 = divmod(port, 256) |
| 117 | + port = sock.getsockname()[1] |
| 118 | + ip = self.fake_pasv_server_ip |
| 119 | + ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256 |
115 | 120 | self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
|
116 | 121 | conn, addr = sock.accept()
|
117 | 122 | self.dtp = self.dtp_handler(conn, baseclass=self)
|
118 |
| - |
| 123 | + |
119 | 124 | def cmd_eprt(self, arg):
|
120 | 125 | af, ip, port = arg.split(arg[0])[1:-1]
|
121 | 126 | port = int(port)
|
@@ -577,6 +582,107 @@ def test_makepasv(self):
|
577 | 582 | # IPv4 is in use, just make sure send_epsv has not been used
|
578 | 583 | self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')
|
579 | 584 |
|
| 585 | + def test_makepasv_issue43285_security_disabled(self): |
| 586 | + """Test the opt-in to the old vulnerable behavior.""" |
| 587 | + self.client.trust_server_pasv_ipv4_address = True |
| 588 | + bad_host, port = self.client.makepasv() |
| 589 | + self.assertEqual( |
| 590 | + bad_host, self.server.handler_instance.fake_pasv_server_ip) |
| 591 | + # Opening and closing a connection keeps the dummy server happy |
| 592 | + # instead of timing out on accept. |
| 593 | + socket.create_connection((self.client.sock.getpeername()[0], port), |
| 594 | + timeout=TIMEOUT).close() |
| 595 | + |
| 596 | + def test_makepasv_issue43285_security_enabled_default(self): |
| 597 | + self.assertFalse(self.client.trust_server_pasv_ipv4_address) |
| 598 | + trusted_host, port = self.client.makepasv() |
| 599 | + self.assertNotEqual( |
| 600 | + trusted_host, self.server.handler_instance.fake_pasv_server_ip) |
| 601 | + # Opening and closing a connection keeps the dummy server happy |
| 602 | + # instead of timing out on accept. |
| 603 | + socket.create_connection((trusted_host, port), timeout=TIMEOUT).close() |
| 604 | + |
| 605 | + def test_with_statement(self): |
| 606 | + self.client.quit() |
| 607 | + |
| 608 | + def is_client_connected(): |
| 609 | + if self.client.sock is None: |
| 610 | + return False |
| 611 | + try: |
| 612 | + self.client.sendcmd('noop') |
| 613 | + except (OSError, EOFError): |
| 614 | + return False |
| 615 | + return True |
| 616 | + |
| 617 | + # base test |
| 618 | + with ftplib.FTP(timeout=TIMEOUT) as self.client: |
| 619 | + self.client.connect(self.server.host, self.server.port) |
| 620 | + self.client.sendcmd('noop') |
| 621 | + self.assertTrue(is_client_connected()) |
| 622 | + self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') |
| 623 | + self.assertFalse(is_client_connected()) |
| 624 | + |
| 625 | + # QUIT sent inside the with block |
| 626 | + with ftplib.FTP(timeout=TIMEOUT) as self.client: |
| 627 | + self.client.connect(self.server.host, self.server.port) |
| 628 | + self.client.sendcmd('noop') |
| 629 | + self.client.quit() |
| 630 | + self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') |
| 631 | + self.assertFalse(is_client_connected()) |
| 632 | + |
| 633 | + # force a wrong response code to be sent on QUIT: error_perm |
| 634 | + # is expected and the connection is supposed to be closed |
| 635 | + try: |
| 636 | + with ftplib.FTP(timeout=TIMEOUT) as self.client: |
| 637 | + self.client.connect(self.server.host, self.server.port) |
| 638 | + self.client.sendcmd('noop') |
| 639 | + self.server.handler_instance.next_response = '550 error on quit' |
| 640 | + except ftplib.error_perm as err: |
| 641 | + self.assertEqual(str(err), '550 error on quit') |
| 642 | + else: |
| 643 | + self.fail('Exception not raised') |
| 644 | + # needed to give the threaded server some time to set the attribute |
| 645 | + # which otherwise would still be == 'noop' |
| 646 | + time.sleep(0.1) |
| 647 | + self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') |
| 648 | + self.assertFalse(is_client_connected()) |
| 649 | + |
| 650 | + def test_source_address(self): |
| 651 | + self.client.quit() |
| 652 | + port = test_support.find_unused_port() |
| 653 | + try: |
| 654 | + self.client.connect(self.server.host, self.server.port, |
| 655 | + source_address=(HOST, port)) |
| 656 | + self.assertEqual(self.client.sock.getsockname()[1], port) |
| 657 | + self.client.quit() |
| 658 | + except OSError as e: |
| 659 | + if e.errno == errno.EADDRINUSE: |
| 660 | + self.skipTest("couldn't bind to port %d" % port) |
| 661 | + raise |
| 662 | + |
| 663 | + def test_source_address_passive_connection(self): |
| 664 | + port = test_support.find_unused_port() |
| 665 | + self.client.source_address = (HOST, port) |
| 666 | + try: |
| 667 | + with closing(self.client.transfercmd('list')) as sock: |
| 668 | + self.assertEqual(sock.getsockname()[1], port) |
| 669 | + except OSError as e: |
| 670 | + if e.errno == errno.EADDRINUSE: |
| 671 | + self.skipTest("couldn't bind to port %d" % port) |
| 672 | + raise |
| 673 | + |
| 674 | + def test_parse257(self): |
| 675 | + self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar') |
| 676 | + self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar') |
| 677 | + self.assertEqual(ftplib.parse257('257 ""'), '') |
| 678 | + self.assertEqual(ftplib.parse257('257 "" created'), '') |
| 679 | + self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"') |
| 680 | + # The 257 response is supposed to include the directory |
| 681 | + # name and in case it contains embedded double-quotes |
| 682 | + # they must be doubled (see RFC-959, chapter 7, appendix 2). |
| 683 | + self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar') |
| 684 | + self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar') |
| 685 | + |
580 | 686 | def test_line_too_long(self):
|
581 | 687 | self.assertRaises(ftplib.Error, self.client.sendcmd,
|
582 | 688 | 'x' * self.client.maxline * 2)
|
|
0 commit comments