123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736 |
- /*******************************************************************************
- * Copyright (c) 2014, 2017 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
- * Ian Craggs - fix for #96 - check rem_len in readPacket
- * Ian Craggs - add ability to set message handler separately #6
- *******************************************************************************/
- #include <MQTTClient.h>
- #include <stdio.h>
- #include <string.h>
- //#include "cmsis_os.h"
- osMutexId_t mqttMutex;
- static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
- md->topicName = aTopicName;
- md->message = aMessage;
- }
- static int getNextPacketId(MQTTClient *c) {
- return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
- }
- static int sendPacket(MQTTClient* c, int length, Timer* timer)
- {
- int rc = FAILURE,
- sent = 0;
- while (sent < length && !TimerIsExpired(timer))
- {
- rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
- if (rc < 0) // there was an error writing the data
- break;
- sent += rc;
- }
- if (sent == length)
- {
- TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have MQTT_SUCCESSfully sent the packet
- rc = MQTT_SUCCESS;
- }
- else
- rc = FAILURE;
- return rc;
- }
- void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
- unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
- {
- int i;
- c->ipstack = network;
- for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
- c->messageHandlers[i].topicFilter = 0;
- c->command_timeout_ms = command_timeout_ms;
- c->buf = sendbuf;
- c->buf_size = sendbuf_size;
- c->readbuf = readbuf;
- c->readbuf_size = readbuf_size;
- c->isconnected = 0;
- c->cleansession = 0;
- c->ping_outstanding = 0;
- c->defaultMessageHandler = NULL;
- c->next_packetid = 1;
- TimerInit(&c->last_sent);
- TimerInit(&c->last_received);
- #if defined(MQTT_TASK)
- MutexInit(&c->mutex);
- #endif
- if(mqttMutex == NULL)
- {
- // osMutexDef(mqttMutex);
- // mqttMutex = osMutexNew(NULL);
- c->mutex = osMutexNew(NULL);
- }
- }
- static int decodePacket(MQTTClient* c, int* value, int timeout)
- {
- unsigned char i;
- int multiplier = 1;
- int len = 0;
- const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
- *value = 0;
- do
- {
- int rc = MQTTPACKET_READ_ERROR;
- if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
- {
- rc = MQTTPACKET_READ_ERROR; /* bad data */
- goto exit;
- }
- rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
- if (rc != 1)
- goto exit;
- *value += (i & 127) * multiplier;
- multiplier *= 128;
- } while ((i & 128) != 0);
- exit:
- return len;
- }
- static int readPacket(MQTTClient* c, Timer* timer)
- {
- MQTTHeader header = {0};
- int len = 0;
- int rem_len = 0;
- /* 1. read the header byte. This has the packet type in it */
- int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
- if (rc != 1)
- goto exit;
- len = 1;
- /* 2. read the remaining length. This is variable in itself */
- decodePacket(c, &rem_len, TimerLeftMS(timer));
- len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
- if (rem_len > (c->readbuf_size - len))
- {
- rc = BUFFER_OVERFLOW;
- goto exit;
- }
- /* 3. read the rest of the buffer using a callback to supply the rest of the data */
- if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
- rc = 0;
- goto exit;
- }
- header.byte = c->readbuf[0];
- rc = header.bits.type;
- if (c->keepAliveInterval > 0)
- TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have MQTT_SUCCESSfully received a packet
- exit:
- return rc;
- }
- // assume topic filter and name is in correct format
- // # can only be at end
- // + and # can only be next to separator
- static char isTopicMatched(char* topicFilter, MQTTString* topicName)
- {
- char* curf = topicFilter;
- char* curn = topicName->lenstring.data;
- char* curn_end = curn + topicName->lenstring.len;
- while (*curf && curn < curn_end)
- {
- if (*curn == '/' && *curf != '/')
- break;
- if (*curf != '+' && *curf != '#' && *curf != *curn)
- break;
- if (*curf == '+')
- { // skip until we meet the next separator, or end of string
- char* nextpos = curn + 1;
- while (nextpos < curn_end && *nextpos != '/')
- nextpos = ++curn + 1;
- }
- else if (*curf == '#')
- curn = curn_end - 1; // skip until end of string
- curf++;
- curn++;
- };
- return (curn == curn_end) && (*curf == '\0');
- }
- int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
- {
- int i;
- int rc = FAILURE;
- // we have to find the right message handler - indexed by topic
- for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
- {
- if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
- isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
- {
- if (c->messageHandlers[i].fp != NULL)
- {
- MessageData md;
- NewMessageData(&md, topicName, message);
- c->messageHandlers[i].fp(&md);
- rc = MQTT_SUCCESS;
- }
- }
- }
- if (rc == FAILURE && c->defaultMessageHandler != NULL)
- {
- MessageData md;
- NewMessageData(&md, topicName, message);
- c->defaultMessageHandler(&md);
- rc = MQTT_SUCCESS;
- }
- return rc;
- }
- int keepalive(MQTTClient* c)
- {
- int rc = MQTT_SUCCESS;
- if (c->keepAliveInterval == 0)
- goto exit;
- if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
- {
- if (c->ping_outstanding)
- rc = FAILURE; /* PINGRESP not received in keepalive interval */
- else
- {
- Timer timer;
- TimerInit(&timer);
- TimerCountdownMS(&timer, 1000);
- int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
- if (len > 0 && (rc = sendPacket(c, len, &timer)) == MQTT_SUCCESS) // send the ping packet
- c->ping_outstanding = 1;
- }
- }
- exit:
- return rc;
- }
- void MQTTCleanSession(MQTTClient* c)
- {
- int i = 0;
- for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
- c->messageHandlers[i].topicFilter = NULL;
- }
- void MQTTCloseSession(MQTTClient* c)
- {
- c->ping_outstanding = 0;
- c->isconnected = 0;
- if (c->cleansession)
- MQTTCleanSession(c);
- }
- int cycle(MQTTClient* c, Timer* timer)
- {
- int len = 0,
- rc = MQTT_SUCCESS;
- int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
- switch (packet_type)
- {
- default:
- /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
- rc = packet_type;
- goto exit;
- case 0: /* timed out reading packet */
- break;
- case CONNACK:
- case PUBACK:
- case SUBACK:
- case UNSUBACK:
- break;
- case PUBLISH:
- {
- MQTTString topicName;
- MQTTMessage msg;
- int intQoS;
- msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
- if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
- (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
- goto exit;
- msg.qos = (enum QoS)intQoS;
- deliverMessage(c, &topicName, &msg);
- if (msg.qos != QOS0)
- {
- if (msg.qos == QOS1)
- len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
- else if (msg.qos == QOS2)
- len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
- if (len <= 0)
- rc = FAILURE;
- else
- rc = sendPacket(c, len, timer);
- if (rc == FAILURE)
- goto exit; // there was a problem
- }
- break;
- }
- case PUBREC:
- case PUBREL:
- {
- unsigned short mypacketid;
- unsigned char dup, type;
- if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
- rc = FAILURE;
- else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
- (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
- rc = FAILURE;
- else if ((rc = sendPacket(c, len, timer)) != MQTT_SUCCESS) // send the PUBREL packet
- rc = FAILURE; // there was a problem
- if (rc == FAILURE)
- goto exit; // there was a problem
- break;
- }
- case PUBCOMP:
- break;
- case PINGRESP:
- c->ping_outstanding = 0;
- break;
- }
- if (keepalive(c) != MQTT_SUCCESS) {
- //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
- rc = FAILURE;
- }
- exit:
- if (rc == MQTT_SUCCESS)
- rc = packet_type;
- else if (c->isconnected)
- MQTTCloseSession(c);
- return rc;
- }
- int MQTTYield(MQTTClient* c, int timeout_ms)
- {
- int rc = MQTT_SUCCESS;
- Timer timer;
- TimerInit(&timer);
- TimerCountdownMS(&timer, timeout_ms);
- do
- {
- if (cycle(c, &timer) < 0)
- {
- rc = FAILURE;
- break;
- }
- } while (!TimerIsExpired(&timer));
- return rc;
- }
- int MQTTIsConnected(MQTTClient* client)
- {
- return client->isconnected;
- }
- void MQTTRun(void* parm)
- {
- Timer timer;
- MQTTClient* c = (MQTTClient*)parm;
- TimerInit(&timer);
- while (1)
- {
- #if defined(MQTT_TASK)
- MutexLock(&c->mutex);
- #endif
- // osMutexAcquire(mqttMutex, osWaitForever);
- osMutexAcquire(c->mutex, osWaitForever);
- TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
- cycle(c, &timer);
- #if defined(MQTT_TASK)
- MutexUnlock(&c->mutex);
- #endif
- // osMutexRelease(mqttMutex);
- osMutexRelease(c->mutex);
- }
- }
- #if defined(MQTT_TASK)
- int MQTTStartTask(MQTTClient* client)
- {
- return ThreadStart(&client->thread, &MQTTRun, client);
- }
- #endif
- int waitfor(MQTTClient* c, int packet_type, Timer* timer)
- {
- int rc = FAILURE;
- do
- {
- if (TimerIsExpired(timer))
- break; // we timed out
- rc = cycle(c, timer);
- }
- while (rc != packet_type && rc >= 0);
- return rc;
- }
- int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
- {
- Timer connect_timer;
- int rc = FAILURE;
- MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
- int len = 0;
- #if defined(MQTT_TASK)
- MutexLock(&c->mutex);
- #endif
- // osMutexAcquire(mqttMutex, osWaitForever);
- osMutexAcquire(c->mutex, osWaitForever);
- if (c->isconnected) /* don't send connect packet again if we are already connected */
- goto exit;
- TimerInit(&connect_timer);
- TimerCountdownMS(&connect_timer, c->command_timeout_ms);
- if (options == 0)
- options = &default_options; /* set default options if none were supplied */
- c->keepAliveInterval = options->keepAliveInterval;
- c->cleansession = options->cleansession;
- TimerCountdown(&c->last_received, c->keepAliveInterval);
- if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
- goto exit;
- if ((rc = sendPacket(c, len, &connect_timer)) != MQTT_SUCCESS) // send the connect packet
- goto exit; // there was a problem
- // this will be a blocking call, wait for the connack
- if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
- {
- data->rc = 0;
- data->sessionPresent = 0;
- if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
- rc = data->rc;
- // rc = MQTT_SUCCESS;
- else
- rc = FAILURE;
- }
- else
- rc = FAILURE;
- exit:
- if (rc == MQTT_SUCCESS)
- {
- c->isconnected = 1;
- c->ping_outstanding = 0;
- }
- #if defined(MQTT_TASK)
- MutexUnlock(&c->mutex);
- #endif
- // osMutexRelease(mqttMutex);
- osMutexRelease(c->mutex);
- return rc;
- }
- int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
- {
- MQTTConnackData data;
- return MQTTConnectWithResults(c, options, &data);
- }
- int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
- {
- int rc = FAILURE;
- int i = -1;
- /* first check for an existing matching slot */
- for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
- {
- if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
- {
- if (messageHandler == NULL) /* remove existing */
- {
- c->messageHandlers[i].topicFilter = NULL;
- c->messageHandlers[i].fp = NULL;
- }
- rc = MQTT_SUCCESS; /* return i when adding new subscription */
- break;
- }
- }
- /* if no existing, look for empty slot (unless we are removing) */
- if (messageHandler != NULL) {
- if (rc == FAILURE)
- {
- for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
- {
- if (c->messageHandlers[i].topicFilter == NULL)
- {
- rc = MQTT_SUCCESS;
- break;
- }
- }
- }
- if (i < MAX_MESSAGE_HANDLERS)
- {
- c->messageHandlers[i].topicFilter = topicFilter;
- c->messageHandlers[i].fp = messageHandler;
- }
- }
- return rc;
- }
- int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
- messageHandler messageHandler, MQTTSubackData* data)
- {
- int rc = FAILURE;
- Timer timer;
- int len = 0;
- MQTTString topic = MQTTString_initializer;
- topic.cstring = (char *)topicFilter;
- #if defined(MQTT_TASK)
- MutexLock(&c->mutex);
- #endif
- // osMutexAcquire(mqttMutex, osWaitForever);
- osMutexAcquire(c->mutex, osWaitForever);
- if (!c->isconnected)
- goto exit;
- TimerInit(&timer);
- TimerCountdownMS(&timer, c->command_timeout_ms);
- int _qos[1] = {(int)qos};
- len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, _qos);
- // len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
- if (len <= 0)
- goto exit;
- if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
- goto exit; // there was a problem
- if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
- {
- int count = 0;
- unsigned short mypacketid;
- data->grantedQoS = QOS0;
- int _grantedQoS[1] = {(int)&data->grantedQoS};
- // if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1)
- if (MQTTDeserialize_suback(&mypacketid, 1, &count, _grantedQoS, c->readbuf, c->readbuf_size) == 1)
- {
- if (data->grantedQoS != 0x80)
- rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
- }
- }
- else
- rc = FAILURE;
- exit:
- if (rc == FAILURE)
- MQTTCloseSession(c);
- #if defined(MQTT_TASK)
- MutexUnlock(&c->mutex);
- #endif
- // osMutexRelease(mqttMutex);
- osMutexRelease(c->mutex);
- return rc;
- }
- int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
- messageHandler messageHandler)
- {
- MQTTSubackData data;
- return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data);
- }
- int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
- {
- int rc = FAILURE;
- Timer timer;
- MQTTString topic = MQTTString_initializer;
- topic.cstring = (char *)topicFilter;
- int len = 0;
- #if defined(MQTT_TASK)
- MutexLock(&c->mutex);
- #endif
- // osMutexAcquire(mqttMutex, osWaitForever);
- osMutexAcquire(c->mutex, osWaitForever);
- if (!c->isconnected)
- goto exit;
- TimerInit(&timer);
- TimerCountdownMS(&timer, c->command_timeout_ms);
- if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
- goto exit;
- if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
- goto exit; // there was a problem
- if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
- {
- unsigned short mypacketid; // should be the same as the packetid above
- if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
- {
- /* remove the subscription message handler associated with this topic, if there is one */
- MQTTSetMessageHandler(c, topicFilter, NULL);
- }
- }
- else
- rc = FAILURE;
- exit:
- if (rc == FAILURE)
- MQTTCloseSession(c);
- #if defined(MQTT_TASK)
- MutexUnlock(&c->mutex);
- #endif
- // osMutexRelease(mqttMutex);
- osMutexRelease(c->mutex);
- return rc;
- }
- int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
- {
- int rc = FAILURE;
- Timer timer;
- MQTTString topic = MQTTString_initializer;
- topic.cstring = (char *)topicName;
- int len = 0;
- #if defined(MQTT_TASK)
- MutexLock(&c->mutex);
- #endif
- // osMutexAcquire(mqttMutex, osWaitForever);
- osMutexAcquire(c->mutex, osWaitForever);
- if (!c->isconnected)
- goto exit;
- TimerInit(&timer);
- TimerCountdownMS(&timer, c->command_timeout_ms);
- if (message->qos == QOS1 || message->qos == QOS2)
- message->id = getNextPacketId(c);
- len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
- topic, (unsigned char*)message->payload, message->payloadlen);
- if (len <= 0)
- goto exit;
- if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
- goto exit; // there was a problem
- if (message->qos == QOS1)
- {
- if (waitfor(c, PUBACK, &timer) == PUBACK)
- {
- unsigned short mypacketid;
- unsigned char dup, type;
- if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
- rc = FAILURE;
- }
- else
- rc = FAILURE;
- }
- else if (message->qos == QOS2)
- {
- if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
- {
- unsigned short mypacketid;
- unsigned char dup, type;
- if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
- rc = FAILURE;
- }
- else
- rc = FAILURE;
- }
- exit:
- if (rc == FAILURE)
- MQTTCloseSession(c);
- #if defined(MQTT_TASK)
- MutexUnlock(&c->mutex);
- #endif
- // osMutexRelease(mqttMutex);
- osMutexRelease(c->mutex);
- return rc;
- }
- int MQTTDisconnect(MQTTClient* c)
- {
- int rc = FAILURE;
- Timer timer; // we might wait for incomplete incoming publishes to complete
- int len = 0;
- #if defined(MQTT_TASK)
- MutexLock(&c->mutex);
- #endif
- // osMutexAcquire(mqttMutex, osWaitForever);
- osMutexAcquire(c->mutex, osWaitForever);
- TimerInit(&timer);
- TimerCountdownMS(&timer, c->command_timeout_ms);
- len = MQTTSerialize_disconnect(c->buf, c->buf_size);
- if (len > 0)
- rc = sendPacket(c, len, &timer); // send the disconnect packet
- MQTTCloseSession(c);
- #if defined(MQTT_TASK)
- MutexUnlock(&c->mutex);
- #endif
- // osMutexRelease(mqttMutex);
- osMutexRelease(c->mutex);
- return rc;
- }
|