/* Copyright (c) 2009-2020 Roger Light All rights reserved. This program and the accompanying materials are made available under the terms of the Eclipse Public License 2.0 and Eclipse Distribution License v1.0 which accompany this distribution. The Eclipse Public License is available at https://www.eclipse.org/legal/epl-2.0/ and the Eclipse Distribution License is available at http://www.eclipse.org/org/documents/edl-v10.php. SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause Contributors: Roger Light - initial implementation and documentation. */ #include "config.h" #ifdef WIN32 /* For rand_s on Windows */ # define _CRT_RAND_S #endif #include #include #include #include #include #include #ifndef WIN32 #include #else #include #include #define snprintf sprintf_s #endif #ifdef WITH_CJSON # include #endif #ifdef __APPLE__ # include #endif #include #include #include "client_shared.h" #include "sub_client_output.h" extern struct mosq_config cfg; static int get_time(struct tm **ti, long *ns) { #ifdef WIN32 SYSTEMTIME st; #elif defined(__APPLE__) struct timeval tv; #else struct timespec ts; #endif time_t s; #ifdef WIN32 s = time(NULL); GetLocalTime(&st); *ns = st.wMilliseconds*1000000L; #elif defined(__APPLE__) gettimeofday(&tv, NULL); s = tv.tv_sec; *ns = tv.tv_usec*1000; #else if(clock_gettime(CLOCK_REALTIME, &ts) != 0){ err_printf(&cfg, "Error obtaining system time.\n"); return 1; } s = ts.tv_sec; *ns = ts.tv_nsec; #endif *ti = localtime(&s); if(!(*ti)){ err_printf(&cfg, "Error obtaining system time.\n"); return 1; } return 0; } static void write_payload(const unsigned char *payload, int payloadlen, int hex, char align, char pad, int field_width, int precision) { int i; int padlen; if(field_width > 0){ if(payloadlen > field_width){ payloadlen = field_width; } if(hex > 0){ payloadlen /= 2; padlen = field_width - payloadlen*2; }else{ padlen = field_width - payloadlen; } }else{ padlen = field_width - payloadlen; } if(align != '-'){ for(i=0; i=0 && payload[i] < 32)){ printf("\\u%04x", payload[i]); }else{ fputc(payload[i], stdout); } } } #endif #ifdef WITH_CJSON static int json_print_properties(cJSON *root, const mosquitto_property *properties) { int identifier; uint8_t i8value = 0; uint16_t i16value = 0; uint32_t i32value = 0; char *strname = NULL, *strvalue = NULL; char *binvalue = NULL; cJSON *tmp, *prop_json, *user_json = NULL; const mosquitto_property *prop = NULL; prop_json = cJSON_CreateObject(); if(prop_json == NULL){ cJSON_Delete(prop_json); return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(root, "properties", prop_json); for(prop=properties; prop != NULL; prop = mosquitto_property_next(prop)){ tmp = NULL; identifier = mosquitto_property_identifier(prop); switch(identifier){ case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR: mosquitto_property_read_byte(prop, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, &i8value, false); tmp = cJSON_CreateNumber(i8value); break; case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL: mosquitto_property_read_int32(prop, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &i32value, false); tmp = cJSON_CreateNumber(i32value); break; case MQTT_PROP_CONTENT_TYPE: case MQTT_PROP_RESPONSE_TOPIC: mosquitto_property_read_string(prop, identifier, &strvalue, false); if(strvalue == NULL) return MOSQ_ERR_NOMEM; tmp = cJSON_CreateString(strvalue); free(strvalue); strvalue = NULL; break; case MQTT_PROP_CORRELATION_DATA: mosquitto_property_read_binary(prop, MQTT_PROP_CORRELATION_DATA, (void **)&binvalue, &i16value, false); if(binvalue == NULL) return MOSQ_ERR_NOMEM; tmp = cJSON_CreateString(binvalue); free(binvalue); binvalue = NULL; break; case MQTT_PROP_SUBSCRIPTION_IDENTIFIER: mosquitto_property_read_varint(prop, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, &i32value, false); tmp = cJSON_CreateNumber(i32value); break; case MQTT_PROP_TOPIC_ALIAS: mosquitto_property_read_int16(prop, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &i16value, false); tmp = cJSON_CreateNumber(i16value); break; case MQTT_PROP_USER_PROPERTY: if(user_json == NULL){ user_json = cJSON_CreateObject(); if(user_json == NULL){ return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(prop_json, "user-properties", user_json); } mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &strname, &strvalue, false); if(strname == NULL || strvalue == NULL) return MOSQ_ERR_NOMEM; tmp = cJSON_CreateString(strvalue); free(strvalue); if(tmp == NULL){ free(strname); return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(user_json, strname, tmp); free(strname); strname = NULL; strvalue = NULL; tmp = NULL; /* Don't add this to prop_json below */ break; } if(tmp != NULL){ cJSON_AddItemToObject(prop_json, mosquitto_property_identifier_to_string(identifier), tmp); } } return MOSQ_ERR_SUCCESS; } #endif static int json_print(const struct mosquitto_message *message, const mosquitto_property *properties, const struct tm *ti, int ns, bool escaped, bool pretty) { char buf[100]; #ifdef WITH_CJSON cJSON *root; cJSON *tmp; char *json_str; const char *return_parse_end; root = cJSON_CreateObject(); if(root == NULL){ return MOSQ_ERR_NOMEM; } strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S.000000Z%z", ti); snprintf(&buf[strlen("2020-05-06T21:48:00.")], 9, "%06d", ns/1000); buf[strlen("2020-05-06T21:48:00.000000")] = 'Z'; tmp = cJSON_CreateStringReference(buf); if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(root, "tst", tmp); tmp = cJSON_CreateString(message->topic); if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(root, "topic", tmp); tmp = cJSON_CreateNumber(message->qos); if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(root, "qos", tmp); tmp = cJSON_CreateNumber(message->retain); if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(root, "retain", tmp); tmp = cJSON_CreateNumber(message->payloadlen); if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(root, "payloadlen", tmp); if(message->qos > 0){ tmp = cJSON_CreateNumber(message->mid); if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(root, "mid", tmp); } /* Properties */ if(properties){ if(json_print_properties(root, properties)){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; } } /* Payload */ if(escaped){ if(message->payload){ tmp = cJSON_CreateString(message->payload); }else{ tmp = cJSON_CreateNull(); } if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_NOMEM; } cJSON_AddItemToObject(root, "payload", tmp); }else{ return_parse_end = NULL; if(message->payload){ tmp = cJSON_ParseWithOpts(message->payload, &return_parse_end, true); if(tmp == NULL || return_parse_end != (char *)message->payload + message->payloadlen){ cJSON_Delete(root); return MOSQ_ERR_INVAL; } }else{ tmp = cJSON_CreateNull(); if(tmp == NULL){ cJSON_Delete(root); return MOSQ_ERR_INVAL; } } cJSON_AddItemToObject(root, "payload", tmp); } if(pretty){ json_str = cJSON_Print(root); }else{ json_str = cJSON_PrintUnformatted(root); } cJSON_Delete(root); fputs(json_str, stdout); free(json_str); return MOSQ_ERR_SUCCESS; #else strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S.000000Z%z", ti); snprintf(&buf[strlen("2020-05-06T21:48:00.")], 9, "%06d", ns/1000); buf[strlen("2020-05-06T21:48:00.000000")] = 'Z'; printf("{\"tst\":\"%s\",\"topic\":\"%s\",\"qos\":%d,\"retain\":%d,\"payloadlen\":%d,", buf, message->topic, message->qos, message->retain, message->payloadlen); if(message->qos > 0){ printf("\"mid\":%d,", message->mid); } if(escaped){ fputs("\"payload\":\"", stdout); write_json_payload(message->payload, message->payloadlen); fputs("\"}", stdout); }else{ fputs("\"payload\":", stdout); write_payload(message->payload, message->payloadlen, 0, 0, 0, 0, 0); fputs("}", stdout); } return MOSQ_ERR_SUCCESS; #endif } static void formatted_print_blank(char pad, int field_width) { int i; for(i=0; ipretty) != MOSQ_ERR_SUCCESS){ err_printf(lcfg, "Error: Out of memory.\n"); return; } break; case 'J': if(!ti){ if(get_time(&ti, &ns)){ err_printf(lcfg, "Error obtaining system time.\n"); return; } } rc = json_print(message, properties, ti, (int)ns, false, lcfg->pretty); if(rc == MOSQ_ERR_NOMEM){ err_printf(lcfg, "Error: Out of memory.\n"); return; }else if(rc == MOSQ_ERR_INVAL){ err_printf(lcfg, "Error: Message payload is not valid JSON on topic %s.\n", message->topic); return; } break; case 'l': formatted_print_int(message->payloadlen, align, pad, field_width); break; case 'm': formatted_print_int(message->mid, align, pad, field_width); break; case 'P': strname = NULL; strvalue = NULL; prop = mosquitto_property_read_string_pair(properties, MQTT_PROP_USER_PROPERTY, &strname, &strvalue, false); while(prop){ printf("%s:%s", strname, strvalue); free(strname); free(strvalue); strname = NULL; strvalue = NULL; prop = mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &strname, &strvalue, true); if(prop){ fputc(' ', stdout); } } free(strname); free(strvalue); break; case 'p': write_payload(message->payload, message->payloadlen, 0, align, pad, field_width, precision); break; case 'q': fputc(message->qos + 48, stdout); break; case 'R': if(mosquitto_property_read_string(properties, MQTT_PROP_RESPONSE_TOPIC, &strvalue, false)){ formatted_print_str(strvalue, align, field_width, precision); free(strvalue); } break; case 'r': if(message->retain){ fputc('1', stdout); }else{ fputc('0', stdout); } break; case 'S': if(mosquitto_property_read_varint(properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, &i32value, false)){ formatted_print_int((int)i32value, align, pad, field_width); }else{ formatted_print_blank(pad, field_width); } break; case 't': formatted_print_str(message->topic, align, field_width, precision); break; case 'U': if(!ti){ if(get_time(&ti, &ns)){ err_printf(lcfg, "Error obtaining system time.\n"); return; } } if(strftime(buf, 100, "%s", ti) != 0){ printf("%s.%09ld", buf, ns); } break; case 'x': write_payload(message->payload, message->payloadlen, 1, align, pad, field_width, precision); break; case 'X': write_payload(message->payload, message->payloadlen, 2, align, pad, field_width, precision); break; } } static void formatted_print(const struct mosq_config *lcfg, const struct mosquitto_message *message, const mosquitto_property *properties) { size_t len; int i; struct tm *ti = NULL; long ns = 0; char strf[3] = {0, 0 ,0}; char buf[100]; char align, pad; int field_width, precision; len = strlen(lcfg->format); for(i=0; iformat[i] == '%'){ align = 0; pad = ' '; field_width = 0; precision = -1; if(i < len-1){ i++; /* Optional alignment */ if(lcfg->format[i] == '-'){ align = lcfg->format[i]; if(i < len-1){ i++; } } /* "%-040p" is allowed by this combination of checks, but isn't * a valid format specifier, the '0' will be ignored. */ /* Optional zero padding */ if(lcfg->format[i] == '0'){ pad = '0'; if(i < len-1){ i++; } } /* Optional field width */ while(i < len-1 && lcfg->format[i] >= '0' && lcfg->format[i] <= '9'){ field_width *= 10; field_width += lcfg->format[i]-'0'; i++; } /* Optional precision */ if(lcfg->format[i] == '.'){ if(i < len-1){ i++; precision = 0; while(i < len-1 && lcfg->format[i] >= '0' && lcfg->format[i] <= '9'){ precision *= 10; precision += lcfg->format[i]-'0'; i++; } } } if(i < len){ formatted_print_percent(lcfg, message, properties, lcfg->format[i], align, pad, field_width, precision); } } }else if(lcfg->format[i] == '@'){ if(i < len-1){ i++; if(lcfg->format[i] == '@'){ fputc('@', stdout); }else{ if(!ti){ if(get_time(&ti, &ns)){ err_printf(lcfg, "Error obtaining system time.\n"); return; } } strf[0] = '%'; strf[1] = lcfg->format[i]; strf[2] = 0; if(lcfg->format[i] == 'N'){ printf("%09ld", ns); }else{ if(strftime(buf, 100, strf, ti) != 0){ fputs(buf, stdout); } } } } }else if(lcfg->format[i] == '\\'){ if(i < len-1){ i++; switch(lcfg->format[i]){ case '\\': fputc('\\', stdout); break; case '0': fputc('\0', stdout); break; case 'a': fputc('\a', stdout); break; case 'e': fputc('\033', stdout); break; case 'n': fputc('\n', stdout); break; case 'r': fputc('\r', stdout); break; case 't': fputc('\t', stdout); break; case 'v': fputc('\v', stdout); break; } } }else{ fputc(lcfg->format[i], stdout); } } if(lcfg->eol){ fputc('\n', stdout); } fflush(stdout); } void rand_init(void) { #ifndef WIN32 struct tm *ti = NULL; long ns; if(!get_time(&ti, &ns)){ srandom((unsigned int)ns); } #endif } void print_message(struct mosq_config *lcfg, const struct mosquitto_message *message, const mosquitto_property *properties) { #ifdef WIN32 unsigned int r = 0; #else long r = 0; #endif if(lcfg->random_filter < 10000){ #ifdef WIN32 rand_s(&r); #else r = random(); #endif if((long)(r%10000) >= lcfg->random_filter){ return; } } if(lcfg->format){ formatted_print(lcfg, message, properties); }else if(lcfg->verbose){ if(message->payloadlen){ printf("%s ", message->topic); write_payload(message->payload, message->payloadlen, false, 0, 0, 0, 0); if(lcfg->eol){ printf("\n"); } }else{ if(lcfg->eol){ printf("%s (null)\n", message->topic); } } fflush(stdout); }else{ if(message->payloadlen){ write_payload(message->payload, message->payloadlen, false, 0, 0, 0, 0); if(lcfg->eol){ printf("\n"); } fflush(stdout); } } }