11
11
import os
12
12
from datetime import datetime , timedelta , timezone
13
13
from enum import Enum
14
- from typing import Any , Callable , Dict , Generator , List , Optional , TextIO , Union
14
+ from typing import Any , Callable , Dict , Generator , Optional , TextIO , Tuple , Union
15
15
16
16
from ..message import Message
17
17
from ..typechecking import StringPathLike
18
- from ..util import channel2int , dlc2len , len2dlc
19
- from .generic import (
20
- TextIOMessageReader ,
21
- TextIOMessageWriter ,
22
- )
18
+ from ..util import channel2int , len2dlc
19
+ from .generic import TextIOMessageReader , TextIOMessageWriter
23
20
24
21
logger = logging .getLogger ("can.io.trc" )
25
22
@@ -58,13 +55,22 @@ def __init__(
58
55
"""
59
56
super ().__init__ (file , mode = "r" )
60
57
self .file_version = TRCFileVersion .UNKNOWN
61
- self .start_time : Optional [ datetime ] = None
58
+ self ._start_time : float = 0
62
59
self .columns : Dict [str , int ] = {}
60
+ self ._num_columns = - 1
63
61
64
62
if not self .file :
65
63
raise ValueError ("The given file cannot be None" )
66
64
67
- self ._parse_cols : Callable [[List [str ]], Optional [Message ]] = lambda x : None
65
+ self ._parse_cols : Callable [[Tuple [str , ...]], Optional [Message ]] = (
66
+ lambda x : None
67
+ )
68
+
69
+ @property
70
+ def start_time (self ) -> Optional [datetime ]:
71
+ if self ._start_time :
72
+ return datetime .fromtimestamp (self ._start_time , timezone .utc )
73
+ return None
68
74
69
75
def _extract_header (self ):
70
76
line = ""
@@ -89,16 +95,18 @@ def _extract_header(self):
89
95
elif line .startswith (";$STARTTIME" ):
90
96
logger .debug ("TRCReader: Found start time '%s'" , line )
91
97
try :
92
- self .start_time = datetime (
93
- 1899 , 12 , 30 , tzinfo = timezone .utc
94
- ) + timedelta (days = float (line .split ("=" )[1 ]))
98
+ self ._start_time = (
99
+ datetime (1899 , 12 , 30 , tzinfo = timezone .utc )
100
+ + timedelta (days = float (line .split ("=" )[1 ]))
101
+ ).timestamp ()
95
102
except IndexError :
96
103
logger .debug ("TRCReader: Failed to parse start time" )
97
104
elif line .startswith (";$COLUMNS" ):
98
105
logger .debug ("TRCReader: Found columns '%s'" , line )
99
106
try :
100
107
columns = line .split ("=" )[1 ].split ("," )
101
108
self .columns = {column : columns .index (column ) for column in columns }
109
+ self ._num_columns = len (columns ) - 1
102
110
except IndexError :
103
111
logger .debug ("TRCReader: Failed to parse columns" )
104
112
elif line .startswith (";" ):
@@ -107,7 +115,7 @@ def _extract_header(self):
107
115
break
108
116
109
117
if self .file_version >= TRCFileVersion .V1_1 :
110
- if self .start_time is None :
118
+ if self ._start_time is None :
111
119
raise ValueError ("File has no start time information" )
112
120
113
121
if self .file_version >= TRCFileVersion .V2_0 :
@@ -132,7 +140,7 @@ def _extract_header(self):
132
140
133
141
return line
134
142
135
- def _parse_msg_v1_0 (self , cols : List [str ]) -> Optional [Message ]:
143
+ def _parse_msg_v1_0 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
136
144
arbit_id = cols [2 ]
137
145
if arbit_id == "FFFFFFFF" :
138
146
logger .info ("TRCReader: Dropping bus info line" )
@@ -147,16 +155,11 @@ def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]:
147
155
msg .data = bytearray ([int (cols [i + 4 ], 16 ) for i in range (msg .dlc )])
148
156
return msg
149
157
150
- def _parse_msg_v1_1 (self , cols : List [str ]) -> Optional [Message ]:
158
+ def _parse_msg_v1_1 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
151
159
arbit_id = cols [3 ]
152
160
153
161
msg = Message ()
154
- if isinstance (self .start_time , datetime ):
155
- msg .timestamp = (
156
- self .start_time + timedelta (milliseconds = float (cols [1 ]))
157
- ).timestamp ()
158
- else :
159
- msg .timestamp = float (cols [1 ]) / 1000
162
+ msg .timestamp = float (cols [1 ]) / 1000 + self ._start_time
160
163
msg .arbitration_id = int (arbit_id , 16 )
161
164
msg .is_extended_id = len (arbit_id ) > 4
162
165
msg .channel = 1
@@ -165,16 +168,11 @@ def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]:
165
168
msg .is_rx = cols [2 ] == "Rx"
166
169
return msg
167
170
168
- def _parse_msg_v1_3 (self , cols : List [str ]) -> Optional [Message ]:
171
+ def _parse_msg_v1_3 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
169
172
arbit_id = cols [4 ]
170
173
171
174
msg = Message ()
172
- if isinstance (self .start_time , datetime ):
173
- msg .timestamp = (
174
- self .start_time + timedelta (milliseconds = float (cols [1 ]))
175
- ).timestamp ()
176
- else :
177
- msg .timestamp = float (cols [1 ]) / 1000
175
+ msg .timestamp = float (cols [1 ]) / 1000 + self ._start_time
178
176
msg .arbitration_id = int (arbit_id , 16 )
179
177
msg .is_extended_id = len (arbit_id ) > 4
180
178
msg .channel = int (cols [2 ])
@@ -183,7 +181,7 @@ def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]:
183
181
msg .is_rx = cols [3 ] == "Rx"
184
182
return msg
185
183
186
- def _parse_msg_v2_x (self , cols : List [str ]) -> Optional [Message ]:
184
+ def _parse_msg_v2_x (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
187
185
type_ = cols [self .columns ["T" ]]
188
186
bus = self .columns .get ("B" , None )
189
187
@@ -192,50 +190,43 @@ def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]:
192
190
dlc = len2dlc (length )
193
191
elif "L" in self .columns :
194
192
dlc = int (cols [self .columns ["L" ]])
195
- length = dlc2len (dlc )
196
193
else :
197
194
raise ValueError ("No length/dlc columns present." )
198
195
199
196
msg = Message ()
200
- if isinstance (self .start_time , datetime ):
201
- msg .timestamp = (
202
- self .start_time + timedelta (milliseconds = float (cols [self .columns ["O" ]]))
203
- ).timestamp ()
204
- else :
205
- msg .timestamp = float (cols [1 ]) / 1000
197
+ msg .timestamp = float (cols [self .columns ["O" ]]) / 1000 + self ._start_time
206
198
msg .arbitration_id = int (cols [self .columns ["I" ]], 16 )
207
199
msg .is_extended_id = len (cols [self .columns ["I" ]]) > 4
208
200
msg .channel = int (cols [bus ]) if bus is not None else 1
209
201
msg .dlc = dlc
210
- msg .data = bytearray (
211
- [int (cols [i + self .columns ["D" ]], 16 ) for i in range (length )]
212
- )
202
+ if dlc :
203
+ msg .data = bytearray .fromhex (cols [self .columns ["D" ]])
213
204
msg .is_rx = cols [self .columns ["d" ]] == "Rx"
214
- msg .is_fd = type_ in [ "FD" , "FB" , "FE" , "BI" ]
215
- msg .bitrate_switch = type_ in [ "FB" , " FE" ]
216
- msg .error_state_indicator = type_ in [ "FE" , "BI" ]
205
+ msg .is_fd = type_ in { "FD" , "FB" , "FE" , "BI" }
206
+ msg .bitrate_switch = type_ in { "FB" , "FE" }
207
+ msg .error_state_indicator = type_ in { "FE" , "BI" }
217
208
218
209
return msg
219
210
220
- def _parse_cols_v1_1 (self , cols : List [str ]) -> Optional [Message ]:
211
+ def _parse_cols_v1_1 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
221
212
dtype = cols [2 ]
222
213
if dtype in ("Tx" , "Rx" ):
223
214
return self ._parse_msg_v1_1 (cols )
224
215
else :
225
216
logger .info ("TRCReader: Unsupported type '%s'" , dtype )
226
217
return None
227
218
228
- def _parse_cols_v1_3 (self , cols : List [str ]) -> Optional [Message ]:
219
+ def _parse_cols_v1_3 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
229
220
dtype = cols [3 ]
230
221
if dtype in ("Tx" , "Rx" ):
231
222
return self ._parse_msg_v1_3 (cols )
232
223
else :
233
224
logger .info ("TRCReader: Unsupported type '%s'" , dtype )
234
225
return None
235
226
236
- def _parse_cols_v2_x (self , cols : List [str ]) -> Optional [Message ]:
227
+ def _parse_cols_v2_x (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
237
228
dtype = cols [self .columns ["T" ]]
238
- if dtype in [ "DT" , "FD" , "FB" ] :
229
+ if dtype in { "DT" , "FD" , "FB" , "FE" , "BI" } :
239
230
return self ._parse_msg_v2_x (cols )
240
231
else :
241
232
logger .info ("TRCReader: Unsupported type '%s'" , dtype )
@@ -244,7 +235,7 @@ def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]:
244
235
def _parse_line (self , line : str ) -> Optional [Message ]:
245
236
logger .debug ("TRCReader: Parse '%s'" , line )
246
237
try :
247
- cols = line .split ()
238
+ cols = tuple ( line .split (maxsplit = self . _num_columns ) )
248
239
return self ._parse_cols (cols )
249
240
except IndexError :
250
241
logger .warning ("TRCReader: Failed to parse message '%s'" , line )
0 commit comments