developer_intro.md

Flexconnect Developer Guide

This document provides an overview of what customer developers need to know to work with Flexconnect, including the architecture, key components, and how to get started with development.

Overview

Flexconnect is a cloud-based platform that enables data collection, processing, and visualization for various applications.

Developers can build their own applications leveraging the ingestion features of Flexconnect without the need to manage the underlying infrastructure.

Architecture

High Level Architecture

The system is split into four main layers:

  1. Data Source Layer: This layer consists of the devices that collect data and send it to the cloud. Any telemetry device that supports the MQTT protocol can be used. Each device is identified by a set of unique credentials, provided by Audesse, that are used to authenticate the device and encrypt its data.
    • The devices publish telemetry data to a specific topic on the MQTT broker. The topic structure lets the system identify the device and the type of data being sent.
    • Devices should compress data before sending it to the cloud to reduce network traffic. Compression can be done with gzip.
  2. Ingestion Layer: This layer is responsible for receiving the data from the devices, processing it, and storing it in a time-series database. Depending on the type of data, the ingestion layer can also perform additional processing, such as filtering, alerting, and data transformation.
    • This layer accepts data in multiple formats depending on the use case. See Data Formats for more details.
  3. Storage Layer: This layer stores the data collected from the devices in a time-series database, which allows efficient querying, retrieval, and visualization of the data.
  4. Presentation Layer: Audesse provides a RESTful API that lets developers query the data stored in the time-series database for use in their own frontend deployments.
    • Audesse provides a Grafana-based dashboard that allows users to visualize the data collected from the devices. The dashboard is customizable and can be used to create various visualizations, such as graphs, charts, and tables.

Connecting to Flexconnect

To connect to Flexconnect, the device must use the provided certificates and credentials to authenticate with the MQTT broker:

  • Root CA Certificate: The root CA certificate is used to verify the identity of the MQTT broker. The device must have this certificate installed to establish a secure connection.
  • Client Key: The client key is used to authenticate the device with the MQTT broker. The device must use this key to sign the connection request.
  • Client Certificate: The client certificate is used to identify the device to the MQTT broker. The device must present this certificate when establishing a connection.

Example of connecting to Flexconnect using the AWS IoT SDK for Python ([AWSIoTPythonSDK](https://github.com/aws/aws-iot-device-sdk-python-v2)):

    from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

    # Placeholders: replace with the device ID and endpoint provided for your device
    device_id = "provided_device_id"
    client_config = {"endpoint": "mqtt.audesseinc.com"}

    client = AWSIoTMQTTClient(device_id)
    client.configureEndpoint(hostName=client_config["endpoint"], portNumber=8883)
    client.configureCredentials(
        CAFilePath="./root-CA.crt",
        KeyPath="./device_key.key",
        CertificatePath="./device_cert.pem",
    )
    client.connect()

Data Formats

The Ingestion Layer of Flexconnect supports two main formats for telemetry data, each with a compressed variant:

  • JSON: The data is sent in JSON format according to the Flexconnect JSON Schema. The main fields of the JSON payload are:
    • b64: The base64-encoded string of the compressed data. When the data is compressed, this is the only field in the message. Do not use it for uncompressed data.
    • device_id: The unique identifier of the device sending the data.
    • token: The authentication token provided by Audesse to the device. This token is used to authenticate the device with the system.
    • tags: An array of tags for categorizing and filtering data. Each tag is a key-value string pair.
    • points: An array of data points. Each point includes:
      • timestamp: The data point timestamp in Unix epoch format (milliseconds since 1970-01-01T00:00:00Z).
      • tags: Tags specific to this data point.
      • fields: The actual measurement data. Each field contains:
        • name: The field identifier.
        • value: The field value (string or number).
  • Binary Message: Binary data compatible with the CAN protocol. This format requires a DBC file to define the data structure. The message format is:

Binary Message

| Field | Description | Value Example | Size (Bytes) | Endianness |
|---|---|---|---|---|
| Token Length | Length of the authentication token | 0x40 | 4 | Big |
| Token | Authentication token provided by Audesse | - | Variable (Token Length) | - |
| CAN points | Repeated CAN points | see CAN Point Header | - | - |
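
The token section that opens the message can be sketched with Python's `struct` module. This is a minimal illustration rather than a reference implementation: the helper names `pack_token_section` and `unpack_token_section` are hypothetical, and the token is assumed to be UTF-8 encoded.

```python
import struct
from typing import Tuple


def pack_token_section(token: str) -> bytes:
    """Return the 4-byte big-endian token length followed by the token bytes."""
    token_bytes = token.encode("utf-8")  # assumption: tokens are UTF-8 text
    return struct.pack(">I", len(token_bytes)) + token_bytes


def unpack_token_section(message: bytes) -> Tuple[str, int]:
    """Return the token and the offset at which the first CAN point starts."""
    (token_length,) = struct.unpack_from(">I", message, 0)
    end = 4 + token_length
    if end > len(message):
        raise ValueError("message is shorter than the declared token length")
    return message[4:end].decode("utf-8"), end


section = pack_token_section("a" * 0x40)
print(section[:4].hex())  # 00000040
```

The offset returned by `unpack_token_section` is where parsing of the repeated CAN points would begin.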

CAN Point Header

Each CAN point contains a header followed by one or more CAN readings.

| Field | Description | Value Example | Size (Bytes) | Endianness |
|---|---|---|---|---|
| Timestamp | Data point timestamp in Unix epoch format (milliseconds since 1970-01-01T00:00:00Z) | 1622547800000 | 8 | Little |
| Can Readings Count | Number of CAN readings in this point | 0x02 | 2 | Little |
| Length | Total length of the CAN point in bytes (including header and readings) | 0x2A | 2 | Little |
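
As a rough sketch, the 12-byte header maps onto a single little-endian `struct` layout. The names `HEADER`, `pack_point_header`, and `unpack_point_header` are illustrative only.

```python
import struct

# '<' = little endian, no padding: Q = 8-byte timestamp, H = 2-byte count, H = 2-byte length
HEADER = struct.Struct("<QHH")


def pack_point_header(timestamp_ms: int, readings_count: int, point_length: int) -> bytes:
    """Serialize one CAN point header."""
    return HEADER.pack(timestamp_ms, readings_count, point_length)


def unpack_point_header(buf: bytes, offset: int = 0):
    """Return (timestamp_ms, readings_count, point_length) read at the given offset."""
    return HEADER.unpack_from(buf, offset)


header = pack_point_header(1622547800000, 0x02, 0x2A)
print(HEADER.size)  # 12
```

Because the Length field covers the header and all readings, a reader can use it to skip a whole point without decoding the readings.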

CAN Point Payload

A CAN point contains one or more readings, each with the following structure:

| Field | Description | Value Example | Size (Bytes) | Endianness |
|---|---|---|---|---|
| CAN ID | Unique identifier for the CAN message | 0x123 | 4 | Little |
| BUS\|DLC | Bus number (upper 3 bits) and data length code (DLC, lower 5 bits) | 0b01101101 (Bus=3, DLC=13) | 1 | - |
| Data | CAN data payload in CAN FD format as defined in the associated DBC file | 0x01020304 | Variable | - |
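
The BUS|DLC byte can be packed and unpacked with plain bit operations, as in the sketch below. The helper names are hypothetical. The DLC-to-length table is the standard CAN FD mapping; that Flexconnect interprets DLC values above 8 this way is an assumption, not something this guide states.

```python
# Standard CAN FD payload lengths for DLC codes 9..15
CAN_FD_DLC_TO_LEN = {9: 12, 10: 16, 11: 20, 12: 24, 13: 32, 14: 48, 15: 64}


def pack_bus_dlc(bus: int, dlc: int) -> int:
    """Combine the bus number (upper 3 bits) and DLC (lower 5 bits) into one byte."""
    if not 0 <= bus <= 0b111:
        raise ValueError("bus must fit in 3 bits")
    if not 0 <= dlc <= 0b11111:
        raise ValueError("dlc must fit in 5 bits")
    return (bus << 5) | dlc


def unpack_bus_dlc(value: int):
    """Split a BUS|DLC byte into (bus, dlc)."""
    return value >> 5, value & 0x1F


def dlc_to_length(dlc: int) -> int:
    """Payload length in bytes for a DLC code (assumed CAN FD interpretation)."""
    if 0 <= dlc <= 8:
        return dlc
    if dlc in CAN_FD_DLC_TO_LEN:
        return CAN_FD_DLC_TO_LEN[dlc]
    raise ValueError(f"unsupported dlc {dlc}")


print(format(pack_bus_dlc(3, 13), "08b"))  # 01101101
```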

Note

  • For compressed data, prefix the message with byte 0xFC to indicate compression. The remainder contains the compressed binary message data.
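
A minimal sketch of the prefix rule, assuming gzip is the compression used (as in the examples in this guide). The names `wrap_compressed` and `unwrap` are hypothetical, and `unwrap` only illustrates how a receiver might tell the two variants apart; it does not describe the ingestion layer's actual code.

```python
import gzip

COMPRESSED_PREFIX = b"\xFC"


def wrap_compressed(binary_message: bytes) -> bytes:
    """Compress a binary message and prepend the 0xFC marker byte."""
    return COMPRESSED_PREFIX + gzip.compress(binary_message)


def unwrap(payload: bytes) -> bytes:
    """Return the uncompressed binary message, whichever variant was received."""
    if payload[:1] == COMPRESSED_PREFIX:
        return gzip.decompress(payload[1:])
    return payload


message = b"\x00\x00\x00\x03abc"  # token length 3 (big endian) + token "abc"
print(unwrap(wrap_compressed(message)) == message)  # True
```

An uncompressed message starts with the big-endian token length, so its first byte is 0xFC only for a token of at least 0xFC000000 bytes, which is why the marker is unambiguous in practice.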

Example Usage

JSON Format Example

Here is an example of how to send telemetry data to Flexconnect using the JSON format:

    # Import necessary libraries
    # json library for handling JSON data and dumping it to a string
    import json
    # gzip library for compressing the data
    import gzip
    # base64 library for encoding the compressed data
    import base64
    # sleep is used to give the asynchronous publish time to complete
    from time import sleep

    from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

    MQTT_URL = "mqtt.audesseinc.com"
    MQTT_PORT = 8883
    DEVICE_ID = "provided_device_id"
    TOKEN = "provided_token"
    ROOT_CA_PATH = "./root-CA.crt"
    CLIENT_KEY_PATH = "./device_key.key"
    CLIENT_CERT_PATH = "./device_cert.pem"
    MQTT_TOPIC = "flexconnect/mqtt_topic"

    # Create a JSON payload
    data = {
        "device_id": DEVICE_ID,
        "token": TOKEN,
        "tags": [{"tag1": "value1"}, {"tag2": "value2"}],
        "points": [
            {
                "timestamp": 1622547800000,
                "tags": [{"point_tag1": "point_value1"}],
                "fields": [
                    {"name": "temperature", "value": 22.5},
                    {"name": "humidity", "value": 45.0},
                ],
            }
        ],
    }

    # Convert the data to a JSON string
    json_data = json.dumps(data)

    # Compress the JSON data
    compressed_data = gzip.compress(json_data.encode("utf-8"))

    # Encode the compressed data to base64
    b64_data = base64.b64encode(compressed_data).decode("utf-8")

    # Create the final message (publish either one of the two variants)
    final_message_uncompressed = json_data
    final_message_compressed = json.dumps({"b64": b64_data})

    # Connect to the MQTT broker and publish the message
    client = AWSIoTMQTTClient(DEVICE_ID)
    client.configureEndpoint(MQTT_URL, MQTT_PORT)
    client.configureCredentials(ROOT_CA_PATH, CLIENT_KEY_PATH, CLIENT_CERT_PATH)
    client.connect()

    # Publish the message to the MQTT topic
    client.publishAsync(MQTT_TOPIC, final_message_compressed, QoS=1)
    sleep(1)  # Wait for the message to be sent
    client.disconnect()
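
To sanity-check a compressed payload before publishing, the encoding steps can be reversed locally. The sketch below mirrors the sender-side steps only; `encode_compressed` and `decode_message` are hypothetical helper names, and how the ingestion layer itself decodes messages is not covered here.

```python
import base64
import gzip
import json


def encode_compressed(payload: dict) -> str:
    """JSON-encode, gzip, and base64-wrap a payload into a {"b64": ...} message."""
    raw = json.dumps(payload).encode("utf-8")
    b64 = base64.b64encode(gzip.compress(raw)).decode("ascii")
    return json.dumps({"b64": b64})


def decode_message(message: str) -> dict:
    """Return the telemetry payload from a compressed or uncompressed message."""
    obj = json.loads(message)
    if set(obj) == {"b64"}:  # compressed variant: b64 is the only field
        raw = gzip.decompress(base64.b64decode(obj["b64"]))
        return json.loads(raw.decode("utf-8"))
    return obj


payload = {
    "device_id": "provided_device_id",
    "token": "provided_token",
    "points": [
        {"timestamp": 1622547800000,
         "fields": [{"name": "temperature", "value": 22.5}]}
    ],
}
print(decode_message(encode_compressed(payload)) == payload)  # True
```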

Binary Format Example

Here is an example of how to send telemetry data to Flexconnect using the binary format:

    # Import necessary libraries
    import struct
    import gzip
    from time import sleep, time

    from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

    MQTT_URL = "mqtt.audesseinc.com"
    MQTT_PORT = 8883
    DEVICE_ID = "provided_device_id"
    TOKEN = "provided_token"
    ROOT_CA_PATH = "./root-CA.crt"
    CLIENT_KEY_PATH = "./device_key.key"
    CLIENT_CERT_PATH = "./device_cert.pem"
    MQTT_TOPIC = "flexconnect/mqtt_topic_binary"

    LITTLE_ENDIAN = '<'  # Little Endian Operator
    BIG_ENDIAN = '>'  # Big Endian Operator


    # Create binary message according to specification
    def create_binary_message(token, can_points):
        # Token section
        token_bytes = token.encode('utf-8')
        token_length = len(token_bytes)

        # Build the message
        message = bytearray()

        # Token Length (4 bytes, big endian)
        message.extend(struct.pack(BIG_ENDIAN + 'I', token_length))

        # Token (variable length)
        message.extend(token_bytes)

        # CAN Points
        for point in can_points:
            timestamp_ms = point['timestamp']
            readings = point['readings']
            readings_count = len(readings)

            # Calculate point length (header + all readings)
            point_length = 12  # Header size (8 + 2 + 2)
            for reading in readings:
                point_length += 5 + len(reading['data'])  # CAN ID (4) + BUS|DLC (1) + Data

            # CAN Point Header
            message.extend(struct.pack(LITTLE_ENDIAN + 'Q', timestamp_ms))  # Timestamp (8 bytes, little endian)
            message.extend(struct.pack(LITTLE_ENDIAN + 'H', readings_count))  # CAN Readings Count (2 bytes, little endian)
            message.extend(struct.pack(LITTLE_ENDIAN + 'H', point_length))  # Length (2 bytes, little endian)

            # CAN Point Readings
            for reading in readings:
                can_id = reading['can_id']
                bus = reading['bus']
                dlc = reading['dlc']
                data = reading['data']

                # CAN ID (4 bytes, little endian)
                message.extend(struct.pack(LITTLE_ENDIAN + 'I', can_id))

                # BUS|DLC (1 byte: upper 3 bits = bus, lower 5 bits = dlc)
                bus_dlc = ((bus & 0x07) << 5) | (dlc & 0x1F)
                message.extend(struct.pack('B', bus_dlc))

                # Data (variable length)
                message.extend(data)

        return bytes(message)


    # Example CAN points data with multiple readings per point
    now = int(time() * 1000)  # Current time in milliseconds
    can_points = [
        {
            'timestamp': now,  # Current time in milliseconds
            'readings': [
                {'can_id': 0x123, 'bus': 1, 'dlc': 8, 'data': b'\x01\x02\x03\x04\x05\x06\x07\x08'},
                {'can_id': 0x456, 'bus': 1, 'dlc': 4, 'data': b'\xAA\xBB\xCC\xDD'},
            ],
        },
        {
            'timestamp': now + 100,  # 100ms later
            'readings': [
                {'can_id': 0x789, 'bus': 2, 'dlc': 6, 'data': b'\x11\x22\x33\x44\x55\x66'},
            ],
        },
    ]

    # Create the binary message
    binary_message = create_binary_message(TOKEN, can_points)

    # Optional: Compress the data
    # For compressed data, prefix with 0xFC
    compressed_data = gzip.compress(binary_message)
    final_message = b'\xFC' + compressed_data

    # Connect to MQTT broker and publish
    client = AWSIoTMQTTClient(DEVICE_ID)
    client.configureEndpoint(MQTT_URL, MQTT_PORT)
    client.configureCredentials(ROOT_CA_PATH, CLIENT_KEY_PATH, CLIENT_CERT_PATH)
    client.connect()

    # Publish the binary message as a bytearray; the SDK's bundled MQTT client
    # may reject a plain bytes payload
    client.publish(MQTT_TOPIC, bytearray(final_message), QoS=1)
    sleep(1)  # Wait for the message to be sent
    client.disconnect()
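
Before publishing, it can help to decode the message locally and confirm that each Length field matches what was written. The parser below is a sketch under stated assumptions, not the ingestion layer's implementation: `parse_binary_message` is a hypothetical name, the input is the uncompressed message, and the data length of each reading is derived from the DLC using the standard CAN FD mapping.

```python
import struct

# Standard CAN FD lengths for DLC codes above 8 (assumed to apply to the 5-bit DLC field)
CAN_FD_DLC_TO_LEN = {9: 12, 10: 16, 11: 20, 12: 24, 13: 32, 14: 48, 15: 64}


def parse_binary_message(message: bytes) -> dict:
    """Decode an uncompressed binary message into its token and CAN points."""
    (token_length,) = struct.unpack_from(">I", message, 0)
    offset = 4
    token = message[offset:offset + token_length].decode("utf-8")
    offset += token_length

    points = []
    while offset < len(message):
        point_start = offset
        timestamp_ms, count, length = struct.unpack_from("<QHH", message, offset)
        offset += 12
        readings = []
        for _ in range(count):
            can_id, bus_dlc = struct.unpack_from("<IB", message, offset)
            offset += 5
            bus, dlc = bus_dlc >> 5, bus_dlc & 0x1F
            if dlc > 15:
                raise ValueError(f"unsupported dlc {dlc}")
            data_len = dlc if dlc <= 8 else CAN_FD_DLC_TO_LEN[dlc]
            data = message[offset:offset + data_len]
            if len(data) != data_len:
                raise ValueError("message truncated inside a CAN reading")
            offset += data_len
            readings.append({"can_id": can_id, "bus": bus, "dlc": dlc, "data": data})
        # The Length field covers the header plus all readings of this point
        if offset - point_start != length:
            raise ValueError("Length field does not match the parsed point size")
        points.append({"timestamp": timestamp_ms, "readings": readings})
    return {"token": token, "points": points}


# Hand-built sample: token "tok", one point holding a single 4-byte reading
sample = (
    struct.pack(">I", 3) + b"tok"
    + struct.pack("<QHH", 1622547800000, 1, 12 + 5 + 4)
    + struct.pack("<IB", 0x123, (1 << 5) | 4) + b"\xAA\xBB\xCC\xDD"
)
parsed = parse_binary_message(sample)
print(parsed["token"], len(parsed["points"]))  # tok 1
```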