[166] Don't cancel external threads.
libmosquitto shouldn't cancel threads it didn't create. This change allows us to keep track of whether threads were created by the library or by external code. Thanks to Josip Ćavar. Bug: https://github.com/eclipse/mosquitto/issues/166
This commit is contained in:
parent
57da586703
commit
e8185ddaa7
@ -19,6 +19,8 @@ Client library:
|
|||||||
- Fix the case where a message received just before the keepalive timer
|
- Fix the case where a message received just before the keepalive timer
|
||||||
expired would cause the client to miss the keepalive timer.
|
expired would cause the client to miss the keepalive timer.
|
||||||
- Return value of pthread_create is now checked.
|
- Return value of pthread_create is now checked.
|
||||||
|
- _mosquitto_destroy should not cancel threads that weren't created by
|
||||||
|
libmosquitto. Closes #166.
|
||||||
|
|
||||||
Clients:
|
Clients:
|
||||||
- Handle some unchecked malloc() calls. Closes #1.
|
- Handle some unchecked malloc() calls. Closes #1.
|
||||||
|
@ -197,7 +197,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_se
|
|||||||
mosq->reconnect_delay = 1;
|
mosq->reconnect_delay = 1;
|
||||||
mosq->reconnect_delay_max = 1;
|
mosq->reconnect_delay_max = 1;
|
||||||
mosq->reconnect_exponential_backoff = false;
|
mosq->reconnect_exponential_backoff = false;
|
||||||
mosq->threaded = false;
|
mosq->threaded = mosq_ts_none;
|
||||||
#ifdef WITH_TLS
|
#ifdef WITH_TLS
|
||||||
mosq->ssl = NULL;
|
mosq->ssl = NULL;
|
||||||
mosq->tls_cert_reqs = SSL_VERIFY_PEER;
|
mosq->tls_cert_reqs = SSL_VERIFY_PEER;
|
||||||
@ -278,10 +278,10 @@ void _mosquitto_destroy(struct mosquitto *mosq)
|
|||||||
if(!mosq) return;
|
if(!mosq) return;
|
||||||
|
|
||||||
#ifdef WITH_THREADING
|
#ifdef WITH_THREADING
|
||||||
if(mosq->threaded && !pthread_equal(mosq->thread_id, pthread_self())){
|
if(mosq->threaded == mosq_ts_self && !pthread_equal(mosq->thread_id, pthread_self())){
|
||||||
pthread_cancel(mosq->thread_id);
|
pthread_cancel(mosq->thread_id);
|
||||||
pthread_join(mosq->thread_id, NULL);
|
pthread_join(mosq->thread_id, NULL);
|
||||||
mosq->threaded = false;
|
mosq->threaded = mosq_ts_none;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(mosq->id){
|
if(mosq->id){
|
||||||
|
@ -112,6 +112,12 @@ enum _mosquitto_protocol {
|
|||||||
mosq_p_mqtts = 3
|
mosq_p_mqtts = 3
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum mosquitto__threaded_state {
|
||||||
|
mosq_ts_none, /* No threads in use */
|
||||||
|
mosq_ts_self, /* Threads started by libmosquitto */
|
||||||
|
mosq_ts_external /* Threads started by external code */
|
||||||
|
};
|
||||||
|
|
||||||
enum _mosquitto_transport {
|
enum _mosquitto_transport {
|
||||||
mosq_t_invalid = 0,
|
mosq_t_invalid = 0,
|
||||||
mosq_t_tcp = 1,
|
mosq_t_tcp = 1,
|
||||||
@ -245,7 +251,7 @@ struct mosquitto {
|
|||||||
unsigned int reconnect_delay;
|
unsigned int reconnect_delay;
|
||||||
unsigned int reconnect_delay_max;
|
unsigned int reconnect_delay_max;
|
||||||
bool reconnect_exponential_backoff;
|
bool reconnect_exponential_backoff;
|
||||||
bool threaded;
|
char threaded;
|
||||||
struct _mosquitto_packet *out_packet_last;
|
struct _mosquitto_packet *out_packet_last;
|
||||||
int inflight_messages;
|
int inflight_messages;
|
||||||
int max_inflight_messages;
|
int max_inflight_messages;
|
||||||
|
@ -187,7 +187,7 @@ int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *pa
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if(mosq->in_callback == false && mosq->threaded == false){
|
if(mosq->in_callback == false && mosq->threaded == mosq_ts_none){
|
||||||
return _mosquitto_packet_write(mosq);
|
return _mosquitto_packet_write(mosq);
|
||||||
}else{
|
}else{
|
||||||
return MOSQ_ERR_SUCCESS;
|
return MOSQ_ERR_SUCCESS;
|
||||||
|
@ -28,9 +28,9 @@ void *_mosquitto_thread_main(void *obj);
|
|||||||
int mosquitto_loop_start(struct mosquitto *mosq)
|
int mosquitto_loop_start(struct mosquitto *mosq)
|
||||||
{
|
{
|
||||||
#ifdef WITH_THREADING
|
#ifdef WITH_THREADING
|
||||||
if(!mosq || mosq->threaded) return MOSQ_ERR_INVAL;
|
if(!mosq || mosq->threaded != mosq_ts_none) return MOSQ_ERR_INVAL;
|
||||||
|
|
||||||
mosq->threaded = true;
|
mosq->threaded = mosq_ts_self;
|
||||||
if(!pthread_create(&mosq->thread_id, NULL, _mosquitto_thread_main, mosq)){
|
if(!pthread_create(&mosq->thread_id, NULL, _mosquitto_thread_main, mosq)){
|
||||||
return MOSQ_ERR_SUCCESS;
|
return MOSQ_ERR_SUCCESS;
|
||||||
}else{
|
}else{
|
||||||
@ -48,7 +48,7 @@ int mosquitto_loop_stop(struct mosquitto *mosq, bool force)
|
|||||||
char sockpair_data = 0;
|
char sockpair_data = 0;
|
||||||
# endif
|
# endif
|
||||||
|
|
||||||
if(!mosq || !mosq->threaded) return MOSQ_ERR_INVAL;
|
if(!mosq || mosq->threaded != mosq_ts_self) return MOSQ_ERR_INVAL;
|
||||||
|
|
||||||
|
|
||||||
/* Write a single byte to sockpairW (connected to sockpairR) to break out
|
/* Write a single byte to sockpairW (connected to sockpairR) to break out
|
||||||
@ -67,7 +67,7 @@ int mosquitto_loop_stop(struct mosquitto *mosq, bool force)
|
|||||||
}
|
}
|
||||||
pthread_join(mosq->thread_id, NULL);
|
pthread_join(mosq->thread_id, NULL);
|
||||||
mosq->thread_id = pthread_self();
|
mosq->thread_id = pthread_self();
|
||||||
mosq->threaded = false;
|
mosq->threaded = mosq_ts_none;
|
||||||
|
|
||||||
return MOSQ_ERR_SUCCESS;
|
return MOSQ_ERR_SUCCESS;
|
||||||
#else
|
#else
|
||||||
@ -106,7 +106,11 @@ int mosquitto_threaded_set(struct mosquitto *mosq, bool threaded)
|
|||||||
{
|
{
|
||||||
if(!mosq) return MOSQ_ERR_INVAL;
|
if(!mosq) return MOSQ_ERR_INVAL;
|
||||||
|
|
||||||
mosq->threaded = threaded;
|
if(threaded){
|
||||||
|
mosq->threaded = mosq_ts_external;
|
||||||
|
}else{
|
||||||
|
mosq->threaded = mosq_ts_none;
|
||||||
|
}
|
||||||
|
|
||||||
return MOSQ_ERR_SUCCESS;
|
return MOSQ_ERR_SUCCESS;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user