mqtt_client.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. /*
  2. * mqtt_client.c
  3. *
  4. * Created on: Jun 10, 2024
  5. * Author: jakubski
  6. */
  7. #include <stdio.h>
  8. #if 1
  9. #include "FreeRTOS.h"
  10. #include "task.h"
  11. #include "main.h"
  12. #include "cmsis_os.h"
  13. #include <string.h>
  14. #include "lwip.h"
  15. #include "lwip/api.h"
  16. #include "MQTTClient.h"
  17. #include "MQTTInterface.h"
  18. #include "node-red-config.h"
  19. #include "measurements.h"
  20. #include "cJSON.h"
  21. #include "interprocess_data.h"
  22. #include "uart_tasks.h"
  // Size in bytes of each of the two MQTT buffers (sndBuffer and rcvBuffer).
  23. #define MQTT_BUFSIZE 1024
  // Commented-out local definitions of the objects that are declared extern just below.
  24. //RESMeasurements resMeasurements[SLAVES_COUNT] = { 0 };
  25. //SesnorsInfo sensorsInfo[SLAVES_COUNT] = { 0 };
  26. //uint32_t slaveLastSeen[SLAVES_COUNT] = { 0 };
  27. //osMutexId_t resMeasurementsMutex;
  28. //osMutexId_t sensorsInfoMutex;
  // Per-board state published by MqttClientPubTask, indexed by board number - 1.
  // The two mutex handles are assigned in MqttClientPubTask and are held there while
  // resMeasurements / sensorsInfo are formatted into the outgoing JSON payload.
  29. extern RESMeasurements resMeasurements[SLAVES_COUNT];
  30. extern SesnorsInfo sensorsInfo[SLAVES_COUNT];
  31. extern uint32_t slaveLastSeen[SLAVES_COUNT];
  32. extern osMutexId_t resMeasurementsMutex;
  33. extern osMutexId_t sensorsInfoMutex;
  // Topics subscribed to in MqttConnectBroker; entry i carries commands for board i + 1.
  34. char* const subscribeTopicNames[SLAVES_COUNT] = { "Set/1", "Set/2", "Set/3", "Set/4" };
  // Number of entries in topicCommands.
  35. #define MAX_COMMANDS_IN_MQTT_PAYLOAD 6
  // JSON keys looked up in an incoming payload; MqttMessageArrived selects the
  // serial-protocol command from the index of the key that is present.
  36. char* const topicCommands[MAX_COMMANDS_IN_MQTT_PAYLOAD] = {"fanSpeed", "motorXon", "motorYon", "diode", "motorXMaxCurrent", "motorYMaxCurrent"};
  // Board a command is addressed to, derived from the index of the matching
  // entry in subscribeTopicNames plus one; unknownBoard means no topic matched.
  typedef enum _BoardNoOverTopic {
      board_1 = 1,
      board_2,
      board_3,
      board_4,
      unknownBoard
  } BoardNoOverTopic;
  46. extern struct netif gnetif; // network interface polled by MqttClientSubTask for a non-zero IP configuration
  // Per-UART task data; MqttMessageArrived posts decoded commands to the
  // sendCmdToSlaveQueue member of the entry that matches the target board.
  47. extern UartTaskData uart1TaskData; // Board 1 (board_1 commands are currently routed to uart8TaskData instead, see the #if 0 in MqttMessageArrived)
  48. extern UartTaskData uart3TaskData; // Board 2
  49. extern UartTaskData uart6TaskData; // Board 3
  50. extern UartTaskData uart2TaskData; // Board 4
  51. extern UartTaskData uart8TaskData; // Debug
  // Attributes of the subscribe task started by mqtt_cli_init.
  52. const osThreadAttr_t mqttClientSubTaskAttr =
  53. { .name = "mqttClientSubTask", .stack_size = configMINIMAL_STACK_SIZE * 4,
  54. .priority = (osPriority_t) osPriorityNormal, };
  // Attributes of the publish task started by mqtt_cli_init.
  // NOTE(review): the name reads "Psub" - looks like a typo for "Pub"; confirm before renaming,
  // the string is visible to RTOS-aware debuggers.
  // NOTE(review): MqttClientPubTask keeps a 512-byte and a 32-byte buffer as locals - confirm
  // that configMINIMAL_STACK_SIZE * 4 leaves enough stack for them.
  55. const osThreadAttr_t mqttClientPubTaskAttr =
  56. { .name = "mqttClientPsubTask", .stack_size = configMINIMAL_STACK_SIZE * 4,
  57. .priority = (osPriority_t) osPriorityNormal, };
  58. osThreadId mqttClientSubTaskHandle; // handle of the subscribe task (MqttClientSubTask)
  59. osThreadId mqttClientPubTaskHandle; // handle of the publish task (MqttClientPubTask)
  60. Network net; // network object handed to ConnectNetwork and MQTTClientInit
  61. MQTTClient mqttClient; // single MQTT client instance used by both tasks
  62. uint8_t sndBuffer[MQTT_BUFSIZE]; // send buffer passed to MQTTClientInit
  63. uint8_t rcvBuffer[MQTT_BUFSIZE]; // receive buffer passed to MQTTClientInit
  64. void MqttClientSubTask(void *argument); // subscribe task: waits for IP, connects, then calls MQTTYield
  65. void MqttClientPubTask(void *argument); // publish task: publishes measurements and sensor data as JSON
  66. int MqttConnectBroker(void); // connects to the broker and subscribes; returns MQTT_SUCCESS on success
  67. void MqttMessageArrived(MessageData *msg); // handler registered with MQTTSubscribe for every "Set/<n>" topic
  68. void MqttClientSubTask (void* argument) {
  69. while (1) {
  70. // waiting for valid ip address
  71. if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) // system has no valid ip address
  72. {
  73. osDelay (pdMS_TO_TICKS (1000));
  74. continue;
  75. } else {
  76. printf ("DHCP/Static IP O.K.\n");
  77. break;
  78. }
  79. }
  80. while (1) {
  81. if (!mqttClient.isconnected) {
  82. // try to connect to the broker
  83. if (MqttConnectBroker () != MQTT_SUCCESS) {
  84. osDelay (pdMS_TO_TICKS (1000));
  85. }
  86. } else {
  87. MQTTYield (&mqttClient, 500); // handle timer
  88. osDelay (pdMS_TO_TICKS (100));
  89. }
  90. }
  91. }
  92. void MqttClientPubTask (void* argument) {
  93. char messageBuffer[512] = { 0x00 };
  94. char topicTextBuffer[32] = { 0x00 };
  95. uint32_t bytesInBuffer = 0;
  96. uint8_t boardNumber = 0;
  97. MQTTMessage message;
  98. resMeasurementsMutex = osMutexNew (NULL);
  99. sensorsInfoMutex = osMutexNew (NULL);
  100. while (1) {
  101. if (mqttClient.isconnected) {
  102. if (is_link_up ()) {
  103. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  104. osMutexAcquire (resMeasurementsMutex, osWaitForever);
  105. RESMeasurements* resMeas = &resMeasurements[boardNumber];
  106. sprintf (topicTextBuffer, "RESmeasurments/%d", boardNumber + 1);
  107. bytesInBuffer = sprintf (messageBuffer, "{\"voltageRMS\":[%.2f, %.2f, %.2f], ", resMeas->voltageRMS[0], resMeas->voltageRMS[1], resMeas->voltageRMS[2]);
  108. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"voltagePeak\":[%.2f, %.2f, %.2f], ", resMeas->voltagePeak[0], resMeas->voltagePeak[1], resMeas->voltagePeak[2]);
  109. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentRMS\":[%.3f, %.3f, %.3f], ", resMeas->currentRMS[0], resMeas->currentRMS[1], resMeas->currentRMS[2]);
  110. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"currentPeak\":[%.3f, %.3f, %.3f], ", resMeas->currentPeak[0], resMeas->currentPeak[1], resMeas->currentPeak[2]);
  111. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"power\":[%.2f, %.2f, %.2f], ", resMeas->power[0], resMeas->power[1], resMeas->power[2]);
  112. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"lastSeen\": %ld}", slaveLastSeen[boardNumber]);
  113. osMutexRelease (resMeasurementsMutex);
  114. message.payload = (void*)messageBuffer;
  115. message.payloadlen = strlen (messageBuffer);
  116. MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
  117. }
  118. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  119. osMutexAcquire (sensorsInfoMutex, osWaitForever);
  120. SesnorsInfo* sensors = &sensorsInfo[boardNumber];
  121. sprintf (topicTextBuffer, "Sensors/%d", boardNumber + 1);
  122. bytesInBuffer = sprintf (messageBuffer, "{\"pvTemperature\":[%.1f, %.1f], ", sensors->pvTemperature[0], sensors->pvTemperature[1]);
  123. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"fanVoltage\":%.2f, ", sensors->fanVoltage);
  124. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"pvEncoder\":%.2f, ", sensors->pvEncoder);
  125. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXStatus\":%d, ", sensors->motorXStatus);
  126. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYStatus\":%d, ", sensors->motorYStatus);
  127. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXAveCurrent\":%.3f, ", sensors->motorXAveCurrent);
  128. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYAveCurrent\":%.3f, ", sensors->motorYAveCurrent);
  129. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorXPeakCurrent\":%.3f, ", sensors->motorXPeakCurrent);
  130. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"motorYPeakCurrent\":%.3f, ", sensors->motorYPeakCurrent);
  131. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitSwitchUp\":%d, ", sensors->limitSwitchUp);
  132. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitSwitchDown\":%d, ", sensors->limitSwitchDown);
  133. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"limitSwitchCenter\":%d, ", sensors->limitSwitchCenter);
  134. bytesInBuffer += sprintf (&messageBuffer[bytesInBuffer], "\"powerSupplyFailMask\":%d}", sensors->powerSupplyFailMask);
  135. osMutexRelease (sensorsInfoMutex);
  136. message.payload = (void*)messageBuffer;
  137. message.payloadlen = strlen (messageBuffer);
  138. MQTTPublish (&mqttClient, topicTextBuffer, &message); // publish a message
  139. }
  140. }
  141. }
  142. osDelay (pdMS_TO_TICKS (1000));
  143. }
  144. }
  145. int MqttConnectBroker () {
  146. uint8_t boardNumber = 0;
  147. int ret;
  148. NewNetwork (&net);
  149. ret = ConnectNetwork (&net, BROKER_IP, MQTT_PORT);
  150. if (ret != MQTT_SUCCESS) {
  151. printf ("ConnectNetwork failed.\n");
  152. return -1;
  153. }
  154. MQTTClientInit (&mqttClient, &net, 1000, sndBuffer, sizeof (sndBuffer), rcvBuffer, sizeof (rcvBuffer));
  155. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  156. data.willFlag = 0;
  157. data.MQTTVersion = 3;
  158. data.clientID.cstring = "test_user1";
  159. data.username.cstring = "test_user1";
  160. data.password.cstring = "1234";
  161. data.keepAliveInterval = 100;
  162. data.cleansession = 1;
  163. ret = MQTTConnect (&mqttClient, &data);
  164. if (ret != MQTT_SUCCESS) {
  165. net_disconnect (&net);
  166. printf ("MQTTConnect failed. Code %d\n", ret);
  167. return ret;
  168. }
  169. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  170. ret = MQTTSubscribe (&mqttClient, subscribeTopicNames[boardNumber], QOS0, MqttMessageArrived);
  171. if (ret != MQTT_SUCCESS) {
  172. net_disconnect (&net);
  173. printf ("MQTTSubscribe failed.\n");
  174. return ret;
  175. }
  176. }
  177. printf ("MQTT_ConnectBroker O.K.\n");
  178. return MQTT_SUCCESS;
  179. }
  180. void MqttMessageArrived (MessageData* msg) {
  181. SerialProtocolCommands spCommand = spUnknown;
  182. BoardNoOverTopic topicForBoard = unknownBoard;
  183. uint8_t boardNumber = 0;
  184. MQTTMessage* message = msg->message;
  185. char topicName[32] = { 0 };
  186. memcpy (topicName, msg->topicName->lenstring.data, msg->topicName->lenstring.len);
  187. for (boardNumber = 0; boardNumber < SLAVES_COUNT; boardNumber++) {
  188. if (strcmp (topicName, subscribeTopicNames[boardNumber]) == 0) {
  189. topicForBoard = (BoardNoOverTopic)(boardNumber + 1);
  190. }
  191. }
  192. cJSON* json = cJSON_Parse (message->payload);
  193. const cJSON* objectItem = NULL;
  194. InterProcessData data = { 0 };
  195. for (int topicCmdNumber = 0; topicCmdNumber < 6; topicCmdNumber++) {
  196. spCommand = spUnknown;
  197. objectItem = cJSON_GetObjectItemCaseSensitive (json, topicCommands[topicCmdNumber]);
  198. if (objectItem != NULL) {
  199. switch (topicCmdNumber) {
  200. case 0: spCommand = spSetFanSpeed;
  201. case 1:
  202. if (spCommand == spUnknown) {
  203. spCommand = spSetMotorXOn;
  204. }
  205. case 2:
  206. if (spCommand == spUnknown) {
  207. spCommand = spSetMotorYOn;
  208. }
  209. if (cJSON_IsArray (objectItem)) {
  210. data.spCommand = spCommand;
  211. int arraySize = cJSON_GetArraySize (objectItem);
  212. if (arraySize == 2) {
  213. for (int i = 0; i < arraySize; i++) {
  214. cJSON* item = cJSON_GetArrayItem (objectItem, i);
  215. if (cJSON_IsNumber (item)) {
  216. data.values.integerValues.value[i] = item->valueint;
  217. }
  218. }
  219. }
  220. }
  221. break;
  222. case 3:
  223. data.spCommand = spSetDiodeOn;
  224. data.values.integerValues.value[0] = objectItem->valueint;
  225. data.values.integerValues.value[1] = 0;
  226. break;
  227. case 4: spCommand = spSetmotorXMaxCurrent;
  228. case 5:
  229. if (spCommand == spUnknown) {
  230. spCommand = spSetmotorYMaxCurrent;
  231. }
  232. data.spCommand = spCommand;
  233. data.values.flaotValues.value[0] = objectItem->valuedouble;
  234. data.values.flaotValues.value[1] = 0.0;
  235. break;
  236. default: break;
  237. }
  238. switch (topicForBoard) {
  239. case board_1:
  240. #if 0
  241. if (uart1TaskData.sendCmdToSlaveQueue != NULL) {
  242. osMessageQueuePut (
  243. uart1TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  244. }
  245. #else
  246. if (uart8TaskData.sendCmdToSlaveQueue != NULL) {
  247. osMessageQueuePut (uart8TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  248. }
  249. #endif
  250. printf ("Send cmd to board 1\n");
  251. break;
  252. case board_2:
  253. if (uart3TaskData.sendCmdToSlaveQueue != NULL) {
  254. osMessageQueuePut (uart3TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  255. }
  256. printf ("Send cmd to board 2\n");
  257. break;
  258. case board_3:
  259. if (uart6TaskData.sendCmdToSlaveQueue != NULL) {
  260. osMessageQueuePut (uart6TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  261. }
  262. printf ("Send cmd to board 3\n");
  263. break;
  264. case board_4:
  265. if (uart2TaskData.sendCmdToSlaveQueue != NULL) {
  266. osMessageQueuePut (uart2TaskData.sendCmdToSlaveQueue, &data, 0, (TickType_t)100);
  267. }
  268. printf ("Send cmd to board 4\n");
  269. break;
  270. default: break;
  271. }
  272. }
  273. }
  274. cJSON_Delete (json);
  275. // printf ("MQTT MSG[%d]:%s\n", (int)message->payloadlen, (char*)message->payload);
  276. printf ("MQTT Topic:%s, MSG[%d]:%s\n", topicName, (int)message->payloadlen, (char*)message->payload);
  277. }
  278. void mqtt_cli_init (void) {
  279. mqttClientSubTaskHandle = osThreadNew (MqttClientSubTask, NULL, &mqttClientSubTaskAttr); // subscribe task
  280. mqttClientPubTaskHandle = osThreadNew (MqttClientPubTask, NULL, &mqttClientPubTaskAttr); // publish task
  281. }
  282. #else
  283. #include "lwip/opt.h"
  284. #include "lwip/arch.h"
  285. #include "lwip/api.h"
  286. #include "lwip/apps/mqtt.h"
  287. #include "cmsis_os.h"
  288. #define MQTT_CLI_THREAD_PRIO ( tskIDLE_PRIORITY + 4 )
  289. #ifndef LWIP_MQTT_EXAMPLE_IPADDR_INIT
  290. #if LWIP_IPV4
  291. #define LWIP_MQTT_EXAMPLE_IPADDR_INIT = IPADDR4_INIT(PP_HTONL(IPADDR_LOOPBACK))
  292. #else
  293. #define LWIP_MQTT_EXAMPLE_IPADDR_INIT
  294. #endif
  295. #endif
  296. static ip_addr_t mqtt_ip LWIP_MQTT_EXAMPLE_IPADDR_INIT;
  297. static const struct mqtt_connect_client_info_t mqtt_client_info =
  298. {
  299. "test_user1",
  300. "test_user1", /* user */
  301. "1234", /* pass */
  302. 100, /* keep alive */
  303. NULL, /* will_topic */
  304. NULL, /* will_msg */
  305. 0, /* will_qos */
  306. 0 /* will_retain */
  307. #if LWIP_ALTCP && LWIP_ALTCP_TLS
  308. , NULL
  309. #endif
  310. };
  311. static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
  312. {
  313. const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
  314. // LWIP_UNUSED_ARG(data);
  315. LWIP_PLATFORM_DIAG(("MQTT client \"%s\" data cb: len %d, flags %d\n", client_info->client_id, (int)len, (int)flags));
  316. LWIP_PLATFORM_DIAG(("Data:\n%s\n", data));
  317. }
  318. static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
  319. {
  320. const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
  321. LWIP_PLATFORM_DIAG(("MQTT client \"%s\" publish cb: topic %s, len %d\n", client_info->client_id, topic, (int)tot_len));
  322. }
  323. static void mqtt_request_cb(void *arg, err_t err)
  324. {
  325. const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
  326. LWIP_PLATFORM_DIAG(("MQTT client \"%s\" request cb: err %d\n", client_info->client_id, (int)err));
  327. }VES_COUNT; boardNumber++)
  328. {
  329. static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
  330. {
  331. const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
  332. // LWIP_UNUSED_ARG(client);
  333. LWIP_PLATFORM_DIAG(("MQTT client \"%s\" connection cb: status %d\n", client_info->client_id, (int)status));
  334. if (status == MQTT_CONNECT_ACCEPTED)
  335. {
  336. mqtt_sub_unsub(client, "topic_qos1", 1, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1);
  337. mqtt_sub_unsub(client, "topic_qos0", 0, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1);
  338. }
  339. }
  340. void mqtt_cli_thread(void *arg)
  341. {
  342. mqtt_client_t* mqtt_client;
  343. osDelay(pdMS_TO_TICKS(7000));
  344. ipaddr_aton("192.168.1.34", &mqtt_ip);
  345. LOCK_TCPIP_CORE();
  346. mqtt_client = mqtt_client_new();
  347. mqtt_client_connect(mqtt_client, &mqtt_ip, MQTT_PORT, mqtt_connection_cb, LWIP_CONST_CAST(void*, &mqtt_client_info), &mqtt_client_info);
  348. mqtt_set_inpub_callback(mqtt_client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, LWIP_CONST_CAST(void*, &mqtt_client_info));
  349. UNLOCK_TCPIP_CORE();
  350. while(1)
  351. {
  352. osDelay(pdMS_TO_TICKS(100));
  353. }
  354. }
  355. void mqtt_cli_init(void)
  356. {
  357. sys_thread_new("mqtt_cli_netconn", mqtt_cli_thread, NULL, DEFAULT_THREAD_STACKSIZE, MQTT_CLI_THREAD_PRIO);
  358. }
  359. #endif