diff --git a/micropython/lora/lora-lorawan/lora/lorawan.py b/micropython/lora/lora-lorawan/lora/lorawan.py new file mode 100644 index 000000000..1b5f085bf --- /dev/null +++ b/micropython/lora/lora-lorawan/lora/lorawan.py @@ -0,0 +1,397 @@ +""" +LoRaWAN 1.0.x MAC layer for MicroPython. + +Implements Class A device operation with OTAA and ABP activation. +Runs on top of the SX127x raw LoRa driver. + +Usage (OTAA): + from lora import LoRa + from lorawan import LoRaWAN + + radio = LoRa(frequency=868100000, sf=7, bw=125000) + wan = LoRaWAN(radio, mode=LoRaWAN.OTAA, + dev_eui=bytes.fromhex('...'), + app_eui=bytes.fromhex('...'), + app_key=bytes.fromhex('...')) + wan.join() + wan.send(1, b'Hello') + +Usage (ABP): + wan = LoRaWAN(radio, mode=LoRaWAN.ABP, + dev_addr=0x01234567, + nwk_skey=bytes.fromhex('...'), + app_skey=bytes.fromhex('...')) + wan.send(1, b'Hello') +""" + +from micropython import const +import struct +import time +import os +from lora.lorawan_crypto import ( + aes_cmac, + aes_ctr_encrypt, + aes_ctr_decrypt, + compute_mic, + derive_session_keys, +) + +# LoRaWAN MHDR message types +_MTYPE_JOIN_REQUEST = const(0x00) +_MTYPE_JOIN_ACCEPT = const(0x20) +_MTYPE_UNCONFIRMED_UP = const(0x40) +_MTYPE_UNCONFIRMED_DOWN = const(0x60) +_MTYPE_CONFIRMED_UP = const(0x80) +_MTYPE_CONFIRMED_DOWN = const(0xA0) + +# Direction constants +_DIR_UP = const(0) +_DIR_DOWN = const(1) + +# LoRaWAN data rate table for EU868 +_DR_TABLE_EU868 = { + 0: (12, 125000), + 1: (11, 125000), + 2: (10, 125000), + 3: (9, 125000), + 4: (8, 125000), + 5: (7, 125000), +} + +# Default RX2 parameters (EU868) +_RX2_FREQ = 869525000 +_RX2_DR = 0 # SF12/125kHz + +# Class A timing +_JOIN_ACCEPT_DELAY1 = 5000 # ms +_JOIN_ACCEPT_DELAY2 = 6000 # ms +_RECEIVE_DELAY1 = 1000 # ms +_RECEIVE_DELAY2 = 2000 # ms + + +class LoRaWAN: + """LoRaWAN 1.0.x Class A device.""" + + OTAA = 0 + ABP = 1 + + def __init__( + self, + radio, + mode=0, + dev_eui=None, + app_eui=None, + app_key=None, + dev_addr=None, + nwk_skey=None, + app_skey=None, + ): + """ + Initialize LoRaWAN MAC layer. + + Args: + radio: LoRa radio instance (from lora module) + mode: LoRaWAN.OTAA or LoRaWAN.ABP + dev_eui: 8-byte Device EUI (OTAA) + app_eui: 8-byte Application EUI / JoinEUI (OTAA) + app_key: 16-byte Application Key (OTAA) + dev_addr: 4-byte Device Address as int (ABP) + nwk_skey: 16-byte Network Session Key (ABP) + app_skey: 16-byte Application Session Key (ABP) + """ + self._radio = radio + self._mode = mode + self._joined = False + + # Frame counters + self._fcnt_up = 0 + self._fcnt_down = 0 + + # RX1 delay and data rate offset + self._rx1_delay = _RECEIVE_DELAY1 + self._rx1_dr_offset = 0 + + if mode == self.OTAA: + if not all([dev_eui, app_eui, app_key]): + raise ValueError("OTAA requires dev_eui, app_eui, app_key") + self._dev_eui = dev_eui + self._app_eui = app_eui + self._app_key = app_key + self._dev_addr = None + self._nwk_skey = None + self._app_skey = None + elif mode == self.ABP: + if not all([dev_addr is not None, nwk_skey, app_skey]): + raise ValueError("ABP requires dev_addr, nwk_skey, app_skey") + self._dev_addr = dev_addr + self._nwk_skey = nwk_skey + self._app_skey = app_skey + self._joined = True + else: + raise ValueError("mode must be OTAA or ABP") + + @property + def joined(self): + """Whether the device has joined the network.""" + return self._joined + + @property + def dev_addr(self): + """Device address (available after join).""" + return self._dev_addr + + def join(self, timeout=30000): + """ + Perform OTAA join procedure. + + Args: + timeout: Total timeout in ms for join attempts (default 30s) + + Raises: + RuntimeError: If join fails after timeout + """ + if self._mode != self.OTAA: + raise RuntimeError("join() only for OTAA mode") + + # Configure radio for join + self._radio._radio.set_sync_word(0x34) # LoRaWAN public sync word + + start = time.ticks_ms() + while time.ticks_diff(time.ticks_ms(), start) < timeout: + dev_nonce = os.urandom(2) + join_req = self._build_join_request(dev_nonce) + + # Send join request + self._radio._radio.send(join_req) + tx_end = time.ticks_ms() + + # RX1 window: JoinAcceptDelay1 + accept = self._rx_window(tx_end, _JOIN_ACCEPT_DELAY1) + if accept: + if self._process_join_accept(accept, dev_nonce): + self._joined = True + return True + + # RX2 window: JoinAcceptDelay2 at RX2 frequency/DR + sf, bw = _DR_TABLE_EU868[_RX2_DR] + saved_freq = self._radio._radio._frequency + self._radio._radio.set_frequency(_RX2_FREQ) + self._radio._radio.set_spreading_factor(sf) + self._radio._radio.set_bandwidth(bw) + + accept = self._rx_window(tx_end, _JOIN_ACCEPT_DELAY2) + + # Restore original settings + self._radio._radio.set_frequency(saved_freq) + self._radio._radio.set_spreading_factor(7) + self._radio._radio.set_bandwidth(125000) + + if accept: + if self._process_join_accept(accept, dev_nonce): + self._joined = True + return True + + # Backoff before retry + time.sleep_ms(1000) + + raise RuntimeError("Join failed: timeout") + + def send(self, port, payload, confirmed=False): + """ + Send an uplink frame. + + Args: + port: FPort (1-223) + payload: Data bytes to send + confirmed: If True, send confirmed uplink + + Returns: + Downlink data bytes or None + """ + if not self._joined: + raise RuntimeError("Not joined") + + if port < 1 or port > 223: + raise ValueError("FPort must be 1-223") + + mtype = _MTYPE_CONFIRMED_UP if confirmed else _MTYPE_UNCONFIRMED_UP + frame = self._build_data_frame(mtype, port, payload) + + # Send frame + self._radio._radio.set_sync_word(0x34) + self._radio._radio.send(frame) + tx_end = time.ticks_ms() + + self._fcnt_up += 1 + + # Class A: open RX1, then RX2 + # RX1 + rx_data = self._rx_window(tx_end, self._rx1_delay) + if rx_data: + return self._process_downlink(rx_data) + + # RX2 + sf, bw = _DR_TABLE_EU868[_RX2_DR] + saved_freq = self._radio._radio._frequency + self._radio._radio.set_frequency(_RX2_FREQ) + self._radio._radio.set_spreading_factor(sf) + self._radio._radio.set_bandwidth(bw) + + rx_data = self._rx_window(tx_end, _RECEIVE_DELAY2) + + self._radio._radio.set_frequency(saved_freq) + self._radio._radio.set_spreading_factor(7) + self._radio._radio.set_bandwidth(125000) + + if rx_data: + return self._process_downlink(rx_data) + + return None + + def _rx_window(self, tx_end, delay_ms): + """Open a receive window at tx_end + delay_ms.""" + now = time.ticks_ms() + wait = time.ticks_diff(tx_end + delay_ms, now) + if wait > 0: + time.sleep_ms(wait) + return self._radio._radio.recv(timeout_ms=1000) + + def _build_join_request(self, dev_nonce): + """Build LoRaWAN join-request message.""" + mhdr = bytes([_MTYPE_JOIN_REQUEST | 0x00]) # Major=0 + # AppEUI and DevEUI in little-endian + body = mhdr + self._app_eui[::-1] + self._dev_eui[::-1] + dev_nonce + mic = aes_cmac(self._app_key, body)[:4] + return body + mic + + def _process_join_accept(self, data, dev_nonce): + """Process and validate a join-accept message.""" + if len(data) < 17: + return False + + # Decrypt join-accept (the entire payload after MHDR is encrypted) + mhdr = data[0] + if (mhdr & 0xE0) != _MTYPE_JOIN_ACCEPT: + return False + + from ucryptolib import aes + + encrypted = data[1:] + decrypted = bytearray() + for i in range(0, len(encrypted), 16): + block = encrypted[i : i + 16] + if len(block) < 16: + block = block + b"\x00" * (16 - len(block)) + decrypted += aes(self._app_key, 1).encrypt(block) + + # Parse: AppNonce(3) | NetID(3) | DevAddr(4) | DLSettings(1) | RxDelay(1) [| CFList] + app_nonce = bytes(decrypted[0:3]) + net_id = bytes(decrypted[3:6]) + self._dev_addr = struct.unpack("> 4) & 0x07 + + # Parse RxDelay + if rx_delay == 0: + self._rx1_delay = _RECEIVE_DELAY1 + else: + self._rx1_delay = rx_delay * 1000 + + # Derive session keys + self._nwk_skey, self._app_skey = derive_session_keys( + self._app_key, app_nonce, net_id, dev_nonce + ) + + # Reset frame counters + self._fcnt_up = 0 + self._fcnt_down = 0 + + return True + + def _build_data_frame(self, mtype, port, payload): + """Build a LoRaWAN data frame (uplink).""" + mhdr = bytes([mtype | 0x00]) + + # FHDR: DevAddr(4) | FCtrl(1) | FCnt(2) + fctrl = 0x00 # No ADR, no ACK, no FOptsLen + fhdr = struct.pack(" 0 and len(msg) % _BLK == 0 + + X = b"\x00" * _BLK + for i in range(n - 1): + block = msg[i * _BLK : (i + 1) * _BLK] + X = _aes_ecb(key, _xor(X, block)) + + # Last block + if last_complete: + block = msg[(n - 1) * _BLK : n * _BLK] + block = _xor(block, K1) + else: + block = msg[(n - 1) * _BLK :] + block = block + b"\x80" + b"\x00" * (_BLK - 1 - len(block)) + block = _xor(block, K2) + + return _aes_ecb(key, _xor(X, block)) + + +def aes_ctr_encrypt(key, payload, dev_addr, fcnt, direction): + """ + LoRaWAN payload encryption using AES-128 in CTR mode. + + Per LoRaWAN 1.0.x spec section 4.3.3. + + Args: + key: 16-byte AppSKey (for FRMPayload) or NwkSKey (for FOpts) + payload: plaintext bytes + dev_addr: 4-byte device address (little-endian int) + fcnt: frame counter (uint32) + direction: 0 = uplink, 1 = downlink + + Returns: + encrypted/decrypted bytes (symmetric) + """ + k = (len(payload) + _BLK - 1) // _BLK + S = bytearray() + for i in range(1, k + 1): + A_i = struct.pack("> 16) & 0xFF) + self._write_reg(_REG_FRF_MID, (frf >> 8) & 0xFF) + self._write_reg(_REG_FRF_LSB, frf & 0xFF) + + def set_spreading_factor(self, sf): + """Set spreading factor (6-12).""" + if sf < 6 or sf > 12: + raise ValueError("SF must be 6-12") + + # Detection optimize and threshold for SF6 + if sf == 6: + self._write_reg(_REG_DETECTION_OPTIMIZE, 0xC5) + self._write_reg(_REG_DETECTION_THRESHOLD, 0x0C) + else: + self._write_reg(_REG_DETECTION_OPTIMIZE, 0xC3) + self._write_reg(_REG_DETECTION_THRESHOLD, 0x0A) + + val = self._read_reg(_REG_MODEM_CONFIG_2) + self._write_reg(_REG_MODEM_CONFIG_2, (val & 0x0F) | (sf << 4)) + + def set_bandwidth(self, bw_hz): + """Set signal bandwidth in Hz.""" + if self._chip == 1276: + if bw_hz not in _BW_TABLE_SX1276: + raise ValueError("Unsupported BW for SX1276") + bw_val = _BW_TABLE_SX1276[bw_hz] + val = self._read_reg(_REG_MODEM_CONFIG_1) + self._write_reg(_REG_MODEM_CONFIG_1, (val & 0x0F) | (bw_val << 4)) + else: + if bw_hz not in _BW_TABLE_SX1272: + raise ValueError("Unsupported BW for SX1272") + bw_val = _BW_TABLE_SX1272[bw_hz] + val = self._read_reg(_REG_MODEM_CONFIG_1) + self._write_reg(_REG_MODEM_CONFIG_1, (val & 0x3F) | (bw_val << 6)) + + def set_coding_rate(self, cr): + """Set coding rate denominator (5-8, i.e. 4/5 to 4/8).""" + if cr < 5 or cr > 8: + raise ValueError("CR must be 5-8") + cr_val = cr - 4 + val = self._read_reg(_REG_MODEM_CONFIG_1) + if self._chip == 1276: + self._write_reg(_REG_MODEM_CONFIG_1, (val & 0xF1) | (cr_val << 1)) + else: + self._write_reg(_REG_MODEM_CONFIG_1, (val & 0xC7) | (cr_val << 3)) + + def set_preamble_length(self, length): + """Set preamble length in symbols.""" + self._write_reg(_REG_PREAMBLE_MSB, (length >> 8) & 0xFF) + self._write_reg(_REG_PREAMBLE_LSB, length & 0xFF) + + def set_sync_word(self, sw): + """Set sync word (0x12 = private, 0x34 = LoRaWAN public).""" + self._write_reg(_REG_SYNC_WORD, sw) + + def set_tx_power(self, level, use_pa_boost=True): + """Set transmit power in dBm.""" + self._tx_power = level + if use_pa_boost: + if level > 17: + # Enable +20dBm on PA_BOOST + level = min(level, 20) + self._write_reg(_REG_PA_DAC, 0x87) + self.set_ocp(140) + self._write_reg(_REG_PA_CONFIG, _PA_BOOST | (level - 5)) + else: + self._write_reg(_REG_PA_DAC, 0x84) # default + self.set_ocp(100) + level = max(2, min(level, 17)) + self._write_reg(_REG_PA_CONFIG, _PA_BOOST | (level - 2)) + else: + # RFO pin + level = max(0, min(level, 14)) + self._write_reg(_REG_PA_CONFIG, 0x70 | level) + + def set_ocp(self, ma): + """Set over-current protection trim in mA.""" + if ma <= 120: + ocp_trim = int((ma - 45) / 5) + elif ma <= 240: + ocp_trim = int((ma + 30) / 10) + else: + ocp_trim = 27 + self._write_reg(_REG_OCP, 0x20 | (ocp_trim & 0x1F)) + + def set_implicit_header(self, implicit): + """Enable or disable implicit header mode.""" + val = self._read_reg(_REG_MODEM_CONFIG_1) + if self._chip == 1276: + if implicit: + self._write_reg(_REG_MODEM_CONFIG_1, val | 0x01) + else: + self._write_reg(_REG_MODEM_CONFIG_1, val & 0xFE) + else: + if implicit: + self._write_reg(_REG_MODEM_CONFIG_1, val | 0x04) + else: + self._write_reg(_REG_MODEM_CONFIG_1, val & 0xFB) + + def set_crc(self, enable): + """Enable or disable CRC checking.""" + if self._chip == 1276: + val = self._read_reg(_REG_MODEM_CONFIG_2) + if enable: + self._write_reg(_REG_MODEM_CONFIG_2, val | 0x04) + else: + self._write_reg(_REG_MODEM_CONFIG_2, val & 0xFB) + else: + val = self._read_reg(_REG_MODEM_CONFIG_1) + if enable: + self._write_reg(_REG_MODEM_CONFIG_1, val | 0x02) + else: + self._write_reg(_REG_MODEM_CONFIG_1, val & 0xFD) + + def send(self, data): + """Transmit data (bytes). Blocks until TX_DONE.""" + self.standby() + + # Set FIFO pointer to TX base + self._write_reg(_REG_FIFO_ADDR_PTR, 0) + # Write payload + self._write_buf(_REG_FIFO, data) + # Set payload length + self._write_reg(_REG_PAYLOAD_LENGTH, len(data)) + + # Clear IRQ flags + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + # Map DIO0 to TX_DONE + self._write_reg(_REG_DIO_MAPPING_1, 0x40) + + # Start TX + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_TX) + + # Wait for TX_DONE + while not (self._read_reg(_REG_IRQ_FLAGS) & _IRQ_TX_DONE): + time.sleep_ms(1) + + # Clear IRQ flags + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + self.standby() + + def recv(self, timeout_ms=0): + """ + Receive a packet. + + Args: + timeout_ms: 0 = single RX with register timeout, >0 = poll for ms + + Returns: + bytes or None if timeout/CRC error + """ + self.standby() + + # Clear IRQ flags + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + # Map DIO0 to RX_DONE + self._write_reg(_REG_DIO_MAPPING_1, 0x00) + + if timeout_ms > 0: + # Continuous RX, poll with Python timeout + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_RX_CONTINUOUS) + start = time.ticks_ms() + while time.ticks_diff(time.ticks_ms(), start) < timeout_ms: + irq = self._read_reg(_REG_IRQ_FLAGS) + if irq & _IRQ_RX_DONE: + break + time.sleep_ms(1) + else: + self.standby() + return None + else: + # Single RX with hardware timeout + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_RX_SINGLE) + while True: + irq = self._read_reg(_REG_IRQ_FLAGS) + if irq & (_IRQ_RX_DONE | _IRQ_RX_TIMEOUT): + break + time.sleep_ms(1) + if irq & _IRQ_RX_TIMEOUT: + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + self.standby() + return None + + # Check CRC + irq = self._read_reg(_REG_IRQ_FLAGS) + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + if irq & _IRQ_PAYLOAD_CRC_ERROR: + self.standby() + return None + + # Read payload + nb_bytes = self._read_reg(_REG_RX_NB_BYTES) + rx_addr = self._read_reg(_REG_FIFO_RX_CURRENT_ADDR) + self._write_reg(_REG_FIFO_ADDR_PTR, rx_addr) + payload = self._read_buf(_REG_FIFO, nb_bytes) + + self.standby() + return bytes(payload) + + def on_recv(self, callback): + """ + Set up asynchronous receive via DIO0 interrupt. + + callback(payload_bytes) is called when a packet is received. + Pass None to disable. + """ + if callback is None: + self._dio0.irq(handler=None) + self._irq_callback = None + self.standby() + return + + self._irq_callback = callback + self.standby() + + # Clear IRQ flags + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + # Map DIO0 to RX_DONE + self._write_reg(_REG_DIO_MAPPING_1, 0x00) + + # Set up IRQ + self._dio0.irq(trigger=Pin.IRQ_RISING, handler=self._handle_irq) + + # Start continuous RX + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_RX_CONTINUOUS) + + def _handle_irq(self, pin): + """DIO0 interrupt handler — reads IRQ flags to disambiguate.""" + irq = self._read_reg(_REG_IRQ_FLAGS) + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + if irq & _IRQ_RX_DONE and not (irq & _IRQ_PAYLOAD_CRC_ERROR): + nb_bytes = self._read_reg(_REG_RX_NB_BYTES) + rx_addr = self._read_reg(_REG_FIFO_RX_CURRENT_ADDR) + self._write_reg(_REG_FIFO_ADDR_PTR, rx_addr) + payload = self._read_buf(_REG_FIFO, nb_bytes) + if self._irq_callback: + self._irq_callback(bytes(payload)) + + @property + def rssi(self): + """Last packet RSSI in dBm.""" + raw = self._read_reg(_REG_PKT_RSSI_VALUE) + if self._chip == 1272: + return -139 + raw + else: + # SX1276 HF port (>862 MHz) + if self._frequency >= 862000000: + return -157 + raw + else: + return -164 + raw + + @property + def snr(self): + """Last packet SNR in dB.""" + raw = self._read_reg(_REG_PKT_SNR_VALUE) + if raw > 127: + raw -= 256 + return raw * 0.25 + + # --- SPI register access --- + + def _read_reg(self, addr): + self._cs.value(0) + self._spi.write(bytearray([addr & 0x7F])) + result = self._spi.read(1) + self._cs.value(1) + return result[0] + + def _write_reg(self, addr, value): + self._cs.value(0) + self._spi.write(bytearray([addr | 0x80, value])) + self._cs.value(1) + + def _read_buf(self, addr, length): + self._cs.value(0) + self._spi.write(bytearray([addr & 0x7F])) + result = self._spi.read(length) + self._cs.value(1) + return result + + def _write_buf(self, addr, data): + self._cs.value(0) + self._spi.write(bytearray([addr | 0x80]) + data) + self._cs.value(1) diff --git a/micropython/lora/lora-sx127x-pycom/manifest.py b/micropython/lora/lora-sx127x-pycom/manifest.py new file mode 100644 index 000000000..f1f2331ba --- /dev/null +++ b/micropython/lora/lora-sx127x-pycom/manifest.py @@ -0,0 +1,2 @@ +metadata(description="SX1272/SX1276 unified LoRa driver for Pycom boards.", version="0.1.0") +package("lora")