|
@@ -18,19 +18,44 @@
|
|
|
#include "MQTTInterface.h"
|
|
|
#include "node-red-config.h"
|
|
|
#include "measurements.h"
|
|
|
+#include "cJSON.h"
|
|
|
+#include "interprocess_data.h"
|
|
|
+#include "uart_tasks.h"
|
|
|
|
|
|
#define MQTT_BUFSIZE 1024
|
|
|
|
|
|
//extern RESMeasurements resMeasurements[SLAVES_COUNT];
|
|
|
//extern SesnorsInfo sensorsInfo[SLAVES_COUNT];
|
|
|
|
|
|
+uint32_t lastSeen[SLAVES_COUNT] = { 0 };
|
|
|
RESMeasurements resMeasurements[SLAVES_COUNT] = { 0 };
|
|
|
SesnorsInfo sensorsInfo[SLAVES_COUNT] = { 0 };
|
|
|
osMutexId_t resMeasurementsMutex;
|
|
|
osMutexId_t sensorsInfoMutex;
|
|
|
|
|
|
+char* const subscribeTopicNames[SLAVES_COUNT] = { "Set/1", "Set/2", "Set/3", "Set/4" };
|
|
|
+#define MAX_COMMANDS_IN_MQTT_PAYLOAD 6
|
|
|
+char* const topicCommands[MAX_COMMANDS_IN_MQTT_PAYLOAD] = {"fanSpeed", "motorXon", "motorYon", "diode", "motorXMaxCurrent", "motorYMaxCurrent"};
|
|
|
+
|
|
|
+enum _BoardNoOverTopic
|
|
|
+{
|
|
|
+ board_1 = 1,
|
|
|
+ board_2,
|
|
|
+ board_3,
|
|
|
+ board_4,
|
|
|
+ unknownBoard
|
|
|
+};
|
|
|
+
|
|
|
+typedef enum _BoardNoOverTopic BoardNoOverTopic;
|
|
|
+
|
|
|
extern struct netif gnetif; //extern gnetif
|
|
|
|
|
|
+extern UartTaskData uart1TaskData; // Board 1
|
|
|
+extern UartTaskData uart3TaskData; // Board 2
|
|
|
+extern UartTaskData uart6TaskData; // Board 3
|
|
|
+extern UartTaskData uart2TaskData; // Board 4
|
|
|
+extern UartTaskData uart8TaskData; // Debug
|
|
|
+
|
|
|
const osThreadAttr_t mqttClientSubTaskAttr =
|
|
|
{ .name = "mqttClientSubTask", .stack_size = configMINIMAL_STACK_SIZE * 4,
|
|
|
.priority = (osPriority_t) osPriorityNormal, };
|
|
@@ -47,168 +72,246 @@ MQTTClient mqttClient; //mqtt client
|
|
|
|
|
|
uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer
|
|
|
uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer
|
|
|
-uint8_t msgBuffer[MQTT_BUFSIZE]; //mqtt message buffer
|
|
|
|
|
|
void MqttClientSubTask(void *argument); //mqtt client subscribe task function
|
|
|
void MqttClientPubTask(void *argument); //mqtt client publish task function
|
|
|
int MqttConnectBroker(void); //mqtt broker connect function
|
|
|
void MqttMessageArrived(MessageData *msg); //mqtt message callback function
|
|
|
|
|
|
-void MqttClientSubTask(void *argument)
|
|
|
-{
|
|
|
- while (1)
|
|
|
- {
|
|
|
- //waiting for valid ip address
|
|
|
- if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0
|
|
|
- || gnetif.gw.addr == 0) //system has no valid ip address
|
|
|
- {
|
|
|
- osDelay(pdMS_TO_TICKS(1000));
|
|
|
- continue;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- printf("DHCP/Static IP O.K.\n");
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- while (1)
|
|
|
- {
|
|
|
- if (!mqttClient.isconnected)
|
|
|
- {
|
|
|
- //try to connect to the broker
|
|
|
- if (MqttConnectBroker() != MQTT_SUCCESS)
|
|
|
- {
|
|
|
- osDelay(pdMS_TO_TICKS(1000));
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- MQTTYield(&mqttClient, 500); //handle timer
|
|
|
- osDelay(pdMS_TO_TICKS(100));
|
|
|
- }
|
|
|
- }
|
|
|
+void MqttClientSubTask (void* argument) {
|
|
|
+ while (1) {
|
|
|
+ // waiting for valid ip address
|
|
|
+ if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) // system has no valid ip address
|
|
|
+ {
|
|
|
+ osDelay (pdMS_TO_TICKS (1000));
|
|
|
+ continue;
|
|
|
+ } else {
|
|
|
+ printf ("DHCP/Static IP O.K.\n");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ while (1) {
|
|
|
+ if (!mqttClient.isconnected) {
|
|
|
+ // try to connect to the broker
|
|
|
+ if (MqttConnectBroker () != MQTT_SUCCESS) {
|
|
|
+ osDelay (pdMS_TO_TICKS (1000));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ MQTTYield (&mqttClient, 500); // handle timer
|
|
|
+ osDelay (pdMS_TO_TICKS (100));
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-void MqttClientPubTask(void *argument)
|
|
|
-{
|
|
|
-// const char *str = "MQTT message from STM32";
|
|
|
- char messageBuffer[256] = {0x00};
|
|
|
- char topicTextBuffer[32] = {0x00};
|
|
|
- uint32_t bytesInBuffer = 0;
|
|
|
- uint8_t boardNumber = 0;
|
|
|
- MQTTMessage message;
|
|
|
-
|
|
|
- resMeasurementsMutex = osMutexNew (NULL);
|
|
|
- sensorsInfoMutex = osMutexNew (NULL);
|
|
|
-
|
|
|
- while (1)
|
|
|
- {
|
|
|
- if (mqttClient.isconnected)
|
|
|
- {
|
|
|
- if (is_link_up())
|
|
|
- {
|
|
|
- for( boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++)
|
|
|
- {
|
|
|
- osMutexAcquire (resMeasurementsMutex, osWaitForever);
|
|
|
- RESMeasurements *resMeas = &resMeasurements[boardNumber];
|
|
|
- sprintf(topicTextBuffer, "RESmeasurments/%d", boardNumber + 1);
|
|
|
- bytesInBuffer = sprintf(messageBuffer,"\"voltageRMS\":{%.2f, %.2f, %.2f}, ", resMeas->voltageRMS[0], resMeas->voltageRMS[1], resMeas->voltageRMS[2]);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"voltagePeak\":{%.2f, %.2f, %.2f}, ", resMeas->voltagePeak[0], resMeas->voltagePeak[1], resMeas->voltagePeak[2]);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"currentRMS\":{%.3f, %.3f, %.3f}, ", resMeas->currentRMS[0], resMeas->currentRMS[1], resMeas->currentRMS[2]);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"currentPeak\":{%.3f, %.3f, %.3f}, ", resMeas->currentPeak[0], resMeas->currentPeak[1], resMeas->currentPeak[2]);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"power\":{%.2f, %.2f, %.2f}", resMeas->power[0], resMeas->power[1], resMeas->power[2]);
|
|
|
- osMutexRelease(resMeasurementsMutex);
|
|
|
- message.payload = (void*) messageBuffer;
|
|
|
- message.payloadlen = strlen(messageBuffer);
|
|
|
- MQTTPublish(&mqttClient, topicTextBuffer, &message); //publish a message
|
|
|
- }
|
|
|
-
|
|
|
- for( boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++)
|
|
|
- {
|
|
|
- osMutexAcquire (sensorsInfoMutex, osWaitForever);
|
|
|
- SesnorsInfo *sensors = &sensorsInfo[boardNumber];
|
|
|
- sprintf(topicTextBuffer, "Sensors/%d", boardNumber + 1);
|
|
|
- bytesInBuffer = sprintf(messageBuffer,"\"pvTemperature\":{%.1f, %.1f}, ", sensors->pvTemperature[0], sensors->pvTemperature[1]);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"fanVoltage\":%.2f, ", sensors->fanVoltage);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"pvEncoder\":%.2f, ", sensors->pvEncoder);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"motorXStatus\":%d, ", sensors->motorXStatus);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"motorYStatus\":%d, ", sensors->motorYStatus);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"motorXAveCurrent\":%.3f, ", sensors->motorXAveCurrent);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"motorYAveCurrent\":%.3f, ", sensors->motorYAveCurrent);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"motorXPeakCurrent\":%.3f, ", sensors->motorXPeakCurrent);
|
|
|
- bytesInBuffer += sprintf(&messageBuffer[bytesInBuffer],"\"motorYPeakCurrent\":%.3f", sensors->motorYPeakCurrent);
|
|
|
- osMutexRelease(sensorsInfoMutex);
|
|
|
- message.payload = (void*) messageBuffer;
|
|
|
- message.payloadlen = strlen(messageBuffer);
|
|
|
- MQTTPublish(&mqttClient, topicTextBuffer, &message); //publish a message
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- osDelay(pdMS_TO_TICKS(1000));
|
|
|
- }
|
|
|
+void MqttClientPubTask (void* argument) {
|
|
|
+ char messageBuffer[512] = { 0x00 };
|
|
|
+ char topicTextBuffer[32] = { 0x00 };
|
|
|
+ uint32_t bytesInBuffer = 0;
|
|
|
+ uint8_t boardNumber = 0;
|
|
|
+ MQTTMessage message;
|
|
|
+
|
|
|
+ resMeasurementsMutex = osMutexNew (NULL);
|
|
|
+ sensorsInfoMutex = osMutexNew (NULL);
|
|
|
+
|
|
|
+ while (1) {
|
|
|
+ if (mqttClient.isconnected) {
|
|
|
+ if (is_link_up ()) {
|
|
|
+ for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
|
|
|
+ osMutexAcquire (resMeasurementsMutex, osWaitForever);
|
|
|
+ RESMeasurements* resMeas = &resMeasurements[boardNumber];
|
|
|
+ sprintf (topicTextBuffer, "RESmeasurments/%d", boardNumber + 1);
|
|
|
+ bytesInBuffer = sprintf (messageBuffer, "{\"voltageRMS\":[%.2f, %.2f, %.2f], ", resMeas->voltageRMS[0], resMeas->voltageRMS[1], resMeas->voltageRMS[2]);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"voltagePeak\":[%.2f, %.2f, %.2f], ", resMeas->voltagePeak[0], resMeas->voltagePeak[1], resMeas->voltagePeak[2]);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentRMS\":[%.3f, %.3f, %.3f], ", resMeas->currentRMS[0], resMeas->currentRMS[1], resMeas->currentRMS[2]);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentPeak\":[%.3f, %.3f, %.3f], ", resMeas->currentPeak[0], resMeas->currentPeak[1], resMeas->currentPeak[2]);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"power\":[%.2f, %.2f, %.2f], ", resMeas->power[0], resMeas->power[1], resMeas->power[2]);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"lastSeen\": %ld}", lastSeen[boardNumber]);
|
|
|
+ osMutexRelease (resMeasurementsMutex);
|
|
|
+ message.payload = (void*)messageBuffer;
|
|
|
+ message.payloadlen = strlen (messageBuffer);
|
|
|
+ MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
|
|
|
+ }
|
|
|
+
|
|
|
+ for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
|
|
|
+ osMutexAcquire (sensorsInfoMutex, osWaitForever);
|
|
|
+ SesnorsInfo* sensors = &sensorsInfo[boardNumber];
|
|
|
+ sprintf (topicTextBuffer, "Sensors/%d", boardNumber + 1);
|
|
|
+ bytesInBuffer = sprintf (messageBuffer, "\"{pvTemperature\":[%.1f, %.1f], ", sensors->pvTemperature[0], sensors->pvTemperature[1]);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"fanVoltage\":%.2f, ", sensors->fanVoltage);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoder\":%.2f, ", sensors->pvEncoder);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXStatus\":%d, ", sensors->motorXStatus);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYStatus\":%d, ", sensors->motorYStatus);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXAveCurrent\":%.3f, ", sensors->motorXAveCurrent);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYAveCurrent\":%.3f, ", sensors->motorYAveCurrent);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXPeakCurrent\":%.3f, ", sensors->motorXPeakCurrent);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYPeakCurrent\":%.3f, ", sensors->motorYPeakCurrent);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitSwitchUp\":%d, ", sensors->limitSwitchUp);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitSwitchDown\":%d, ", sensors->limitSwitchDown);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitSwitchCenter\":%d, ", sensors->limitSwitchCenter);
|
|
|
+ bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"powerSupplyFailMask\":%d}", sensors->powerSupplyFailMask);
|
|
|
+
|
|
|
+ osMutexRelease (sensorsInfoMutex);
|
|
|
+ message.payload = (void*)messageBuffer;
|
|
|
+ message.payloadlen = strlen (messageBuffer);
|
|
|
+ MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ osDelay (pdMS_TO_TICKS (1000));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-int MqttConnectBroker()
|
|
|
-{
|
|
|
- int ret;
|
|
|
-
|
|
|
- NewNetwork(&net);
|
|
|
- ret = ConnectNetwork(&net, BROKER_IP, MQTT_PORT);
|
|
|
- if (ret != MQTT_SUCCESS)
|
|
|
- {
|
|
|
- printf("ConnectNetwork failed.\n");
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- MQTTClientInit(&mqttClient, &net, 1000, sndBuffer, sizeof(sndBuffer),
|
|
|
- rcvBuffer, sizeof(rcvBuffer));
|
|
|
-
|
|
|
- MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
|
|
|
- data.willFlag = 0;
|
|
|
- data.MQTTVersion = 3;
|
|
|
- data.clientID.cstring = "test_user1";
|
|
|
- data.username.cstring = "test_user1";
|
|
|
- data.password.cstring = "1234";
|
|
|
- data.keepAliveInterval = 100;
|
|
|
- data.cleansession = 1;
|
|
|
-
|
|
|
- ret = MQTTConnect(&mqttClient, &data);
|
|
|
- if (ret != MQTT_SUCCESS)
|
|
|
- {
|
|
|
- net_disconnect(&net);
|
|
|
- printf("MQTTConnect failed. Code %d\n", ret);
|
|
|
- return ret;
|
|
|
- }
|
|
|
-
|
|
|
- ret = MQTTSubscribe(&mqttClient, "test_second", QOS0, MqttMessageArrived);
|
|
|
- if (ret != MQTT_SUCCESS)
|
|
|
- {
|
|
|
- net_disconnect(&net);
|
|
|
- printf("MQTTSubscribe failed.\n");
|
|
|
- return ret;
|
|
|
- }
|
|
|
- printf("MQTT_ConnectBroker O.K.\n");
|
|
|
-
|
|
|
- return MQTT_SUCCESS;
|
|
|
+int MqttConnectBroker () {
|
|
|
+ uint8_t boardNumber = 0;
|
|
|
+ int ret;
|
|
|
+
|
|
|
+ NewNetwork (&net);
|
|
|
+ ret = ConnectNetwork (&net, BROKER_IP, MQTT_PORT);
|
|
|
+ if (ret != MQTT_SUCCESS) {
|
|
|
+ printf ("ConnectNetwork failed.\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ MQTTClientInit (&mqttClient, &net, 1000, sndBuffer, sizeof (sndBuffer), rcvBuffer, sizeof (rcvBuffer));
|
|
|
+
|
|
|
+ MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
|
|
|
+ data.willFlag = 0;
|
|
|
+ data.MQTTVersion = 3;
|
|
|
+ data.clientID.cstring = "test_user1";
|
|
|
+ data.username.cstring = "test_user1";
|
|
|
+ data.password.cstring = "1234";
|
|
|
+ data.keepAliveInterval = 100;
|
|
|
+ data.cleansession = 1;
|
|
|
+
|
|
|
+ ret = MQTTConnect (&mqttClient, &data);
|
|
|
+ if (ret != MQTT_SUCCESS) {
|
|
|
+ net_disconnect (&net);
|
|
|
+ printf ("MQTTConnect failed. Code %d\n", ret);
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
|
|
|
+ ret = MQTTSubscribe (&mqttClient, subscribeTopicNames[boardNumber], QOS0, MqttMessageArrived);
|
|
|
+ if (ret != MQTT_SUCCESS) {
|
|
|
+ net_disconnect (&net);
|
|
|
+ printf ("MQTTSubscribe failed.\n");
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ printf ("MQTT_ConnectBroker O.K.\n");
|
|
|
+
|
|
|
+ return MQTT_SUCCESS;
|
|
|
}
|
|
|
|
|
|
-void MqttMessageArrived(MessageData *msg)
|
|
|
-{
|
|
|
- MQTTMessage *message = msg->message;
|
|
|
- memset(msgBuffer, 0, sizeof(msgBuffer));
|
|
|
- memcpy(msgBuffer, message->payload, message->payloadlen);
|
|
|
-
|
|
|
- printf("MQTT MSG[%d]:%s\n", (int) message->payloadlen, msgBuffer);
|
|
|
+void MqttMessageArrived (MessageData* msg) {
|
|
|
+ SerialProtocolCommands spCommand = spUnknown;
|
|
|
+ BoardNoOverTopic topicForBoard = unknownBoard;
|
|
|
+ uint8_t boardNumber = 0;
|
|
|
+ MQTTMessage* message = msg->message;
|
|
|
+ char topicName[32] = { 0 };
|
|
|
+ memcpy (topicName, msg->topicName->lenstring.data, msg->topicName->lenstring.len);
|
|
|
+
|
|
|
+ for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
|
|
|
+ if (strcmp (topicName, subscribeTopicNames[boardNumber]) == 0) {
|
|
|
+ topicForBoard = (BoardNoOverTopic)(boardNumber + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ cJSON* json = cJSON_Parse (message->payload);
|
|
|
+ const cJSON* objectItem = NULL;
|
|
|
+ InterProcessData data = { 0 };
|
|
|
+ for (int topicCmdNumber = 0; topicCmdNumber < 6; topicCmdNumber++) {
|
|
|
+ spCommand = spUnknown;
|
|
|
+ objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
|
|
|
+ if (objectItem != NULL) {
|
|
|
+ switch (topicCmdNumber) {
|
|
|
+ case 0: spCommand = spSetFanSpeed;
|
|
|
+ case 1:
|
|
|
+ if (spCommand == spUnknown) {
|
|
|
+ spCommand = spSetMotorXOn;
|
|
|
+ }
|
|
|
+ case 2:
|
|
|
+ if (spCommand == spUnknown) {
|
|
|
+ spCommand = spSetMotorYOn;
|
|
|
+ }
|
|
|
+ if (cJSON_IsArray (objectItem)) {
|
|
|
+ data.spCommand = spCommand;
|
|
|
+ int arraySize = cJSON_GetArraySize (objectItem);
|
|
|
+ if (arraySize == 2) {
|
|
|
+ for (int i = 0; i < arraySize; i++) {
|
|
|
+ cJSON* item = cJSON_GetArrayItem (objectItem, i);
|
|
|
+ if (cJSON_IsNumber (item)) {
|
|
|
+ data.values.integerValues.value[i] = item->valueint;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case 3:
|
|
|
+ data.spCommand = spSetDiodeOn;
|
|
|
+ data.values.integerValues.value[0] = objectItem->valueint;
|
|
|
+ data.values.integerValues.value[1] = 0;
|
|
|
+ break;
|
|
|
+ case 4: spCommand = spSetmotorXMaxCurrent;
|
|
|
+ case 5:
|
|
|
+ if (spCommand == spUnknown) {
|
|
|
+ spCommand = spSetmotorYMaxCurrent;
|
|
|
+ }
|
|
|
+ data.spCommand = spCommand;
|
|
|
+ data.values.flaotValues.value[0] = objectItem->valuedouble;
|
|
|
+ data.values.flaotValues.value[1] = 0.0;
|
|
|
+ break;
|
|
|
+ default: break;
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (topicForBoard) {
|
|
|
+ case board_1:
|
|
|
+#if 0
|
|
|
+ if (uart1TaskData.sendCmdToSlaveQueue != NULL) {
|
|
|
+ osMessageQueuePut (
|
|
|
+ uart1TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
|
|
|
+ }
|
|
|
+#else
|
|
|
+ if (uart8TaskData.sendCmdToSlaveQueue != NULL) {
|
|
|
+ osMessageQueuePut (uart8TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
|
|
|
+ }
|
|
|
+#endif
|
|
|
+ printf ("Send cmd to board 1\n");
|
|
|
+ break;
|
|
|
+ case board_2:
|
|
|
+ if (uart3TaskData.sendCmdToSlaveQueue != NULL) {
|
|
|
+ osMessageQueuePut (uart3TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
|
|
|
+ }
|
|
|
+ printf ("Send cmd to board 2\n");
|
|
|
+ break;
|
|
|
+ case board_3:
|
|
|
+ if (uart6TaskData.sendCmdToSlaveQueue != NULL) {
|
|
|
+ osMessageQueuePut (uart6TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
|
|
|
+ }
|
|
|
+ printf ("Send cmd to board 3\n");
|
|
|
+ break;
|
|
|
+ case board_4:
|
|
|
+ if (uart2TaskData.sendCmdToSlaveQueue != NULL) {
|
|
|
+ osMessageQueuePut (uart2TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
|
|
|
+ }
|
|
|
+ printf ("Send cmd to board 4\n");
|
|
|
+ break;
|
|
|
+ default: break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ cJSON_Delete (json);
|
|
|
+
|
|
|
+ // printf ("MQTT MSG[%d]:%s\n", (int)message->payloadlen, (char*)message->payload);
|
|
|
+
|
|
|
+ printf ("MQTT Topic:%s, MSG[%d]:%s\n", topicName, (int)message->payloadlen, (char*)message->payload);
|
|
|
}
|
|
|
|
|
|
-void mqtt_cli_init(void)
|
|
|
-{
|
|
|
- mqttClientSubTaskHandle = osThreadNew(MqttClientSubTask, NULL,
|
|
|
- &mqttClientSubTaskAttr); //subscribe task
|
|
|
- mqttClientPubTaskHandle = osThreadNew(MqttClientPubTask, NULL,
|
|
|
- &mqttClientPubTaskAttr); //publish task
|
|
|
+void mqtt_cli_init (void) {
|
|
|
+ mqttClientSubTaskHandle = osThreadNew (MqttClientSubTask, NULL, &mqttClientSubTaskAttr); // subscribe task
|
|
|
+ mqttClientPubTaskHandle = osThreadNew (MqttClientPubTask, NULL, &mqttClientPubTaskAttr); // publish task
|
|
|
}
|
|
|
|
|
|
#else
|
|
@@ -267,7 +370,8 @@ static void mqtt_request_cb(void *arg, err_t err)
|
|
|
const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
|
|
|
|
|
|
LWIP_PLATFORM_DIAG(("MQTT client \"%s\" request cb: err %d\n", client_info->client_id, (int)err));
|
|
|
-}
|
|
|
+}VES_COUNT; boardNumber++)
|
|
|
+ {
|
|
|
|
|
|
static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
|
|
|
{
|