defmodule Systant.MqttHandler do @moduledoc """ Custom MQTT handler for processing command messages """ @behaviour Tortoise.Handler require Logger def init(args) do Logger.info("Initializing MQTT handler") # Get the client_id from the passed arguments client_id = Keyword.get(args, :client_id) Logger.info("Handler initialized with client_id: #{client_id}") state = %{client_id: client_id} {:ok, state} end def connection(status, state) do case status do :up -> Logger.info("MQTT connection established successfully") :down -> Logger.error("MQTT connection lost - check MQTT broker availability and configuration") :terminating -> Logger.info("MQTT connection terminating") {:error, reason} -> Logger.error("MQTT connection failed: #{inspect(reason)}") other -> Logger.error("MQTT connection status unknown: #{inspect(other)}") end {:ok, state} end def subscription(status, topic_filter, state) do case status do :up -> Logger.info("Subscribed to #{topic_filter}") :down -> Logger.warning("Subscription to #{topic_filter} lost") end {:ok, state} end def handle_message(topic, payload, state) do # Topic can come as a list or string, normalize it topic_str = case topic do topic when is_binary(topic) -> topic topic when is_list(topic) -> Enum.join(topic, "/") _ -> to_string(topic) end Logger.info("Received MQTT message on topic: #{topic_str}") # Only process command topics if String.contains?(topic_str, "/commands") do process_command_message(topic_str, payload, state) else Logger.debug("Ignoring non-command message on topic: #{topic_str}") end {:ok, state} end def terminate(reason, _state) do Logger.info("MQTT handler terminating: #{inspect(reason)}") :ok end # Private functions defp process_command_message(topic, payload, state) do try do # Parse the JSON command case Jason.decode(payload) do {:ok, command_data} -> Logger.info("Processing command: #{inspect(command_data)}") execute_and_respond(command_data, topic, state) {:error, reason} -> Logger.error("Failed to parse command JSON: #{inspect(reason)}") send_error_response(topic, "Invalid JSON format", nil, state) end rescue error -> Logger.error("Error processing command: #{inspect(error)}") send_error_response(topic, "Command processing failed", nil, state) end end defp execute_and_respond(command_data, topic, state) do # Load current configuration config = Systant.Config.load_config() # Use client_id from handler state client_id = state.client_id # Handle special "list" command to show available commands if command_data["command"] == "list" do available_commands = Systant.CommandExecutor.list_available_commands(config) response = %{ request_id: command_data["request_id"] || generate_request_id(), command: "list", status: "success", output: "Available commands: #{Enum.map(available_commands, &(&1.trigger)) |> Enum.join(", ")}", data: available_commands, execution_time: 0.0, timestamp: DateTime.utc_now() |> DateTime.to_iso8601() } response_topic = String.replace(topic, "/commands", "/responses") response_payload = Jason.encode!(response) Tortoise.publish_sync(client_id, response_topic, response_payload, qos: 0) else case Systant.CommandExecutor.execute_command(command_data, config) do {:ok, response} -> # Send response to the response topic response_topic = String.replace(topic, "/commands", "/responses") response_payload = Jason.encode!(response) case Tortoise.publish_sync(client_id, response_topic, response_payload, qos: 0) do :ok -> Logger.info("Command response sent successfully") {:error, reason} -> Logger.error("Failed to send command response: #{inspect(reason)}") end {:error, reason} -> send_error_response(topic, reason, command_data["request_id"], state) end end end defp send_error_response(topic, error_message, request_id, state) do client_id = state.client_id response_topic = String.replace(topic, "/commands", "/responses") error_response = %{ request_id: request_id || "unknown", command: "unknown", status: "error", output: "", error: error_message, execution_time: 0.0, timestamp: DateTime.utc_now() |> DateTime.to_iso8601() } response_payload = Jason.encode!(error_response) case Tortoise.publish_sync(client_id, response_topic, response_payload, qos: 0) do :ok -> Logger.info("Error response sent successfully") {:error, reason} -> Logger.error("Failed to send error response: #{inspect(reason)}") end end defp generate_request_id do :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower) end end