5
5
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
6
import struct
7
7
import logging
8
- from time import sleep_ms
9
- from machine import I2C
10
- from machine import Pin
11
- from micropython import const
8
+ from time import sleep
9
+ import sys
10
+
11
+ _is_micropython = sys .implementation .name == "micropython"
12
+
13
+ if _is_micropython :
14
+ from machine import Pin
15
+ from machine import I2C
16
+ from micropython import const
17
+ else :
18
+ import gpiod
19
+ from gpiod .line import Direction
20
+ from gpiod .line import Value
21
+ from smbus2 import SMBus , i2c_msg
22
+
23
+ def const (x ):
24
+ return x
25
+
12
26
13
27
_MIN_ADDRESS = const (0x0B )
14
28
_TMP_ADDRESS = const (0x0A )
70
84
_CHANNEL_TYPES = const (("adc" , "dac" , "rtd" , "pwm" , "hiz" , "din" ))
71
85
72
86
87
+ class IOPin :
88
+ def __init__ (self , pin_id ):
89
+ if _is_micropython :
90
+ self .pin = Pin (pin_id , Pin .IN , Pin .PULL_UP )
91
+ else :
92
+ self .pin = gpiod .request_lines (
93
+ pin_id [0 ],
94
+ consumer = "Opta" ,
95
+ config = {pin_id [1 ]: gpiod .LineSettings (direction = Direction .INPUT )},
96
+ )
97
+
98
+ def read (self ):
99
+ if _is_micropython :
100
+ return self .pin .value ()
101
+ else :
102
+ return self .pin .get_values ()[0 ] == Value .ACTIVE
103
+
104
+
105
+ class I2CBus :
106
+ def __init__ (self , bus_id , freq ):
107
+ if _is_micropython :
108
+ self .bus = I2C (bus_id , freq = freq )
109
+ else :
110
+ self .bus = SMBus (bus_id )
111
+
112
+ def read (self , addr , buf ):
113
+ if _is_micropython :
114
+ self .bus .readfrom_into (addr , buf )
115
+ else :
116
+ msg = i2c_msg .read (addr , len (buf ))
117
+ self .bus .i2c_rdwr (msg )
118
+ buf [:] = msg .buf [0 :len (buf )]
119
+
120
+ def write (self , addr , buf ):
121
+ if _is_micropython :
122
+ self .bus .writeto (addr , buf )
123
+ else :
124
+ msg = i2c_msg .write (addr , buf )
125
+ self .bus .i2c_rdwr (msg )
126
+
127
+
73
128
class Expansion :
74
129
def __init__ (self , opta , type , addr , name ):
75
130
self .opta = opta
@@ -274,7 +329,7 @@ def get_bool(k, d):
274
329
if "default_value" in kwargs :
275
330
value = kwargs ["default_value" ]
276
331
self .opta ._cmd (self .addr , _CMD_SET_ANALOG_DAC_DEF , "<BHB" , channel , value , 1 )
277
- sleep_ms ( 250 ) # DAC requires at leas 250ms to update after a write.
332
+ sleep ( 0. 250 ) # DAC requires at leas 250ms to update after a write.
278
333
elif channel_type == "din" :
279
334
deb_mode = kwargs .get ("debounce_mode" , "simple" )
280
335
self .opta ._cmd (
@@ -309,18 +364,18 @@ def get_bool(k, d):
309
364
310
365
311
366
class Opta :
312
- def __init__ (self , bus_id , freq = 400_000 , det = None ):
367
+ def __init__ (self , bus_id , freq = 400_000 , pin_id = "BUS_DETECT" ):
313
368
"""
314
369
Initializes an Opta controller.
315
370
316
371
Parameters:
317
372
- bus_id : The I2C bus identifier.
318
373
- freq : I2C bus frequency (default=400_000).
319
- - det : GPIO pin used for bus detection (default is a PULL_UP input pin named "BUS_DETECT").
374
+ - pin_id : GPIO pin ID used for bus detection (default= "BUS_DETECT").
320
375
"""
321
- self .bus = I2C (bus_id , freq = freq )
376
+ self .pin = IOPin (pin_id )
377
+ self .bus = I2CBus (bus_id , freq )
322
378
self .cmd_buf = memoryview (bytearray (256 + 2 ))
323
- self .det = Pin ("BUS_DETECT" , Pin .IN , Pin .PULL_UP ) if det is None else det
324
379
self .exp_types = {
325
380
0x02 : ("digital" , "Opta Digital Mechanical" ),
326
381
0x03 : ("digital" , "Opta Digital Solid State" ),
@@ -336,14 +391,14 @@ def _log_enabled(self, level):
336
391
return logging .getLogger ().isEnabledFor (level )
337
392
338
393
def _bus_read (self , addr , buf ):
339
- self .bus .readfrom_into (addr , buf )
394
+ self .bus .read (addr , buf )
340
395
if self ._log_enabled (logging .DEBUG ):
341
396
self ._log_debug ("Recv: " + " " .join (["%02X" % (a ) for a in buf ]))
342
397
343
398
def _bus_write (self , addr , buf ):
344
399
if self ._log_enabled (logging .DEBUG ):
345
400
self ._log_debug ("Send: " + " " .join (["%02X" % (a ) for a in buf ]))
346
- self .bus .writeto (addr , buf )
401
+ self .bus .write (addr , buf )
347
402
348
403
def _crc8 (self , buf , poly = 0x07 , crc = 0x00 ):
349
404
for byte in buf :
@@ -382,7 +437,7 @@ def _cmd(self, addr, cmd, fmt=None, *args):
382
437
383
438
def _reset_bus (self , addr ):
384
439
self ._cmd (addr , _CMD_CHIP_RESET , "B" , 0x56 )
385
- sleep_ms ( 2000 )
440
+ sleep ( 2 )
386
441
387
442
def _set_address (self , addr , addr_new = None ):
388
443
if addr_new is not None :
@@ -413,9 +468,9 @@ def enum_devices(self):
413
468
414
469
addr = _MAX_ADDRESS
415
470
# Assign temp I2C addresses to expansions.
416
- while not self .det . value ():
471
+ while not self .pin . read ():
417
472
self ._set_address (0x0A , addr_new = addr )
418
- sleep_ms ( 100 )
473
+ sleep ( 0. 100 )
419
474
try :
420
475
xaddr , xtype = self ._set_address (addr )
421
476
if xaddr == addr :
@@ -428,7 +483,7 @@ def enum_devices(self):
428
483
# Assign final I2C addresses to expansions.
429
484
for addr_new in range (_MIN_ADDRESS , _MIN_ADDRESS + addr - _MAX_ADDRESS ):
430
485
self ._set_address (addr - 1 , addr_new )
431
- sleep_ms ( 100 )
486
+ sleep ( 0. 100 )
432
487
try :
433
488
xaddr , xtype = self ._set_address (addr_new )
434
489
addr -= 1
0 commit comments