mosquitto/lib/thread_mosq.c

152 lines
3.3 KiB
C
Raw Permalink Normal View History

2014-05-07 22:27:00 +00:00
/*
Copyright (c) 2011-2020 Roger Light <roger@atchoo.org>
2014-05-07 22:27:00 +00:00
All rights reserved. This program and the accompanying materials
2020-11-25 17:34:21 +00:00
are made available under the terms of the Eclipse Public License 2.0
2014-05-07 22:27:00 +00:00
and Eclipse Distribution License v1.0 which accompany this distribution.
2021-10-05 14:20:37 +00:00
2014-05-07 22:27:00 +00:00
The Eclipse Public License is available at
2020-11-25 17:34:21 +00:00
https://www.eclipse.org/legal/epl-2.0/
2014-05-07 22:27:00 +00:00
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
2021-10-05 14:20:37 +00:00
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
2020-12-01 18:21:59 +00:00
2014-05-07 22:27:00 +00:00
Contributors:
Roger Light - initial implementation and documentation.
*/
2015-04-29 20:37:47 +00:00
#include "config.h"
2014-05-07 22:27:00 +00:00
#ifndef WIN32
2018-08-15 16:02:56 +00:00
#include <time.h>
2014-05-07 22:27:00 +00:00
#endif
#if defined(WITH_THREADING)
#if defined(__linux__) || defined(__NetBSD__)
# include <pthread.h>
#elif defined(__FreeBSD__) || defined(__OpenBSD__)
# include <pthread_np.h>
#endif
#endif
2015-04-29 20:37:47 +00:00
#include "mosquitto_internal.h"
#include "net_mosq.h"
#include "util_mosq.h"
2014-05-07 22:27:00 +00:00
/* Entry point handed to pthread_create() by mosquitto_loop_start().
 * The definition further down is only compiled when WITH_THREADING is set. */
void *mosquitto__thread_main(void *obj);
2014-05-07 22:27:00 +00:00
int mosquitto_loop_start(struct mosquitto *mosq)
{
#if defined(WITH_THREADING)
if(!mosq || mosq->threaded != mosq_ts_none) return MOSQ_ERR_INVAL;
2014-05-07 22:27:00 +00:00
mosq->threaded = mosq_ts_self;
2016-06-21 22:33:58 +00:00
if(!pthread_create(&mosq->thread_id, NULL, mosquitto__thread_main, mosq)){
#if defined(__linux__)
pthread_setname_np(mosq->thread_id, "mosquitto loop");
#elif defined(__NetBSD__)
pthread_setname_np(mosq->thread_id, "%s", "mosquitto loop");
#elif defined(__FreeBSD__) || defined(__OpenBSD__)
pthread_set_name_np(mosq->thread_id, "mosquitto loop");
#endif
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_ERRNO;
}
2014-05-07 22:27:00 +00:00
#else
UNUSED(mosq);
2014-05-07 22:27:00 +00:00
return MOSQ_ERR_NOT_SUPPORTED;
#endif
}
int mosquitto_loop_stop(struct mosquitto *mosq, bool force)
{
#if defined(WITH_THREADING)
2014-05-07 22:27:00 +00:00
# ifndef WITH_BROKER
char sockpair_data = 0;
# endif
if(!mosq || mosq->threaded != mosq_ts_self) return MOSQ_ERR_INVAL;
2014-05-07 22:27:00 +00:00
/* Write a single byte to sockpairW (connected to sockpairR) to break out
* of select() if in threaded mode. */
if(mosq->sockpairW != INVALID_SOCKET){
#ifndef WIN32
if(write(mosq->sockpairW, &sockpair_data, 1)){
}
#else
send(mosq->sockpairW, &sockpair_data, 1, 0);
#endif
}
2021-10-05 14:20:37 +00:00
#ifdef HAVE_PTHREAD_CANCEL
2014-05-07 22:27:00 +00:00
if(force){
pthread_cancel(mosq->thread_id);
}
#endif
2014-05-07 22:27:00 +00:00
pthread_join(mosq->thread_id, NULL);
mosq->thread_id = pthread_self();
mosq->threaded = mosq_ts_none;
2014-05-07 22:27:00 +00:00
return MOSQ_ERR_SUCCESS;
#else
UNUSED(mosq);
UNUSED(force);
2014-05-07 22:27:00 +00:00
return MOSQ_ERR_NOT_SUPPORTED;
#endif
}
#ifdef WITH_THREADING
/*
 * Body of the thread created by mosquitto_loop_start().
 *
 * obj - the struct mosquitto instance, passed through pthread_create().
 *
 * Waits in 10 ms steps while the client state is mosq_cs_new, then runs
 * mosquitto_loop_forever(). When that returns, a threading state of
 * mosq_ts_self is reset to mosq_ts_none.
 *
 * Returns `obj`, or NULL when `obj` is NULL.
 */
void *mosquitto__thread_main(void *obj)
{
	struct mosquitto *mosq = obj;
	int timeout_ms;
#ifndef WIN32
	struct timespec poll_delay;

	poll_delay.tv_sec = 0;
	poll_delay.tv_nsec = 10000000; /* 10 ms, same as Sleep(10) on Windows */
#endif

	if(!mosq) return NULL;

	/* Hold off until the client has left the mosq_cs_new state. */
	while(mosquitto__get_state(mosq) == mosq_cs_new){
#ifdef WIN32
		Sleep(10);
#else
		nanosleep(&poll_delay, NULL);
#endif
	}

	if(mosq->keepalive){
		/* Sleep for our keepalive value. publish() etc. will wake us up. */
		timeout_ms = mosq->keepalive*1000;
	}else{
		/* Sleep for a day if keepalive disabled. */
		timeout_ms = 1000*86400;
	}
	mosquitto_loop_forever(mosq, timeout_ms, 1);

	/* Only clear the state this thread owns; any other value is left alone. */
	if(mosq->threaded == mosq_ts_self){
		mosq->threaded = mosq_ts_none;
	}

	return obj;
}
#endif
2014-07-02 19:45:26 +00:00
int mosquitto_threaded_set(struct mosquitto *mosq, bool threaded)
{
if(!mosq) return MOSQ_ERR_INVAL;
if(threaded){
mosq->threaded = mosq_ts_external;
}else{
mosq->threaded = mosq_ts_none;
}
2014-07-02 19:45:26 +00:00
return MOSQ_ERR_SUCCESS;
}