123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- /*
- * mqtt_client.c
- *
- * Created on: Jun 10, 2024
- * Author: jakubski
- */
- #include <stdio.h>
- #if 1
- #include "FreeRTOS.h"
- #include "task.h"
- #include "main.h"
- #include "cmsis_os.h"
- #include <string.h>
- #include "lwip.h"
- #include "lwip/api.h"
- #include "MQTTClient.h"
- #include "MQTTInterface.h"
- #include "node-red-config.h"
/* Size, in bytes, of each statically allocated MQTT buffer in this file. */
#define MQTT_BUFSIZE 1024

/* Network interface object; defined in another translation unit. */
extern struct netif gnetif;
- const osThreadAttr_t mqttClientSubTaskAttr = {
- .name = "mqttClientSubTask",
- .stack_size = configMINIMAL_STACK_SIZE * 2,
- .priority = (osPriority_t) osPriorityNormal,
- };
- const osThreadAttr_t mqttClientPubTaskAttr = {
- .name = "mqttClientPsubTask",
- .stack_size = configMINIMAL_STACK_SIZE * 2,
- .priority = (osPriority_t) osPriorityNormal,
- };
- osThreadId mqttClientSubTaskHandle; //mqtt client task handle
- osThreadId mqttClientPubTaskHandle; //mqtt client task handle
- Network net; //mqtt network
- MQTTClient mqttClient; //mqtt client
- uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer
- uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer
- uint8_t msgBuffer[MQTT_BUFSIZE]; //mqtt message buffer
- void MqttClientSubTask(void *argument); //mqtt client subscribe task function
- void MqttClientPubTask(void *argument); //mqtt client publish task function
- int MqttConnectBroker(void); //mqtt broker connect function
- void MqttMessageArrived(MessageData* msg); //mqtt message callback function
- void MqttClientSubTask(void *argument)
- {
- while(1)
- {
- //waiting for valid ip address
- if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) //system has no valid ip address
- {
- osDelay(pdMS_TO_TICKS(1000));
- continue;
- }
- else
- {
- printf("DHCP/Static IP O.K.\n");
- break;
- }
- }
- while(1)
- {
- if(!mqttClient.isconnected)
- {
- //try to connect to the broker
- MQTTDisconnect(&mqttClient);
- MqttConnectBroker();
- osDelay(pdMS_TO_TICKS(1000));
- }
- else
- {
- MQTTYield(&mqttClient, 500); //handle timer
- osDelay(pdMS_TO_TICKS(100));
- }
- }
- }
- void MqttClientPubTask(void *argument)
- {
- const char* str = "MQTT message from STM32";
- MQTTMessage message;
- while(1)
- {
- if(mqttClient.isconnected)
- {
- message.payload = (void*)str;
- message.payloadlen = strlen(str);
- if(is_link_up())
- {
- MQTTPublish(&mqttClient, "test", &message); //publish a message
- }
- }
- osDelay(pdMS_TO_TICKS(1000));
- }
- }
- int MqttConnectBroker()
- {
- int ret;
- NewNetwork(&net);
- ret = ConnectNetwork(&net, BROKER_IP, MQTT_PORT);
- if(ret != MQTT_SUCCESS)
- {
- printf("ConnectNetwork failed.\n");
- return -1;
- }
- MQTTClientInit(&mqttClient, &net, 1000, sndBuffer, sizeof(sndBuffer), rcvBuffer, sizeof(rcvBuffer));
- MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
- data.willFlag = 0;
- data.MQTTVersion = 3;
- data.clientID.cstring = "test_user1";
- data.username.cstring = "test_user1";
- data.password.cstring = "1234";
- data.keepAliveInterval = 100;
- data.cleansession = 1;
- ret = MQTTConnect(&mqttClient, &data);
- if(ret != MQTT_SUCCESS)
- {
- net_disconnect(&net);
- printf("MQTTConnect failed.\n");
- return ret;
- }
- // osDelay(pdMS_TO_TICKS(100));
- ret = MQTTSubscribe(&mqttClient, "test_second", QOS0, MqttMessageArrived);
- if(ret != MQTT_SUCCESS)
- {
- net_disconnect(&net);
- printf("MQTTSubscribe failed.\n");
- return ret;
- }
- printf("MQTT_ConnectBroker O.K.\n");
- return MQTT_SUCCESS;
- }
- void MqttMessageArrived(MessageData* msg)
- {
- MQTTMessage* message = msg->message;
- memset(msgBuffer, 0, sizeof(msgBuffer));
- memcpy(msgBuffer, message->payload,message->payloadlen);
- printf("MQTT MSG[%d]:%s\n", (int)message->payloadlen, msgBuffer);
- }
- void mqtt_cli_init(void)
- {
- mqttClientSubTaskHandle = osThreadNew(MqttClientSubTask, NULL, &mqttClientSubTaskAttr); //subscribe task
- mqttClientPubTaskHandle = osThreadNew(MqttClientPubTask, NULL, &mqttClientPubTaskAttr); //publish task
- }
- #else
- #include "lwip/opt.h"
- #include "lwip/arch.h"
- #include "lwip/api.h"
- #include "lwip/apps/mqtt.h"
- #include "cmsis_os.h"
- #define MQTT_CLI_THREAD_PRIO ( tskIDLE_PRIORITY + 4 )
- #ifndef LWIP_MQTT_EXAMPLE_IPADDR_INIT
- #if LWIP_IPV4
- #define LWIP_MQTT_EXAMPLE_IPADDR_INIT = IPADDR4_INIT(PP_HTONL(IPADDR_LOOPBACK))
- #else
- #define LWIP_MQTT_EXAMPLE_IPADDR_INIT
- #endif
- #endif
- static ip_addr_t mqtt_ip LWIP_MQTT_EXAMPLE_IPADDR_INIT;
- static const struct mqtt_connect_client_info_t mqtt_client_info =
- {
- "test_user1",
- "test_user1", /* user */
- "1234", /* pass */
- 100, /* keep alive */
- NULL, /* will_topic */
- NULL, /* will_msg */
- 0, /* will_qos */
- 0 /* will_retain */
- #if LWIP_ALTCP && LWIP_ALTCP_TLS
- , NULL
- #endif
- };
- static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
- {
- const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
- // LWIP_UNUSED_ARG(data);
- LWIP_PLATFORM_DIAG(("MQTT client \"%s\" data cb: len %d, flags %d\n", client_info->client_id, (int)len, (int)flags));
- LWIP_PLATFORM_DIAG(("Data:\n%s\n", data));
- }
- static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
- {
- const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
- LWIP_PLATFORM_DIAG(("MQTT client \"%s\" publish cb: topic %s, len %d\n", client_info->client_id, topic, (int)tot_len));
- }
- static void mqtt_request_cb(void *arg, err_t err)
- {
- const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
- LWIP_PLATFORM_DIAG(("MQTT client \"%s\" request cb: err %d\n", client_info->client_id, (int)err));
- }
- static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
- {
- const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
- // LWIP_UNUSED_ARG(client);
- LWIP_PLATFORM_DIAG(("MQTT client \"%s\" connection cb: status %d\n", client_info->client_id, (int)status));
- if (status == MQTT_CONNECT_ACCEPTED)
- {
- mqtt_sub_unsub(client, "topic_qos1", 1, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1);
- mqtt_sub_unsub(client, "topic_qos0", 0, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1);
- }
- }
- void mqtt_cli_thread(void *arg)
- {
- mqtt_client_t* mqtt_client;
- osDelay(pdMS_TO_TICKS(7000));
- ipaddr_aton("192.168.1.34", &mqtt_ip);
- LOCK_TCPIP_CORE();
- mqtt_client = mqtt_client_new();
- mqtt_client_connect(mqtt_client, &mqtt_ip, MQTT_PORT, mqtt_connection_cb, LWIP_CONST_CAST(void*, &mqtt_client_info), &mqtt_client_info);
- mqtt_set_inpub_callback(mqtt_client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, LWIP_CONST_CAST(void*, &mqtt_client_info));
- UNLOCK_TCPIP_CORE();
- while(1)
- {
- osDelay(pdMS_TO_TICKS(100));
- }
- }
- void mqtt_cli_init(void)
- {
- sys_thread_new("mqtt_cli_netconn", mqtt_cli_thread, NULL, DEFAULT_THREAD_STACKSIZE, MQTT_CLI_THREAD_PRIO);
- }
- #endif
|