systant/server/lib/systant/mqtt_client.ex
2025-08-10 20:34:22 -07:00

188 lines
6.6 KiB
Elixir

defmodule Systant.MqttClient do
use GenServer
require Logger
@moduledoc """
MQTT client for publishing system stats and handling commands
"""
@doc """
Starts the MQTT client and registers the process under this module's name.

`opts` is forwarded to `init/1` as its argument.
"""
def start_link(opts) do
  server_options = [name: __MODULE__]
  GenServer.start_link(__MODULE__, opts, server_options)
end
# GenServer callback: loads the configuration, starts the Tortoise connection
# and arms the publishing timers.
#
# Returns `{:ok, state}` once a test publish confirms the broker is reachable.
# The state map holds `:app_config`, `:mqtt_config`, `:hostname` and
# `:previous_network_stats` (initially `nil`).
#
# Returns `{:stop, reason}` when the connection process cannot be started.
# When it starts but the verification publish fails or times out,
# `System.stop(1)` is also called so the whole runtime shuts down.
def init(_opts) do
  # Load the TOML-based configuration
  app_config = Systant.Config.load_config()
  mqtt_config = Systant.Config.mqtt_config(app_config)

  # The config carries the broker password; mask it so credentials never end
  # up in the log output.
  Logger.info("Starting MQTT client with config: #{inspect(redact_password(mqtt_config))}")
  Logger.info("Attempting to connect to MQTT broker at #{mqtt_config.host}:#{mqtt_config.port}")

  # Get hostname using same method as SystemMetrics
  {:ok, hostname_charlist} = :inet.gethostname()
  hostname = List.to_string(hostname_charlist)

  # Store both configs for later use
  state_config = %{
    app_config: app_config,
    mqtt_config: mqtt_config,
    previous_network_stats: nil,
    hostname: hostname
  }

  connection_opts = [
    client_id: mqtt_config.client_id,
    server: {Tortoise.Transport.Tcp, host: to_charlist(mqtt_config.host), port: mqtt_config.port},
    handler: {Systant.MqttHandler, [client_id: mqtt_config.client_id]},
    user_name: mqtt_config.username,
    password: mqtt_config.password,
    subscriptions: [{mqtt_config.command_topic, mqtt_config.qos}]
  ]

  case Tortoise.Connection.start_link(connection_opts) do
    {:ok, _pid} ->
      Logger.info("MQTT client process started, verifying connection...")

      # Wait a bit to verify the connection actually works
      case wait_for_connection(mqtt_config.client_id, 5000) do
        :ok ->
          Logger.info("MQTT connection verified successfully")

          # Send system metrics after a short delay to ensure dashboard is ready
          startup_delay = Systant.Config.get(app_config, ["general", "startup_delay"]) || 5000
          Process.send_after(self(), :publish_startup_stats, startup_delay)
          Logger.info("Will publish initial stats in #{startup_delay}ms")

          # Publish Home Assistant discovery after MQTT connection
          Process.send_after(self(), :publish_ha_discovery, 1000)
          Logger.info("Will publish HA discovery in 1000ms")

          schedule_stats_publish(mqtt_config.publish_interval)
          {:ok, state_config}

        :timeout ->
          Logger.error(
            "MQTT connection timeout - broker at #{mqtt_config.host}:#{mqtt_config.port} is not responding"
          )

          Logger.error("Shutting down systant due to MQTT connection failure")
          System.stop(1)
          {:stop, :connection_timeout}

        {:error, reason} ->
          Logger.error("MQTT connection verification failed: #{inspect(reason)}")
          Logger.error("Shutting down systant due to MQTT connection failure")
          System.stop(1)
          {:stop, reason}
      end

    {:error, reason} ->
      Logger.error("Failed to start MQTT client: #{inspect(reason)}")
      {:stop, reason}
  end
end

# Returns `mqtt_config` with a non-nil `:password` replaced by a placeholder,
# making the result safe to inspect in log messages. Configs without a
# password (or with a nil one) are returned untouched.
defp redact_password(%{password: password} = mqtt_config) when not is_nil(password) do
  %{mqtt_config | password: "[REDACTED]"}
end

defp redact_password(mqtt_config), do: mqtt_config
# Handles the timer messages this process sends to itself:
#
#   * `:publish_ha_discovery`  - publishes the Home Assistant discovery config
#   * `:publish_startup_stats` - publishes one metrics snapshot, no rescheduling
#   * `:publish_stats`         - publishes a snapshot and schedules the next one
#
# Every other message is ignored.
def handle_info(:publish_ha_discovery, state) do
  Logger.info("Publishing Home Assistant discovery configuration")

  # The hostname is resolved once in init/1 and kept in the state, so a full
  # metrics collection is not needed just to read it back.
  # NOTE(review): relies on SystemMetrics reporting the same hostname, as the
  # comment in init/1 says it uses the same lookup - confirm.
  Systant.HaDiscovery.publish_discovery(
    state.mqtt_config.client_id,
    state.hostname,
    state.app_config
  )

  {:noreply, state}
end

def handle_info(:publish_startup_stats, state) do
  Logger.info("Publishing initial system metrics")
  {_stats, updated_state} = collect_and_publish_stats(state)
  {:noreply, updated_state}
