#include #include #include #ifndef WIN32 # include #else # include # define snprintf sprintf_s #endif #include #include #define db_host "localhost" #define db_username "mqtt_log" #define db_password "password" #define db_database "mqtt_log" #define db_port 3306 #define db_query "INSERT INTO mqtt_log (topic, payload) VALUES (?,?)" #define mqtt_host "localhost" #define mqtt_port 1883 static int run = 1; static MYSQL_STMT *stmt = NULL; void handle_signal(int s) { run = 0; } void connect_callback(struct mosquitto *mosq, void *obj, int result) { } void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { MYSQL_BIND bind[2]; memset(bind, 0, sizeof(bind)); bind[0].buffer_type = MYSQL_TYPE_STRING; bind[0].buffer = message->topic; bind[0].buffer_length = strlen(message->topic); // Note: payload is normally a binary blob and could contains // NULL byte. This sample does not handle it and assume payload is a // string. bind[1].buffer_type = MYSQL_TYPE_STRING; bind[1].buffer = message->payload; bind[1].buffer_length = message->payloadlen; mysql_stmt_bind_param(stmt, bind); mysql_stmt_execute(stmt); } int main(int argc, char *argv[]) { MYSQL *connection; my_bool reconnect = true; char clientid[24]; struct mosquitto *mosq; int rc = 0; signal(SIGINT, handle_signal); signal(SIGTERM, handle_signal); mysql_library_init(0, NULL, NULL); mosquitto_lib_init(); connection = mysql_init(NULL); if(connection){ mysql_options(connection, MYSQL_OPT_RECONNECT, &reconnect); connection = mysql_real_connect(connection, db_host, db_username, db_password, db_database, db_port, NULL, 0); if(connection){ stmt = mysql_stmt_init(connection); mysql_stmt_prepare(stmt, db_query, strlen(db_query)); memset(clientid, 0, 24); snprintf(clientid, 23, "mysql_log_%d", getpid()); mosq = mosquitto_new(clientid, true, connection); if(mosq){ mosquitto_connect_callback_set(mosq, connect_callback); mosquitto_message_callback_set(mosq, message_callback); rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 60); mosquitto_subscribe(mosq, NULL, "#", 0); while(run){ rc = mosquitto_loop(mosq, -1, 1); if(run && rc){ sleep(20); mosquitto_reconnect(mosq); } } mosquitto_destroy(mosq); } mysql_stmt_close(stmt); mysql_close(connection); }else{ fprintf(stderr, "Error: Unable to connect to database.\n"); printf("%s\n", mysql_error(connection)); rc = 1; } }else{ fprintf(stderr, "Error: Unable to start mysql.\n"); rc = 1; } mysql_library_end(); mosquitto_lib_cleanup(); return rc; }