diff --git a/.gitignore b/.gitignore index d6ffc24c..69de08c4 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ cpp/*.test build/ client/mosquitto_pub +client/mosquitto_rr client/mosquitto_sub cov-int/ @@ -36,6 +37,7 @@ man/mosquitto.conf.5 man/libmosquitto.3 man/mosquitto_passwd.1 man/mosquitto_pub.1 +man/mosquitto_rr.1 man/mosquitto_sub.1 man/mqtt.7 diff --git a/ChangeLog.txt b/ChangeLog.txt index 1f97696a..c4becdf8 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -30,6 +30,8 @@ Client library features: - Add support for OCSP stapling to bridges. Client features: +- Add mosquitto_rr client, which can be used for "request-response" messaging, + by sending a request message and awaiting a response. - Add -E to mosquitto_sub, which causes it to exit immediately after having its subscriptions acknowledged. Use with -c to create a durable client session without requiring a message to be received. diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt index c36b06e8..925bc7c1 100644 --- a/client/CMakeLists.txt +++ b/client/CMakeLists.txt @@ -11,15 +11,19 @@ endif (WITH_SRV) add_executable(mosquitto_pub pub_client.c pub_shared.c ${shared_src}) add_executable(mosquitto_sub sub_client.c sub_client_output.c ${shared_src}) +add_executable(mosquitto_pub rr_client.c pub_shared.c ${shared_src}) target_link_libraries(mosquitto_pub libmosquitto) target_link_libraries(mosquitto_sub libmosquitto) +target_link_libraries(mosquitto_rr libmosquitto) if (QNX) target_link_libraries(mosquitto_pub socket) target_link_libraries(mosquitto_sub socket) + target_link_libraries(mosquitto_rr socket) endif() install(TARGETS mosquitto_pub RUNTIME DESTINATION "${BINDIR}" LIBRARY DESTINATION "${LIBDIR}") install(TARGETS mosquitto_sub RUNTIME DESTINATION "${BINDIR}" LIBRARY DESTINATION "${LIBDIR}") +install(TARGETS mosquitto_rr RUNTIME DESTINATION "${BINDIR}" LIBRARY DESTINATION "${LIBDIR}") diff --git a/client/Makefile b/client/Makefile index 014c821d..71948272 100644 --- a/client/Makefile +++ b/client/Makefile @@ -1,23 +1,23 @@ include ../config.mk -.PHONY: all install uninstall reallyclean clean static static_pub static_sub +.PHONY: all install uninstall reallyclean clean static static_pub static_sub static_rr ifeq ($(WITH_SHARED_LIBRARIES),yes) SHARED_DEP:=../lib/libmosquitto.so.${SOVERSION} endif ifeq ($(WITH_SHARED_LIBRARIES),yes) -ALL_DEPS:= mosquitto_pub mosquitto_sub +ALL_DEPS:= mosquitto_pub mosquitto_sub mosquitto_rr else ifeq ($(WITH_STATIC_LIBRARIES),yes) -ALL_DEPS:= static_pub static_sub +ALL_DEPS:= static_pub static_sub static_rr endif endif all : ${ALL_DEPS} -static : static_pub static_sub - # This makes mosquitto_pub/sub versions that are statically linked with +static : static_pub static_sub static_rr + # This makes mosquitto_pub/sub/rr versions that are statically linked with # libmosquitto only. static_pub : pub_client.o pub_shared.o client_props.o client_shared.o ../lib/libmosquitto.a @@ -26,12 +26,18 @@ static_pub : pub_client.o pub_shared.o client_props.o client_shared.o ../lib/lib static_sub : sub_client.o sub_client_output.o client_props.o client_shared.o ../lib/libmosquitto.a ${CROSS_COMPILE}${CC} $^ -o mosquitto_sub ${CLIENT_LDFLAGS} ${STATIC_LIB_DEPS} +static_rr : rr_client.o client_props.o client_shared.o pub_shared.o ../lib/libmosquitto.a + ${CROSS_COMPILE}${CC} $^ -o mosquitto_rr ${CLIENT_LDFLAGS} ${STATIC_LIB_DEPS} + mosquitto_pub : pub_client.o pub_shared.o client_shared.o client_props.o ${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS} mosquitto_sub : sub_client.o sub_client_output.o client_shared.o client_props.o ${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS} +mosquitto_rr : rr_client.o client_shared.o client_props.o pub_shared.o sub_client_output.o + ${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS} + pub_client.o : pub_client.c ${SHARED_DEP} ${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS} @@ -44,6 +50,9 @@ sub_client.o : sub_client.c ${SHARED_DEP} sub_client_output.o : sub_client_output.c ${SHARED_DEP} ${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS} +rr_client.o : rr_client.c ${SHARED_DEP} + ${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS} + client_shared.o : client_shared.c client_shared.h ${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS} @@ -60,12 +69,14 @@ install : all $(INSTALL) -d "${DESTDIR}$(prefix)/bin" $(INSTALL) ${STRIP_OPTS} mosquitto_pub "${DESTDIR}${prefix}/bin/mosquitto_pub" $(INSTALL) ${STRIP_OPTS} mosquitto_sub "${DESTDIR}${prefix}/bin/mosquitto_sub" + $(INSTALL) ${STRIP_OPTS} mosquitto_rr "${DESTDIR}${prefix}/bin/mosquitto_rr" uninstall : -rm -f "${DESTDIR}${prefix}/bin/mosquitto_pub" -rm -f "${DESTDIR}${prefix}/bin/mosquitto_sub" + -rm -f "${DESTDIR}${prefix}/bin/mosquitto_rr" reallyclean : clean clean : - -rm -f *.o mosquitto_pub mosquitto_sub *.gcda *.gcno + -rm -f *.o mosquitto_pub mosquitto_sub mosquitto_rr *.gcda *.gcno diff --git a/client/client_shared.c b/client/client_shared.c index f179f5a5..f903497c 100644 --- a/client/client_shared.c +++ b/client/client_shared.c @@ -121,7 +121,7 @@ static int check_format(const char *str) } -void init_config(struct mosq_config *cfg) +void init_config(struct mosq_config *cfg, int pub_or_sub) { memset(cfg, 0, sizeof(*cfg)); cfg->port = -1; @@ -129,7 +129,12 @@ void init_config(struct mosq_config *cfg) cfg->keepalive = 60; cfg->clean_session = true; cfg->eol = true; - cfg->protocol_version = MQTT_PROTOCOL_V311; + if(pub_or_sub == CLIENT_RR){ + cfg->protocol_version = MQTT_PROTOCOL_V5; + cfg->msg_count = 1; + }else{ + cfg->protocol_version = MQTT_PROTOCOL_V311; + } } void client_config_cleanup(struct mosq_config *cfg) @@ -147,6 +152,7 @@ void client_config_cleanup(struct mosq_config *cfg) free(cfg->will_topic); free(cfg->will_payload); free(cfg->format); + free(cfg->response_topic); #ifdef WITH_TLS free(cfg->cafile); free(cfg->capath); @@ -210,7 +216,7 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char * #endif args[0] = NULL; - init_config(cfg); + init_config(cfg, pub_or_sub); /* Default config file */ #ifndef WIN32 @@ -224,8 +230,10 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char * } if(pub_or_sub == CLIENT_PUB){ snprintf(loc, len, "%s/mosquitto_pub", env); - }else{ + }else if(pub_or_sub == CLIENT_SUB){ snprintf(loc, len, "%s/mosquitto_sub", env); + }else{ + snprintf(loc, len, "%s/mosquitto_rr", env); } loc[len-1] = '\0'; }else{ @@ -239,8 +247,10 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char * } if(pub_or_sub == CLIENT_PUB){ snprintf(loc, len, "%s/.config/mosquitto_pub", env); - }else{ + }else if(pub_or_sub == CLIENT_SUB){ snprintf(loc, len, "%s/.config/mosquitto_sub", env); + }else{ + snprintf(loc, len, "%s/.config/mosquitto_rr", env); } loc[len-1] = '\0'; }else{ @@ -259,8 +269,10 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char * } if(pub_or_sub == CLIENT_PUB){ snprintf(loc, len, "%s\\mosquitto_pub.conf", env); - }else{ + }else if(pub_or_sub == CLIENT_SUB){ snprintf(loc, len, "%s\\mosquitto_sub.conf", env); + }else{ + snprintf(loc, len, "%s\\mosquitto_rr.conf", env); } loc[len-1] = '\0'; }else{ @@ -394,19 +406,25 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char * return MOSQ_ERR_SUCCESS; } -int cfg_add_topic(struct mosq_config *cfg, int pub_or_sub, char *topic, const char *arg) +int cfg_add_topic(struct mosq_config *cfg, int type, char *topic, const char *arg) { if(mosquitto_validate_utf8(topic, strlen(topic))){ fprintf(stderr, "Error: Malformed UTF-8 in %s argument.\n\n", arg); return 1; } - if(pub_or_sub == CLIENT_PUB){ + if(type == CLIENT_PUB || type == CLIENT_RR){ if(mosquitto_pub_topic_check(topic) == MOSQ_ERR_INVAL){ fprintf(stderr, "Error: Invalid publish topic '%s', does it contain '+' or '#'?\n", topic); return 1; } cfg->topic = strdup(topic); - } else { + }else if(type == CLIENT_RESPONSE_TOPIC){ + if(mosquitto_pub_topic_check(topic) == MOSQ_ERR_INVAL){ + fprintf(stderr, "Error: Invalid response topic '%s', does it contain '+' or '#'?\n", topic); + return 1; + } + cfg->response_topic = strdup(topic); + }else{ if(mosquitto_sub_topic_check(topic) == MOSQ_ERR_INVAL){ fprintf(stderr, "Error: Invalid subscription topic '%s', are all '+' and '#' wildcards correct?\n", topic); return 1; @@ -471,7 +489,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c i++; #endif }else if(!strcmp(argv[i], "-C")){ - if(pub_or_sub == CLIENT_PUB){ + if(pub_or_sub != CLIENT_SUB){ goto unknown_option; }else{ if(i==argc-1){ @@ -496,8 +514,21 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c return 1; } cfg->protocol_version = MQTT_PROTOCOL_V5; + }else if(!strcmp(argv[i], "-e")){ + if(pub_or_sub != CLIENT_RR){ + goto unknown_option; + } + if(i==argc-1){ + fprintf(stderr, "Error: -e argument given but no response topic specified.\n\n"); + return 1; + }else{ + if(cfg_add_topic(cfg, CLIENT_RESPONSE_TOPIC, argv[i+1], "-e")){ + return 1; + } + } + i++; }else if(!strcmp(argv[i], "-E")){ - if(pub_or_sub == CLIENT_PUB){ + if(pub_or_sub != CLIENT_SUB){ goto unknown_option; } cfg->exit_after_sub = true; @@ -652,7 +683,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c } i++; }else if(!strcmp(argv[i], "-l") || !strcmp(argv[i], "--stdin-line")){ - if(pub_or_sub == CLIENT_SUB){ + if(pub_or_sub != CLIENT_PUB){ goto unknown_option; } if(cfg->pub_mode != MSGMODE_NONE){ @@ -765,7 +796,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c }else if(!strcmp(argv[i], "--quiet")){ cfg->quiet = true; }else if(!strcmp(argv[i], "-r") || !strcmp(argv[i], "--retain")){ - if(pub_or_sub == CLIENT_SUB){ + if(pub_or_sub != CLIENT_PUB){ goto unknown_option; } cfg->retain = 1; @@ -774,8 +805,9 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c goto unknown_option; } cfg->no_retain = true; + cfg->sub_opts |= MQTT_SUB_OPT_SEND_RETAIN_NEVER; }else if(!strcmp(argv[i], "--remove-retained")){ - if(pub_or_sub == CLIENT_PUB){ + if(pub_or_sub != CLIENT_SUB){ goto unknown_option; } cfg->remove_retained = true; @@ -785,7 +817,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c } cfg->sub_opts |= MQTT_SUB_OPT_RETAIN_AS_PUBLISHED; }else if(!strcmp(argv[i], "--retained-only")){ - if(pub_or_sub == CLIENT_PUB){ + if(pub_or_sub != CLIENT_SUB){ goto unknown_option; } cfg->retained_only = true; @@ -813,7 +845,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c i++; } }else if(!strcmp(argv[i], "-T") || !strcmp(argv[i], "--filter-out")){ - if(pub_or_sub == CLIENT_PUB){ + if(pub_or_sub != CLIENT_SUB){ goto unknown_option; } if(i==argc-1){ @@ -864,7 +896,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c i++; #endif }else if(!strcmp(argv[i], "-U") || !strcmp(argv[i], "--unsubscribe")){ - if(pub_or_sub == CLIENT_PUB){ + if(pub_or_sub != CLIENT_SUB){ goto unknown_option; } if(i==argc-1){ diff --git a/client/client_shared.h b/client/client_shared.h index f92a1730..7356acbe 100644 --- a/client/client_shared.h +++ b/client/client_shared.h @@ -29,6 +29,8 @@ Contributors: #define CLIENT_PUB 1 #define CLIENT_SUB 2 +#define CLIENT_RR 3 +#define CLIENT_RESPONSE_TOPIC 4 struct mosq_config { char *id; @@ -39,11 +41,11 @@ struct mosq_config { int port; int qos; bool retain; - int pub_mode; /* pub */ - char *file_input; /* pub */ - char *message; /* pub */ - long msglen; /* pub */ - char *topic; /* pub */ + int pub_mode; /* pub, rr */ + char *file_input; /* pub, rr */ + char *message; /* pub, rr */ + long msglen; /* pub, rr */ + char *topic; /* pub, rr */ char *bind_address; #ifdef WITH_SRV bool use_srv; @@ -104,6 +106,7 @@ struct mosq_config { mosquitto_property *disconnect_props; mosquitto_property *will_props; bool have_topic_alias; /* pub */ + char *response_topic; /* rr */ }; int client_config_load(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]); diff --git a/client/pub_client.c b/client/pub_client.c index a4f2245e..b7709744 100644 --- a/client/pub_client.c +++ b/client/pub_client.c @@ -29,6 +29,7 @@ Contributors: #define snprintf sprintf_s #endif +#include #include #include "client_shared.h" #include "pub_shared.h" @@ -36,6 +37,23 @@ Contributors: /* Global variables for use in callbacks. See sub_client.c for an example of * using a struct to hold variables for use in callbacks. */ static bool first_publish = true; +static int last_mid = -1; +static int last_mid_sent = -1; +static char *line_buf = NULL; +static int line_buf_len = 1024; +static bool connected = true; +static bool disconnect_sent = false; + + +void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties) +{ + UNUSED(mosq); + UNUSED(obj); + UNUSED(rc); + UNUSED(properties); + + connected = false; +} int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain) { @@ -107,6 +125,124 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flag } +void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties) +{ + UNUSED(obj); + UNUSED(properties); + + last_mid_sent = mid; + if(reason_code > 127){ + if(!cfg.quiet) fprintf(stderr, "Warning: Publish %d failed: %s.\n", mid, mosquitto_reason_string(reason_code)); + } + if(cfg.pub_mode == MSGMODE_STDIN_LINE){ + if(mid == last_mid){ + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + disconnect_sent = true; + } + }else if(disconnect_sent == false){ + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + disconnect_sent = true; + } +} + + +int pub_shared_init(void) +{ + line_buf = malloc(line_buf_len); + if(!line_buf){ + fprintf(stderr, "Error: Out of memory.\n"); + return 1; + } + return 0; +} + + +int pub_shared_loop(struct mosquitto *mosq) +{ + int read_len; + int pos; + int rc, rc2; + char *buf2; + int buf_len_actual; + int mode; + + mode = cfg.pub_mode; + + if(mode == MSGMODE_STDIN_LINE){ + mosquitto_loop_start(mosq); + } + + do{ + if(mode == MSGMODE_STDIN_LINE){ + if(status == STATUS_CONNACK_RECVD){ + pos = 0; + read_len = line_buf_len; + while(connected && fgets(&line_buf[pos], read_len, stdin)){ + buf_len_actual = strlen(line_buf); + if(line_buf[buf_len_actual-1] == '\n'){ + line_buf[buf_len_actual-1] = '\0'; + rc2 = my_publish(mosq, &mid_sent, cfg.topic, buf_len_actual-1, line_buf, cfg.qos, cfg.retain); + if(rc2){ + if(!cfg.quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2); + mosquitto_disconnect_v5(mosq, MQTT_RC_DISCONNECT_WITH_WILL_MSG, cfg.disconnect_props); + } + break; + }else{ + line_buf_len += 1024; + pos += 1023; + read_len = 1024; + buf2 = realloc(line_buf, line_buf_len); + if(!buf2){ + fprintf(stderr, "Error: Out of memory.\n"); + return MOSQ_ERR_NOMEM; + } + line_buf = buf2; + } + } + if(feof(stdin)){ + if(mid_sent == -1){ + /* Empty file */ + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + disconnect_sent = true; + status = STATUS_DISCONNECTING; + }else{ + last_mid = mid_sent; + status = STATUS_WAITING; + } + } + }else if(status == STATUS_WAITING){ + if(last_mid_sent == last_mid && disconnect_sent == false){ + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + disconnect_sent = true; + } +#ifdef WIN32 + Sleep(100); +#else + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 100000000; + nanosleep(&ts, NULL); +#endif + } + rc = MOSQ_ERR_SUCCESS; + }else{ + rc = mosquitto_loop(mosq, -1, 1); + } + }while(rc == MOSQ_ERR_SUCCESS && connected); + + if(mode == MSGMODE_STDIN_LINE){ + mosquitto_loop_stop(mosq, false); + } + return 0; +} + + +void pub_shared_cleanup(void) +{ + free(line_buf); +} + + void print_usage(void) { int major, minor, revision; diff --git a/client/pub_shared.c b/client/pub_shared.c index b000ed79..ead86a34 100644 --- a/client/pub_shared.c +++ b/client/pub_shared.c @@ -40,43 +40,6 @@ int mid_sent = 0; int status = STATUS_CONNECTING; struct mosq_config cfg; -static int last_mid = -1; -static int last_mid_sent = -1; -static bool connected = true; -static bool disconnect_sent = false; -static char *buf = NULL; -static int buf_len = 1024; - -void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties) -{ - UNUSED(mosq); - UNUSED(obj); - UNUSED(rc); - UNUSED(properties); - - connected = false; -} - -void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties) -{ - UNUSED(obj); - UNUSED(properties); - - last_mid_sent = mid; - if(reason_code > 127){ - if(!cfg.quiet) fprintf(stderr, "Warning: Publish %d failed: %s.\n", mid, mosquitto_reason_string(reason_code)); - } - if(cfg.pub_mode == MSGMODE_STDIN_LINE){ - if(mid == last_mid){ - mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); - disconnect_sent = true; - } - }else if(disconnect_sent == false){ - mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); - disconnect_sent = true; - } -} - void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) { UNUSED(mosq); @@ -161,98 +124,3 @@ int load_file(const char *filename) } -int pub_shared_init(void) -{ - buf = malloc(buf_len); - if(!buf){ - fprintf(stderr, "Error: Out of memory.\n"); - return 1; - } - return 0; -} - - -int pub_shared_loop(struct mosquitto *mosq) -{ - int read_len; - int pos; - int rc, rc2; - char *buf2; - int buf_len_actual; - int mode; - - mode = cfg.pub_mode; - - if(mode == MSGMODE_STDIN_LINE){ - mosquitto_loop_start(mosq); - } - - do{ - if(mode == MSGMODE_STDIN_LINE){ - if(status == STATUS_CONNACK_RECVD){ - pos = 0; - read_len = buf_len; - while(connected && fgets(&buf[pos], read_len, stdin)){ - buf_len_actual = strlen(buf); - if(buf[buf_len_actual-1] == '\n'){ - buf[buf_len_actual-1] = '\0'; - rc2 = my_publish(mosq, &mid_sent, cfg.topic, buf_len_actual-1, buf, cfg.qos, cfg.retain); - if(rc2){ - if(!cfg.quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2); - mosquitto_disconnect_v5(mosq, MQTT_RC_DISCONNECT_WITH_WILL_MSG, cfg.disconnect_props); - } - break; - }else{ - buf_len += 1024; - pos += 1023; - read_len = 1024; - buf2 = realloc(buf, buf_len); - if(!buf2){ - fprintf(stderr, "Error: Out of memory.\n"); - return MOSQ_ERR_NOMEM; - } - buf = buf2; - } - } - if(feof(stdin)){ - if(mid_sent == -1){ - /* Empty file */ - mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); - disconnect_sent = true; - status = STATUS_DISCONNECTING; - }else{ - last_mid = mid_sent; - status = STATUS_WAITING; - } - } - }else if(status == STATUS_WAITING){ - if(last_mid_sent == last_mid && disconnect_sent == false){ - mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); - disconnect_sent = true; - } -#ifdef WIN32 - Sleep(100); -#else - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = 100000000; - nanosleep(&ts, NULL); -#endif - } - rc = MOSQ_ERR_SUCCESS; - }else{ - rc = mosquitto_loop(mosq, -1, 1); - } - }while(rc == MOSQ_ERR_SUCCESS && connected); - - if(mode == MSGMODE_STDIN_LINE){ - mosquitto_loop_stop(mosq, false); - } - return 0; -} - - -void pub_shared_cleanup(void) -{ - free(buf); -} diff --git a/client/rr_client.c b/client/rr_client.c new file mode 100644 index 00000000..5ef95946 --- /dev/null +++ b/client/rr_client.c @@ -0,0 +1,396 @@ +/* +Copyright (c) 2009-2018 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#ifndef WIN32 +#include +#include +#else +#include +#include +#define snprintf sprintf_s +#endif + +#include +#include +#include "client_shared.h" +#include "pub_shared.h" + +enum rr__state { + rr_s_new, + rr_s_connected, + rr_s_subscribed, + rr_s_ready_to_publish, + rr_s_wait_for_response, + rr_s_disconnect +}; + +static enum rr__state client_state = rr_s_new; + +struct mosq_config cfg; +bool process_messages = true; +int msg_count = 0; +struct mosquitto *mosq = NULL; + +#ifndef WIN32 +void my_signal_handler(int signum) +{ + if(signum == SIGALRM){ + process_messages = false; + mosquitto_disconnect_v5(mosq, MQTT_RC_DISCONNECT_WITH_WILL_MSG, cfg.disconnect_props); + } +} +#endif + +void print_message(struct mosq_config *cfg, const struct mosquitto_message *message); + + +int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain) +{ + return mosquitto_publish_v5(mosq, mid, topic, payloadlen, payload, qos, retain, cfg.publish_props); +} + + +void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message, const mosquitto_property *properties) +{ + print_message(&cfg, message); + switch(cfg.pub_mode){ + case MSGMODE_CMD: + case MSGMODE_FILE: + case MSGMODE_STDIN_FILE: + case MSGMODE_NULL: + client_state = rr_s_disconnect; + break; + case MSGMODE_STDIN_LINE: + client_state = rr_s_ready_to_publish; + break; + } + /* FIXME - check all below + if(process_messages == false) return; + + if(cfg.retained_only && !message->retain && process_messages){ + process_messages = false; + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + return; + } + + if(message->retain && cfg.no_retain) return; + if(cfg.filter_outs){ + for(i=0; itopic, &res); + if(res) return; + } + } + + //print_message(&cfg, message); + + if(cfg.msg_count>0){ + msg_count++; + if(cfg.msg_count == msg_count){ + process_messages = false; + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + } + } + */ +} + +void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags, const mosquitto_property *properties) +{ + if(!result){ + client_state = rr_s_connected; + mosquitto_subscribe_v5(mosq, NULL, cfg.response_topic, cfg.qos, 0, cfg.subscribe_props); + }else{ + client_state = rr_s_disconnect; + if(result && !cfg.quiet){ + fprintf(stderr, "%s\n", mosquitto_connack_string(result)); + } + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + } +} + + +void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) +{ + int rc = MOSQ_ERR_SUCCESS; + + if(granted_qos[0] < 128){ + client_state = rr_s_ready_to_publish; + + if(rc){ + if(!cfg.quiet){ + switch(rc){ + case MOSQ_ERR_INVAL: + fprintf(stderr, "Error: Invalid input. Does your topic contain '+' or '#'?\n"); + break; + case MOSQ_ERR_NOMEM: + fprintf(stderr, "Error: Out of memory when trying to publish message.\n"); + break; + case MOSQ_ERR_NO_CONN: + fprintf(stderr, "Error: Client not connected when trying to publish.\n"); + break; + case MOSQ_ERR_PROTOCOL: + fprintf(stderr, "Error: Protocol error when communicating with broker.\n"); + break; + case MOSQ_ERR_PAYLOAD_SIZE: + fprintf(stderr, "Error: Message payload is too large.\n"); + break; + } + } + client_state = rr_s_disconnect; + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + } + }else{ + client_state = rr_s_disconnect; + if(!cfg.quiet){ + fprintf(stderr, "%s\n", mosquitto_reason_string(granted_qos[0])); + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + } + } +} + + +void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties) +{ + client_state = rr_s_wait_for_response; +} + + +void print_usage(void) +{ + int major, minor, revision; + + mosquitto_lib_version(&major, &minor, &revision); + printf("mosquitto_rr is an mqtt client that can be used to publish a request message and wait for a response.\n"); + printf(" Defaults to MQTT v5, where the Request-Response feature will be used, but v3.1.1 can also be used\n"); + printf(" with v3.1.1 brokers.\n"); + printf("mosquitto_rr version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision); + printf("Usage: mosquitto_rr {[-h host] [-p port] [-u username [-P password]] -t topic | -L URL} -e response-topic\n"); + printf(" [-c] [-k keepalive] [-q qos] [-R]\n"); + printf(" [-F format]\n"); +#ifndef WIN32 + printf(" [-W timeout_secs]\n"); +#endif +#ifdef WITH_SRV + printf(" [-A bind_address] [-S]\n"); +#else + printf(" [-A bind_address]\n"); +#endif + printf(" [-i id] [-I id_prefix]\n"); + printf(" [-d] [-N] [--quiet] [-v]\n"); + printf(" [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]\n"); +#ifdef WITH_TLS + printf(" [{--cafile file | --capath dir} [--cert file] [--key file]\n"); + printf(" [--ciphers ciphers] [--insecure]]\n"); +#ifdef WITH_TLS_PSK + printf(" [--psk hex-key --psk-identity identity [--ciphers ciphers]]\n"); +#endif +#endif +#ifdef WITH_SOCKS + printf(" [--proxy socks-url]\n"); +#endif + printf(" [-D command identifier value]\n"); + printf(" mosquitto_rr --help\n\n"); + printf(" -A : bind the outgoing socket to this host/ip address. Use to control which interface\n"); + printf(" the client communicates over.\n"); + printf(" -c : disable 'clean session' (store subscription and pending messages when client disconnects).\n"); + printf(" -d : enable debug messages.\n"); + printf(" -D : Define MQTT v5 properties. See the documentation for more details.\n"); + printf(" -F : output format.\n"); + printf(" -h : mqtt host to connect to. Defaults to localhost.\n"); + printf(" -i : id to use for this client. Defaults to mosquitto_rr_ appended with the process id.\n"); + printf(" -k : keep alive in seconds for this client. Defaults to 60.\n"); + printf(" -L : specify user, password, hostname, port and topic as a URL in the form:\n"); + printf(" mqtt(s)://[username[:password]@]host[:port]/topic\n"); + printf(" -N : do not add an end of line character when printing the payload.\n"); + printf(" -p : network port to connect to. Defaults to 1883 for plain MQTT and 8883 for MQTT over TLS.\n"); + printf(" -P : provide a password\n"); + printf(" -q : quality of service level to use for communications. Defaults to 0.\n"); + printf(" -R : do not print stale messages (those with retain set).\n"); +#ifdef WITH_SRV + printf(" -S : use SRV lookups to determine which host to connect to.\n"); +#endif + printf(" -t : mqtt response topic to subscribe to. May be repeated multiple times.\n"); + printf(" -u : provide a username\n"); + printf(" -v : print received messages verbosely.\n"); + printf(" -V : specify the version of the MQTT protocol to use when connecting.\n"); + printf(" Defaults to 5.\n"); +#ifndef WIN32 + printf(" -W : Specifies a timeout in seconds how long to wait for a response.\n"); +#endif + printf(" --help : display this message.\n"); + printf(" --quiet : don't print error messages.\n"); + printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n"); + printf(" unexpected disconnection. If not given and will-topic is set, a zero\n"); + printf(" length message will be sent.\n"); + printf(" --will-qos : QoS level for the client Will.\n"); + printf(" --will-retain : if given, make the client Will retained.\n"); + printf(" --will-topic : the topic on which to publish the client Will.\n"); +#ifdef WITH_TLS + printf(" --cafile : path to a file containing trusted CA certificates to enable encrypted\n"); + printf(" certificate based communication.\n"); + printf(" --capath : path to a directory containing trusted CA certificates to enable encrypted\n"); + printf(" communication.\n"); + printf(" --cert : client certificate for authentication, if required by server.\n"); + printf(" --key : client private key for authentication, if required by server.\n"); + printf(" --ciphers : openssl compatible list of TLS ciphers to support.\n"); + printf(" --tls-version : TLS protocol version, can be one of tlsv1.2 tlsv1.1 or tlsv1.\n"); + printf(" Defaults to tlsv1.2 if available.\n"); + printf(" --insecure : do not check that the server certificate hostname matches the remote\n"); + printf(" hostname. Using this option means that you cannot be sure that the\n"); + printf(" remote host is the server you wish to connect to and so is insecure.\n"); + printf(" Do not use this option in a production environment.\n"); +#ifdef WITH_TLS_PSK + printf(" --psk : pre-shared-key in hexadecimal (no leading 0x) to enable TLS-PSK mode.\n"); + printf(" --psk-identity : client identity string for TLS-PSK mode.\n"); +#endif +#endif +#ifdef WITH_SOCKS + printf(" --proxy : SOCKS5 proxy URL of the form:\n"); + printf(" socks5h://[username[:password]@]hostname[:port]\n"); + printf(" Only \"none\" and \"username\" authentication is supported.\n"); +#endif + printf("\nSee https://mosquitto.org/ for more information.\n\n"); +} + +int main(int argc, char *argv[]) +{ + int rc; +#ifndef WIN32 + struct sigaction sigact; +#endif + + memset(&cfg, 0, sizeof(struct mosq_config)); + + mosquitto_lib_init(); + + rc = client_config_load(&cfg, CLIENT_RR, argc, argv); + if(rc){ + if(rc == 2){ + /* --help */ + print_usage(); + }else{ + fprintf(stderr, "\nUse 'mosquitto_rr --help' to see usage.\n"); + } + goto cleanup; + } + + if(!cfg.topic || cfg.pub_mode == MSGMODE_NONE || !cfg.response_topic){ + fprintf(stderr, "Error: All of topic, message, and response topic must be supplied.\n"); + fprintf(stderr, "\nUse 'mosquitto_rr --help' to see usage.\n"); + goto cleanup; + } + rc = mosquitto_property_add_string(&cfg.publish_props, MQTT_PROP_RESPONSE_TOPIC, cfg.response_topic); + if(rc){ + fprintf(stderr, "Error adding property RESPONSE_TOPIC.\n"); + goto cleanup; + } + rc = mosquitto_property_check_all(CMD_PUBLISH, cfg.publish_props); + if(rc){ + if(!cfg.quiet) fprintf(stderr, "Error in PUBLISH properties: Duplicate response topic.\n"); + goto cleanup; + } + + if(client_id_generate(&cfg)){ + goto cleanup; + } + + mosq = mosquitto_new(cfg.id, cfg.clean_session, &cfg); + if(!mosq){ + switch(errno){ + case ENOMEM: + if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n"); + break; + case EINVAL: + if(!cfg.quiet) fprintf(stderr, "Error: Invalid id and/or clean_session.\n"); + break; + } + goto cleanup; + } + if(client_opts_set(mosq, &cfg)){ + goto cleanup; + } + if(cfg.debug){ + mosquitto_log_callback_set(mosq, my_log_callback); + } + mosquitto_connect_v5_callback_set(mosq, my_connect_callback); + mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); + mosquitto_message_v5_callback_set(mosq, my_message_callback); + + rc = client_connect(mosq, &cfg); + if(rc){ + goto cleanup; + } + +#ifndef WIN32 + sigact.sa_handler = my_signal_handler; + sigemptyset(&sigact.sa_mask); + sigact.sa_flags = 0; + + if(sigaction(SIGALRM, &sigact, NULL) == -1){ + perror("sigaction"); + goto cleanup; + } + + if(cfg.timeout){ + alarm(cfg.timeout); + } +#endif + + do{ + rc = mosquitto_loop(mosq, -1, 1); + if(client_state == rr_s_ready_to_publish){ + client_state = rr_s_wait_for_response; + switch(cfg.pub_mode){ + case MSGMODE_CMD: + case MSGMODE_FILE: + case MSGMODE_STDIN_FILE: + rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain); + break; + case MSGMODE_NULL: + rc = my_publish(mosq, &mid_sent, cfg.topic, 0, NULL, cfg.qos, cfg.retain); + break; + case MSGMODE_STDIN_LINE: + /* FIXME */ + break; + } + } + }while(rc == MOSQ_ERR_SUCCESS && client_state != rr_s_disconnect); + + mosquitto_destroy(mosq); + mosquitto_lib_cleanup(); + + if(cfg.msg_count>0 && rc == MOSQ_ERR_NO_CONN){ + rc = 0; + } + client_config_cleanup(&cfg); + if(rc){ + fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc)); + } + return rc; + +cleanup: + mosquitto_lib_cleanup(); + client_config_cleanup(&cfg); + return 1; +} + diff --git a/man/CMakeLists.txt b/man/CMakeLists.txt index f0a9cf79..309bdd27 100644 --- a/man/CMakeLists.txt +++ b/man/CMakeLists.txt @@ -1,4 +1,4 @@ -install(FILES mosquitto_passwd.1 mosquitto_pub.1 mosquitto_sub.1 DESTINATION ${MANDIR}/man1) +install(FILES mosquitto_passwd.1 mosquitto_pub.1 mosquitto_sub.1 mosquitto_rr.1 DESTINATION ${MANDIR}/man1) install(FILES libmosquitto.3 DESTINATION ${MANDIR}/man3) install(FILES mosquitto.conf.5 DESTINATION ${MANDIR}/man5) install(FILES mosquitto-tls.7 mqtt.7 DESTINATION ${MANDIR}/man7) diff --git a/man/Makefile b/man/Makefile index 51dcccd0..f7268cca 100644 --- a/man/Makefile +++ b/man/Makefile @@ -2,22 +2,26 @@ include ../config.mk .PHONY : all clean install uninstall dist -all : mosquitto.8 mosquitto-tls.7 mosquitto.conf.5 mosquitto_passwd.1 mosquitto_pub.1 mosquitto_sub.1 mqtt.7 libmosquitto.3 +MANPAGES = \ + libmosquitto.3 \ + mosquitto-tls.7 \ + mosquitto.8 \ + mosquitto.conf.5 \ + mosquitto_passwd.1 \ + mosquitto_pub.1 \ + mosquitto_rr.1 \ + mosquitto_sub.1 \ + mqtt.7 + +all : ${MANPAGES} clean : reallyclean : clean -rm -f *.orig - -rm -f libmosquitto.3 - -rm -f mosquitto.8 - -rm -f mosquitto.conf.5 - -rm -f mosquitto_passwd.1 - -rm -f mosquitto_pub.1 - -rm -f mosquitto_sub.1 - -rm -f mosquitto-tls.7 - -rm -f mqtt.7 + -rm -f ${MANPAGES} -dist : mosquitto.8 mosquitto-tls.7 mosquitto.conf.5 mosquitto_passwd.1 mosquitto_pub.1 mosquitto_sub.1 mqtt.7 libmosquitto.3 +dist : ${MANPAGES} install : $(INSTALL) -d "${DESTDIR}$(mandir)/man8" @@ -28,6 +32,7 @@ install : $(INSTALL) -m 644 mosquitto_passwd.1 "${DESTDIR}${mandir}/man1/mosquitto_passwd.1" $(INSTALL) -m 644 mosquitto_pub.1 "${DESTDIR}${mandir}/man1/mosquitto_pub.1" $(INSTALL) -m 644 mosquitto_sub.1 "${DESTDIR}${mandir}/man1/mosquitto_sub.1" + $(INSTALL) -m 644 mosquitto_rr.1 "${DESTDIR}${mandir}/man1/mosquitto_rr.1" $(INSTALL) -d "${DESTDIR}$(mandir)/man7" $(INSTALL) -m 644 mqtt.7 "${DESTDIR}${mandir}/man7/mqtt.7" $(INSTALL) -m 644 mosquitto-tls.7 "${DESTDIR}${mandir}/man7/mosquitto-tls.7" @@ -40,6 +45,7 @@ uninstall : -rm -f "${DESTDIR}${mandir}/man1/mosquitto_passwd.1" -rm -f "${DESTDIR}${mandir}/man1/mosquitto_pub.1" -rm -f "${DESTDIR}${mandir}/man1/mosquitto_sub.1" + -rm -f "${DESTDIR}${mandir}/man1/mosquitto_rr.1" -rm -f "${DESTDIR}${mandir}/man7/mqtt.7" -rm -f "${DESTDIR}${mandir}/man7/mosquitto-tls.7" -rm -f "${DESTDIR}${mandir}/man3/libmosquitto.3" @@ -59,6 +65,9 @@ mosquitto_pub.1 : mosquitto_pub.1.xml mosquitto_sub.1 : mosquitto_sub.1.xml $(XSLTPROC) $^ +mosquitto_rr.1 : mosquitto_rr.1.xml + $(XSLTPROC) $^ + mqtt.7 : mqtt.7.xml $(XSLTPROC) $^ @@ -81,6 +90,7 @@ potgen : xml2po -o po/mosquitto_passwd/mosquitto_passwd.1.pot mosquitto_passwd.1.xml xml2po -o po/mosquitto_pub/mosquitto_pub.1.pot mosquitto_pub.1.xml xml2po -o po/mosquitto_sub/mosquitto_sub.1.pot mosquitto_sub.1.xml + xml2po -o po/mosquitto_sub/mosquitto_rr.1.pot mosquitto_rr.1.xml xml2po -o po/mqtt/mqtt.7.pot mqtt.7.xml xml2po -o po/mosquitto-tls/mosquitto-tls.7.pot mosquitto-tls.7.xml xml2po -o po/libmosquitto/libmosquitto.3.pot libmosquitto.3.xml diff --git a/man/mosquitto.8.xml b/man/mosquitto.8.xml index f771f1a1..fbae44fe 100644 --- a/man/mosquitto.8.xml +++ b/man/mosquitto.8.xml @@ -513,6 +513,12 @@ 1 + + + mosquitto_rr + 1 + + mosquitto_sub diff --git a/man/mosquitto_pub.1.xml b/man/mosquitto_pub.1.xml index b02e3214..f92a1442 100644 --- a/man/mosquitto_pub.1.xml +++ b/man/mosquitto_pub.1.xml @@ -668,6 +668,12 @@ 7 + + + mosquitto_rr + 1 + + mosquitto_sub diff --git a/man/mosquitto_rr.1.meta b/man/mosquitto_rr.1.meta new file mode 100644 index 00000000..365b7388 --- /dev/null +++ b/man/mosquitto_rr.1.meta @@ -0,0 +1,5 @@ +.. title: mosquitto_rr man page +.. slug: mosquitto_rr-1 +.. category: man +.. type: man +.. pretty_url: False diff --git a/man/mosquitto_rr.1.xml b/man/mosquitto_rr.1.xml new file mode 100644 index 00000000..c8ece437 --- /dev/null +++ b/man/mosquitto_rr.1.xml @@ -0,0 +1,813 @@ + + + + + + mosquitto_rr + 1 + Mosquitto Project + Commands + + + + mosquitto_rr + an MQTT version 5/3.1.1 client for subscribing to topics + + + + + mosquitto_rr + + + hostname + port-number + + username + password + + message-topic + + + URL + message-topic + + response-topic + + + file + message + + + + bind-address + + + command identifier value + client-id + client-id-prefix + keepalive-time + + message-QoS + + + + protocol-version + message-processing-timeout + socks-url + + + topic + payload + qos + + + + + + file + dir + + file + file + version + + + + hex-key + identity + version + + + + + mosquitto_rr + + + + + + + + Description + mosquitto_rr is an MQTT version 5/3.1.1 + client that can be used to publish a request message and wait for a + response. When using MQTT v5, which is the default, + mosquitto_rr will use the Request-Response + feature. + + + + Encrypted Connections + mosquitto_rr supports TLS encrypted + connections. It is strongly recommended that you use an encrypted + connection for anything more than the most basic setup. + To enable TLS connections when using x509 certificates, one of + either or must + be provided as an option. + To enable TLS connections when using TLS-PSK, you must use the + and the + options. + + + + Options + The options below may be given on the command line, but may also + be placed in a config file located at + or + with one pair of + + per line. The values in the config file will be used as defaults + and can be overridden by using the command line. The exceptions to + this is , which if given in the config file will + not be overridden. Note also that currently some options cannot be + negated, e.g. . Config file lines that have a + as the first character are treated as comments + and not processed any further. + + + + + Bind the outgoing connection to a local ip + address/hostname. Use this argument if you need to + restrict network communication to a particular + interface. + + + + + + + Disable the 'clean session' flag. This means that all + of the subscriptions for the client will be maintained + after it disconnects, along with subsequent QoS 1 and QoS 2 + messages that arrive. When the client reconnects, it will + receive all of the queued messages. + If using this option, the client id must be set + manually with + + + + + + Define the path to a file containing PEM encoded CA + certificates that are trusted. Used to enable SSL + communication. + See also + + + + + + Define the path to a directory containing PEM encoded CA + certificates that are trusted. Used to enable SSL + communication. + For to work correctly, the + certificate files must have ".crt" as the file ending + and you must run "openssl rehash <path to capath>" each + time you add/remove a certificate. + See also + + + + + + Define the path to a file containing a PEM encoded + certificate for this client, if required by the + server. + See also . + + + + + + An openssl compatible list of TLS ciphers to support + in the client. See + ciphers1 + for more information. + + + + + + + Enable debug messages. + + + + + + + Use an MQTT v5 property with this publish. If you use + this option, the client will be set to be an MQTT v5 + client. This option has two forms: + + + is the MQTT command/packet + identifier and can be one of CONNECT, PUBACK, PUBREC, + PUBCOMP, SUBSCRIBE, UNSUBSCRIBE, DISCONNECT, AUTH, or + WILL. The properties available for each command are + listed in the Properties section. + + is the name of the + property to add. This is as described in the + specification, but with '-' as a word separator. For + example: + . More details + are in the Properties + section. + + is the value of the property + to add, with a data type that is property + specific. + + is only used for the + property as the first of + the two strings in the string pair. In that case, + is the second of the strings in + the pair. + + + + + + + Send the contents of a file as the request message. + + + + + + Specify output printing format. This option allows + you to choose what information from each message is + printed to the screen. See the Output Format section + below for full details. + This option overrides the option, + but does not override the + option. + + + + + + Display usage information. + + + + + + + Specify the host to connect to. Defaults to localhost. + + + + + + + The id to use for this client. If not given, defaults + to mosquitto_rr_ appended with the process id of the + client. Cannot be used at the same time as the + argument. + + + + + + + Provide a prefix that the client id will be built + from by appending the process id of the client. This is + useful where the broker is using the clientid_prefixes + option. Cannot be used at the same time as the + argument. + + + + + + When using certificate based encryption, this option + disables verification of the server hostname in the + server certificate. This can be useful when testing + initial server configurations but makes it possible for + a malicious third party to impersonate your server + through DNS spoofing, for example. Use this option in + testing only. If you need to + resort to using this option in a production + environment, your setup is at fault and there is no + point using encryption. + + + + + + + The number of seconds between sending PING commands + to the broker for the purposes of informing it we are still + connected and functioning. Defaults to 60 seconds. + + + + + + Define the path to a file containing a PEM encoded + private key for this client, if required by the + server. + See also . + + + + + + + Specify specify user, password, hostname, port and + topic at once as a URL. The URL must be in the form: + mqtt(s)://[username[:password]@]host[:port]/topic + If the scheme is mqtt:// then the port defaults to + 1883. If the scheme is mqtts:// then the port defaults + to 8883. + + + + + + + Send a single request message from the command line. + + + + + + Do not append an end of line character to the payload + when printing. This allows streaming of payload data + from multiple messages directly to another application + unmodified. Only really makes sense when not using + . + + + + + + + Send a null (zero length) request message. + + + + + + + Connect to the port specified. If not given, the + default of 1883 for plain MQTT or 8883 for MQTT over + TLS will be used. + + + + + + + Provide a password to be used for authenticating with + the broker. Using this argument without also specifying + a username is invalid. See also the + option. + + + + + + Specify a SOCKS5 proxy to connect through. "None" and + "username" authentication types are supported. The + must be of the form + . + The protocol prefix means that + hostnames are resolved by the proxy. The symbols %25, + %3A and %40 are URL decoded into %, : and @ + respectively, if present in the username or + password. + If username is not given, then no authentication is + attempted. If the port is not given, then the default + of 1080 is used. + More SOCKS versions may be available in the future, + depending on demand, and will use different protocol + prefixes as described in + curl + 1 . + + + + + + Provide the hexadecimal (no leading 0x) + pre-shared-key matching the one used on the broker to + use TLS-PSK encryption support. + must also be provided + to enable TLS-PSK. + + + + + + The client identity to use with TLS-PSK support. This + may be used instead of a username if the broker is + configured to do so. + + + + + + + Specify the quality of service desired for the + incoming messages, from 0, 1 and 2. Defaults to 0. See + mqtt7 + for more information on QoS. + The QoS is identical for all topics subscribed to in + a single instance of mosquitto_rr. + + + + + + If this argument is given, no runtime errors will be + printed. This excludes any error messages given in case of + invalid user input (e.g. using without a + port). + + + + + + If this argument is given, messages that are received + that have the retain bit set will not be printed. + Messages with retain set are "stale", in that it is not + known when they were originally published. When + subscribing to a wildcard topic there may be a large + number of retained messages. This argument suppresses + their display. + + + + + + Use SRV lookups to determine which host to connect + to. Performs lookups to + when used in + conjunction with , otherwise uses + . + + + + + + + Send a request message read from stdin, sending the entire content as a single message. + + + + + + + The MQTT topic to subscribe to, where responses will + be waited for. See + mqtt7 + for more information on MQTT topics. + This option may be repeated to subscribe to multiple topics. + + + + + + Choose which TLS protocol version to use when + communicating with the broker. Valid options are + , and + . The default value is + . If the installed version of + openssl is too old, only will be + available. Must match the protocol version used by the + broker. + + + + + + + Provide a username to be used for authenticating with + the broker. See also the + argument. + + + + + + + Print received messages verbosely. With this + argument, messages will be printed as "topic payload". When + this argument is not given, the messages are printed as + "payload". + + + + + + + Specify which version of the MQTT protocol should be + used when connecting to the rmeote broker. Can be + , or + . + Defaults to . + + + + + + Provide a timeout as an integer number of seconds. + mosquitto_rr will stop processing messages and + disconnect after this number of seconds has + passed. The timeout starts just after the client has + connected to the broker. + + + + + + Specify a message that will be stored by the broker + and sent out if this client disconnects unexpectedly. This + must be used in conjunction with . + + + + + + The QoS to use for the Will. Defaults to 0. This must + be used in conjunction with . + + + + + + If given, if the client disconnects unexpectedly the + message sent out will be treated as a retained message. + This must be used in conjunction with . + + + + + + The topic on which to send a Will, in the event that + the client disconnects unexpectedly. + + + + + + + Output format + There are three ways of formatting the output from mosquitto_rr. + In all cases a new-line character is appended for each message + received unless the argument is passed to + mosquitto_rr. + Payload-only is the default output format and will + print the payload exactly as it is received. + Verbose mode is activated with and prints the + message topic and the payload, separated by a space. + The final option is formatted output, which allows the user to + define a custom output format. The behaviour is controlled with + the option. The format string is + a free text string where interpreted sequences are replaced by + different parameters. The available interpreted sequences are + described below. + Three characters are used to start an interpreted sequence: + , and . + Sequences starting with are either parameters + related to the MQTT message being printed, or are helper sequences + to avoid the need to type long date format strings for example. + Sequences starting with are passed to the + strftime3 + function (with the @ replaced with a % - note that only the + character immediately after the @ is passed to strftime). This + allows the construction of a wide variety of time based outputs. + The output options for strftime vary from platform to platform, so + please check what is available for your platform. mosquitto_rr + does provide one extension to strftime which is + , which can be used to obtain the number of + nanoseconds passed in the current second. The resolution of this + option varies depending on the platform. The final sequence + character is , which is used to input some + characters that would otherwise be difficult to enter. + + + MQTT related parameters + + a literal %. + the length of the payload in bytes. + the message id (only relevant for messages with QoS>0). + the payload raw bytes (may produce non-printable characters depending on the payload). + the message QoS. + the retained flag for the message. + the message topic. + the payload with each byte as a hexadecimal number (lower case). + the payload with each byte as a hexadecimal number (upper case). + + + + + Helpers + + ISO-8601 format date and time, e.g. 2016-08-10T09:47:38+0100 + JSON output of message + parameters and timestamp, with a quoted and escaped + payload. For example + {"tst":1470825369,"topic":"greeting","qos":0,"retain":0,"payload":"hello + world"} + JSON output of message + parameters and timestamp, with a non-quoted and + non-escaped payload - this means the payload must + itself be valid JSON. For example: + {"tst":1470825369,"topic":"foo","qos":0,"retain":0,"payload":{"temperature":27.0,"humidity":57}}. + ISO-8601 format date and time, e.g. 2016-08-10T09:47:38+0100 + Unix timestamp with nanoseconds, e.g. 1470818943.786368637 + + + + + Time related parameters + + a literal @. + pass the character represented + by to the strftime function as + . The options supported are platform + dependent. + the number of nanoseconds that + have passed in the current second, with varying timing + resolution depending on platform. + + + + + Escape characters + + a literal \. + a null character. Can be used + to separate different parameters that may contain spaces + (e.g. topic, payload) so that processing with tools such as + xargs1 + is easier. + alert/bell. + the escape sequence, which can + be used with ANSI colour codes to provide coloured output + for example. + end of line. + carriage return. + horizontal tab. + vertical tab. + + + + + + Wills + mosquitto_rr can register a message with the broker that will be + sent out if it disconnects unexpectedly. See + mqtt7 + for more information. + The minimum requirement for this is to use to + specify which topic the will should be sent out on. This will result in + a non-retained, zero length message with QoS 0. + Use the , and arguments to + modify the other will parameters. + + + + Properties + The / option + allows adding properties to different stages of the mosquitto_rr + run. The properties supported for each command are as + follows: + + + Connect + + (binary data - note treated as a string in mosquitto_rr) + (UTF-8 string pair) + (32-bit unsigned integer) + (16-bit unsigned integer) + (8-bit unsigned integer) + (8-bit unsigned integer) + (32-bit unsigned integer) + (16-bit unsigned integer) + (UTF-8 string pair) + + + + + Publish + + (UTF-8 string) + (binary data - note treated as a string in mosquitto_rr) + (32-bit unsigned integer) + (8-bit unsigned integer) + (UTF-8 string) + (16-bit unsigned integer) + (UTF-8 string pair) + + + + + Subscribe + + (UTF-8 string pair) + + + + + Unsubscribe + + (UTF-8 string pair) + + + + + Disconnect + + (32-bit unsigned integer) + (UTF-8 string pair) + + + + + Will properties + + (UTF-8 string) + (binary data - note treated as a string in mosquitto_pub) + (32-bit unsigned integer) + (8-bit unsigned integer) + (UTF-8 string) + (UTF-8 string pair) + (32-bit unsigned integer) + + + + + + Files + + + $XDG_CONFIG_HOME/mosquitto_rr + $HOME/.config/mosquitto_rr + + Configuration file for default options. + + + + + + + Bugs + mosquitto bug information can be found at + + + + + See Also + + + + mqtt + 7 + + + + + mosquitto_pub + 1 + + + + + mosquitto_sub + 1 + + + + + mosquitto + 8 + + + + + libmosquitto + 3 + + + + + mosquitto-tls + 7 + + + + + + + Author + Roger Light roger@atchoo.org + + diff --git a/man/mosquitto_sub.1.xml b/man/mosquitto_sub.1.xml index 8992a867..f934ce48 100644 --- a/man/mosquitto_sub.1.xml +++ b/man/mosquitto_sub.1.xml @@ -832,7 +832,7 @@ mosquitto_sub -t 'bbc/#' -T bbc/bbc1 --remove-retained Connect - (binary data - note treated as a string in mosquitto_pub) + (binary data - note treated as a string in mosquitto_sub) (UTF-8 string pair) (32-bit unsigned integer) (16-bit unsigned integer) @@ -870,7 +870,7 @@ mosquitto_sub -t 'bbc/#' -T bbc/bbc1 --remove-retained Will properties (UTF-8 string) - (binary data - note treated as a string in mosquitto_pub) + (binary data - note treated as a string in mosquitto_sub) (32-bit unsigned integer) (8-bit unsigned integer) (UTF-8 string) @@ -951,6 +951,12 @@ mosquitto_sub -t 'bbc/#' -T bbc/bbc1 --remove-retained 1 + + + mosquitto_rr + 1 + + mosquitto