Separate pub client loops for better readability.

This commit is contained in:
Roger A. Light 2019-11-06 11:26:58 +00:00
parent 74e1f77310
commit aabf850a62

View File

@ -220,104 +220,74 @@ int pub_shared_init(void)
}
int pub_shared_loop(struct mosquitto *mosq)
int pub_stdin_line_loop(struct mosquitto *mosq)
{
int read_len;
int pos;
int rc = MOSQ_ERR_SUCCESS;
char *buf2;
int buf_len_actual;
int mode;
int loop_delay = 1000;
int pos;
int rc = MOSQ_ERR_SUCCESS;
int read_len;
bool stdin_finished = false;
if(cfg.repeat_count > 1 && (cfg.repeat_delay.tv_sec == 0 || cfg.repeat_delay.tv_usec != 0)){
loop_delay = cfg.repeat_delay.tv_usec / 2000;
}
mode = cfg.pub_mode;
if(mode == MSGMODE_STDIN_LINE){
mosquitto_loop_start(mosq);
stdin_finished = false;
do{
if(status == STATUS_CONNACK_RECVD){
pos = 0;
read_len = line_buf_len;
while(status == STATUS_CONNACK_RECVD && fgets(&line_buf[pos], read_len, stdin)){
buf_len_actual = strlen(line_buf);
if(line_buf[buf_len_actual-1] == '\n'){
line_buf[buf_len_actual-1] = '\0';
rc = my_publish(mosq, &mid_sent, cfg.topic, buf_len_actual-1, line_buf, cfg.qos, cfg.retain);
if(rc){
err_printf(&cfg, "Error: Publish returned %d, disconnecting.\n", rc);
mosquitto_disconnect_v5(mosq, MQTT_RC_DISCONNECT_WITH_WILL_MSG, cfg.disconnect_props);
}
break;
}else{
line_buf_len += 1024;
pos += 1023;
read_len = 1024;
buf2 = realloc(line_buf, line_buf_len);
if(!buf2){
err_printf(&cfg, "Error: Out of memory.\n");
return MOSQ_ERR_NOMEM;
}
line_buf = buf2;
mosquitto_loop_start(mosq);
stdin_finished = false;
do{
if(status == STATUS_CONNACK_RECVD){
pos = 0;
read_len = line_buf_len;
while(status == STATUS_CONNACK_RECVD && fgets(&line_buf[pos], read_len, stdin)){
buf_len_actual = strlen(line_buf);
if(line_buf[buf_len_actual-1] == '\n'){
line_buf[buf_len_actual-1] = '\0';
rc = my_publish(mosq, &mid_sent, cfg.topic, buf_len_actual-1, line_buf, cfg.qos, cfg.retain);
if(rc){
err_printf(&cfg, "Error: Publish returned %d, disconnecting.\n", rc);
mosquitto_disconnect_v5(mosq, MQTT_RC_DISCONNECT_WITH_WILL_MSG, cfg.disconnect_props);
}
}
if(feof(stdin)){
if(mid_sent == -1){
/* Empty file */
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
disconnect_sent = true;
status = STATUS_DISCONNECTING;
}else{
last_mid = mid_sent;
status = STATUS_WAITING;
break;
}else{
line_buf_len += 1024;
pos += 1023;
read_len = 1024;
buf2 = realloc(line_buf, line_buf_len);
if(!buf2){
err_printf(&cfg, "Error: Out of memory.\n");
return MOSQ_ERR_NOMEM;
}
stdin_finished = true;
}else if(status == STATUS_DISCONNECTED){
/* Not end of stdin, so we've lost our connection and must
* reconnect */
line_buf = buf2;
}
}else if(status == STATUS_WAITING){
if(last_mid_sent == last_mid && disconnect_sent == false){
}
if(feof(stdin)){
if(mid_sent == -1){
/* Empty file */
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
disconnect_sent = true;
status = STATUS_DISCONNECTING;
}else{
last_mid = mid_sent;
status = STATUS_WAITING;
}
stdin_finished = true;
}else if(status == STATUS_DISCONNECTED){
/* Not end of stdin, so we've lost our connection and must
* reconnect */
}
}else if(status == STATUS_WAITING){
if(last_mid_sent == last_mid && disconnect_sent == false){
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
disconnect_sent = true;
}
#ifdef WIN32
Sleep(100);
Sleep(100);
#else
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 100000000;
nanosleep(&ts, NULL);
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 100000000;
nanosleep(&ts, NULL);
#endif
}
}while(stdin_finished == false);
mosquitto_loop_stop(mosq, false);
}else{
do{
rc = mosquitto_loop(mosq, loop_delay, 1);
if(ready_for_repeat && check_repeat_time()){
rc = MOSQ_ERR_SUCCESS;
switch(cfg.pub_mode){
case MSGMODE_CMD:
case MSGMODE_FILE:
case MSGMODE_STDIN_FILE:
rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain);
break;
case MSGMODE_NULL:
rc = my_publish(mosq, &mid_sent, cfg.topic, 0, NULL, cfg.qos, cfg.retain);
break;
}
if(rc){
err_printf(&cfg, "Error sending repeat publish: %s", mosquitto_strerror(rc));
}
}
}while(rc == MOSQ_ERR_SUCCESS);
}
}
}while(stdin_finished == false);
mosquitto_loop_stop(mosq, false);
if(status == STATUS_DISCONNECTED){
return MOSQ_ERR_SUCCESS;
@ -327,6 +297,53 @@ int pub_shared_loop(struct mosquitto *mosq)
}
int pub_other_loop(struct mosquitto *mosq)
{
int rc;
int loop_delay = 1000;
if(cfg.repeat_count > 1 && (cfg.repeat_delay.tv_sec == 0 || cfg.repeat_delay.tv_usec != 0)){
loop_delay = cfg.repeat_delay.tv_usec / 2000;
}
do{
rc = mosquitto_loop(mosq, loop_delay, 1);
if(ready_for_repeat && check_repeat_time()){
rc = MOSQ_ERR_SUCCESS;
switch(cfg.pub_mode){
case MSGMODE_CMD:
case MSGMODE_FILE:
case MSGMODE_STDIN_FILE:
rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain);
break;
case MSGMODE_NULL:
rc = my_publish(mosq, &mid_sent, cfg.topic, 0, NULL, cfg.qos, cfg.retain);
break;
}
if(rc){
err_printf(&cfg, "Error sending repeat publish: %s", mosquitto_strerror(rc));
}
}
}while(rc == MOSQ_ERR_SUCCESS);
if(status == STATUS_DISCONNECTED){
return MOSQ_ERR_SUCCESS;
}else{
return rc;
}
}
int pub_shared_loop(struct mosquitto *mosq)
{
if(cfg.pub_mode == MSGMODE_STDIN_LINE){
return pub_stdin_line_loop(mosq);
}else{
return pub_other_loop(mosq);
}
}
void pub_shared_cleanup(void)
{
free(line_buf);