From 96de648bcafb39db607e95d78ca75918b50934ac Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 2 Aug 2025 21:57:59 -0700 Subject: [PATCH] Add real-time MQTT-powered LiveView dashboard Features: - MQTT subscriber GenServer connects to mqtt.home - Real-time host discovery via systant/+/stats topic - LiveView with Phoenix PubSub for instant updates - Host cards showing live data and last seen timestamps - Clean UI with Tailwind styling - Proper OTP supervision tree Dashboard ready to receive live data from systant hosts! Visit /hosts to see real-time monitoring. --- dashboard/lib/dashboard/application.ex | 4 +- dashboard/lib/dashboard/mqtt_subscriber.ex | 122 +++++++++++++++ .../controllers/page_html/home.html.heex | 144 ++---------------- .../lib/dashboard_web/live/hosts_live.ex | 110 +++++++++++++ dashboard/lib/dashboard_web/router.ex | 1 + dashboard/mix.exs | 3 +- dashboard/mix.lock | 2 + 7 files changed, 249 insertions(+), 137 deletions(-) create mode 100644 dashboard/lib/dashboard/mqtt_subscriber.ex create mode 100644 dashboard/lib/dashboard_web/live/hosts_live.ex diff --git a/dashboard/lib/dashboard/application.ex b/dashboard/lib/dashboard/application.ex index 5819447..7c7b4f4 100644 --- a/dashboard/lib/dashboard/application.ex +++ b/dashboard/lib/dashboard/application.ex @@ -13,8 +13,8 @@ defmodule Dashboard.Application do {Phoenix.PubSub, name: Dashboard.PubSub}, # Start the Finch HTTP client for sending emails {Finch, name: Dashboard.Finch}, - # Start a worker by calling: Dashboard.Worker.start_link(arg) - # {Dashboard.Worker, arg}, + # Start MQTT subscriber for systant hosts + Dashboard.MqttSubscriber, # Start to serve requests, typically the last entry DashboardWeb.Endpoint ] diff --git a/dashboard/lib/dashboard/mqtt_subscriber.ex b/dashboard/lib/dashboard/mqtt_subscriber.ex new file mode 100644 index 0000000..f40b3b1 --- /dev/null +++ b/dashboard/lib/dashboard/mqtt_subscriber.ex @@ -0,0 +1,122 @@ +defmodule Dashboard.MqttSubscriber do + @moduledoc """ + MQTT subscriber that listens to systant host messages and broadcasts updates via Phoenix PubSub. + """ + use GenServer + require Logger + + alias Phoenix.PubSub + + @mqtt_topic "systant/+/stats" + @pubsub_topic "systant:hosts" + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def get_hosts do + GenServer.call(__MODULE__, :get_hosts) + end + + @impl true + def init(_opts) do + # Start MQTT connection + client_id = "systant_dashboard_#{System.unique_integer([:positive])}" + + connection_opts = [ + client_id: client_id, + server: {Tortoise.Transport.Tcp, host: "mqtt.home", port: 1883}, + handler: {__MODULE__, []} + ] + + case Tortoise.Supervisor.start_child(client_id, connection_opts) do + {:ok, _pid} -> + Logger.info("MQTT subscriber connected as #{client_id}") + + # Subscribe to systant stats topic + Tortoise.Connection.subscribe(client_id, @mqtt_topic, qos: 0) + + {:ok, %{client_id: client_id, hosts: %{}}} + + {:error, reason} -> + Logger.error("Failed to start MQTT connection: #{inspect(reason)}") + {:stop, reason} + end + end + + @impl true + def handle_call(:get_hosts, _from, state) do + {:reply, state.hosts, state} + end + + @impl true + def handle_info({:tortoise, {:publish, @mqtt_topic, payload, _opts}}, state) do + case Jason.decode(payload) do + {:ok, data} -> + # Extract hostname from topic + hostname = extract_hostname_from_topic(@mqtt_topic) + + # Update host data with timestamp + host_data = Map.put(data, "last_seen", DateTime.utc_now()) + updated_hosts = Map.put(state.hosts, hostname, host_data) + + # Broadcast update via PubSub + PubSub.broadcast(Dashboard.PubSub, @pubsub_topic, {:host_update, hostname, host_data}) + + Logger.debug("Received update from #{hostname}: #{inspect(data)}") + + {:noreply, %{state | hosts: updated_hosts}} + + {:error, reason} -> + Logger.warning("Failed to decode JSON payload: #{inspect(reason)}") + {:noreply, state} + end + end + + def handle_info({:tortoise, {:publish, topic, payload, _opts}}, state) do + # Extract hostname from the actual topic + case String.split(topic, "/") do + ["systant", hostname, "stats"] -> + case Jason.decode(payload) do + {:ok, data} -> + # Update host data with timestamp + host_data = Map.put(data, "last_seen", DateTime.utc_now()) + updated_hosts = Map.put(state.hosts, hostname, host_data) + + # Broadcast update via PubSub + PubSub.broadcast(Dashboard.PubSub, @pubsub_topic, {:host_update, hostname, host_data}) + + Logger.debug("Received update from #{hostname}: #{inspect(data)}") + + {:noreply, %{state | hosts: updated_hosts}} + + {:error, reason} -> + Logger.warning("Failed to decode JSON payload: #{inspect(reason)}") + {:noreply, state} + end + + _ -> + Logger.debug("Received message on unexpected topic: #{topic}") + {:noreply, state} + end + end + + def handle_info({:tortoise, _msg}, state) do + # Handle other tortoise messages (connection status, etc.) + {:noreply, state} + end + + # Private functions + + defp extract_hostname_from_topic(topic) do + case String.split(topic, "/") do + ["systant", hostname, "stats"] -> hostname + _ -> "unknown" + end + end + + # Tortoise handler callbacks (required when using handler: {module, args}) + def connection(_status, _state), do: [] + def subscription(_status, _topic_filter, _state), do: [] + def terminate(_reason, _state), do: [] +end \ No newline at end of file diff --git a/dashboard/lib/dashboard_web/controllers/page_html/home.html.heex b/dashboard/lib/dashboard_web/controllers/page_html/home.html.heex index d72b03c..c997d17 100644 --- a/dashboard/lib/dashboard_web/controllers/page_html/home.html.heex +++ b/dashboard/lib/dashboard_web/controllers/page_html/home.html.heex @@ -47,42 +47,34 @@ />

