8000 bpo-30987 - Support for ISO-TP protocol in SocketCAN by pylessard · Pull Request #2956 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

bpo-30987 - Support for ISO-TP protocol in SocketCAN #2956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Aug 28, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added unit tests for CAN ISOTP
  • Loading branch information
pylessard committed Jul 31, 2017
commit ccd65b8cccd9096c006e10aa6e0dfcac7d9c2fbd
232 changes: 232 additions & 0 deletions Lib/test/test_socket.py
8000
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ def _have_socket_can():
s.close()
return True

def _have_socket_can_isotp():
"""Check whether CAN ISOTP sockets are supported on this host."""
try:
s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
except (AttributeError, OSError):
return False
else:
s.close()
return True

def _have_socket_rds():
"""Check whether RDS sockets are supported on this host."""
try:
Expand All @@ -77,6 +87,8 @@ def _have_socket_alg():

HAVE_SOCKET_CAN = _have_socket_can()

HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()

HAVE_SOCKET_RDS = _have_socket_rds()

HAVE_SOCKET_ALG = _have_socket_alg()
Expand Down Expand Up @@ -353,6 +365,48 @@ def clientTearDown(self):
self.cli = None
ThreadableTest.clientTearDown(self)

class ThreadableISOTPTest(unittest.TestCase, ThreadableTest):
"""To be able to run this test, a `vcan0` CAN interface can be created with
the following commands:
# modprobe vcan
# ip link add dev vcan0 type vcan
# ifconfig vcan0 up
"""
interface = 'vcan0'
addr_offset=1
address_lock = threading.Lock()
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self,*args, **kwargs)
ThreadableTest.__init__(self)
(self.cli_addr, self.srv_addr) = ThreadableISOTPTest.getAddressPair()

@classmethod
def getAddressPair(cls):
cls.address_lock.acquire_lock()
latched_offset = cls.addr_offset
cls.addr_offset += 2
if cls.addr_offset > 0x7FE: # 11 bit standard CAN identifier (ISO-11898-2)
cls.addr_offset = 1
cls.address_lock.release_lock()
return (latched_offset, latched_offset+1)

def clientSetUp(self):
try:
self.cli = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
if 'cli' in self.opts:
for opt_const in self.opts['cli']:
self.serv.setsockopt(socket.SOL_CAN_ISOTP, opt_const, self.opts['cli'][opt_const])
self.cli.bind((self.interface, self.srv_addr, self.cli_addr))
except OSError:
# skipTest should not be called here, and will be called in the
# server instead
pass

def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)

class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):

def __init__(self, methodName='runTest'):
Expand Down Expand Up @@ -1709,6 +1763,184 @@ def testBCM(self):
self.assertEqual(bytes_sent, len(header_plus_frame))


@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
class BasicISOTPTest(unittest.TestCase):

def testCrucialConstants(self):
socket.AF_CAN
socket.PF_CAN
socket.CAN_ISOTP
socket.SOCK_DGRAM

@unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
'socket.CAN_ISOTP required for this test.')
def testISOTPConstants(self):
socket.CAN_ISOTP

socket.SOL_CAN_ISOTP # Socket Option Level for ISOTP protocol (AF_CAN + 6)
socket.CAN_ISOTP_OPTS # sets struct can_isotp_options within the driver.
socket.CAN_ISOTP_RECV_FC # set flow control options for receiver
socket.CAN_ISOTP_TX_STMIN # override sepration time received in flow control frame
socket.CAN_ISOTP_RX_STMIN # force to ignore messages received in an interval smaller than this value
socket.CAN_ISOTP_LL_OPTS # sets struct can_isotp_ll_options within the driver

# Options flags
socket.CAN_ISOTP_LISTEN_MODE # listen only (do not send FC)
socket.CAN_ISOTP_EXTEND_ADDR # enable extended addressing
socket.CAN_ISOTP_TX_PADDING # enable CAN frame padding tx path
socket.CAN_ISOTP_RX_PADDING # enable CAN frame padding rx path
socket.CAN_ISOTP_CHK_PAD_LEN # check received CAN frame padding
socket.CAN_ISOTP_CHK_PAD_DATA # check received CAN frame padding
socket.CAN_ISOTP_HALF_DUPLEX # half duplex error state handling
socket.CAN_ISOTP_FORCE_TXSTMIN # ignore stmin from received FC
socket.CAN_ISOTP_FORCE_RXSTMIN # ignore CFs depending on rx stmin
socket.CAN_ISOTP_RX_EXT_ADDR # different rx extended addressing


def testCreateSocket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
pass

@unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
'socket.CAN_ISOTP required for this test.')
def testCreateISOTPSocket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
pass

def testTooLongInterfaceName(self):
# most systems limit IFNAMSIZ to 16, take 1024 to be sure
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
self.assertRaisesRegex(OSError, 'interface name too long',
s.bind, ('x' * 1024,1,2))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use

