Split common publish code into separate file.

This commit is contained in:
Roger A. Light 2018-11-21 22:47:33 +00:00
parent 29cf4266d9
commit dd158ffeb3
5 changed files with 302 additions and 219 deletions

View File

@ -8,7 +8,7 @@ if (${WITH_SRV} STREQUAL ON)
add_definitions("-DWITH_SRV")
endif (${WITH_SRV} STREQUAL ON)
add_executable(mosquitto_pub pub_client.c ${shared_src})
add_executable(mosquitto_pub pub_client.c pub_shared.c ${shared_src})
add_executable(mosquitto_sub sub_client.c sub_client_output.c ${shared_src})
target_link_libraries(mosquitto_pub libmosquitto)

View File

@ -8,13 +8,13 @@ static : static_pub static_sub
# This makes mosquitto_pub/sub versions that are statically linked with
# libmosquitto only.
static_pub : pub_client.o client_shared.o ../lib/libmosquitto.a
static_pub : pub_client.o pub_shared.o client_shared.o ../lib/libmosquitto.a
${CROSS_COMPILE}${CC} $^ -o mosquitto_pub ${CLIENT_LDFLAGS} -lssl -lcrypto -lpthread
static_sub : sub_client.o sub_client_output.o client_shared.o ../lib/libmosquitto.a
${CROSS_COMPILE}${CC} $^ -o mosquitto_sub ${CLIENT_LDFLAGS} -lssl -lcrypto -lpthread
mosquitto_pub : pub_client.o client_shared.o client_props.o
mosquitto_pub : pub_client.o pub_shared.o client_shared.o client_props.o
${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS}
mosquitto_sub : sub_client.o sub_client_output.o client_shared.o client_props.o
@ -23,6 +23,9 @@ mosquitto_sub : sub_client.o sub_client_output.o client_shared.o client_props.o
pub_client.o : pub_client.c ../lib/libmosquitto.so.${SOVERSION}
${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS}
pub_shared.o : pub_shared.c ../lib/libmosquitto.so.${SOVERSION}
${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS}
sub_client.o : sub_client.c ../lib/libmosquitto.so.${SOVERSION}
${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS}

View File

