|
1 | 1 | import unittest
|
| 2 | +from unittest import mock |
2 | 3 | from test import support
|
3 | 4 | from test.support import (
|
4 | 5 | is_apple, os_helper, refleak_helper, socket_helper, threading_helper
|
@@ -179,6 +180,17 @@ def _have_socket_bluetooth():
|
179 | 180 | return True
|
180 | 181 |
|
181 | 182 |
|
| 183 | +def _have_socket_bluetooth_l2cap(): |
| 184 | + """Check whether BTPROTO_L2CAP sockets are supported on this host.""" |
| 185 | + try: |
| 186 | + s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) |
| 187 | + except (AttributeError, OSError): |
| 188 | + return False |
| 189 | + else: |
| 190 | + s.close() |
| 191 | + return True |
| 192 | + |
| 193 | + |
182 | 194 | def _have_socket_hyperv():
|
183 | 195 | """Check whether AF_HYPERV sockets are supported on this host."""
|
184 | 196 | try:
|
@@ -239,6 +251,8 @@ def downgrade_malformed_data_warning():
|
239 | 251 |
|
240 | 252 | HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()
|
241 | 253 |
|
| 254 | +HAVE_SOCKET_BLUETOOTH_L2CAP = _have_socket_bluetooth_l2cap() |
| 255 | + |
242 | 256 | HAVE_SOCKET_HYPERV = _have_socket_hyperv()
|
243 | 257 |
|
244 | 258 | # Size in bytes of the int type
|
@@ -2622,6 +2636,116 @@ def testCreateScoSocket(self):
|
2622 | 2636 | with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
|
2623 | 2637 | pass
|
2624 | 2638 |
|
| 2639 | + @unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets required for this test') |
| 2640 | + def testBindBrEdrL2capSocket(self): |
| 2641 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as f: |
| 2642 | + # First user PSM in BR/EDR L2CAP |
| 2643 | + psm = 0x1001 |
| 2644 | + f.bind((socket.BDADDR_ANY, psm)) |
| 2645 | + addr = f.getsockname() |
| 2646 | + self.assertEqual(addr, (socket.BDADDR_ANY, psm)) |
| 2647 | + |
| 2648 | + @unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets required for this test') |
| 2649 | + def testBadL2capAddr(self): |
| 2650 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as f: |
| 2651 | + with self.assertRaises(OSError): |
| 2652 | + f.bind((socket.BDADDR_ANY, 0, 0)) |
| 2653 | + with self.assertRaises(OSError): |
| 2654 | + f.bind((socket.BDADDR_ANY,)) |
| 2655 | + with self.assertRaises(OSError): |
| 2656 | + f.bind(socket.BDADDR_ANY) |
| 2657 | + with self.assertRaises(OSError): |
| 2658 | + f.bind((socket.BDADDR_ANY.encode(), 0x1001)) |
| 2659 | + |
| 2660 | + def testBindRfcommSocket(self): |
| 2661 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: |
| 2662 | + channel = 0 |
| 2663 | + try: |
| 2664 | + s.bind((socket.BDADDR_ANY, channel)) |
| 2665 | + except OSError as err: |
| 2666 | + if sys.platform == 'win32' and err.winerror == 10050: |
| 2667 | + self.skipTest(str(err)) |
| 2668 | + raise |
| 2669 | + addr = s.getsockname() |
| 2670 | + self.assertEqual(addr, (mock.ANY, channel)) |
| 2671 | + self.assertRegex(addr[0], r'(?i)[0-9a-f]{2}(?::[0-9a-f]{2}){4}') |
| 2672 | + if sys.platform != 'win32': |
| 2673 | + self.assertEqual(addr, (socket.BDADDR_ANY, channel)) |
| 2674 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: |
| 2675 | + s.bind(addr) |
| 2676 | + addr2 = s.getsockname() |
| 2677 | + self.assertEqual(addr2, addr) |
| 2678 | + |
| 2679 | + def testBadRfcommAddr(self): |
| 2680 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: |
| 2681 | + channel = 0 |
| 2682 | + with self.assertRaises(OSError): |
| 2683 | + s.bind((socket.BDADDR_ANY.encode(), channel)) |
| 2684 | + with self.assertRaises(OSError): |
| 2685 | + s.bind((socket.BDADDR_ANY,)) |
| 2686 | + with self.assertRaises(OSError): |
| 2687 | + s.bind((socket.BDADDR_ANY, channel, 0)) |
| 2688 | + with self.assertRaises(OSError): |
| 2689 | + s.bind((socket.BDADDR_ANY + '\0', channel)) |
| 2690 | + with self.assertRaises(OSError): |
| 2691 | + s.bind(('invalid', channel)) |
| 2692 | + |
| 2693 | + @unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test') |
| 2694 | + def testBindHciSocket(self): |
| 2695 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s: |
| 2696 | + if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')): |
| 2697 | + s.bind(socket.BDADDR_ANY.encode()) |
| 2698 | + addr = s.getsockname() |
| 2699 | + self.assertEqual(addr, socket.BDADDR_ANY) |
| 2700 | + else: |
| 2701 | + dev = 0 |
| 2702 | + s.bind((dev,)) |
| 2703 | + addr = s.getsockname() |
| 2704 | + self.assertEqual(addr, dev) |
| 2705 | + |
| 2706 | + @unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test') |
| 2707 | + def testBadHciAddr(self): |
| 2708 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s: |
| 2709 | + if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')): |
| 2710 | + with self.assertRaises(OSError): |
| 2711 | + s.bind(socket.BDADDR_ANY) |
| 2712 | + with self.assertRaises(OSError): |
| 2713 | + s.bind((socket.BDADDR_ANY.encode(),)) |
| 2714 | + if sys.platform.startswith('freebsd'): |
| 2715 | + with self.assertRaises(ValueError): |
| 2716 | + s.bind(socket.BDADDR_ANY.encode() + b'\0') |
| 2717 | + with self.assertRaises(ValueError): |
| 2718 | + s.bind(socket.BDADDR_ANY.encode() + b' '*100) |
| 2719 | + with self.assertRaises(OSError): |
| 2720 | + s.bind(b'invalid') |
| 2721 | + else: |
| 2722 | + dev = 0 |
| 2723 | + with self.assertRaises(OSError): |
| 2724 | + s.bind(()) |
| 2725 | + with self.assertRaises(OSError): |
| 2726 | + s.bind((dev, 0)) |
| 2727 | + with self.assertRaises(OSError): |
| 2728 | + s.bind(dev) |
| 2729 | + with self.assertRaises(OSError): |
| 2730 | + s.bind(socket.BDADDR_ANY.encode()) |
| 2731 | + |
| 2732 | + @unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test') |
| 2733 | + def testBindScoSocket(self): |
| 2734 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s: |
| 2735 | + s.bind(socket.BDADDR_ANY.encode()) |
| 2736 | + addr = s.getsockname() |
| 2737 | + self.assertEqual(addr, socket.BDADDR_ANY) |
| 2738 | + |
| 2739 | + @unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test') |
| 2740 | + def testBadScoAddr(self): |
| 2741 | + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s: |
| 2742 | + with self.assertRaises(OSError): |
| 2743 | + s.bind(socket.BDADDR_ANY) |
| 2744 | + with self.assertRaises(OSError): |
| 2745 | + s.bind((socket.BDADDR_ANY.encode(),)) |
| 2746 | + with self.assertRaises(OSError): |
| 2747 | + s.bind(b'invalid') |
| 2748 | + |
2625 | 2749 |
|
2626 | 2750 | @unittest.skipUnless(HAVE_SOCKET_HYPERV,
|
2627 | 2751 | 'Hyper-V sockets required for this test.')
|
|
0 commit comments