mqtt_client.c 22 KB


  1. /*
  2. * mqtt_client.c
  3. *
  4. * Created on: Jun 10, 2024
  5. * Author: jakubski
  6. */
  7. #include <stdio.h>
  8. #include "FreeRTOS.h"
  9. #include "task.h"
  10. #include "main.h"
  11. #include "cmsis_os.h"
  12. #include <string.h>
  13. #include "lwip.h"
  14. #include "lwip/api.h"
  15. #include "MQTTClient.h"
  16. #include "MQTTInterface.h"
  17. #include "node-red-config.h"
  18. #include "measurements.h"
  19. #include "cJSON.h"
  20. #include "interprocess_data.h"
  21. #include "uart_tasks.h"
  22. #define MQTT_BUFSIZE 1024
  23. char* const subscribeTopicNames[MASTER_BOARD + SLAVES_COUNT] = { "Set/0", "Set/1", "Set/2", "Set/3", "Set/4" };
  24. #define MAX_COMMANDS_IN_MQTT_PAYLOAD 17
  25. char* const topicCommands[MAX_COMMANDS_IN_MQTT_PAYLOAD] = { "fanSpeed", "motorXon", "motorYon", "diode", "motorXMaxCurrent", "motorYMaxCurrent", "clearPeakElectricalMeasurements", "mainBoardRelay",
  26. "setEncoderXValue", "setEncoderYValue", "setVoltageMeasGains", "setVoltageMeasOffsets", "setCurrentMeasGains", "setCurrentMeasOffsets", "resetSystem", "setPositionX", "setPositionY" };
  27. enum _Topics
  28. {
  29. fanSpeedTopic = 0,
  30. motorXonTopic,
  31. motorYonTopic,
  32. diodeTopic,
  33. motorXMaxCurrentTopic,
  34. motorYMaxCurrentTopic,
  35. clearPeakElectricalMeasurementsTopic,
  36. mainBoardRelayTopic,
  37. setEncoderXValue,
  38. setEncoderYValue,
  39. setVoltageMeasGains,
  40. setVoltageMeasOffsets,
  41. setCurrentMeasGains,
  42. setCurrentMeasOffsets,
  43. resetSystem,
  44. setPositionX,
  45. setPositionY
  46. };
  47. enum _BoardNoOverTopic
  48. {
  49. main_board = 0,
  50. board_1,
  51. board_2,
  52. board_3,
  53. board_4,
  54. unknownBoard
  55. };
  56. typedef enum _BoardNoOverTopic BoardNoOverTopic;
  57. extern struct netif gnetif; //extern gnetif
  58. extern UartTaskData uart1TaskData; // Board 1
  59. extern UartTaskData uart3TaskData; // Board 2
  60. extern UartTaskData uart6TaskData; // Board 3
  61. extern UartTaskData uart2TaskData; // Board 4
  62. extern UartTaskData uart8TaskData; // Debug
  63. extern osTimerId_t relay1TimerHandle;
  64. extern osTimerId_t relay2TimerHandle;
  65. extern osTimerId_t relay3TimerHandle;
  66. extern osTimerId_t relay4TimerHandle;
  67. const osThreadAttr_t mqttClientSubTaskAttr =
  68. { .name = "mqttClientSubTask", .stack_size = configMINIMAL_STACK_SIZE * 6,
  69. .priority = (osPriority_t) osPriorityNormal, };
  70. const osThreadAttr_t mqttClientPubTaskAttr =
  71. { .name = "mqttClientPsubTask", .stack_size = configMINIMAL_STACK_SIZE * 6,
  72. .priority = (osPriority_t) osPriorityNormal, };
  73. osThreadId mqttClientSubTaskHandle; //mqtt client task handle
  74. osThreadId mqttClientPubTaskHandle; //mqtt client task handle
  75. Network net; //mqtt network
  76. MQTTClient mqttClient; //mqtt client
  77. uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer
  78. uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer
  79. void RelayCtrl(int32_t relayNumber, int32_t relayTimeOn);
  80. void MqttClientSubTask (void* argument); // mqtt client subscribe task function
  81. void MqttClientPubTask (void* argument); // mqtt client publish task function
  82. uint32_t MqttConnectBroker (void); // mqtt broker connect function
  83. void MqttMessageArrived (MessageData* msg); // mqtt message callback function
  84. void MqttClientSubTask (void* argument) {
  85. uint8_t connectionTryCounter = 0;
  86. while (1) {
  87. // waiting for valid ip address
  88. if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) // system has no valid ip address
  89. {
  90. osDelay (pdMS_TO_TICKS (1000));
  91. continue;
  92. } else {
  93. printf ("DHCP/Static IP O.K.\n");
  94. break;
  95. }
  96. }
  97. while (1) {
  98. if (!mqttClient.isconnected) {
  99. // try to connect to the broker
  100. if (MqttConnectBroker () != MQTT_SUCCESS) {
  101. osDelay (pdMS_TO_TICKS (1000));
  102. connectionTryCounter++;
  103. if (connectionTryCounter > 10) {
  104. __disable_irq ();
  105. NVIC_SystemReset ();
  106. }
  107. }
  108. } else {
  109. connectionTryCounter = 0;
  110. MQTTYield (&mqttClient, 500); // handle timer
  111. osDelay (pdMS_TO_TICKS (100));
  112. }
  113. }
  114. }
  115. void MqttClientPubTask (void* argument) {
  116. char messageBuffer[MQTT_BUFSIZE] = { 0x00 };
  117. char topicTextBuffer[32] = { 0x00 };
  118. uint32_t bytesInBuffer = 0;
  119. uint8_t boardNumber = 0;
  120. MQTTMessage message;
  121. while (1) {
  122. if (mqttClient.isconnected) {
  123. if (is_link_up ()) {
  124. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  125. osMutexAcquire (resMeasurementsMutex, osWaitForever);
  126. RESMeasurements* resMeas = &resMeasurements[boardNumber];
  127. sprintf (topicTextBuffer, "RESmeasurments/%d", boardNumber + 1);
  128. bytesInBuffer = sprintf (messageBuffer, "{\"voltageRMS\":[%.2f, %.2f, %.2f], ", resMeas->voltageRMS[0], resMeas->voltageRMS[1], resMeas->voltageRMS[2]);
  129. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"voltagePeak\":[%.2f, %.2f, %.2f], ", resMeas->voltagePeak[0], resMeas->voltagePeak[1], resMeas->voltagePeak[2]);
  130. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentRMS\":[%.3f, %.3f, %.3f], ", resMeas->currentRMS[0], resMeas->currentRMS[1], resMeas->currentRMS[2]);
  131. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentPeak\":[%.3f, %.3f, %.3f], ", resMeas->currentPeak[0], resMeas->currentPeak[1], resMeas->currentPeak[2]);
  132. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"power\":[%.2f, %.2f, %.2f], ", resMeas->power[0], resMeas->power[1], resMeas->power[2]);
  133. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"lastSeen\": %ld}", slaveLastSeen[boardNumber]);
  134. osMutexRelease (resMeasurementsMutex);
  135. message.payload = (void*)messageBuffer;
  136. message.payloadlen = strlen (messageBuffer);
  137. MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
  138. }
  139. for (boardNumber = 0; boardNumber < SLAVES_COUNT + 1; boardNumber++) {
  140. if (boardNumber > 0) {
  141. osMutexAcquire (sensorsInfoMutex, osWaitForever);
  142. SesnorsInfo* sensors = &sensorsInfo[boardNumber - 1];
  143. sprintf (topicTextBuffer, "Sensors/%d", boardNumber);
  144. bytesInBuffer = sprintf (messageBuffer, "{\"pvTemperature\":[%.1f, %.1f], ", sensors->pvTemperature[0], sensors->pvTemperature[1]);
  145. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"fanVoltage\":%.2f, ", sensors->fanVoltage);
  146. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderX\":%.2f, ", sensors->currentXPosition);
  147. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderY\":%.2f, ", sensors->currentYPosition);
  148. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderXraw\":%ld, ", sensors->pvEncoderXraw);
  149. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderYraw\":%ld, ", sensors->pvEncoderYraw);
  150. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXStatus\":%d, ", sensors->motorXStatus);
  151. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYStatus\":%d, ", sensors->motorYStatus);
  152. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXAveCurrent\":%.3f, ", sensors->motorXAveCurrent);
  153. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYAveCurrent\":%.3f, ", sensors->motorYAveCurrent);
  154. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXPeakCurrent\":%.3f, ", sensors->motorXPeakCurrent);
  155. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYPeakCurrent\":%.3f, ", sensors->motorYPeakCurrent);
  156. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchUp\":%d, ", sensors->limitXSwitchUp);
  157. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchDown\":%d, ", sensors->limitXSwitchDown);
  158. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchCenter\":%d, ", sensors->limitXSwitchCenter);
  159. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchUp\":%d, ", sensors->limitYSwitchUp);
  160. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchDown\":%d, ", sensors->limitYSwitchDown);
  161. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchCenter\":%d, ", sensors->limitYSwitchCenter);
  162. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentXPosition\":%.2f, ", sensors->currentXPosition + sensors->positionXOffset);
  163. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentYPosition\":%.2f, ", sensors->currentYPosition + sensors->positionYOffset);
  164. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"positionXWeak\":%d, ", sensors->positionXWeak);
  165. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"positionYWeak\":%d, ", sensors->positionYWeak);
  166. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"powerSupplyFailMask\":%d}", sensors->powerSupplyFailMask);
  167. osMutexRelease (sensorsInfoMutex);
  168. } else {
  169. sprintf (topicTextBuffer, "Sensors/%d", boardNumber);
  170. uint8_t mainBoardPowerSupplyFailMask = ~((HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_4) << 1) | HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_2)) & 0x3;
  171. bytesInBuffer = sprintf (messageBuffer, "{\"powerSupplyFailMask\":%d}", mainBoardPowerSupplyFailMask);
  172. }
  173. message.payload = (void*)messageBuffer;
  174. message.payloadlen = strlen (messageBuffer);
  175. MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
  176. }
  177. }
  178. }
  179. osDelay (pdMS_TO_TICKS (1000));
  180. }
  181. }
  182. uint32_t MqttConnectBroker () {
  183. uint8_t boardNumber = 0;
  184. int ret;
  185. NewNetwork (&net);
  186. ret = ConnectNetwork (&net, BROKER_IP, MQTT_PORT);
  187. if (ret != MQTT_SUCCESS) {
  188. printf ("ConnectNetwork failed.\n");
  189. return -1;
  190. }
  191. MQTTClientInit (&mqttClient, &net, 1000, sndBuffer, sizeof (sndBuffer), rcvBuffer, sizeof (rcvBuffer));
  192. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  193. data.willFlag = 0;
  194. data.MQTTVersion = 3;
  195. data.clientID.cstring = "test_user1";
  196. data.username.cstring = "test_user1";
  197. data.password.cstring = "1234";
  198. data.keepAliveInterval = 100;
  199. data.cleansession = 1;
  200. ret = MQTTConnect (&mqttClient, &data);
  201. if (ret != MQTT_SUCCESS) {
  202. net_disconnect (&net);
  203. printf ("MQTTConnect failed. Code %d\n", ret);
  204. return ret;
  205. }
  206. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  207. ret = MQTTSubscribe (&mqttClient, subscribeTopicNames[boardNumber], QOS0, MqttMessageArrived);
  208. if (ret != MQTT_SUCCESS) {
  209. net_disconnect (&net);
  210. printf ("MQTTSubscribe failed.\n");
  211. return ret;
  212. }
  213. }
  214. printf ("MQTT_ConnectBroker O.K.\n");
  215. return MQTT_SUCCESS;
  216. }
  217. void RelayCtrl (int32_t relayNumber, int32_t relayTimeOn) {
  218. switch (relayNumber) {
  219. case 1:
  220. if (relayTimeOn > 0) {
  221. osTimerStart (relay1TimerHandle, relayTimeOn * 1000);
  222. } else {
  223. osTimerStop (relay1TimerHandle);
  224. }
  225. if (relayTimeOn != 0) {
  226. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_SET);
  227. } else {
  228. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_RESET);
  229. }
  230. break;
  231. case 2:
  232. if (relayTimeOn > 0) {
  233. osTimerStart (relay2TimerHandle, relayTimeOn * 1000);
  234. } else {
  235. osTimerStop (relay2TimerHandle);
  236. }
  237. if (relayTimeOn != 0) {
  238. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_SET);
  239. } else {
  240. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_RESET);
  241. }
  242. break;
  243. case 3:
  244. if (relayTimeOn > 0) {
  245. osTimerStart (relay3TimerHandle, relayTimeOn * 1000);
  246. } else {
  247. osTimerStop (relay3TimerHandle);
  248. }
  249. if (relayTimeOn != 0) {
  250. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_SET);
  251. } else {
  252. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_RESET);
  253. }
  254. break;
  255. case 4:
  256. if (relayTimeOn > 0) {
  257. osTimerStart (relay4TimerHandle, relayTimeOn * 1000);
  258. } else {
  259. osTimerStop (relay4TimerHandle);
  260. }
  261. if (relayTimeOn != 0) {
  262. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_SET);
  263. } else {
  264. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_RESET);
  265. }
  266. break;
  267. default: break;
  268. }
  269. }
  270. void MqttMessageArrived (MessageData* msg) {
  271. SerialProtocolCommands spCommand = spUnknown;
  272. BoardNoOverTopic topicForBoard = unknownBoard;
  273. uint8_t boardNumber = 0;
  274. MQTTMessage* message = msg->message;
  275. char topicName[32] = { 0 };
  276. memcpy (topicName, msg->topicName->lenstring.data, msg->topicName->lenstring.len);
  277. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  278. if (strcmp (topicName, subscribeTopicNames[boardNumber]) == 0) {
  279. topicForBoard = (BoardNoOverTopic)(boardNumber);
  280. break;
  281. }
  282. }
  283. if (topicForBoard == unknownBoard) {
  284. return;
  285. }
  286. cJSON* json = cJSON_Parse (message->payload);
  287. const cJSON* objectItem = NULL;
  288. InterProcessData data = { 0 };
  289. uint32_t arraySize = 0;
  290. if (topicForBoard != main_board) {
  291. for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) {
  292. spCommand = spUnknown;
  293. objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
  294. if (objectItem != NULL) {
  295. switch (topicCmdNumber) {
  296. case fanSpeedTopic: spCommand = spSetFanSpeed;
  297. case motorXonTopic:
  298. if (spCommand == spUnknown) {
  299. spCommand = spSetMotorXOn;
  300. }
  301. case motorYonTopic:
  302. if (spCommand == spUnknown) {
  303. spCommand = spSetMotorYOn;
  304. }
  305. if (cJSON_IsArray (objectItem)) {
  306. data.spCommand = spCommand;
  307. arraySize = cJSON_GetArraySize (objectItem);
  308. if (arraySize == 2) {
  309. for (int i = 0; i < arraySize; i++) {
  310. cJSON* item = cJSON_GetArrayItem (objectItem, i);
  311. if (cJSON_IsNumber (item)) {
  312. data.values.integerValues.value[i] = item->valueint;
  313. }
  314. }
  315. }
  316. }
  317. break;
  318. case diodeTopic:
  319. if (cJSON_IsNumber (objectItem)) {
  320. data.spCommand = spSetDiodeOn;
  321. data.values.integerValues.value[0] = objectItem->valueint;
  322. }
  323. break;
  324. case motorXMaxCurrentTopic: spCommand = spSetmotorXMaxCurrent;
  325. case motorYMaxCurrentTopic:
  326. if (cJSON_IsNumber (objectItem)) {
  327. if (spCommand == spUnknown) {
  328. spCommand = spSetmotorYMaxCurrent;
  329. }
  330. data.spCommand = spCommand;
  331. data.values.flaotValues.value[0] = objectItem->valuedouble;
  332. }
  333. break;
  334. case clearPeakElectricalMeasurementsTopic:
  335. if (cJSON_IsNumber (objectItem)) {
  336. if (objectItem->valueint == 1) {
  337. data.spCommand = spClearPeakMeasurments;
  338. }
  339. }
  340. break;
  341. case mainBoardRelayTopic:
  342. if (cJSON_IsArray (objectItem)) {
  343. arraySize = cJSON_GetArraySize (objectItem);
  344. if (arraySize == 2) {
  345. int32_t relayNumber = -1;
  346. cJSON* item = cJSON_GetArrayItem (objectItem, 0);
  347. if (cJSON_IsNumber (item)) {
  348. relayNumber = item->valueint;
  349. }
  350. int32_t relayTimeOn = 0;
  351. item = cJSON_GetArrayItem (objectItem, 1);
  352. if (cJSON_IsNumber (item)) {
  353. relayTimeOn = item->valueint;
  354. }
  355. RelayCtrl (relayNumber, relayTimeOn);
  356. }
  357. }
  358. break;
  359. case setEncoderXValue:
  360. if (cJSON_IsNumber (objectItem)) {
  361. data.spCommand = spSetEncoderXValue;
  362. data.values.flaotValues.value[0] = objectItem->valuedouble;
  363. }
  364. break;
  365. case setEncoderYValue:
  366. if (cJSON_IsNumber (objectItem)) {
  367. data.spCommand = spSetEncoderYValue;
  368. data.values.flaotValues.value[0] = objectItem->valuedouble;
  369. }
  370. break;
  371. case setVoltageMeasGains: spCommand = spSetVoltageMeasGains;
  372. case setVoltageMeasOffsets:
  373. if (spCommand == spUnknown) {
  374. spCommand = spSetVoltageMeasOffsets;
  375. }
  376. case setCurrentMeasGains:
  377. if (spCommand == spUnknown) {
  378. spCommand = spSetCurrentMeasGains;
  379. }
  380. case setCurrentMeasOffsets:
  381. if (spCommand == spUnknown) {
  382. spCommand = spSetCurrentMeasOffsets;
  383. }
  384. if (cJSON_IsArray (objectItem)) {
  385. data.spCommand = spCommand;
  386. arraySize = cJSON_GetArraySize (objectItem);
  387. if (arraySize == 3) {
  388. for (int i = 0; i < arraySize; i++) {
  389. cJSON* item = cJSON_GetArrayItem (objectItem, i);
  390. if (cJSON_IsNumber (item)) {
  391. data.values.flaotValues.value[i] = item->valuedouble;
  392. }
  393. }
  394. }
  395. }
  396. break;
  397. case resetSystem:
  398. if (cJSON_IsNumber (objectItem)) {
  399. if (objectItem->valueint == 1) {
  400. data.spCommand = spResetSystem;
  401. break;
  402. }
  403. }
  404. break;
  405. case setPositionX:
  406. if (cJSON_IsNumber (objectItem)) {
  407. data.spCommand = spSetPositonX;
  408. data.values.flaotValues.value[0] = objectItem->valuedouble;
  409. }
  410. break;
  411. case setPositionY:
  412. if (cJSON_IsNumber (objectItem)) {
  413. data.spCommand = spSetPositonY;
  414. data.values.flaotValues.value[0] = objectItem->valuedouble;
  415. }
  416. break;
  417. default: break;
  418. }
  419. }
  420. }
  421. } else {
  422. for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) {
  423. objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
  424. if (objectItem != NULL) {
  425. if (topicCmdNumber == resetSystem) {
  426. __disable_irq ();
  427. NVIC_SystemReset ();
  428. break;
  429. }
  430. }
  431. }
  432. }
  433. switch (topicForBoard) {
  434. case main_board: break;
  435. case board_1:
  436. #ifdef USE_UART8_INSTEAD_UART1
  437. if (uart8TaskData.sendCmdToSlaveQueue != NULL) {
  438. osMessageQueuePut (uart8TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  439. }
  440. #else
  441. if (uart1TaskData.sendCmdToSlaveQueue != NULL) {
  442. osMessageQueuePut (uart1TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  443. }
  444. #endif
  445. printf ("Send cmd to board 1\n");
  446. break;
  447. case board_2:
  448. if (uart3TaskData.sendCmdToSlaveQueue != NULL) {
  449. osMessageQueuePut (uart3TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  450. }
  451. printf ("Send cmd to board 2\n");
  452. break;
  453. case board_3:
  454. if (uart6TaskData.sendCmdToSlaveQueue != NULL) {
  455. osMessageQueuePut (uart6TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  456. }
  457. printf ("Send cmd to board 3\n");
  458. break;
  459. case board_4:
  460. if (uart2TaskData.sendCmdToSlaveQueue != NULL) {
  461. osMessageQueuePut (uart2TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  462. }
  463. printf ("Send cmd to board 4\n");
  464. break;
  465. default: break;
  466. }
  467. cJSON_Delete (json);
  468. printf ("MQTT Topic:%s, MSG[%d]:%s\n", topicName, (int)message->payloadlen, (char*)message->payload);
  469. }
  470. void mqtt_cli_init (void) {
  471. mqttClientSubTaskHandle = osThreadNew (MqttClientSubTask, NULL, &mqttClientSubTaskAttr); // subscribe task
  472. mqttClientPubTaskHandle = osThreadNew (MqttClientPubTask, NULL, &mqttClientPubTaskAttr); // publish task
  473. }