MQTTClient.c 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719
  1. /*******************************************************************************
  2. * Copyright (c) 2014, 2017 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - fix for #96 - check rem_len in readPacket
  16. * Ian Craggs - add ability to set message handler separately #6
  17. *******************************************************************************/
  18. #include <MQTTClient.h>
  19. #include <stdio.h>
  20. #include <string.h>
  21. #include "cmsis_os.h"
  22. osMutexId_t mqttMutex;
  23. static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
  24. md->topicName = aTopicName;
  25. md->message = aMessage;
  26. }
  27. static int getNextPacketId(MQTTClient *c) {
  28. return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
  29. }
  30. static int sendPacket(MQTTClient* c, int length, Timer* timer)
  31. {
  32. int rc = FAILURE,
  33. sent = 0;
  34. while (sent < length && !TimerIsExpired(timer))
  35. {
  36. rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
  37. if (rc < 0) // there was an error writing the data
  38. break;
  39. sent += rc;
  40. }
  41. if (sent == length)
  42. {
  43. TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have MQTT_SUCCESSfully sent the packet
  44. rc = MQTT_SUCCESS;
  45. }
  46. else
  47. rc = FAILURE;
  48. return rc;
  49. }
  50. void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
  51. unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
  52. {
  53. int i;
  54. c->ipstack = network;
  55. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  56. c->messageHandlers[i].topicFilter = 0;
  57. c->command_timeout_ms = command_timeout_ms;
  58. c->buf = sendbuf;
  59. c->buf_size = sendbuf_size;
  60. c->readbuf = readbuf;
  61. c->readbuf_size = readbuf_size;
  62. c->isconnected = 0;
  63. c->cleansession = 0;
  64. c->ping_outstanding = 0;
  65. c->defaultMessageHandler = NULL;
  66. c->next_packetid = 1;
  67. TimerInit(&c->last_sent);
  68. TimerInit(&c->last_received);
  69. #if defined(MQTT_TASK)
  70. MutexInit(&c->mutex);
  71. #endif
  72. if(mqttMutex == NULL)
  73. {
  74. osMutexDef(mqttMutex);
  75. mqttMutex = osMutexNew(NULL);
  76. }
  77. }
  78. static int decodePacket(MQTTClient* c, int* value, int timeout)
  79. {
  80. unsigned char i;
  81. int multiplier = 1;
  82. int len = 0;
  83. const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
  84. *value = 0;
  85. do
  86. {
  87. int rc = MQTTPACKET_READ_ERROR;
  88. if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
  89. {
  90. rc = MQTTPACKET_READ_ERROR; /* bad data */
  91. goto exit;
  92. }
  93. rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
  94. if (rc != 1)
  95. goto exit;
  96. *value += (i & 127) * multiplier;
  97. multiplier *= 128;
  98. } while ((i & 128) != 0);
  99. exit:
  100. return len;
  101. }
  102. static int readPacket(MQTTClient* c, Timer* timer)
  103. {
  104. MQTTHeader header = {0};
  105. int len = 0;
  106. int rem_len = 0;
  107. /* 1. read the header byte. This has the packet type in it */
  108. int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
  109. if (rc != 1)
  110. goto exit;
  111. len = 1;
  112. /* 2. read the remaining length. This is variable in itself */
  113. decodePacket(c, &rem_len, TimerLeftMS(timer));
  114. len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
  115. if (rem_len > (c->readbuf_size - len))
  116. {
  117. rc = BUFFER_OVERFLOW;
  118. goto exit;
  119. }
  120. /* 3. read the rest of the buffer using a callback to supply the rest of the data */
  121. if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
  122. rc = 0;
  123. goto exit;
  124. }
  125. header.byte = c->readbuf[0];
  126. rc = header.bits.type;
  127. if (c->keepAliveInterval > 0)
  128. TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have MQTT_SUCCESSfully received a packet
  129. exit:
  130. return rc;
  131. }
  132. // assume topic filter and name is in correct format
  133. // # can only be at end
  134. // + and # can only be next to separator
  135. static char isTopicMatched(char* topicFilter, MQTTString* topicName)
  136. {
  137. char* curf = topicFilter;
  138. char* curn = topicName->lenstring.data;
  139. char* curn_end = curn + topicName->lenstring.len;
  140. while (*curf && curn < curn_end)
  141. {
  142. if (*curn == '/' && *curf != '/')
  143. break;
  144. if (*curf != '+' && *curf != '#' && *curf != *curn)
  145. break;
  146. if (*curf == '+')
  147. { // skip until we meet the next separator, or end of string
  148. char* nextpos = curn + 1;
  149. while (nextpos < curn_end && *nextpos != '/')
  150. nextpos = ++curn + 1;
  151. }
  152. else if (*curf == '#')
  153. curn = curn_end - 1; // skip until end of string
  154. curf++;
  155. curn++;
  156. };
  157. return (curn == curn_end) && (*curf == '\0');
  158. }
  159. int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
  160. {
  161. int i;
  162. int rc = FAILURE;
  163. // we have to find the right message handler - indexed by topic
  164. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  165. {
  166. if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
  167. isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
  168. {
  169. if (c->messageHandlers[i].fp != NULL)
  170. {
  171. MessageData md;
  172. NewMessageData(&md, topicName, message);
  173. c->messageHandlers[i].fp(&md);
  174. rc = MQTT_SUCCESS;
  175. }
  176. }
  177. }
  178. if (rc == FAILURE && c->defaultMessageHandler != NULL)
  179. {
  180. MessageData md;
  181. NewMessageData(&md, topicName, message);
  182. c->defaultMessageHandler(&md);
  183. rc = MQTT_SUCCESS;
  184. }
  185. return rc;
  186. }
  187. int keepalive(MQTTClient* c)
  188. {
  189. int rc = MQTT_SUCCESS;
  190. if (c->keepAliveInterval == 0)
  191. goto exit;
  192. if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
  193. {
  194. if (c->ping_outstanding)
  195. rc = FAILURE; /* PINGRESP not received in keepalive interval */
  196. else
  197. {
  198. Timer timer;
  199. TimerInit(&timer);
  200. TimerCountdownMS(&timer, 1000);
  201. int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
  202. if (len > 0 && (rc = sendPacket(c, len, &timer)) == MQTT_SUCCESS) // send the ping packet
  203. c->ping_outstanding = 1;
  204. }
  205. }
  206. exit:
  207. return rc;
  208. }
  209. void MQTTCleanSession(MQTTClient* c)
  210. {
  211. int i = 0;
  212. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  213. c->messageHandlers[i].topicFilter = NULL;
  214. }
  215. void MQTTCloseSession(MQTTClient* c)
  216. {
  217. c->ping_outstanding = 0;
  218. c->isconnected = 0;
  219. if (c->cleansession)
  220. MQTTCleanSession(c);
  221. }
  222. int cycle(MQTTClient* c, Timer* timer)
  223. {
  224. int len = 0,
  225. rc = MQTT_SUCCESS;
  226. int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
  227. switch (packet_type)
  228. {
  229. default:
  230. /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
  231. rc = packet_type;
  232. goto exit;
  233. case 0: /* timed out reading packet */
  234. break;
  235. case CONNACK:
  236. case PUBACK:
  237. case SUBACK:
  238. case UNSUBACK:
  239. break;
  240. case PUBLISH:
  241. {
  242. MQTTString topicName;
  243. MQTTMessage msg;
  244. int intQoS;
  245. msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
  246. if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
  247. (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
  248. goto exit;
  249. msg.qos = (enum QoS)intQoS;
  250. deliverMessage(c, &topicName, &msg);
  251. if (msg.qos != QOS0)
  252. {
  253. if (msg.qos == QOS1)
  254. len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
  255. else if (msg.qos == QOS2)
  256. len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
  257. if (len <= 0)
  258. rc = FAILURE;
  259. else
  260. rc = sendPacket(c, len, timer);
  261. if (rc == FAILURE)
  262. goto exit; // there was a problem
  263. }
  264. break;
  265. }
  266. case PUBREC:
  267. case PUBREL:
  268. {
  269. unsigned short mypacketid;
  270. unsigned char dup, type;
  271. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
  272. rc = FAILURE;
  273. else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
  274. (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
  275. rc = FAILURE;
  276. else if ((rc = sendPacket(c, len, timer)) != MQTT_SUCCESS) // send the PUBREL packet
  277. rc = FAILURE; // there was a problem
  278. if (rc == FAILURE)
  279. goto exit; // there was a problem
  280. break;
  281. }
  282. case PUBCOMP:
  283. break;
  284. case PINGRESP:
  285. c->ping_outstanding = 0;
  286. break;
  287. }
  288. if (keepalive(c) != MQTT_SUCCESS) {
  289. //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
  290. rc = FAILURE;
  291. }
  292. exit:
  293. if (rc == MQTT_SUCCESS)
  294. rc = packet_type;
  295. else if (c->isconnected)
  296. MQTTCloseSession(c);
  297. return rc;
  298. }
  299. int MQTTYield(MQTTClient* c, int timeout_ms)
  300. {
  301. int rc = MQTT_SUCCESS;
  302. Timer timer;
  303. TimerInit(&timer);
  304. TimerCountdownMS(&timer, timeout_ms);
  305. do
  306. {
  307. if (cycle(c, &timer) < 0)
  308. {
  309. rc = FAILURE;
  310. break;
  311. }
  312. } while (!TimerIsExpired(&timer));
  313. return rc;
  314. }
  315. int MQTTIsConnected(MQTTClient* client)
  316. {
  317. return client->isconnected;
  318. }
  319. void MQTTRun(void* parm)
  320. {
  321. Timer timer;
  322. MQTTClient* c = (MQTTClient*)parm;
  323. TimerInit(&timer);
  324. while (1)
  325. {
  326. #if defined(MQTT_TASK)
  327. MutexLock(&c->mutex);
  328. #endif
  329. osMutexAcquire(mqttMutex, osWaitForever);
  330. TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
  331. cycle(c, &timer);
  332. #if defined(MQTT_TASK)
  333. MutexUnlock(&c->mutex);
  334. #endif
  335. osMutexRelease(mqttMutex);
  336. }
  337. }
  #if defined(MQTT_TASK)
  /**
   * Start a thread that runs MQTTRun() for this client.
   *
   * @return whatever ThreadStart() returns
   */
  int MQTTStartTask(MQTTClient* client)
  {
      int started = ThreadStart(&client->thread, &MQTTRun, client);

      return started;
  }
  #endif
  344. int waitfor(MQTTClient* c, int packet_type, Timer* timer)
  345. {
  346. int rc = FAILURE;
  347. do
  348. {
  349. if (TimerIsExpired(timer))
  350. break; // we timed out
  351. rc = cycle(c, timer);
  352. }
  353. while (rc != packet_type && rc >= 0);
  354. return rc;
  355. }
  356. int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
  357. {
  358. Timer connect_timer;
  359. int rc = FAILURE;
  360. MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
  361. int len = 0;
  362. #if defined(MQTT_TASK)
  363. MutexLock(&c->mutex);
  364. #endif
  365. osMutexAcquire(mqttMutex, osWaitForever);
  366. if (c->isconnected) /* don't send connect packet again if we are already connected */
  367. goto exit;
  368. TimerInit(&connect_timer);
  369. TimerCountdownMS(&connect_timer, c->command_timeout_ms);
  370. if (options == 0)
  371. options = &default_options; /* set default options if none were supplied */
  372. c->keepAliveInterval = options->keepAliveInterval;
  373. c->cleansession = options->cleansession;
  374. TimerCountdown(&c->last_received, c->keepAliveInterval);
  375. if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
  376. goto exit;
  377. if ((rc = sendPacket(c, len, &connect_timer)) != MQTT_SUCCESS) // send the connect packet
  378. goto exit; // there was a problem
  379. // this will be a blocking call, wait for the connack
  380. if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
  381. {
  382. data->rc = 0;
  383. data->sessionPresent = 0;
  384. if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
  385. rc = data->rc;
  386. // rc = MQTT_SUCCESS;
  387. else
  388. rc = FAILURE;
  389. }
  390. else
  391. rc = FAILURE;
  392. exit:
  393. if (rc == MQTT_SUCCESS)
  394. {
  395. c->isconnected = 1;
  396. c->ping_outstanding = 0;
  397. }
  398. #if defined(MQTT_TASK)
  399. MutexUnlock(&c->mutex);
  400. #endif
  401. osMutexRelease(mqttMutex);
  402. return rc;
  403. }
  404. int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
  405. {
  406. MQTTConnackData data;
  407. return MQTTConnectWithResults(c, options, &data);
  408. }
  409. int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
  410. {
  411. int rc = FAILURE;
  412. int i = -1;
  413. /* first check for an existing matching slot */
  414. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  415. {
  416. if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
  417. {
  418. if (messageHandler == NULL) /* remove existing */
  419. {
  420. c->messageHandlers[i].topicFilter = NULL;
  421. c->messageHandlers[i].fp = NULL;
  422. }
  423. rc = MQTT_SUCCESS; /* return i when adding new subscription */
  424. break;
  425. }
  426. }
  427. /* if no existing, look for empty slot (unless we are removing) */
  428. if (messageHandler != NULL) {
  429. if (rc == FAILURE)
  430. {
  431. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  432. {
  433. if (c->messageHandlers[i].topicFilter == NULL)
  434. {
  435. rc = MQTT_SUCCESS;
  436. break;
  437. }
  438. }
  439. }
  440. if (i < MAX_MESSAGE_HANDLERS)
  441. {
  442. c->messageHandlers[i].topicFilter = topicFilter;
  443. c->messageHandlers[i].fp = messageHandler;
  444. }
  445. }
  446. return rc;
  447. }
  448. int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
  449. messageHandler messageHandler, MQTTSubackData* data)
  450. {
  451. int rc = FAILURE;
  452. Timer timer;
  453. int len = 0;
  454. MQTTString topic = MQTTString_initializer;
  455. topic.cstring = (char *)topicFilter;
  456. #if defined(MQTT_TASK)
  457. MutexLock(&c->mutex);
  458. #endif
  459. osMutexAcquire(mqttMutex, osWaitForever);
  460. if (!c->isconnected)
  461. goto exit;
  462. TimerInit(&timer);
  463. TimerCountdownMS(&timer, c->command_timeout_ms);
  464. len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
  465. if (len <= 0)
  466. goto exit;
  467. if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
  468. goto exit; // there was a problem
  469. if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
  470. {
  471. int count = 0;
  472. unsigned short mypacketid;
  473. data->grantedQoS = QOS0;
  474. if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1)
  475. {
  476. if (data->grantedQoS != 0x80)
  477. rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
  478. }
  479. }
  480. else
  481. rc = FAILURE;
  482. exit:
  483. if (rc == FAILURE)
  484. MQTTCloseSession(c);
  485. #if defined(MQTT_TASK)
  486. MutexUnlock(&c->mutex);
  487. #endif
  488. osMutexRelease(mqttMutex);
  489. return rc;
  490. }
  491. int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
  492. messageHandler messageHandler)
  493. {
  494. MQTTSubackData data;
  495. return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data);
  496. }
  497. int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
  498. {
  499. int rc = FAILURE;
  500. Timer timer;
  501. MQTTString topic = MQTTString_initializer;
  502. topic.cstring = (char *)topicFilter;
  503. int len = 0;
  504. #if defined(MQTT_TASK)
  505. MutexLock(&c->mutex);
  506. #endif
  507. osMutexAcquire(mqttMutex, osWaitForever);
  508. if (!c->isconnected)
  509. goto exit;
  510. TimerInit(&timer);
  511. TimerCountdownMS(&timer, c->command_timeout_ms);
  512. if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
  513. goto exit;
  514. if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
  515. goto exit; // there was a problem
  516. if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
  517. {
  518. unsigned short mypacketid; // should be the same as the packetid above
  519. if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
  520. {
  521. /* remove the subscription message handler associated with this topic, if there is one */
  522. MQTTSetMessageHandler(c, topicFilter, NULL);
  523. }
  524. }
  525. else
  526. rc = FAILURE;
  527. exit:
  528. if (rc == FAILURE)
  529. MQTTCloseSession(c);
  530. #if defined(MQTT_TASK)
  531. MutexUnlock(&c->mutex);
  532. #endif
  533. osMutexRelease(mqttMutex);
  534. return rc;
  535. }
  536. int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
  537. {
  538. int rc = FAILURE;
  539. Timer timer;
  540. MQTTString topic = MQTTString_initializer;
  541. topic.cstring = (char *)topicName;
  542. int len = 0;
  543. #if defined(MQTT_TASK)
  544. MutexLock(&c->mutex);
  545. #endif
  546. osMutexAcquire(mqttMutex, osWaitForever);
  547. if (!c->isconnected)
  548. goto exit;
  549. TimerInit(&timer);
  550. TimerCountdownMS(&timer, c->command_timeout_ms);
  551. if (message->qos == QOS1 || message->qos == QOS2)
  552. message->id = getNextPacketId(c);
  553. len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
  554. topic, (unsigned char*)message->payload, message->payloadlen);
  555. if (len <= 0)
  556. goto exit;
  557. if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
  558. goto exit; // there was a problem
  559. if (message->qos == QOS1)
  560. {
  561. if (waitfor(c, PUBACK, &timer) == PUBACK)
  562. {
  563. unsigned short mypacketid;
  564. unsigned char dup, type;
  565. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
  566. rc = FAILURE;
  567. }
  568. else
  569. rc = FAILURE;
  570. }
  571. else if (message->qos == QOS2)
  572. {
  573. if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
  574. {
  575. unsigned short mypacketid;
  576. unsigned char dup, type;
  577. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
  578. rc = FAILURE;
  579. }
  580. else
  581. rc = FAILURE;
  582. }
  583. exit:
  584. if (rc == FAILURE)
  585. MQTTCloseSession(c);
  586. #if defined(MQTT_TASK)
  587. MutexUnlock(&c->mutex);
  588. #endif
  589. osMutexRelease(mqttMutex);
  590. return rc;
  591. }
  592. int MQTTDisconnect(MQTTClient* c)
  593. {
  594. int rc = FAILURE;
  595. Timer timer; // we might wait for incomplete incoming publishes to complete
  596. int len = 0;
  597. #if defined(MQTT_TASK)
  598. MutexLock(&c->mutex);
  599. #endif
  600. osMutexAcquire(mqttMutex, osWaitForever);
  601. TimerInit(&timer);
  602. TimerCountdownMS(&timer, c->command_timeout_ms);
  603. len = MQTTSerialize_disconnect(c->buf, c->buf_size);
  604. if (len > 0)
  605. rc = sendPacket(c, len, &timer); // send the disconnect packet
  606. MQTTCloseSession(c);
  607. #if defined(MQTT_TASK)
  608. MutexUnlock(&c->mutex);
  609. #endif
  610. osMutexRelease(mqttMutex);
  611. return rc;
  612. }