Flexconnect Developer Guide
This guide covers what developers building on Flexconnect need to know: the architecture, the key components, the supported data formats, and how to get started with development.
Overview
Flexconnect is a cloud-based platform that enables data collection, processing, and visualization for various applications.
Developers can build their own applications leveraging the ingestion features of Flexconnect without the need to manage the underlying infrastructure.
Architecture
The system is split into four main layers:
- Data Source Layer: The devices that collect data and send it to the cloud. Any telemetry device that supports the MQTT protocol can be used. Each device is identified by a set of unique credentials provided by Audesse, which are used to authenticate the device and encrypt its data.
  - Devices publish telemetry data to a specific topic on the MQTT broker. The topic structure lets the system identify the device and the type of data being sent.
  - Devices should compress data (for example with gzip) before sending it, to reduce the amount of data sent over the network.
- Ingestion Layer: Receives data from the devices, processes it, and stores it in a time-series database. Depending on the type of data, it can also perform additional processing such as filtering, alerting, and data transformation.
  - This layer accepts data in multiple formats depending on the use case; see Data Formats for details.
- Storage Layer: Stores the data collected from the devices in a time-series database, which allows efficient querying, retrieval, and visualization.
- Presentation Layer: Audesse provides a RESTful API that lets developers query the data stored in the time-series database for use in their own frontend deployments.
  - Audesse also provides a Grafana-based dashboard for visualizing the data collected from the devices. The dashboard is customizable and supports visualizations such as graphs, charts, and tables.
Connecting to Flexconnect
To connect to Flexconnect, the device must use the provided certificates and credentials to authenticate with the MQTT broker:
- Root CA Certificate: The root CA certificate is used to verify the identity of the MQTT broker. The device must have this certificate installed to establish a secure connection.
- Client Key: The client private key proves that the device owns the client certificate. It is used during the TLS handshake with the MQTT broker and must be kept secret on the device.
- Client Certificate: The client certificate is used to identify the device to the MQTT broker. The device must present this certificate when establishing a connection.
# Example of connecting to Flexconnect using the AWS IoT Device SDK for Python (v1) [AWSIoTPythonSDK](https://github.com/aws/aws-iot-device-sdk-python)
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

# Placeholders: replace with the values provided by Audesse
device_id = "provided_device_id"
client_config = {"endpoint": "mqtt.audesseinc.com"}

client = AWSIoTMQTTClient(device_id)
client.configureEndpoint(hostName=client_config["endpoint"], portNumber=8883)
client.configureCredentials(CAFilePath="./root-CA.crt", KeyPath="./device_key.key", CertificatePath="./device_cert.pem")
client.connect()
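Before calling `connect()`, it can help to confirm that the three credential files listed above are actually present, since a missing file would otherwise only show up as a connection-time failure. The following is a minimal sketch; the helper name `missing_credentials` is hypothetical and not part of any SDK, and the file names are the ones used in the example above.

```python
from pathlib import Path


def missing_credentials(ca_path, key_path, cert_path):
    """Return a description of each credential file that is not on disk."""
    candidates = {
        "root CA certificate": ca_path,
        "client key": key_path,
        "client certificate": cert_path,
    }
    return [
        f"{label}: {path}"
        for label, path in candidates.items()
        if not Path(path).is_file()
    ]


# Example: report anything missing before attempting to connect
problems = missing_credentials("./root-CA.crt", "./device_key.key", "./device_cert.pem")
for problem in problems:
    print("Missing", problem)
```

This only checks that the files exist; it does not verify that the key matches the certificate or that the certificate is valid.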
Data Formats
The Ingestion Layer of Flexconnect supports two main data formats for telemetry data and their compressed versions:
- JSON: The data is sent in JSON format according to the Flexconnect JSON Schema. These are the main fields to look for when building your JSON payload:
  - b64: The only field in the message if the data is compressed. It contains the base64-encoded string of the compressed data. Do not use it if the data is not compressed.
  - device_id: The unique identifier of the device sending the data.
  - token: The authentication token provided by Audesse to the device. This token is used to authenticate the device with the system.
  - tags: An array of tags for categorizing and filtering data. Each tag is a key-value string pair.
  - points: An array of data points. Each point includes:
    - timestamp: The data point timestamp in Unix epoch format (milliseconds since 1970-01-01T00:00:00Z).
    - tags: Tags specific to this data point.
    - fields: The actual measurement data. Each field contains:
      - name: The field identifier.
      - value: The field value (string or number).
- Binary Message: Binary data compatible with the CAN protocol. This format requires a DBC file to define the data structure. The message format is:
Binary Message
| Field | Description | Value Example | Size (Bytes) | Endianness |
|---|---|---|---|---|
| Token Length | Length of the authentication token | 0x40 | 4 | Big |
| Token | Authentication token provided by Audesse | - | Variable (Token Length) | - |
| CAN points | Repeated CAN points | see CAN Point Header | - | - |
CAN Point Header
Each CAN point contains a header followed by one or more CAN readings.
| Field | Description | Value Example | Size (Bytes) | Endianness |
|---|---|---|---|---|
| Timestamp | Data point timestamp in Unix epoch format (milliseconds since 1970-01-01T00:00:00Z) | 1622547800000 | 8 | Little |
| CAN Readings Count | Number of CAN readings in this point | 0x02 | 2 | Little |
| Length | Total length of the CAN point in bytes (including header and readings) | 0x2A | 2 | Little |
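The header layout in the table above (8 + 2 + 2 bytes, all little endian) maps directly onto a single `struct` format string. The following sketch shows one way to pack and unpack it; the names `CAN_POINT_HEADER`, `pack_point_header`, and `unpack_point_header` are illustrative and not part of any Flexconnect library.

```python
import struct

# Timestamp (uint64), readings count (uint16), length (uint16).
# '<' selects little endian with no padding, so the header is 12 bytes.
CAN_POINT_HEADER = struct.Struct("<QHH")


def pack_point_header(timestamp_ms, readings_count, point_length):
    """Serialize a CAN point header."""
    return CAN_POINT_HEADER.pack(timestamp_ms, readings_count, point_length)


def unpack_point_header(buffer, offset=0):
    """Return (timestamp_ms, readings_count, point_length) read at offset."""
    return CAN_POINT_HEADER.unpack_from(buffer, offset)


# Values taken from the "Value Example" column of the table
header = pack_point_header(1622547800000, 0x02, 0x2A)
print(len(header))
print(unpack_point_header(header))
```

Because the Length field counts the header as well as the readings, a point with no data bytes at all still has a length of at least 12.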
CAN Point Payload
A CAN point contains multiple readings, each with the following structure:
| Field | Description | Value Example | Size (Bytes) | Endianness |
|---|---|---|---|---|
| CAN ID | Unique identifier for the CAN message | 0x123 | 4 | Little |
| BUS\|DLC | Bus number (upper 3 bits) and data length code (DLC, lower 5 bits) | 0b01101101 (Bus=3, DLC=13) | 1 | - |
| Data | CAN data payload in CAN FD format as defined in the associated DBC file | 0x01020304 | Variable | - |
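The combined bus and DLC byte is the field most easily gotten wrong, so here is a small sketch of the bit packing described in the table. The function names are hypothetical; the range checks simply follow from the 3-bit and 5-bit widths.

```python
def pack_bus_dlc(bus, dlc):
    """Combine a bus number (upper 3 bits) and a DLC (lower 5 bits) into one byte."""
    if not 0 <= bus <= 0b111:
        raise ValueError("bus must fit in 3 bits (0-7)")
    if not 0 <= dlc <= 0b11111:
        raise ValueError("dlc must fit in 5 bits (0-31)")
    return (bus << 5) | dlc


def unpack_bus_dlc(value):
    """Split the combined byte back into (bus, dlc)."""
    return value >> 5, value & 0x1F


# The table's example value: Bus=3, DLC=13
combined = pack_bus_dlc(3, 13)
print(bin(combined))
print(unpack_bus_dlc(0b01101101))
```

Validating the ranges up front avoids silently corrupting the neighbouring bits when a bus number or DLC is out of range.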
Note
- For compressed data, prefix the message with the byte 0xFC to indicate compression. The remainder of the message contains the compressed binary message data.
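The framing rule in the note can be sketched as a pair of helpers. This assumes gzip as the compression method, as used elsewhere in this guide. The `unframe` function is only meant for local round-trip testing on the device side and does not describe how Flexconnect itself decodes messages; both function names are hypothetical.

```python
import gzip

COMPRESSION_PREFIX = b"\xFC"


def frame_compressed(binary_message: bytes) -> bytes:
    """Compress a binary message and prepend the 0xFC marker byte."""
    return COMPRESSION_PREFIX + gzip.compress(binary_message)


def unframe(message: bytes) -> bytes:
    """Return the uncompressed binary message, whether or not it was framed."""
    if message[:1] == COMPRESSION_PREFIX:
        return gzip.decompress(message[1:])
    return message


# Round trip: a 4-byte big-endian token length (2) followed by the token "ab"
raw = b"\x00\x00\x00\x02ab"
assert unframe(frame_compressed(raw)) == raw
```

One observation from the tables above: an uncompressed message starts with the 4-byte big-endian token length, so its first byte is 0x00 for any token shorter than 2^24 bytes. Checking the first byte for 0xFC is therefore unambiguous for realistically sized tokens.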
Example Usage
JSON Format Example
Here is an example of how to send telemetry data to Flexconnect using the JSON format:
# Import necessary libraries
# json library for handling JSON data and dumping it to a string
import json
# gzip library for compressing the data
import gzip
# base64 library for encoding the compressed data
import base64
# sleep is used to wait for the asynchronous publish to complete
from time import sleep
MQTT_URL = "mqtt.audesseinc.com"
MQTT_PORT = 8883
DEVICE_ID = "provided_device_id"
TOKEN = "provided_token"
ROOT_CA_PATH = "./root-CA.crt"
CLIENT_KEY_PATH = "./device_key.key"
CLIENT_CERT_PATH = "./device_cert.pem"
MQTT_TOPIC = "flexconnect/mqtt_topic"
# Create a JSON payload
data = {
    "device_id": DEVICE_ID,
    "token": TOKEN,
    "tags": [{"tag1": "value1"}, {"tag2": "value2"}],
    "points": [
        {
            "timestamp": 1622547800000,
            "tags": [{"point_tag1": "point_value1"}],
            "fields": [
                {"name": "temperature", "value": 22.5},
                {"name": "humidity", "value": 45.0}
            ]
        }
    ]
}
# Convert the data to a JSON string
json_data = json.dumps(data)
# Compress the JSON data
compressed_data = gzip.compress(json_data.encode('utf-8'))
# Encode the compressed data to base64
b64_data = base64.b64encode(compressed_data).decode('utf-8')
# Create the final message
final_message_uncompressed = json_data
final_message_compressed = json.dumps({"b64": b64_data})
# Connect to the MQTT broker and publish the message
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
client = AWSIoTMQTTClient(DEVICE_ID)
client.configureEndpoint(MQTT_URL, MQTT_PORT)
client.configureCredentials(ROOT_CA_PATH, CLIENT_KEY_PATH, CLIENT_CERT_PATH)
client.connect()
# Publish the message to the MQTT topic
client.publishAsync(MQTT_TOPIC, final_message_compressed, QoS=1)
sleep(1) # Wait for the message to be sent
client.disconnect()
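To check a message locally before publishing it, the encoding steps above can be reversed. The sketch below decodes both the plain and the compressed (`b64`) form and runs a light sanity check based on the field descriptions in Data Formats. The function names are hypothetical, and the choice of which fields to treat as required is an assumption; the Flexconnect JSON Schema remains the authoritative definition.

```python
import base64
import gzip
import json


def decode_message(message: str) -> dict:
    """Return the telemetry payload from a plain or compressed JSON message."""
    outer = json.loads(message)
    if set(outer) == {"b64"}:
        # Compressed form: base64 -> gzip -> JSON
        compressed = base64.b64decode(outer["b64"])
        return json.loads(gzip.decompress(compressed).decode("utf-8"))
    return outer


def check_payload(payload: dict) -> list:
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    for key in ("device_id", "token"):
        if not isinstance(payload.get(key), str):
            problems.append(f"{key} must be a string")
    points = payload.get("points")
    if not isinstance(points, list):
        problems.append("points must be an array")
        return problems
    for index, point in enumerate(points):
        if not isinstance(point.get("timestamp"), int):
            problems.append(f"points[{index}].timestamp must be an integer (ms)")
        for field in point.get("fields", []):
            if not isinstance(field.get("name"), str):
                problems.append(f"points[{index}]: field without a name")
            if not isinstance(field.get("value"), (str, int, float)):
                problems.append(f"points[{index}]: value must be a string or number")
    return problems


# Example round trip using the same encoding steps as above
payload = {
    "device_id": "provided_device_id",
    "token": "provided_token",
    "points": [{"timestamp": 1622547800000,
                "fields": [{"name": "temperature", "value": 22.5}]}],
}
compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
message = json.dumps({"b64": base64.b64encode(compressed).decode("utf-8")})
assert decode_message(message) == payload
assert check_payload(payload) == []
```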
Binary Format Example
Here is an example of how to send telemetry data to Flexconnect using the binary format:
# Import necessary libraries
import struct
import gzip
from time import sleep, time
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
MQTT_URL = "mqtt.audesseinc.com"
MQTT_PORT = 8883
DEVICE_ID = "provided_device_id"
TOKEN = "provided_token"
ROOT_CA_PATH = "./root-CA.crt"
CLIENT_KEY_PATH = "./device_key.key"
CLIENT_CERT_PATH = "./device_cert.pem"
MQTT_TOPIC = "flexconnect/mqtt_topic_binary"
LITTLE_ENDIAN = '<' # Little Endian Operator
BIG_ENDIAN = '>' # Big Endian Operator
# Create binary message according to specification
def create_binary_message(token, can_points):
    # Token section
    token_bytes = token.encode('utf-8')
    token_length = len(token_bytes)
    # Build the message
    message = bytearray()
    # Token Length (4 bytes, big endian)
    message.extend(struct.pack(BIG_ENDIAN + 'I', token_length))
    # Token (variable length)
    message.extend(token_bytes)
    # CAN Points
    for point in can_points:
        timestamp_ms = point['timestamp']
        readings = point['readings']
        readings_count = len(readings)
        # Calculate point length (header + all readings)
        point_length = 12  # Header size (8 + 2 + 2)
        for reading in readings:
            point_length += 5 + len(reading['data'])  # CAN ID (4) + BUS|DLC (1) + Data
        # CAN Point Header
        message.extend(struct.pack(LITTLE_ENDIAN + 'Q', timestamp_ms))  # Timestamp (8 bytes, little endian)
        message.extend(struct.pack(LITTLE_ENDIAN + 'H', readings_count))  # CAN Readings Count (2 bytes, little endian)
        message.extend(struct.pack(LITTLE_ENDIAN + 'H', point_length))  # Length (2 bytes, little endian)
        # CAN Point Readings
        for reading in readings:
            can_id = reading['can_id']
            bus = reading['bus']
            dlc = reading['dlc']
            data = reading['data']
            # CAN ID (4 bytes, little endian)
            message.extend(struct.pack(LITTLE_ENDIAN + 'I', can_id))
            # BUS|DLC (1 byte: upper 3 bits = bus, lower 5 bits = dlc)
            bus_dlc = (bus << 5) | (dlc & 0x1F)
            message.extend(struct.pack('B', bus_dlc))
            # Data (variable length)
            message.extend(data)
    return bytes(message)
# Example CAN points data with multiple readings per point
now = int(time() * 1000) # Current time in milliseconds
can_points = [
    {
        'timestamp': now,  # Current time in milliseconds
        'readings': [
            {
                'can_id': 0x123,
                'bus': 1,
                'dlc': 8,
                'data': b'\x01\x02\x03\x04\x05\x06\x07\x08'
            },
            {
                'can_id': 0x456,
                'bus': 1,
                'dlc': 4,
                'data': b'\xAA\xBB\xCC\xDD'
            }
        ]
    },
    {
        'timestamp': now + 100,  # 100ms later
        'readings': [
            {
                'can_id': 0x789,
                'bus': 2,
                'dlc': 6,
                'data': b'\x11\x22\x33\x44\x55\x66'
            }
        ]
    }
]
# Create the binary message
binary_message = create_binary_message(TOKEN, can_points)
# Optional: Compress the data
# For compressed data, prefix with 0xFC
compressed_data = gzip.compress(binary_message)
final_message = b'\xFC' + compressed_data
# Connect to MQTT broker and publish
client = AWSIoTMQTTClient(DEVICE_ID)
client.configureEndpoint(MQTT_URL, MQTT_PORT)
client.configureCredentials(ROOT_CA_PATH, CLIENT_KEY_PATH, CLIENT_CERT_PATH)
client.connect()
# Publish the binary message (passed as a bytearray, a payload type the MQTT client accepts for binary data)
client.publish(MQTT_TOPIC, bytearray(final_message), QoS=1)
sleep(1) # Wait for the message to be sent
client.disconnect()
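A parser for the same layout is a convenient way to verify an encoder locally before publishing. The sketch below reads an uncompressed binary message (strip the 0xFC prefix and decompress first if needed). It assumes that the DLC equals the number of data bytes, as in the example readings above. With CAN FD, DLC codes above 8 stand for larger payload sizes (for example 13 means 32 bytes), so a different mapping can be passed through the hypothetical `data_length` parameter. `parse_binary_message` is an illustrative name, not a Flexconnect API.

```python
import struct


def parse_binary_message(message: bytes, data_length=lambda dlc: dlc):
    """Return (token, points) decoded from an uncompressed binary message."""
    # Token section: 4-byte big-endian length, then the token itself
    (token_length,) = struct.unpack_from(">I", message, 0)
    offset = 4
    token = message[offset:offset + token_length].decode("utf-8")
    offset += token_length

    points = []
    while offset < len(message):
        # CAN point header: timestamp (8), readings count (2), length (2)
        timestamp_ms, readings_count, point_length = struct.unpack_from(
            "<QHH", message, offset)
        point_end = offset + point_length
        offset += 12
        readings = []
        for _ in range(readings_count):
            # CAN ID (4 bytes, little endian) followed by the BUS|DLC byte
            can_id, bus_dlc = struct.unpack_from("<IB", message, offset)
            offset += 5
            bus, dlc = bus_dlc >> 5, bus_dlc & 0x1F
            size = data_length(dlc)  # Assumption: dlc is the byte count
            readings.append({"can_id": can_id, "bus": bus, "dlc": dlc,
                             "data": message[offset:offset + size]})
            offset += size
        if offset != point_end:
            raise ValueError("Length field does not match the readings parsed")
        points.append({"timestamp": timestamp_ms, "readings": readings})
    return token, points
```

Running `parse_binary_message(binary_message)` on the output of `create_binary_message` above should return the original token and the same readings, which makes a useful self-check before sending data to the broker.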