mosquitto/client/sub_client_output.c

835 lines
18 KiB
C
Raw Normal View History

/*
Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
2020-11-25 17:34:21 +00:00
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
2021-10-05 14:20:37 +00:00
The Eclipse Public License is available at
2020-11-25 17:34:21 +00:00
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
2021-10-05 14:20:37 +00:00
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
2020-12-01 18:21:59 +00:00
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
2020-09-30 08:44:36 +00:00
#ifdef WIN32
/* For rand_s on Windows */
# define _CRT_RAND_S
# include <fcntl.h>
# include <io.h>
2020-09-30 08:44:36 +00:00
#endif
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#ifndef WIN32
#include <unistd.h>
#else
#include <process.h>
#include <winsock2.h>
#define snprintf sprintf_s
#endif
#ifdef WITH_CJSON
2020-12-04 22:15:19 +00:00
# include <cjson/cJSON.h>
#endif
#ifdef __APPLE__
# include <sys/time.h>
#endif
#include <mosquitto.h>
#include <mqtt_protocol.h>
#include "client_shared.h"
#include "sub_client_output.h"
extern struct mosq_config cfg;
static int get_time(struct tm **ti, long *ns)
{
#ifdef WIN32
SYSTEMTIME st;
#elif defined(__APPLE__)
struct timeval tv;
#else
struct timespec ts;
#endif
time_t s;
#ifdef WIN32
s = time(NULL);
GetLocalTime(&st);
*ns = st.wMilliseconds*1000000L;
#elif defined(__APPLE__)
gettimeofday(&tv, NULL);
s = tv.tv_sec;
*ns = tv.tv_usec*1000;
#else
if(clock_gettime(CLOCK_REALTIME, &ts) != 0){
err_printf(&cfg, "Error obtaining system time.\n");
return 1;
}
s = ts.tv_sec;
*ns = ts.tv_nsec;
#endif
*ti = localtime(&s);
if(!(*ti)){
err_printf(&cfg, "Error obtaining system time.\n");
return 1;
}
return 0;
}
static void write_payload(const unsigned char *payload, int payloadlen, int hex, char align, char pad, int field_width, int precision)
{
int i;
int padlen;
2021-01-21 13:33:54 +00:00
UNUSED(precision); /* FIXME - use or remove */
if(field_width > 0){
if(payloadlen > field_width){
payloadlen = field_width;
}
if(hex > 0){
payloadlen /= 2;
padlen = field_width - payloadlen*2;
}else{
padlen = field_width - payloadlen;
}
}else{
padlen = field_width - payloadlen;
}
if(align != '-'){
for(i=0; i<padlen; i++){
putchar(pad);
}
}
2018-03-15 11:27:17 +00:00
if(hex == 0){
(void)fwrite(payload, 1, (size_t )payloadlen, stdout);
2018-03-15 11:27:17 +00:00
}else if(hex == 1){
for(i=0; i<payloadlen; i++){
fprintf(stdout, "%02x", payload[i]);
}
2018-03-15 11:27:17 +00:00
}else if(hex == 2){
for(i=0; i<payloadlen; i++){
fprintf(stdout, "%02X", payload[i]);
2018-03-15 11:27:17 +00:00
}
}
if(align == '-'){
printf("%*s", padlen, "");
}
}
#ifndef WITH_CJSON
static void write_json_payload(const char *payload, int payloadlen)
{
int i;
for(i=0; i<payloadlen; i++){
if(payload[i] == '"' || payload[i] == '\\' || (payload[i] >=0 && payload[i] < 32)){
printf("\\u%04x", payload[i]);
}else{
fputc(payload[i], stdout);
}
}
}
#endif
#ifdef WITH_CJSON
static int json_print_properties(cJSON *root, const mosquitto_property *properties)
{
int identifier;
uint8_t i8value = 0;
uint16_t i16value = 0;
uint32_t i32value = 0;
char *strname = NULL, *strvalue = NULL;
char *binvalue = NULL;
cJSON *tmp, *prop_json, *user_json = NULL;
const mosquitto_property *prop = NULL;
prop_json = cJSON_CreateObject();
if(prop_json == NULL){
cJSON_Delete(prop_json);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(root, "properties", prop_json);
for(prop=properties; prop != NULL; prop = mosquitto_property_next(prop)){
tmp = NULL;
identifier = mosquitto_property_identifier(prop);
switch(identifier){
case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR:
mosquitto_property_read_byte(prop, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, &i8value, false);
tmp = cJSON_CreateNumber(i8value);
break;
case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL:
mosquitto_property_read_int32(prop, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &i32value, false);
tmp = cJSON_CreateNumber(i32value);
break;
case MQTT_PROP_CONTENT_TYPE:
case MQTT_PROP_RESPONSE_TOPIC:
mosquitto_property_read_string(prop, identifier, &strvalue, false);
if(strvalue == NULL) return MOSQ_ERR_NOMEM;
tmp = cJSON_CreateString(strvalue);
free(strvalue);
strvalue = NULL;
break;
case MQTT_PROP_CORRELATION_DATA:
mosquitto_property_read_binary(prop, MQTT_PROP_CORRELATION_DATA, (void **)&binvalue, &i16value, false);
if(binvalue == NULL) return MOSQ_ERR_NOMEM;
tmp = cJSON_CreateString(binvalue);
free(binvalue);
binvalue = NULL;
break;
case MQTT_PROP_SUBSCRIPTION_IDENTIFIER:
mosquitto_property_read_varint(prop, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, &i32value, false);
tmp = cJSON_CreateNumber(i32value);
break;
case MQTT_PROP_TOPIC_ALIAS:
mosquitto_property_read_int16(prop, MQTT_PROP_TOPIC_ALIAS, &i16value, false);
tmp = cJSON_CreateNumber(i16value);
break;
case MQTT_PROP_USER_PROPERTY:
if(user_json == NULL){
user_json = cJSON_CreateObject();
if(user_json == NULL){
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(prop_json, "user-properties", user_json);
}
mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &strname, &strvalue, false);
if(strname == NULL || strvalue == NULL) return MOSQ_ERR_NOMEM;
tmp = cJSON_CreateString(strvalue);
free(strvalue);
if(tmp == NULL){
free(strname);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(user_json, strname, tmp);
free(strname);
strname = NULL;
strvalue = NULL;
tmp = NULL; /* Don't add this to prop_json below */
break;
}
if(tmp != NULL){
cJSON_AddItemToObject(prop_json, mosquitto_property_identifier_to_string(identifier), tmp);
}
}
return MOSQ_ERR_SUCCESS;
}
#endif
static void format_time_8601(const struct tm *ti, int ns, char *buf, size_t len)
{
char c;
strftime(buf, len, "%Y-%m-%dT%H:%M:%S.000000%z", ti);
c = buf[strlen("2020-05-06T21:48:00.000000")];
snprintf(&buf[strlen("2020-05-06T21:48:00.")], 9, "%06d", ns/1000);
buf[strlen("2020-05-06T21:48:00.000000")] = c;
}
static int json_print(const struct mosquitto_message *message, const mosquitto_property *properties, const struct tm *ti, int ns, bool escaped, bool pretty)
{
char buf[100];
#ifdef WITH_CJSON
cJSON *root;
cJSON *tmp;
char *json_str;
const char *return_parse_end;
root = cJSON_CreateObject();
if(root == NULL){
return MOSQ_ERR_NOMEM;
}
format_time_8601(ti, ns, buf, sizeof(buf));
tmp = cJSON_CreateStringReference(buf);
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(root, "tst", tmp);
tmp = cJSON_CreateString(message->topic);
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(root, "topic", tmp);
tmp = cJSON_CreateNumber(message->qos);
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(root, "qos", tmp);
tmp = cJSON_CreateNumber(message->retain);
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(root, "retain", tmp);
tmp = cJSON_CreateNumber(message->payloadlen);
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(root, "payloadlen", tmp);
if(message->qos > 0){
tmp = cJSON_CreateNumber(message->mid);
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(root, "mid", tmp);
}
/* Properties */
if(properties){
if(json_print_properties(root, properties)){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
}
}
/* Payload */
if(escaped){
if(message->payload){
tmp = cJSON_CreateString(message->payload);
}else{
tmp = cJSON_CreateNull();
}
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(root, "payload", tmp);
}else{
return_parse_end = NULL;
if(message->payload){
tmp = cJSON_ParseWithOpts(message->payload, &return_parse_end, true);
2020-12-02 20:38:21 +00:00
if(tmp == NULL || return_parse_end != (char *)message->payload + message->payloadlen){
cJSON_Delete(root);
return MOSQ_ERR_INVAL;
}
}else{
tmp = cJSON_CreateNull();
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_INVAL;
}
}
cJSON_AddItemToObject(root, "payload", tmp);
}
if(pretty){
json_str = cJSON_Print(root);
}else{
json_str = cJSON_PrintUnformatted(root);
}
cJSON_Delete(root);
2021-05-06 20:05:10 +00:00
if(json_str == NULL){
return MOSQ_ERR_NOMEM;
}
fputs(json_str, stdout);
free(json_str);
2021-10-05 14:20:37 +00:00
return MOSQ_ERR_SUCCESS;
#else
2021-01-21 13:33:54 +00:00
UNUSED(properties);
UNUSED(pretty);
format_time_8601(ti, ns, buf, sizeof(buf));
printf("{\"tst\":\"%s\",\"topic\":\"%s\",\"qos\":%d,\"retain\":%d,\"payloadlen\":%d,", buf, message->topic, message->qos, message->retain, message->payloadlen);
if(message->qos > 0){
printf("\"mid\":%d,", message->mid);
}
if(escaped){
fputs("\"payload\":\"", stdout);
write_json_payload(message->payload, message->payloadlen);
fputs("\"}", stdout);
}else{
fputs("\"payload\":", stdout);
2020-09-08 11:26:55 +00:00
write_payload(message->payload, message->payloadlen, 0, 0, 0, 0, 0);
fputs("}", stdout);
}
2021-10-05 14:20:37 +00:00
return MOSQ_ERR_SUCCESS;
#endif
}
static void formatted_print_blank(char pad, int field_width)
{
int i;
for(i=0; i<field_width; i++){
putchar(pad);
}
}
static void formatted_print_int(int value, char align, char pad, int field_width)
{
if(field_width == 0){
printf("%d", value);
}else{
if(align == '-'){
printf("%-*d", field_width, value);
}else{
if(pad == '0'){
printf("%0*d", field_width, value);
}else{
printf("%*d", field_width, value);
}
}
}
}
static void formatted_print_str(const char *value, char align, int field_width, int precision)
{
if(field_width == 0 && precision == -1){
fputs(value, stdout);
}else{
if(precision == -1){
if(align == '-'){
printf("%-*s", field_width, value);
}else{
printf("%*s", field_width, value);
}
}else if(field_width == 0){
if(align == '-'){
printf("%-.*s", precision, value);
}else{
printf("%.*s", precision, value);
}
}else{
if(align == '-'){
printf("%-*.*s", field_width, precision, value);
}else{
printf("%*.*s", field_width, precision, value);
}
}
}
}
static void formatted_print_percent(const struct mosq_config *lcfg, const struct mosquitto_message *message, const mosquitto_property *properties, char format, char align, char pad, int field_width, int precision)
{
struct tm *ti = NULL;
2020-12-02 10:10:21 +00:00
long ns = 0;
char buf[100];
int rc;
uint8_t i8value;
uint16_t i16value;
uint32_t i32value;
char *binvalue = NULL, *strname, *strvalue;
const mosquitto_property *prop;
switch(format){
case '%':
fputc('%', stdout);
break;
case 'A':
if(mosquitto_property_read_int16(properties, MQTT_PROP_TOPIC_ALIAS, &i16value, false)){
formatted_print_int(i16value, align, pad, field_width);
}else{
formatted_print_blank(pad, field_width);
}
break;
case 'C':
if(mosquitto_property_read_string(properties, MQTT_PROP_CONTENT_TYPE, &strvalue, false)){
formatted_print_str(strvalue, align, field_width, precision);
free(strvalue);
}else{
formatted_print_blank(' ', field_width);
}
break;
case 'D':
if(mosquitto_property_read_binary(properties, MQTT_PROP_CORRELATION_DATA, (void **)&binvalue, &i16value, false)){
fwrite(binvalue, 1, i16value, stdout);
free(binvalue);
}
break;
case 'E':
if(mosquitto_property_read_int32(properties, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &i32value, false)){
2020-10-17 00:23:08 +00:00
formatted_print_int((int)i32value, align, pad, field_width);
}else{
formatted_print_blank(pad, field_width);
}
break;
case 'F':
if(mosquitto_property_read_byte(properties, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, &i8value, false)){
formatted_print_int(i8value, align, pad, field_width);
}else{
formatted_print_blank(pad, field_width);
}
break;
case 'I':
if(!ti){
if(get_time(&ti, &ns)){
err_printf(lcfg, "Error obtaining system time.\n");
return;
}
}
if(strftime(buf, 100, "%FT%T%z", ti) != 0){
formatted_print_str(buf, align, field_width, precision);
}else{
formatted_print_blank(' ', field_width);
}
break;
case 'j':
if(!ti){
if(get_time(&ti, &ns)){
err_printf(lcfg, "Error obtaining system time.\n");
return;
}
}
2020-10-17 00:23:08 +00:00
if(json_print(message, properties, ti, (int)ns, true, lcfg->pretty) != MOSQ_ERR_SUCCESS){
err_printf(lcfg, "Error: Out of memory.\n");
return;
}
break;
case 'J':
if(!ti){
if(get_time(&ti, &ns)){
err_printf(lcfg, "Error obtaining system time.\n");
return;
}
}
2020-10-17 00:23:08 +00:00
rc = json_print(message, properties, ti, (int)ns, false, lcfg->pretty);
if(rc == MOSQ_ERR_NOMEM){
err_printf(lcfg, "Error: Out of memory.\n");
return;
}else if(rc == MOSQ_ERR_INVAL){
err_printf(lcfg, "Error: Message payload is not valid JSON on topic %s.\n", message->topic);
return;
}
break;
case 'l':
formatted_print_int(message->payloadlen, align, pad, field_width);
break;
case 'm':
formatted_print_int(message->mid, align, pad, field_width);
break;
case 'P':
strname = NULL;
strvalue = NULL;
prop = mosquitto_property_read_string_pair(properties, MQTT_PROP_USER_PROPERTY, &strname, &strvalue, false);
while(prop){
printf("%s:%s", strname, strvalue);
free(strname);
free(strvalue);
strname = NULL;
strvalue = NULL;
prop = mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &strname, &strvalue, true);
if(prop){
fputc(' ', stdout);
}
}
free(strname);
free(strvalue);
break;
case 'p':
write_payload(message->payload, message->payloadlen, 0, align, pad, field_width, precision);
break;
case 'q':
fputc(message->qos + 48, stdout);
break;
case 'R':
if(mosquitto_property_read_string(properties, MQTT_PROP_RESPONSE_TOPIC, &strvalue, false)){
formatted_print_str(strvalue, align, field_width, precision);
free(strvalue);
}
break;
case 'r':
if(message->retain){
fputc('1', stdout);
}else{
fputc('0', stdout);
}
break;
case 'S':
if(mosquitto_property_read_varint(properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, &i32value, false)){
2020-10-17 00:23:08 +00:00
formatted_print_int((int)i32value, align, pad, field_width);
}else{
formatted_print_blank(pad, field_width);
}
break;
case 't':
formatted_print_str(message->topic, align, field_width, precision);
break;
case 'U':
if(!ti){
if(get_time(&ti, &ns)){
err_printf(lcfg, "Error obtaining system time.\n");
return;
}
}
if(strftime(buf, 100, "%s", ti) != 0){
printf("%s.%09ld", buf, ns);
}
break;
case 'x':
write_payload(message->payload, message->payloadlen, 1, align, pad, field_width, precision);
break;
case 'X':
write_payload(message->payload, message->payloadlen, 2, align, pad, field_width, precision);
break;
}
}
2018-03-15 11:27:17 +00:00
static void formatted_print(const struct mosq_config *lcfg, const struct mosquitto_message *message, const mosquitto_property *properties)
{
2020-10-17 00:23:08 +00:00
size_t len;
2021-01-21 13:33:54 +00:00
size_t i;
struct tm *ti = NULL;
long ns = 0;
char strf[3] = {0, 0 ,0};
char buf[100];
char align, pad;
int field_width, precision;
len = strlen(lcfg->format);
for(i=0; i<len; i++){
if(lcfg->format[i] == '%'){
align = 0;
pad = ' ';
field_width = 0;
precision = -1;
if(i < len-1){
i++;
/* Optional alignment */
if(lcfg->format[i] == '-'){
align = lcfg->format[i];
if(i < len-1){
i++;
}
}
/* "%-040p" is allowed by this combination of checks, but isn't
* a valid format specifier, the '0' will be ignored. */
/* Optional zero padding */
if(lcfg->format[i] == '0'){
pad = '0';
if(i < len-1){
i++;
}
}
/* Optional field width */
while(i < len-1 && lcfg->format[i] >= '0' && lcfg->format[i] <= '9'){
field_width *= 10;
field_width += lcfg->format[i]-'0';
i++;
}
/* Optional precision */
if(lcfg->format[i] == '.'){
if(i < len-1){
i++;
precision = 0;
while(i < len-1 && lcfg->format[i] >= '0' && lcfg->format[i] <= '9'){
precision *= 10;
precision += lcfg->format[i]-'0';
i++;
}
}
}
if(i < len){
formatted_print_percent(lcfg, message, properties, lcfg->format[i], align, pad, field_width, precision);
}
}
}else if(lcfg->format[i] == '@'){
if(i < len-1){
i++;
if(lcfg->format[i] == '@'){
fputc('@', stdout);
}else{
if(!ti){
if(get_time(&ti, &ns)){
err_printf(lcfg, "Error obtaining system time.\n");
return;
}
}
strf[0] = '%';
strf[1] = lcfg->format[i];
strf[2] = 0;
if(lcfg->format[i] == 'N'){
printf("%09ld", ns);
}else{
if(strftime(buf, 100, strf, ti) != 0){
fputs(buf, stdout);
}
}
}
}
}else if(lcfg->format[i] == '\\'){
if(i < len-1){
i++;
switch(lcfg->format[i]){
case '\\':
fputc('\\', stdout);
break;
case '0':
fputc('\0', stdout);
break;
case 'a':
fputc('\a', stdout);
break;
case 'e':
fputc('\033', stdout);
break;
case 'n':
fputc('\n', stdout);
break;
case 'r':
fputc('\r', stdout);
break;
case 't':
fputc('\t', stdout);
break;
case 'v':
fputc('\v', stdout);
break;
}
}
}else{
fputc(lcfg->format[i], stdout);
}
}
if(lcfg->eol){
fputc('\n', stdout);
}
fflush(stdout);
}
void output_init(void)
{
2020-09-30 08:44:36 +00:00
#ifndef WIN32
struct tm *ti = NULL;
long ns;
if(!get_time(&ti, &ns)){
2020-10-17 00:23:08 +00:00
srandom((unsigned int)ns);
}
#else
/* Disable text translation so binary payloads aren't modified */
_setmode(_fileno(stdout), _O_BINARY);
2020-09-30 08:44:36 +00:00
#endif
}
void print_message(struct mosq_config *lcfg, const struct mosquitto_message *message, const mosquitto_property *properties)
{
2020-09-30 08:44:36 +00:00
#ifdef WIN32
unsigned int r = 0;
#else
long r = 0;
#endif
if(lcfg->random_filter < 10000){
2020-09-30 08:44:36 +00:00
#ifdef WIN32
rand_s(&r);
#else
r = random();
2020-09-30 08:44:36 +00:00
#endif
if((long)(r%10000) >= lcfg->random_filter){
return;
}
}
if(lcfg->format){
formatted_print(lcfg, message, properties);
}else if(lcfg->verbose){
if(message->payloadlen){
printf("%s ", message->topic);
write_payload(message->payload, message->payloadlen, false, 0, 0, 0, 0);
if(lcfg->eol){
printf("\n");
}
}else{
if(lcfg->eol){
printf("%s (null)\n", message->topic);
}
}
fflush(stdout);
}else{
if(message->payloadlen){
write_payload(message->payload, message->payloadlen, false, 0, 0, 0, 0);
if(lcfg->eol){
printf("\n");
}
fflush(stdout);
}
}
}