#include "mqtt_manager.h" #include "esp_log.h" #include #include static const char *TAG = "MQTT_MANAGER"; // MQTT manager state static bool is_initialized = false; static mqtt_status_t current_status = MQTT_STATUS_DISCONNECTED; static esp_mqtt_client_handle_t mqtt_client = NULL; static mqtt_event_callback_t user_callback = NULL; // MQTT event handler static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { esp_mqtt_event_handle_t event = event_data; switch ((esp_mqtt_event_id_t)event_id) { case MQTT_EVENT_BEFORE_CONNECT: ESP_LOGI(TAG, "MQTT connecting to broker..."); current_status = MQTT_STATUS_CONNECTING; if (user_callback) { user_callback(current_status, NULL, NULL, 0); } break; case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT connected to broker"); current_status = MQTT_STATUS_CONNECTED; if (user_callback) { user_callback(current_status, NULL, NULL, 0); } break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT disconnected from broker"); current_status = MQTT_STATUS_DISCONNECTED; if (user_callback) { user_callback(current_status, NULL, NULL, 0); } break; case MQTT_EVENT_PUBLISHED: ESP_LOGD(TAG, "MQTT message published, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGD(TAG, "MQTT data received - Topic: %.*s, Data: %.*s", event->topic_len, event->topic, event->data_len, event->data); if (user_callback) { // Null-terminate the topic and data for callback char topic_str[256]; char data_str[512]; int topic_len = (event->topic_len < sizeof(topic_str) - 1) ? event->topic_len : sizeof(topic_str) - 1; int data_len = (event->data_len < sizeof(data_str) - 1) ? event->data_len : sizeof(data_str) - 1; memcpy(topic_str, event->topic, topic_len); topic_str[topic_len] = '\0'; memcpy(data_str, event->data, data_len); data_str[data_len] = '\0'; user_callback(current_status, topic_str, data_str, data_len); } break; case MQTT_EVENT_ERROR: ESP_LOGE(TAG, "MQTT error occurred, error_handle->error_type: %d", event->error_handle->error_type); current_status = MQTT_STATUS_ERROR; if (user_callback) { user_callback(current_status, NULL, NULL, 0); } break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGD(TAG, "MQTT subscribed, msg_id=%d", event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGD(TAG, "MQTT unsubscribed, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DELETED: ESP_LOGD(TAG, "MQTT client deleted"); break; case MQTT_USER_EVENT: case MQTT_EVENT_ANY: default: ESP_LOGD(TAG, "MQTT event: %d", event_id); break; } } esp_err_t mqtt_manager_init(const mqtt_config_t *config, mqtt_event_callback_t event_callback) { if (is_initialized) { ESP_LOGW(TAG, "MQTT manager already initialized"); return ESP_OK; } if (config == NULL || config->broker_url == NULL || config->client_id == NULL) { ESP_LOGE(TAG, "Invalid MQTT configuration"); return ESP_ERR_INVALID_ARG; } ESP_LOGI(TAG, "Initializing MQTT manager"); ESP_LOGI(TAG, "MQTT Broker: %s:%d", config->broker_url, config->broker_port); ESP_LOGI(TAG, "MQTT Client ID: %s", config->client_id); user_callback = event_callback; current_status = MQTT_STATUS_DISCONNECTED; // Configure MQTT client esp_mqtt_client_config_t mqtt_cfg = { .broker.address.uri = config->broker_url, .credentials.client_id = config->client_id, .network.timeout_ms = config->network_timeout_ms > 0 ? config->network_timeout_ms : 10000, .network.refresh_connection_after_ms = 20000, .session.keepalive = config->keepalive > 0 ? config->keepalive : 60, }; // Set port if specified if (config->broker_port > 0) { mqtt_cfg.broker.address.port = config->broker_port; } // Set credentials if provided if (config->username && strlen(config->username) > 0) { mqtt_cfg.credentials.username = config->username; } if (config->password && strlen(config->password) > 0) { mqtt_cfg.credentials.authentication.password = config->password; } if (config->ca_cert_pem && strlen(config->ca_cert_pem) > 0) { mqtt_cfg.broker.verification.certificate = config->ca_cert_pem; } // Initialize MQTT client mqtt_client = esp_mqtt_client_init(&mqtt_cfg); if (mqtt_client == NULL) { ESP_LOGE(TAG, "Failed to initialize MQTT client"); return ESP_ERR_NO_MEM; } // Register event handler esp_err_t ret = esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL); if (ret != ESP_OK) { ESP_LOGE(TAG, "Failed to register MQTT event handler: %s", esp_err_to_name(ret)); esp_mqtt_client_destroy(mqtt_client); mqtt_client = NULL; return ret; } is_initialized = true; ESP_LOGI(TAG, "MQTT manager initialized successfully"); return ESP_OK; } esp_err_t mqtt_manager_start(void) { if (!is_initialized || mqtt_client == NULL) { ESP_LOGE(TAG, "MQTT manager not initialized"); return ESP_ERR_INVALID_STATE; } ESP_LOGI(TAG, "Starting MQTT client"); esp_err_t ret = esp_mqtt_client_start(mqtt_client); if (ret != ESP_OK) { ESP_LOGE(TAG, "Failed to start MQTT client: %s", esp_err_to_name(ret)); current_status = MQTT_STATUS_ERROR; return ret; } return ESP_OK; } esp_err_t mqtt_manager_stop(void) { if (!is_initialized || mqtt_client == NULL) { ESP_LOGE(TAG, "MQTT manager not initialized"); return ESP_ERR_INVALID_STATE; } ESP_LOGI(TAG, "Stopping MQTT client"); current_status = MQTT_STATUS_DISCONNECTED; esp_err_t ret = esp_mqtt_client_stop(mqtt_client); if (ret != ESP_OK) { ESP_LOGE(TAG, "Failed to stop MQTT client: %s", esp_err_to_name(ret)); return ret; } return ESP_OK; } mqtt_status_t mqtt_manager_get_status(void) { return current_status; } int mqtt_manager_publish(const char *topic, const char *data, int len, int qos, int retain) { if (!is_initialized || mqtt_client == NULL) { ESP_LOGE(TAG, "MQTT manager not initialized"); return -1; } if (current_status != MQTT_STATUS_CONNECTED) { ESP_LOGW(TAG, "MQTT not connected, cannot publish"); return -1; } if (topic == NULL || data == NULL) { ESP_LOGE(TAG, "Topic or data is NULL"); return -1; } int data_len = (len == 0) ? strlen(data) : len; int msg_id = esp_mqtt_client_publish(mqtt_client, topic, data, data_len, qos, retain); if (msg_id >= 0) { ESP_LOGD(TAG, "Published to topic '%s', msg_id=%d", topic, msg_id); } else { ESP_LOGE(TAG, "Failed to publish to topic '%s'", topic); } return msg_id; } int mqtt_manager_publish_string(const char *topic, const char *message, int qos, int retain) { return mqtt_manager_publish(topic, message, 0, qos, retain); } int mqtt_manager_publish_sensor_data(const char *topic, float temperature, float humidity, const char *device_id, int qos, int retain) { if (topic == NULL || device_id == NULL) { ESP_LOGE(TAG, "Topic or device_id is NULL"); return -1; } char json_data[256]; int len = snprintf(json_data, sizeof(json_data), "{\"temperature\":%.2f,\"humidity\":%.2f,\"device\":\"%s\"}", temperature, humidity, device_id); if (len >= sizeof(json_data)) { ESP_LOGW(TAG, "JSON data truncated"); } ESP_LOGI(TAG, "Publishing sensor data - Temp: %.2f°C, Humidity: %.2f%%", temperature, humidity); return mqtt_manager_publish(topic, json_data, 0, qos, retain); } int mqtt_manager_subscribe(const char *topic, int qos) { if (!is_initialized || mqtt_client == NULL) { ESP_LOGE(TAG, "MQTT manager not initialized"); return -1; } if (current_status != MQTT_STATUS_CONNECTED) { ESP_LOGW(TAG, "MQTT not connected, cannot subscribe"); return -1; } if (topic == NULL) { ESP_LOGE(TAG, "Topic is NULL"); return -1; } int msg_id = esp_mqtt_client_subscribe(mqtt_client, topic, qos); if (msg_id >= 0) { ESP_LOGI(TAG, "Subscribed to topic '%s', msg_id=%d", topic, msg_id); } else { ESP_LOGE(TAG, "Failed to subscribe to topic '%s'", topic); } return msg_id; } int mqtt_manager_unsubscribe(const char *topic) { if (!is_initialized || mqtt_client == NULL) { ESP_LOGE(TAG, "MQTT manager not initialized"); return -1; } if (current_status != MQTT_STATUS_CONNECTED) { ESP_LOGW(TAG, "MQTT not connected, cannot unsubscribe"); return -1; } if (topic == NULL) { ESP_LOGE(TAG, "Topic is NULL"); return -1; } int msg_id = esp_mqtt_client_unsubscribe(mqtt_client, topic); if (msg_id >= 0) { ESP_LOGI(TAG, "Unsubscribed from topic '%s', msg_id=%d", topic, msg_id); } else { ESP_LOGE(TAG, "Failed to unsubscribe from topic '%s'", topic); } return msg_id; } bool mqtt_manager_is_connected(void) { return current_status == MQTT_STATUS_CONNECTED; } esp_err_t mqtt_manager_reconnect(void) { if (!is_initialized || mqtt_client == NULL) { ESP_LOGE(TAG, "MQTT manager not initialized"); return ESP_ERR_INVALID_STATE; } ESP_LOGI(TAG, "Forcing MQTT reconnection"); // Stop and restart the client esp_err_t ret = mqtt_manager_stop(); if (ret != ESP_OK) { return ret; } // Small delay before restarting vTaskDelay(pdMS_TO_TICKS(1000)); return mqtt_manager_start(); } esp_err_t mqtt_manager_deinit(void) { if (!is_initialized) { ESP_LOGW(TAG, "MQTT manager not initialized"); return ESP_OK; } ESP_LOGI(TAG, "Deinitializing MQTT manager"); // Stop client if running if (mqtt_client != NULL) { esp_mqtt_client_stop(mqtt_client); esp_mqtt_client_destroy(mqtt_client); mqtt_client = NULL; } current_status = MQTT_STATUS_DISCONNECTED; user_callback = NULL; is_initialized = false; ESP_LOGI(TAG, "MQTT manager deinitialized"); return ESP_OK; }