mqtt_client.c 20 KB


  1. /*
  2. * mqtt_client.c
  3. *
  4. * Created on: Jun 10, 2024
  5. * Author: jakubski
  6. */
  7. #include <stdio.h>
  8. #include "FreeRTOS.h"
  9. #include "task.h"
  10. #include "main.h"
  11. #include "cmsis_os.h"
  12. #include <string.h>
  13. #include "lwip.h"
  14. #include "lwip/api.h"
  15. #include "MQTTClient.h"
  16. #include "MQTTInterface.h"
  17. #include "node-red-config.h"
  18. #include "measurements.h"
  19. #include "cJSON.h"
  20. #include "interprocess_data.h"
  21. #include "uart_tasks.h"
  22. #define MQTT_BUFSIZE 1024
  23. char* const subscribeTopicNames[MASTER_BOARD + SLAVES_COUNT] = { "Set/0", "Set/1", "Set/2", "Set/3", "Set/4" };
  24. #define MAX_COMMANDS_IN_MQTT_PAYLOAD 15
  25. char* const topicCommands[MAX_COMMANDS_IN_MQTT_PAYLOAD] = { "fanSpeed", "motorXon", "motorYon", "diode", "motorXMaxCurrent", "motorYMaxCurrent", "clearPeakElectricalMeasurements", "mainBoardRelay",
  26. "setEncoderXValue", "setEncoderYValue", "setVoltageMeasGains", "setVoltageMeasOffsets", "setCurrentMeasGains", "setCurrentMeasOffsets", "resetSystem" };
  27. enum _Topics
  28. {
  29. fanSpeedTopic = 0,
  30. motorXonTopic,
  31. motorYonTopic,
  32. diodeTopic,
  33. motorXMaxCurrentTopic,
  34. motorYMaxCurrentTopic,
  35. clearPeakElectricalMeasurementsTopic,
  36. mainBoardRelayTopic,
  37. setEncoderXValue,
  38. setEncoderYValue,
  39. setVoltageMeasGains,
  40. setVoltageMeasOffsets,
  41. setCurrentMeasGains,
  42. setCurrentMeasOffsets,
  43. resetSystem,
  44. };
  45. enum _BoardNoOverTopic
  46. {
  47. main_board = 0,
  48. board_1,
  49. board_2,
  50. board_3,
  51. board_4,
  52. unknownBoard
  53. };
  54. typedef enum _BoardNoOverTopic BoardNoOverTopic;
  55. extern struct netif gnetif; //extern gnetif
  56. extern UartTaskData uart1TaskData; // Board 1
  57. extern UartTaskData uart3TaskData; // Board 2
  58. extern UartTaskData uart6TaskData; // Board 3
  59. extern UartTaskData uart2TaskData; // Board 4
  60. extern UartTaskData uart8TaskData; // Debug
  61. extern osTimerId_t relay1TimerHandle;
  62. extern osTimerId_t relay2TimerHandle;
  63. extern osTimerId_t relay3TimerHandle;
  64. extern osTimerId_t relay4TimerHandle;
  65. const osThreadAttr_t mqttClientSubTaskAttr =
  66. { .name = "mqttClientSubTask", .stack_size = configMINIMAL_STACK_SIZE * 4,
  67. .priority = (osPriority_t) osPriorityNormal, };
  68. const osThreadAttr_t mqttClientPubTaskAttr =
  69. { .name = "mqttClientPsubTask", .stack_size = configMINIMAL_STACK_SIZE * 4,
  70. .priority = (osPriority_t) osPriorityNormal, };
  71. osThreadId mqttClientSubTaskHandle; //mqtt client task handle
  72. osThreadId mqttClientPubTaskHandle; //mqtt client task handle
  73. Network net; //mqtt network
  74. MQTTClient mqttClient; //mqtt client
  75. uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer
  76. uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer
  77. void RelayCtrl(int32_t relayNumber, int32_t relayTimeOn);
  78. void MqttClientSubTask (void* argument); // mqtt client subscribe task function
  79. void MqttClientPubTask (void* argument); // mqtt client publish task function
  80. uint32_t MqttConnectBroker (void); // mqtt broker connect function
  81. void MqttMessageArrived (MessageData* msg); // mqtt message callback function
  82. void MqttClientSubTask (void* argument) {
  83. while (1) {
  84. // waiting for valid ip address
  85. if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) // system has no valid ip address
  86. {
  87. osDelay (pdMS_TO_TICKS (1000));
  88. continue;
  89. } else {
  90. printf ("DHCP/Static IP O.K.\n");
  91. break;
  92. }
  93. }
  94. while (1) {
  95. if (!mqttClient.isconnected) {
  96. // try to connect to the broker
  97. if (MqttConnectBroker () != MQTT_SUCCESS) {
  98. osDelay (pdMS_TO_TICKS (1000));
  99. }
  100. } else {
  101. MQTTYield (&mqttClient, 500); // handle timer
  102. osDelay (pdMS_TO_TICKS (100));
  103. }
  104. }
  105. }
  106. void MqttClientPubTask (void* argument) {
  107. char messageBuffer[512] = { 0x00 };
  108. char topicTextBuffer[32] = { 0x00 };
  109. uint32_t bytesInBuffer = 0;
  110. uint8_t boardNumber = 0;
  111. MQTTMessage message;
  112. while (1) {
  113. if (mqttClient.isconnected) {
  114. if (is_link_up ()) {
  115. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  116. osMutexAcquire (resMeasurementsMutex, osWaitForever);
  117. RESMeasurements* resMeas = &resMeasurements[boardNumber];
  118. sprintf (topicTextBuffer, "RESmeasurments/%d", boardNumber + 1);
  119. bytesInBuffer = sprintf (messageBuffer, "{\"voltageRMS\":[%.2f, %.2f, %.2f], ", resMeas->voltageRMS[0], resMeas->voltageRMS[1], resMeas->voltageRMS[2]);
  120. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"voltagePeak\":[%.2f, %.2f, %.2f], ", resMeas->voltagePeak[0], resMeas->voltagePeak[1], resMeas->voltagePeak[2]);
  121. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentRMS\":[%.3f, %.3f, %.3f], ", resMeas->currentRMS[0], resMeas->currentRMS[1], resMeas->currentRMS[2]);
  122. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentPeak\":[%.3f, %.3f, %.3f], ", resMeas->currentPeak[0], resMeas->currentPeak[1], resMeas->currentPeak[2]);
  123. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"power\":[%.2f, %.2f, %.2f], ", resMeas->power[0], resMeas->power[1], resMeas->power[2]);
  124. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"lastSeen\": %ld}", slaveLastSeen[boardNumber]);
  125. osMutexRelease (resMeasurementsMutex);
  126. message.payload = (void*)messageBuffer;
  127. message.payloadlen = strlen (messageBuffer);
  128. MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
  129. }
  130. for (boardNumber = 0; boardNumber < SLAVES_COUNT + 1; boardNumber++) {
  131. if (boardNumber > 0) {
  132. osMutexAcquire (sensorsInfoMutex, osWaitForever);
  133. SesnorsInfo* sensors = &sensorsInfo[boardNumber - 1];
  134. sprintf (topicTextBuffer, "Sensors/%d", boardNumber);
  135. bytesInBuffer = sprintf (messageBuffer, "{\"pvTemperature\":[%.1f, %.1f], ", sensors->pvTemperature[0], sensors->pvTemperature[1]);
  136. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"fanVoltage\":%.2f, ", sensors->fanVoltage);
  137. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderX\":%.2f, ", sensors->pvEncoderX);
  138. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoderY\":%.2f, ", sensors->pvEncoderY);
  139. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXStatus\":%d, ", sensors->motorXStatus);
  140. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYStatus\":%d, ", sensors->motorYStatus);
  141. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXAveCurrent\":%.3f, ", sensors->motorXAveCurrent);
  142. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYAveCurrent\":%.3f, ", sensors->motorYAveCurrent);
  143. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXPeakCurrent\":%.3f, ", sensors->motorXPeakCurrent);
  144. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYPeakCurrent\":%.3f, ", sensors->motorYPeakCurrent);
  145. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchUp\":%d, ", sensors->limitXSwitchUp);
  146. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchDown\":%d, ", sensors->limitXSwitchDown);
  147. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitXSwitchCenter\":%d, ", sensors->limitXSwitchCenter);
  148. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchUp\":%d, ", sensors->limitYSwitchUp);
  149. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchDown\":%d, ", sensors->limitYSwitchDown);
  150. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitYSwitchCenter\":%d, ", sensors->limitYSwitchCenter);
  151. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"powerSupplyFailMask\":%d}", sensors->powerSupplyFailMask);
  152. osMutexRelease (sensorsInfoMutex);
  153. } else {
  154. sprintf (topicTextBuffer, "Sensors/%d", boardNumber);
  155. uint8_t mainBoardPowerSupplyFailMask = ~((HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_4) << 1) | HAL_GPIO_ReadPin (GPIOD, GPIO_PIN_2)) & 0x3;
  156. bytesInBuffer = sprintf (messageBuffer, "{\"powerSupplyFailMask\":%d}", mainBoardPowerSupplyFailMask);
  157. }
  158. message.payload = (void*)messageBuffer;
  159. message.payloadlen = strlen (messageBuffer);
  160. MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
  161. }
  162. }
  163. }
  164. osDelay (pdMS_TO_TICKS (1000));
  165. }
  166. }
  167. uint32_t MqttConnectBroker () {
  168. uint8_t boardNumber = 0;
  169. int ret;
  170. NewNetwork (&net);
  171. ret = ConnectNetwork (&net, BROKER_IP, MQTT_PORT);
  172. if (ret != MQTT_SUCCESS) {
  173. printf ("ConnectNetwork failed.\n");
  174. return -1;
  175. }
  176. MQTTClientInit (&mqttClient, &net, 1000, sndBuffer, sizeof (sndBuffer), rcvBuffer, sizeof (rcvBuffer));
  177. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  178. data.willFlag = 0;
  179. data.MQTTVersion = 3;
  180. data.clientID.cstring = "test_user1";
  181. data.username.cstring = "test_user1";
  182. data.password.cstring = "1234";
  183. data.keepAliveInterval = 100;
  184. data.cleansession = 1;
  185. ret = MQTTConnect (&mqttClient, &data);
  186. if (ret != MQTT_SUCCESS) {
  187. net_disconnect (&net);
  188. printf ("MQTTConnect failed. Code %d\n", ret);
  189. return ret;
  190. }
  191. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  192. ret = MQTTSubscribe (&mqttClient, subscribeTopicNames[boardNumber], QOS0, MqttMessageArrived);
  193. if (ret != MQTT_SUCCESS) {
  194. net_disconnect (&net);
  195. printf ("MQTTSubscribe failed.\n");
  196. return ret;
  197. }
  198. }
  199. printf ("MQTT_ConnectBroker O.K.\n");
  200. return MQTT_SUCCESS;
  201. }
  202. void RelayCtrl (int32_t relayNumber, int32_t relayTimeOn) {
  203. switch (relayNumber) {
  204. case 1:
  205. if (relayTimeOn > 0) {
  206. osTimerStart (relay1TimerHandle, relayTimeOn * 1000);
  207. } else {
  208. osTimerStop (relay1TimerHandle);
  209. }
  210. if (relayTimeOn != 0) {
  211. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_SET);
  212. } else {
  213. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_5, GPIO_PIN_RESET);
  214. }
  215. break;
  216. case 2:
  217. if (relayTimeOn > 0) {
  218. osTimerStart (relay2TimerHandle, relayTimeOn * 1000);
  219. } else {
  220. osTimerStop (relay2TimerHandle);
  221. }
  222. if (relayTimeOn != 0) {
  223. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_SET);
  224. } else {
  225. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_3, GPIO_PIN_RESET);
  226. }
  227. break;
  228. case 3:
  229. if (relayTimeOn > 0) {
  230. osTimerStart (relay3TimerHandle, relayTimeOn * 1000);
  231. } else {
  232. osTimerStop (relay3TimerHandle);
  233. }
  234. if (relayTimeOn != 0) {
  235. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_SET);
  236. } else {
  237. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_4, GPIO_PIN_RESET);
  238. }
  239. break;
  240. case 4:
  241. if (relayTimeOn > 0) {
  242. osTimerStart (relay4TimerHandle, relayTimeOn * 1000);
  243. } else {
  244. osTimerStop (relay4TimerHandle);
  245. }
  246. if (relayTimeOn != 0) {
  247. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_SET);
  248. } else {
  249. HAL_GPIO_WritePin (GPIOE, GPIO_PIN_2, GPIO_PIN_RESET);
  250. }
  251. break;
  252. default: break;
  253. }
  254. }
  255. void MqttMessageArrived (MessageData* msg) {
  256. SerialProtocolCommands spCommand = spUnknown;
  257. BoardNoOverTopic topicForBoard = unknownBoard;
  258. uint8_t boardNumber = 0;
  259. MQTTMessage* message = msg->message;
  260. char topicName[32] = { 0 };
  261. memcpy (topicName, msg->topicName->lenstring.data, msg->topicName->lenstring.len);
  262. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  263. if (strcmp (topicName, subscribeTopicNames[boardNumber]) == 0) {
  264. topicForBoard = (BoardNoOverTopic)(boardNumber);
  265. break;
  266. }
  267. }
  268. if (topicForBoard == unknownBoard) {
  269. return;
  270. }
  271. cJSON* json = cJSON_Parse (message->payload);
  272. const cJSON* objectItem = NULL;
  273. InterProcessData data = { 0 };
  274. uint32_t arraySize = 0;
  275. if (topicForBoard != main_board) {
  276. for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) {
  277. spCommand = spUnknown;
  278. objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
  279. if (objectItem != NULL) {
  280. switch (topicCmdNumber) {
  281. case fanSpeedTopic: spCommand = spSetFanSpeed;
  282. case motorXonTopic:
  283. if (spCommand == spUnknown) {
  284. spCommand = spSetMotorXOn;
  285. }
  286. case motorYonTopic:
  287. if (spCommand == spUnknown) {
  288. spCommand = spSetMotorYOn;
  289. }
  290. if (cJSON_IsArray (objectItem)) {
  291. data.spCommand = spCommand;
  292. arraySize = cJSON_GetArraySize (objectItem);
  293. if (arraySize == 2) {
  294. for (int i = 0; i < arraySize; i++) {
  295. cJSON* item = cJSON_GetArrayItem (objectItem, i);
  296. if (cJSON_IsNumber (item)) {
  297. data.values.integerValues.value[i] = item->valueint;
  298. }
  299. }
  300. }
  301. }
  302. break;
  303. case diodeTopic:
  304. if (cJSON_IsNumber (objectItem)) {
  305. data.spCommand = spSetDiodeOn;
  306. data.values.integerValues.value[0] = objectItem->valueint;
  307. }
  308. break;
  309. case motorXMaxCurrentTopic: spCommand = spSetmotorXMaxCurrent;
  310. case motorYMaxCurrentTopic:
  311. if (cJSON_IsNumber (objectItem)) {
  312. if (spCommand == spUnknown) {
  313. spCommand = spSetmotorYMaxCurrent;
  314. }
  315. data.spCommand = spCommand;
  316. data.values.flaotValues.value[0] = objectItem->valuedouble;
  317. }
  318. break;
  319. case clearPeakElectricalMeasurementsTopic:
  320. if (cJSON_IsNumber (objectItem)) {
  321. if (objectItem->valueint == 1) {
  322. data.spCommand = spClearPeakMeasurments;
  323. }
  324. }
  325. break;
  326. case mainBoardRelayTopic:
  327. if (cJSON_IsArray (objectItem)) {
  328. arraySize = cJSON_GetArraySize (objectItem);
  329. if (arraySize == 2) {
  330. int32_t relayNumber = -1;
  331. cJSON* item = cJSON_GetArrayItem (objectItem, 0);
  332. if (cJSON_IsNumber (item)) {
  333. relayNumber = item->valueint;
  334. }
  335. int32_t relayTimeOn = 0;
  336. item = cJSON_GetArrayItem (objectItem, 1);
  337. if (cJSON_IsNumber (item)) {
  338. relayTimeOn = item->valueint;
  339. }
  340. RelayCtrl (relayNumber, relayTimeOn);
  341. }
  342. }
  343. break;
  344. case setEncoderXValue:
  345. if (cJSON_IsNumber (objectItem)) {
  346. data.spCommand = spSetEncoderXValue;
  347. data.values.flaotValues.value[0] = objectItem->valuedouble;
  348. }
  349. break;
  350. case setEncoderYValue:
  351. if (cJSON_IsNumber (objectItem)) {
  352. data.spCommand = spSetEncoderYValue;
  353. data.values.flaotValues.value[0] = objectItem->valuedouble;
  354. }
  355. break;
  356. case setVoltageMeasGains: spCommand = spSetVoltageMeasGains;
  357. case setVoltageMeasOffsets:
  358. if (spCommand == spUnknown) {
  359. spCommand = spSetVoltageMeasOffsets;
  360. }
  361. case setCurrentMeasGains:
  362. if (spCommand == spUnknown) {
  363. spCommand = spSetCurrentMeasGains;
  364. }
  365. case setCurrentMeasOffsets:
  366. if (spCommand == spUnknown) {
  367. spCommand = spSetCurrentMeasOffsets;
  368. }
  369. if (cJSON_IsArray (objectItem)) {
  370. data.spCommand = spCommand;
  371. arraySize = cJSON_GetArraySize (objectItem);
  372. if (arraySize == 3) {
  373. for (int i = 0; i < arraySize; i++) {
  374. cJSON* item = cJSON_GetArrayItem (objectItem, i);
  375. if (cJSON_IsNumber (item)) {
  376. data.values.flaotValues.value[i] = item->valuedouble;
  377. }
  378. }
  379. }
  380. }
  381. break;
  382. case resetSystem:
  383. if (cJSON_IsNumber (objectItem)) {
  384. if (objectItem->valueint == 1) {
  385. data.spCommand = spResetSystem;
  386. break;
  387. }
  388. }
  389. default: break;
  390. }
  391. }
  392. }
  393. } else {
  394. for (int topicCmdNumber = 0; topicCmdNumber < MAX_COMMANDS_IN_MQTT_PAYLOAD; topicCmdNumber++) {
  395. objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
  396. if (objectItem != NULL) {
  397. if (topicCmdNumber == resetSystem) {
  398. __disable_irq ();
  399. NVIC_SystemReset ();
  400. break;
  401. }
  402. }
  403. }
  404. }
  405. switch (topicForBoard) {
  406. case main_board: break;
  407. case board_1:
  408. if (uart1TaskData.sendCmdToSlaveQueue != NULL) {
  409. osMessageQueuePut (uart1TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  410. }
  411. printf ("Send cmd to board 1\n");
  412. break;
  413. case board_2:
  414. if (uart3TaskData.sendCmdToSlaveQueue != NULL) {
  415. osMessageQueuePut (uart3TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  416. }
  417. printf ("Send cmd to board 2\n");
  418. break;
  419. case board_3:
  420. if (uart6TaskData.sendCmdToSlaveQueue != NULL) {
  421. osMessageQueuePut (uart6TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  422. }
  423. printf ("Send cmd to board 3\n");
  424. break;
  425. case board_4:
  426. if (uart2TaskData.sendCmdToSlaveQueue != NULL) {
  427. osMessageQueuePut (uart2TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  428. }
  429. printf ("Send cmd to board 4\n");
  430. break;
  431. default: break;
  432. }
  433. cJSON_Delete (json);
  434. printf ("MQTT Topic:%s, MSG[%d]:%s\n", topicName, (int)message->payloadlen, (char*)message->payload);
  435. }
  436. void mqtt_cli_init (void) {
  437. mqttClientSubTaskHandle = osThreadNew (MqttClientSubTask, NULL, &mqttClientSubTaskAttr); // subscribe task
  438. mqttClientPubTaskHandle = osThreadNew (MqttClientPubTask, NULL, &mqttClientPubTaskAttr); // publish task
  439. }