Use Datastreams

Datastreams are named, typed data channels between a device and host. Use them for structured application data: sensor readings with units, configuration values, status indicators. For direct hardware control (toggling pins, reading raw ADC), use the pin API instead.

1. Declare datastreams on the device

Register datastreams before calling device.begin():

#include <Conduyt.h>

// Transport and device are file-scope objects; setup() here and loop() both use `device`.
// The transport is given the same baud rate (115200) that Serial.begin() uses below.
ConduytSerial transport(Serial, 115200);
ConduytDevice device("TempNode", "1.0.0", transport);

void setup() {
  Serial.begin(115200);

  // Read-only: device pushes values, host reads/subscribes
  // Args: name, type, unit, writable
  device.addDatastream("temperature", CONDUYT_TYPE_FLOAT32, "celsius", false);

  // Writable: host writes values, device receives them
  device.addDatastream("setpoint", CONDUYT_TYPE_FLOAT32, "celsius", true);

  // begin() comes last: every datastream must already be registered at this point
  device.begin();
}

The fourth argument controls direction:

  • false = read-only — device pushes, host reads
  • true = writable — host writes, device receives

2. Push values from the device

Use non-blocking timing to push values at a fixed interval:

// millis() timestamp of the most recent push
unsigned long lastPush = 0;

void loop() {
  device.poll();

  // Push temperature every 1 second
  // Unsigned subtraction keeps this comparison correct when millis() wraps around
  if (millis() - lastPush >= 1000) {
    lastPush = millis();

    // Rough ADC-count-to-celsius conversion: 0.48828125 = 500 / 1024.
    // Assumes a 10-bit ADC, a 5 V reference and a 10 mV per degree C sensor
    // (e.g. LM35) -- adjust the factor for your hardware.
    float temp = analogRead(A0) * 0.48828125f;
    device.writeDatastream("temperature", temp);
  }
}

Each writeDatastream() call sends a DS_EVENT packet to the host. Don't call it every loop iteration — the serial link can't keep up with tens of thousands of updates per second.

3. Receive writes on the device

Register a callback for writable datastreams:

// Goes inside setup(), after the addDatastream() calls and before device.begin()
// (see the full firmware example).
// "setpoint" was declared as CONDUYT_TYPE_FLOAT32, so the payload is read with readFloat32().
device.onDatastreamWrite("setpoint", [](ConduytPayloadReader &payload, ConduytContext &ctx) {
  float value = payload.readFloat32();
  Serial.print("New setpoint: ");
  Serial.println(value);
  // Apply the value to your control logic here
  ctx.ack();  // confirm receipt
});

If you don't call ctx.ack(), the device auto-acks. Call ctx.nak(errorCode) to reject a write.

Available type codes

| Code | Constant | Size | Range |
|------|----------|------|-------|
| 0x01 | CONDUYT_TYPE_BOOL | 1 byte | 0 or 1 |
| 0x02 | CONDUYT_TYPE_INT8 | 1 byte | -128 to 127 |
| 0x03 | CONDUYT_TYPE_UINT8 | 1 byte | 0 to 255 |
| 0x04 | CONDUYT_TYPE_INT16 | 2 bytes | -32768 to 32767 |
| 0x05 | CONDUYT_TYPE_UINT16 | 2 bytes | 0 to 65535 |
| 0x06 | CONDUYT_TYPE_INT32 | 4 bytes | -2^31 to 2^31-1 |
| 0x07 | CONDUYT_TYPE_FLOAT32 | 4 bytes | IEEE 754 float |
| 0x08 | CONDUYT_TYPE_STRING | variable | UTF-8, null-terminated |
| 0x09 | CONDUYT_TYPE_BYTES | variable | Raw binary |

Host: JavaScript

Read the current value

// 'temperature' is the read-only datastream declared in the firmware
const temp = await device.datastream('temperature').read()
console.log('Temperature:', temp)

Write a value

// 'setpoint' was declared writable in the firmware, so the host may write to it
await device.datastream('setpoint').write(25.0)

Subscribe to live updates

// Yields a new value each time the device pushes an update.
// The loop keeps waiting for further updates; it does not finish on its own.
for await (const value of device.datastream('temperature').subscribe()) {
  console.log('Temperature:', value)
}

Full working example

