diff --git a/server/lib/systant/command_executor.ex b/server/lib/systant/command_executor.ex new file mode 100644 index 0000000..75ac770 --- /dev/null +++ b/server/lib/systant/command_executor.ex @@ -0,0 +1,199 @@ +defmodule Systant.CommandExecutor do + @moduledoc """ + Secure command execution system for Systant. + + Executes only predefined commands from the configuration with strict validation, + parameter checking, timeouts, and comprehensive logging. + """ + + require Logger + + @doc """ + Execute a command based on MQTT command message + """ + def execute_command(command_data, config) do + with {:ok, parsed_command} <- parse_command(command_data), + {:ok, command_config} <- find_command_config(parsed_command.trigger, config), + {:ok, validated_params} <- validate_parameters(parsed_command.params, command_config), + {:ok, final_command} <- build_command(command_config, validated_params) do + + Logger.info("Executing command: #{command_config["name"]} with params: #{inspect(validated_params)}") + + execute_system_command(final_command, command_config, parsed_command) + else + {:error, reason} -> + Logger.warning("Command execution failed: #{reason}") + {:error, reason} + end + end + + @doc """ + List all available commands from configuration + """ + def list_available_commands(config) do + commands_config = Systant.Config.get(config, ["commands"]) || %{} + + if commands_config["enabled"] do + available = commands_config["available"] || [] + + Enum.map(available, fn cmd -> + %{ + name: cmd["name"], + description: cmd["description"], + trigger: cmd["trigger"], + allowed_params: cmd["allowed_params"] || [], + requires_confirmation: cmd["requires_confirmation"] || false, + timeout: cmd["timeout"] || 10 + } + end) + else + [] + end + end + + # Private functions + + defp parse_command(command_data) do + case command_data do + %{"command" => trigger} = data when is_binary(trigger) -> + {:ok, %{ + trigger: trigger, + params: data["params"] || [], + request_id: data["request_id"] || generate_request_id(), + timestamp: data["timestamp"] + }} + + _ -> + {:error, "Invalid command format. Expected: {\"command\": \"trigger\", \"params\": [...]}"} + end + end + + defp find_command_config(trigger, config) do + commands_config = Systant.Config.get(config, ["commands"]) || %{} + + unless commands_config["enabled"] do + {:error, "Command execution is disabled in configuration"} + else + + available = commands_config["available"] || [] + + case Enum.find(available, fn cmd -> cmd["trigger"] == trigger end) do + nil -> {:error, "Command '#{trigger}' not found in configuration"} + command_config -> {:ok, command_config} + end + end + end + + defp validate_parameters(params, command_config) when is_list(params) do + allowed_params = command_config["allowed_params"] || [] + + # If no parameters are allowed, params must be empty + if Enum.empty?(allowed_params) and not Enum.empty?(params) do + {:error, "Command '#{command_config["trigger"]}' does not accept parameters"} + else + # Validate each parameter against allowed list + invalid_params = Enum.reject(params, fn param -> + Enum.member?(allowed_params, param) + end) + + if Enum.empty?(invalid_params) do + {:ok, params} + else + {:error, "Invalid parameters: #{inspect(invalid_params)}. Allowed: #{inspect(allowed_params)}"} + end + end + end + + defp validate_parameters(_, _), do: {:error, "Parameters must be a list"} + + defp build_command(command_config, params) do + base_command = command_config["command"] + + if is_list(base_command) do + final_command = substitute_parameters(base_command, params) + {:ok, final_command} + else + {:error, "Command configuration must be a list"} + end + end + + defp substitute_parameters(command_parts, params) do + param_map = build_param_map(params) + + Enum.map(command_parts, fn part -> + case part do + "$" <> var_name -> + Map.get(param_map, var_name, part) + _ -> + part + end + end) + end + + defp build_param_map(params) do + # For now, use simple mapping: first param is $SERVICE, $PATH, $PROCESS, $HOST, etc. + # In the future, could support named parameters + case params do + [param1] -> %{"SERVICE" => param1, "PATH" => param1, "PROCESS" => param1, "HOST" => param1} + [param1, param2] -> %{"SERVICE" => param1, "PATH" => param2, "PROCESS" => param1, "HOST" => param1} + _ -> %{} + end + end + + defp execute_system_command(final_command, command_config, parsed_command) do + timeout = (command_config["timeout"] || 10) * 1000 # Convert to milliseconds + start_time = System.monotonic_time(:millisecond) + + Logger.info("Executing system command: #{inspect(final_command)} (timeout: #{timeout}ms)") + + try do + case System.cmd(List.first(final_command), Enum.drop(final_command, 1), stderr_to_stdout: true) do + {output, 0} -> + execution_time = System.monotonic_time(:millisecond) - start_time + Logger.info("Command completed successfully in #{execution_time}ms") + + {:ok, %{ + request_id: parsed_command.request_id, + command: parsed_command.trigger, + status: "success", + output: String.trim(output), + error: nil, + execution_time: execution_time / 1000.0, + timestamp: DateTime.utc_now() |> DateTime.to_iso8601() + }} + + {output, exit_code} -> + execution_time = System.monotonic_time(:millisecond) - start_time + Logger.warning("Command failed with exit code #{exit_code} in #{execution_time}ms") + + {:ok, %{ + request_id: parsed_command.request_id, + command: parsed_command.trigger, + status: "error", + output: String.trim(output), + error: "Command exited with code #{exit_code}", + execution_time: execution_time / 1000.0, + timestamp: DateTime.utc_now() |> DateTime.to_iso8601() + }} + end + rescue + error -> + execution_time = System.monotonic_time(:millisecond) - start_time + Logger.error("Command execution failed: #{inspect(error)}") + + {:ok, %{ + request_id: parsed_command.request_id, + command: parsed_command.trigger, + status: "error", + output: "", + error: "Execution failed: #{inspect(error)}", + execution_time: execution_time / 1000.0, + timestamp: DateTime.utc_now() |> DateTime.to_iso8601() + }} + end + end + + defp generate_request_id do + :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower) + end +end \ No newline at end of file diff --git a/server/lib/systant/mqtt_client.ex b/server/lib/systant/mqtt_client.ex index fc60ed1..0374908 100644 --- a/server/lib/systant/mqtt_client.ex +++ b/server/lib/systant/mqtt_client.ex @@ -26,7 +26,7 @@ defmodule Systant.MqttClient do connection_opts = [ client_id: mqtt_config.client_id, server: {Tortoise.Transport.Tcp, host: to_charlist(mqtt_config.host), port: mqtt_config.port}, - handler: {Tortoise.Handler.Logger, []}, + handler: {Systant.MqttHandler, []}, user_name: mqtt_config.username, password: mqtt_config.password, subscriptions: [{mqtt_config.command_topic, mqtt_config.qos}] diff --git a/server/lib/systant/mqtt_handler.ex b/server/lib/systant/mqtt_handler.ex new file mode 100644 index 0000000..9cbb202 --- /dev/null +++ b/server/lib/systant/mqtt_handler.ex @@ -0,0 +1,150 @@ +defmodule Systant.MqttHandler do + @moduledoc """ + Custom MQTT handler for processing command messages + """ + + @behaviour Tortoise.Handler + require Logger + + def init(args) do + Logger.info("Initializing MQTT handler") + {:ok, args} + end + + def connection(status, state) do + case status do + :up -> + Logger.info("MQTT connection established") + :down -> + Logger.warning("MQTT connection lost") + :terminating -> + Logger.info("MQTT connection terminating") + end + {:ok, state} + end + + def subscription(status, topic_filter, state) do + case status do + :up -> + Logger.info("Subscribed to #{topic_filter}") + :down -> + Logger.warning("Subscription to #{topic_filter} lost") + end + {:ok, state} + end + + def handle_message(topic, payload, state) do + Logger.info("Received MQTT message on topic: #{topic}") + + # Only process command topics + if String.contains?(topic, "/commands") do + process_command_message(topic, payload, state) + else + Logger.debug("Ignoring non-command message on topic: #{topic}") + end + + {:ok, state} + end + + def terminate(reason, _state) do + Logger.info("MQTT handler terminating: #{inspect(reason)}") + :ok + end + + # Private functions + + defp process_command_message(topic, payload, _state) do + try do + # Parse the JSON command + case Jason.decode(payload) do + {:ok, command_data} -> + Logger.info("Processing command: #{inspect(command_data)}") + execute_and_respond(command_data, topic) + + {:error, reason} -> + Logger.error("Failed to parse command JSON: #{inspect(reason)}") + send_error_response(topic, "Invalid JSON format", nil) + end + rescue + error -> + Logger.error("Error processing command: #{inspect(error)}") + send_error_response(topic, "Command processing failed", nil) + end + end + + defp execute_and_respond(command_data, topic) do + # Load current configuration + config = Systant.Config.load_config() + + # Extract client ID from current state/config + mqtt_config = Systant.Config.mqtt_config(config) + + # Handle special "list" command to show available commands + if command_data["command"] == "list" do + available_commands = Systant.CommandExecutor.list_available_commands(config) + + response = %{ + request_id: command_data["request_id"] || generate_request_id(), + command: "list", + status: "success", + output: "Available commands: #{Enum.map(available_commands, &(&1.trigger)) |> Enum.join(", ")}", + data: available_commands, + error: nil, + execution_time: 0.0, + timestamp: DateTime.utc_now() |> DateTime.to_iso8601() + } + + response_topic = String.replace(topic, "/commands", "/responses") + response_payload = Jason.encode!(response) + + Tortoise.publish(mqtt_config.client_id, response_topic, response_payload, qos: mqtt_config.qos) + else + case Systant.CommandExecutor.execute_command(command_data, config) do + {:ok, response} -> + # Send response to the response topic + response_topic = String.replace(topic, "/commands", "/responses") + response_payload = Jason.encode!(response) + + case Tortoise.publish(mqtt_config.client_id, response_topic, response_payload, qos: mqtt_config.qos) do + :ok -> + Logger.info("Command response sent successfully") + {:error, reason} -> + Logger.error("Failed to send command response: #{inspect(reason)}") + end + + {:error, reason} -> + send_error_response(topic, reason, command_data["request_id"]) + end + end + end + + defp send_error_response(topic, error_message, request_id) do + config = Systant.Config.load_config() + mqtt_config = Systant.Config.mqtt_config(config) + + response_topic = String.replace(topic, "/commands", "/responses") + + error_response = %{ + request_id: request_id || "unknown", + command: "unknown", + status: "error", + output: "", + error: error_message, + execution_time: 0.0, + timestamp: DateTime.utc_now() |> DateTime.to_iso8601() + } + + response_payload = Jason.encode!(error_response) + + case Tortoise.publish(mqtt_config.client_id, response_topic, response_payload, qos: mqtt_config.qos) do + :ok -> + Logger.info("Error response sent successfully") + {:error, reason} -> + Logger.error("Failed to send error response: #{inspect(reason)}") + end + end + + defp generate_request_id do + :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower) + end +end \ No newline at end of file diff --git a/server/systant.toml b/server/systant.toml index 297f903..67400b9 100644 --- a/server/systant.toml +++ b/server/systant.toml @@ -93,4 +93,57 @@ qos = 0 level = "info" # Log configuration loading and metric collection details log_config_changes = true -log_metric_collection = false \ No newline at end of file +log_metric_collection = false + +# Command Execution Configuration +[commands] +enabled = true +# Security: only allow predefined commands, no arbitrary shell execution +max_execution_time = 30 # seconds +log_all_commands = true + +# Define your custom commands here - these are examples, customize for your system +[[commands.available]] +name = "restart_service" +description = "Restart a system service" +trigger = "restart" +command = ["systemctl", "restart", "$SERVICE"] +allowed_params = ["nginx", "postgresql", "redis", "docker", "ssh"] +timeout = 30 +requires_confirmation = true + +[[commands.available]] +name = "system_info" +description = "Get system information" +trigger = "info" +command = ["uname", "-a"] +allowed_params = [] +timeout = 10 +requires_confirmation = false + +[[commands.available]] +name = "disk_usage" +description = "Check disk usage for specific paths" +trigger = "df" +command = ["df", "-h", "$PATH"] +allowed_params = ["/", "/home", "/var", "/tmp"] +timeout = 5 +requires_confirmation = false + +[[commands.available]] +name = "process_status" +description = "Check if a process is running" +trigger = "ps" +command = ["pgrep", "-f", "$PROCESS"] +allowed_params = ["nginx", "postgres", "redis", "docker", "systemd"] +timeout = 5 +requires_confirmation = false + +[[commands.available]] +name = "network_test" +description = "Test network connectivity" +trigger = "ping" +command = ["ping", "-c", "3", "$HOST"] +allowed_params = ["google.com", "1.1.1.1", "localhost"] +timeout = 15 +requires_confirmation = false \ No newline at end of file