with self.assertRaisesRegex(OSError, 'interface name too long'):
    s.bind(('x' * 1024, 1, 2))

to make it more readable.


@unittest.skipUnless(hasattr(socket, "SOL_CAN_ISOTP"),
'socket.SOL_CAN_ISOTP is required for this test')
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP_OPTS"),
'socket.CAN_ISOTP_OPTS is required for this test')
def testSetOpts(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
flags = 0x01234567
frame_txtime = 0x89ABCDEF
ext_address = 0x11
txpad_content = 0x21
rxpad_content = 0x31
rx_ext_address = ext_address
opts = struct.pack("=LLBBBB",flags, frame_txtime, ext_address, txpad_content, rxpad_content, rx_ext_address)
s.setsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_OPTS, opts)
readopts = s.getsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_OPTS, len(opts))
self.assertEqual(readopts,opts)

@unittest.skipUnless(hasattr(socket, "SOL_CAN_ISOTP"),
'socket.SOL_CAN_ISOTP is required for this test')
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP_RECV_FC"),
'socket.CAN_ISOTP_RECV_FC is required for this test')
def testSetFCOpts(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
bs = 0x12
stmin = 0x22
wftmax = 0x32
opts = struct.pack("=BBB",bs, stmin, wftmax)
s.setsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_RECV_FC, opts)
readopts = s.getsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_RECV_FC, len(opts))
self.assertEqual(readopts,opts)

@unittest.skipUnless(hasattr(socket, "SOL_CAN_ISOTP"),
'socket.SOL_CAN_ISOTP is required for this test')
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP_LL_OPTS"),
'socket.CAN_ISOTP_LL_OPTS is required for this test')
def testSetLLOpts(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
mtu = 16
tx_dl = 8
tx_flags = 0x13
opts = struct.pack("=BBB",mtu, tx_dl, tx_flags)
s.setsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_LL_OPTS, opts)
readopts = s.getsockopt(socket.SOL_CAN_ISOTP, socket.CAN_ISOTP_LL_OPTS, len(opts))
self.assertEqual(readopts,opts)


@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
class ISOTPTest(ThreadableISOTPTest):

test_opts = {
'testTiming' : {
'cli' : {
socket.CAN_ISOTP_RECV_FC : struct.pack("=BBB",10, 0x7F, 10) # bs, stmin, wftmax
},
'serv' : {

}
}
}

def __init__(self, methodName='runTest', *args, **kwargs):
ThreadableISOTPTest.__init__(self,methodName, *args, **kwargs)
if methodName in ISOTPTest.test_opts:
self.opts = ISOTPTest.test_opts[methodName]
else:
self.opts = {}

def setUp(self, *args, **kwargs):
self.serv = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
try:
if 'serv' in self.opts:
for opt_const in self.opts['serv']:
self.serv.setsockopt(socket.SOL_CAN_ISOTP, opt_const, self.opts['serv'][opt_const])
self.serv.bind((self.interface, self.cli_addr, self.srv_addr))
except OSError:
self.skipTest('network interface `%s` does not exist' %
self.interface)

def tearDown(self):
self.serv.close()
self.serv=None

@classmethod
def makeRandomData(cls, n):
data = [random.randint(0,0xFF) for i in range(0,n)]
return struct.pack("B"*len(data), *data)

def _testFullFrame(self):
self.bindata = self.makeRandomData(4095)
self.cli.send(self.bindata)

def testFullFrame(self):
time.sleep(0.1)
bindata = self.serv.recv(len(self.bindata))
self.assertEqual(bindata, self.bindata)

def _testTiming(self):
self.bindata2 = self.makeRandomData(150)
self.tic = time.time()
self.cli.send(self.bindata2)

@unittest.skipUnless(hasattr(socket, "SOL_CAN_ISOTP"),
'socket.SOL_CAN_ISOTP is required for this test')
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP_RECV_FC"),
'socket.CAN_ISOTP_RECV_FC is required for this test')
def testTiming(self):
"""
The main goal of this test is to validate that the driver correctly receive options via setsockopt.
We are not trying to validate the proper functioning of the driver itself.
"""
time.sleep(0.1)
bindata2 = self.serv.recv(len(self.bindata2))
self.toc = time.time()
self.assertEqual(bindata2, self.bindata2)
measured_time = math.ceil(self.toc-self.tic)
min_time = math.floor(self.calcFrameCnt(len(self.bindata2))*0.127)
self.assertGreater(measured_time, min_time)

def calcFrameCnt(self, size):
if size < 1 or size > 4095:
raise ValueError('ISOTP frame size must be between 1 and 4095')

if (size < 8):
return 1
else:
return int(1 + math.ceil((float(size) - 6.0) / 7.0))


@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):

Expand Down
0