Fix reconnect support for websockets.

This commit is contained in:
Roger A. Light 2014-06-30 23:30:43 +01:00
parent 39674a443e
commit d75903b0ae
7 changed files with 193 additions and 38 deletions

View File

@ -231,7 +231,9 @@ int _mosquitto_socket_close(struct mosquitto *mosq)
if(mosq->state != mosq_cs_disconnecting){
mosq->state = mosq_cs_disconnect_ws;
}
if(mosq->wsi){
libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi);
}
mosq->sock = INVALID_SOCKET;
#endif
}

View File

@ -326,14 +326,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
#endif
}
#ifdef WITH_WEBSOCKETS
for(i=0; i<db->config->listener_count; i++){
if(db->config->listeners[i].ws_context){
libwebsocket_context_destroy(db->config->listeners[i].ws_context);
}
}
#endif
if(pollfds) _mosquitto_free(pollfds);
return MOSQ_ERR_SUCCESS;
}

View File

@ -42,6 +42,9 @@ Contributors:
#ifdef WITH_WRAP
#include <tcpd.h>
#endif
#ifdef WITH_WEBSOCKETS
# include <libwebsockets.h>
#endif
#include <mosquitto_broker.h>
#include <memory_mosq.h>
@ -162,6 +165,9 @@ int main(int argc, char *argv[])
struct timeval tv;
#endif
struct mosquitto *ctxt, *ctxt_tmp;
#ifdef WITH_WEBSOCKETS
struct libws_mqtt_hack *hack_head, *hack;
#endif
#if defined(WIN32) || defined(__CYGWIN__)
if(argc == 2){
@ -331,8 +337,14 @@ int main(int argc, char *argv[])
#endif
HASH_ITER(hh_id, int_db.contexts_by_id, ctxt, ctxt_tmp){
#ifdef WITH_WEBSOCKETS
if(!ctxt->wsi){
mqtt3_context_cleanup(&int_db, ctxt, true);
}
#else
mqtt3_context_cleanup(&int_db, ctxt, true);
#endif
}
HASH_ITER(hh_sock, int_db.contexts_by_sock, ctxt, ctxt_tmp){
mqtt3_context_cleanup(&int_db, ctxt, true);
}
@ -343,6 +355,22 @@ int main(int argc, char *argv[])
HASH_DELETE(hh_for_free, int_db.contexts_for_free, ctxt);
mqtt3_context_cleanup(&int_db, ctxt, true);
}
#ifdef WITH_WEBSOCKETS
for(i=0; i<int_db.config->listener_count; i++){
if(int_db.config->listeners[i].ws_context){
hack_head = libwebsocket_context_user(int_db.config->listeners[i].ws_context);
libwebsocket_context_destroy(int_db.config->listeners[i].ws_context);
if(hack_head){
while(hack_head){
hack = hack_head->next;
_mosquitto_free(hack_head);
hack_head = hack;
}
}
}
}
#endif
mqtt3_db_close(&int_db);
if(listensock){

View File

@ -294,6 +294,18 @@ struct _mqtt3_bridge{
#endif
};
#ifdef WITH_WEBSOCKETS
struct libws_mqtt_hack {
struct mosquitto *old_mosq;
struct mosquitto *new_mosq;
struct libws_mqtt_hack *next;
};
struct libws_mqtt_data {
struct mosquitto *mosq;
};
#endif
#include <net_mosq.h>
/* ============================================================

View File

@ -98,6 +98,9 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
X509_NAME *name;
X509_NAME_ENTRY *name_entry;
#endif
#ifdef WITH_WEBSOCKETS
struct libws_mqtt_hack *ws_ctxt_user, *ws_ctxt_user_head;
#endif
#ifdef WITH_SYS_TREE
g_connection_count++;
@ -387,10 +390,6 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
/* Find if this client already has an entry. This must be done *after* any security checks. */
HASH_FIND(hh_id, db->contexts_by_id, client_id, strlen(client_id), found_context);
if(found_context){
HASH_DELETE(hh_id, db->contexts_by_id, found_context);
_mosquitto_free(found_context->id);
found_context->id = NULL;
/* Found a matching client */
if(found_context->sock == INVALID_SOCKET){
/* Client is reconnecting after a disconnect */
@ -398,16 +397,12 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
#ifdef WITH_SYS_TREE
db->disconnected_count--;
#endif
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, found_context, sizeof(void *), found_context);
}else{
/* Client is already connected, disconnect old version */
/* Client is already connected, disconnect old version. This is
* done in mqtt3_context_cleanup() below. */
if(db->config->connection_messages == true){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Client %s already connected, closing old connection.", client_id);
}
if(found_context->sock >= 0){
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, found_context, sizeof(void *), found_context);
}
mqtt3_context_disconnect(db, found_context);
}
if(context->protocol == mosq_p_mqtt311){
@ -416,16 +411,105 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
}
}
if(found_context->msgs){
if(context->last_msg){
context->last_msg->next = found_context->msgs;
context->last_msg = context->last_msg->next;
found_context->clean_session = clean_session;
mqtt3_context_cleanup(db, found_context, false);
found_context->state = mosq_cs_connected;
if(context->address){
found_context->address = context->address;
context->address = NULL;
}else{
context->msgs = found_context->msgs;
context->last_msg = found_context->msgs;
found_context->address = NULL;
}
found_context->disconnect_t = 0;
found_context->sock = context->sock;
found_context->listener = context->listener;
context->listener = NULL;
found_context->last_msg_in = mosquitto_time();
found_context->last_msg_out = mosquitto_time();
found_context->keepalive = context->keepalive;
found_context->pollfd_index = context->pollfd_index;
#ifdef WITH_TLS
found_context->ssl = context->ssl;
#endif
if(context->username){
found_context->username = context->username;
context->username = NULL;
}
if(context->password){
found_context->password = context->password;
context->password = NULL;
}
#ifdef WITH_TLS
context->ssl = NULL;
#endif
context->state = mosq_cs_disconnecting;
#ifdef WITH_WEBSOCKETS
if(found_context->wsi){
/* This is a hack to allow us to update the wsi->user_space
* structure. If libwebsockets let us access that variable itself,
* this wouldn't be necessary. */
ws_ctxt_user_head = (struct libws_mqtt_hack *)libwebsocket_context_user(found_context->ws_context);
ws_ctxt_user = _mosquitto_calloc(1, sizeof(struct libws_mqtt_hack));
if(!ws_ctxt_user){
goto handle_connect_error;
}
ws_ctxt_user->old_mosq = found_context;
ws_ctxt_user->new_mosq = NULL;
ws_ctxt_user->next = ws_ctxt_user_head->next;
ws_ctxt_user_head->next = ws_ctxt_user;
found_context->sock = INVALID_SOCKET;
found_context->wsi = NULL;
}
if(context->wsi){
found_context->wsi = context->wsi;
found_context->sock = WEBSOCKET_CLIENT;
context->wsi = NULL;
context->sock = INVALID_SOCKET;
/* This is a hack to allow us to update the wsi->user_space
* structure. If libwebsockets let us access that variable itself,
* this wouldn't be necessary. */
ws_ctxt_user_head = (struct libws_mqtt_hack *)libwebsocket_context_user(found_context->ws_context);
ws_ctxt_user = _mosquitto_calloc(1, sizeof(struct libws_mqtt_hack));
if(!ws_ctxt_user){
goto handle_connect_error;
}
ws_ctxt_user->old_mosq = context;
ws_ctxt_user->new_mosq = found_context;
if(ws_ctxt_user_head){
while(ws_ctxt_user_head->next){
ws_ctxt_user_head = ws_ctxt_user_head->next;
}
ws_ctxt_user_head->next = ws_ctxt_user;
}else{
ws_ctxt_user->next = ws_ctxt_user_head->next;
ws_ctxt_user_head->next = ws_ctxt_user;
}
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(void *), context);
}else{
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(void *), context);
HASH_DELETE(hh_sock, db->contexts_by_sock, context);
context->sock = INVALID_SOCKET;
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(found_context->sock), found_context);
}
#else
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(void *), context);
HASH_DELETE(hh_sock, db->contexts_by_sock, context);
context->sock = INVALID_SOCKET;
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(found_context->sock), found_context);
#endif
context = found_context;
if(context->msgs){
mqtt3_db_message_reconnect_reset(context);
found_context->msgs = NULL;
}
}