end

def handle_info(:publish_stats, state) do
  {_stats, updated_state} = collect_and_publish_stats(state)
  # Re-arm the timer only after this round has finished publishing.
  schedule_stats_publish(state.mqtt_config.publish_interval)
  {:noreply, updated_state}
end

def handle_info(_msg, state) do
  {:noreply, state}
end
# GenServer callback: records why the process is stopping. No cleanup is done
# here; the callback always returns `:ok`.
def terminate(reason, _state) do
  message = "MQTT client terminating: #{inspect(reason)}"
  Logger.info(message)
  :ok
end
# Collects one metrics snapshot, publishes it as JSON on the stats topic and
# returns `{stats, updated_state}`.
#
# `updated_state` is `state` with `:previous_network_stats` replaced by the
# network interfaces of this snapshot plus a monotonic timestamp in seconds,
# or by `nil` when the snapshot has no `:network` list. The next call passes
# that value back into the metrics collection.
#
# Publish errors are logged rather than raised. `Jason.encode!/1` raises if
# the stats cannot be encoded.
defp collect_and_publish_stats(state) do
  mqtt_config = state.mqtt_config

  # Collect metrics with previous network stats for throughput calculation
  stats = Systant.SystemMetrics.collect_metrics(state.app_config, state.previous_network_stats)

  # Store current network stats for next iteration
  current_network_stats =
    case Map.get(stats, :network) do
      network_data when is_list(network_data) ->
        %{
          interfaces: network_data,
          timestamp: System.monotonic_time(:second)
        }

      _ ->
        nil
    end

  updated_state = Map.put(state, :previous_network_stats, current_network_stats)

  # Publish the stats
  payload = Jason.encode!(stats)

  case Tortoise.publish(mqtt_config.client_id, mqtt_config.stats_topic, payload, qos: mqtt_config.qos) do
    :ok ->
      Logger.info("Published system metrics for #{stats.hostname}")

    # With QoS 1 or 2 Tortoise hands back a reference instead of a bare :ok;
    # the delivery result is sent to the calling process later as a message.
    # Without this clause any non-zero configured QoS raised CaseClauseError.
    {:ok, _ref} ->
      Logger.info("Published system metrics for #{stats.hostname}")

    {:error, reason} ->
      Logger.error("Failed to publish stats: #{inspect(reason)}")
  end

  {stats, updated_state}
end
# Legacy function for compatibility if needed.
#
# Collects a metrics snapshot (without previous network stats) and publishes
# it as JSON on the stats topic. Publish errors are logged, not raised.
# NOTE(review): nothing in this module calls this function at present.
defp publish_stats(app_config, mqtt_config) do
  stats = Systant.SystemMetrics.collect_metrics(app_config)
  payload = Jason.encode!(stats)

  case Tortoise.publish(mqtt_config.client_id, mqtt_config.stats_topic, payload, qos: mqtt_config.qos) do
    :ok ->
      Logger.info("Published system metrics for #{stats.hostname}")

    # QoS 1 and 2 publishes return a reference rather than a bare :ok; accept
    # it so a non-zero configured QoS does not raise CaseClauseError.
    {:ok, _ref} ->
      Logger.info("Published system metrics for #{stats.hostname}")

    {:error, reason} ->
      Logger.error("Failed to publish stats: #{inspect(reason)}")
  end
end
# Arms a one-shot timer that delivers `:publish_stats` to the calling process
# after `interval` milliseconds. Returns the timer reference.
defp schedule_stats_publish(interval),
  do: Process.send_after(self(), :publish_stats, interval)
# Verifies the broker connection by synchronously publishing a throwaway
# message for `client_id`, passing `timeout_ms` as the publish timeout.
#
# Returns `:ok` on success, `:timeout` when the publish reports
# `{:error, :timeout}`, `{:error, reason}` for any other result, and
# `{:error, :connection_failed}` when the publish raises or exits.
defp wait_for_connection(client_id, timeout_ms) do
  publish_opts = [qos: 0, timeout: timeout_ms]

  client_id
  |> Tortoise.publish_sync("systant/connection_test", "test", publish_opts)
  |> interpret_connection_test()
rescue
  exception ->
    Logger.error("MQTT connection test exception: #{inspect(exception)}")
    {:error, :connection_failed}
catch
  :exit, exit_reason ->
    Logger.error("MQTT connection test exit: #{inspect(exit_reason)}")
    {:error, :connection_failed}
end

# Logs the outcome of the test publish and maps it onto the
# `:ok | :timeout | {:error, term}` results returned by wait_for_connection/2.
defp interpret_connection_test(:ok) do
  Logger.debug("MQTT connection test successful")
  :ok
end

defp interpret_connection_test({:error, :timeout}) do
  Logger.error("MQTT connection test timed out")
  :timeout
end

defp interpret_connection_test({:error, reason}) do
  Logger.error("MQTT connection test failed: #{inspect(reason)}")
  {:error, reason}
end

defp interpret_connection_test(other) do
  Logger.error("MQTT connection test unexpected result: #{inspect(other)}")
  {:error, other}
end
end