|
1 | 1 | import unittest
|
| 2 | +from unittest import mock |
2 | 3 | from test import support
|
3 | 4 | from test.support import (
|
4 | 5 | is_apple, os_helper, refleak_helper, socket_helper, threading_helper
|
@@ -2670,6 +2671,107 @@ def testBindBrEdrL2capSocket(self):
|
2670 | 2671 | addr = f.getsockname()
|
2671 | 2672 | self.assertEqual(addr, (socket.BDADDR_ANY, psm))
|
2672 | 2673 |
|
| 2674 | + @unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets required for this test') |
| 2675 | + def testBadL2capAddr(self): |
| 2676 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as f: |
| 2677 | + with self.assertRaises(OSError): |
| 2678 | + f.bind((socket.BDADDR_ANY, 0, 0, socket.BDADDR_BREDR, 0)) |
| 2679 | + with self.assertRaises(OSError): |
| 2680 | + f.bind((socket.BDADDR_ANY,)) |
| 2681 | + with self.assertRaises(OSError): |
| 2682 | + f.bind(socket.BDADDR_ANY) |
| 2683 | + with self.assertRaises(OSError): |
| 2684 | + f.bind((socket.BDADDR_ANY.encode(), 0x1001)) |
| 2685 | + |
| 2686 | + def testBindRfcommSocket(self): |
| 2687 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: |
| 2688 | + channel = 0 |
| 2689 | + try: |
| 2690 | + s.bind((socket.BDADDR_ANY, channel)) |
| 2691 | + except OSError as err: |
| 2692 | + if sys.platform == 'win32' and err.winerror == 10050: |
| 2693 | + self.skipTest(str(err)) |
| 2694 | + raise |
| 2695 | + addr = s.getsockname() |
| 2696 | + self.assertEqual(addr, (mock.ANY, channel)) |
| 2697 | + self.assertRegex(addr[0], r'(?i)[0-9a-f]{2}(?::[0-9a-f]{2}){4}') |
| 2698 | + if sys.platform != 'win32': |
| 2699 | + self.assertEqual(addr, (socket.BDADDR_ANY, channel)) |
| 2700 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: |
| 2701 | + s.bind(addr) |
| 2702 | + addr2 = s.getsockname() |
| 2703 | + self.assertEqual(addr2, addr) |
| 2704 | + |
| 2705 | + def testBadRfcommAddr(self): |
| 2706 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: |
| 2707 | + channel = 0 |
| 2708 | + with self.assertRaises(OSError): |
| 2709 | + s.bind((socket.BDADDR_ANY.encode(), channel)) |
| 2710 | + with self.assertRaises(OSError): |
| 2711 | + s.bind((socket.BDADDR_ANY,)) |
| 2712 | + with self.assertRaises(OSError): |
| 2713 | + s.bind((socket.BDADDR_ANY, channel, 0)) |
| 2714 | + with self.assertRaises(OSError): |
| 2715 | + s.bind((socket.BDADDR_ANY + '\0', channel)) |
| 2716 | + with self.assertRaises(OSError): |
| 2717 | + s.bind(('invalid', channel)) |
| 2718 | + |
| 2719 | + @unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test') |
| 2720 | + def testBindHciSocket(self): |
| 2721 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s: |
| 2722 | + if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')): |
| 2723 | + s.bind(socket.BDADDR_ANY.encode()) |
| 2724 | + addr = s.getsockname() |
| 2725 | + self.assertEqual(addr, socket.BDADDR_ANY) |
| 2726 | + else: |
| 2727 | + dev = 0 |
| 2728 | + s.bind((dev,)) |
| 2729 | + addr = s.getsockname() |
| 2730 | + self.assertEqual(addr, dev) |
| 2731 | + |
| 2732 | + @unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test') |
| 2733 | + def testBadHciAddr(self): |
| 2734 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s: |
| 2735 | + if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')): |
| 2736 | + with self.assertRaises(OSError): |
| 2737 | + s.bind(socket.BDADDR_ANY) |
| 2738 | + with self.assertRaises(OSError): |
| 2739 | + s.bind((socket.BDADDR_ANY.encode(),)) |
| 2740 | + if sys.platform.startswith('freebsd'): |
| 2741 | + with self.assertRaises(ValueError): |
| 2742 | + s.bind(socket.BDADDR_ANY.encode() + b'\0') |
| 2743 | + with self.assertRaises(ValueError): |
| 2744 | + s.bind(socket.BDADDR_ANY.encode() + b' '*100) |
| 2745 | + with self.assertRaises(OSError): |
| 2746 | + s.bind(b'invalid') |
| 2747 | + else: |
| 2748 | + dev = 0 |
| 2749 | + with self.assertRaises(OSError): |
| 2750 | + s.bind(()) |
| 2751 | + with self.assertRaises(OSError): |
| 2752 | + s.bind((dev, 0)) |
| 2753 | + with self.assertRaises(OSError): |
| 2754 | + s.bind(dev) |
| 2755 | + with self.assertRaises(OSError): |
| 2756 | + s.bind(socket.BDADDR_ANY.encode()) |
| 2757 | + |
| 2758 | + @unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test') |
| 2759 | + def testBindScoSocket(self): |
| 2760 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s: |
| 2761 | + s.bind(socket.BDADDR_ANY.encode()) |
| 2762 | + addr = s.getsockname() |
| 2763 | + self.assertEqual(addr, socket.BDADDR_ANY) |
| 2764 | + |
| 2765 | + @unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test') |
| 2766 | + def testBadScoAddr(self): |
| 2767 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s: |
| 2768 | + with self.assertRaises(OSError): |
| 2769 | + s.bind(socket.BDADDR_ANY) |
| 2770 | + with self.assertRaises(OSError): |
| 2771 | + s.bind((socket.BDADDR_ANY.encode(),)) |
| 2772 | + with self.assertRaises(OSError): |
| 2773 | + s.bind(b'invalid') |
| 2774 | + |
2673 | 2775 |
|
2674 | 2776 | @unittest.skipUnless(HAVE_SOCKET_HYPERV,
|
2675 | 2777 | 'Hyper-V sockets required for this test.')
|
|
0 commit comments