mqtt_client.c 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. * mqtt_client.c
  3. *
  4. * Created on: Jun 10, 2024
  5. * Author: jakubski
  6. */
  7. #include <stdio.h>
  8. #if 1
  9. #include "FreeRTOS.h"
  10. #include "task.h"
  11. #include "main.h"
  12. #include "cmsis_os.h"
  13. #include <string.h>
  14. #include "lwip.h"
  15. #include "lwip/api.h"
  16. #include "MQTTClient.h"
  17. #include "MQTTInterface.h"
  18. #include "node-red-config.h"
  19. #define MQTT_BUFSIZE 1024
  20. extern struct netif gnetif; //extern gnetif
  21. const osThreadAttr_t mqttClientSubTaskAttr = {
  22. .name = "mqttClientSubTask",
  23. .stack_size = configMINIMAL_STACK_SIZE * 2,
  24. .priority = (osPriority_t) osPriorityNormal,
  25. };
  26. const osThreadAttr_t mqttClientPubTaskAttr = {
  27. .name = "mqttClientPsubTask",
  28. .stack_size = configMINIMAL_STACK_SIZE * 2,
  29. .priority = (osPriority_t) osPriorityNormal,
  30. };
  31. osThreadId mqttClientSubTaskHandle; //mqtt client task handle
  32. osThreadId mqttClientPubTaskHandle; //mqtt client task handle
  33. Network net; //mqtt network
  34. MQTTClient mqttClient; //mqtt client
  35. uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer
  36. uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer
  37. uint8_t msgBuffer[MQTT_BUFSIZE]; //mqtt message buffer
  38. void MqttClientSubTask(void *argument); //mqtt client subscribe task function
  39. void MqttClientPubTask(void *argument); //mqtt client publish task function
  40. int MqttConnectBroker(void); //mqtt broker connect function
  41. void MqttMessageArrived(MessageData* msg); //mqtt message callback function
  42. void MqttClientSubTask(void *argument)
  43. {
  44. while(1)
  45. {
  46. //waiting for valid ip address
  47. if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) //system has no valid ip address
  48. {
  49. osDelay(pdMS_TO_TICKS(1000));
  50. continue;
  51. }
  52. else
  53. {
  54. printf("DHCP/Static IP O.K.\n");
  55. break;
  56. }
  57. }
  58. while(1)
  59. {
  60. if(!mqttClient.isconnected)
  61. {
  62. //try to connect to the broker
  63. MQTTDisconnect(&mqttClient);
  64. MqttConnectBroker();
  65. osDelay(pdMS_TO_TICKS(1000));
  66. }
  67. else
  68. {
  69. MQTTYield(&mqttClient, 500); //handle timer
  70. osDelay(pdMS_TO_TICKS(100));
  71. }
  72. }
  73. }
  74. void MqttClientPubTask(void *argument)
  75. {
  76. const char* str = "MQTT message from STM32";
  77. MQTTMessage message;
  78. while(1)
  79. {
  80. if(mqttClient.isconnected)
  81. {
  82. message.payload = (void*)str;
  83. message.payloadlen = strlen(str);
  84. if(is_link_up())
  85. {
  86. MQTTPublish(&mqttClient, "test", &message); //publish a message
  87. }
  88. }
  89. osDelay(pdMS_TO_TICKS(1000));
  90. }
  91. }
  92. int MqttConnectBroker()
  93. {
  94. int ret;
  95. NewNetwork(&net);
  96. ret = ConnectNetwork(&net, BROKER_IP, MQTT_PORT);
  97. if(ret != MQTT_SUCCESS)
  98. {
  99. printf("ConnectNetwork failed.\n");
  100. return -1;
  101. }
  102. MQTTClientInit(&mqttClient, &net, 1000, sndBuffer, sizeof(sndBuffer), rcvBuffer, sizeof(rcvBuffer));
  103. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  104. data.willFlag = 0;
  105. data.MQTTVersion = 3;
  106. data.clientID.cstring = "test_user1";
  107. data.username.cstring = "test_user1";
  108. data.password.cstring = "1234";
  109. data.keepAliveInterval = 100;
  110. data.cleansession = 1;
  111. ret = MQTTConnect(&mqttClient, &data);
  112. if(ret != MQTT_SUCCESS)
  113. {
  114. net_disconnect(&net);
  115. printf("MQTTConnect failed.\n");
  116. return ret;
  117. }
  118. // osDelay(pdMS_TO_TICKS(100));
  119. ret = MQTTSubscribe(&mqttClient, "test_second", QOS0, MqttMessageArrived);
  120. if(ret != MQTT_SUCCESS)
  121. {
  122. net_disconnect(&net);
  123. printf("MQTTSubscribe failed.\n");
  124. return ret;
  125. }
  126. printf("MQTT_ConnectBroker O.K.\n");
  127. return MQTT_SUCCESS;
  128. }
  129. void MqttMessageArrived(MessageData* msg)
  130. {
  131. MQTTMessage* message = msg->message;
  132. memset(msgBuffer, 0, sizeof(msgBuffer));
  133. memcpy(msgBuffer, message->payload,message->payloadlen);
  134. printf("MQTT MSG[%d]:%s\n", (int)message->payloadlen, msgBuffer);
  135. }
  136. void mqtt_cli_init(void)
  137. {
  138. mqttClientSubTaskHandle = osThreadNew(MqttClientSubTask, NULL, &mqttClientSubTaskAttr); //subscribe task
  139. mqttClientPubTaskHandle = osThreadNew(MqttClientPubTask, NULL, &mqttClientPubTaskAttr); //publish task
  140. }
  141. #else
  142. #include "lwip/opt.h"
  143. #include "lwip/arch.h"
  144. #include "lwip/api.h"
  145. #include "lwip/apps/mqtt.h"
  146. #include "cmsis_os.h"
  147. #define MQTT_CLI_THREAD_PRIO ( tskIDLE_PRIORITY + 4 )
  148. #ifndef LWIP_MQTT_EXAMPLE_IPADDR_INIT
  149. #if LWIP_IPV4
  150. #define LWIP_MQTT_EXAMPLE_IPADDR_INIT = IPADDR4_INIT(PP_HTONL(IPADDR_LOOPBACK))
  151. #else
  152. #define LWIP_MQTT_EXAMPLE_IPADDR_INIT
  153. #endif
  154. #endif
  155. static ip_addr_t mqtt_ip LWIP_MQTT_EXAMPLE_IPADDR_INIT;
  156. static const struct mqtt_connect_client_info_t mqtt_client_info =
  157. {
  158. "test_user1",
  159. "test_user1", /* user */
  160. "1234", /* pass */
  161. 100, /* keep alive */
  162. NULL, /* will_topic */
  163. NULL, /* will_msg */
  164. 0, /* will_qos */
  165. 0 /* will_retain */
  166. #if LWIP_ALTCP && LWIP_ALTCP_TLS
  167. , NULL
  168. #endif
  169. };
  170. static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
  171. {
  172. const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
  173. // LWIP_UNUSED_ARG(data);
  174. LWIP_PLATFORM_DIAG(("MQTT client \"%s\" data cb: len %d, flags %d\n", client_info->client_id, (int)len, (int)flags));
  175. LWIP_PLATFORM_DIAG(("Data:\n%s\n", data));
  176. }
  177. static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
  178. {
  179. const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
  180. LWIP_PLATFORM_DIAG(("MQTT client \"%s\" publish cb: topic %s, len %d\n", client_info->client_id, topic, (int)tot_len));
  181. }
  182. static void mqtt_request_cb(void *arg, err_t err)
  183. {
  184. const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
  185. LWIP_PLATFORM_DIAG(("MQTT client \"%s\" request cb: err %d\n", client_info->client_id, (int)err));
  186. }
  187. static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
  188. {
  189. const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
  190. // LWIP_UNUSED_ARG(client);
  191. LWIP_PLATFORM_DIAG(("MQTT client \"%s\" connection cb: status %d\n", client_info->client_id, (int)status));
  192. if (status == MQTT_CONNECT_ACCEPTED)
  193. {
  194. mqtt_sub_unsub(client, "topic_qos1", 1, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1);
  195. mqtt_sub_unsub(client, "topic_qos0", 0, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1);
  196. }
  197. }
  198. void mqtt_cli_thread(void *arg)
  199. {
  200. mqtt_client_t* mqtt_client;
  201. osDelay(pdMS_TO_TICKS(7000));
  202. ipaddr_aton("192.168.1.34", &mqtt_ip);
  203. LOCK_TCPIP_CORE();
  204. mqtt_client = mqtt_client_new();
  205. mqtt_client_connect(mqtt_client, &mqtt_ip, MQTT_PORT, mqtt_connection_cb, LWIP_CONST_CAST(void*, &mqtt_client_info), &mqtt_client_info);
  206. mqtt_set_inpub_callback(mqtt_client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, LWIP_CONST_CAST(void*, &mqtt_client_info));
  207. UNLOCK_TCPIP_CORE();
  208. while(1)
  209. {
  210. osDelay(pdMS_TO_TICKS(100));
  211. }
  212. }
  213. void mqtt_cli_init(void)
  214. {
  215. sys_thread_new("mqtt_cli_netconn", mqtt_cli_thread, NULL, DEFAULT_THREAD_STACKSIZE, MQTT_CLI_THREAD_PRIO);
  216. }
  217. #endif