// datastreams.mjs
// Host-side counterpart to the firmware example: it expects a device that declares
// the 'temperature' (read-only) and 'setpoint' (writable) datastreams.
import { ConduytDevice } from 'conduyt-js'
import { SerialTransport } from 'conduyt-js/transports/serial'

const device = await ConduytDevice.connect(
  // Replace <YOUR_PORT> with your board's serial port; baudRate matches the firmware's 115200
  new SerialTransport({ path: '<YOUR_PORT>', baudRate: 115200 })
)

// Read current temperature
const temp = await device.datastream('temperature').read()
console.log('Current temp:', temp)

// Set target temperature
await device.datastream('setpoint').write(22.0)
console.log('Setpoint set to 22.0')

// Stream live updates (runs until you Ctrl+C)
console.log('Streaming temperature updates...')
for await (const value of device.datastream('temperature').subscribe()) {
  console.log('Temperature:', value)
}

Run it with:

node datastreams.mjs

Host: Python

# datastreams.py
import asyncio
from conduyt import ConduytDevice
from conduyt.transports.serial import SerialTransport
from conduyt.protocol import CMD_DS_READ, CMD_DS_WRITE, CMD_DS_SUBSCRIBE
import struct


async def main():
    """Read the 'temperature' datastream once, then write 22.0 to 'setpoint'.

    Connects over the serial transport, issues one DS_READ and one DS_WRITE
    through the raw command interface, and always disconnects afterwards,
    even when one of the commands raises.
    """
    device = ConduytDevice(SerialTransport('<YOUR_PORT>'))
    await device.connect()

    try:
        # Read current temperature
        # Send DS_READ command with the datastream name
        resp = await device._send_command(
            CMD_DS_READ,
            b'temperature\x00'  # null-terminated datastream name
        )
        # The first 4 bytes of the response are decoded as a little-endian float32
        temp = struct.unpack('<f', resp[:4])[0]
        print(f"Current temp: {temp:.1f}")

        # Write setpoint
        # Send DS_WRITE with name + float32 value
        payload = b'setpoint\x00' + struct.pack('<f', 22.0)
        await device._send_command(CMD_DS_WRITE, payload)
        print("Setpoint set to 22.0")
    finally:
        # Always disconnect, even if a command above raised
        await device.disconnect()


asyncio.run(main())

Note: The Python SDK doesn't have a .datastream() proxy like JavaScript. You send raw DS_READ/DS_WRITE/DS_SUBSCRIBE commands using _send_command(). The built-in module wrappers in conduyt.modules use the same pattern internally.

Full firmware example

#include <Conduyt.h>

// Transport and device are file-scope objects shared by setup() and loop().
ConduytSerial transport(Serial, 115200);
ConduytDevice device("TempNode", "1.0.0", transport);

// Last value the host wrote to "setpoint"; starts at 20.0 until the first write arrives.
float currentSetpoint = 20.0f;

void setup() {
  Serial.begin(115200);

  // Args: name, type, unit, writable (false = read-only, true = host may write)
  device.addDatastream("temperature", CONDUYT_TYPE_FLOAT32, "celsius", false);
  device.addDatastream("setpoint", CONDUYT_TYPE_FLOAT32, "celsius", true);

  // currentSetpoint is a global, so the capture-less lambda can assign it directly.
  // NOTE(review): Serial is also the Conduyt transport (see `transport` above);
  // confirm that debug text printed on the same port does not interfere with
  // the protocol traffic.
  device.onDatastreamWrite("setpoint", [](ConduytPayloadReader &payload, ConduytContext &ctx) {
    currentSetpoint = payload.readFloat32();
    Serial.print("New setpoint: ");
    Serial.println(currentSetpoint);
    ctx.ack();
  });

  // begin() comes last, after the datastreams and the write callback are registered
  device.begin();
}

// millis() timestamp of the most recent push
unsigned long lastPush = 0;

void loop() {
  device.poll();

  // Push the temperature once per second without blocking the loop.
  // Unsigned subtraction keeps this comparison correct when millis() wraps around.
  if (millis() - lastPush >= 1000) {
    lastPush = millis();
    // Rough ADC-count-to-celsius conversion: 0.48828125 = 500 / 1024.
    // Assumes a 10-bit ADC, a 5 V reference and a 10 mV per degree C sensor
    // (e.g. LM35) -- adjust the factor for your hardware.
    float temp = analogRead(A0) * 0.48828125f;
    device.writeDatastream("temperature", temp);
  }
}

For wire format details and type encoding, see Datastream Types.