Package shimmer_listener
shimmer-listener
This library allows you to connect to a Shimmer2/2r mote via Bluetooth both in Master and Slave mode, interacting with the applications on the mote.
When communicating with a Shimmer mote using an app made with this library as the Bluetooth Master, you have to implement the presentation protocol inside of the Shimmer nesC application. This protocol is a way to inform the Bt Master about the data format used when sending messages.
Presentation protocol
The protocol is implemented by sending a simple struct via Bluetooth as the very first message after successfully establishing a connection with the Bt Master. Its format is the following:
typedef char key_string[MAX_STR_LEN];
typedef struct {
uint8_t framesize;
uint8_t chunklen;
char format[MAX_FMT_LEN];
key_string keys[MAX_KEY_NUM];
} frameinfo;
The presentation frame is automatically interpreted from the BtStream, so you don't have to do anything from this side of the communication.
Callbacks
A series of callbacks can be used and set as properties to intercept certain events:
-
on_connect(mac: str, info: frameinfo) -> None
Called when a mote identified by mac successfully connects. All the information regarding the exchanged data format, obtained through the presentation protocol, is accessible via info.
-
on_message(mac: str, frame: Dict[str, Any]) -> None
Called when a message is received from a mote identified by mac. The message is returned as a dict with the keys previously obtained from the presentation protocol.
-
on_disconnect(mac: str, lost: bool) -> None
Called when a mote identified by mac disconnects. If lost is true, the disconnect event happened because the connection has been lost.
Expand source code
"""
## shimmer-listener
This library allows you to connect to a Shimmer2/2r mote via Bluetooth both in Master and Slave
mode, interacting with the applications on the mote.
When communicating with a Shimmer mote using an app made with this library as the Bluetooth Master,
you have to implement the presentation protocol inside of the Shimmer nesC application. This protocol is a way
to inform the Bt Master about the data format used when sending messages.
## Presentation protocol
The protocol is implemented by sending a simple struct via Bluetooth as the very first message after successfully
establishing a connection with the Bt Master. Its format is the following:
```c
typedef char key_string[MAX_STR_LEN];
typedef struct {
uint8_t framesize;
uint8_t chunklen;
char format[MAX_FMT_LEN];
key_string keys[MAX_KEY_NUM];
} frameinfo;
```
The presentation frame is automatically interpreted from the BtStream, so you don't have to do anything from this side
of the communication.
## Callbacks
A series of callbacks can be used and set as properties to intercept certain events:
- **on_connect(mac: str, info: frameinfo) -> None**
Called when a mote identified by **mac** successfully connects. All the information
regarding the exchanged data format, obtained through the *presentation protocol*, is
accessible via **info**.
- **on_message(mac: str, frame: Dict[str, Any]) -> None**
Called when a message is received from a mote identified by **mac**. The message is
returned as a dict with the keys previously obtained from the *presentation protocol*.
- **on_disconnect(mac: str, lost: bool) -> None**
Called when a mote identified by **mac** disconnects. If **lost** is true, the disconnect
event happened because the connection has been lost.
"""
from ._streams import BtStream, BtSlaveInputStream, BtMasterInputStream, Frameinfo
from ._slave import _slave_init, _slave_listen, _slave_close
from ._master import _master_listen, _master_close
from typing import Optional, Callable, Any, Dict, List, Union
import enum
__all__ = ["bt_init", "bt_listen", "bt_close", "Frameinfo", "BtMode", "BtStream",
"BtMasterInputStream", "BtSlaveInputStream"]
class BtMode(enum.Enum):
    """
    Operating mode of the library with respect to the shimmer devices.

    The member values double as positions in the module-level dispatch tables,
    so they must stay 0 for MASTER and 1 for SLAVE.
    """

    MASTER = 0
    SLAVE = 1

    @property
    def index(self) -> int:
        """
        Numerical representation of the member: MASTER = 0, SLAVE = 1.
        """
        return self.value
