diff --git a/dashboard/lib/dashboard/application.ex b/dashboard/lib/dashboard/application.ex index 5819447..7c7b4f4 100644 --- a/dashboard/lib/dashboard/application.ex +++ b/dashboard/lib/dashboard/application.ex @@ -13,8 +13,8 @@ defmodule Dashboard.Application do {Phoenix.PubSub, name: Dashboard.PubSub}, # Start the Finch HTTP client for sending emails {Finch, name: Dashboard.Finch}, - # Start a worker by calling: Dashboard.Worker.start_link(arg) - # {Dashboard.Worker, arg}, + # Start MQTT subscriber for systant hosts + Dashboard.MqttSubscriber, # Start to serve requests, typically the last entry DashboardWeb.Endpoint ] diff --git a/dashboard/lib/dashboard/mqtt_subscriber.ex b/dashboard/lib/dashboard/mqtt_subscriber.ex new file mode 100644 index 0000000..f40b3b1 --- /dev/null +++ b/dashboard/lib/dashboard/mqtt_subscriber.ex @@ -0,0 +1,122 @@ +defmodule Dashboard.MqttSubscriber do + @moduledoc """ + MQTT subscriber that listens to systant host messages and broadcasts updates via Phoenix PubSub. + """ + use GenServer + require Logger + + alias Phoenix.PubSub + + @mqtt_topic "systant/+/stats" + @pubsub_topic "systant:hosts" + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def get_hosts do + GenServer.call(__MODULE__, :get_hosts) + end + + @impl true + def init(_opts) do + # Start MQTT connection + client_id = "systant_dashboard_#{System.unique_integer([:positive])}" + + connection_opts = [ + client_id: client_id, + server: {Tortoise.Transport.Tcp, host: "mqtt.home", port: 1883}, + handler: {__MODULE__, []} + ] + + case Tortoise.Supervisor.start_child(client_id, connection_opts) do + {:ok, _pid} -> + Logger.info("MQTT subscriber connected as #{client_id}") + + # Subscribe to systant stats topic + Tortoise.Connection.subscribe(client_id, @mqtt_topic, qos: 0) + + {:ok, %{client_id: client_id, hosts: %{}}} + + {:error, reason} -> + Logger.error("Failed to start MQTT connection: #{inspect(reason)}") + {:stop, reason} + end + end + + @impl true + def handle_call(:get_hosts, _from, state) do + {:reply, state.hosts, state} + end + + @impl true + def handle_info({:tortoise, {:publish, @mqtt_topic, payload, _opts}}, state) do + case Jason.decode(payload) do + {:ok, data} -> + # Extract hostname from topic + hostname = extract_hostname_from_topic(@mqtt_topic) + + # Update host data with timestamp + host_data = Map.put(data, "last_seen", DateTime.utc_now()) + updated_hosts = Map.put(state.hosts, hostname, host_data) + + # Broadcast update via PubSub + PubSub.broadcast(Dashboard.PubSub, @pubsub_topic, {:host_update, hostname, host_data}) + + Logger.debug("Received update from #{hostname}: #{inspect(data)}") + + {:noreply, %{state | hosts: updated_hosts}} + + {:error, reason} -> + Logger.warning("Failed to decode JSON payload: #{inspect(reason)}") + {:noreply, state} + end + end + + def handle_info({:tortoise, {:publish, topic, payload, _opts}}, state) do + # Extract hostname from the actual topic + case String.split(topic, "/") do + ["systant", hostname, "stats"] -> + case Jason.decode(payload) do + {:ok, data} -> + # Update host data with timestamp + host_data = Map.put(data, "last_seen", DateTime.utc_now()) + updated_hosts = Map.put(state.hosts, hostname, host_data) + + # Broadcast update via PubSub + PubSub.broadcast(Dashboard.PubSub, @pubsub_topic, {:host_update, hostname, host_data}) + + Logger.debug("Received update from #{hostname}: #{inspect(data)}") + + {:noreply, %{state | hosts: updated_hosts}} + + {:error, reason} -> + Logger.warning("Failed to decode JSON payload: #{inspect(reason)}") + {:noreply, state} + end + + _ -> + Logger.debug("Received message on unexpected topic: #{topic}") + {:noreply, state} + end + end + + def handle_info({:tortoise, _msg}, state) do + # Handle other tortoise messages (connection status, etc.) + {:noreply, state} + end + + # Private functions + + defp extract_hostname_from_topic(topic) do + case String.split(topic, "/") do + ["systant", hostname, "stats"] -> hostname + _ -> "unknown" + end + end + + # Tortoise handler callbacks (required when using handler: {module, args}) + def connection(_status, _state), do: [] + def subscription(_status, _topic_filter, _state), do: [] + def terminate(_reason, _state), do: [] +end \ No newline at end of file diff --git a/dashboard/lib/dashboard_web/controllers/page_html/home.html.heex b/dashboard/lib/dashboard_web/controllers/page_html/home.html.heex index d72b03c..c997d17 100644 --- a/dashboard/lib/dashboard_web/controllers/page_html/home.html.heex +++ b/dashboard/lib/dashboard_web/controllers/page_html/home.html.heex @@ -47,42 +47,34 @@ />