- Phoenix Framework + <.icon name="hero-computer-desktop" class="h-4 w-4" /> + Systant Dashboard - v{Application.spec(:phoenix, :vsn)} + Real-time monitoring

- Peace of mind from prototype to production. + Monitor all your hosts in real-time.

- Build rich, interactive web applications quickly, with less code and fewer moving parts. Join our growing community of developers using Phoenix to craft APIs, HTML5 apps and more, for fun or at scale. + Phoenix LiveView dashboard for systant hosts. Get real-time system statistics via MQTT from all your monitored servers.

diff --git a/dashboard/lib/dashboard_web/live/hosts_live.ex b/dashboard/lib/dashboard_web/live/hosts_live.ex new file mode 100644 index 0000000..da52a7b --- /dev/null +++ b/dashboard/lib/dashboard_web/live/hosts_live.ex @@ -0,0 +1,110 @@ +defmodule DashboardWeb.HostsLive do + @moduledoc """ + LiveView for real-time systant host monitoring. + """ + use DashboardWeb, :live_view + + alias Phoenix.PubSub + alias Dashboard.MqttSubscriber + + @pubsub_topic "systant:hosts" + + @impl true + def mount(_params, _session, socket) do + if connected?(socket) do + # Subscribe to host updates + PubSub.subscribe(Dashboard.PubSub, @pubsub_topic) + end + + # Get initial host data + hosts = MqttSubscriber.get_hosts() + + socket = + socket + |> assign(:hosts, hosts) + |> assign(:page_title, "Systant Hosts") + + {:ok, socket} + end + + @impl true + def handle_info({:host_update, hostname, host_data}, socket) do + updated_hosts = Map.put(socket.assigns.hosts, hostname, host_data) + {:noreply, assign(socket, :hosts, updated_hosts)} + end + + @impl true + def render(assigns) do + ~H""" +
+
+

+ <.icon name="hero-computer-desktop" class="h-4 w-4" /> + Systant Host Monitor +

+

+ Real-time system monitoring across all hosts +

+

+ Live MQTT-powered dashboard showing statistics from all your systant-enabled hosts. +

