/* * mqtt_client.c * * Created on: Jun 10, 2024 * Author: jakubski */ #include #include "FreeRTOS.h" #include "task.h" #include "main.h" #include "cmsis_os.h" #include #include "lwip.h" #include "lwip/api.h" #include "MQTTClient.h" #include "MQTTInterface.h" #include "node-red-config.h" #include "measurements.h" #include "cJSON.h" #include "interprocess_data.h" #include "uart_tasks.h" #define MQTT_BUFSIZE 1024 char* const subscribeTopicNames[MASTER_BOARD + SLAVES_COUNT] = { "Set/0", "Set/1", "Set/2", "Set/3", "Set/4" }; #define MAX_COMMANDS_IN_MQTT_PAYLOAD 17 char* const topicCommands[MAX_COMMANDS_IN_MQTT_PAYLOAD] = { "fanSpeed", "motorXon", "motorYon", "diode", "motorXMaxCurrent", "motorYMaxCurrent", "clearPeakElectricalMeasurements", "mainBoardRelay", "setEncoderXValue", "setEncoderYValue", "setVoltageMeasGains", "setVoltageMeasOffsets", "setCurrentMeasGains", "setCurrentMeasOffsets", "resetSystem", "setPositionX", "setPositionY" }; enum _Topics { fanSpeedTopic = 0, motorXonTopic, motorYonTopic, diodeTopic, motorXMaxCurrentTopic, motorYMaxCurrentTopic, clearPeakElectricalMeasurementsTopic, mainBoardRelayTopic, setEncoderXValue, setEncoderYValue, setVoltageMeasGains, setVoltageMeasOffsets, setCurrentMeasGains, setCurrentMeasOffsets, resetSystem, setPositionX, setPositionY }; enum _BoardNoOverTopic { main_board = 0, board_1, board_2, board_3, board_4, unknownBoard }; typedef enum _BoardNoOverTopic BoardNoOverTopic; extern struct netif gnetif; //extern gnetif extern UartTaskData uart1TaskData; // Board 1 extern UartTaskData uart3TaskData; // Board 2 extern UartTaskData uart6TaskData; // Board 3 extern UartTaskData uart2TaskData; // Board 4 extern UartTaskData uart8TaskData; // Debug extern osTimerId_t relay1TimerHandle; extern osTimerId_t relay2TimerHandle; extern osTimerId_t relay3TimerHandle; extern osTimerId_t relay4TimerHandle; const osThreadAttr_t mqttClientSubTaskAttr = { .name = "mqttClientSubTask", .stack_size = configMINIMAL_STACK_SIZE * 4, .priority = (osPriority_t) osPriorityNormal, }; const osThreadAttr_t mqttClientPubTaskAttr = { .name = "mqttClientPsubTask", .stack_size = configMINIMAL_STACK_SIZE * 4, .priority = (osPriority_t) osPriorityNormal, }; osThreadId mqttClientSubTaskHandle; //mqtt client task handle osThreadId mqttClientPubTaskHandle; //mqtt client task handle Network net; //mqtt network MQTTClient mqttClient; //mqtt client uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer void RelayCtrl(int32_t relayNumber, int32_t relayTimeOn); void MqttClientSubTask (void* argument); // mqtt client subscribe task function void MqttClientPubTask (void* argument); // mqtt client publish task function uint32_t MqttConnectBroker (void); // mqtt broker connect function void MqttMessageArrived (MessageData* msg); // mqtt message callback function void MqttClientSubTask (void* argument) { uint8_t connectionTryCounter = 0; while (1) { // waiting for valid ip address if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) // system has no valid ip address { osDelay (pdMS_TO_TICKS (1000)); continue; } else { printf ("DHCP/Static IP O.K.\n"); break; } } while (1) { if (!mqttClient.isconnected) { // try to connect to the broker if (MqttConnectBroker () != MQTT_SUCCESS) { osDelay (pdMS_TO_TICKS (1000)); connectionTryCounter++; if (connectionTryCounter > 10) { __disable_irq (); NVIC_SystemReset (); } } } else { connectionTryCounter = 0; MQTTYield (&mqttClient, 500); // handle timer osDelay (pdMS_TO_TICKS (100)); } } } void MqttClientPubTask (void* argument) { char messageBuffer[512] = { 0x00 }; char topicTextBuffer[32] = { 0x00 }; uint32_t bytesInBuffer = 0; uint8_t boardNumber = 0; MQTTMessage message; while (1) { if (mqttClient.isconnected) { if (is_link_up ()) { for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) { osMutexAcquire (resMeasurementsMutex, osWaitForever); RESMeasurements* resMeas = &resMeasurements[boardNumber]; sprintf (topicTextBuffer, "RESmeasurments/%d", boardNumber + 1); bytesInBuffer = sprintf (messageBuffer, "{\"voltageRMS\":[%.2f, %.2f, %.2f], ", resMeas->voltageRMS[0], resMeas->voltageRMS[1], resMeas->voltageRMS[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"voltagePeak\":[%.2f, %.2f, %.2f], ", resMeas->voltagePeak[0], resMeas->voltagePeak[1], resMeas->voltagePeak[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentRMS\":[%.3f, %.3f, %.3f], ", resMeas->currentRMS[0], resMeas->currentRMS[1], resMeas->currentRMS[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentPeak\":[%.3f, %.3f, %.3f], ", resMeas->currentPeak[0], resMeas->currentPeak[1], resMeas->currentPeak[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"power\":[%.2f, %.2f, %.2f], ", resMeas->power[0], resMeas->power[1], resMeas->power[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"lastSeen\": %ld}", slaveLastSeen[boardNumber]); osMutexRelease (resMeasurementsMutex); message.payload = (void*)messageBuffer; message.payloadlen = strlen (messageBuffer); MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message } for (boardNumber = 0; boardNumber < SLAVES_COUNT + 1; boardNumber++) { if (boardNumber > 0) { osMutexAcquire (sensorsInfoMutex, osWaitForever); SesnorsInfo* sensors = &sensorsInfo[boardNumber - 1]; sprintf (topicTextBuffer, "Sensors/%d", boardNumber); bytesInBuffer = sprintf (messageBuffer, "{\"pvTemperature\":[%.1f, %.1f], ", sensors->pvTemperature[0], sensors->pvTemperature[1]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"fanVoltage\":%.2f, ", sensors->fanVoltage); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderX\":%.2f, ", sensors->pvEncoderX); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderY\":%.2f, ", sensors->pvEncoderY); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXStatus\":%d, ", sensors->motorXStatus); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYStatus\":%d, ", sensors->motorYStatus); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXAveCurrent\":%.3f, ", sensors->motorXAveCurrent); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYAveCurrent\":%.3f, ", sensors->motorYAveCurrent); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXPeakCurrent\":%.3f, ", sensors->motorXPeakCurrent); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYPeakCurrent\":%.3f, ", sensors->motorYPeakCurrent); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchUp\":%d, ", sensors->limitXSwitchUp); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchDown\":%d, ", sensors->limitXSwitchDown); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchCenter\":%d, ", sensors->limitXSwitchCenter); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchUp\":%d, ", sensors->limitYSwitchUp); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchDown\":%d, ", sensors->limitYSwitchDown); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchCenter\":%d, ", sensors->limitYSwitchCenter); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentXPosition\":%.2f, ", sensors->currentXPosition); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentYPosition\":%.2f, ", sensors->currentYPosition); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"positionXWeak\":%d, ", sensors->positionXWeak); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"positionYWeak\":%d, ", sensors->positionYWeak); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"powerSupplyFailMask\":%d}", sensors->powerSupplyFailMask); osMutexRelease (sensorsInfoMutex); } else { sprintf (topicTextBuffer, "Sensors/%d", boardNumber); uint8_t mainBoardPowerSupplyFailMask = ~((HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_4) << 1) | HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_2)) & 0x3; bytesInBuffer = sprintf (messageBuffer, "{\"powerSupplyFailMask\":%d}", mainBoardPowerSupplyFailMask); } message.payload = (void*)messageBuffer; message.payloadlen = strlen (messageBuffer); MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message } } } osDelay (pdMS_TO_TICKS (1000)); } } uint32_t MqttConnectBroker () { uint8_t boardNumber = 0; int ret; NewNetwork (&net); ret = ConnectNetwork (&net, BROKER_IP, MQTT_PORT); if (ret != MQTT_SUCCESS) { printf ("ConnectNetwork failed.\n"); return -1; } MQTTClientInit (&mqttClient, &net, 1000, sndBuffer, sizeof (sndBuffer), rcvBuffer, sizeof (rcvBuffer)); MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.willFlag = 0; data.MQTTVersion = 3; data.clientID.cstring = "test_user1"; data.username.cstring = "test_user1"; data.password.cstring = "1234"; data.keepAliveInterval = 100; data.cleansession = 1; ret = MQTTConnect (&mqttClient, &data); if (ret != MQTT_SUCCESS) { net_disconnect (&net); printf ("MQTTConnect failed. Code %d\n", ret); return ret; } for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) { ret = MQTTSubscribe (&mqttClient, subscribeTopicNames[boardNumber], QOS0, MqttMessageArrived); if (ret != MQTT_SUCCESS) { net_disconnect (&net); printf ("MQTTSubscribe failed.\n"); return ret; } } printf ("MQTT_ConnectBroker O.K.\n"); return MQTT_SUCCESS; } void RelayCtrl (int32_t relayNumber, int32_t relayTimeOn) { switch (relayNumber) { case 1: if (relayTimeOn > 0) { osTimerStart (relay1TimerHandle, relayTimeOn * 1000); } else { osTimerStop (relay1TimerHandle); } if (relayTimeOn != 0) { HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_SET); } else { HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_RESET); } break; case 2: if (relayTimeOn > 0) { osTimerStart (relay2TimerHandle, relayTimeOn * 1000); } else { osTimerStop (relay2TimerHandle); } if (relayTimeOn != 0) { HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_SET); } else { HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_RESET); } break; case 3: if (relayTimeOn > 0) { osTimerStart (relay3TimerHandle, relayTimeOn * 1000); } else { osTimerStop (relay3TimerHandle); } if (relayTimeOn != 0) { HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_SET); } else { HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_RESET); } break; case 4: if (relayTimeOn > 0) { osTimerStart (relay4TimerHandle, relayTimeOn * 1000); } else { osTimerStop (relay4TimerHandle); } if (relayTimeOn != 0) { HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_SET); } else { HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_RESET); } break; default: break; } } void MqttMessageArrived (MessageData* msg) { SerialProtocolCommands spCommand = spUnknown; BoardNoOverTopic topicForBoard = unknownBoard; uint8_t boardNumber = 0; MQTTMessage* message = msg->message; char topicName[32] = { 0 }; memcpy (topicName, msg->topicName->lenstring.data, msg->topicName->lenstring.len); for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) { if (strcmp (topicName, subscribeTopicNames[boardNumber]) == 0) { topicForBoard = (BoardNoOverTopic)(boardNumber); break; } } if (topicForBoard == unknownBoard) { return; } cJSON* json = cJSON_Parse (message->payload); const cJSON* objectItem = NULL; InterProcessData data = { 0 }; uint32_t arraySize = 0; if (topicForBoard != main_board) { for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) { spCommand = spUnknown; objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]); if (objectItem != NULL) { switch (topicCmdNumber) { case fanSpeedTopic: spCommand = spSetFanSpeed; case motorXonTopic: if (spCommand == spUnknown) { spCommand = spSetMotorXOn; } case motorYonTopic: if (spCommand == spUnknown) { spCommand = spSetMotorYOn; } if (cJSON_IsArray (objectItem)) { data.spCommand = spCommand; arraySize = cJSON_GetArraySize (objectItem); if (arraySize == 2) { for (int i = 0; i < arraySize; i++) { cJSON* item = cJSON_GetArrayItem (objectItem, i); if (cJSON_IsNumber (item)) { data.values.integerValues.value[i] = item->valueint; } } } } break; case diodeTopic: if (cJSON_IsNumber (objectItem)) { data.spCommand = spSetDiodeOn; data.values.integerValues.value[0] = objectItem->valueint; } break; case motorXMaxCurrentTopic: spCommand = spSetmotorXMaxCurrent; case motorYMaxCurrentTopic: if (cJSON_IsNumber (objectItem)) { if (spCommand == spUnknown) { spCommand = spSetmotorYMaxCurrent; } data.spCommand = spCommand; data.values.flaotValues.value[0] = objectItem->valuedouble; } break; case clearPeakElectricalMeasurementsTopic: if (cJSON_IsNumber (objectItem)) { if (objectItem->valueint == 1) { data.spCommand = spClearPeakMeasurments; } } break; case mainBoardRelayTopic: if (cJSON_IsArray (objectItem)) { arraySize = cJSON_GetArraySize (objectItem); if (arraySize == 2) { int32_t relayNumber = -1; cJSON* item = cJSON_GetArrayItem (objectItem, 0); if (cJSON_IsNumber (item)) { relayNumber = item->valueint; } int32_t relayTimeOn = 0; item = cJSON_GetArrayItem (objectItem, 1); if (cJSON_IsNumber (item)) { relayTimeOn = item->valueint; } RelayCtrl (relayNumber, relayTimeOn); } } break; case setEncoderXValue: if (cJSON_IsNumber (objectItem)) { data.spCommand = spSetEncoderXValue; data.values.flaotValues.value[0] = objectItem->valuedouble; } break; case setEncoderYValue: if (cJSON_IsNumber (objectItem)) { data.spCommand = spSetEncoderYValue; data.values.flaotValues.value[0] = objectItem->valuedouble; } break; case setVoltageMeasGains: spCommand = spSetVoltageMeasGains; case setVoltageMeasOffsets: if (spCommand == spUnknown) { spCommand = spSetVoltageMeasOffsets; } case setCurrentMeasGains: if (spCommand == spUnknown) { spCommand = spSetCurrentMeasGains; } case setCurrentMeasOffsets: if (spCommand == spUnknown) { spCommand = spSetCurrentMeasOffsets; } if (cJSON_IsArray (objectItem)) { data.spCommand = spCommand; arraySize = cJSON_GetArraySize (objectItem); if (arraySize == 3) { for (int i = 0; i < arraySize; i++) { cJSON* item = cJSON_GetArrayItem (objectItem, i); if (cJSON_IsNumber (item)) { data.values.flaotValues.value[i] = item->valuedouble; } } } } break; case resetSystem: if (cJSON_IsNumber (objectItem)) { if (objectItem->valueint == 1) { data.spCommand = spResetSystem; break; } } break; case setPositionX: if (cJSON_IsNumber (objectItem)) { data.spCommand = spSetPositonX; data.values.flaotValues.value[0] = objectItem->valuedouble; } break; case setPositionY: if (cJSON_IsNumber (objectItem)) { data.spCommand = spSetPositonY; data.values.flaotValues.value[0] = objectItem->valuedouble; } break; default: break; } } } } else { for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) { objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]); if (objectItem != NULL) { if (topicCmdNumber == resetSystem) { __disable_irq (); NVIC_SystemReset (); break; } } } } switch (topicForBoard) { case main_board: break; case board_1: #ifdef USE_UART8_INSTEAD_UART1 if (uart8TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut (uart8TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } #else if (uart1TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut (uart1TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } #endif printf ("Send cmd to board 1\n"); break; case board_2: if (uart3TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut (uart3TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } printf ("Send cmd to board 2\n"); break; case board_3: if (uart6TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut (uart6TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } printf ("Send cmd to board 3\n"); break; case board_4: if (uart2TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut (uart2TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } printf ("Send cmd to board 4\n"); break; default: break; } cJSON_Delete (json); printf ("MQTT Topic:%s, MSG[%d]:%s\n", topicName, (int)message->payloadlen, (char*)message->payload); } void mqtt_cli_init (void) { mqttClientSubTaskHandle = osThreadNew (MqttClientSubTask, NULL, &mqttClientSubTaskAttr); // subscribe task mqttClientPubTaskHandle = osThreadNew (MqttClientPubTask, NULL, &mqttClientPubTaskAttr); // publish task }