- Phoenix Framework + <.icon name="hero-computer-desktop" class="h-4 w-4" /> + Systant Dashboard - v{Application.spec(:phoenix, :vsn)} + Real-time monitoring

- Peace of mind from prototype to production. + Monitor all your hosts in real-time.

- Build rich, interactive web applications quickly, with less code and fewer moving parts. Join our growing community of developers using Phoenix to craft APIs, HTML5 apps and more, for fun or at scale. + Phoenix LiveView dashboard for systant hosts. Get real-time system statistics via MQTT from all your monitored servers.

-
diff --git a/dashboard/lib/dashboard_web/live/hosts_live.ex b/dashboard/lib/dashboard_web/live/hosts_live.ex new file mode 100644 index 0000000..da52a7b --- /dev/null +++ b/dashboard/lib/dashboard_web/live/hosts_live.ex @@ -0,0 +1,110 @@ +defmodule DashboardWeb.HostsLive do + @moduledoc """ + LiveView for real-time systant host monitoring. + """ + use DashboardWeb, :live_view + + alias Phoenix.PubSub + alias Dashboard.MqttSubscriber + + @pubsub_topic "systant:hosts" + + @impl true + def mount(_params, _session, socket) do + if connected?(socket) do + # Subscribe to host updates + PubSub.subscribe(Dashboard.PubSub, @pubsub_topic) + end + + # Get initial host data + hosts = MqttSubscriber.get_hosts() + + socket = + socket + |> assign(:hosts, hosts) + |> assign(:page_title, "Systant Hosts") + + {:ok, socket} + end + + @impl true + def handle_info({:host_update, hostname, host_data}, socket) do + updated_hosts = Map.put(socket.assigns.hosts, hostname, host_data) + {:noreply, assign(socket, :hosts, updated_hosts)} + end + + @impl true + def render(assigns) do + ~H""" +
+
+

+ <.icon name="hero-computer-desktop" class="h-4 w-4" /> + Systant Host Monitor +

+

+ Real-time system monitoring across all hosts +

+

+ Live MQTT-powered dashboard showing statistics from all your systant-enabled hosts. +

+ +
+ <%= if Enum.empty?(@hosts) do %> +
+ <.icon name="hero-signal-slash" class="mx-auto h-12 w-12 text-zinc-400" /> +

No hosts detected

+

+ Waiting for systant hosts to publish data via MQTT... +

