8000 lib/opta: Add support for running on CPython. · arduino/arduino-lib-mpy@7ddfaae · GitHub
[go: up one dir, main page]

Skip to content

Commit 7ddfaae

Browse files
committed
lib/opta: Add support for running on CPython.
Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
1 parent 8312406 commit 7ddfaae

File tree

2 files changed

+72
-15
lines changed

2 files changed

+72
-15
lines changed

lib/opta/example.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
level=logging.INFO # Switch to DEBUG to see raw commands
1515
)
1616

17+
# On CPython, something like the following should be used:
18+
# opta = opta.Opta(bus_id=3, pin_id=("/dev/gpiochip1", 5))
1719
opta = opta.Opta(bus_id=3)
1820

1921
# enum_devices initializes the bus, resets all expansions, and returns a list of

lib/opta/opta.py

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,24 @@
55
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
66
import struct
77
import logging
8-
from time import sleep_ms
9-
from machine import I2C
10-
from machine import Pin
11-
from micropython import const
8+
from time import sleep
9+
import sys
10+
11+
_is_micropython = sys.implementation.name == "micropython"
12+
13+
if _is_micropython:
14+
from machine import Pin
15+
from machine import I2C
16+
from micropython import const
17+
else:
18+
import gpiod
19+
from gpiod.line import Direction
20+
from gpiod.line import Value
21+
from smbus2 import SMBus, i2c_msg
22+
23+
def const(x):
24+
return x
25+
1226

1327
_MIN_ADDRESS = const(0x0B)
1428
_TMP_ADDRESS = const(0x0A)
@@ -70,6 +84,47 @@
7084
_CHANNEL_TYPES = const(("adc", "dac", "rtd", "pwm", "hiz", "din"))
7185

7286

87+
class IOPin:
88+
def __init__(self, pin_id):
89+
if _is_micropython:
90+
self.pin = Pin(pin_id, Pin.IN, Pin.PULL_UP)
91+
else:
92+
self.pin = gpiod.request_lines(
93+
pin_id[0],
94+
consumer="Opta",
95+
config={pin_id[1]: gpiod.LineSettings(direction=Direction.INPUT)},
96+
)
97+
98+
def read(self):
99+
if _is_micropython:
100+
return self.pin.value()
101+
else:
102+
return self.pin.get_values()[0] == Value.ACTIVE
103+
104+
105+
class I2CBus:
106+
def __init__(self, bus_id, freq):
107+
if _is_micropython:
108+
self.bus = I2C(bus_id, freq=freq)
109+
else:
110+
self.bus = SMBus(bus_id)
111+
112+
def read(self, addr, buf):
113+
if _is_micropython:
114+
self.bus.readfrom_into(addr, buf)
115+
else:
116+
msg = i2c_msg.read(addr, len(buf))
117+
self.bus.i2c_rdwr(msg)
118+
buf[:] = msg.buf[0:len(buf)]
119+
120+
def write(self, addr, buf):
121+
if _is_micropython:
122+
self.bus.writeto(addr, buf)
123+
else:
124+
msg = i2c_msg.write(addr, buf)
125+
self.bus.i2c_rdwr(msg)
126+
127+
73128
class Expansion:
74129
def __init__(self, opta, type, addr, name):
75130
self.opta = opta
@@ -274,7 +329,7 @@ def get_bool(k, d):
274329
if "default_value" in kwargs:
275330
value = kwargs["default_value"]
276331
self.opta._cmd(self.addr, _CMD_SET_ANALOG_DAC_DEF, "<BHB", channel, value, 1)
277-
sleep_ms(250) # DAC requires at leas 250ms to update after a write.
332+
sleep(0.250) # DAC requires at leas 250ms to update after a write.
278333
elif channel_type == "din":
279334
deb_mode = kwargs.get("debounce_mode", "simple")
280335
self.opta._cmd(
@@ -309,18 +364,18 @@ def get_bool(k, d):
309364

310365

311366
class Opta:
312-
def __init__(self, bus_id, freq=400_000, det=None):
367+
def __init__(self, bus_id, freq=400_000, pin_id="BUS_DETECT"):
313368
"""
314369
Initializes an Opta controller.
315370
316371
Parameters:
317372
- bus_id : The I2C bus identifier.
318373
- freq : I2C bus frequency (default=400_000).
319-
- det : GPIO pin used for bus detection (default is a PULL_UP input pin named "BUS_DETECT").
374+
- pin_id : GPIO pin ID used for bus detection (default="BUS_DETECT").
320375
"""
321-
self.bus = I2C(bus_id, freq=freq)
376+
self.pin = IOPin(pin_id)
377+
self.bus = I2CBus(bus_id, freq)
322378
self.cmd_buf = memoryview(bytearray(256 + 2))
323-
self.det = Pin("BUS_DETECT", Pin.IN, Pin.PULL_UP) if det is None else det
324379
self.exp_types = {
325380
0x02: ("digital", "Opta Digital Mechanical"),
326381
0x03: ("digital", "Opta Digital Solid State"),
@@ -336,14 +391,14 @@ def _log_enabled(self, level):
336391
return logging.getLogger().isEnabledFor(level)
337392

338393
def _bus_read(self, addr, buf):
339-
self.bus.readfrom_into(addr, buf)
394+
self.bus.read(addr, buf)
340395
if self._log_enabled(logging.DEBUG):
341396
self._log_debug("Recv: " + " ".join(["%02X" % (a) for a in buf]))
342397

343398
def _bus_write(self, addr, buf):
344399
if self._log_enabled(logging.DEBUG):
345400
self._log_debug("Send: " + " ".join(["%02X" % (a) for a in buf]))
346-
self.bus.writeto(addr, buf)
401+
self.bus.write(addr, buf)
347402

348403
def _crc8(self, buf, poly=0x07, crc=0x00):
349404
for byte in buf:
@@ -382,7 +437,7 @@ def _cmd(self, addr, cmd, fmt=None, *args):
382437

383438
def _reset_bus(self, addr):
384439
self._cmd(addr, _CMD_CHIP_RESET, "B", 0x56)
385-
sleep_ms(2000)
440+
sleep(2)
386441

387442
def _set_address(self, addr, addr_new=None):
388443
if addr_new is not None:
@@ -413,9 +468,9 @@ def enum_devices(self):
413468

414469
addr = _MAX_ADDRESS
415470
# Assign temp I2C addresses to expansions.
416-
while not self.det.value():
471+
while not self.pin.read():
417472
self._set_address(0x0A, addr_new=addr)
418-
sleep_ms(100)
473+
sleep(0.100)
419474
try:
420475
xaddr, xtype = self._set_address(addr)
421476
if xaddr == addr:
@@ -428,7 +483,7 @@ def enum_devices(self):
428483
# Assign final I2C addresses to expansions.
429484
for addr_new in range(_MIN_ADDRESS, _MIN_ADDRESS + addr - _MAX_ADDRESS):
430485
self._set_address(addr - 1, addr_new)
431-
sleep_ms(100)
486+
sleep(0.100)
432487
try:
433488
xaddr, xtype = self._set_address(addr_new)
434489
addr -= 1

0 commit comments

Comments
 (0)
0