package mqttclient import ( "encoding/json" "errors" "fmt" "strconv" "strings" "sync" "time" "invertergui/mk2driver" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/sirupsen/logrus" ) var log = logrus.WithField("ctx", "inverter-gui-mqtt") const keepAlive = 5 * time.Second const ( commandKindSetting = "setting" commandKindRAMVar = "ram_var" commandKindPanel = "panel_state" commandKindStandby = "standby" writeStatusOK = "ok" writeStatusError = "error" ) type HomeAssistantConfig struct { Enabled bool DiscoveryPrefix string NodeID string DeviceName string } // Config sets MQTT client configuration type Config struct { Broker string ClientID string Topic string CommandTopic string StatusTopic string HomeAssistant HomeAssistantConfig Username string Password string } type writeCommand struct { RequestID string Kind string ID uint16 Value int16 HasSwitch bool SwitchState mk2driver.PanelSwitchState SwitchName string CurrentLimitA *float64 Standby *bool } type writeCommandPayload struct { RequestID string `json:"request_id"` Kind string `json:"kind"` Type string `json:"type"` ID *uint16 `json:"id"` Value json.RawMessage `json:"value"` Switch string `json:"switch"` SwitchState string `json:"switch_state"` CurrentLimitA *float64 `json:"current_limit"` Standby *bool `json:"standby"` } type writeStatus struct { RequestID string `json:"request_id,omitempty"` Status string `json:"status"` Kind string `json:"kind,omitempty"` ID uint16 `json:"id"` Value int16 `json:"value"` Switch string `json:"switch,omitempty"` CurrentLimitA *float64 `json:"current_limit,omitempty"` Standby *bool `json:"standby,omitempty"` Error string `json:"error,omitempty"` Timestamp time.Time `json:"timestamp"` } type haDiscoveryDefinition struct { Component string ObjectID string Config map[string]any } type panelStateCache struct { mu sync.Mutex hasSwitch bool switchName string switchState mk2driver.PanelSwitchState } // New creates an MQTT client that publishes MK2 updates and optionally handles setting write commands. func New(mk2 mk2driver.Mk2, writer mk2driver.SettingsWriter, config Config) error { c := mqtt.NewClient(getOpts(config)) if token := c.Connect(); token.Wait() && token.Error() != nil { return token.Error() } cache := &panelStateCache{} if config.HomeAssistant.Enabled { if err := publishHAAvailability(c, config, "online"); err != nil { return fmt.Errorf("could not publish Home Assistant availability: %w", err) } if err := publishHADiscovery(c, config); err != nil { return fmt.Errorf("could not publish Home Assistant discovery payloads: %w", err) } if writer != nil { if err := subscribeHAPanelModeState(c, config, cache); err != nil { log.Warnf("Could not subscribe to Home Assistant panel mode state topic: %v", err) } } } if config.CommandTopic != "" { if writer == nil { log.Warnf("MQTT command topic %q configured, but no settings writer is available", config.CommandTopic) } else { t := c.Subscribe(config.CommandTopic, 1, commandHandler(c, writer, config, cache)) t.Wait() if t.Error() != nil { return fmt.Errorf("could not subscribe to MQTT command topic %q: %w", config.CommandTopic, t.Error()) } log.Infof("Subscribed to MQTT command topic: %s", config.CommandTopic) } } go func() { for e := range mk2.C() { if e == nil || !e.Valid { continue } if err := publishJSON(c, config.Topic, e, 0, false); err != nil { log.Errorf("Could not publish update to MQTT topic %q: %v", config.Topic, err) } } }() return nil } func subscribeHAPanelModeState(client mqtt.Client, config Config, cache *panelStateCache) error { if cache == nil { return nil } stateTopic := haPanelSwitchStateTopic(config) t := client.Subscribe(stateTopic, 1, func(_ mqtt.Client, msg mqtt.Message) { switchState, switchName, err := normalizePanelSwitch(string(msg.Payload())) if err != nil { return } cache.remember(writeCommand{ Kind: commandKindPanel, HasSwitch: true, SwitchState: switchState, SwitchName: switchName, }) }) t.Wait() return t.Error() } func commandHandler(client mqtt.Client, writer mk2driver.SettingsWriter, config Config, cache *panelStateCache) mqtt.MessageHandler { if cache == nil { cache = &panelStateCache{} } return func(_ mqtt.Client, msg mqtt.Message) { cmd, err := decodeWriteCommand(msg.Payload()) if err != nil { log.Errorf("Invalid MQTT write command payload from topic %q: %v", msg.Topic(), err) publishWriteStatus(client, config.StatusTopic, writeStatus{ Status: writeStatusError, Error: err.Error(), Timestamp: time.Now().UTC(), }) return } execCmd := cmd status := writeStatus{ RequestID: cmd.RequestID, Status: writeStatusOK, Kind: cmd.Kind, Timestamp: time.Now().UTC(), } switch cmd.Kind { case commandKindPanel: execCmd, err = cache.resolvePanelCommand(cmd) if err != nil { status.Status = writeStatusError status.Error = err.Error() log.Errorf("Invalid MQTT write command %s: %v", formatWriteCommandLog(cmd), err) publishWriteStatus(client, config.StatusTopic, status) return } status.Switch = execCmd.SwitchName status.CurrentLimitA = execCmd.CurrentLimitA case commandKindStandby: status.Standby = copyBoolPtr(execCmd.Standby) default: status.ID = cmd.ID status.Value = cmd.Value } if err := executeWriteCommand(writer, execCmd); err != nil { status.Status = writeStatusError status.Error = err.Error() log.Errorf("Failed MQTT write command %s: %v", formatWriteCommandLog(execCmd), err) } else { log.Infof("Applied MQTT write command %s", formatWriteCommandLog(execCmd)) cache.remember(execCmd) if config.HomeAssistant.Enabled { if err := publishHAControlState(client, config, execCmd); err != nil { log.Errorf("Could not publish Home Assistant control state update: %v", err) } } } publishWriteStatus(client, config.StatusTopic, status) } } func (c *panelStateCache) resolvePanelCommand(cmd writeCommand) (writeCommand, error) { if cmd.Kind != commandKindPanel { return cmd, nil } if cmd.HasSwitch { return cmd, nil } c.mu.Lock() defer c.mu.Unlock() if !c.hasSwitch { return writeCommand{}, errors.New("panel_state command missing switch and no prior mode is known; set mode once first") } cmd.HasSwitch = true cmd.SwitchName = c.switchName cmd.SwitchState = c.switchState return cmd, nil } func (c *panelStateCache) remember(cmd writeCommand) { if cmd.Kind != commandKindPanel || !cmd.HasSwitch { return } c.mu.Lock() c.hasSwitch = true c.switchName = cmd.SwitchName c.switchState = cmd.SwitchState c.mu.Unlock() } func decodeWriteCommand(payload []byte) (writeCommand, error) { msg := writeCommandPayload{} if err := json.Unmarshal(payload, &msg); err != nil { return writeCommand{}, fmt.Errorf("invalid JSON payload: %w", err) } kind := msg.Kind if kind == "" { kind = msg.Type } normalizedKind, err := normalizeWriteKind(kind) if err != nil { return writeCommand{}, err } if normalizedKind == commandKindPanel { switchName := msg.Switch if switchName == "" { switchName = msg.SwitchState } hasSwitch := false switchState := mk2driver.PanelSwitchState(0) normalizedSwitchName := "" if switchName != "" { var err error switchState, normalizedSwitchName, err = normalizePanelSwitch(switchName) if err != nil { return writeCommand{}, err } hasSwitch = true } if msg.CurrentLimitA != nil && *msg.CurrentLimitA < 0 { return writeCommand{}, fmt.Errorf("current_limit must be >= 0, got %.3f", *msg.CurrentLimitA) } if !hasSwitch && msg.CurrentLimitA == nil { return writeCommand{}, errors.New(`missing required field "switch" (or "switch_state"), or "current_limit"`) } return writeCommand{ RequestID: msg.RequestID, Kind: normalizedKind, HasSwitch: hasSwitch, SwitchState: switchState, SwitchName: normalizedSwitchName, CurrentLimitA: msg.CurrentLimitA, }, nil } if normalizedKind == commandKindStandby { standby, err := decodeStandbyValue(msg) if err != nil { return writeCommand{}, err } return writeCommand{ RequestID: msg.RequestID, Kind: normalizedKind, Standby: &standby, }, nil } if msg.ID == nil { return writeCommand{}, errors.New(`missing required field "id"`) } value, err := decodeInt16Value(msg.Value) if err != nil { return writeCommand{}, err } return writeCommand{ RequestID: msg.RequestID, Kind: normalizedKind, ID: *msg.ID, Value: value, }, nil } func normalizeWriteKind(raw string) (string, error) { switch strings.ToLower(strings.TrimSpace(raw)) { case "setting", "settings": return commandKindSetting, nil case "ram", "ramvar", "ram_var", "ram-variable", "ramvariable": return commandKindRAMVar, nil case "panel", "panel_state", "switch", "remote_panel": return commandKindPanel, nil case "standby", "panel_standby", "remote_panel_standby": return commandKindStandby, nil default: return "", fmt.Errorf("unsupported write command kind %q", raw) } } func normalizePanelSwitch(raw string) (mk2driver.PanelSwitchState, string, error) { switch strings.ToLower(strings.TrimSpace(raw)) { case "1", "charger_only", "charger-only", "charger": return mk2driver.PanelSwitchChargerOnly, "charger_only", nil case "2", "inverter_only", "inverter-only", "inverter": return mk2driver.PanelSwitchInverterOnly, "inverter_only", nil case "3", "on": return mk2driver.PanelSwitchOn, "on", nil case "4", "off": return mk2driver.PanelSwitchOff, "off", nil default: return 0, "", fmt.Errorf("unsupported panel switch %q", raw) } } func executeWriteCommand(writer mk2driver.SettingsWriter, cmd writeCommand) error { if writer == nil { return errors.New("settings writer is not available") } switch cmd.Kind { case commandKindPanel: if !cmd.HasSwitch { return errors.New("panel_state command requires a switch state") } return writer.SetPanelState(cmd.SwitchState, cmd.CurrentLimitA) case commandKindStandby: if cmd.Standby == nil { return errors.New("standby command missing standby value") } return writer.SetStandby(*cmd.Standby) case commandKindRAMVar: return writer.WriteRAMVar(cmd.ID, cmd.Value) case commandKindSetting: return writer.WriteSetting(cmd.ID, cmd.Value) default: return fmt.Errorf("unsupported write command kind %q", cmd.Kind) } } func formatWriteCommandLog(cmd writeCommand) string { switch cmd.Kind { case commandKindPanel: switchName := cmd.SwitchName if switchName == "" { switchName = "" } if cmd.CurrentLimitA == nil { return fmt.Sprintf("kind=%s switch=%s", cmd.Kind, switchName) } return fmt.Sprintf("kind=%s switch=%s current_limit=%.3f", cmd.Kind, switchName, *cmd.CurrentLimitA) case commandKindStandby: if cmd.Standby == nil { return fmt.Sprintf("kind=%s standby=", cmd.Kind) } return fmt.Sprintf("kind=%s standby=%t", cmd.Kind, *cmd.Standby) default: return fmt.Sprintf("kind=%s id=%d value=%d", cmd.Kind, cmd.ID, cmd.Value) } } func decodeInt16Value(raw json.RawMessage) (int16, error) { if len(raw) == 0 { return 0, errors.New(`missing required field "value"`) } var value int16 if err := json.Unmarshal(raw, &value); err != nil { return 0, fmt.Errorf(`field "value" must be a signed integer: %w`, err) } return value, nil } func decodeStandbyValue(msg writeCommandPayload) (bool, error) { if msg.Standby != nil { return *msg.Standby, nil } if len(msg.Value) == 0 { return false, errors.New(`missing required field "standby" (or boolean "value")`) } var boolValue bool if err := json.Unmarshal(msg.Value, &boolValue); err == nil { return boolValue, nil } var stringValue string if err := json.Unmarshal(msg.Value, &stringValue); err == nil { switch strings.ToLower(strings.TrimSpace(stringValue)) { case "1", "true", "on", "enable", "enabled": return true, nil case "0", "false", "off", "disable", "disabled": return false, nil } } var intValue int if err := json.Unmarshal(msg.Value, &intValue); err == nil { switch intValue { case 1: return true, nil case 0: return false, nil } } return false, errors.New(`field "standby" must be true/false`) } func publishWriteStatus(client mqtt.Client, topic string, status writeStatus) { if topic == "" { return } if err := publishJSON(client, topic, status, 1, false); err != nil { log.Errorf("Could not publish command status to MQTT topic %q: %v", topic, err) } } func publishHADiscovery(client mqtt.Client, config Config) error { definitions := buildHADiscoveryDefinitions(config) prefix := haDiscoveryPrefix(config) nodeID := haNodeID(config) for _, def := range definitions { topic := fmt.Sprintf("%s/%s/%s/%s/config", prefix, def.Component, nodeID, def.ObjectID) if err := publishJSON(client, topic, def.Config, 1, true); err != nil { return fmt.Errorf("could not publish discovery for %s/%s: %w", def.Component, def.ObjectID, err) } } return nil } func buildHADiscoveryDefinitions(config Config) []haDiscoveryDefinition { if !config.HomeAssistant.Enabled { return nil } nodeID := haNodeID(config) device := map[string]any{ "identifiers": []string{fmt.Sprintf("invertergui_%s", nodeID)}, "name": haDeviceName(config), "manufacturer": "Victron Energy", "model": "MultiPlus", "sw_version": "invertergui", } availabilityTopic := haAvailabilityTopic(config) stateTopic := config.Topic sensors := []haDiscoveryDefinition{ buildHASensor(device, availabilityTopic, stateTopic, nodeID, "battery_voltage", "Battery Voltage", "{{ value_json.BatVoltage }}", "V", "voltage", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "battery_current", "Battery Current", "{{ value_json.BatCurrent }}", "A", "current", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "battery_charge", "Battery Charge", "{{ ((value_json.ChargeState | float(0)) * 100) | round(1) }}", "%", "battery", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "input_voltage", "Input Voltage", "{{ value_json.InVoltage }}", "V", "voltage", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "input_current", "Input Current", "{{ value_json.InCurrent }}", "A", "current", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "input_frequency", "Input Frequency", "{{ value_json.InFrequency }}", "Hz", "frequency", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "output_voltage", "Output Voltage", "{{ value_json.OutVoltage }}", "V", "voltage", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "output_current", "Output Current", "{{ value_json.OutCurrent }}", "A", "current", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "output_frequency", "Output Frequency", "{{ value_json.OutFrequency }}", "Hz", "frequency", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "input_power", "Input Power", "{{ ((value_json.InVoltage | float(0)) * (value_json.InCurrent | float(0))) | round(1) }}", "VA", "", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "output_power", "Output Power", "{{ ((value_json.OutVoltage | float(0)) * (value_json.OutCurrent | float(0))) | round(1) }}", "VA", "", "measurement"), { Component: "binary_sensor", ObjectID: "data_valid", Config: map[string]any{ "name": "Data Valid", "unique_id": fmt.Sprintf("%s_data_valid", nodeID), "state_topic": stateTopic, "value_template": "{{ value_json.Valid }}", "payload_on": "true", "payload_off": "false", "availability_topic": availabilityTopic, "device": device, "entity_category": "diagnostic", }, }, } if config.CommandTopic != "" { sensors = append(sensors, haDiscoveryDefinition{ Component: "select", ObjectID: "remote_panel_mode", Config: map[string]any{ "name": "Remote Panel Mode", "unique_id": fmt.Sprintf("%s_remote_panel_mode", nodeID), "state_topic": haPanelSwitchStateTopic(config), "command_topic": config.CommandTopic, "command_template": "{\"kind\":\"panel_state\",\"switch\":\"{{ value }}\"}", "options": []string{"charger_only", "inverter_only", "on", "off"}, "availability_topic": availabilityTopic, "device": device, "icon": "mdi:transmission-tower-export", }, }, haDiscoveryDefinition{ Component: "number", ObjectID: "remote_panel_current_limit", Config: map[string]any{ "name": "Remote Panel Current Limit", "unique_id": fmt.Sprintf("%s_remote_panel_current_limit", nodeID), "state_topic": haCurrentLimitStateTopic(config), "command_topic": config.CommandTopic, "command_template": "{\"kind\":\"panel_state\",\"current_limit\":{{ value | float(0) }}}", "unit_of_measurement": "A", "device_class": "current", "mode": "box", "min": 0, "max": 100, "step": 0.1, "availability_topic": availabilityTopic, "device": device, "icon": "mdi:current-ac", }, }, haDiscoveryDefinition{ Component: "switch", ObjectID: "remote_panel_standby", Config: map[string]any{ "name": "Remote Panel Standby", "unique_id": fmt.Sprintf("%s_remote_panel_standby", nodeID), "state_topic": haStandbyStateTopic(config), "command_topic": config.CommandTopic, "payload_on": "{\"kind\":\"standby\",\"standby\":true}", "payload_off": "{\"kind\":\"standby\",\"standby\":false}", "state_on": "ON", "state_off": "OFF", "availability_topic": availabilityTopic, "device": device, "icon": "mdi:power-sleep", }, }, ) } return sensors } func buildHASensor(device map[string]any, availabilityTopic, stateTopic, nodeID, objectID, name, valueTemplate, unit, deviceClass, stateClass string) haDiscoveryDefinition { config := map[string]any{ "name": name, "unique_id": fmt.Sprintf("%s_%s", nodeID, objectID), "state_topic": stateTopic, "value_template": valueTemplate, "availability_topic": availabilityTopic, "device": device, } if unit != "" { config["unit_of_measurement"] = unit } if deviceClass != "" { config["device_class"] = deviceClass } if stateClass != "" { config["state_class"] = stateClass } return haDiscoveryDefinition{ Component: "sensor", ObjectID: objectID, Config: config, } } func publishHAAvailability(client mqtt.Client, config Config, status string) error { return publishString(client, haAvailabilityTopic(config), status, 1, true) } func publishHAControlState(client mqtt.Client, config Config, cmd writeCommand) error { switch cmd.Kind { case commandKindPanel: if err := publishString(client, haPanelSwitchStateTopic(config), cmd.SwitchName, 1, true); err != nil { return err } if cmd.CurrentLimitA != nil { limit := strconv.FormatFloat(*cmd.CurrentLimitA, 'f', 1, 64) if err := publishString(client, haCurrentLimitStateTopic(config), limit, 1, true); err != nil { return err } } case commandKindStandby: if cmd.Standby == nil { return nil } state := "OFF" if *cmd.Standby { state = "ON" } if err := publishString(client, haStandbyStateTopic(config), state, 1, true); err != nil { return err } } return nil } func publishJSON(client mqtt.Client, topic string, payload any, qos byte, retained bool) error { if topic == "" { return errors.New("topic is empty") } data, err := json.Marshal(payload) if err != nil { return fmt.Errorf("could not marshal payload: %w", err) } t := client.Publish(topic, qos, retained, data) t.Wait() if t.Error() != nil { return t.Error() } return nil } func publishString(client mqtt.Client, topic, payload string, qos byte, retained bool) error { if topic == "" { return errors.New("topic is empty") } t := client.Publish(topic, qos, retained, payload) t.Wait() if t.Error() != nil { return t.Error() } return nil } func mqttTopicRoot(topic string) string { t := strings.Trim(strings.TrimSpace(topic), "/") if t == "" { return "invertergui" } if strings.HasSuffix(t, "/updates") { root := strings.TrimSuffix(t, "/updates") if root != "" { return root } } return t } func haAvailabilityTopic(config Config) string { return fmt.Sprintf("%s/homeassistant/availability", mqttTopicRoot(config.Topic)) } func haPanelSwitchStateTopic(config Config) string { return fmt.Sprintf("%s/homeassistant/remote_panel_mode/state", mqttTopicRoot(config.Topic)) } func haCurrentLimitStateTopic(config Config) string { return fmt.Sprintf("%s/homeassistant/remote_panel_current_limit/state", mqttTopicRoot(config.Topic)) } func haStandbyStateTopic(config Config) string { return fmt.Sprintf("%s/homeassistant/remote_panel_standby/state", mqttTopicRoot(config.Topic)) } func haDiscoveryPrefix(config Config) string { prefix := strings.Trim(strings.TrimSpace(config.HomeAssistant.DiscoveryPrefix), "/") if prefix == "" { return "homeassistant" } return prefix } func haNodeID(config Config) string { nodeID := normalizeID(config.HomeAssistant.NodeID) if nodeID == "" { nodeID = normalizeID(config.ClientID) } if nodeID == "" { return "invertergui" } return nodeID } func haDeviceName(config Config) string { name := strings.TrimSpace(config.HomeAssistant.DeviceName) if name == "" { return "Victron Inverter" } return name } func normalizeID(in string) string { trimmed := strings.TrimSpace(strings.ToLower(in)) if trimmed == "" { return "" } var b strings.Builder lastUnderscore := false for _, r := range trimmed { valid := (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' || r == '_' if valid { b.WriteRune(r) lastUnderscore = false continue } if !lastUnderscore { b.WriteRune('_') lastUnderscore = true } } return strings.Trim(b.String(), "_") } func copyBoolPtr(in *bool) *bool { if in == nil { return nil } value := *in return &value } func getOpts(config Config) *mqtt.ClientOptions { opts := mqtt.NewClientOptions() opts.AddBroker(config.Broker) opts.SetClientID(config.ClientID) if config.Username != "" { opts.SetUsername(config.Username) } if config.Password != "" { opts.SetPassword(config.Password) } if config.HomeAssistant.Enabled { opts.SetWill(haAvailabilityTopic(config), "offline", 1, true) } opts.SetKeepAlive(keepAlive) opts.SetOnConnectHandler(func(mqtt.Client) { log.Info("Client connected to broker") }) opts.SetConnectionLostHandler(func(_ mqtt.Client, err error) { log.Errorf("Client connection to broker lost: %v", err) }) return opts }