# Dispatch tables indexed by BtMode.index (MASTER = 0, SLAVE = 1); the order of
# the entries must therefore match the enum values.
listen: List[Callable] = [_master_listen, _slave_listen]
close: List[Callable] = [_master_close, _slave_close]
# Mode selected by bt_init(); None before initialization and after bt_close().
_op_mode: Optional[BtMode] = None
# Set to True by bt_init() and back to False by bt_close().
_running: bool = False
def bt_init(mode: BtMode) -> None:
    """
    Initializes the bluetooth server socket interface.
    Call this at the beginning of your program.

    - **mode**: the `BtMode` the library should operate in.

    Raises `ValueError` if the interface has already been initialized and
    `TypeError` if **mode** is not a `BtMode` member.
    """
    global _op_mode, _running
    if _running:
        raise ValueError("Trying to initialize an already started interface")
    # Reject invalid modes here: otherwise the bad value would be stored and only
    # blow up later, inside bt_listen, with an unrelated AttributeError.
    if not isinstance(mode, BtMode):
        raise TypeError(f"mode must be a BtMode member, got {type(mode).__name__}")
    # Only slave mode has an initialization step of its own.
    if mode is BtMode.SLAVE:
        _slave_init()
    _op_mode = mode
    _running = True
def bt_listen(connect_handle: Optional[Callable[[str, Frameinfo], None]] = None,
              message_handle: Optional[Callable[[str, Dict[str, Any]], None]] = None,
              disconnect_handle: Optional[Callable[[str, bool], None]] = None,
              **kwargs: Union[str, float]) -> None:
    """
    Runs the listen loop of the mode chosen in `bt_init`, handing the three
    handlers over so they get attached as event callbacks to every stream that
    is started. Extra keyword arguments are forwarded untouched to the
    mode-specific listen function.

    If the application is in master mode, you can specify the duration of the lookup
    and scan operations using the following keyword arguments:
    - **lookup_duration**: defaults to 5 seconds
    - **scan_interval**: defaults to 5 seconds

    Raises `ValueError` when the interface has not been initialized.
    """
    if not _running or _op_mode is None:
        raise ValueError("Listen operation on non initialized interface")
    # Pick the listen implementation matching the active mode.
    start_listening = listen[_op_mode.index]
    start_listening(connect_handle, message_handle, disconnect_handle, **kwargs)
def bt_close() -> None:
    """
    Gracefully stops any open connection and returns the module to its
    uninitialized state.

    Raises `ValueError` when no mode has been set by `bt_init`.
    """
    global _op_mode, _running
    active_mode = _op_mode
    if active_mode is None:
        raise ValueError("Trying to close a non initialized interface")
    # Pick the close implementation matching the active mode.
    shutdown = close[active_mode.index]
    shutdown()
    _op_mode, _running = None, False
Sub-modules
shimmer_listener.console_scripts
Functions
def bt_init(mode: BtMode) ‑> None
-
Initializes the bluetooth server socket interface. Call this at the beginning of your program.
Expand source code
def bt_init(mode: BtMode) -> None:
    """
    Initializes the bluetooth server socket interface.
    Call this at the beginning of your program.

    - **mode**: the `BtMode` the library should operate in.

    Raises `ValueError` if the interface has already been initialized and
    `TypeError` if **mode** is not a `BtMode` member.
    """
    global _op_mode, _running
    if _running:
        raise ValueError("Trying to initialize an already started interface")
    # Reject invalid modes here: otherwise the bad value would be stored and only
    # blow up later, inside bt_listen, with an unrelated AttributeError.
    if not isinstance(mode, BtMode):
        raise TypeError(f"mode must be a BtMode member, got {type(mode).__name__}")
    # Only slave mode has an initialization step of its own.
    if mode is BtMode.SLAVE:
        _slave_init()
    _op_mode = mode
    _running = True
def bt_listen(connect_handle: Optional[Callable[[str, shimmer_listener._streams.Frameinfo], None]] = None, message_handle: Optional[Callable[[str, Dict[str, Any]], None]] = None, disconnect_handle: Optional[Callable[[str, bool], None]] = None, **kwargs: Union[str, float]) ‑> None
-
Starts the listen loop, attaching the passed handlers as event callbacks to each stream that is started. Various options can be passed as keyword arguments depending on the stream type.
If the application is in master mode, you can specify the duration of the lookup and scan operations using the following keyword arguments:
-
lookup_duration: defaults to 5 seconds
-
scan_interval: defaults to 5 seconds
Expand source code
def bt_listen(connect_handle: Optional[Callable[[str, Frameinfo], None]] = None,
              message_handle: Optional[Callable[[str, Dict[str, Any]], None]] = None,
              disconnect_handle: Optional[Callable[[str, bool], None]] = None,
              **kwargs: Union[str, float]) -> None:
    """
    Runs the listen loop of the mode chosen in `bt_init`, handing the three
    handlers over so they get attached as event callbacks to every stream that
    is started. Extra keyword arguments are forwarded untouched to the
    mode-specific listen function.

    If the application is in master mode, you can specify the duration of the lookup
    and scan operations using the following keyword arguments:
    - **lookup_duration**: defaults to 5 seconds
    - **scan_interval**: defaults to 5 seconds

    Raises `ValueError` when the interface has not been initialized.
    """
    if not _running or _op_mode is None:
        raise ValueError("Listen operation on non initialized interface")
    # Pick the listen implementation matching the active mode.
    start_listening = listen[_op_mode.index]
    start_listening(connect_handle, message_handle, disconnect_handle, **kwargs)
