MQTTClient.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. /*******************************************************************************
  2. * Copyright (c) 2014, 2017 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - fix for #96 - check rem_len in readPacket
  16. * Ian Craggs - add ability to set message handler separately #6
  17. *******************************************************************************/
  18. #include <MQTTClient.h>
  19. #include <stdio.h>
  20. #include <string.h>
  21. //#include "cmsis_os.h"
  22. osMutexId_t mqttMutex;
  23. static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
  24. md->topicName = aTopicName;
  25. md->message = aMessage;
  26. }
  27. static int getNextPacketId(MQTTClient *c) {
  28. return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
  29. }
  30. static int sendPacket(MQTTClient* c, int length, Timer* timer)
  31. {
  32. int rc = FAILURE,
  33. sent = 0;
  34. while (sent < length && !TimerIsExpired(timer))
  35. {
  36. rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
  37. if (rc < 0) // there was an error writing the data
  38. break;
  39. sent += rc;
  40. }
  41. if (sent == length)
  42. {
  43. TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have MQTT_SUCCESSfully sent the packet
  44. rc = MQTT_SUCCESS;
  45. }
  46. else
  47. rc = FAILURE;
  48. return rc;
  49. }
  50. void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
  51. unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
  52. {
  53. int i;
  54. c->ipstack = network;
  55. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  56. c->messageHandlers[i].topicFilter = 0;
  57. c->command_timeout_ms = command_timeout_ms;
  58. c->buf = sendbuf;
  59. c->buf_size = sendbuf_size;
  60. c->readbuf = readbuf;
  61. c->readbuf_size = readbuf_size;
  62. c->isconnected = 0;
  63. c->cleansession = 0;
  64. c->ping_outstanding = 0;
  65. c->defaultMessageHandler = NULL;
  66. c->next_packetid = 1;
  67. TimerInit(&c->last_sent);
  68. TimerInit(&c->last_received);
  69. #if defined(MQTT_TASK)
  70. MutexInit(&c->mutex);
  71. #endif
  72. if(mqttMutex == NULL)
  73. {
  74. // osMutexDef(mqttMutex);
  75. // mqttMutex = osMutexNew(NULL);
  76. c->mutex = osMutexNew(NULL);
  77. }
  78. }
  79. static int decodePacket(MQTTClient* c, int* value, int timeout)
  80. {
  81. unsigned char i;
  82. int multiplier = 1;
  83. int len = 0;
  84. const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
  85. *value = 0;
  86. do
  87. {
  88. int rc = MQTTPACKET_READ_ERROR;
  89. if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
  90. {
  91. rc = MQTTPACKET_READ_ERROR; /* bad data */
  92. goto exit;
  93. }
  94. rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
  95. if (rc != 1)
  96. goto exit;
  97. *value += (i & 127) * multiplier;
  98. multiplier *= 128;
  99. } while ((i & 128) != 0);
  100. exit:
  101. return len;
  102. }
  103. static int readPacket(MQTTClient* c, Timer* timer)
  104. {
  105. MQTTHeader header = {0};
  106. int len = 0;
  107. int rem_len = 0;
  108. /* 1. read the header byte. This has the packet type in it */
  109. int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
  110. if (rc != 1)
  111. goto exit;
  112. len = 1;
  113. /* 2. read the remaining length. This is variable in itself */
  114. decodePacket(c, &rem_len, TimerLeftMS(timer));
  115. len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
  116. if (rem_len > (c->readbuf_size - len))
  117. {
  118. rc = BUFFER_OVERFLOW;
  119. goto exit;
  120. }
  121. /* 3. read the rest of the buffer using a callback to supply the rest of the data */
  122. if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
  123. rc = 0;
  124. goto exit;
  125. }
  126. header.byte = c->readbuf[0];
  127. rc = header.bits.type;
  128. if (c->keepAliveInterval > 0)
  129. TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have MQTT_SUCCESSfully received a packet
  130. exit:
  131. return rc;
  132. }
  133. // assume topic filter and name is in correct format
  134. // # can only be at end
  135. // + and # can only be next to separator
  136. static char isTopicMatched(char* topicFilter, MQTTString* topicName)
  137. {
  138. char* curf = topicFilter;
  139. char* curn = topicName->lenstring.data;
  140. char* curn_end = curn + topicName->lenstring.len;
  141. while (*curf && curn < curn_end)
  142. {
  143. if (*curn == '/' && *curf != '/')
  144. break;
  145. if (*curf != '+' && *curf != '#' && *curf != *curn)
  146. break;
  147. if (*curf == '+')
  148. { // skip until we meet the next separator, or end of string
  149. char* nextpos = curn + 1;
  150. while (nextpos < curn_end && *nextpos != '/')
  151. nextpos = ++curn + 1;
  152. }
  153. else if (*curf == '#')
  154. curn = curn_end - 1; // skip until end of string
  155. curf++;
  156. curn++;
  157. };
  158. return (curn == curn_end) && (*curf == '\0');
  159. }
  160. int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
  161. {
  162. int i;
  163. int rc = FAILURE;
  164. // we have to find the right message handler - indexed by topic
  165. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  166. {
  167. if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
  168. isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
  169. {
  170. if (c->messageHandlers[i].fp != NULL)
  171. {
  172. MessageData md;
  173. NewMessageData(&md, topicName, message);
  174. c->messageHandlers[i].fp(&md);
  175. rc = MQTT_SUCCESS;
  176. }
  177. }
  178. }
  179. if (rc == FAILURE && c->defaultMessageHandler != NULL)
  180. {
  181. MessageData md;
  182. NewMessageData(&md, topicName, message);
  183. c->defaultMessageHandler(&md);
  184. rc = MQTT_SUCCESS;
  185. }
  186. return rc;
  187. }
  188. int keepalive(MQTTClient* c)
  189. {
  190. int rc = MQTT_SUCCESS;
  191. if (c->keepAliveInterval == 0)
  192. goto exit;
  193. if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
  194. {
  195. if (c->ping_outstanding)
  196. rc = FAILURE; /* PINGRESP not received in keepalive interval */
  197. else
  198. {
  199. Timer timer;
  200. TimerInit(&timer);
  201. TimerCountdownMS(&timer, 1000);
  202. int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
  203. if (len > 0 && (rc = sendPacket(c, len, &timer)) == MQTT_SUCCESS) // send the ping packet
  204. c->ping_outstanding = 1;
  205. }
  206. }
  207. exit:
  208. return rc;
  209. }
  210. void MQTTCleanSession(MQTTClient* c)
  211. {
  212. int i = 0;
  213. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  214. c->messageHandlers[i].topicFilter = NULL;
  215. }
  216. void MQTTCloseSession(MQTTClient* c)
  217. {
  218. c->ping_outstanding = 0;
  219. c->isconnected = 0;
  220. if (c->cleansession)
  221. MQTTCleanSession(c);
  222. }
  223. int cycle(MQTTClient* c, Timer* timer)
  224. {
  225. int len = 0,
  226. rc = MQTT_SUCCESS;
  227. int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
  228. switch (packet_type)
  229. {
  230. default:
  231. /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
  232. rc = packet_type;
  233. goto exit;
  234. case 0: /* timed out reading packet */
  235. break;
  236. case CONNACK:
  237. case PUBACK:
  238. case SUBACK:
  239. case UNSUBACK:
  240. break;
  241. case PUBLISH:
  242. {
  243. MQTTString topicName;
  244. MQTTMessage msg;
  245. int intQoS;
  246. msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
  247. if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
  248. (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
  249. goto exit;
  250. msg.qos = (enum QoS)intQoS;
  251. deliverMessage(c, &topicName, &msg);
  252. if (msg.qos != QOS0)
  253. {
  254. if (msg.qos == QOS1)
  255. len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
  256. else if (msg.qos == QOS2)
  257. len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
  258. if (len <= 0)
  259. rc = FAILURE;
  260. else
  261. rc = sendPacket(c, len, timer);
  262. if (rc == FAILURE)
  263. goto exit; // there was a problem
  264. }
  265. break;
  266. }
  267. case PUBREC:
  268. case PUBREL:
  269. {
  270. unsigned short mypacketid;
  271. unsigned char dup, type;
  272. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
  273. rc = FAILURE;
  274. else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
  275. (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
  276. rc = FAILURE;
  277. else if ((rc = sendPacket(c, len, timer)) != MQTT_SUCCESS) // send the PUBREL packet
  278. rc = FAILURE; // there was a problem
  279. if (rc == FAILURE)
  280. goto exit; // there was a problem
  281. break;
  282. }
  283. case PUBCOMP:
  284. break;
  285. case PINGRESP:
  286. c->ping_outstanding = 0;
  287. break;
  288. }
  289. if (keepalive(c) != MQTT_SUCCESS) {
  290. //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
  291. rc = FAILURE;
  292. }
  293. exit:
  294. if (rc == MQTT_SUCCESS)
  295. rc = packet_type;
  296. else if (c->isconnected)
  297. MQTTCloseSession(c);
  298. return rc;
  299. }
  300. int MQTTYield(MQTTClient* c, int timeout_ms)
  301. {
  302. int rc = MQTT_SUCCESS;
  303. Timer timer;
  304. TimerInit(&timer);
  305. TimerCountdownMS(&timer, timeout_ms);
  306. do
  307. {
  308. if (cycle(c, &timer) < 0)
  309. {
  310. rc = FAILURE;
  311. break;
  312. }
  313. } while (!TimerIsExpired(&timer));
  314. return rc;
  315. }
  316. int MQTTIsConnected(MQTTClient* client)
  317. {
  318. return client->isconnected;
  319. }
  320. void MQTTRun(void* parm)
  321. {
  322. Timer timer;
  323. MQTTClient* c = (MQTTClient*)parm;
  324. TimerInit(&timer);
  325. while (1)
  326. {
  327. #if defined(MQTT_TASK)
  328. MutexLock(&c->mutex);
  329. #endif
  330. // osMutexAcquire(mqttMutex, osWaitForever);
  331. osMutexAcquire(c->mutex, osWaitForever);
  332. TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
  333. cycle(c, &timer);
  334. #if defined(MQTT_TASK)
  335. MutexUnlock(&c->mutex);
  336. #endif
  337. // osMutexRelease(mqttMutex);
  338. osMutexRelease(c->mutex);
  339. }
  340. }
  341. #if defined(MQTT_TASK)
  342. int MQTTStartTask(MQTTClient* client)
  343. {
  344. return ThreadStart(&client->thread, &MQTTRun, client);
  345. }
  346. #endif
  347. int waitfor(MQTTClient* c, int packet_type, Timer* timer)
  348. {
  349. int rc = FAILURE;
  350. do
  351. {
  352. if (TimerIsExpired(timer))
  353. break; // we timed out
  354. rc = cycle(c, timer);
  355. }
  356. while (rc != packet_type && rc >= 0);
  357. return rc;
  358. }
  359. int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
  360. {
  361. Timer connect_timer;
  362. int rc = FAILURE;
  363. MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
  364. int len = 0;
  365. #if defined(MQTT_TASK)
  366. MutexLock(&c->mutex);
  367. #endif
  368. // osMutexAcquire(mqttMutex, osWaitForever);
  369. osMutexAcquire(c->mutex, osWaitForever);
  370. if (c->isconnected) /* don't send connect packet again if we are already connected */
  371. goto exit;
  372. TimerInit(&connect_timer);
  373. TimerCountdownMS(&connect_timer, c->command_timeout_ms);
  374. if (options == 0)
  375. options = &default_options; /* set default options if none were supplied */
  376. c->keepAliveInterval = options->keepAliveInterval;
  377. c->cleansession = options->cleansession;
  378. TimerCountdown(&c->last_received, c->keepAliveInterval);
  379. if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
  380. goto exit;
  381. if ((rc = sendPacket(c, len, &connect_timer)) != MQTT_SUCCESS) // send the connect packet
  382. goto exit; // there was a problem
  383. // this will be a blocking call, wait for the connack
  384. if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
  385. {
  386. data->rc = 0;
  387. data->sessionPresent = 0;
  388. if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
  389. rc = data->rc;
  390. // rc = MQTT_SUCCESS;
  391. else
  392. rc = FAILURE;
  393. }
  394. else
  395. rc = FAILURE;
  396. exit:
  397. if (rc == MQTT_SUCCESS)
  398. {
  399. c->isconnected = 1;
  400. c->ping_outstanding = 0;
  401. }
  402. #if defined(MQTT_TASK)
  403. MutexUnlock(&c->mutex);
  404. #endif
  405. // osMutexRelease(mqttMutex);
  406. osMutexRelease(c->mutex);
  407. return rc;
  408. }
  409. int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
  410. {
  411. MQTTConnackData data;
  412. return MQTTConnectWithResults(c, options, &data);
  413. }
  414. int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
  415. {
  416. int rc = FAILURE;
  417. int i = -1;
  418. /* first check for an existing matching slot */
  419. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  420. {
  421. if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
  422. {
  423. if (messageHandler == NULL) /* remove existing */
  424. {
  425. c->messageHandlers[i].topicFilter = NULL;
  426. c->messageHandlers[i].fp = NULL;
  427. }
  428. rc = MQTT_SUCCESS; /* return i when adding new subscription */
  429. break;
  430. }
  431. }
  432. /* if no existing, look for empty slot (unless we are removing) */
  433. if (messageHandler != NULL) {
  434. if (rc == FAILURE)
  435. {
  436. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
  437. {
  438. if (c->messageHandlers[i].topicFilter == NULL)
  439. {
  440. rc = MQTT_SUCCESS;
  441. break;
  442. }
  443. }
  444. }
  445. if (i < MAX_MESSAGE_HANDLERS)
  446. {
  447. c->messageHandlers[i].topicFilter = topicFilter;
  448. c->messageHandlers[i].fp = messageHandler;
  449. }
  450. }
  451. return rc;
  452. }
  453. int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
  454. messageHandler messageHandler, MQTTSubackData* data)
  455. {
  456. int rc = FAILURE;
  457. Timer timer;
  458. int len = 0;
  459. MQTTString topic = MQTTString_initializer;
  460. topic.cstring = (char *)topicFilter;
  461. #if defined(MQTT_TASK)
  462. MutexLock(&c->mutex);
  463. #endif
  464. // osMutexAcquire(mqttMutex, osWaitForever);
  465. osMutexAcquire(c->mutex, osWaitForever);
  466. if (!c->isconnected)
  467. goto exit;
  468. TimerInit(&timer);
  469. TimerCountdownMS(&timer, c->command_timeout_ms);
  470. int _qos[1] = {(int)qos};
  471. len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, _qos);
  472. // len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
  473. if (len <= 0)
  474. goto exit;
  475. if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
  476. goto exit; // there was a problem
  477. if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
  478. {
  479. int count = 0;
  480. unsigned short mypacketid;
  481. data->grantedQoS = QOS0;
  482. int _grantedQoS[1] = {(int)&data->grantedQoS};
  483. // if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1)
  484. if (MQTTDeserialize_suback(&mypacketid, 1, &count, _grantedQoS, c->readbuf, c->readbuf_size) == 1)
  485. {
  486. if (data->grantedQoS != 0x80)
  487. rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
  488. }
  489. }
  490. else
  491. rc = FAILURE;
  492. exit:
  493. if (rc == FAILURE)
  494. MQTTCloseSession(c);
  495. #if defined(MQTT_TASK)
  496. MutexUnlock(&c->mutex);
  497. #endif
  498. // osMutexRelease(mqttMutex);
  499. osMutexRelease(c->mutex);
  500. return rc;
  501. }
  502. int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
  503. messageHandler messageHandler)
  504. {
  505. MQTTSubackData data;
  506. return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data);
  507. }
  508. int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
  509. {
  510. int rc = FAILURE;
  511. Timer timer;
  512. MQTTString topic = MQTTString_initializer;
  513. topic.cstring = (char *)topicFilter;
  514. int len = 0;
  515. #if defined(MQTT_TASK)
  516. MutexLock(&c->mutex);
  517. #endif
  518. // osMutexAcquire(mqttMutex, osWaitForever);
  519. osMutexAcquire(c->mutex, osWaitForever);
  520. if (!c->isconnected)
  521. goto exit;
  522. TimerInit(&timer);
  523. TimerCountdownMS(&timer, c->command_timeout_ms);
  524. if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
  525. goto exit;
  526. if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
  527. goto exit; // there was a problem
  528. if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
  529. {
  530. unsigned short mypacketid; // should be the same as the packetid above
  531. if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
  532. {
  533. /* remove the subscription message handler associated with this topic, if there is one */
  534. MQTTSetMessageHandler(c, topicFilter, NULL);
  535. }
  536. }
  537. else
  538. rc = FAILURE;
  539. exit:
  540. if (rc == FAILURE)
  541. MQTTCloseSession(c);
  542. #if defined(MQTT_TASK)
  543. MutexUnlock(&c->mutex);
  544. #endif
  545. // osMutexRelease(mqttMutex);
  546. osMutexRelease(c->mutex);
  547. return rc;
  548. }
  549. int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
  550. {
  551. int rc = FAILURE;
  552. Timer timer;
  553. MQTTString topic = MQTTString_initializer;
  554. topic.cstring = (char *)topicName;
  555. int len = 0;
  556. #if defined(MQTT_TASK)
  557. MutexLock(&c->mutex);
  558. #endif
  559. // osMutexAcquire(mqttMutex, osWaitForever);
  560. osMutexAcquire(c->mutex, osWaitForever);
  561. if (!c->isconnected)
  562. goto exit;
  563. TimerInit(&timer);
  564. TimerCountdownMS(&timer, c->command_timeout_ms);
  565. if (message->qos == QOS1 || message->qos == QOS2)
  566. message->id = getNextPacketId(c);
  567. len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
  568. topic, (unsigned char*)message->payload, message->payloadlen);
  569. if (len <= 0)
  570. goto exit;
  571. if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
  572. goto exit; // there was a problem
  573. if (message->qos == QOS1)
  574. {
  575. if (waitfor(c, PUBACK, &timer) == PUBACK)
  576. {
  577. unsigned short mypacketid;
  578. unsigned char dup, type;
  579. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
  580. rc = FAILURE;
  581. }
  582. else
  583. rc = FAILURE;
  584. }
  585. else if (message->qos == QOS2)
  586. {
  587. if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
  588. {
  589. unsigned short mypacketid;
  590. unsigned char dup, type;
  591. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
  592. rc = FAILURE;
  593. }
  594. else
  595. rc = FAILURE;
  596. }
  597. exit:
  598. if (rc == FAILURE)
  599. MQTTCloseSession(c);
  600. #if defined(MQTT_TASK)
  601. MutexUnlock(&c->mutex);
  602. #endif
  603. // osMutexRelease(mqttMutex);
  604. osMutexRelease(c->mutex);
  605. return rc;
  606. }
  607. int MQTTDisconnect(MQTTClient* c)
  608. {
  609. int rc = FAILURE;
  610. Timer timer; // we might wait for incomplete incoming publishes to complete
  611. int len = 0;
  612. #if defined(MQTT_TASK)
  613. MutexLock(&c->mutex);
  614. #endif
  615. // osMutexAcquire(mqttMutex, osWaitForever);
  616. osMutexAcquire(c->mutex, osWaitForever);
  617. TimerInit(&timer);
  618. TimerCountdownMS(&timer, c->command_timeout_ms);
  619. len = MQTTSerialize_disconnect(c->buf, c->buf_size);
  620. if (len > 0)
  621. rc = sendPacket(c, len, &timer); // send the disconnect packet
  622. MQTTCloseSession(c);
  623. #if defined(MQTT_TASK)
  624. MutexUnlock(&c->mutex);
  625. #endif
  626. // osMutexRelease(mqttMutex);
  627. osMutexRelease(c->mutex);
  628. return rc;
  629. }