/* * mqtt_client.c * * Created on: Jun 10, 2024 * Author: jakubski */ #include #if 1 #include "FreeRTOS.h" #include "task.h" #include "main.h" #include "cmsis_os.h" #include #include "lwip.h" #include "lwip/api.h" #include "MQTTClient.h" #include "MQTTInterface.h" #include "node-red-config.h" #include "measurements.h" #include "cJSON.h" #include "interprocess_data.h" #include "uart_tasks.h" #define MQTT_BUFSIZE 1024 char* const subscribeTopicNames[SLAVES_COUNT] = { "Set/1", "Set/2", "Set/3", "Set/4" }; #define MAX_COMMANDS_IN_MQTT_PAYLOAD 6 char* const topicCommands[MAX_COMMANDS_IN_MQTT_PAYLOAD] = {"fanSpeed", "motorXon", "motorYon", "diode", "motorXMaxCurrent", "motorYMaxCurrent"}; enum _BoardNoOverTopic { board_1 = 1, board_2, board_3, board_4, unknownBoard }; typedef enum _BoardNoOverTopic BoardNoOverTopic; extern struct netif gnetif; //extern gnetif extern UartTaskData uart1TaskData; // Board 1 extern UartTaskData uart3TaskData; // Board 2 extern UartTaskData uart6TaskData; // Board 3 extern UartTaskData uart2TaskData; // Board 4 extern UartTaskData uart8TaskData; // Debug const osThreadAttr_t mqttClientSubTaskAttr = { .name = "mqttClientSubTask", .stack_size = configMINIMAL_STACK_SIZE * 4, .priority = (osPriority_t) osPriorityNormal, }; const osThreadAttr_t mqttClientPubTaskAttr = { .name = "mqttClientPsubTask", .stack_size = configMINIMAL_STACK_SIZE * 4, .priority = (osPriority_t) osPriorityNormal, }; osThreadId mqttClientSubTaskHandle; //mqtt client task handle osThreadId mqttClientPubTaskHandle; //mqtt client task handle Network net; //mqtt network MQTTClient mqttClient; //mqtt client uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer void MqttClientSubTask(void *argument); //mqtt client subscribe task function void MqttClientPubTask(void *argument); //mqtt client publish task function int MqttConnectBroker(void); //mqtt broker connect function void MqttMessageArrived(MessageData *msg); //mqtt message callback function void MqttClientSubTask (void* argument) { while (1) { // waiting for valid ip address if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) // system has no valid ip address { osDelay (pdMS_TO_TICKS (1000)); continue; } else { printf ("DHCP/Static IP O.K.\n"); break; } } while (1) { if (!mqttClient.isconnected) { // try to connect to the broker if (MqttConnectBroker () != MQTT_SUCCESS) { osDelay (pdMS_TO_TICKS (1000)); } } else { MQTTYield (&mqttClient, 500); // handle timer osDelay (pdMS_TO_TICKS (100)); } } } void MqttClientPubTask (void* argument) { char messageBuffer[512] = { 0x00 }; char topicTextBuffer[32] = { 0x00 }; uint32_t bytesInBuffer = 0; uint8_t boardNumber = 0; MQTTMessage message; while (1) { if (mqttClient.isconnected) { if (is_link_up ()) { for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) { osMutexAcquire (resMeasurementsMutex, osWaitForever); RESMeasurements* resMeas = &resMeasurements[boardNumber]; sprintf (topicTextBuffer, "RESmeasurments/%d", boardNumber + 1); bytesInBuffer = sprintf (messageBuffer, "{\"voltageRMS\":[%.2f, %.2f, %.2f], ", resMeas->voltageRMS[0], resMeas->voltageRMS[1], resMeas->voltageRMS[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"voltagePeak\":[%.2f, %.2f, %.2f], ", resMeas->voltagePeak[0], resMeas->voltagePeak[1], resMeas->voltagePeak[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentRMS\":[%.3f, %.3f, %.3f], ", resMeas->currentRMS[0], resMeas->currentRMS[1], resMeas->currentRMS[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentPeak\":[%.3f, %.3f, %.3f], ", resMeas->currentPeak[0], resMeas->currentPeak[1], resMeas->currentPeak[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"power\":[%.2f, %.2f, %.2f], ", resMeas->power[0], resMeas->power[1], resMeas->power[2]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"lastSeen\": %ld}", slaveLastSeen[boardNumber]); osMutexRelease (resMeasurementsMutex); message.payload = (void*)messageBuffer; message.payloadlen = strlen (messageBuffer); MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message } for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) { osMutexAcquire (sensorsInfoMutex, osWaitForever); SesnorsInfo* sensors = &sensorsInfo[boardNumber]; sprintf (topicTextBuffer, "Sensors/%d", boardNumber + 1); bytesInBuffer = sprintf (messageBuffer, "{\"pvTemperature\":[%.1f, %.1f], ", sensors->pvTemperature[0], sensors->pvTemperature[1]); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"fanVoltage\":%.2f, ", sensors->fanVoltage); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoder\":%.2f, ", sensors->pvEncoder); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXStatus\":%d, ", sensors->motorXStatus); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYStatus\":%d, ", sensors->motorYStatus); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXAveCurrent\":%.3f, ", sensors->motorXAveCurrent); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYAveCurrent\":%.3f, ", sensors->motorYAveCurrent); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXPeakCurrent\":%.3f, ", sensors->motorXPeakCurrent); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYPeakCurrent\":%.3f, ", sensors->motorYPeakCurrent); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitSwitchUp\":%d, ", sensors->limitSwitchUp); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitSwitchDown\":%d, ", sensors->limitSwitchDown); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitSwitchCenter\":%d, ", sensors->limitSwitchCenter); bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"powerSupplyFailMask\":%d}", sensors->powerSupplyFailMask); osMutexRelease (sensorsInfoMutex); message.payload = (void*)messageBuffer; message.payloadlen = strlen (messageBuffer); MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message } } } osDelay (pdMS_TO_TICKS (1000)); } } int MqttConnectBroker () { uint8_t boardNumber = 0; int ret; NewNetwork (&net); ret = ConnectNetwork (&net, BROKER_IP, MQTT_PORT); if (ret != MQTT_SUCCESS) { printf ("ConnectNetwork failed.\n"); return -1; } MQTTClientInit (&mqttClient, &net, 1000, sndBuffer, sizeof (sndBuffer), rcvBuffer, sizeof (rcvBuffer)); MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.willFlag = 0; data.MQTTVersion = 3; data.clientID.cstring = "test_user1"; data.username.cstring = "test_user1"; data.password.cstring = "1234"; data.keepAliveInterval = 100; data.cleansession = 1; ret = MQTTConnect (&mqttClient, &data); if (ret != MQTT_SUCCESS) { net_disconnect (&net); printf ("MQTTConnect failed. Code %d\n", ret); return ret; } for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) { ret = MQTTSubscribe (&mqttClient, subscribeTopicNames[boardNumber], QOS0, MqttMessageArrived); if (ret != MQTT_SUCCESS) { net_disconnect (&net); printf ("MQTTSubscribe failed.\n"); return ret; } } printf ("MQTT_ConnectBroker O.K.\n"); return MQTT_SUCCESS; } void MqttMessageArrived (MessageData* msg) { SerialProtocolCommands spCommand = spUnknown; BoardNoOverTopic topicForBoard = unknownBoard; uint8_t boardNumber = 0; MQTTMessage* message = msg->message; char topicName[32] = { 0 }; memcpy (topicName, msg->topicName->lenstring.data, msg->topicName->lenstring.len); for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) { if (strcmp (topicName, subscribeTopicNames[boardNumber]) == 0) { topicForBoard = (BoardNoOverTopic)(boardNumber + 1); } } cJSON* json = cJSON_Parse (message->payload); const cJSON* objectItem = NULL; InterProcessData data = { 0 }; for (int topicCmdNumber = 0; topicCmdNumber < 6; topicCmdNumber++) { spCommand = spUnknown; objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]); if (objectItem != NULL) { switch (topicCmdNumber) { case 0: spCommand = spSetFanSpeed; case 1: if (spCommand == spUnknown) { spCommand = spSetMotorXOn; } case 2: if (spCommand == spUnknown) { spCommand = spSetMotorYOn; } if (cJSON_IsArray (objectItem)) { data.spCommand = spCommand; int arraySize = cJSON_GetArraySize (objectItem); if (arraySize == 2) { for (int i = 0; i < arraySize; i++) { cJSON* item = cJSON_GetArrayItem (objectItem, i); if (cJSON_IsNumber (item)) { data.values.integerValues.value[i] = item->valueint; } } } } break; case 3: data.spCommand = spSetDiodeOn; data.values.integerValues.value[0] = objectItem->valueint; data.values.integerValues.value[1] = 0; break; case 4: spCommand = spSetmotorXMaxCurrent; case 5: if (spCommand == spUnknown) { spCommand = spSetmotorYMaxCurrent; } data.spCommand = spCommand; data.values.flaotValues.value[0] = objectItem->valuedouble; data.values.flaotValues.value[1] = 0.0; break; default: break; } switch (topicForBoard) { case board_1: #if 0 if (uart1TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut ( uart1TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } #else if (uart8TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut (uart8TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } #endif printf ("Send cmd to board 1\n"); break; case board_2: if (uart3TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut (uart3TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } printf ("Send cmd to board 2\n"); break; case board_3: if (uart6TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut (uart6TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } printf ("Send cmd to board 3\n"); break; case board_4: if (uart2TaskData.sendCmdToSlaveQueue != NULL) { osMessageQueuePut (uart2TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100); } printf ("Send cmd to board 4\n"); break; default: break; } } } cJSON_Delete (json); // printf ("MQTT MSG[%d]:%s\n", (int)message->payloadlen, (char*)message->payload); printf ("MQTT Topic:%s, MSG[%d]:%s\n", topicName, (int)message->payloadlen, (char*)message->payload); } void mqtt_cli_init (void) { mqttClientSubTaskHandle = osThreadNew (MqttClientSubTask, NULL, &mqttClientSubTaskAttr); // subscribe task mqttClientPubTaskHandle = osThreadNew (MqttClientPubTask, NULL, &mqttClientPubTaskAttr); // publish task } #else #include "lwip/opt.h" #include "lwip/arch.h" #include "lwip/api.h" #include "lwip/apps/mqtt.h" #include "cmsis_os.h" #define MQTT_CLI_THREAD_PRIO ( tskIDLE_PRIORITY + 4 ) #ifndef LWIP_MQTT_EXAMPLE_IPADDR_INIT #if LWIP_IPV4 #define LWIP_MQTT_EXAMPLE_IPADDR_INIT = IPADDR4_INIT(PP_HTONL(IPADDR_LOOPBACK)) #else #define LWIP_MQTT_EXAMPLE_IPADDR_INIT #endif #endif static ip_addr_t mqtt_ip LWIP_MQTT_EXAMPLE_IPADDR_INIT; static const struct mqtt_connect_client_info_t mqtt_client_info = { "test_user1", "test_user1", /* user */ "1234", /* pass */ 100, /* keep alive */ NULL, /* will_topic */ NULL, /* will_msg */ 0, /* will_qos */ 0 /* will_retain */ #if LWIP_ALTCP && LWIP_ALTCP_TLS , NULL #endif }; static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags) { const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; // LWIP_UNUSED_ARG(data); LWIP_PLATFORM_DIAG(("MQTT client \"%s\" data cb: len %d, flags %d\n", client_info->client_id, (int)len, (int)flags)); LWIP_PLATFORM_DIAG(("Data:\n%s\n", data)); } static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len) { const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; LWIP_PLATFORM_DIAG(("MQTT client \"%s\" publish cb: topic %s, len %d\n", client_info->client_id, topic, (int)tot_len)); } static void mqtt_request_cb(void *arg, err_t err) { const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; LWIP_PLATFORM_DIAG(("MQTT client \"%s\" request cb: err %d\n", client_info->client_id, (int)err)); }VES_COUNT; boardNumber++) { static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status) { const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; // LWIP_UNUSED_ARG(client); LWIP_PLATFORM_DIAG(("MQTT client \"%s\" connection cb: status %d\n", client_info->client_id, (int)status)); if (status == MQTT_CONNECT_ACCEPTED) { mqtt_sub_unsub(client, "topic_qos1", 1, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1); mqtt_sub_unsub(client, "topic_qos0", 0, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1); } } void mqtt_cli_thread(void *arg) { mqtt_client_t* mqtt_client; osDelay(pdMS_TO_TICKS(7000)); ipaddr_aton("192.168.1.34", &mqtt_ip); LOCK_TCPIP_CORE(); mqtt_client = mqtt_client_new(); mqtt_client_connect(mqtt_client, &mqtt_ip, MQTT_PORT, mqtt_connection_cb, LWIP_CONST_CAST(void*, &mqtt_client_info), &mqtt_client_info); mqtt_set_inpub_callback(mqtt_client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, LWIP_CONST_CAST(void*, &mqtt_client_info)); UNLOCK_TCPIP_CORE(); while(1) { osDelay(pdMS_TO_TICKS(100)); } } void mqtt_cli_init(void) { sys_thread_new("mqtt_cli_netconn", mqtt_cli_thread, NULL, DEFAULT_THREAD_STACKSIZE, MQTT_CLI_THREAD_PRIO); } #endif