/* * mqtt_client.c * * Created on: Jun 10, 2024 * Author: jakubski */ #include #if 1 #include "FreeRTOS.h" #include "task.h" #include "main.h" #include "cmsis_os.h" #include #include "lwip.h" #include "lwip/api.h" #include "MQTTClient.h" #include "MQTTInterface.h" #include "node-red-config.h" #define MQTT_BUFSIZE 1024 extern struct netif gnetif; //extern gnetif const osThreadAttr_t mqttClientSubTaskAttr = { .name = "mqttClientSubTask", .stack_size = configMINIMAL_STACK_SIZE * 2, .priority = (osPriority_t) osPriorityNormal, }; const osThreadAttr_t mqttClientPubTaskAttr = { .name = "mqttClientPsubTask", .stack_size = configMINIMAL_STACK_SIZE * 2, .priority = (osPriority_t) osPriorityNormal, }; osThreadId mqttClientSubTaskHandle; //mqtt client task handle osThreadId mqttClientPubTaskHandle; //mqtt client task handle Network net; //mqtt network MQTTClient mqttClient; //mqtt client uint8_t sndBuffer[MQTT_BUFSIZE]; //mqtt send buffer uint8_t rcvBuffer[MQTT_BUFSIZE]; //mqtt receive buffer uint8_t msgBuffer[MQTT_BUFSIZE]; //mqtt message buffer void MqttClientSubTask(void *argument); //mqtt client subscribe task function void MqttClientPubTask(void *argument); //mqtt client publish task function int MqttConnectBroker(void); //mqtt broker connect function void MqttMessageArrived(MessageData* msg); //mqtt message callback function void MqttClientSubTask(void *argument) { while(1) { //waiting for valid ip address if (gnetif.ip_addr.addr == 0 || gnetif.netmask.addr == 0 || gnetif.gw.addr == 0) //system has no valid ip address { osDelay(pdMS_TO_TICKS(1000)); continue; } else { printf("DHCP/Static IP O.K.\n"); break; } } while(1) { if(!mqttClient.isconnected) { //try to connect to the broker MQTTDisconnect(&mqttClient); MqttConnectBroker(); osDelay(pdMS_TO_TICKS(1000)); } else { MQTTYield(&mqttClient, 500); //handle timer osDelay(pdMS_TO_TICKS(100)); } } } void MqttClientPubTask(void *argument) { const char* str = "MQTT message from STM32"; MQTTMessage message; while(1) { if(mqttClient.isconnected) { message.payload = (void*)str; message.payloadlen = strlen(str); if(is_link_up()) { MQTTPublish(&mqttClient, "test", &message); //publish a message } } osDelay(pdMS_TO_TICKS(1000)); } } int MqttConnectBroker() { int ret; NewNetwork(&net); ret = ConnectNetwork(&net, BROKER_IP, MQTT_PORT); if(ret != MQTT_SUCCESS) { printf("ConnectNetwork failed.\n"); return -1; } MQTTClientInit(&mqttClient, &net, 1000, sndBuffer, sizeof(sndBuffer), rcvBuffer, sizeof(rcvBuffer)); MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.willFlag = 0; data.MQTTVersion = 3; data.clientID.cstring = "test_user1"; data.username.cstring = "test_user1"; data.password.cstring = "1234"; data.keepAliveInterval = 100; data.cleansession = 1; ret = MQTTConnect(&mqttClient, &data); if(ret != MQTT_SUCCESS) { net_disconnect(&net); printf("MQTTConnect failed.\n"); return ret; } // osDelay(pdMS_TO_TICKS(100)); ret = MQTTSubscribe(&mqttClient, "test_second", QOS0, MqttMessageArrived); if(ret != MQTT_SUCCESS) { net_disconnect(&net); printf("MQTTSubscribe failed.\n"); return ret; } printf("MQTT_ConnectBroker O.K.\n"); return MQTT_SUCCESS; } void MqttMessageArrived(MessageData* msg) { MQTTMessage* message = msg->message; memset(msgBuffer, 0, sizeof(msgBuffer)); memcpy(msgBuffer, message->payload,message->payloadlen); printf("MQTT MSG[%d]:%s\n", (int)message->payloadlen, msgBuffer); } void mqtt_cli_init(void) { mqttClientSubTaskHandle = osThreadNew(MqttClientSubTask, NULL, &mqttClientSubTaskAttr); //subscribe task mqttClientPubTaskHandle = osThreadNew(MqttClientPubTask, NULL, &mqttClientPubTaskAttr); //publish task } #else #include "lwip/opt.h" #include "lwip/arch.h" #include "lwip/api.h" #include "lwip/apps/mqtt.h" #include "cmsis_os.h" #define MQTT_CLI_THREAD_PRIO ( tskIDLE_PRIORITY + 4 ) #ifndef LWIP_MQTT_EXAMPLE_IPADDR_INIT #if LWIP_IPV4 #define LWIP_MQTT_EXAMPLE_IPADDR_INIT = IPADDR4_INIT(PP_HTONL(IPADDR_LOOPBACK)) #else #define LWIP_MQTT_EXAMPLE_IPADDR_INIT #endif #endif static ip_addr_t mqtt_ip LWIP_MQTT_EXAMPLE_IPADDR_INIT; static const struct mqtt_connect_client_info_t mqtt_client_info = { "test_user1", "test_user1", /* user */ "1234", /* pass */ 100, /* keep alive */ NULL, /* will_topic */ NULL, /* will_msg */ 0, /* will_qos */ 0 /* will_retain */ #if LWIP_ALTCP && LWIP_ALTCP_TLS , NULL #endif }; static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags) { const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; // LWIP_UNUSED_ARG(data); LWIP_PLATFORM_DIAG(("MQTT client \"%s\" data cb: len %d, flags %d\n", client_info->client_id, (int)len, (int)flags)); LWIP_PLATFORM_DIAG(("Data:\n%s\n", data)); } static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len) { const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; LWIP_PLATFORM_DIAG(("MQTT client \"%s\" publish cb: topic %s, len %d\n", client_info->client_id, topic, (int)tot_len)); } static void mqtt_request_cb(void *arg, err_t err) { const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; LWIP_PLATFORM_DIAG(("MQTT client \"%s\" request cb: err %d\n", client_info->client_id, (int)err)); } static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status) { const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; // LWIP_UNUSED_ARG(client); LWIP_PLATFORM_DIAG(("MQTT client \"%s\" connection cb: status %d\n", client_info->client_id, (int)status)); if (status == MQTT_CONNECT_ACCEPTED) { mqtt_sub_unsub(client, "topic_qos1", 1, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1); mqtt_sub_unsub(client, "topic_qos0", 0, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1); } } void mqtt_cli_thread(void *arg) { mqtt_client_t* mqtt_client; osDelay(pdMS_TO_TICKS(7000)); ipaddr_aton("192.168.1.34", &mqtt_ip); LOCK_TCPIP_CORE(); mqtt_client = mqtt_client_new(); mqtt_client_connect(mqtt_client, &mqtt_ip, MQTT_PORT, mqtt_connection_cb, LWIP_CONST_CAST(void*, &mqtt_client_info), &mqtt_client_info); mqtt_set_inpub_callback(mqtt_client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, LWIP_CONST_CAST(void*, &mqtt_client_info)); UNLOCK_TCPIP_CORE(); while(1) { osDelay(pdMS_TO_TICKS(100)); } } void mqtt_cli_init(void) { sys_thread_new("mqtt_cli_netconn", mqtt_cli_thread, NULL, DEFAULT_THREAD_STACKSIZE, MQTT_CLI_THREAD_PRIO); } #endif