Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ with ThingSetTCP("192.0.2.1") as client:
client.exec(0x52, []) # HMCU/xSaveNVM()
```

`update` also accepts list values, which are sent as a CBOR array (binary
transports) or JSON array (text transport):

```python
with ThingSetTCP("192.0.2.1") as client:
client.update(0x70A, [3.7, 3.7, 3.6], parent_id=0x07) # array of floats
```

### CAN

```python
Expand Down Expand Up @@ -217,6 +225,27 @@ thingset get Metadata/rBoard -p /dev/ttyACM0
thingset schema -p /dev/ttyACM0
```

### List values

`update` accepts a list either as multiple value tokens (one per element) or
as a single JSON-array literal. The two forms are equivalent:

```sh
# Binary transports — parent_id value_id <values...>
thingset update 7 70A 3.7 3.7 3.6 -i 192.0.2.1
thingset update 7 70A '[3.7,3.7,3.6]' -i 192.0.2.1
thingset update 7 70A 3.7 3.7 3.6 -c vcan0 -t 10

# Serial (text) — path <values...>
thingset update Module/aCells 3.7 3.7 3.6 -p /dev/ttyACM0
thingset update Module/aCells '[3.7,3.7,3.6]' -p /dev/ttyACM0
```

On binary transports the list goes on the wire as a CBOR array; float
elements are coerced to 32-bit so an embedded target sees `float[]`
(not `double[]`). On serial it becomes a JSON array inside the text
update payload.

Output is decorated with names and types when the firmware exposes them via
the metadata overlay:

Expand Down
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ build-backend = "hatchling.build"

[project]
name = "python_thingset"
version = "0.3.1"
version = "0.4.0"
description = "A Python library for ThingSet functionality"
authors = [
{ name = "Adam Mitchell", email = "adam.mitchell@brillpower.com" }
]
license = "Apache-2.0"
license-files = [ "LICEN[CS]E*" ]
license = { text = "Apache-2.0" }
readme = "README.md"
requires-python = ">=3.10"
classifiers = [
Expand Down
63 changes: 44 additions & 19 deletions python_thingset/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
#!/usr/bin/env python3

import argparse
import json
from time import sleep
from typing import Union
from typing import Any, Union

from ._protocol import WireFormat
from .client import ThingSetClient
Expand Down Expand Up @@ -36,6 +37,26 @@ def process_args(args: list) -> list:
return processed_args


def _parse_update_value(tokens: list) -> Any:
"""Convert the CLI's value tokens into the value to send.

- One token that starts with ``[`` is parsed as a JSON array.
- Two or more tokens become a list of typed values.
- A single non-JSON token becomes a scalar (int/float/str).
"""
if len(tokens) == 1:
s = tokens[0].strip()
if s.startswith("["):
try:
parsed = json.loads(s)
if isinstance(parsed, list):
return parsed
except json.JSONDecodeError:
pass
return process_args(tokens)[0]
return process_args(tokens)


def get_schema(
ts: ThingSetClient,
object_id: Union[int, str],
Expand Down Expand Up @@ -175,9 +196,11 @@ def setup_args() -> argparse.Namespace:
)
update_parser.add_argument(
"update_args",
help="If using -p/--port: path value - Path of value to update (example: "
"Module/sCanMaxLogLevel 3) (value is decimal if numeric). If using -c/--can-bus: "
"parent_id value_id value - (example: 0F F02 MyValue)",
help="If using -p/--port: path value [value ...] - Path of value to update (example: "
"Module/sCanMaxLogLevel 3). If using -c/--can-bus or -i/--ip: "
"parent_id value_id value [value ...] - (example: 0F F02 MyValue). "
"Multiple value tokens become a list; a single token starting with "
"'[' is parsed as a JSON array.",
nargs="*",
)

Expand Down Expand Up @@ -205,39 +228,42 @@ def setup_args() -> argparse.Namespace:
arg_parser.error("-t/--target-address is required with -c/--can_bus")

if args.method == "update":
if len(args.update_args) != 3:
if len(args.update_args) < 3:
arg_parser.error(
"When using update with -c/--can-bus you must suply a "
"parent_id, value_id and value "
"(example: thingset update f f03 MyValue -c vcan0"
"parent_id, value_id and value (or values) "
"(example: thingset update f f03 MyValue -c vcan0, or "
"thingset update f f03 1.0 2.0 3.0 -c vcan0)"
)
else:
args.parent_id = args.update_args[0]
args.value_id = args.update_args[1]
args.value = [args.update_args[2]]
args.value = _parse_update_value(args.update_args[2:])
elif args.port:
if args.method == "update":
if len(args.update_args) != 2:
if len(args.update_args) < 2:
arg_parser.error(
"When using update with -p/--port you must suply a path "
"and a value (example: "
"thingset update Module/sCanMaxLogLevel 4 -p /dev/pts/5"
"and a value (or values) (example: "
"thingset update Module/sCanMaxLogLevel 4 -p /dev/pts/5, "
"or thingset update Module/aFloats 1.0 2.0 3.0 -p /dev/pts/5)"
)
else:
args.parent_id = args.update_args[0]
args.value = [args.update_args[1]]
args.value = _parse_update_value(args.update_args[1:])
elif args.ip:
if args.method == "update":
if len(args.update_args) != 3:
if len(args.update_args) < 3:
arg_parser.error(
"When using update with -i/--ip you must suply a "
"parent_id, value_id and value "
"(example: thingset update f f03 MyValue -i 192.0.2.1"
"parent_id, value_id and value (or values) "
"(example: thingset update f f03 MyValue -i 192.0.2.1, or "
"thingset update f f03 1.0 2.0 3.0 -i 192.0.2.1)"
)
else:
args.parent_id = args.update_args[0]
args.value_id = args.update_args[1]
args.value = [args.update_args[2]]
args.value = _parse_update_value(args.update_args[2:])

if not (args.can_bus or args.port or args.ip):
arg_parser.error("One of -c/--can_bus, -i/--ip or -p/--port is required")
Expand Down Expand Up @@ -295,16 +321,15 @@ def _dispatch(ts: ThingSetClient, args: argparse.Namespace):
case "update":
if is_serial:
return ts.update(args.parent_id, args.value)
p_args = process_args(args.value)
if is_tcp:
return ts.update(
int(args.value_id, 16),
p_args[0],
args.value,
parent_id=int(args.parent_id, 16),
)
return ts.update(
int(args.value_id, 16),
p_args[0],
args.value,
int(args.target_address, 16),
int(args.parent_id, 16),
)
Expand Down
9 changes: 7 additions & 2 deletions python_thingset/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
_METADATA_KEY_NAME = 26 # 0x1A
_METADATA_KEY_TYPE = 27 # 0x1B
_METADATA_KEY_ACCESS = 28 # 0x1C
_RECURSIVE_TYPE = "group"
# Types whose metadata advertises a parent node we can descend into.
# "group" is the conventional namespace; "record"/"record[]" appear when
# a property exposes a struct or array-of-struct (see TS++ ThingSetType.hpp:
# the ThingSetType default is "record" and array suffixes append "[]"),
# and the inner record members are registered as parent-scoped children.
_RECURSIVE_TYPES = {"group", "record", "record[]"}


class ThingSetClient(ABC):
Expand Down Expand Up @@ -146,7 +151,7 @@ def _walk_schema(
full_path = f"{path_prefix}/{name}" if path_prefix else name

children: List[SchemaNode] = []
if type_str == _RECURSIVE_TYPE:
if type_str in _RECURSIVE_TYPES:
children = self._walk_schema(
cid, full_path, node_id, by_id, by_path
)
Expand Down
22 changes: 17 additions & 5 deletions python_thingset/encoders/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,30 @@ def encode_exec(self, value_id: int, args: List[Union[Any, None]]) -> bytes:
)

def encode_update(self, parent_id: int, value_id: int, value: Any) -> bytes:
if isinstance(value, float):
value = self.to_f32(value)
if isinstance(value, str):
if value.lower() == "true" or value.lower() == "false":
value = json.loads(value.lower())
value = self._coerce_value(value)

return bytes(
[ThingSetRequest.UPDATE]
+ list(cbor2.dumps(parent_id))
+ list(cbor2.dumps({value_id: value}, canonical=True))
)

def _coerce_value(self, value: Any) -> Any:
"""Recursively coerce a value for embedded targets: doubles → float32,
and ``"true"``/``"false"`` strings → bool. Lists are walked element-wise
so an array of floats round-trips as a CBOR array of float32s."""
if isinstance(value, list):
return [self._coerce_value(v) for v in value]
# bool is a subclass of int — keep as-is, don't coerce
if isinstance(value, bool):
return value
if isinstance(value, float):
return self.to_f32(value)
if isinstance(value, str):
if value.lower() == "true" or value.lower() == "false":
return json.loads(value.lower())
return value

def to_f32(self, value: float) -> float:
"""In Python, all floats are actually doubles. This does not map well to embedded targets where
there is a clear distinction between the two.
Expand Down
34 changes: 22 additions & 12 deletions python_thingset/encoders/text.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,17 @@ def encode_exec(self, value_id: str, args: List[Union[Any, None]]) -> bytes:

def encode_update(self, parent_id: None, value_id: str, value: Any) -> bytes:
"""properly format strings for transmission, add args to stringified list"""
value = value[0]

val = None

if isinstance(value, int):
val = int(value)

if isinstance(value, float):
val = float(value)

if val is None:
val = f'\\"{value}\\"'
# Legacy CLI convention: scalars are wrapped in a single-element list
# before being passed in. Unwrap so the wire format stays scalar.
# Multi-element or nested lists fall through and become JSON arrays.
if (
isinstance(value, list)
and len(value) == 1
and not isinstance(value[0], list)
):
value = value[0]

val = self._encode_value(value)

path = " "
value_name = None
Expand All @@ -76,3 +75,14 @@ def encode_update(self, parent_id: None, value_id: str, value: Any) -> bytes:
value_path = value_path.replace("£", "{").replace("$", "}")

return f"""thingset ={value_path}\n""".encode()

def _encode_value(self, value: Any) -> str:
"""Render a single value for the text wire format."""
# bool is a subclass of int — check it first
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, list):
return "[" + ",".join(self._encode_value(v) for v in value) + "]"
if isinstance(value, (int, float)):
return str(value)
return f'\\"{value}\\"'
35 changes: 35 additions & 0 deletions tests/encoders/binary/test_enc_bin_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,38 @@ def test_update_bool_true():
def test_update_bool_false():
encoded = encoder.encode_update(0x0, 0x4F, "false")
assert encoded == b"\x07\x00\xa1\x18O\xf4"


def test_update_list_of_ints():
import cbor2

encoded = encoder.encode_update(0x0, 0x4F, [1, 2, 3])
expected = bytes([0x07]) + cbor2.dumps(0x0) + cbor2.dumps({0x4F: [1, 2, 3]}, canonical=True)
assert encoded == expected


def test_update_list_of_floats_uses_f32():
# Each element should be coerced to float32, matching the scalar
# behaviour. 3.14 → 0xfa40 48f5c3.
import cbor2

encoded = encoder.encode_update(0x0, 0x4F, [3.14, 1.0])
coerced = [encoder.to_f32(3.14), encoder.to_f32(1.0)]
expected = bytes([0x07]) + cbor2.dumps(0x0) + cbor2.dumps({0x4F: coerced}, canonical=True)
assert encoded == expected


def test_update_empty_list():
import cbor2

encoded = encoder.encode_update(0x0, 0x4F, [])
expected = bytes([0x07]) + cbor2.dumps(0x0) + cbor2.dumps({0x4F: []}, canonical=True)
assert encoded == expected


def test_update_list_of_strings():
import cbor2

encoded = encoder.encode_update(0x0, 0x4F, ["a", "b"])
expected = bytes([0x07]) + cbor2.dumps(0x0) + cbor2.dumps({0x4F: ["a", "b"]}, canonical=True)
assert encoded == expected
36 changes: 36 additions & 0 deletions tests/encoders/text/test_enc_txt_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,39 @@ def test_update_value_at_depth_one():
def test_update_value_at_depth_two():
encoded = encoder.encode_update(None, "One/Two/Value", [1])
assert encoded == """thingset =One/Two {\\"Value\\":1}\n""".encode()


def test_update_value_scalar_int_unwrapped():
# New API: scalar value passed directly (no list wrapper)
encoded = encoder.encode_update(None, "Value", 1)
assert encoded == """thingset = {\\"Value\\":1}\n""".encode()


def test_update_value_list_of_ints():
encoded = encoder.encode_update(None, "Value", [1, 2, 3])
assert encoded == """thingset = {\\"Value\\":[1,2,3]}\n""".encode()


def test_update_value_list_of_floats():
encoded = encoder.encode_update(None, "Value", [1.0, 2.5, 3.14])
assert encoded == """thingset = {\\"Value\\":[1.0,2.5,3.14]}\n""".encode()


def test_update_value_list_of_strings():
encoded = encoder.encode_update(None, "Value", ["a", "b"])
assert encoded == """thingset = {\\"Value\\":[\\"a\\",\\"b\\"]}\n""".encode()


def test_update_value_empty_list():
encoded = encoder.encode_update(None, "Value", [])
assert encoded == """thingset = {\\"Value\\":[]}\n""".encode()


def test_update_value_bool_true():
encoded = encoder.encode_update(None, "Value", True)
assert encoded == """thingset = {\\"Value\\":true}\n""".encode()


def test_update_value_list_at_depth_one():
encoded = encoder.encode_update(None, "One/Value", [1, 2, 3])
assert encoded == """thingset =One {\\"Value\\":[1,2,3]}\n""".encode()
Loading