|
| 1 | +""" |
| 2 | +Interface to socketcand |
| 3 | +see https://github.com/linux-can/socketcand |
| 4 | +
|
| 5 | +Authors: Marvin Seiler, Gerrit Telkamp |
| 6 | +
|
| 7 | +Copyright (C) 2021 DOMOLOGIC GmbH |
| 8 | +http://www.domologic.de |
| 9 | +""" |
| 10 | +import can |
| 11 | +import socket |
| 12 | +import select |
| 13 | +import logging |
| 14 | +import time |
| 15 | +import traceback |
| 16 | +from collections import deque |
| 17 | + |
| 18 | +log = logging.getLogger(__name__) |
| 19 | + |
| 20 | + |
| 21 | +def convert_ascii_message_to_can_message(ascii_msg: str) -> can.Message: |
| 22 | + if not ascii_msg.startswith("< frame ") or not ascii_msg.endswith(" >"): |
| 23 | + log.warning(f"Could not parse ascii message: {ascii_msg}") |
| 24 | + return None |
| 25 | + else: |
| 26 | + # frame_string = ascii_msg.removeprefix("< frame ").removesuffix(" >") |
| 27 | + frame_string = ascii_msg[8:-2] |
| 28 | + parts = frame_string.split(" ", 3) |
| 29 | + can_id, timestamp = int(parts[0], 16), float(parts[1]) |
| 30 | + |
| 31 | + data = bytearray.fromhex(parts[2]) |
| 32 | + can_dlc = len(data) |
| 33 | + can_message = can.Message( |
| 34 | + timestamp=timestamp, arbitration_id=can_id, data=data, dlc=can_dlc |
| 35 | + ) |
| 36 | + return can_message |
| 37 | + |
| 38 | + |
| 39 | +def convert_can_message_to_ascii_message(can_message: can.Message) -> str: |
| 40 | + # Note: socketcan bus adds extended flag, remote_frame_flag & error_flag to id |
| 41 | + # not sure if that is necessary here |
| 42 | + can_id = can_message.arbitration_id |
| 43 | + # Note: seems like we cannot add CANFD_BRS (bitrate_switch) and CANFD_ESI (error_state_indicator) flags |
| 44 | + data = can_message.data |
| 45 | + length = can_message.dlc |
| 46 | + bytes_string = " ".join("{:x}".format(x) for x in data[0:length]) |
| 47 | + return f"< send {can_id:X} {length:X} {bytes_string} >" |
| 48 | + |
| 49 | + |
| 50 | +def connect_to_server(s, host, port): |
| 51 | + timeout_ms = 10000 |
| 52 | + now = time.time() * 1000 |
| 53 | + end_time = now + timeout_ms |
| 54 | + while now < end_time: |
| 55 | + try: |
| 56 | + s.connect((host, port)) |
| 57 | + return |
| 58 | + except Exception as e: |
| 59 | + log.warning(f"Failed to connect to server: {type(e)} Message: {e}") |
| 60 | + now = time.time() * 1000 |
| 61 | + raise TimeoutError( |
| 62 | + f"connect_to_server: Failed to connect server for {timeout_ms} ms" |
| 63 | + ) |
| 64 | + |
| 65 | + |
| 66 | +class SocketCanDaemonBus(can.BusABC): |
| 67 | + def __init__(self, channel, host, port, can_filters=None, **kwargs): |
| 68 | + self.__host = host |
| 69 | + self.__port = port |
| 70 | + self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 71 | + self.__message_buffer = deque() |
| 72 | + self.__receive_buffer = "" # i know string is not the most efficient here |
| 73 | + connect_to_server(self.__socket, self.__host, self.__port) |
| 74 | + self._expect_msg("< hi >") |
| 75 | + |
| 76 | +
F438
log.info( |
| 77 | + f"SocketCanDaemonBus: connected with address {self.__socket.getsockname()}" |
| 78 | + ) |
| 79 | + self._tcp_send(f"< open {channel} >") |
| 80 | + self._expect_msg("< ok >") |
| 81 | + self._tcp_send(f"< rawmode >") |
| 82 | + self._expect_msg("< ok >") |
| 83 | + super().__init__(channel=channel, can_filters=can_filters) |
| 84 | + |
| 85 | + def _recv_internal(self, timeout): |
| 86 | + if len(self.__message_buffer) != 0: |
| 87 | + can_message = self.__message_buffer.popleft() |
| 88 | + return can_message, False |
| 89 | + |
| 90 | + try: |
| 91 | + # get all sockets that are ready (can be a list with a single value |
| 92 | + # being self.socket or an empty list if self.socket is not ready) |
| 93 | + ready_receive_sockets, _, _ = select.select( |
| 94 | + [self.__socket], [], [], timeout |
| 95 | + ) |
| 96 | + except socket.error as exc: |
| 97 | + # something bad happened (e.g. the interface went down) |
| 98 | + log.error(f"Failed to receive: {exc}") |
| 99 | + raise can.CanError(f"Failed to receive: {exc}") |
| 100 | + |
| 101 | + try: |
| 102 | + if not ready_receive_sockets: |
| 103 | + # socket wasn't readable or timeout occurred |
| 104 | + log.debug("Socket not ready") |
| 105 | + return None, False |
| 106 | + |
| 107 | + ascii_msg = self.__socket.recv(1024).decode( |
| 108 | + "ascii" |
| 109 | + ) # may contain multiple messages |
| 110 | + self.__receive_buffer += ascii_msg |
| 111 | + log.debug(f"Received Ascii Message: {ascii_msg}") |
| 112 | + buffer_view = self.__receive_buffer |
| 113 | + chars_processed_successfully = 0 |
| 114 | + while True: |
| 115 | + if len(buffer_view) == 0: |
| 116 | + break |
| 117 | + |
| 118 | + start = buffer_view.find("<") |
| 119 | + if start == -1: |
| 120 | + log.warning( |
| 121 | + f"Bad data: No opening < found => discarding entire buffer '{buffer_view}'" |
| 122 | + ) |
| 123 | + chars_processed_successfully = len(self.__receive_buffer) |
| 124 | + break |
| 125 | + end = buffer_view.find(">") |
| 126 | + if end == -1: |
| 127 | + log.warning("Got incomplete message => waiting for more data") |
| 128 | + if len(buffer_view) > 200: |
| 129 | + log.warning( |
| 130 | + "Incomplete message exceeds 200 chars => Discarding" |
| 131 | + ) |
| 132 | + chars_processed_successfully = len(self.__receive_buffer) |
| 133 | + break |
| 134 | + chars_processed_successfully += end + 1 |
| 135 | + single_message = buffer_view[start : end + 1] |
| 136 | + parsed_can_message = convert_ascii_message_to_can_message( |
| 137 | + single_message |
| 138 | + ) |
| 139 | + if parsed_can_message is None: |
| 140 | + log.warning(f"Invalid Frame: {single_message}") |
| 141 | + else: |
| 142 | + self.__message_buffer.append(parsed_can_message) |
| 143 | + buffer_view = buffer_view[end + 1 :] |
| 144 | + |
| 145 | + self.__receive_buffer = self.__receive_buffer[ |
| 146 | + chars_processed_successfully + 1 : |
| 147 | + ] |
| 148 | + can_message = ( |
| 149 | + None |
| 150 | + if len(self.__message_buffer) == 0 |
| 151 | + else self.__message_buffer.popleft() |
| 152 | + ) |
| 153 | + return can_message, False |
| 154 | + |
| 155 | + except Exception as exc: |
| 156 | + log.error(f"Failed to receive: {exc} {traceback.format_exc()}") |
| 157 | + raise can.CanError(f"Failed to receive: {exc} {traceback.format_exc()}") |
| 158 | + |
| 159 | + def _tcp_send(self, msg: str): |
| 160 | + log.debug(f"Sending TCP Message: '{msg}'") |
| 161 | + self.__socket.sendall(msg.encode("ascii")) |
| 162 | + |
| 163 | + def _expect_msg(self, msg): |
| 164 | + ascii_msg = self.__socket.recv(256).decode("ascii") |
| 165 | + if not ascii_msg == msg: |
| 166 | + raise can.CanError(f"{msg} message expected!") |
| 167 | + |
| 168 | + def send(self, msg, timeout=None): |
| 169 | + ascii_msg = convert_can_message_to_ascii_message(msg) |
| 170 | + self._tcp_send(ascii_msg) |
| 171 | + |
| 172 | + def shutdown(self): |
| 173 | + self.stop_all_periodic_tasks() |
| 174 | + self.__socket.close() |
0 commit comments