@ -31,29 +31,10 @@ Contributors:
#include <mosquitto.h>
#include "client_shared.h"
#define STATUS_CONNECTING 0
#define STATUS_CONNACK_RECVD 1
#define STATUS_WAITING 2
#define STATUS_DISCONNECTING 3
#include "pub_shared.h"
/* Global variables for use in callbacks. See sub_client.c for an example of
* using a struct to hold variables for use in callbacks. */
static char *topic = NULL;
static char *message = NULL;
static long msglen = 0;
static int qos = 0;
static int retain = 0;
static int mode = MSGMODE_NONE;
static int status = STATUS_CONNECTING;
static int mid_sent = 0;
static int last_mid = -1;
static int last_mid_sent = -1;
static bool connected = true;
static char *username = NULL;
static char *password = NULL;
static bool disconnect_sent = false;
static bool quiet = false;
static struct mosq_config cfg;
static bool first_publish = true;
@ -73,21 +54,21 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
int rc = MOSQ_ERR_SUCCESS;
if(!result){
switch(mode){
switch(cfg.pub_mode){
case MSGMODE_CMD:
case MSGMODE_FILE:
case MSGMODE_STDIN_FILE:
rc = my_publish(mosq, &mid_sent, topic, msglen, message, qos, retain);
rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain);
break;
case MSGMODE_NULL:
rc = my_publish(mosq, &mid_sent, topic, 0, NULL, qos, retain);
rc = my_publish(mosq, &mid_sent, cfg.topic, 0, NULL, cfg.qos, cfg.retain);
break;
case MSGMODE_STDIN_LINE:
status = STATUS_CONNACK_RECVD;
break;
}
if(rc){
if(!quiet){
if(!cfg.quiet){
switch(rc){
case MOSQ_ERR_INVAL:
fprintf(stderr, "Error: Invalid input. Does your topic contain '+' or '#'?\n");
@ -109,109 +90,12 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
}
}else{
if(result && !quiet){
if(result && !cfg.quiet){
fprintf(stderr, "%s\n", mosquitto_connack_string(result));
}
}
}
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
connected = false;
}
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
last_mid_sent = mid;
if(mode == MSGMODE_STDIN_LINE){
if(mid == last_mid){
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
disconnect_sent = true;
}
}else if(disconnect_sent == false){
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
disconnect_sent = true;
}
}
void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
{
printf("%s\n", str);
}
int load_stdin(void)
{
long pos = 0, rlen;
char buf[1024];
char *aux_message = NULL;
mode = MSGMODE_STDIN_FILE;
while(!feof(stdin)){
rlen = fread(buf, 1, 1024, stdin);
aux_message = realloc(message, pos+rlen);
if(!aux_message){
if(!quiet) fprintf(stderr, "Error: Out of memory.\n");
free(message);
return 1;
} else
{
message = aux_message;
}
memcpy(&(message[pos]), buf, rlen);
pos += rlen;
}
msglen = pos;
if(!msglen){
if(!quiet) fprintf(stderr, "Error: Zero length input.\n");
return 1;
}
return 0;
}
int load_file(const char *filename)
{
long pos, rlen;
FILE *fptr = NULL;
fptr = fopen(filename, "rb");
if(!fptr){
if(!quiet) fprintf(stderr, "Error: Unable to open file \"%s\".\n", filename);
return 1;
}
mode = MSGMODE_FILE;
fseek(fptr, 0, SEEK_END);
msglen = ftell(fptr);
if(msglen > 268435455){
fclose(fptr);
if(!quiet) fprintf(stderr, "Error: File \"%s\" is too large (>268,435,455 bytes).\n", filename);
return 1;
}else if(msglen == 0){
fclose(fptr);
if(!quiet) fprintf(stderr, "Error: File \"%s\" is empty.\n", filename);
return 1;
}else if(msglen < 0){
fclose(fptr);
if(!quiet) fprintf(stderr, "Error: Unable to determine size of file \"%s\".\n", filename);
return 1;
}
fseek(fptr, 0, SEEK_SET);
message = malloc(msglen);
if(!message){
fclose(fptr);
if(!quiet) fprintf(stderr, "Error: Out of memory.\n");
return 1;
}
pos = 0;
while(pos < msglen){
rlen = fread(&(message[pos]), sizeof(char), msglen-pos, fptr);
pos += rlen;
}
fclose(fptr);
return 0;
}
void print_usage(void)
{
@ -313,21 +197,11 @@ int main(int argc, char *argv[])
{
struct mosquitto *mosq = NULL;
int rc;
int rc2;
char *buf, *buf2;
int buf_len = 1024;
int buf_len_actual;
int read_len;
int pos;
buf = malloc(buf_len);
if(!buf){
fprintf(stderr, "Error: Out of memory.\n");
return 1;
}
mosquitto_lib_init();
if(pub_shared_init()) return 1;
memset(&cfg, 0, sizeof(struct mosq_config));
rc = client_config_load(&cfg, CLIENT_PUB, argc, argv);
if(rc){
@ -340,16 +214,6 @@ int main(int argc, char *argv[])
goto cleanup;
}
topic = cfg.topic;
message = cfg.message;
msglen = cfg.msglen;
qos = cfg.qos;
retain = cfg.retain;
mode = cfg.pub_mode;
username = cfg.username;
password = cfg.password;
quiet = cfg.quiet;
#ifndef WITH_THREADING
if(cfg.pub_mode == MSGMODE_STDIN_LINE){
fprintf(stderr, "Error: '-l' mode not available, threading support has not been compiled in.\n");
@ -369,7 +233,7 @@ int main(int argc, char *argv[])
}
}
if(!topic || mode == MSGMODE_NONE){
if(!cfg.topic || cfg.pub_mode == MSGMODE_NONE){
fprintf(stderr, "Error: Both topic and message must be supplied.\n");
print_usage();
goto cleanup;
@ -384,10 +248,10 @@ int main(int argc, char *argv[])
if(!mosq){
switch(errno){
case ENOMEM:
if(!quiet) fprintf(stderr, "Error: Out of memory.\n");
if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n");
break;
case EINVAL:
if(!quiet) fprintf(stderr, "Error: Invalid id.\n");
if(!cfg.quiet) fprintf(stderr, "Error: Invalid id.\n");
break;
}
goto cleanup;
@ -402,84 +266,21 @@ int main(int argc, char *argv[])
if(client_opts_set(mosq, &cfg)){
goto cleanup;
}
rc = client_connect(mosq, &cfg);
if(rc){
goto cleanup;
}
if(mode == MSGMODE_STDIN_LINE){
mosquitto_loop_start(mosq);
}
rc = pub_shared_loop(mosq);
do{
if(mode == MSGMODE_STDIN_LINE){
if(status == STATUS_CONNACK_RECVD){
pos = 0;
read_len = buf_len;
while(fgets(&buf[pos], read_len, stdin)){
buf_len_actual = strlen(buf);
if(buf[buf_len_actual-1] == '\n'){
buf[buf_len_actual-1] = '\0';
rc2 = my_publish(mosq, &mid_sent, topic, buf_len_actual-1, buf, qos, retain);
if(rc2){
if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
}
break;
}else{
buf_len += 1024;
pos += 1023;
read_len = 1024;
buf2 = realloc(buf, buf_len);
if(!buf2){
fprintf(stderr, "Error: Out of memory.\n");
goto cleanup;
}
buf = buf2;
}
}
if(feof(stdin)){
if(last_mid == -1){
/* Empty file */
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
disconnect_sent = true;
status = STATUS_DISCONNECTING;
}else{
last_mid = mid_sent;
status = STATUS_WAITING;
}
}
}else if(status == STATUS_WAITING){
if(last_mid_sent == last_mid && disconnect_sent == false){
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
disconnect_sent = true;
}
#ifdef WIN32
Sleep(100);
#else
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 100000000;
nanosleep(&ts, NULL);
#endif
}
rc = MOSQ_ERR_SUCCESS;
}else{
rc = mosquitto_loop(mosq, -1, 1);
}
}while(rc == MOSQ_ERR_SUCCESS && connected);
if(mode == MSGMODE_STDIN_LINE){
mosquitto_loop_stop(mosq, false);
}
if(message && mode == MSGMODE_FILE){
free(message);
if(cfg.message && cfg.pub_mode == MSGMODE_FILE){
free(cfg.message);
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
client_config_cleanup(&cfg);
free(buf);
pub_shared_cleanup();
if(rc){
fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
@ -489,6 +290,6 @@ int main(int argc, char *argv[])
cleanup:
mosquitto_lib_cleanup();
client_config_cleanup(&cfg);
free(buf);
pub_shared_cleanup();
return 1;
}

238
client/pub_shared.c Normal file
View File

@ -0,0 +1,238 @@
/*
Copyright (c) 2009-2018 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <time.h>
#else
#include <process.h>
#include <winsock2.h>
#define snprintf sprintf_s
#endif
#include <mosquitto.h>
#include "client_shared.h"
#include "pub_shared.h"
/* Global variables for use in callbacks. See sub_client.c for an example of
* using a struct to hold variables for use in callbacks. */
int mid_sent = 0;
int status = STATUS_CONNECTING;
static int last_mid = -1;
static int last_mid_sent = -1;
static bool connected = true;
static bool disconnect_sent = false;
static struct mosq_config cfg;
static char *buf;
static int buf_len = 1024;
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
connected = false;
}
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
last_mid_sent = mid;
if(cfg.pub_mode == MSGMODE_STDIN_LINE){
if(mid == last_mid){
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
disconnect_sent = true;
}
}else if(disconnect_sent == false){
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
disconnect_sent = true;
}
}
void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
{
printf("%s\n", str);
}
int load_stdin(void)
{
long pos = 0, rlen;
char buf[1024];
char *aux_message = NULL;
cfg.pub_mode = MSGMODE_STDIN_FILE;
while(!feof(stdin)){
rlen = fread(buf, 1, 1024, stdin);
aux_message = realloc(cfg.message, pos+rlen);
if(!aux_message){
if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n");
free(cfg.message);
return 1;
} else
{
cfg.message = aux_message;
}
memcpy(&(cfg.message[pos]), buf, rlen);
pos += rlen;
}
cfg.msglen = pos;
if(!cfg.msglen){
if(!cfg.quiet) fprintf(stderr, "Error: Zero length input.\n");
return 1;
}
return 0;
}
int load_file(const char *filename)
{
long pos, rlen;
FILE *fptr = NULL;
fptr = fopen(filename, "rb");
if(!fptr){
if(!cfg.quiet) fprintf(stderr, "Error: Unable to open file \"%s\".\n", filename);
return 1;
}
cfg.pub_mode = MSGMODE_FILE;
fseek(fptr, 0, SEEK_END);
cfg.msglen = ftell(fptr);
if(cfg.msglen > 268435455){
fclose(fptr);
if(!cfg.quiet) fprintf(stderr, "Error: File \"%s\" is too large (>268,435,455 bytes).\n", filename);
return 1;
}else if(cfg.msglen == 0){
fclose(fptr);
if(!cfg.quiet) fprintf(stderr, "Error: File \"%s\" is empty.\n", filename);
return 1;
}else if(cfg.msglen < 0){
fclose(fptr);
if(!cfg.quiet) fprintf(stderr, "Error: Unable to determine size of file \"%s\".\n", filename);
return 1;
}
fseek(fptr, 0, SEEK_SET);
cfg.message = malloc(cfg.msglen);
if(!cfg.message){
fclose(fptr);
if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n");
return 1;
}
pos = 0;
while(pos < cfg.msglen){
rlen = fread(&(cfg.message[pos]), sizeof(char), cfg.msglen-pos, fptr);
pos += rlen;
}
fclose(fptr);
return 0;
}
int pub_shared_init(void)
{
buf = malloc(buf_len);
if(!buf){
fprintf(stderr, "Error: Out of memory.\n");
return 1;
}
return 0;
}
int pub_shared_loop(struct mosquitto *mosq)
{
int read_len;
int pos;
int rc, rc2;
char *buf2;
int buf_len_actual;
if(cfg.pub_mode == MSGMODE_STDIN_LINE){
mosquitto_loop_start(mosq);
}
do{
if(cfg.pub_mode == MSGMODE_STDIN_LINE){
if(status == STATUS_CONNACK_RECVD){
pos = 0;
read_len = buf_len;
while(fgets(&buf[pos], read_len, stdin)){
buf_len_actual = strlen(buf);
if(buf[buf_len_actual-1] == '\n'){
buf[buf_len_actual-1] = '\0';
rc2 = my_publish(mosq, &mid_sent, cfg.topic, buf_len_actual-1, buf, cfg.qos, cfg.retain);
if(rc2){
if(!cfg.quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
}
break;
}else{
buf_len += 1024;
pos += 1023;
read_len = 1024;
buf2 = realloc(buf, buf_len);
if(!buf2){
fprintf(stderr, "Error: Out of memory.\n");
return MOSQ_ERR_NOMEM;
}
buf = buf2;
}
}
if(feof(stdin)){
if(last_mid == -1){
/* Empty file */
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
disconnect_sent = true;
status = STATUS_DISCONNECTING;
}else{
last_mid = mid_sent;
status = STATUS_WAITING;
}
}
}else if(status == STATUS_WAITING){
if(last_mid_sent == last_mid && disconnect_sent == false){
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);
disconnect_sent = true;
}
#ifdef WIN32
Sleep(100);
#else
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 100000000;
nanosleep(&ts, NULL);
#endif
}
rc = MOSQ_ERR_SUCCESS;
}else{
rc = mosquitto_loop(mosq, -1, 1);
}
}while(rc == MOSQ_ERR_SUCCESS && connected);
if(cfg.pub_mode == MSGMODE_STDIN_LINE){
mosquitto_loop_stop(mosq, false);
}
return 0;
}
void pub_shared_cleanup(void)
{
free(buf);
}

41
client/pub_shared.h Normal file
View File

@ -0,0 +1,41 @@
/*
Copyright (c) 2009-2018 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
#ifndef PUB_SHARED_H
#define PUB_SHARED_H
#define STATUS_CONNECTING 0
#define STATUS_CONNACK_RECVD 1
#define STATUS_WAITING 2
#define STATUS_DISCONNECTING 3
extern int mid_sent;
extern int status;
void my_connect_callback(struct mosquitto *mosq, void *obj, int result);
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc);
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid);
void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str);
int load_stdin(void);
int load_file(const char *filename);
int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain);
int pub_shared_init(void);
int pub_shared_loop(struct mosquitto *mosq);
void pub_shared_cleanup(void);
#endif