+ +
+ <%= if Enum.empty?(@hosts) do %> +
+ <.icon name="hero-signal-slash" class="mx-auto h-12 w-12 text-zinc-400" /> +

No hosts detected

+

+ Waiting for systant hosts to publish data via MQTT... +

+
+ <% else %> + <%= for {hostname, host_data} <- @hosts do %> + <.host_card hostname={hostname} data={host_data} /> + <% end %> + <% end %> +
+
+
+ """ + end + + attr :hostname, :string, required: true + attr :data, :map, required: true + + defp host_card(assigns) do + ~H""" +
+
+
+
+ <.icon name="hero-server" class="h-5 w-5 text-green-600" /> +
+
+

<%= @hostname %>

+

+ Last seen: <%= format_datetime(@data["last_seen"]) %> +

+
+
+
+ Online +
+
+ +
+

Raw Data:

+
+<%= Jason.encode!(@data, pretty: true) %>
+        
+
+
+ """ + end + + defp format_datetime(%DateTime{} = datetime) do + Calendar.strftime(datetime, "%Y-%m-%d %H:%M:%S UTC") + end + + defp format_datetime(_), do: "Unknown" +end \ No newline at end of file diff --git a/dashboard/lib/dashboard_web/router.ex b/dashboard/lib/dashboard_web/router.ex index b6d2368..dcdbf87 100644 --- a/dashboard/lib/dashboard_web/router.ex +++ b/dashboard/lib/dashboard_web/router.ex @@ -18,6 +18,7 @@ defmodule DashboardWeb.Router do pipe_through :browser get "/", PageController, :home + live "/hosts", HostsLive, :index end # Other scopes may use custom stacks. diff --git a/dashboard/mix.exs b/dashboard/mix.exs index 08cfd96..dd063ef 100644 --- a/dashboard/mix.exs +++ b/dashboard/mix.exs @@ -54,7 +54,8 @@ defmodule Dashboard.MixProject do {:gettext, "~> 0.26"}, {:jason, "~> 1.2"}, {:dns_cluster, "~> 0.1.1"}, - {:bandit, "~> 1.5"} + {:bandit, "~> 1.5"}, + {:tortoise, "~> 0.9.5"} ] end diff --git a/dashboard/mix.lock b/dashboard/mix.lock index 3678f01..38c7a14 100644 --- a/dashboard/mix.lock +++ b/dashboard/mix.lock @@ -7,6 +7,7 @@ "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, "floki": {:hex, :floki, "0.38.0", "62b642386fa3f2f90713f6e231da0fa3256e41ef1089f83b6ceac7a3fd3abf33", [:mix], [], "hexpm", "a5943ee91e93fb2d635b612caf5508e36d37548e84928463ef9dd986f0d1abd9"}, + "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"}, "heroicons": {:git, "https://github.com/tailwindlabs/heroicons.git", "88ab3a0d790e6a47404cba02800a6b25d2afae50", [tag: "v2.1.1", sparse: "optimized", depth: 1]}, "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, @@ -30,6 +31,7 @@ "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, "telemetry_poller": {:hex, :telemetry_poller, "1.3.0", "d5c46420126b5ac2d72bc6580fb4f537d35e851cc0f8dbd571acf6d6e10f5ec7", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "51f18bed7128544a50f75897db9974436ea9bfba560420b646af27a9a9b35211"}, "thousand_island": {:hex, :thousand_island, "1.3.14", "ad45ebed2577b5437582bcc79c5eccd1e2a8c326abf6a3464ab6c06e2055a34a", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d0d24a929d31cdd1d7903a4fe7f2409afeedff092d277be604966cd6aa4307ef"}, + "tortoise": {:hex, :tortoise, "0.9.9", "2e467570ef1d342d4de8fdc6ba3861f841054ab524080ec3d7052ee07c04501d", [:mix], [{:gen_state_machine, "~> 2.0 or ~> 3.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}], "hexpm", "4a316220b4b443c2497f42702f0c0616af3e4b2cbc6c150ebebb51657a773797"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "websock_adapter": {:hex, :websock_adapter, "0.5.8", "3b97dc94e407e2d1fc666b2fb9acf6be81a1798a2602294aac000260a7c4a47d", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "315b9a1865552212b5f35140ad194e67ce31af45bcee443d4ecb96b5fd3f3782"}, }