View File

@ -64,10 +64,6 @@ enum mosq_ws_protocols {
DEMO_PROTOCOL_COUNT
};
struct libws_mqtt_data {
struct mosquitto *mosq;
};
struct libws_http_data {
char blank;
};
@ -110,6 +106,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
struct _mosquitto_packet *packet;
int count;
struct libws_mqtt_data *u = (struct libws_mqtt_data *)user;
struct libws_mqtt_hack *hack_head, *hack, *hack_prev = NULL;
size_t pos;
uint8_t *buf;
int rc;
@ -117,6 +114,26 @@ static int callback_mqtt(struct libwebsocket_context *context,
db = &int_db;
/* Update wsi->user, in case of reconnecting client */
hack_head = (struct libws_mqtt_hack *)libwebsocket_context_user(context);
if(hack_head && u && u->mosq){
hack = hack_head->next;
while(hack){
if(hack->old_mosq == u->mosq){
u->mosq = hack->new_mosq;
if(hack_prev){
hack_prev->next = hack->next;
}else{
hack_head->next = hack->next;
}
_mosquitto_free(hack);
break;
}
hack_prev = hack;
hack = hack->next;
}
}
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
mosq = mqtt3_context_init(db, WEBSOCKET_CLIENT);
@ -129,16 +146,19 @@ static int callback_mqtt(struct libwebsocket_context *context,
case LWS_CALLBACK_CLOSED:
mosq = u->mosq;
mqtt3_context_disconnect(db, mosq);
if(mosq){
mosq->wsi = NULL;
if(mosq->clean_session){
mqtt3_context_cleanup(db, mosq, true);
}else{
mqtt3_context_disconnect(db, mosq);
}
}
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
mosq = u->mosq;
if(mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting){
if(!mosq || mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting){
return -1;
}
@ -227,7 +247,9 @@ static int callback_mqtt(struct libwebsocket_context *context,
if(mosq->in_packet.remaining_length > 0){
mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
if(!mosq->in_packet.payload) return -1;
if(!mosq->in_packet.payload){
return -1;
}
mosq->in_packet.to_process = mosq->in_packet.remaining_length;
}
mosq->in_packet.have_remaining = 1;
@ -312,6 +334,10 @@ struct libwebsocket_context *mosq_websockets_init(struct _mqtt3_listener *listen
info.options |= LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT;
}
#endif
info.user = _mosquitto_calloc(1, sizeof(struct libws_mqtt_hack));
if(!info.user){
return NULL;
}
lws_set_log_level(0, NULL);

View File

@ -36,10 +36,21 @@ broker = mosq_test.start_broker(filename=os.path.basename(__file__), cmd=cmd)
local_cmd = ['../../src/mosquitto', '-c', '06-bridge-reconnect-local-out.conf']
local_broker = mosq_test.start_broker(cmd=local_cmd, filename=os.path.basename(__file__)+'_local1')
time.sleep(0.5)
if os.environ.get('MOSQ_USE_VALGRIND') is not None:
time.sleep(5)
else:
time.sleep(0.5)
local_broker.terminate()
local_broker.wait()
if os.environ.get('MOSQ_USE_VALGRIND') is not None:
time.sleep(5)
else:
time.sleep(0.5)
local_broker = mosq_test.start_broker(cmd=local_cmd, filename=os.path.basename(__file__)+'_local2')
if os.environ.get('MOSQ_USE_VALGRIND') is not None:
time.sleep(5)
else:
time.sleep(0.5)
pub = None
try: