- Create basic integration structure with manifest.json
- Add config flow for easy setup via UI
- Implement MQTT-based host discovery and sensor creation
- Auto-discover Systant hosts via systant/+/stats topic
- Create device entities with last_seen sensor for each host
- Add Python tooling to flake.nix for HA development

Integration features:
- Automatic host discovery via MQTT
- Device representation for each monitored host
- Extensible sensor architecture
- Proper Home Assistant integration patterns
161 lines
5.6 KiB
Python
161 lines
5.6 KiB
Python
"""Support for Systant sensors."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from typing import Any
|
|
|
|
from homeassistant.components import mqtt
|
|
from homeassistant.components.sensor import SensorEntity
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .const import DOMAIN, MQTT_STATS_TOPIC
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def async_setup_entry(
    hass: HomeAssistant,
    config_entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up Systant sensors from a config entry.

    Creates a SystantCoordinator for the entry's MQTT prefix, starts its
    MQTT subscription and stores it under ``hass.data[DOMAIN][entry_id]``.

    Args:
        hass: The Home Assistant instance.
        config_entry: Entry whose ``mqtt_prefix`` data value selects the
            topic root (defaults to ``"systant"``).
        async_add_entities: Platform callback used to register entities. It
            is handed to the coordinator so that hosts discovered after
            setup can still add their sensors.
    """
    mqtt_prefix = config_entry.data.get("mqtt_prefix", "systant")

    # The coordinator owns the MQTT subscription and the discovered hosts.
    coordinator = SystantCoordinator(hass, mqtt_prefix, async_add_entities)
    await coordinator.async_setup()

    # setdefault: do not assume another module already created the
    # per-domain dict in hass.data.
    hass.data.setdefault(DOMAIN, {})[config_entry.entry_id] = coordinator


class SystantCoordinator:
    """Coordinate discovery and management of Systant hosts."""

    def __init__(
        self,
        hass: HomeAssistant,
        mqtt_prefix: str,
        async_add_entities: AddEntitiesCallback | None = None,
    ) -> None:
        """Initialize the coordinator.

        Args:
            hass: The Home Assistant instance.
            mqtt_prefix: Topic root under which hosts publish their stats.
            async_add_entities: Platform callback passed on to every
                discovered host so it can register its sensors. Optional
                for backward compatibility; without it sensors are created
                but cannot be registered.
        """
        self.hass = hass
        self.mqtt_prefix = mqtt_prefix
        self.async_add_entities = async_add_entities
        # Discovered hosts, keyed by the hostname taken from the topic.
        self.hosts: dict[str, SystantHost] = {}
        # Unsubscribe callable returned by mqtt.async_subscribe, or None.
        self._unsubscribe_mqtt = None

    async def async_setup(self) -> None:
        """Set up MQTT subscription for host discovery."""
        topic = f"{self.mqtt_prefix}/+/stats"

        self._unsubscribe_mqtt = await mqtt.async_subscribe(
            self.hass, topic, self._message_received, 0
        )

        _LOGGER.info("Subscribed to %s for Systant host discovery", topic)

    def _hostname_from_topic(self, topic: str) -> str | None:
        """Return the hostname level of ``<prefix>/<hostname>/stats``.

        The expected number of topic levels is derived from the configured
        prefix, so a prefix that itself contains ``/`` is handled. Returns
        None when the topic has a different depth or an empty hostname.
        """
        levels = topic.split("/")
        expected_levels = len(self.mqtt_prefix.split("/")) + 2
        if len(levels) != expected_levels:
            return None
        # The hostname is always the level right before the trailing "stats".
        return levels[-2] or None

    @callback
    def _message_received(self, message: mqtt.ReceiveMessage) -> None:
        """Handle a received MQTT stats message.

        Extracts the hostname from the topic, decodes the JSON payload,
        creates the host on first sight and forwards the payload to it.
        Never raises: malformed input is logged and dropped.
        """
        hostname = self._hostname_from_topic(message.topic)
        if hostname is None:
            return

        try:
            payload = json.loads(message.payload)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and undecodable bytes.
            _LOGGER.warning("Invalid JSON received from %s", message.topic)
            return

        if not isinstance(payload, dict):
            _LOGGER.warning(
                "Ignoring non-object JSON payload from %s", message.topic
            )
            return

        # Deliberately broad: this runs as an MQTT callback, and one bad
        # host must not break processing of later messages. The traceback
        # is kept in the log.
        try:
            host = self.hosts.get(hostname)
            if host is None:
                _LOGGER.info("Discovered new Systant host: %s", hostname)
                host = SystantHost(
                    self.hass,
                    hostname,
                    self.mqtt_prefix,
                    self.async_add_entities,
                )
                self.hosts[hostname] = host

            host.update_data(payload)
        except Exception:  # noqa: BLE001
            _LOGGER.exception(
                "Error processing message from %s", message.topic
            )

    async def async_unload(self) -> None:
        """Unload the coordinator by cancelling the MQTT subscription.

        Safe to call more than once; the unsubscribe callable is invoked
        only the first time.
        """
        if self._unsubscribe_mqtt is not None:
            self._unsubscribe_mqtt()
            self._unsubscribe_mqtt = None


class SystantHost:
    """Represent a Systant host with its sensors."""

    def __init__(
        self,
        hass: HomeAssistant,
        hostname: str,
        mqtt_prefix: str,
        async_add_entities: AddEntitiesCallback | None = None,
    ) -> None:
        """Initialize the host and create its sensors.

        Args:
            hass: The Home Assistant instance.
            hostname: Host identifier taken from the MQTT topic.
            mqtt_prefix: Topic root the host publishes under.
            async_add_entities: Platform callback used to register the
                host's sensors. Optional for backward compatibility.
        """
        self.hass = hass
        self.hostname = hostname
        self.mqtt_prefix = mqtt_prefix
        self._async_add_entities = async_add_entities
        self.sensors: list[SystantSensor] = []
        # UTC time of the most recent update_data call, None until then.
        self.last_seen = None

        # Create sensors for this host
        self._create_sensors()

    def _create_sensors(self) -> None:
        """Create sensor entities for this host and register them."""
        # Last seen sensor
        self.sensors.append(
            SystantLastSeenSensor(self.hostname, self.mqtt_prefix)
        )

        if self._async_add_entities is None:
            _LOGGER.warning(
                "No entity platform callback available; sensors for %s "
                "will not be registered",
                self.hostname,
            )
            return

        # Register through the platform callback received at setup time.
        # A copy is passed so later changes to self.sensors cannot affect
        # the pending registration.
        self._async_add_entities(list(self.sensors))

    def update_data(self, data: dict[str, Any]) -> None:
        """Record the time of this update and forward it to all sensors.

        Args:
            data: Decoded stats payload received for this host.
        """
        self.last_seen = dt_util.utcnow()

        # Update all sensors
        for sensor in self.sensors:
            sensor.update_from_data(data)
|
|
|
|
|
|
class SystantSensor(SensorEntity):
    """Base class for Systant sensors.

    Builds the unique id, display name and device info shared by all
    sensors of one host. Subclasses override ``update_from_data`` to turn
    an incoming payload into a state.
    """

    # State is pushed in through update_from_data, so Home Assistant does
    # not need to poll these entities.
    _attr_should_poll = False

    def __init__(self, hostname: str, mqtt_prefix: str, sensor_type: str) -> None:
        """Initialize the sensor.

        Args:
            hostname: Host this sensor belongs to.
            mqtt_prefix: Topic root the host publishes under.
            sensor_type: Short snake_case identifier (e.g. ``"last_seen"``)
                used in the unique id and, title-cased, in the name.
        """
        self._hostname = hostname
        self._mqtt_prefix = mqtt_prefix
        self._sensor_type = sensor_type

        # All sensors of a host share one device identifier.
        device_key = f"{mqtt_prefix}_{hostname}"
        readable_type = sensor_type.replace("_", " ").title()

        self._attr_unique_id = f"{device_key}_{sensor_type}"
        self._attr_name = f"Systant {hostname} {readable_type}"
        self._attr_device_info = {
            "identifiers": {(DOMAIN, device_key)},
            "name": f"Systant {hostname}",
            "manufacturer": "Systant",
            "model": "System Monitor",
            # NOTE(review): points at a parent device keyed by the MQTT
            # prefix; this file does not register such a device - confirm
            # it is created elsewhere.
            "via_device": (DOMAIN, mqtt_prefix),
        }

    def update_from_data(self, data: dict[str, Any]) -> None:
        """Update sensor from MQTT data.

        The base implementation does nothing; subclasses override it.

        Args:
            data: Decoded stats payload for this sensor's host.
        """
|
|
|
|
|
|
class SystantLastSeenSensor(SystantSensor):
    """Sensor for last seen timestamp."""

    def __init__(self, hostname: str, mqtt_prefix: str) -> None:
        """Initialize the last seen sensor.

        Args:
            hostname: Host this sensor belongs to.
            mqtt_prefix: Topic root the host publishes under.
        """
        super().__init__(hostname, mqtt_prefix, "last_seen")
        self._attr_device_class = "timestamp"
        self._attr_icon = "mdi:clock-outline"

    def update_from_data(self, data: dict[str, Any]) -> None:
        """Set the state to the current UTC time.

        The payload content is not inspected; receiving any message counts
        as the host having been seen.

        Args:
            data: Decoded stats payload (unused).
        """
        self._attr_native_value = dt_util.utcnow()

        # The first update for a host is delivered right after the entity
        # object is created, before it is attached to Home Assistant.
        # Writing state then would raise, so only the value is stored and
        # the state is written once the entity has been added.
        if self.hass is not None:
            self.async_write_ha_state()