2014-05-07 22:27:00 +00:00
|
|
|
/*
|
|
|
|
Copyright (c) 2009-2014 Roger Light <roger@atchoo.org>
|
|
|
|
|
|
|
|
All rights reserved. This program and the accompanying materials
|
|
|
|
are made available under the terms of the Eclipse Public License v1.0
|
|
|
|
and Eclipse Distribution License v1.0 which accompany this distribution.
|
|
|
|
|
|
|
|
The Eclipse Public License is available at
|
|
|
|
http://www.eclipse.org/legal/epl-v10.html
|
|
|
|
and the Eclipse Distribution License is available at
|
|
|
|
http://www.eclipse.org/org/documents/edl-v10.php.
|
|
|
|
|
|
|
|
Contributors:
|
|
|
|
Roger Light - initial implementation and documentation.
|
|
|
|
*/
|
|
|
|
|
|
|
|
#define _GNU_SOURCE
|
|
|
|
|
|
|
|
#include <config.h>
|
|
|
|
|
|
|
|
#include <assert.h>
|
|
|
|
#ifndef WIN32
|
|
|
|
#include <poll.h>
|
|
|
|
#else
|
|
|
|
#include <process.h>
|
|
|
|
#include <winsock2.h>
|
|
|
|
#include <ws2tcpip.h>
|
|
|
|
#endif
|
|
|
|
|
|
|
|
#include <errno.h>
|
|
|
|
#include <signal.h>
|
|
|
|
#include <stdio.h>
|
|
|
|
#include <string.h>
|
2014-10-24 21:49:48 +00:00
|
|
|
#ifndef WIN32
|
|
|
|
# include <sys/socket.h>
|
|
|
|
#endif
|
2014-09-14 17:08:09 +00:00
|
|
|
#include <time.h>
|
2014-05-07 22:27:00 +00:00
|
|
|
|
2014-05-06 09:47:00 +00:00
|
|
|
#ifdef WITH_WEBSOCKETS
|
|
|
|
# include <libwebsockets.h>
|
|
|
|
#endif
|
|
|
|
|
2014-05-07 22:27:00 +00:00
|
|
|
#include <mosquitto_broker.h>
|
|
|
|
#include <memory_mosq.h>
|
2014-10-24 20:28:24 +00:00
|
|
|
#include <send_mosq.h>
|
2014-05-07 22:27:00 +00:00
|
|
|
#include <time_mosq.h>
|
|
|
|
#include <util_mosq.h>
|
|
|
|
|
|
|
|
extern bool flag_reload;
|
|
|
|
#ifdef WITH_PERSISTENCE
|
|
|
|
extern bool flag_db_backup;
|
|
|
|
#endif
|
|
|
|
extern bool flag_tree_print;
|
|
|
|
extern int run;
|
|
|
|
#ifdef WITH_SYS_TREE
|
|
|
|
extern int g_clients_expired;
|
|
|
|
#endif
|
|
|
|
|
|
|
|
static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds);
|
|
|
|
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds);
|
|
|
|
|
|
|
|
int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock_count, int listener_max)
|
|
|
|
{
|
2014-09-15 21:13:13 +00:00
|
|
|
#ifdef WITH_SYS_TREE
|
2014-05-07 22:27:00 +00:00
|
|
|
time_t start_time = mosquitto_time();
|
2014-09-15 21:13:13 +00:00
|
|
|
#endif
|
|
|
|
#ifdef WITH_PERSISTENCE
|
2014-05-07 22:27:00 +00:00
|
|
|
time_t last_backup = mosquitto_time();
|
2014-09-15 21:13:13 +00:00
|
|
|
#endif
|
2014-06-30 22:37:37 +00:00
|
|
|
time_t now = 0;
|
2014-06-04 21:14:16 +00:00
|
|
|
time_t now_time;
|
2014-05-07 22:27:00 +00:00
|
|
|
int time_count;
|
|
|
|
int fdcount;
|
2014-06-23 16:57:35 +00:00
|
|
|
struct mosquitto *context, *ctxt_tmp;
|
2014-05-07 22:27:00 +00:00
|
|
|
#ifndef WIN32
|
|
|
|
sigset_t sigblock, origsig;
|
|
|
|
#endif
|
|
|
|
int i;
|
|
|
|
struct pollfd *pollfds = NULL;
|
|
|
|
int pollfd_count = 0;
|
|
|
|
int pollfd_index;
|
|
|
|
#ifdef WITH_BRIDGE
|
|
|
|
int bridge_sock;
|
|
|
|
int rc;
|
|
|
|
#endif
|
2014-06-23 16:57:35 +00:00
|
|
|
int context_count;
|
|
|
|
time_t expiration_check_time = 0;
|
2015-01-26 21:02:53 +00:00
|
|
|
char *id;
|
2014-05-07 22:27:00 +00:00
|
|
|
|
|
|
|
#ifndef WIN32
|
|
|
|
sigemptyset(&sigblock);
|
|
|
|
sigaddset(&sigblock, SIGINT);
|
|
|
|
#endif
|
|
|
|
|
2014-06-23 16:57:35 +00:00
|
|
|
if(db->config->persistent_client_expiration > 0){
|
2014-08-19 00:11:22 +00:00
|
|
|
expiration_check_time = time(NULL) + 3600;
|
2014-06-23 16:57:35 +00:00
|
|
|
}
|
|
|
|
|
2014-05-07 22:27:00 +00:00
|
|
|
while(run){
|
2014-09-22 22:35:09 +00:00
|
|
|
mosquitto__free_disused_contexts(db);
|
2014-05-07 22:27:00 +00:00
|
|
|
#ifdef WITH_SYS_TREE
|
|
|
|
if(db->config->sys_interval > 0){
|
|
|
|
mqtt3_db_sys_update(db, db->config->sys_interval, start_time);
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
2014-09-15 21:13:13 +00:00
|
|
|
context_count = HASH_CNT(hh_sock, db->contexts_by_sock);
|
|
|
|
#ifdef WITH_BRIDGE
|
2014-11-18 23:34:54 +00:00
|
|
|
context_count += db->bridge_count;
|
2014-09-15 21:13:13 +00:00
|
|
|
#endif
|
|
|
|
|
2014-06-23 16:57:35 +00:00
|
|
|
if(listensock_count + context_count > pollfd_count || !pollfds){
|
|
|
|
pollfd_count = listensock_count + context_count;
|
2014-05-07 22:27:00 +00:00
|
|
|
pollfds = _mosquitto_realloc(pollfds, sizeof(struct pollfd)*pollfd_count);
|
|
|
|
if(!pollfds){
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
memset(pollfds, -1, sizeof(struct pollfd)*pollfd_count);
|
|
|
|
|
|
|
|
pollfd_index = 0;
|
|
|
|
for(i=0; i<listensock_count; i++){
|
|
|
|
pollfds[pollfd_index].fd = listensock[i];
|
|
|
|
pollfds[pollfd_index].events = POLLIN;
|
|
|
|
pollfds[pollfd_index].revents = 0;
|
|
|
|
pollfd_index++;
|
|
|
|
}
|
|
|
|
|
2014-06-04 21:14:16 +00:00
|
|
|
now_time = time(NULL);
|
|
|
|
|
2014-05-07 22:27:00 +00:00
|
|
|
time_count = 0;
|
2014-06-23 16:57:35 +00:00
|
|
|
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
|
|
|
if(time_count > 0){
|
|
|
|
time_count--;
|
|
|
|
}else{
|
|
|
|
time_count = 1000;
|
|
|
|
now = mosquitto_time();
|
|
|
|
}
|
|
|
|
context->pollfd_index = -1;
|
|
|
|
|
|
|
|
if(context->sock != INVALID_SOCKET){
|
|
|
|
#ifdef WITH_BRIDGE
|
|
|
|
if(context->bridge){
|
|
|
|
_mosquitto_check_keepalive(db, context);
|
|
|
|
if(context->bridge->round_robin == false
|
|
|
|
&& context->bridge->cur_address != 0
|
|
|
|
&& now > context->bridge->primary_retry){
|
|
|
|
|
2014-10-24 21:39:09 +00:00
|
|
|
if(_mosquitto_try_connect(context, context->bridge->addresses[0].address, context->bridge->addresses[0].port, &bridge_sock, NULL, false) == MOSQ_ERR_SUCCESS){
|
2014-06-23 16:57:35 +00:00
|
|
|
COMPAT_CLOSE(bridge_sock);
|
|
|
|
_mosquitto_socket_close(db, context);
|
|
|
|
context->bridge->cur_address = context->bridge->address_count-1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
/* Local bridges never time out in this fashion. */
|
|
|
|
if(!(context->keepalive)
|
|
|
|
|| context->bridge
|
|
|
|
|| now - context->last_msg_in < (time_t)(context->keepalive)*3/2){
|
|
|
|
|
2014-11-17 23:46:02 +00:00
|
|
|
if(mqtt3_db_message_write(db, context) == MOSQ_ERR_SUCCESS){
|
2014-06-23 16:57:35 +00:00
|
|
|
pollfds[pollfd_index].fd = context->sock;
|
|
|
|
pollfds[pollfd_index].events = POLLIN;
|
|
|
|
pollfds[pollfd_index].revents = 0;
|
2014-10-24 20:28:24 +00:00
|
|
|
if(context->current_out_packet || context->state == mosq_cs_connect_pending){
|
2014-06-23 16:57:35 +00:00
|
|
|
pollfds[pollfd_index].events |= POLLOUT;
|
|
|
|
}
|
|
|
|
context->pollfd_index = pollfd_index;
|
|
|
|
pollfd_index++;
|
|
|
|
}else{
|
2014-07-08 22:07:19 +00:00
|
|
|
do_disconnect(db, context);
|
2014-06-23 16:57:35 +00:00
|
|
|
}
|
|
|
|
}else{
|
|
|
|
if(db->config->connection_messages == true){
|
2015-01-26 21:02:53 +00:00
|
|
|
if(context->id){
|
|
|
|
id = context->id;
|
|
|
|
}else{
|
|
|
|
id = "<unknown>";
|
|
|
|
}
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", id);
|
2014-06-23 16:57:35 +00:00
|
|
|
}
|
|
|
|
/* Client has exceeded keepalive*1.5 */
|
2014-07-08 22:07:19 +00:00
|
|
|
do_disconnect(db, context);
|
2014-06-23 16:57:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifdef WITH_BRIDGE
|
|
|
|
time_count = 0;
|
2014-11-18 23:34:54 +00:00
|
|
|
for(i=0; i<db->bridge_count; i++){
|
|
|
|
if(!db->bridges[i]) continue;
|
|
|
|
|
|
|
|
context = db->bridges[i];
|
|
|
|
|
2014-06-23 16:57:35 +00:00
|
|
|
if(context->sock == INVALID_SOCKET){
|
2014-05-07 22:27:00 +00:00
|
|
|
if(time_count > 0){
|
|
|
|
time_count--;
|
|
|
|
}else{
|
|
|
|
time_count = 1000;
|
|
|
|
now = mosquitto_time();
|
|
|
|
}
|
2014-06-23 16:57:35 +00:00
|
|
|
/* Want to try to restart the bridge connection */
|
|
|
|
if(!context->bridge->restart_t){
|
|
|
|
context->bridge->restart_t = now+context->bridge->restart_timeout;
|
|
|
|
context->bridge->cur_address++;
|
|
|
|
if(context->bridge->cur_address == context->bridge->address_count){
|
|
|
|
context->bridge->cur_address = 0;
|
|
|
|
}
|
|
|
|
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
|
|
|
|
context->bridge->primary_retry = now + 5;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
if(context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect){
|
|
|
|
rc = mqtt3_bridge_connect(db, context);
|
|
|
|
if(rc){
|
|
|
|
context->bridge->cur_address++;
|
|
|
|
if(context->bridge->cur_address == context->bridge->address_count){
|
|
|
|
context->bridge->cur_address = 0;
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2014-06-23 16:57:35 +00:00
|
|
|
if(context->bridge->start_type == bst_automatic && now > context->bridge->restart_t){
|
|
|
|
context->bridge->restart_t = 0;
|
|
|
|
rc = mqtt3_bridge_connect(db, context);
|
|
|
|
if(rc == MOSQ_ERR_SUCCESS){
|
|
|
|
pollfds[pollfd_index].fd = context->sock;
|
2014-05-07 22:27:00 +00:00
|
|
|
pollfds[pollfd_index].events = POLLIN;
|
|
|
|
pollfds[pollfd_index].revents = 0;
|
2014-06-23 16:57:35 +00:00
|
|
|
if(context->current_out_packet){
|
2014-05-07 22:27:00 +00:00
|
|
|
pollfds[pollfd_index].events |= POLLOUT;
|
|
|
|
}
|
2014-06-23 16:57:35 +00:00
|
|
|
context->pollfd_index = pollfd_index;
|
2014-05-07 22:27:00 +00:00
|
|
|
pollfd_index++;
|
|
|
|
}else{
|
2014-06-23 16:57:35 +00:00
|
|
|
/* Retry later. */
|
|
|
|
context->bridge->restart_t = now+context->bridge->restart_timeout;
|
|
|
|
|
|
|
|
context->bridge->cur_address++;
|
|
|
|
if(context->bridge->cur_address == context->bridge->address_count){
|
|
|
|
context->bridge->cur_address = 0;
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
|
|
|
}
|
2014-06-23 16:57:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2014-05-07 22:27:00 +00:00
|
|
|
#endif
|
2014-06-23 16:57:35 +00:00
|
|
|
now_time = time(NULL);
|
|
|
|
if(db->config->persistent_client_expiration > 0 && now_time > expiration_check_time){
|
|
|
|
HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
|
|
|
|
if(context->sock == -1 && context->clean_session == 0){
|
|
|
|
/* This is a persistent client, check to see if the
|
|
|
|
* last time it connected was longer than
|
|
|
|
* persistent_client_expiration seconds ago. If so,
|
|
|
|
* expire it and clean up.
|
|
|
|
*/
|
|
|
|
if(now_time > context->disconnect_t+db->config->persistent_client_expiration){
|
2015-01-26 21:02:53 +00:00
|
|
|
if(context->id){
|
|
|
|
id = context->id;
|
|
|
|
}else{
|
|
|
|
id = "<unknown>";
|
|
|
|
}
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id);
|
2014-05-07 22:27:00 +00:00
|
|
|
#ifdef WITH_SYS_TREE
|
2014-06-23 16:57:35 +00:00
|
|
|
g_clients_expired++;
|
2014-05-07 22:27:00 +00:00
|
|
|
#endif
|
2014-06-23 16:57:35 +00:00
|
|
|
context->clean_session = true;
|
2015-01-06 17:35:12 +00:00
|
|
|
context->state = mosq_cs_expiring;
|
2014-08-18 23:36:09 +00:00
|
|
|
do_disconnect(db, context);
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2014-08-19 00:11:22 +00:00
|
|
|
expiration_check_time = time(NULL) + 3600;
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
mqtt3_db_message_timeout_check(db, db->config->retry_interval);
|
|
|
|
|
|
|
|
#ifndef WIN32
|
|
|
|
sigprocmask(SIG_SETMASK, &sigblock, &origsig);
|
|
|
|
fdcount = poll(pollfds, pollfd_index, 100);
|
|
|
|
sigprocmask(SIG_SETMASK, &origsig, NULL);
|
|
|
|
#else
|
|
|
|
fdcount = WSAPoll(pollfds, pollfd_index, 100);
|
|
|
|
#endif
|
|
|
|
if(fdcount == -1){
|
|
|
|
loop_handle_errors(db, pollfds);
|
|
|
|
}else{
|
|
|
|
loop_handle_reads_writes(db, pollfds);
|
|
|
|
|
|
|
|
for(i=0; i<listensock_count; i++){
|
|
|
|
if(pollfds[i].revents & (POLLIN | POLLPRI)){
|
|
|
|
while(mqtt3_socket_accept(db, listensock[i]) != -1){
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#ifdef WITH_PERSISTENCE
|
|
|
|
if(db->config->persistence && db->config->autosave_interval){
|
|
|
|
if(db->config->autosave_on_changes){
|
|
|
|
if(db->persistence_changes > db->config->autosave_interval){
|
2014-11-17 23:46:02 +00:00
|
|
|
mqtt3_db_backup(db, false);
|
2014-05-07 22:27:00 +00:00
|
|
|
db->persistence_changes = 0;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
if(last_backup + db->config->autosave_interval < mosquitto_time()){
|
2014-11-17 23:46:02 +00:00
|
|
|
mqtt3_db_backup(db, false);
|
2014-05-07 22:27:00 +00:00
|
|
|
last_backup = mosquitto_time();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
2014-11-17 23:46:02 +00:00
|
|
|
|
2014-05-07 22:27:00 +00:00
|
|
|
#ifdef WITH_PERSISTENCE
|
|
|
|
if(flag_db_backup){
|
2014-11-17 23:46:02 +00:00
|
|
|
mqtt3_db_backup(db, false);
|
2014-05-07 22:27:00 +00:00
|
|
|
flag_db_backup = false;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
if(flag_reload){
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Reloading config.");
|
|
|
|
mqtt3_config_read(db->config, true);
|
|
|
|
mosquitto_security_cleanup(db, true);
|
|
|
|
mosquitto_security_init(db, true);
|
|
|
|
mosquitto_security_apply(db);
|
2015-02-05 22:44:38 +00:00
|
|
|
mqtt3_log_close(db->config);
|
|
|
|
mqtt3_log_init(db->config);
|
2014-05-07 22:27:00 +00:00
|
|
|
flag_reload = false;
|
|
|
|
}
|
|
|
|
if(flag_tree_print){
|
|
|
|
mqtt3_sub_tree_print(&db->subs, 0);
|
|
|
|
flag_tree_print = false;
|
|
|
|
}
|
2014-05-06 09:47:00 +00:00
|
|
|
#ifdef WITH_WEBSOCKETS
|
2014-05-26 17:01:24 +00:00
|
|
|
for(i=0; i<db->config->listener_count; i++){
|
2014-05-26 16:09:44 +00:00
|
|
|
/* Extremely hacky, should be using the lws provided external poll
|
|
|
|
* interface, but their interface has changed recently and ours
|
|
|
|
* will soon, so for now websockets clients are second class
|
|
|
|
* citizens. */
|
|
|
|
if(db->config->listeners[i].ws_context){
|
|
|
|
libwebsocket_service(db->config->listeners[i].ws_context, 0);
|
|
|
|
}
|
|
|
|
}
|
2014-05-06 09:47:00 +00:00
|
|
|
#endif
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
2014-05-26 16:09:44 +00:00
|
|
|
|
2014-05-07 22:27:00 +00:00
|
|
|
if(pollfds) _mosquitto_free(pollfds);
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
2014-07-03 00:00:57 +00:00
|
|
|
void do_disconnect(struct mosquitto_db *db, struct mosquitto *context)
|
2014-05-07 22:27:00 +00:00
|
|
|
{
|
2015-01-26 21:02:53 +00:00
|
|
|
char *id;
|
|
|
|
|
2014-07-08 22:16:34 +00:00
|
|
|
if(context->state == mosq_cs_disconnected){
|
|
|
|
return;
|
|
|
|
}
|
2014-07-03 00:00:57 +00:00
|
|
|
#ifdef WITH_WEBSOCKETS
|
|
|
|
if(context->wsi){
|
2014-06-23 16:57:35 +00:00
|
|
|
if(context->state != mosq_cs_disconnecting){
|
2014-07-03 00:00:57 +00:00
|
|
|
context->state = mosq_cs_disconnect_ws;
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
2014-07-03 00:00:57 +00:00
|
|
|
if(context->wsi){
|
|
|
|
libwebsocket_callback_on_writable(context->ws_context, context->wsi);
|
|
|
|
}
|
|
|
|
context->sock = INVALID_SOCKET;
|
2015-01-07 23:33:31 +00:00
|
|
|
}else
|
2014-07-03 00:00:57 +00:00
|
|
|
#endif
|
2015-01-07 23:33:31 +00:00
|
|
|
{
|
2014-07-03 00:00:57 +00:00
|
|
|
if(db->config->connection_messages == true){
|
2015-01-26 21:02:53 +00:00
|
|
|
if(context->id){
|
|
|
|
id = context->id;
|
|
|
|
}else{
|
|
|
|
id = "<unknown>";
|
|
|
|
}
|
2014-07-03 00:00:57 +00:00
|
|
|
if(context->state != mosq_cs_disconnecting){
|
2015-01-26 21:02:53 +00:00
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", id);
|
2014-07-03 00:00:57 +00:00
|
|
|
}else{
|
2015-01-26 21:02:53 +00:00
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", id);
|
2014-07-03 00:00:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
mqtt3_context_disconnect(db, context);
|
2014-08-05 08:06:50 +00:00
|
|
|
#ifdef WITH_BRIDGE
|
|
|
|
if(context->clean_session && !context->bridge){
|
|
|
|
#else
|
2014-07-03 00:00:57 +00:00
|
|
|
if(context->clean_session){
|
2014-08-05 08:06:50 +00:00
|
|
|
#endif
|
2014-09-22 22:35:09 +00:00
|
|
|
mosquitto__add_context_to_disused(db, context);
|
2014-07-03 00:00:57 +00:00
|
|
|
if(context->id){
|
|
|
|
HASH_DELETE(hh_id, db->contexts_by_id, context);
|
|
|
|
_mosquitto_free(context->id);
|
|
|
|
context->id = NULL;
|
|
|
|
}
|
|
|
|
}
|
2015-01-07 23:33:31 +00:00
|
|
|
context->state = mosq_cs_disconnected;
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Error ocurred, probably an fd has been closed.
|
|
|
|
* Loop through and check them all.
|
|
|
|
*/
|
|
|
|
static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds)
|
|
|
|
{
|
2014-06-23 16:57:35 +00:00
|
|
|
struct mosquitto *context, *ctxt_tmp;
|
2014-05-07 22:27:00 +00:00
|
|
|
|
2014-06-23 16:57:35 +00:00
|
|
|
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
2014-10-16 23:08:10 +00:00
|
|
|
if(context->pollfd_index < 0){
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2014-06-23 16:57:35 +00:00
|
|
|
if(pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL)){
|
|
|
|
do_disconnect(db, context);
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds)
|
|
|
|
{
|
2014-06-23 16:57:35 +00:00
|
|
|
struct mosquitto *context, *ctxt_tmp;
|
2014-10-24 21:39:09 +00:00
|
|
|
#ifdef WIN32
|
|
|
|
char err;
|
|
|
|
#else
|
|
|
|
int err;
|
|
|
|
#endif
|
|
|
|
socklen_t len;
|
2014-05-07 22:27:00 +00:00
|
|
|
|
2014-06-23 16:57:35 +00:00
|
|
|
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
2014-10-16 23:08:10 +00:00
|
|
|
if(context->pollfd_index < 0){
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2014-06-23 16:57:35 +00:00
|
|
|
assert(pollfds[context->pollfd_index].fd == context->sock);
|
2014-05-07 22:27:00 +00:00
|
|
|
#ifdef WITH_TLS
|
2014-06-23 16:57:35 +00:00
|
|
|
if(pollfds[context->pollfd_index].revents & POLLOUT ||
|
|
|
|
context->want_write ||
|
|
|
|
(context->ssl && context->state == mosq_cs_new)){
|
2014-05-07 22:27:00 +00:00
|
|
|
#else
|
2014-06-23 16:57:35 +00:00
|
|
|
if(pollfds[context->pollfd_index].revents & POLLOUT){
|
2014-05-07 22:27:00 +00:00
|
|
|
#endif
|
2014-10-24 20:28:24 +00:00
|
|
|
if(context->state == mosq_cs_connect_pending){
|
|
|
|
len = sizeof(int);
|
|
|
|
if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, &err, &len)){
|
|
|
|
if(err == 0){
|
|
|
|
context->state = mosq_cs_new;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
do_disconnect(db, context);
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
2014-06-23 16:57:35 +00:00
|
|
|
if(_mosquitto_packet_write(context)){
|
|
|
|
do_disconnect(db, context);
|
|
|
|
continue;
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
|
|
|
}
|
2014-06-23 16:57:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
2014-10-16 23:08:10 +00:00
|
|
|
if(context->pollfd_index < 0){
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2014-05-07 22:27:00 +00:00
|
|
|
#ifdef WITH_TLS
|
2014-06-23 16:57:35 +00:00
|
|
|
if(pollfds[context->pollfd_index].revents & POLLIN ||
|
|
|
|
(context->ssl && context->state == mosq_cs_new)){
|
2014-05-07 22:27:00 +00:00
|
|
|
#else
|
2014-06-23 16:57:35 +00:00
|
|
|
if(pollfds[context->pollfd_index].revents & POLLIN){
|
2014-05-07 22:27:00 +00:00
|
|
|
#endif
|
2014-06-23 16:57:35 +00:00
|
|
|
if(_mosquitto_packet_read(db, context)){
|
|
|
|
do_disconnect(db, context);
|
|
|
|
continue;
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|