import mqtt, { type MqttClient, type IClientOptions } from "mqtt"; import type { Config, EntityConfig } from "./config"; export interface MqttConnection { client: MqttClient; publish(topic: string, payload: string | object, retain?: boolean): Promise; subscribe(topic: string, handler: (topic: string, payload: Buffer) => void): Promise; disconnect(): Promise; } export async function connect(config: Config, hostname: string): Promise { const options: IClientOptions = { clientId: config.mqtt.clientId || `systant-${hostname}`, username: config.mqtt.username, password: config.mqtt.password, will: { topic: `${config.mqtt.topicPrefix}/${hostname}/status`, payload: Buffer.from("offline"), qos: 1, retain: true, }, }; const client = mqtt.connect(config.mqtt.broker, options); const handlers = new Map void>(); await new Promise((resolve, reject) => { client.on("connect", () => { console.log(`Connected to MQTT broker: ${config.mqtt.broker}`); resolve(); }); client.on("error", reject); }); client.on("message", (topic, payload) => { for (const [pattern, handler] of handlers) { if (topicMatches(pattern, topic)) { handler(topic, payload); } } }); // Publish online status await publishAsync(client, `${config.mqtt.topicPrefix}/${hostname}/status`, "online", true); // Publish HA discovery if enabled if (config.homeassistant.discovery) { await publishDiscovery(client, config, hostname); } return { client, async publish(topic: string, payload: string | object, retain = false): Promise { const fullTopic = `${config.mqtt.topicPrefix}/${hostname}/${topic}`; const data = typeof payload === "object" ? JSON.stringify(payload) : payload; await publishAsync(client, fullTopic, data, retain); }, async subscribe(topic: string, handler: (topic: string, payload: Buffer) => void): Promise { const fullTopic = `${config.mqtt.topicPrefix}/${hostname}/${topic}`; handlers.set(fullTopic, handler); await new Promise((resolve, reject) => { client.subscribe(fullTopic, { qos: 1 }, (err) => { if (err) reject(err); else resolve(); }); }); console.log(`Subscribed to: ${fullTopic}`); }, async disconnect(): Promise { await publishAsync(client, `${config.mqtt.topicPrefix}/${hostname}/status`, "offline", true); await new Promise((resolve) => client.end(false, {}, () => resolve())); }, }; } function publishAsync(client: MqttClient, topic: string, payload: string, retain: boolean): Promise { return new Promise((resolve, reject) => { client.publish(topic, payload, { qos: 1, retain }, (err) => { if (err) reject(err); else resolve(); }); }); } function topicMatches(pattern: string, topic: string): boolean { if (pattern === topic) return true; if (pattern.endsWith("#")) { return topic.startsWith(pattern.slice(0, -1)); } return false; } async function publishDiscovery(client: MqttClient, config: Config, hostname: string): Promise { const prefix = config.homeassistant.discoveryPrefix; const topicPrefix = config.mqtt.topicPrefix; const entityCount = Object.keys(config.entities).length; if (entityCount === 0) { console.log("No entities configured, skipping HA discovery"); return; } for (const [id, entity] of Object.entries(config.entities)) { const payload = buildDiscoveryPayload(id, entity, hostname, topicPrefix); const discoveryTopic = `${prefix}/${entity.type}/${hostname}_${id}/config`; await publishAsync(client, discoveryTopic, JSON.stringify(payload), true); } console.log(`Published Home Assistant discovery for ${entityCount} entity/entities`); } function buildDiscoveryPayload( id: string, entity: EntityConfig, hostname: string, topicPrefix: string ): Record { const displayName = entity.name || id.replace(/_/g, " "); const payload: Record = { name: displayName, unique_id: `systant_${hostname}_${id}`, device: { identifiers: [`systant_${hostname}`], name: `${hostname}`, manufacturer: "Systant", }, }; // Stateful entities have a state_topic (buttons don't) if (entity.type !== "button") { payload.state_topic = `${topicPrefix}/${hostname}/${id}/state`; } // Add availability tracking unless explicitly disabled if (entity.availability !== false) { payload.availability_topic = `${topicPrefix}/${hostname}/status`; payload.payload_available = "online"; payload.payload_not_available = "offline"; } // Common optional fields if (entity.icon) payload.icon = entity.icon; // Type-specific fields switch (entity.type) { case "sensor": if (entity.unit) payload.unit_of_measurement = entity.unit; if (entity.device_class) payload.device_class = entity.device_class; break; case "binary_sensor": payload.payload_on = "ON"; payload.payload_off = "OFF"; if (entity.device_class) payload.device_class = entity.device_class; break; case "light": case "switch": payload.command_topic = `${topicPrefix}/${hostname}/${id}/set`; payload.payload_on = "ON"; payload.payload_off = "OFF"; payload.state_on = "ON"; payload.state_off = "OFF"; break; case "button": payload.command_topic = `${topicPrefix}/${hostname}/${id}/press`; payload.payload_press = "PRESS"; break; } return payload; }