-
def bt_close() ‑> None
-
Gracefully stops any open connection.
Expand source code
def bt_close() -> None:
    """
    Gracefully stops any open connection and returns the module to its
    uninitialized state.

    Raises `ValueError` when no mode has been set by `bt_init`.
    """
    global _op_mode, _running
    active_mode = _op_mode
    if active_mode is None:
        raise ValueError("Trying to close a non initialized interface")
    # Pick the close implementation matching the active mode.
    shutdown = close[active_mode.index]
    shutdown()
    _op_mode, _running = None, False
Classes
class Frameinfo (framesize, lenchunks, format, keys)
-
A description of the format used by the shimmer device to communicate. The data received through the presentation protocol at startup is contained in an instance of this class.
Expand source code
class Frameinfo(namedtuple("frameinfo", "framesize lenchunks format keys")):
    """
    Description of the data format a shimmer device uses to communicate.

    Instances carry the values received through the presentation protocol at
    startup: the frame size, the size of each chunk, the chunk format string and
    the key names.
    """
Ancestors
- shimmer_listener._streams.frameinfo
- builtins.tuple
class BtMode (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Enum used to set the mode in which the library is acting towards the shimmer devices.
Expand source code
class BtMode(enum.Enum):
    """
    Operating mode of the library with respect to the shimmer devices.

    MASTER has value 0 and SLAVE has value 1.
    """

    MASTER = 0
    SLAVE = 1

    @property
    def index(self) -> int:
        """
        Numerical representation of the member: MASTER = 0, SLAVE = 1.
        """
        return self.value
Ancestors
- enum.Enum
Class variables
var MASTER
var SLAVE
Instance variables
var index
-
Returns a numerical representation of the enum values, where MASTER = 0, SLAVE = 1.
Expand source code
@property
def index(self):
    """
    Numerical representation of the member: MASTER = 0, SLAVE = 1.
    """
    return self.value
class BtStream (mac: str)
-
Abstraction of a Bluetooth input stream coming from a mote with given mac over a given socket. A series of callbacks can be used and set as properties to intercept certain events:
-
on_connect(mac: str, info: frameinfo) -> None
Called when a mote identified by mac successfully connects. All the information regarding the exchanged data format, obtained through the presentation protocol, is accessible via info.
-
on_message(mac: str, frame: Dict[str, Any]) -> None
Called when a message is received from a mote identified by mac. The message is returned as a dict with the keys previously obtained from the presentation protocol.
-
on_disconnect(mac: str, lost: bool) -> None
Called when a mote identified by mac disconnects. If lost is true, the disconnect event happened because the connection has been lost.
Abstract class that describes and implements common functionalities of the Bt streams: the start/stop mechanism, callbacks, identification via Bt mac address.
Expand source code
class BtStream(ABC):
    """
    Abstraction of a Bluetooth input stream coming from a mote with given mac over a given socket.
    A series of callbacks can be used and set as properties to intercept certain events:

    - **on_connect(mac: str, info: frameinfo) -> None**

        Called when a mote identified by **mac** successfully connects. All the information
        regarding the exchanged data format, obtained through the *presentation protocol*, is
        accessible via **info**.

    - **on_message(mac: str, frame: Dict[str, Any]) -> None**

        Called when a message is received from a mote identified by **mac**. The message is
        returned as a dict with the keys previously obtained from the *presentation protocol*.

    - **on_disconnect(mac: str, lost: bool) -> None**

        Called when a mote identified by **mac** disconnects. If **lost** is true, the disconnect
        event happened because the connection has been lost.
    """

    def __init__(self, mac: str):
        """
        Abstract class that describes and implements common functionalities of the Bt streams:
        the start/stop mechanism, callbacks, identification via Bt mac address.
        """
        super().__init__()
        self._mac = mac
        self._running = False

        # Event callbacks, all unset until assigned through the properties below
        self._on_connect: Optional[Callable[[str, Frameinfo], None]] = None
        self._on_message: Optional[Callable[[str, Dict[str, Any]], None]] = None
        self._on_disconnect: Optional[Callable[[str, bool], None]] = None

    @property
    def on_connect(self):
        """Callback for the connect event, or None when unset."""
        return self._on_connect

    @on_connect.setter
    def on_connect(self, callback):
        self._on_connect = callback

    @property
    def on_message(self):
        """Callback for the message event, or None when unset."""
        return self._on_message

    @on_message.setter
    def on_message(self, callback):
        self._on_message = callback

    @property
    def on_disconnect(self):
        """Callback for the disconnect event, or None when unset."""
        return self._on_disconnect

    @on_disconnect.setter
    def on_disconnect(self, callback):
        self._on_disconnect = callback

    @property
    def open(self) -> bool:
        """
        Property that is True if the BtStream was started and is currently active.
        """
        return self._running

    def stop(self) -> None:
        """
        Stops the Input stream: N.B. if this is called while the stream loop is doing work,
        that iteration of the loop won't be stopped.
        """
        if not self._running:
            return
        self._running = False

    @abstractmethod
    def _loop(self):
        """Stream body, provided by the concrete subclasses."""
        pass

    def start(self):
        """
        Starts the Input stream, non blocking: the loop runs in a new thread.
        Does nothing if the stream is already running.
        """
        if self._running:
            return
        worker = Thread(target=self._loop)
        worker.start()

    def loop_forever(self):
        """
        Runs the stream loop in the calling thread, returning when the loop ends.
        Does nothing if the stream is already running.
        """
        if self._running:
            return
        self._loop()
Ancestors
- abc.ABC
Subclasses
- shimmer_listener._streams.BtMasterInputStream
- shimmer_listener._streams.BtSlaveInputStream
Instance variables
var on_connect
-
Expand source code
@property
def on_connect(self):
    """Callback for the connect event, as stored on the stream."""
    return self._on_connect
var on_message
-
Expand source code
@property
def on_message(self):
    """Callback for the message event, as stored on the stream."""
    return self._on_message
var on_disconnect
-
Expand source code
@property
def on_disconnect(self):
    """Callback for the disconnect event, as stored on the stream."""
    return self._on_disconnect
var open : bool
-
Property that is True if the BtStream was started and is currently active.
Expand source code
@property
def open(self) -> bool:
    """
    True while the stream is marked as running, False otherwise.
    """
    return self._running
Methods
def stop(self) ‑> None
-
Stops the Input stream: N.B. if this is called while the stream loop is doing work, that iteration of the loop won't be stopped.
Expand source code
def stop(self) -> None:
    """
    Stops the Input stream: N.B. if this is called while the stream loop is doing work,
    that iteration of the loop won't be stopped.
    """
    if not self._running:
        return
    self._running = False
def start(self)
-
Starts the Input stream, non blocking.
Expand source code
def start(self):
    """
    Starts the Input stream, non blocking: the loop runs in a new thread.
    Does nothing if the stream is already running.
    """
    if self._running:
        return
    worker = Thread(target=self._loop)
    worker.start()
def loop_forever(self)
-
Expand source code
def loop_forever(self):
    """
    Runs the stream loop in the calling thread, returning when the loop ends.
    Does nothing if the stream is already running.
    """
    if self._running:
        return
    self._loop()
-
class BtMasterInputStream (mac: str, sock: bluetooth.msbt.BluetoothSocket, uuid: str)
-
Abstraction for the data input stream coming from a master device running on a shimmer2 mote.
Initializes a new input stream from a Master mote.
Expand source code
class BtMasterInputStream(BtStream):
    """
    Abstraction for the data input stream coming from a master device running on a shimmer2 mote.
    """

    # Standard framesize in the tinyos Bluetooth implementation taken from the shimmer apps repo
    # In this case a frame contains exactly one chunk, hence framesize = chunklen
    _framesize = 22
    _slave_frameinfo = Frameinfo(_framesize, _framesize, "HHHHHHHB",
                                 ["mac", "accel_x", "accel_y", "accel_z", "gyro_x", "gyro_y", "gyro_z"])

    def __init__(self, mac: str, sock: bluetooth.BluetoothSocket, uuid: str):
        """
        Initializes a new input stream from a Master mote.

        - **mac**: Bt mac address identifying the mote.
        - **sock**: socket the frames are read from; it is closed when the loop ends.
        - **uuid**: stored on the stream, not used by the read loop.
        """
        super().__init__(mac=mac)
        self._uuid = uuid
        self._sock = sock

    def _recv_exact(self, size: int) -> bytearray:
        """
        Reads exactly **size** bytes from the socket.

        Only the bytes still missing are requested on each call, so data belonging to
        the next frame is never consumed (and lost) together with the current one.
        Raises `bluetooth.btcommon.BluetoothError` if the socket yields no data, so an
        empty read ends the stream as a lost connection instead of spinning forever.
        """
        buffer = bytearray()
        while len(buffer) < size:
            chunk = self._sock.recv(size - len(buffer))
            if not chunk:
                raise bluetooth.btcommon.BluetoothError("connection closed by the remote device")
            buffer += bytearray(chunk)
        return buffer

    def _loop(self) -> None:
        """
        Notifies on_connect, then reads fixed-size frames until the stream is stopped
        or the connection fails, notifying on_message for each frame and on_disconnect
        at the end. The socket is always closed on exit.
        """
        if self.on_connect:
            self.on_connect(self._mac, self._slave_frameinfo)
        self._running = True
        try:
            while self._running:
                data = self._recv_exact(self._framesize)
                # the following data split refers to the 22 B long frame structure discussed earlier
                # the first seven bytes and the last two unpacked fields (crc, end) are ignored
                # since we don't need them in this particular app
                (accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z, _, _) = struct.unpack("HHHHHHHB", data[7:22])
                fmt_data = SlaveDataTuple(self._mac, accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z)
                if self.on_message:
                    self.on_message(self._mac, fmt_data._asdict())
            # Loop left because stop() was called: regular disconnection
            if self.on_disconnect:
                self.on_disconnect(self._mac, False)
        except bluetooth.btcommon.BluetoothError:
            # Connection lost while reading
            if self.on_disconnect:
                self.on_disconnect(self._mac, True)
        finally:
            self._running = False
            self._sock.close()
Ancestors
- shimmer_listener._streams.BtStream
- abc.ABC
class BtSlaveInputStream (mac: str)
-
Abstraction for the data input stream coming from a slave device running on a shimmer2 mote.
The first frame sent by the shimmer is special and contains information about its data format. The size is fixed by a simple protocol to 112 B. Its internal format is made as such:
- 1 unsigned byte containing the message frame size that is going to be used in the communication.
- 1 unsigned byte containing the chunk size of each chunk packed into a frame.
- a string of length 10, containing the format of packed data contained in each chunk.
- a string of length 100, containing a max of 10, 10 char long key names of each value packed in the chunk.
Initializes a new input stream from a Slave mote.
Expand source code
class BtSlaveInputStream(BtStream):
    """
    Abstraction for the data input stream coming from a slave device running on a shimmer2 mote.

    The first frame sent by the shimmer is special and contains information about its data format.
    The size is fixed by a simple protocol to 112 B. Its internal format is made as such:

    - 1 unsigned byte containing the message frame size that is going to be used in the communication.
    - 1 unsigned byte containing the chunk size of each chunk packed into a frame.
    - a string of length 10, containing the format of packed data contained in each chunk.
    - a string of length 100, containing a max of 10, 10 char long key names of each value
      packed in the chunk.
    """

    _pres_frame_size = 112
    _pres_frame_fmt = "BB10s100s"

    def __init__(self, mac: str):
        """
        Initializes a new input stream from a Slave mote.

        - **mac**: Bt mac address of the mote the stream connects to.
        """
        super().__init__(mac=mac)
        self._info = None
        self._sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)

    def _to_dict(self, raw_data: tuple):
        """Pairs each key of the presentation frame with the value at the same position."""
        return {key: raw_data[idx] for idx, key in enumerate(self._info.keys)}

    def _init_frameinfo(self, info: bytearray):
        """
        Parses the presentation frame and stores the result in `self._info`.

        Raises `ValueError` when the advertised sizes, chunk format and key names are
        not consistent with each other; `struct.error` propagates when the frame or
        the chunk format cannot be handled by `struct`.
        """
        framesize, lenchunks, raw_fmt, raw_keys = struct.unpack(BtSlaveInputStream._pres_frame_fmt, info)
        chunk_fmt = raw_fmt.decode().rstrip("\x00")
        unfmt_keys = raw_keys.decode()

        # The key area holds up to 10 NUL-padded names of 10 characters each;
        # unused slots decode to empty strings and are dropped.
        key_len = 10
        keys_len = 100
        data_keys = [unfmt_keys[base:base + key_len].rstrip("\x00")
                     for base in range(0, keys_len, key_len)]
        data_keys = [elem for elem in data_keys if elem != '']

        # Check for the validity of the information passed by the mote
        if framesize <= 0 or lenchunks <= 0 or lenchunks > framesize \
                or framesize % lenchunks != 0 or struct.calcsize(chunk_fmt) != lenchunks \
                or len(data_keys) != len(struct.unpack(chunk_fmt, b'\x00' * struct.calcsize(chunk_fmt))):
            raise ValueError("inconsistent presentation frame")

        self._info = Frameinfo(framesize, lenchunks, chunk_fmt, data_keys)

    def _recv_exact(self, size: int) -> bytearray:
        """
        Reads exactly **size** bytes from the socket.

        Only the bytes still missing are requested on each call, so data belonging to
        the next frame is never consumed together with the current one: an over-read
        would make the fixed-size presentation frame fail to unpack and would drop
        bytes of the following data frame.
        Raises `bluetooth.btcommon.BluetoothError` if the socket yields no data, so an
        empty read ends the stream instead of spinning forever.
        """
        buffer = bytearray()
        while len(buffer) < size:
            chunk = self._sock.recv(size - len(buffer))
            if not chunk:
                raise bluetooth.btcommon.BluetoothError("connection closed by the remote device")
            buffer += bytearray(chunk)
        return buffer

    def _loop(self):
        """
        Connects to the mote, reads the presentation frame, then reads data frames until
        the stream is stopped or fails, notifying the callbacks along the way.
        The socket is always closed on exit.
        """
        try:
            # Init BT socket and loop
            # BUG in Win10 implementation, this will try to connect to previously paired
            # devices, even when not on or close enough, raising an OSError
            rf_port = 1
            self._sock.connect((self._mac, rf_port))
            self._running = True

            # Wait for a 112 B presentation frame
            fmt_frame = self._recv_exact(BtSlaveInputStream._pres_frame_size)

            # Parse presentation and notify the on connect callback
            self._init_frameinfo(fmt_frame)
            if self.on_connect:
                self.on_connect(self._mac, self._info)

            # Data reading loop based on frameinfo format
            while self._running:
                data = self._recv_exact(self._info.framesize)
                for idx in range(0, self._info.framesize, self._info.lenchunks):
                    raw_data = struct.unpack(self._info.format, data[idx:idx + self._info.lenchunks])
                    # Msg received: Notify on message callback
                    if self.on_message:
                        self.on_message(self._mac, self._to_dict(raw_data))

            # Loop left because stop() was called: regular disconnection
            if self.on_disconnect:
                self.on_disconnect(self._mac, False)
        except bluetooth.btcommon.BluetoothError:
            # NOTE(review): when the stream is running but no on_disconnect callback is set,
            # this still raises the "couldn't connect" error — confirm that is intended.
            if self._running and self.on_disconnect:
                self.on_disconnect(self._mac, True)
            else:
                raise bluetooth.BluetoothError(f"BT MAC {self._mac}: couldn't connect to the bluetooth interface")
        except (ValueError, struct.error):
            # NOTE(review): also reached for struct errors raised while unpacking data frames,
            # although the message only mentions the presentation frame — confirm.
            if self._running and self.on_disconnect:
                self.on_disconnect(self._mac, True)
            else:
                raise ConnectionError(f"BT MAC {self._mac}: error in decoding presentation frame!")
        finally:
            self._running = False
            self._sock.close()
Ancestors
- shimmer_listener._streams.BtStream
- abc.ABC