+
+ <% else %> + <%= for {hostname, host_data} <- @hosts do %> + <.host_card hostname={hostname} data={host_data} /> + <% end %> + <% end %> +
+
+
+ """ + end + + attr :hostname, :string, required: true + attr :data, :map, required: true + + defp host_card(assigns) do + ~H""" +
+
+
+
+ <.icon name="hero-server" class="h-5 w-5 text-green-600" /> +
+
+

<%= @hostname %>

+

+ Last seen: <%= format_datetime(@data["last_seen"]) %> +

+
+
+
+ Online +
+
+ +
+

Raw Data:

+
+<%= Jason.encode!(@data, pretty: true) %>
+        
+
+
+ """ + end + + defp format_datetime(%DateTime{} = datetime) do + Calendar.strftime(datetime, "%Y-%m-%d %H:%M:%S UTC") + end + + defp format_datetime(_), do: "Unknown" +end \ No newline at end of file diff --git a/dashboard/lib/dashboard_web/router.ex b/dashboard/lib/dashboard_web/router.ex index b6d2368..dcdbf87 100644 --- a/dashboard/lib/dashboard_web/router.ex +++ b/dashboard/lib/dashboard_web/router.ex @@ -18,6 +18,7 @@ defmodule DashboardWeb.Router do pipe_through :browser get "/", PageController, :home + live "/hosts", HostsLive, :index end # Other scopes may use custom stacks. diff --git a/dashboard/mix.exs b/dashboard/mix.exs index 08cfd96..dd063ef 100644 --- a/dashboard/mix.exs +++ b/dashboard/mix.exs @@ -54,7 +54,8 @@ defmodule Dashboard.MixProject do {:gettext, "~> 0.26"}, {:jason, "~> 1.2"}, {:dns_cluster, "~> 0.1.1"}, - {:bandit, "~> 1.5"} + {:bandit, "~> 1.5"}, + {:tortoise, "~> 0.9.5"} ] end diff --git a/dashboard/mix.lock b/dashboard/mix.lock index 3678f01..38c7a14 100644 --- a/dashboard/mix.lock +++ b/dashboard/mix.lock @@ -7,6 +7,7 @@ "file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"}, "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, "floki": {:hex, :floki, "0.38.0", "62b642386fa3f2f90713f6e231da0fa3256e41ef1089f83b6ceac7a3fd3abf33", [:mix], [], "hexpm", "a5943ee91e93fb2d635b612caf5508e36d37548e84928463ef9dd986f0d1abd9"}, + "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"}, "heroicons": {:git, "https://github.com/tailwindlabs/heroicons.git", "88ab3a0d790e6a47404cba02800a6b25d2afae50", [tag: "v2.1.1", sparse: "optimized", depth: 1]}, "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, @@ -30,6 +31,7 @@ "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, "telemetry_poller": {:hex, :telemetry_poller, "1.3.0", "d5c46420126b5ac2d72bc6580fb4f537d35e851cc0f8dbd571acf6d6e10f5ec7", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "51f18bed7128544a50f75897db9974436ea9bfba560420b646af27a9a9b35211"}, "thousand_island": {:hex, :thousand_island, "1.3.14", "ad45ebed2577b5437582bcc79c5eccd1e2a8c326abf6a3464ab6c06e2055a34a", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d0d24a929d31cdd1d7903a4fe7f2409afeedff092d277be604966cd6aa4307ef"}, + "tortoise": {:hex, :tortoise, "0.9.9", "2e467570ef1d342d4de8fdc6ba3861f841054ab524080ec3d7052ee07c04501d", [:mix], [{:gen_state_machine, "~> 2.0 or ~> 3.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}], "hexpm", "4a316220b4b443c2497f42702f0c0616af3e4b2cbc6c150ebebb51657a773797"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "websock_adapter": {:hex, :websock_adapter, "0.5.8", "3b97dc94e407e2d1fc666b2fb9acf6be81a1798a2602294aac000260a7c4a47d", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "315b9a1865552212b5f35140ad194e67ce31af45bcee443d4ecb96b5fd3f3782"}, }