mqtt_client.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. /*
  2. * mqtt_client.c
  3. *
  4. * Created on: Jun 10, 2024
  5. * Author: jakubski
  6. */
  7. #include <stdio.h>
  8. #include "FreeRTOS.h"
  9. #include "task.h"
  10. #include "main.h"
  11. #include "cmsis_os.h"
  12. #include <string.h>
  13. #include "lwip.h"
  14. #include "lwip/api.h"
  15. #include "MQTTClient.h"
  16. #include "MQTTInterface.h"
  17. #include "node-red-config.h"
  18. #include "measurements.h"
  19. #include "cJSON.h"
  20. #include "interprocess_data.h"
  21. #include "uart_tasks.h"
  22. #define MQTT_BUFSIZE 1024
  23. char* const subscribeTopicNames[MASTER_BOARD + SLAVES_COUNT] = { "Set/0", "Set/1", "Set/2", "Set/3", "Set/4" };
  24. #define MAX_COMMANDS_IN_MQTT_PAYLOAD 17
  25. char* const topicCommands[MAX_COMMANDS_IN_MQTT_PAYLOAD] = { "fanSpeed", "motorXon", "motorYon", "diode", "motorXMaxCurrent", "motorYMaxCurrent", "clearPeakElectricalMeasurements", "mainBoardRelay",
  26. "setEncoderXValue", "setEncoderYValue", "setVoltageMeasGains", "setVoltageMeasOffsets", "setCurrentMeasGains", "setCurrentMeasOffsets", "resetSystem", "setPositionX", "setPositionY" };
  27. enum _Topics
  28. {
  29. fanSpeedTopic = 0,
  30. motorXonTopic,
  31. motorYonTopic,
  32. diodeTopic,
  33. motorXMaxCurrentTopic,
  34. motorYMaxCurrentTopic,
  35. clearPeakElectricalMeasurementsTopic,
  36. mainBoardRelayTopic,
  37. setEncoderXValue,
  38. setEncoderYValue,
  39. setVoltageMeasGains,
  40. setVoltageMeasOffsets,
  41. setCurrentMeasGains,
  42. setCurrentMeasOffsets,
  43. resetSystem,
  44. setPositionX,
  45. setPositionY
  46. };
  47. enum _BoardNoOverTopic
  48. {
  49. main_board = 0,
  50. board_1,
  51. board_2,
  52. board_3,
  53. board_4,
  54. unknownBoard
  55. };
  56. typedef enum _BoardNoOverTopic BoardNoOverTopic;
  57. extern struct netif gnetif; //extern gnetif
  58. extern UartTaskData uart1TaskData; // Board 1
  59. extern UartTaskData uart3TaskData; // Board 2
  60. extern UartTaskData uart6TaskData; // Board 3
  61. extern UartTaskData uart2TaskData; // Board 4
  62. extern UartTaskData uart8TaskData; // Debug
  63. extern osTimerId_t relay1TimerHandle;
  64. extern osTimerId_t relay2TimerHandle;
  65. extern osTimerId_t relay3TimerHandle;
  66. extern osTimerId_t relay4TimerHandle;
  67. const osThreadAttr_t mqttClientSubTaskAttr =
  68. { .name = "mqttClientSubTask", .stack_size = configMINIMAL_STACK_SIZE * 4,
  69. .priority = (osPriority_t) osPriorityNormal, };
  70. const osThreadAttr_t mqttClientPubTaskAttr =
  71. { .name = "mqttClientPsubTask", .stack_size = configMINIMAL_STACK_SIZE * 4,
  72. .priority = (osPriority_t) osPriorityNormal, };
  73. osThreadId mqttClientSubTaskHandle; //mqtt client task handle
  74. osThreadId mqttClientPubTaskHandle; //mqtt client task handle
  75. Network net; //mqtt network
  76. MQTTClient mqttClient; //mqtt client
  77. uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer
  78. uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer
  79. void RelayCtrl(int32_t relayNumber, int32_t relayTimeOn);
  80. void MqttClientSubTask (void* argument); // mqtt client subscribe task function
  81. void MqttClientPubTask (void* argument); // mqtt client publish task function
  82. uint32_t MqttConnectBroker (void); // mqtt broker connect function
  83. void MqttMessageArrived (MessageData* msg); // mqtt message callback function
  84. void MqttClientSubTask (void* argument) {
  85. while (1) {
  86. // waiting for valid ip address
  87. if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) // system has no valid ip address
  88. {
  89. osDelay (pdMS_TO_TICKS (1000));
  90. continue;
  91. } else {
  92. printf ("DHCP/Static IP O.K.\n");
  93. break;
  94. }
  95. }
  96. while (1) {
  97. if (!mqttClient.isconnected) {
  98. // try to connect to the broker
  99. if (MqttConnectBroker () != MQTT_SUCCESS) {
  100. osDelay (pdMS_TO_TICKS (1000));
  101. }
  102. } else {
  103. MQTTYield (&mqttClient, 500); // handle timer
  104. osDelay (pdMS_TO_TICKS (100));
  105. }
  106. }
  107. }
  108. void MqttClientPubTask (void* argument) {
  109. char messageBuffer[512] = { 0x00 };
  110. char topicTextBuffer[32] = { 0x00 };
  111. uint32_t bytesInBuffer = 0;
  112. uint8_t boardNumber = 0;
  113. MQTTMessage message;
  114. while (1) {
  115. if (mqttClient.isconnected) {
  116. if (is_link_up ()) {
  117. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  118. osMutexAcquire (resMeasurementsMutex, osWaitForever);
  119. RESMeasurements* resMeas = &resMeasurements[boardNumber];
  120. sprintf (topicTextBuffer, "RESmeasurments/%d", boardNumber + 1);
  121. bytesInBuffer = sprintf (messageBuffer, "{\"voltageRMS\":[%.2f, %.2f, %.2f], ", resMeas->voltageRMS[0], resMeas->voltageRMS[1], resMeas->voltageRMS[2]);
  122. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"voltagePeak\":[%.2f, %.2f, %.2f], ", resMeas->voltagePeak[0], resMeas->voltagePeak[1], resMeas->voltagePeak[2]);
  123. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentRMS\":[%.3f, %.3f, %.3f], ", resMeas->currentRMS[0], resMeas->currentRMS[1], resMeas->currentRMS[2]);
  124. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentPeak\":[%.3f, %.3f, %.3f], ", resMeas->currentPeak[0], resMeas->currentPeak[1], resMeas->currentPeak[2]);
  125. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"power\":[%.2f, %.2f, %.2f], ", resMeas->power[0], resMeas->power[1], resMeas->power[2]);
  126. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"lastSeen\": %ld}", slaveLastSeen[boardNumber]);
  127. osMutexRelease (resMeasurementsMutex);
  128. message.payload = (void*)messageBuffer;
  129. message.payloadlen = strlen (messageBuffer);
  130. MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
  131. }
  132. for (boardNumber = 0; boardNumber < SLAVES_COUNT + 1; boardNumber++) {
  133. if (boardNumber > 0) {
  134. osMutexAcquire (sensorsInfoMutex, osWaitForever);
  135. SesnorsInfo* sensors = &sensorsInfo[boardNumber - 1];
  136. sprintf (topicTextBuffer, "Sensors/%d", boardNumber);
  137. bytesInBuffer = sprintf (messageBuffer, "{\"pvTemperature\":[%.1f, %.1f], ", sensors->pvTemperature[0], sensors->pvTemperature[1]);
  138. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"fanVoltage\":%.2f, ", sensors->fanVoltage);
  139. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderX\":%.2f, ", sensors->pvEncoderX);
  140. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderY\":%.2f, ", sensors->pvEncoderY);
  141. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXStatus\":%d, ", sensors->motorXStatus);
  142. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYStatus\":%d, ", sensors->motorYStatus);
  143. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXAveCurrent\":%.3f, ", sensors->motorXAveCurrent);
  144. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYAveCurrent\":%.3f, ", sensors->motorYAveCurrent);
  145. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXPeakCurrent\":%.3f, ", sensors->motorXPeakCurrent);
  146. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYPeakCurrent\":%.3f, ", sensors->motorYPeakCurrent);
  147. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchUp\":%d, ", sensors->limitXSwitchUp);
  148. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchDown\":%d, ", sensors->limitXSwitchDown);
  149. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchCenter\":%d, ", sensors->limitXSwitchCenter);
  150. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchUp\":%d, ", sensors->limitYSwitchUp);
  151. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchDown\":%d, ", sensors->limitYSwitchDown);
  152. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchCenter\":%d, ", sensors->limitYSwitchCenter);
  153. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentXPosition\":%.2f, ", sensors->currentXPosition);
  154. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentYPosition\":%.2f, ", sensors->currentYPosition);
  155. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"positionXWeak\":%d, ", sensors->positionXWeak);
  156. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"positionYWeak\":%d, ", sensors->positionYWeak);
  157. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"powerSupplyFailMask\":%d}", sensors->powerSupplyFailMask);
  158. osMutexRelease (sensorsInfoMutex);
  159. } else {
  160. sprintf (topicTextBuffer, "Sensors/%d", boardNumber);
  161. uint8_t mainBoardPowerSupplyFailMask = ~((HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_4) << 1) | HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_2)) & 0x3;
  162. bytesInBuffer = sprintf (messageBuffer, "{\"powerSupplyFailMask\":%d}", mainBoardPowerSupplyFailMask);
  163. }
  164. message.payload = (void*)messageBuffer;
  165. message.payloadlen = strlen (messageBuffer);
  166. MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
  167. }
  168. }
  169. }
  170. osDelay (pdMS_TO_TICKS (1000));
  171. }
  172. }
  173. uint32_t MqttConnectBroker () {
  174. uint8_t boardNumber = 0;
  175. int ret;
  176. NewNetwork (&net);
  177. ret = ConnectNetwork (&net, BROKER_IP, MQTT_PORT);
  178. if (ret != MQTT_SUCCESS) {
  179. printf ("ConnectNetwork failed.\n");
  180. return -1;
  181. }
  182. MQTTClientInit (&mqttClient, &net, 1000, sndBuffer, sizeof (sndBuffer), rcvBuffer, sizeof (rcvBuffer));
  183. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  184. data.willFlag = 0;
  185. data.MQTTVersion = 3;
  186. data.clientID.cstring = "test_user1";
  187. data.username.cstring = "test_user1";
  188. data.password.cstring = "1234";
  189. data.keepAliveInterval = 100;
  190. data.cleansession = 1;
  191. ret = MQTTConnect (&mqttClient, &data);
  192. if (ret != MQTT_SUCCESS) {
  193. net_disconnect (&net);
  194. printf ("MQTTConnect failed. Code %d\n", ret);
  195. return ret;
  196. }
  197. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  198. ret = MQTTSubscribe (&mqttClient, subscribeTopicNames[boardNumber], QOS0, MqttMessageArrived);
  199. if (ret != MQTT_SUCCESS) {
  200. net_disconnect (&net);
  201. printf ("MQTTSubscribe failed.\n");
  202. return ret;
  203. }
  204. }
  205. printf ("MQTT_ConnectBroker O.K.\n");
  206. return MQTT_SUCCESS;
  207. }
  208. void RelayCtrl (int32_t relayNumber, int32_t relayTimeOn) {
  209. switch (relayNumber) {
  210. case 1:
  211. if (relayTimeOn > 0) {
  212. osTimerStart (relay1TimerHandle, relayTimeOn * 1000);
  213. } else {
  214. osTimerStop (relay1TimerHandle);
  215. }
  216. if (relayTimeOn != 0) {
  217. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_SET);
  218. } else {
  219. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_RESET);
  220. }
  221. break;
  222. case 2:
  223. if (relayTimeOn > 0) {
  224. osTimerStart (relay2TimerHandle, relayTimeOn * 1000);
  225. } else {
  226. osTimerStop (relay2TimerHandle);
  227. }
  228. if (relayTimeOn != 0) {
  229. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_SET);
  230. } else {
  231. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_RESET);
  232. }
  233. break;
  234. case 3:
  235. if (relayTimeOn > 0) {
  236. osTimerStart (relay3TimerHandle, relayTimeOn * 1000);
  237. } else {
  238. osTimerStop (relay3TimerHandle);
  239. }
  240. if (relayTimeOn != 0) {
  241. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_SET);
  242. } else {
  243. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_RESET);
  244. }
  245. break;
  246. case 4:
  247. if (relayTimeOn > 0) {
  248. osTimerStart (relay4TimerHandle, relayTimeOn * 1000);
  249. } else {
  250. osTimerStop (relay4TimerHandle);
  251. }
  252. if (relayTimeOn != 0) {
  253. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_SET);
  254. } else {
  255. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_RESET);
  256. }
  257. break;
  258. default: break;
  259. }
  260. }
  261. void MqttMessageArrived (MessageData* msg) {
  262. SerialProtocolCommands spCommand = spUnknown;
  263. BoardNoOverTopic topicForBoard = unknownBoard;
  264. uint8_t boardNumber = 0;
  265. MQTTMessage* message = msg->message;
  266. char topicName[32] = { 0 };
  267. memcpy (topicName, msg->topicName->lenstring.data, msg->topicName->lenstring.len);
  268. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  269. if (strcmp (topicName, subscribeTopicNames[boardNumber]) == 0) {
  270. topicForBoard = (BoardNoOverTopic)(boardNumber);
  271. break;
  272. }
  273. }
  274. if (topicForBoard == unknownBoard) {
  275. return;
  276. }
  277. cJSON* json = cJSON_Parse (message->payload);
  278. const cJSON* objectItem = NULL;
  279. InterProcessData data = { 0 };
  280. uint32_t arraySize = 0;
  281. if (topicForBoard != main_board) {
  282. for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) {
  283. spCommand = spUnknown;
  284. objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
  285. if (objectItem != NULL) {
  286. switch (topicCmdNumber) {
  287. case fanSpeedTopic: spCommand = spSetFanSpeed;
  288. case motorXonTopic:
  289. if (spCommand == spUnknown) {
  290. spCommand = spSetMotorXOn;
  291. }
  292. case motorYonTopic:
  293. if (spCommand == spUnknown) {
  294. spCommand = spSetMotorYOn;
  295. }
  296. if (cJSON_IsArray (objectItem)) {
  297. data.spCommand = spCommand;
  298. arraySize = cJSON_GetArraySize (objectItem);
  299. if (arraySize == 2) {
  300. for (int i = 0; i < arraySize; i++) {
  301. cJSON* item = cJSON_GetArrayItem (objectItem, i);
  302. if (cJSON_IsNumber (item)) {
  303. data.values.integerValues.value[i] = item->valueint;
  304. }
  305. }
  306. }
  307. }
  308. break;
  309. case diodeTopic:
  310. if (cJSON_IsNumber (objectItem)) {
  311. data.spCommand = spSetDiodeOn;
  312. data.values.integerValues.value[0] = objectItem->valueint;
  313. }
  314. break;
  315. case motorXMaxCurrentTopic: spCommand = spSetmotorXMaxCurrent;
  316. case motorYMaxCurrentTopic:
  317. if (cJSON_IsNumber (objectItem)) {
  318. if (spCommand == spUnknown) {
  319. spCommand = spSetmotorYMaxCurrent;
  320. }
  321. data.spCommand = spCommand;
  322. data.values.flaotValues.value[0] = objectItem->valuedouble;
  323. }
  324. break;
  325. case clearPeakElectricalMeasurementsTopic:
  326. if (cJSON_IsNumber (objectItem)) {
  327. if (objectItem->valueint == 1) {
  328. data.spCommand = spClearPeakMeasurments;
  329. }
  330. }
  331. break;
  332. case mainBoardRelayTopic:
  333. if (cJSON_IsArray (objectItem)) {
  334. arraySize = cJSON_GetArraySize (objectItem);
  335. if (arraySize == 2) {
  336. int32_t relayNumber = -1;
  337. cJSON* item = cJSON_GetArrayItem (objectItem, 0);
  338. if (cJSON_IsNumber (item)) {
  339. relayNumber = item->valueint;
  340. }
  341. int32_t relayTimeOn = 0;
  342. item = cJSON_GetArrayItem (objectItem, 1);
  343. if (cJSON_IsNumber (item)) {
  344. relayTimeOn = item->valueint;
  345. }
  346. RelayCtrl (relayNumber, relayTimeOn);
  347. }
  348. }
  349. break;
  350. case setEncoderXValue:
  351. if (cJSON_IsNumber (objectItem)) {
  352. data.spCommand = spSetEncoderXValue;
  353. data.values.flaotValues.value[0] = objectItem->valuedouble;
  354. }
  355. break;
  356. case setEncoderYValue:
  357. if (cJSON_IsNumber (objectItem)) {
  358. data.spCommand = spSetEncoderYValue;
  359. data.values.flaotValues.value[0] = objectItem->valuedouble;
  360. }
  361. break;
  362. case setVoltageMeasGains: spCommand = spSetVoltageMeasGains;
  363. case setVoltageMeasOffsets:
  364. if (spCommand == spUnknown) {
  365. spCommand = spSetVoltageMeasOffsets;
  366. }
  367. case setCurrentMeasGains:
  368. if (spCommand == spUnknown) {
  369. spCommand = spSetCurrentMeasGains;
  370. }
  371. case setCurrentMeasOffsets:
  372. if (spCommand == spUnknown) {
  373. spCommand = spSetCurrentMeasOffsets;
  374. }
  375. if (cJSON_IsArray (objectItem)) {
  376. data.spCommand = spCommand;
  377. arraySize = cJSON_GetArraySize (objectItem);
  378. if (arraySize == 3) {
  379. for (int i = 0; i < arraySize; i++) {
  380. cJSON* item = cJSON_GetArrayItem (objectItem, i);
  381. if (cJSON_IsNumber (item)) {
  382. data.values.flaotValues.value[i] = item->valuedouble;
  383. }
  384. }
  385. }
  386. }
  387. break;
  388. case resetSystem:
  389. if (cJSON_IsNumber (objectItem)) {
  390. if (objectItem->valueint == 1) {
  391. data.spCommand = spResetSystem;
  392. break;
  393. }
  394. }
  395. break;
  396. case setPositionX:
  397. if (cJSON_IsNumber (objectItem)) {
  398. data.spCommand = spSetPositonX;
  399. data.values.flaotValues.value[0] = objectItem->valuedouble;
  400. }
  401. break;
  402. case setPositionY:
  403. if (cJSON_IsNumber (objectItem)) {
  404. data.spCommand = spSetPositonY;
  405. data.values.flaotValues.value[0] = objectItem->valuedouble;
  406. }
  407. break;
  408. default: break;
  409. }
  410. }
  411. }
  412. } else {
  413. for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) {
  414. objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
  415. if (objectItem != NULL) {
  416. if (topicCmdNumber == resetSystem) {
  417. __disable_irq ();
  418. NVIC_SystemReset ();
  419. break;
  420. }
  421. }
  422. }
  423. }
  424. switch (topicForBoard) {
  425. case main_board: break;
  426. case board_1:
  427. #ifdef USE_UART8_INSTEAD_UART1
  428. if (uart8TaskData.sendCmdToSlaveQueue != NULL) {
  429. osMessageQueuePut (uart8TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  430. }
  431. #else
  432. if (uart1TaskData.sendCmdToSlaveQueue != NULL) {
  433. osMessageQueuePut (uart1TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  434. }
  435. #endif
  436. printf ("Send cmd to board 1\n");
  437. break;
  438. case board_2:
  439. if (uart3TaskData.sendCmdToSlaveQueue != NULL) {
  440. osMessageQueuePut (uart3TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  441. }
  442. printf ("Send cmd to board 2\n");
  443. break;
  444. case board_3:
  445. if (uart6TaskData.sendCmdToSlaveQueue != NULL) {
  446. osMessageQueuePut (uart6TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  447. }
  448. printf ("Send cmd to board 3\n");
  449. break;
  450. case board_4:
  451. if (uart2TaskData.sendCmdToSlaveQueue != NULL) {
  452. osMessageQueuePut (uart2TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  453. }
  454. printf ("Send cmd to board 4\n");
  455. break;
  456. default: break;
  457. }
  458. cJSON_Delete (json);
  459. printf ("MQTT Topic:%s, MSG[%d]:%s\n", topicName, (int)message->payloadlen, (char*)message->payload);
  460. }
  461. void mqtt_cli_init (void) {
  462. mqttClientSubTaskHandle = osThreadNew (MqttClientSubTask, NULL, &mqttClientSubTaskAttr); // subscribe task
  463. mqttClientPubTaskHandle = osThreadNew (MqttClientPubTask, NULL, &mqttClientPubTaskAttr); // publish task
  464. }