|
- /*
- * mqtt_client.c
- *
- * Created on: Jun 10, 2024
- * Author: jakubski
- */
- #include <stdio.h>
- #include "FreeRTOS.h"
- #include "task.h"
- #include "main.h"
- #include "cmsis_os.h"
- #include <string.h>
- #include "lwip.h"
- #include "lwip/api.h"
- #include "MQTTClient.h"
- #include "MQTTInterface.h"
- #include "node-red-config.h"
- #include "measurements.h"
- #include "cJSON.h"
- #include "interprocess_data.h"
- #include "uart_tasks.h"
- #define MQTT_BUFSIZE 1024
- char* const subscribeTopicNames[MASTER_BOARD + SLAVES_COUNT] = { "Set/0", "Set/1", "Set/2", "Set/3", "Set/4" };
- #define MAX_COMMANDS_IN_MQTT_PAYLOAD 17
- char* const topicCommands[MAX_COMMANDS_IN_MQTT_PAYLOAD] = { "fanSpeed", "motorXon", "motorYon", "diode", "motorXMaxCurrent", "motorYMaxCurrent", "clearPeakElectricalMeasurements", "mainBoardRelay",
- "setEncoderXValue", "setEncoderYValue", "setVoltageMeasGains", "setVoltageMeasOffsets", "setCurrentMeasGains", "setCurrentMeasOffsets", "resetSystem", "setPositionX", "setPositionY" };
- enum _Topics
- {
- fanSpeedTopic = 0,
- motorXonTopic,
- motorYonTopic,
- diodeTopic,
- motorXMaxCurrentTopic,
- motorYMaxCurrentTopic,
- clearPeakElectricalMeasurementsTopic,
- mainBoardRelayTopic,
- setEncoderXValue,
- setEncoderYValue,
- setVoltageMeasGains,
- setVoltageMeasOffsets,
- setCurrentMeasGains,
- setCurrentMeasOffsets,
- resetSystem,
- setPositionX,
- setPositionY
- };
- enum _BoardNoOverTopic
- {
- main_board = 0,
- board_1,
- board_2,
- board_3,
- board_4,
- unknownBoard
- };
- typedef enum _BoardNoOverTopic BoardNoOverTopic;
- extern struct netif gnetif; //extern gnetif
- extern UartTaskData uart1TaskData; // Board 1
- extern UartTaskData uart3TaskData; // Board 2
- extern UartTaskData uart6TaskData; // Board 3
- extern UartTaskData uart2TaskData; // Board 4
- extern UartTaskData uart8TaskData; // Debug
- extern osTimerId_t relay1TimerHandle;
- extern osTimerId_t relay2TimerHandle;
- extern osTimerId_t relay3TimerHandle;
- extern osTimerId_t relay4TimerHandle;
- const osThreadAttr_t mqttClientSubTaskAttr =
- { .name = "mqttClientSubTask", .stack_size = configMINIMAL_STACK_SIZE * 4,
- .priority = (osPriority_t) osPriorityNormal, };
- const osThreadAttr_t mqttClientPubTaskAttr =
- { .name = "mqttClientPsubTask", .stack_size = configMINIMAL_STACK_SIZE * 4,
- .priority = (osPriority_t) osPriorityNormal, };
- osThreadId mqttClientSubTaskHandle; //mqtt client task handle
- osThreadId mqttClientPubTaskHandle; //mqtt client task handle
- Network net; //mqtt network
- MQTTClient mqttClient; //mqtt client
- uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer
- uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer
- void RelayCtrl(int32_t relayNumber, int32_t relayTimeOn);
- void MqttClientSubTask (void* argument); // mqtt client subscribe task function
- void MqttClientPubTask (void* argument); // mqtt client publish task function
- uint32_t MqttConnectBroker (void); // mqtt broker connect function
- void MqttMessageArrived (MessageData* msg); // mqtt message callback function
- void MqttClientSubTask (void* argument) {
- uint8_t connectionTryCounter = 0;
- while (1) {
- // waiting for valid ip address
- if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) // system has no valid ip address
- {
- osDelay (pdMS_TO_TICKS (1000));
- continue;
- } else {
- printf ("DHCP/Static IP O.K.\n");
- break;
- }
- }
- while (1) {
- if (!mqttClient.isconnected) {
- // try to connect to the broker
- if (MqttConnectBroker () != MQTT_SUCCESS) {
- osDelay (pdMS_TO_TICKS (1000));
- connectionTryCounter++;
- if (connectionTryCounter > 10) {
- __disable_irq ();
- NVIC_SystemReset ();
- }
- }
- } else {
- connectionTryCounter = 0;
- MQTTYield (&mqttClient, 500); // handle timer
- osDelay (pdMS_TO_TICKS (100));
- }
- }
- }
- void MqttClientPubTask (void* argument) {
- char messageBuffer[512] = { 0x00 };
- char topicTextBuffer[32] = { 0x00 };
- uint32_t bytesInBuffer = 0;
- uint8_t boardNumber = 0;
- MQTTMessage message;
- while (1) {
- if (mqttClient.isconnected) {
- if (is_link_up ()) {
- for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
- osMutexAcquire (resMeasurementsMutex, osWaitForever);
- RESMeasurements* resMeas = &resMeasurements[boardNumber];
- sprintf (topicTextBuffer, "RESmeasurments/%d", boardNumber + 1);
- bytesInBuffer = sprintf (messageBuffer, "{\"voltageRMS\":[%.2f, %.2f, %.2f], ", resMeas->voltageRMS[0], resMeas->voltageRMS[1], resMeas->voltageRMS[2]);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"voltagePeak\":[%.2f, %.2f, %.2f], ", resMeas->voltagePeak[0], resMeas->voltagePeak[1], resMeas->voltagePeak[2]);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentRMS\":[%.3f, %.3f, %.3f], ", resMeas->currentRMS[0], resMeas->currentRMS[1], resMeas->currentRMS[2]);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentPeak\":[%.3f, %.3f, %.3f], ", resMeas->currentPeak[0], resMeas->currentPeak[1], resMeas->currentPeak[2]);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"power\":[%.2f, %.2f, %.2f], ", resMeas->power[0], resMeas->power[1], resMeas->power[2]);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"lastSeen\": %ld}", slaveLastSeen[boardNumber]);
- osMutexRelease (resMeasurementsMutex);
- message.payload = (void*)messageBuffer;
- message.payloadlen = strlen (messageBuffer);
- MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
- }
- for (boardNumber = 0; boardNumber < SLAVES_COUNT + 1; boardNumber++) {
- if (boardNumber > 0) {
- osMutexAcquire (sensorsInfoMutex, osWaitForever);
- SesnorsInfo* sensors = &sensorsInfo[boardNumber - 1];
- sprintf (topicTextBuffer, "Sensors/%d", boardNumber);
- bytesInBuffer = sprintf (messageBuffer, "{\"pvTemperature\":[%.1f, %.1f], ", sensors->pvTemperature[0], sensors->pvTemperature[1]);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"fanVoltage\":%.2f, ", sensors->fanVoltage);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderX\":%.2f, ", sensors->pvEncoderX);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderY\":%.2f, ", sensors->pvEncoderY);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXStatus\":%d, ", sensors->motorXStatus);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYStatus\":%d, ", sensors->motorYStatus);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXAveCurrent\":%.3f, ", sensors->motorXAveCurrent);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYAveCurrent\":%.3f, ", sensors->motorYAveCurrent);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXPeakCurrent\":%.3f, ", sensors->motorXPeakCurrent);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYPeakCurrent\":%.3f, ", sensors->motorYPeakCurrent);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchUp\":%d, ", sensors->limitXSwitchUp);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchDown\":%d, ", sensors->limitXSwitchDown);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchCenter\":%d, ", sensors->limitXSwitchCenter);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchUp\":%d, ", sensors->limitYSwitchUp);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchDown\":%d, ", sensors->limitYSwitchDown);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchCenter\":%d, ", sensors->limitYSwitchCenter);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentXPosition\":%.2f, ", sensors->currentXPosition);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentYPosition\":%.2f, ", sensors->currentYPosition);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"positionXWeak\":%d, ", sensors->positionXWeak);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"positionYWeak\":%d, ", sensors->positionYWeak);
- bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"powerSupplyFailMask\":%d}", sensors->powerSupplyFailMask);
- osMutexRelease (sensorsInfoMutex);
- } else {
- sprintf (topicTextBuffer, "Sensors/%d", boardNumber);
- uint8_t mainBoardPowerSupplyFailMask = ~((HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_4) << 1) | HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_2)) & 0x3;
- bytesInBuffer = sprintf (messageBuffer, "{\"powerSupplyFailMask\":%d}", mainBoardPowerSupplyFailMask);
- }
- message.payload = (void*)messageBuffer;
- message.payloadlen = strlen (messageBuffer);
- MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
- }
- }
- }
- osDelay (pdMS_TO_TICKS (1000));
- }
- }
- uint32_t MqttConnectBroker () {
- uint8_t boardNumber = 0;
- int ret;
- NewNetwork (&net);
- ret = ConnectNetwork (&net, BROKER_IP, MQTT_PORT);
- if (ret != MQTT_SUCCESS) {
- printf ("ConnectNetwork failed.\n");
- return -1;
- }
- MQTTClientInit (&mqttClient, &net, 1000, sndBuffer, sizeof (sndBuffer), rcvBuffer, sizeof (rcvBuffer));
- MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
- data.willFlag = 0;
- data.MQTTVersion = 3;
- data.clientID.cstring = "test_user1";
- data.username.cstring = "test_user1";
- data.password.cstring = "1234";
- data.keepAliveInterval = 100;
- data.cleansession = 1;
- ret = MQTTConnect (&mqttClient, &data);
- if (ret != MQTT_SUCCESS) {
- net_disconnect (&net);
- printf ("MQTTConnect failed. Code %d\n", ret);
- return ret;
- }
- for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
- ret = MQTTSubscribe (&mqttClient, subscribeTopicNames[boardNumber], QOS0, MqttMessageArrived);
- if (ret != MQTT_SUCCESS) {
- net_disconnect (&net);
- printf ("MQTTSubscribe failed.\n");
- return ret;
- }
- }
- printf ("MQTT_ConnectBroker O.K.\n");
- return MQTT_SUCCESS;
- }
- void RelayCtrl (int32_t relayNumber, int32_t relayTimeOn) {
- switch (relayNumber) {
- case 1:
- if (relayTimeOn > 0) {
- osTimerStart (relay1TimerHandle, relayTimeOn * 1000);
- } else {
- osTimerStop (relay1TimerHandle);
- }
- if (relayTimeOn != 0) {
- HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_SET);
- } else {
- HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_RESET);
- }
- break;
- case 2:
- if (relayTimeOn > 0) {
- osTimerStart (relay2TimerHandle, relayTimeOn * 1000);
- } else {
- osTimerStop (relay2TimerHandle);
- }
- if (relayTimeOn != 0) {
- HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_SET);
- } else {
- HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_RESET);
- }
- break;
- case 3:
- if (relayTimeOn > 0) {
- osTimerStart (relay3TimerHandle, relayTimeOn * 1000);
- } else {
- osTimerStop (relay3TimerHandle);
- }
- if (relayTimeOn != 0) {
- HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_SET);
- } else {
- HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_RESET);
- }
- break;
- case 4:
- if (relayTimeOn > 0) {
- osTimerStart (relay4TimerHandle, relayTimeOn * 1000);
- } else {
- osTimerStop (relay4TimerHandle);
- }
- if (relayTimeOn != 0) {
- HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_SET);
- } else {
- HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_RESET);
- }
- break;
- default: break;
- }
- }
- void MqttMessageArrived (MessageData* msg) {
- SerialProtocolCommands spCommand = spUnknown;
- BoardNoOverTopic topicForBoard = unknownBoard;
- uint8_t boardNumber = 0;
- MQTTMessage* message = msg->message;
- char topicName[32] = { 0 };
- memcpy (topicName, msg->topicName->lenstring.data, msg->topicName->lenstring.len);
- for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
- if (strcmp (topicName, subscribeTopicNames[boardNumber]) == 0) {
- topicForBoard = (BoardNoOverTopic)(boardNumber);
- break;
- }
- }
- if (topicForBoard == unknownBoard) {
- return;
- }
- cJSON* json = cJSON_Parse (message->payload);
- const cJSON* objectItem = NULL;
- InterProcessData data = { 0 };
- uint32_t arraySize = 0;
- if (topicForBoard != main_board) {
- for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) {
- spCommand = spUnknown;
- objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
- if (objectItem != NULL) {
- switch (topicCmdNumber) {
- case fanSpeedTopic: spCommand = spSetFanSpeed;
- case motorXonTopic:
- if (spCommand == spUnknown) {
- spCommand = spSetMotorXOn;
- }
- case motorYonTopic:
- if (spCommand == spUnknown) {
- spCommand = spSetMotorYOn;
- }
- if (cJSON_IsArray (objectItem)) {
- data.spCommand = spCommand;
- arraySize = cJSON_GetArraySize (objectItem);
- if (arraySize == 2) {
- for (int i = 0; i < arraySize; i++) {
- cJSON* item = cJSON_GetArrayItem (objectItem, i);
- if (cJSON_IsNumber (item)) {
- data.values.integerValues.value[i] = item->valueint;
- }
- }
- }
- }
- break;
- case diodeTopic:
- if (cJSON_IsNumber (objectItem)) {
- data.spCommand = spSetDiodeOn;
- data.values.integerValues.value[0] = objectItem->valueint;
- }
- break;
- case motorXMaxCurrentTopic: spCommand = spSetmotorXMaxCurrent;
- case motorYMaxCurrentTopic:
- if (cJSON_IsNumber (objectItem)) {
- if (spCommand == spUnknown) {
- spCommand = spSetmotorYMaxCurrent;
- }
- data.spCommand = spCommand;
- data.values.flaotValues.value[0] = objectItem->valuedouble;
- }
- break;
- case clearPeakElectricalMeasurementsTopic:
- if (cJSON_IsNumber (objectItem)) {
- if (objectItem->valueint == 1) {
- data.spCommand = spClearPeakMeasurments;
- }
- }
- break;
- case mainBoardRelayTopic:
- if (cJSON_IsArray (objectItem)) {
- arraySize = cJSON_GetArraySize (objectItem);
- if (arraySize == 2) {
- int32_t relayNumber = -1;
- cJSON* item = cJSON_GetArrayItem (objectItem, 0);
- if (cJSON_IsNumber (item)) {
- relayNumber = item->valueint;
- }
- int32_t relayTimeOn = 0;
- item = cJSON_GetArrayItem (objectItem, 1);
- if (cJSON_IsNumber (item)) {
- relayTimeOn = item->valueint;
- }
- RelayCtrl (relayNumber, relayTimeOn);
- }
- }
- break;
- case setEncoderXValue:
- if (cJSON_IsNumber (objectItem)) {
- data.spCommand = spSetEncoderXValue;
- data.values.flaotValues.value[0] = objectItem->valuedouble;
- }
- break;
- case setEncoderYValue:
- if (cJSON_IsNumber (objectItem)) {
- data.spCommand = spSetEncoderYValue;
- data.values.flaotValues.value[0] = objectItem->valuedouble;
- }
- break;
- case setVoltageMeasGains: spCommand = spSetVoltageMeasGains;
- case setVoltageMeasOffsets:
- if (spCommand == spUnknown) {
- spCommand = spSetVoltageMeasOffsets;
- }
- case setCurrentMeasGains:
- if (spCommand == spUnknown) {
- spCommand = spSetCurrentMeasGains;
- }
- case setCurrentMeasOffsets:
- if (spCommand == spUnknown) {
- spCommand = spSetCurrentMeasOffsets;
- }
- if (cJSON_IsArray (objectItem)) {
- data.spCommand = spCommand;
- arraySize = cJSON_GetArraySize (objectItem);
- if (arraySize == 3) {
- for (int i = 0; i < arraySize; i++) {
- cJSON* item = cJSON_GetArrayItem (objectItem, i);
- if (cJSON_IsNumber (item)) {
- data.values.flaotValues.value[i] = item->valuedouble;
- }
- }
- }
- }
- break;
- case resetSystem:
- if (cJSON_IsNumber (objectItem)) {
- if (objectItem->valueint == 1) {
- data.spCommand = spResetSystem;
- break;
- }
- }
- break;
- case setPositionX:
- if (cJSON_IsNumber (objectItem)) {
- data.spCommand = spSetPositonX;
- data.values.flaotValues.value[0] = objectItem->valuedouble;
- }
- break;
- case setPositionY:
- if (cJSON_IsNumber (objectItem)) {
- data.spCommand = spSetPositonY;
- data.values.flaotValues.value[0] = objectItem->valuedouble;
- }
- break;
- default: break;
- }
- }
- }
- } else {
- for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) {
- objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
- if (objectItem != NULL) {
- if (topicCmdNumber == resetSystem) {
- __disable_irq ();
- NVIC_SystemReset ();
- break;
- }
- }
- }
- }
- switch (topicForBoard) {
- case main_board: break;
- case board_1:
- #ifdef USE_UART8_INSTEAD_UART1
- if (uart8TaskData.sendCmdToSlaveQueue != NULL) {
- osMessageQueuePut (uart8TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
- }
- #else
- if (uart1TaskData.sendCmdToSlaveQueue != NULL) {
- osMessageQueuePut (uart1TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
- }
- #endif
- printf ("Send cmd to board 1\n");
- break;
- case board_2:
- if (uart3TaskData.sendCmdToSlaveQueue != NULL) {
- osMessageQueuePut (uart3TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
- }
- printf ("Send cmd to board 2\n");
- break;
- case board_3:
- if (uart6TaskData.sendCmdToSlaveQueue != NULL) {
- osMessageQueuePut (uart6TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
- }
- printf ("Send cmd to board 3\n");
- break;
- case board_4:
- if (uart2TaskData.sendCmdToSlaveQueue != NULL) {
- osMessageQueuePut (uart2TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
- }
- printf ("Send cmd to board 4\n");
- break;
- default: break;
- }
- cJSON_Delete (json);
- printf ("MQTT Topic:%s, MSG[%d]:%s\n", topicName, (int)message->payloadlen, (char*)message->payload);
- }
- void mqtt_cli_init (void) {
- mqttClientSubTaskHandle = osThreadNew (MqttClientSubTask, NULL, &mqttClientSubTaskAttr); // subscribe task
- mqttClientPubTaskHandle = osThreadNew (MqttClientPubTask, NULL, &mqttClientPubTaskAttr); // publish task
- }
|