Python API

The conduyt-py package. Requires Python 3.10+.

pip install conduyt-py

ConduytDevice

Async API. All I/O methods are coroutines.

Constructor

ConduytDevice(transport: ConduytTransport, timeout_ms: int = 5000)

Methods

MethodReturnDescription
async connect()HelloRespConnect and perform HELLO handshake
async disconnect()NoneClose the transport
async ping()NoneSend PING, wait for PONG
async reset()NoneSend RESET command
pin(num)PinProxyGet a pin proxy

Properties

PropertyTypeDescription
connectedboolConnection state
capabilitiesHelloResp | NoneParsed HELLO_RESP, None before connect

Usage

import asyncio
from conduyt import ConduytDevice
from conduyt.transports.serial import SerialTransport

async def main():
    transport = SerialTransport('/dev/ttyUSB0')
    device = ConduytDevice(transport, timeout_ms=5000)

    hello = await device.connect()
    print(hello.firmware_name)
    print(len(hello.pins), "pins")

    await device.ping()
    await device.disconnect()

asyncio.run(main())

ConduytDeviceSync

Synchronous wrapper around ConduytDevice. Same interface minus async/await. Uses close() instead of disconnect().

from conduyt import ConduytDeviceSync
from conduyt.transports.mock import MockTransport

device = ConduytDeviceSync(MockTransport())
hello = device.connect()
device.ping()
device.close()
MethodReturnDescription
connect()HelloRespConnect and perform HELLO handshake
close()NoneClose the transport
ping()NoneSend PING, wait for PONG
reset()NoneSend RESET command
pin(num)PinProxyGet a pin proxy

PinProxy

Returned by device.pin(num).

MethodReturnDescription
async mode(mode)PinProxy (self, chainable)Set pin mode
async write(value)NoneWrite digital or PWM value
async read()intRead pin value

Pin modes: "input", "output", "pwm", "analog", "input_pullup".

pin = device.pin(13)
await pin.mode('output')
await pin.write(1)

value = await device.pin(0).read()

Module Wrappers

Module wrappers use raw _send_command() internally to communicate with firmware modules. Import from conduyt.modules.

ClassFirmware Module
ConduytServoservo
ConduytNeoPixelneopixel
ConduytEncoderencoder
ConduytStepperstepper
ConduytDHTdht
ConduytOLEDoled1306
ConduytPIDpid
from conduyt.modules import ConduytServo, ConduytNeoPixel, ConduytDHT

servo = ConduytServo(device, module_id=0)
await servo.attach(9)
await servo.write(90)
await servo.detach()

neo = ConduytNeoPixel(device, module_id=1)
await neo.begin(pin=6, count=30)
await neo.fill(255, 0, 0)
await neo.show()

dht = ConduytDHT(device, module_id=2)
await dht.begin(pin=4, sensor_type=22)
reading = await dht.read()
print(reading.temperature, reading.humidity)

Error Classes

ConduytNAKError

Raised when the device responds with NAK.

AttributeTypeDescription
error_namestrError name (e.g., "INVALID_PIN"). Snake_case attribute name.
codeintRaw error code byte

ConduytTimeoutError

Raised when a command receives no response within the timeout period.

ConduytDisconnectedError

Raised when a command is attempted on a disconnected device.

from conduyt import ConduytNAKError, ConduytTimeoutError, ConduytDisconnectedError

try:
    await device.pin(99).write(1)
except ConduytNAKError as e:
    print(e.error_name)  # "INVALID_PIN"
    print(e.code)        # 0x04
except ConduytTimeoutError:
    print("Command timed out")
except ConduytDisconnectedError:
    print("Device disconnected")

Transports

TransportImportExtras
Mockconduyt.transports.mock(none)
Serialconduyt.transports.serialpip install conduyt-py[serial]
MQTTconduyt.transports.mqttpip install conduyt-py[mqtt]

Install all extras: pip install conduyt-py[all].

All transports implement the ConduytTransport protocol:

class ConduytTransport(Protocol):
    async def open(self) -> None: ...
    async def close(self) -> None: ...
    async def send(self, packet: bytes) -> None: ...
    def on_packet(self, handler: Callable[[bytes], None]) -> None: ...
    @property
    def needs_cobs(self) -> bool: ...

Types

All types are dataclasses.

@dataclass
class HelloResp:
    firmware_name: str
    firmware_version: tuple[int, int, int]
    mcu_id: bytes
    ota_capable: bool
    pins: list[PinCapability]
    i2c_buses: int
    spi_buses: int
    uart_count: int
    max_payload: int
    modules: list[ModuleDescriptor]
    datastreams: list[DatastreamDescriptor]

@dataclass
class PinCapability:
    pin: int
    capabilities: int  # bitmask of PIN_CAP values

@dataclass
class ModuleDescriptor:
    module_id: int
    name: str
    version_major: int
    version_minor: int
    pins: list[int]

@dataclass
class DatastreamDescriptor:
    index: int
    name: str
    type: int
    unit: str
    writable: bool
    pin_ref: int
    retain: bool

@dataclass
class ConduytPacket:
    version: int
    type: int
    seq: int
    payload: bytes