package mqttclient import ( "encoding/json" "errors" "fmt" "sort" "strconv" "strings" "sync" "time" "git.coadcorp.com/nathan/invertergui/mk2driver" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/sirupsen/logrus" ) var log = logrus.WithField("ctx", "inverter-gui-mqtt") const keepAlive = 5 * time.Second const ( commandKindSetting = "setting" commandKindRAMVar = "ram_var" commandKindPanel = "panel_state" commandKindStandby = "standby" commandKindESSMode = "ess_mode" commandKindESSSet = "ess_setpoint" commandKindESSMaxC = "ess_max_charge_power" commandKindESSMaxD = "ess_max_discharge_power" writeStatusOK = "ok" writeStatusError = "error" ) type HomeAssistantConfig struct { Enabled bool DiscoveryPrefix string NodeID string DeviceName string } type VenusConfig struct { Enabled bool PortalID string Service string SubscribeWrites bool TopicPrefix string GuideCompat bool } // Config sets MQTT client configuration type Config struct { Broker string ClientID string Topic string CommandTopic string StatusTopic string DeviceID string HistorySize int InstanceID int Phase string PhaseGroup string HomeAssistant HomeAssistantConfig Venus VenusConfig Username string Password string } type writeCommand struct { Source mk2driver.CommandSource RequestID string Kind string ID uint16 Value int16 FloatValue *float64 HasSwitch bool SwitchState mk2driver.PanelSwitchState SwitchName string CurrentLimitA *float64 Standby *bool } type writeCommandPayload struct { RequestID string `json:"request_id"` Kind string `json:"kind"` Type string `json:"type"` ID *uint16 `json:"id"` Value json.RawMessage `json:"value"` Switch string `json:"switch"` SwitchState string `json:"switch_state"` CurrentLimitA *float64 `json:"current_limit"` Standby *bool `json:"standby"` } type writeStatus struct { RequestID string `json:"request_id,omitempty"` Status string `json:"status"` Kind string `json:"kind,omitempty"` ID uint16 `json:"id"` Value int16 `json:"value"` FloatValue *float64 `json:"float_value,omitempty"` Switch string `json:"switch,omitempty"` CurrentLimitA *float64 `json:"current_limit,omitempty"` Standby *bool `json:"standby,omitempty"` Error string `json:"error,omitempty"` Timestamp time.Time `json:"timestamp"` } type telemetryCache struct { mu sync.Mutex last telemetrySnapshot } type essControlSnapshot struct { HasSetpoint bool SetpointW float64 HasMaxCharge bool MaxChargeW float64 HasMaxDischarge bool MaxDischargeW float64 HasBatteryLife bool BatteryLifeState int } type essControlCache struct { mu sync.Mutex hasSetpoint bool setpointW float64 hasMaxCharge bool maxChargeW float64 hasMaxDischarge bool maxDischargeW float64 hasBatteryLife bool batteryLifeState int } type haDiscoveryDefinition struct { Component string ObjectID string Config map[string]any } type panelStateCache struct { mu sync.Mutex hasSwitch bool switchName string switchState mk2driver.PanelSwitchState hasCurrent bool currentA float64 hasStandby bool standby bool } type panelStateSnapshot struct { Switch string `json:"switch,omitempty"` CurrentLimit float64 `json:"current_limit,omitempty"` Standby bool `json:"standby"` HasSwitch bool `json:"has_switch"` HasCurrent bool `json:"has_current_limit"` HasStandby bool `json:"has_standby"` } type telemetrySnapshot struct { Timestamp time.Time `json:"timestamp"` Valid bool `json:"valid"` InputVoltage float64 `json:"input_voltage"` InputCurrent float64 `json:"input_current"` InputFrequency float64 `json:"input_frequency"` OutputVoltage float64 `json:"output_voltage"` OutputCurrent float64 `json:"output_current"` OutputFrequency float64 `json:"output_frequency"` BatteryVoltage float64 `json:"battery_voltage"` BatteryCurrent float64 `json:"battery_current"` BatteryCharge float64 `json:"battery_charge"` InputPower float64 `json:"input_power"` OutputPower float64 `json:"output_power"` BatteryPower float64 `json:"battery_power"` LEDs map[string]string `json:"leds"` } type historySample struct { Timestamp time.Time InputPowerVA float64 OutputPowerVA float64 BatteryPowerW float64 BatteryVoltage float64 Valid bool State operatingState } type historySummary struct { Samples int `json:"samples"` WindowSeconds float64 `json:"window_seconds"` AverageInputPower float64 `json:"average_input_power"` AverageOutputPower float64 `json:"average_output_power"` AverageBatteryPower float64 `json:"average_battery_power"` MaxOutputPower float64 `json:"max_output_power"` MinBatteryVoltage float64 `json:"min_battery_voltage"` EnergyInWh float64 `json:"energy_in_wh"` EnergyOutWh float64 `json:"energy_out_wh"` BatteryChargeWh float64 `json:"battery_charge_wh"` BatteryDischargeWh float64 `json:"battery_discharge_wh"` UptimeSeconds float64 `json:"uptime_seconds"` FaultCount uint64 `json:"fault_count"` LastFaultAt string `json:"last_fault_at,omitempty"` CurrentState string `json:"current_state"` } type orchestrationState struct { DeviceID string `json:"device_id"` InstanceID int `json:"instance_id"` Phase string `json:"phase"` PhaseGroup string `json:"phase_group"` Timestamp time.Time `json:"timestamp"` OperatingState string `json:"operating_state"` Telemetry telemetrySnapshot `json:"telemetry"` PanelState panelStateSnapshot `json:"panel_state"` History historySummary `json:"history"` Alarms []alarmState `json:"alarms"` } type alarmState struct { Code string `json:"code"` Level string `json:"level"` Message string `json:"message"` ActiveSince time.Time `json:"active_since"` } type alarmTracker struct { mu sync.Mutex active map[string]alarmState rules map[string]*alarmRule debounceOn time.Duration debounceOff time.Duration faultCount uint64 lastFault time.Time } type historyTracker struct { mu sync.Mutex max int samples []historySample energyInWh float64 energyOutWh float64 batteryChargeWh float64 batteryDischargeWh float64 uptimeSeconds float64 lastSample *historySample lastState operatingState lastFaultCount uint64 lastFaultAt time.Time } type alarmRule struct { code string level string message string pendingFrom time.Time clearFrom time.Time activeSince time.Time active bool } type diagnosticsBundle struct { DeviceID string `json:"device_id"` InstanceID int `json:"instance_id"` Phase string `json:"phase"` PhaseGroup string `json:"phase_group"` GeneratedAt time.Time `json:"generated_at"` HealthScore int `json:"health_score"` Driver *mk2driver.DriverDiagnostics `json:"driver,omitempty"` Commands []mk2driver.CommandEvent `json:"commands,omitempty"` } type operatingState string const ( operatingStateOff operatingState = "Off" operatingStateInverter operatingState = "Inverter" operatingStateCharger operatingState = "Charger" operatingStatePassthru operatingState = "Passthru" operatingStateFault operatingState = "Fault" ) // New creates an MQTT client that publishes MK2 updates and optionally handles setting write commands. func New(mk2 mk2driver.Mk2, writer mk2driver.SettingsWriter, config Config) error { if strings.TrimSpace(config.DeviceID) == "" { config.DeviceID = haNodeID(config) } if config.HistorySize <= 0 { config.HistorySize = 120 } if strings.TrimSpace(config.Venus.PortalID) == "" { config.Venus.PortalID = config.DeviceID } if strings.TrimSpace(config.Venus.Service) == "" { config.Venus.Service = "vebus/257" } if strings.TrimSpace(config.Venus.TopicPrefix) != "" { config.Venus.TopicPrefix = strings.Trim(strings.TrimSpace(config.Venus.TopicPrefix), "/") } if strings.TrimSpace(config.Phase) == "" { config.Phase = "L1" } if strings.TrimSpace(config.PhaseGroup) == "" { config.PhaseGroup = "default" } if config.InstanceID < 0 { config.InstanceID = 0 } log.WithFields(logrus.Fields{ "broker": config.Broker, "client_id": config.ClientID, "topic": config.Topic, "command_topic": config.CommandTopic, "status_topic": config.StatusTopic, "device_id": config.DeviceID, "history_size": config.HistorySize, "ha_enabled": config.HomeAssistant.Enabled, "ha_node_id": config.HomeAssistant.NodeID, "ha_device_name": config.HomeAssistant.DeviceName, "venus_enabled": config.Venus.Enabled, "venus_portal": config.Venus.PortalID, "venus_service": config.Venus.Service, "venus_prefix": config.Venus.TopicPrefix, "venus_guide": config.Venus.GuideCompat, "instance_id": config.InstanceID, "phase": config.Phase, "phase_group": config.PhaseGroup, }).Info("Initializing MQTT client") c := mqtt.NewClient(getOpts(config)) if token := c.Connect(); token.Wait() && token.Error() != nil { return token.Error() } cache := &panelStateCache{} alarms := newAlarmTracker() history := newHistoryTracker(config.HistorySize) telemetry := &telemetryCache{} ess := newESSControlCache() diagControl, hasDiag := mk2.(mk2driver.DiagnosticsControl) commandHistory, hasCommandHistory := writer.(mk2driver.CommandHistoryProvider) if config.HomeAssistant.Enabled { if err := publishHAAvailability(c, config, "online"); err != nil { return fmt.Errorf("could not publish Home Assistant availability: %w", err) } if err := publishHADiscovery(c, config); err != nil { return fmt.Errorf("could not publish Home Assistant discovery payloads: %w", err) } if writer != nil { if err := subscribeHAPanelModeState(c, config, cache); err != nil { log.Warnf("Could not subscribe to Home Assistant panel mode state topic: %v", err) } if err := subscribeHACurrentLimitState(c, config, cache); err != nil { log.Warnf("Could not subscribe to Home Assistant current limit state topic: %v", err) } if err := subscribeHAStandbyState(c, config, cache); err != nil { log.Warnf("Could not subscribe to Home Assistant standby state topic: %v", err) } } } if config.CommandTopic != "" { if writer == nil { log.Warnf("MQTT command topic %q configured, but no settings writer is available", config.CommandTopic) } else { t := c.Subscribe(config.CommandTopic, 1, commandHandler(c, writer, config, cache, alarms, ess, telemetry)) t.Wait() if t.Error() != nil { return fmt.Errorf("could not subscribe to MQTT command topic %q: %w", config.CommandTopic, t.Error()) } log.Infof("Subscribed to MQTT command topic: %s", config.CommandTopic) } } if config.Venus.Enabled { if err := publishVenusServiceInfo(c, config); err != nil { log.WithError(err).Warn("Could not publish initial Venus service metadata") } if writer != nil && config.Venus.SubscribeWrites { if err := subscribeVenusWriteTopics(c, writer, config, cache, alarms, ess, telemetry); err != nil { log.WithError(err).Warn("Could not subscribe to Venus write topics") } } } go func() { for e := range mk2.C() { if e == nil { log.Debug("Skipping nil MK2 event for MQTT publish") continue } snapshot := buildTelemetrySnapshot(e) telemetry.set(snapshot) panel := cache.snapshot() essSnapshot := ess.snapshot() activeAlarms := alarms.Update(snapshot) state := deriveOperatingState(snapshot, panel, activeAlarms) faultCount, lastFaultAt := alarms.Stats() var summary historySummary if snapshot.Valid { summary = history.Add(snapshot, state, faultCount, lastFaultAt) } else { summary = history.Summary(state, faultCount, lastFaultAt) } if err := publishOrchestrationTopics(c, config, snapshot, panel, summary, activeAlarms, state); err != nil { log.WithError(err).Warn("Could not publish orchestration MQTT topics") } if config.Venus.Enabled { if err := publishVenusTelemetry(c, config, snapshot, panel, activeAlarms, state, summary, essSnapshot); err != nil { log.WithError(err).Warn("Could not publish Venus MQTT topics") } } if hasDiag { var commands []mk2driver.CommandEvent if hasCommandHistory { commands = commandHistory.History(25) } if err := publishDiagnostics(c, config, diagControl.DriverDiagnostics(100), commands); err != nil { log.WithError(err).Warn("Could not publish diagnostics topic") } } if !e.Valid { log.Debug("Skipping invalid MK2 event for legacy MQTT update topic") continue } if err := publishJSON(c, config.Topic, e, 0, false); err != nil { log.Errorf("Could not publish update to MQTT topic %q: %v", config.Topic, err) } else { log.WithField("topic", config.Topic).Debug("Published MK2 update to MQTT") } } }() log.Info("MQTT client setup complete") return nil } func subscribeHAPanelModeState(client mqtt.Client, config Config, cache *panelStateCache) error { if cache == nil { return nil } stateTopic := haPanelSwitchStateTopic(config) log.WithField("topic", stateTopic).Info("Subscribing to Home Assistant mode state topic for panel cache") t := client.Subscribe(stateTopic, 1, func(_ mqtt.Client, msg mqtt.Message) { switchState, switchName, err := normalizePanelSwitch(string(msg.Payload())) if err != nil { log.WithFields(logrus.Fields{ "topic": msg.Topic(), "payload": string(msg.Payload()), }).WithError(err).Warn("Ignoring invalid Home Assistant mode state payload") return } cache.remember(writeCommand{ Kind: commandKindPanel, HasSwitch: true, SwitchState: switchState, SwitchName: switchName, }) log.WithFields(logrus.Fields{ "topic": msg.Topic(), "switch_name": switchName, }).Debug("Updated panel mode cache from Home Assistant state topic") }) t.Wait() return t.Error() } func subscribeHACurrentLimitState(client mqtt.Client, config Config, cache *panelStateCache) error { if cache == nil { return nil } stateTopic := haCurrentLimitStateTopic(config) log.WithField("topic", stateTopic).Info("Subscribing to Home Assistant current limit state topic for panel cache") t := client.Subscribe(stateTopic, 1, func(_ mqtt.Client, msg mqtt.Message) { trimmed := strings.TrimSpace(string(msg.Payload())) if trimmed == "" { return } limit, err := strconv.ParseFloat(trimmed, 64) if err != nil { log.WithFields(logrus.Fields{ "topic": msg.Topic(), "payload": string(msg.Payload()), }).WithError(err).Warn("Ignoring invalid Home Assistant current limit payload") return } cache.setCurrentLimit(limit) }) t.Wait() return t.Error() } func subscribeHAStandbyState(client mqtt.Client, config Config, cache *panelStateCache) error { if cache == nil { return nil } stateTopic := haStandbyStateTopic(config) log.WithField("topic", stateTopic).Info("Subscribing to Home Assistant standby state topic for panel cache") t := client.Subscribe(stateTopic, 1, func(_ mqtt.Client, msg mqtt.Message) { standby, err := parseStandbyText(string(msg.Payload())) if err != nil { log.WithFields(logrus.Fields{ "topic": msg.Topic(), "payload": string(msg.Payload()), }).WithError(err).Warn("Ignoring invalid Home Assistant standby payload") return } cache.setStandby(standby) }) t.Wait() return t.Error() } func commandHandler(client mqtt.Client, writer mk2driver.SettingsWriter, config Config, cache *panelStateCache, alarms *alarmTracker, ess *essControlCache, telemetry *telemetryCache) mqtt.MessageHandler { if cache == nil { cache = &panelStateCache{} } return func(_ mqtt.Client, msg mqtt.Message) { log.WithFields(logrus.Fields{ "topic": msg.Topic(), "payload": string(msg.Payload()), }).Debug("Received MQTT command message") cmd, err := decodeWriteCommand(msg.Payload()) if err != nil { log.Errorf("Invalid MQTT write command payload from topic %q: %v", msg.Topic(), err) publishWriteStatus(client, config.StatusTopic, writeStatus{ Status: writeStatusError, Error: err.Error(), Timestamp: time.Now().UTC(), }) return } status := applyWriteCommand(client, writer, config, cache, alarms, ess, telemetry, cmd) publishWriteStatus(client, config.StatusTopic, status) } } func applyWriteCommand(client mqtt.Client, writer mk2driver.SettingsWriter, config Config, cache *panelStateCache, alarms *alarmTracker, ess *essControlCache, telemetry *telemetryCache, cmd writeCommand) writeStatus { if ess == nil { ess = newESSControlCache() } if telemetry == nil { telemetry = &telemetryCache{} } execCmd := cmd status := writeStatus{ RequestID: cmd.RequestID, Status: writeStatusOK, Kind: cmd.Kind, Timestamp: time.Now().UTC(), } switch cmd.Kind { case commandKindPanel: resolved, err := cache.resolvePanelCommand(cmd) if err != nil { status.Status = writeStatusError status.Error = err.Error() log.Errorf("Invalid MQTT write command %s: %v", formatWriteCommandLog(cmd), err) return status } execCmd = resolved status.Switch = execCmd.SwitchName status.CurrentLimitA = execCmd.CurrentLimitA case commandKindStandby: status.Standby = copyBoolPtr(execCmd.Standby) case commandKindESSSet, commandKindESSMode, commandKindESSMaxC, commandKindESSMaxD: if cmd.FloatValue != nil { status.FloatValue = float64Ptr(*cmd.FloatValue) } mapped, err := resolveESSWriteCommand(cmd, ess, telemetry) if err != nil { status.Status = writeStatusError status.Error = err.Error() log.Errorf("Invalid ESS compatibility command %s: %v", formatWriteCommandLog(cmd), err) return status } if mapped != nil { execCmd = *mapped status.Kind = execCmd.Kind status.Switch = execCmd.SwitchName status.CurrentLimitA = execCmd.CurrentLimitA } else { if config.Venus.Enabled { if err := publishVenusESSState(client, config, ess.snapshot()); err != nil { log.WithError(err).Warn("Could not publish ESS compatibility state") } } return status } default: status.ID = cmd.ID status.Value = cmd.Value } if err := executeWriteCommand(writer, execCmd); err != nil { status.Status = writeStatusError status.Error = err.Error() log.Errorf("Failed MQTT write command %s: %v", formatWriteCommandLog(execCmd), err) if alarms != nil { active := alarms.RecordCommandFailure(err, time.Now().UTC()) if publishErr := publishActiveAlarms(client, config, active); publishErr != nil { log.WithError(publishErr).Warn("Could not publish active alarms after command failure") } } return status } if alarms != nil { alarms.RecordCommandSuccess(time.Now().UTC()) } log.Infof("Applied MQTT write command %s", formatWriteCommandLog(execCmd)) cache.remember(execCmd) if config.HomeAssistant.Enabled { if err := publishHAControlState(client, config, execCmd); err != nil { log.Errorf("Could not publish Home Assistant control state update: %v", err) } } if config.Venus.Enabled { if err := publishVenusControlState(client, config, execCmd, cache.snapshot()); err != nil { log.Errorf("Could not publish Venus control state update: %v", err) } if err := publishVenusESSState(client, config, ess.snapshot()); err != nil { log.WithError(err).Warn("Could not publish ESS compatibility state") } } return status } func (c *panelStateCache) resolvePanelCommand(cmd writeCommand) (writeCommand, error) { if cmd.Kind != commandKindPanel { return cmd, nil } if cmd.HasSwitch { return cmd, nil } c.mu.Lock() defer c.mu.Unlock() if !c.hasSwitch { return writeCommand{}, errors.New("panel_state command missing switch and no prior mode is known; set mode once first") } cmd.HasSwitch = true cmd.SwitchName = c.switchName cmd.SwitchState = c.switchState log.WithField("switch", cmd.SwitchName).Debug("Resolved panel command switch from cache") return cmd, nil } func (c *panelStateCache) remember(cmd writeCommand) { c.mu.Lock() defer c.mu.Unlock() switch cmd.Kind { case commandKindPanel: if cmd.HasSwitch { c.hasSwitch = true c.switchName = cmd.SwitchName c.switchState = cmd.SwitchState } if cmd.CurrentLimitA != nil { c.hasCurrent = true c.currentA = *cmd.CurrentLimitA } case commandKindStandby: if cmd.Standby != nil { c.hasStandby = true c.standby = *cmd.Standby } } } func (c *panelStateCache) setCurrentLimit(limit float64) { c.mu.Lock() c.hasCurrent = true c.currentA = limit c.mu.Unlock() } func (c *panelStateCache) setStandby(standby bool) { c.mu.Lock() c.hasStandby = true c.standby = standby c.mu.Unlock() } func (c *panelStateCache) snapshot() panelStateSnapshot { c.mu.Lock() defer c.mu.Unlock() return panelStateSnapshot{ Switch: c.switchName, CurrentLimit: c.currentA, Standby: c.standby, HasSwitch: c.hasSwitch, HasCurrent: c.hasCurrent, HasStandby: c.hasStandby, } } func newESSControlCache() *essControlCache { return &essControlCache{ hasBatteryLife: true, batteryLifeState: 10, } } func (e *essControlCache) snapshot() essControlSnapshot { e.mu.Lock() defer e.mu.Unlock() return essControlSnapshot{ HasSetpoint: e.hasSetpoint, SetpointW: e.setpointW, HasMaxCharge: e.hasMaxCharge, MaxChargeW: e.maxChargeW, HasMaxDischarge: e.hasMaxDischarge, MaxDischargeW: e.maxDischargeW, HasBatteryLife: e.hasBatteryLife, BatteryLifeState: e.batteryLifeState, } } func (e *essControlCache) setSetpoint(value float64) { e.mu.Lock() e.hasSetpoint = true e.setpointW = value e.mu.Unlock() } func (e *essControlCache) setMaxCharge(value float64) { e.mu.Lock() e.hasMaxCharge = true e.maxChargeW = value e.mu.Unlock() } func (e *essControlCache) setMaxDischarge(value float64) { e.mu.Lock() e.hasMaxDischarge = true e.maxDischargeW = value e.mu.Unlock() } func (e *essControlCache) setBatteryLifeState(value int) { e.mu.Lock() e.hasBatteryLife = true e.batteryLifeState = value e.mu.Unlock() } func (t *telemetryCache) set(snapshot telemetrySnapshot) { t.mu.Lock() t.last = snapshot t.mu.Unlock() } func (t *telemetryCache) snapshot() telemetrySnapshot { t.mu.Lock() defer t.mu.Unlock() return t.last } func resolveESSWriteCommand(cmd writeCommand, ess *essControlCache, telemetry *telemetryCache) (*writeCommand, error) { if ess == nil { ess = newESSControlCache() } switch cmd.Kind { case commandKindESSMaxC: if cmd.FloatValue == nil { return nil, errors.New("missing ess max charge power value") } if *cmd.FloatValue < 0 { return nil, fmt.Errorf("ess max charge power must be >= 0, got %.3f", *cmd.FloatValue) } ess.setMaxCharge(*cmd.FloatValue) return nil, nil case commandKindESSMaxD: if cmd.FloatValue == nil { return nil, errors.New("missing ess max discharge power value") } if *cmd.FloatValue < 0 { return nil, fmt.Errorf("ess max discharge power must be >= 0, got %.3f", *cmd.FloatValue) } ess.setMaxDischarge(*cmd.FloatValue) return nil, nil case commandKindESSMode: mode := int(cmd.Value) if mode == 0 && cmd.FloatValue != nil { mode = int(*cmd.FloatValue) } if mode != 9 && mode != 10 { return nil, fmt.Errorf("unsupported ess mode %d; expected 9 or 10", mode) } ess.setBatteryLifeState(mode) switchState := mk2driver.PanelSwitchOn switchName := "on" if mode == 9 { switchState = mk2driver.PanelSwitchChargerOnly switchName = "charger_only" } mapped := writeCommand{ Source: cmd.Source, RequestID: cmd.RequestID, Kind: commandKindPanel, HasSwitch: true, SwitchState: switchState, SwitchName: switchName, } return &mapped, nil case commandKindESSSet: if cmd.FloatValue == nil { return nil, errors.New("missing ess setpoint value") } setpoint := *cmd.FloatValue essSnapshot := ess.snapshot() if setpoint > 0 && essSnapshot.HasMaxCharge && setpoint > essSnapshot.MaxChargeW { setpoint = essSnapshot.MaxChargeW } if setpoint < 0 && essSnapshot.HasMaxDischarge && -setpoint > essSnapshot.MaxDischargeW { setpoint = -essSnapshot.MaxDischargeW } ess.setSetpoint(setpoint) if setpoint > 1 { voltage := 230.0 if telemetry != nil { last := telemetry.snapshot() if last.InputVoltage > 1 { voltage = last.InputVoltage } } currentLimit := setpoint / voltage mapped := writeCommand{ Source: cmd.Source, RequestID: cmd.RequestID, Kind: commandKindPanel, HasSwitch: true, SwitchState: mk2driver.PanelSwitchChargerOnly, SwitchName: "charger_only", CurrentLimitA: ¤tLimit, } return &mapped, nil } if setpoint < -1 { mapped := writeCommand{ Source: cmd.Source, RequestID: cmd.RequestID, Kind: commandKindPanel, HasSwitch: true, SwitchState: mk2driver.PanelSwitchInverterOnly, SwitchName: "inverter_only", } return &mapped, nil } mapped := writeCommand{ Source: cmd.Source, RequestID: cmd.RequestID, Kind: commandKindPanel, HasSwitch: true, SwitchState: mk2driver.PanelSwitchOn, SwitchName: "on", } return &mapped, nil default: return &cmd, nil } } func decodeWriteCommand(payload []byte) (writeCommand, error) { msg := writeCommandPayload{} if err := json.Unmarshal(payload, &msg); err != nil { return writeCommand{}, fmt.Errorf("invalid JSON payload: %w", err) } kind := msg.Kind if kind == "" { kind = msg.Type } normalizedKind, err := normalizeWriteKind(kind) if err != nil { return writeCommand{}, err } if normalizedKind == commandKindPanel { switchName := msg.Switch if switchName == "" { switchName = msg.SwitchState } hasSwitch := false switchState := mk2driver.PanelSwitchState(0) normalizedSwitchName := "" if switchName != "" { var err error switchState, normalizedSwitchName, err = normalizePanelSwitch(switchName) if err != nil { return writeCommand{}, err } hasSwitch = true } if msg.CurrentLimitA != nil && *msg.CurrentLimitA < 0 { return writeCommand{}, fmt.Errorf("current_limit must be >= 0, got %.3f", *msg.CurrentLimitA) } if !hasSwitch && msg.CurrentLimitA == nil { return writeCommand{}, errors.New(`missing required field "switch" (or "switch_state"), or "current_limit"`) } return writeCommand{ Source: mk2driver.CommandSourceMQTT, RequestID: msg.RequestID, Kind: normalizedKind, HasSwitch: hasSwitch, SwitchState: switchState, SwitchName: normalizedSwitchName, CurrentLimitA: msg.CurrentLimitA, }, nil } if normalizedKind == commandKindStandby { standby, err := decodeStandbyValue(msg) if err != nil { return writeCommand{}, err } return writeCommand{ Source: mk2driver.CommandSourceMQTT, RequestID: msg.RequestID, Kind: normalizedKind, Standby: &standby, }, nil } if normalizedKind == commandKindESSSet || normalizedKind == commandKindESSMaxC || normalizedKind == commandKindESSMaxD { value, err := decodeFloatValue(msg.Value) if err != nil { return writeCommand{}, err } return writeCommand{ Source: mk2driver.CommandSourceMQTT, RequestID: msg.RequestID, Kind: normalizedKind, FloatValue: &value, }, nil } if normalizedKind == commandKindESSMode { modeValue, err := decodeIntValue(msg.Value) if err != nil { return writeCommand{}, err } modeFloat := float64(modeValue) return writeCommand{ Source: mk2driver.CommandSourceMQTT, RequestID: msg.RequestID, Kind: normalizedKind, Value: int16(modeValue), FloatValue: &modeFloat, }, nil } if msg.ID == nil { return writeCommand{}, errors.New(`missing required field "id"`) } value, err := decodeInt16Value(msg.Value) if err != nil { return writeCommand{}, err } return writeCommand{ Source: mk2driver.CommandSourceMQTT, RequestID: msg.RequestID, Kind: normalizedKind, ID: *msg.ID, Value: value, }, nil } func normalizeWriteKind(raw string) (string, error) { switch strings.ToLower(strings.TrimSpace(raw)) { case "setting", "settings": return commandKindSetting, nil case "ram", "ramvar", "ram_var", "ram-variable", "ramvariable": return commandKindRAMVar, nil case "panel", "panel_state", "switch", "remote_panel": return commandKindPanel, nil case "standby", "panel_standby", "remote_panel_standby": return commandKindStandby, nil case "ess_mode", "ess_battery_life_state", "battery_life_state": return commandKindESSMode, nil case "ess_setpoint", "ac_power_setpoint", "grid_setpoint": return commandKindESSSet, nil case "ess_max_charge_power", "max_charge_power": return commandKindESSMaxC, nil case "ess_max_discharge_power", "max_discharge_power": return commandKindESSMaxD, nil default: return "", fmt.Errorf("unsupported write command kind %q", raw) } } func normalizePanelSwitch(raw string) (mk2driver.PanelSwitchState, string, error) { switch strings.ToLower(strings.TrimSpace(raw)) { case "1", "charger_only", "charger-only", "charger": return mk2driver.PanelSwitchChargerOnly, "charger_only", nil case "2", "inverter_only", "inverter-only", "inverter": return mk2driver.PanelSwitchInverterOnly, "inverter_only", nil case "3", "on": return mk2driver.PanelSwitchOn, "on", nil case "4", "off": return mk2driver.PanelSwitchOff, "off", nil default: return 0, "", fmt.Errorf("unsupported panel switch %q", raw) } } func executeWriteCommand(writer mk2driver.SettingsWriter, cmd writeCommand) error { if writer == nil { return errors.New("settings writer is not available") } source := cmd.Source if source == "" { source = mk2driver.CommandSourceMQTT } if sourceAware, ok := writer.(mk2driver.SourceAwareSettingsWriter); ok { switch cmd.Kind { case commandKindPanel: if !cmd.HasSwitch { return errors.New("panel_state command requires a switch state") } return sourceAware.SetPanelStateWithSource(source, cmd.SwitchState, cmd.CurrentLimitA) case commandKindStandby: if cmd.Standby == nil { return errors.New("standby command missing standby value") } return sourceAware.SetStandbyWithSource(source, *cmd.Standby) case commandKindRAMVar: return sourceAware.WriteRAMVarWithSource(source, cmd.ID, cmd.Value) case commandKindSetting: return sourceAware.WriteSettingWithSource(source, cmd.ID, cmd.Value) default: return fmt.Errorf("unsupported write command kind %q", cmd.Kind) } } switch cmd.Kind { case commandKindPanel: if !cmd.HasSwitch { return errors.New("panel_state command requires a switch state") } return writer.SetPanelState(cmd.SwitchState, cmd.CurrentLimitA) case commandKindStandby: if cmd.Standby == nil { return errors.New("standby command missing standby value") } return writer.SetStandby(*cmd.Standby) case commandKindRAMVar: return writer.WriteRAMVar(cmd.ID, cmd.Value) case commandKindSetting: return writer.WriteSetting(cmd.ID, cmd.Value) default: return fmt.Errorf("unsupported write command kind %q", cmd.Kind) } } func formatWriteCommandLog(cmd writeCommand) string { switch cmd.Kind { case commandKindPanel: switchName := cmd.SwitchName if switchName == "" { switchName = "" } if cmd.CurrentLimitA == nil { return fmt.Sprintf("kind=%s switch=%s", cmd.Kind, switchName) } return fmt.Sprintf("kind=%s switch=%s current_limit=%.3f", cmd.Kind, switchName, *cmd.CurrentLimitA) case commandKindStandby: if cmd.Standby == nil { return fmt.Sprintf("kind=%s standby=", cmd.Kind) } return fmt.Sprintf("kind=%s standby=%t", cmd.Kind, *cmd.Standby) case commandKindESSSet, commandKindESSMaxC, commandKindESSMaxD: if cmd.FloatValue == nil { return fmt.Sprintf("kind=%s value=", cmd.Kind) } return fmt.Sprintf("kind=%s value=%.3f", cmd.Kind, *cmd.FloatValue) case commandKindESSMode: return fmt.Sprintf("kind=%s value=%d", cmd.Kind, cmd.Value) default: return fmt.Sprintf("kind=%s id=%d value=%d", cmd.Kind, cmd.ID, cmd.Value) } } func decodeInt16Value(raw json.RawMessage) (int16, error) { if len(raw) == 0 { return 0, errors.New(`missing required field "value"`) } var value int16 if err := json.Unmarshal(raw, &value); err != nil { return 0, fmt.Errorf(`field "value" must be a signed integer: %w`, err) } return value, nil } func decodeIntValue(raw json.RawMessage) (int, error) { if len(raw) == 0 { return 0, errors.New(`missing required field "value"`) } var intValue int if err := json.Unmarshal(raw, &intValue); err == nil { return intValue, nil } var floatValue float64 if err := json.Unmarshal(raw, &floatValue); err == nil { return int(floatValue), nil } var stringValue string if err := json.Unmarshal(raw, &stringValue); err == nil { parsed, parseErr := strconv.Atoi(strings.TrimSpace(stringValue)) if parseErr != nil { return 0, fmt.Errorf(`field "value" must be integer-like: %w`, parseErr) } return parsed, nil } return 0, errors.New(`field "value" must be integer-like`) } func decodeFloatValue(raw json.RawMessage) (float64, error) { if len(raw) == 0 { return 0, errors.New(`missing required field "value"`) } var floatValue float64 if err := json.Unmarshal(raw, &floatValue); err == nil { return floatValue, nil } var intValue int if err := json.Unmarshal(raw, &intValue); err == nil { return float64(intValue), nil } var stringValue string if err := json.Unmarshal(raw, &stringValue); err == nil { parsed, parseErr := strconv.ParseFloat(strings.TrimSpace(stringValue), 64) if parseErr != nil { return 0, fmt.Errorf(`field "value" must be numeric: %w`, parseErr) } return parsed, nil } return 0, errors.New(`field "value" must be numeric`) } func parseStandbyText(raw string) (bool, error) { switch strings.ToLower(strings.TrimSpace(raw)) { case "1", "true", "on", "enable", "enabled": return true, nil case "0", "false", "off", "disable", "disabled": return false, nil default: return false, fmt.Errorf("field \"standby\" must be true/false, got %q", raw) } } func decodeStandbyValue(msg writeCommandPayload) (bool, error) { if msg.Standby != nil { return *msg.Standby, nil } if len(msg.Value) == 0 { return false, errors.New(`missing required field "standby" (or boolean "value")`) } var boolValue bool if err := json.Unmarshal(msg.Value, &boolValue); err == nil { return boolValue, nil } var stringValue string if err := json.Unmarshal(msg.Value, &stringValue); err == nil { return parseStandbyText(stringValue) } var intValue int if err := json.Unmarshal(msg.Value, &intValue); err == nil { switch intValue { case 1: return true, nil case 0: return false, nil } } return false, errors.New(`field "standby" must be true/false`) } func publishWriteStatus(client mqtt.Client, topic string, status writeStatus) { if topic == "" { log.Debug("Skipping write status publish; status topic is empty") return } if err := publishJSON(client, topic, status, 1, false); err != nil { log.Errorf("Could not publish command status to MQTT topic %q: %v", topic, err) } } type venusValueEnvelope struct { Value any `json:"value"` } func subscribeVenusWriteTopics(client mqtt.Client, writer mk2driver.SettingsWriter, config Config, cache *panelStateCache, alarms *alarmTracker, ess *essControlCache, telemetry *telemetryCache) error { if writer == nil { return nil } subscribeTopics := []string{venusWriteTopicWildcard(config)} if config.Venus.GuideCompat { guideTopic := venusWriteTopicWildcardWithService(config, "settings/0") if guideTopic != subscribeTopics[0] { subscribeTopics = append(subscribeTopics, guideTopic) } } handler := func(_ mqtt.Client, msg mqtt.Message) { cmd, err := decodeVenusWriteCommand(config, msg.Topic(), msg.Payload()) if err != nil { log.WithFields(logrus.Fields{ "topic": msg.Topic(), "payload": string(msg.Payload()), }).WithError(err).Warn("Ignoring unsupported Venus write request") return } status := applyWriteCommand(client, writer, config, cache, alarms, ess, telemetry, cmd) publishWriteStatus(client, config.StatusTopic, status) } for _, topic := range subscribeTopics { log.WithField("topic", topic).Info("Subscribing to Venus write topic wildcard") t := client.Subscribe(topic, 1, handler) t.Wait() if t.Error() != nil { return t.Error() } } return nil } func decodeVenusWriteCommand(config Config, topic string, payload []byte) (writeCommand, error) { service, path, err := parseVenusWriteTopic(config, topic) if err != nil { return writeCommand{}, err } if path == "" { return writeCommand{}, errors.New("missing Venus write path") } value, err := decodeVenusValue(payload) if err != nil { return writeCommand{}, err } switch path { case "Mode": switchState, switchName, err := decodePanelSwitchFromAny(value) if err != nil { return writeCommand{}, err } return writeCommand{ Source: mk2driver.CommandSourceMQTT, Kind: commandKindPanel, HasSwitch: true, SwitchState: switchState, SwitchName: switchName, }, nil case "Ac/ActiveIn/CurrentLimit": limit, err := toFloat64(value) if err != nil { return writeCommand{}, fmt.Errorf("invalid current limit payload: %w", err) } if limit < 0 { return writeCommand{}, fmt.Errorf("current_limit must be >= 0, got %.3f", limit) } return writeCommand{ Source: mk2driver.CommandSourceMQTT, Kind: commandKindPanel, CurrentLimitA: &limit, }, nil case "Settings/Standby", "RemotePanel/Standby": standby, err := toBool(value) if err != nil { return writeCommand{}, fmt.Errorf("invalid standby payload: %w", err) } return writeCommand{ Source: mk2driver.CommandSourceMQTT, Kind: commandKindStandby, Standby: &standby, }, nil case "Settings/CGwacs/AcPowerSetPoint": setpoint, err := toFloat64(value) if err != nil { return writeCommand{}, fmt.Errorf("invalid ESS setpoint payload: %w", err) } return writeCommand{ Source: mk2driver.CommandSourceMQTT, Kind: commandKindESSSet, FloatValue: &setpoint, }, nil case "Settings/CGwacs/MaxChargePower": maxCharge, err := toFloat64(value) if err != nil { return writeCommand{}, fmt.Errorf("invalid ESS max charge power payload: %w", err) } return writeCommand{ Source: mk2driver.CommandSourceMQTT, Kind: commandKindESSMaxC, FloatValue: &maxCharge, }, nil case "Settings/CGwacs/MaxDischargePower": maxDischarge, err := toFloat64(value) if err != nil { return writeCommand{}, fmt.Errorf("invalid ESS max discharge power payload: %w", err) } return writeCommand{ Source: mk2driver.CommandSourceMQTT, Kind: commandKindESSMaxD, FloatValue: &maxDischarge, }, nil case "Settings/CGwacs/BatteryLife/State": modeValue, err := toInt(value) if err != nil { return writeCommand{}, fmt.Errorf("invalid ESS battery life state payload: %w", err) } modeFloat := float64(modeValue) return writeCommand{ Source: mk2driver.CommandSourceMQTT, Kind: commandKindESSMode, Value: int16(modeValue), FloatValue: &modeFloat, }, nil default: return writeCommand{}, fmt.Errorf("unsupported Venus write path %q for service %q", path, service) } } func parseVenusWriteTopic(config Config, topic string) (service string, path string, err error) { withoutPrefix := stripVenusTopicPrefix(config, topic) withoutPrefix = strings.Trim(withoutPrefix, "/") parts := strings.Split(withoutPrefix, "/") if len(parts) < 4 || parts[0] != "W" { return "", "", fmt.Errorf("topic %q does not appear to be a Venus write topic", topic) } if parts[1] != venusPortalID(config) { return "", "", fmt.Errorf("topic %q is for portal %q, expected %q", topic, parts[1], venusPortalID(config)) } remainder := parts[2:] primary := strings.Split(venusService(config), "/") if len(remainder) >= len(primary) && strings.EqualFold(strings.Join(remainder[:len(primary)], "/"), strings.Join(primary, "/")) { return strings.Join(primary, "/"), strings.Join(remainder[len(primary):], "/"), nil } guide := []string{"settings", "0"} if config.Venus.GuideCompat && len(remainder) >= len(guide) && strings.EqualFold(strings.Join(remainder[:len(guide)], "/"), strings.Join(guide, "/")) { return "settings/0", strings.Join(remainder[len(guide):], "/"), nil } return "", "", fmt.Errorf("topic %q does not match expected Venus services", topic) } func decodeVenusValue(payload []byte) (any, error) { var envelope venusValueEnvelope if err := json.Unmarshal(payload, &envelope); err == nil { if envelope.Value != nil { return envelope.Value, nil } } var out any if err := json.Unmarshal(payload, &out); err != nil { return nil, fmt.Errorf("invalid Venus payload: %w", err) } return out, nil } func decodePanelSwitchFromAny(value any) (mk2driver.PanelSwitchState, string, error) { switch typed := value.(type) { case string: return normalizePanelSwitch(typed) case float64: asInt := int(typed) if typed != float64(asInt) { return 0, "", fmt.Errorf("mode must be integer-like, got %.3f", typed) } return normalizePanelSwitch(strconv.Itoa(asInt)) default: return 0, "", fmt.Errorf("unsupported mode payload type %T", value) } } func toFloat64(value any) (float64, error) { switch typed := value.(type) { case float64: return typed, nil case string: return strconv.ParseFloat(strings.TrimSpace(typed), 64) case int: return float64(typed), nil case int64: return float64(typed), nil default: return 0, fmt.Errorf("unsupported numeric payload type %T", value) } } func toInt(value any) (int, error) { switch typed := value.(type) { case int: return typed, nil case int64: return int(typed), nil case float64: return int(typed), nil case string: return strconv.Atoi(strings.TrimSpace(typed)) default: return 0, fmt.Errorf("unsupported integer payload type %T", value) } } func toBool(value any) (bool, error) { switch typed := value.(type) { case bool: return typed, nil case float64: if typed == 0 { return false, nil } if typed == 1 { return true, nil } return false, fmt.Errorf("unsupported numeric boolean value %.3f", typed) case string: return parseStandbyText(typed) default: return false, fmt.Errorf("unsupported boolean payload type %T", value) } } func publishHADiscovery(client mqtt.Client, config Config) error { definitions := buildHADiscoveryDefinitions(config) prefix := haDiscoveryPrefix(config) nodeID := haNodeID(config) for _, def := range definitions { topic := fmt.Sprintf("%s/%s/%s/%s/config", prefix, def.Component, nodeID, def.ObjectID) log.WithFields(logrus.Fields{ "topic": topic, "component": def.Component, "object_id": def.ObjectID, }).Debug("Publishing Home Assistant discovery definition") if err := publishJSON(client, topic, def.Config, 1, true); err != nil { return fmt.Errorf("could not publish discovery for %s/%s: %w", def.Component, def.ObjectID, err) } } return nil } func buildHADiscoveryDefinitions(config Config) []haDiscoveryDefinition { if !config.HomeAssistant.Enabled { return nil } nodeID := haNodeID(config) device := map[string]any{ "identifiers": []string{fmt.Sprintf("invertergui_%s", nodeID)}, "name": haDeviceName(config), "manufacturer": "Victron Energy", "model": "MultiPlus", "sw_version": "invertergui", } availabilityTopic := haAvailabilityTopic(config) stateTopic := config.Topic sensors := []haDiscoveryDefinition{ buildHASensor(device, availabilityTopic, stateTopic, nodeID, "battery_voltage", "Battery Voltage", "{{ value_json.BatVoltage }}", "V", "voltage", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "battery_current", "Battery Current", "{{ value_json.BatCurrent }}", "A", "current", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "battery_charge", "Battery Charge", "{{ ((value_json.ChargeState | float(0)) * 100) | round(1) }}", "%", "battery", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "input_voltage", "Input Voltage", "{{ value_json.InVoltage }}", "V", "voltage", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "input_current", "Input Current", "{{ value_json.InCurrent }}", "A", "current", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "input_frequency", "Input Frequency", "{{ value_json.InFrequency }}", "Hz", "frequency", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "output_voltage", "Output Voltage", "{{ value_json.OutVoltage }}", "V", "voltage", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "output_current", "Output Current", "{{ value_json.OutCurrent }}", "A", "current", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "output_frequency", "Output Frequency", "{{ value_json.OutFrequency }}", "Hz", "frequency", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "input_power", "Input Power", "{{ ((value_json.InVoltage | float(0)) * (value_json.InCurrent | float(0))) | round(1) }}", "VA", "", "measurement"), buildHASensor(device, availabilityTopic, stateTopic, nodeID, "output_power", "Output Power", "{{ ((value_json.OutVoltage | float(0)) * (value_json.OutCurrent | float(0))) | round(1) }}", "VA", "", "measurement"), { Component: "binary_sensor", ObjectID: "data_valid", Config: map[string]any{ "name": "Data Valid", "unique_id": fmt.Sprintf("%s_data_valid", nodeID), "state_topic": stateTopic, "value_template": "{{ value_json.Valid }}", "payload_on": "true", "payload_off": "false", "availability_topic": availabilityTopic, "device": device, "entity_category": "diagnostic", }, }, } if config.CommandTopic != "" { sensors = append(sensors, haDiscoveryDefinition{ Component: "select", ObjectID: "remote_panel_mode", Config: map[string]any{ "name": "Remote Panel Mode", "unique_id": fmt.Sprintf("%s_remote_panel_mode", nodeID), "state_topic": haPanelSwitchStateTopic(config), "command_topic": config.CommandTopic, "command_template": "{\"kind\":\"panel_state\",\"switch\":\"{{ value }}\"}", "options": []string{"charger_only", "inverter_only", "on", "off"}, "availability_topic": availabilityTopic, "device": device, "icon": "mdi:transmission-tower-export", }, }, haDiscoveryDefinition{ Component: "number", ObjectID: "remote_panel_current_limit", Config: map[string]any{ "name": "Remote Panel Current Limit", "unique_id": fmt.Sprintf("%s_remote_panel_current_limit", nodeID), "state_topic": haCurrentLimitStateTopic(config), "command_topic": config.CommandTopic, "command_template": "{\"kind\":\"panel_state\",\"current_limit\":{{ value | float(0) }}}", "unit_of_measurement": "A", "device_class": "current", "mode": "box", "min": 0, "max": 100, "step": 0.1, "availability_topic": availabilityTopic, "device": device, "icon": "mdi:current-ac", }, }, haDiscoveryDefinition{ Component: "switch", ObjectID: "remote_panel_standby", Config: map[string]any{ "name": "Remote Panel Standby", "unique_id": fmt.Sprintf("%s_remote_panel_standby", nodeID), "state_topic": haStandbyStateTopic(config), "command_topic": config.CommandTopic, "payload_on": "{\"kind\":\"standby\",\"standby\":true}", "payload_off": "{\"kind\":\"standby\",\"standby\":false}", "state_on": "ON", "state_off": "OFF", "availability_topic": availabilityTopic, "device": device, "icon": "mdi:power-sleep", }, }, ) } return sensors } func buildHASensor(device map[string]any, availabilityTopic, stateTopic, nodeID, objectID, name, valueTemplate, unit, deviceClass, stateClass string) haDiscoveryDefinition { config := map[string]any{ "name": name, "unique_id": fmt.Sprintf("%s_%s", nodeID, objectID), "state_topic": stateTopic, "value_template": valueTemplate, "availability_topic": availabilityTopic, "device": device, } if unit != "" { config["unit_of_measurement"] = unit } if deviceClass != "" { config["device_class"] = deviceClass } if stateClass != "" { config["state_class"] = stateClass } return haDiscoveryDefinition{ Component: "sensor", ObjectID: objectID, Config: config, } } func publishHAAvailability(client mqtt.Client, config Config, status string) error { return publishString(client, haAvailabilityTopic(config), status, 1, true) } func publishHAControlState(client mqtt.Client, config Config, cmd writeCommand) error { switch cmd.Kind { case commandKindPanel: if err := publishString(client, haPanelSwitchStateTopic(config), cmd.SwitchName, 1, true); err != nil { return err } if cmd.CurrentLimitA != nil { limit := strconv.FormatFloat(*cmd.CurrentLimitA, 'f', 1, 64) if err := publishString(client, haCurrentLimitStateTopic(config), limit, 1, true); err != nil { return err } } case commandKindStandby: if cmd.Standby == nil { return nil } state := "OFF" if *cmd.Standby { state = "ON" } if err := publishString(client, haStandbyStateTopic(config), state, 1, true); err != nil { return err } } return nil } func buildTelemetrySnapshot(info *mk2driver.Mk2Info) telemetrySnapshot { snapshot := telemetrySnapshot{ Timestamp: info.Timestamp.UTC(), Valid: info.Valid, InputVoltage: info.InVoltage, InputCurrent: info.InCurrent, InputFrequency: info.InFrequency, OutputVoltage: info.OutVoltage, OutputCurrent: info.OutCurrent, OutputFrequency: info.OutFrequency, BatteryVoltage: info.BatVoltage, BatteryCurrent: info.BatCurrent, BatteryCharge: info.ChargeState * 100, InputPower: info.InVoltage * info.InCurrent, OutputPower: info.OutVoltage * info.OutCurrent, BatteryPower: info.BatVoltage * info.BatCurrent, LEDs: map[string]string{}, } if snapshot.Timestamp.IsZero() { snapshot.Timestamp = time.Now().UTC() } for led, state := range info.LEDs { name, ok := mk2driver.LedNames[led] if !ok { continue } if stateName, exists := mk2driver.StateNames[state]; exists { snapshot.LEDs[name] = stateName } } return snapshot } func newHistoryTracker(max int) *historyTracker { if max <= 0 { max = 120 } return &historyTracker{ max: max, } } func (h *historyTracker) Add(snapshot telemetrySnapshot, state operatingState, faultCount uint64, lastFaultAt *time.Time) historySummary { h.mu.Lock() defer h.mu.Unlock() sample := historySample{ Timestamp: snapshot.Timestamp, InputPowerVA: snapshot.InputPower, OutputPowerVA: snapshot.OutputPower, BatteryPowerW: snapshot.BatteryPower, BatteryVoltage: snapshot.BatteryVoltage, Valid: snapshot.Valid, State: state, } if h.lastSample != nil && sample.Timestamp.After(h.lastSample.Timestamp) { delta := sample.Timestamp.Sub(h.lastSample.Timestamp) deltaHours := delta.Hours() if sample.InputPowerVA > 0 { h.energyInWh += sample.InputPowerVA * deltaHours } if sample.OutputPowerVA > 0 { h.energyOutWh += sample.OutputPowerVA * deltaHours } if sample.BatteryPowerW > 0 { h.batteryChargeWh += sample.BatteryPowerW * deltaHours } if sample.BatteryPowerW < 0 { h.batteryDischargeWh += -sample.BatteryPowerW * deltaHours } if sample.Valid && sample.State != operatingStateOff { h.uptimeSeconds += delta.Seconds() } } h.samples = append(h.samples, sample) if len(h.samples) > h.max { h.samples = h.samples[len(h.samples)-h.max:] } sampleCopy := sample h.lastSample = &sampleCopy h.lastState = state h.lastFaultCount = faultCount if lastFaultAt != nil { h.lastFaultAt = lastFaultAt.UTC() } return h.summaryLocked(state, faultCount, lastFaultAt) } func (h *historyTracker) Summary(state operatingState, faultCount uint64, lastFaultAt *time.Time) historySummary { h.mu.Lock() defer h.mu.Unlock() return h.summaryLocked(state, faultCount, lastFaultAt) } func (h *historyTracker) summaryLocked(state operatingState, faultCount uint64, lastFaultAt *time.Time) historySummary { summary := summarizeHistory(h.samples) summary.EnergyInWh = h.energyInWh summary.EnergyOutWh = h.energyOutWh summary.BatteryChargeWh = h.batteryChargeWh summary.BatteryDischargeWh = h.batteryDischargeWh summary.UptimeSeconds = h.uptimeSeconds if faultCount == 0 { faultCount = h.lastFaultCount } summary.FaultCount = faultCount if lastFaultAt == nil && !h.lastFaultAt.IsZero() { lastFaultAt = &h.lastFaultAt } if lastFaultAt != nil && !lastFaultAt.IsZero() { summary.LastFaultAt = lastFaultAt.UTC().Format(time.RFC3339) } if state == "" { state = h.lastState } summary.CurrentState = string(state) return summary } func summarizeHistory(samples []historySample) historySummary { summary := historySummary{ Samples: len(samples), } if len(samples) == 0 { return summary } first := samples[0] last := samples[len(samples)-1] if last.Timestamp.After(first.Timestamp) { summary.WindowSeconds = last.Timestamp.Sub(first.Timestamp).Seconds() } minBatteryVoltage := samples[0].BatteryVoltage maxOutputPower := samples[0].OutputPowerVA var inTotal, outTotal, batTotal float64 for _, sample := range samples { inTotal += sample.InputPowerVA outTotal += sample.OutputPowerVA batTotal += sample.BatteryPowerW if sample.BatteryVoltage < minBatteryVoltage { minBatteryVoltage = sample.BatteryVoltage } if sample.OutputPowerVA > maxOutputPower { maxOutputPower = sample.OutputPowerVA } } count := float64(len(samples)) summary.AverageInputPower = inTotal / count summary.AverageOutputPower = outTotal / count summary.AverageBatteryPower = batTotal / count summary.MaxOutputPower = maxOutputPower summary.MinBatteryVoltage = minBatteryVoltage return summary } func newAlarmTracker() *alarmTracker { return &alarmTracker{ active: map[string]alarmState{}, rules: map[string]*alarmRule{}, debounceOn: 1500 * time.Millisecond, debounceOff: 3 * time.Second, } } func (a *alarmTracker) Update(snapshot telemetrySnapshot) []alarmState { now := snapshot.Timestamp if now.IsZero() { now = time.Now().UTC() } a.mu.Lock() defer a.mu.Unlock() a.setLocked("invalid_data", "critical", "MK2 data frame marked invalid", !snapshot.Valid, now) a.setFromLEDLocked(snapshot, "led_overload", "overload", "critical", "Inverter overload alarm", now) a.setFromLEDLocked(snapshot, "led_over_temp", "over_temperature", "critical", "Inverter over temperature alarm", now) a.setFromLEDLocked(snapshot, "led_bat_low", "battery_low", "warning", "Battery low warning", now) a.setLocked("command_failure", "warning", "Recent command failed", false, now) return a.activeLocked() } func (a *alarmTracker) RecordCommandFailure(err error, now time.Time) []alarmState { if now.IsZero() { now = time.Now().UTC() } a.mu.Lock() defer a.mu.Unlock() message := "Recent command failed" if err != nil { message = fmt.Sprintf("Command failed: %v", err) } a.setLocked("command_failure", "warning", message, true, now) return a.activeLocked() } func (a *alarmTracker) RecordCommandSuccess(now time.Time) []alarmState { if now.IsZero() { now = time.Now().UTC() } a.mu.Lock() defer a.mu.Unlock() a.setLocked("command_failure", "warning", "Recent command failed", false, now) return a.activeLocked() } func (a *alarmTracker) Stats() (uint64, *time.Time) { a.mu.Lock() defer a.mu.Unlock() if a.lastFault.IsZero() { return a.faultCount, nil } last := a.lastFault return a.faultCount, &last } func (a *alarmTracker) activeLocked() []alarmState { alarms := make([]alarmState, 0, len(a.active)) for _, alarm := range a.active { alarms = append(alarms, alarm) } sort.Slice(alarms, func(i, j int) bool { return alarms[i].Code < alarms[j].Code }) return alarms } func (a *alarmTracker) getRuleLocked(code, level, message string) *alarmRule { rule, ok := a.rules[code] if !ok { rule = &alarmRule{ code: code, level: level, message: message, } a.rules[code] = rule return rule } rule.level = level rule.message = message return rule } func (a *alarmTracker) setFromLEDLocked(snapshot telemetrySnapshot, ledKey, code, level, message string, now time.Time) { state := strings.ToLower(strings.TrimSpace(snapshot.LEDs[ledKey])) active := state == "on" || state == "blink" a.setLocked(code, level, message, active, now) } func (a *alarmTracker) setLocked(code, level, message string, active bool, now time.Time) { rule := a.getRuleLocked(code, level, message) if active { rule.clearFrom = time.Time{} if rule.active { a.active[code] = alarmState{ Code: code, Level: level, Message: message, ActiveSince: rule.activeSince, } return } if rule.pendingFrom.IsZero() { rule.pendingFrom = now } if now.Sub(rule.pendingFrom) >= a.debounceOn { rule.active = true rule.activeSince = rule.pendingFrom a.active[code] = alarmState{ Code: code, Level: level, Message: message, ActiveSince: rule.activeSince, } a.faultCount++ a.lastFault = now } return } rule.pendingFrom = time.Time{} if !rule.active { delete(a.active, code) return } if rule.clearFrom.IsZero() { rule.clearFrom = now return } if now.Sub(rule.clearFrom) >= a.debounceOff { rule.active = false rule.activeSince = time.Time{} rule.clearFrom = time.Time{} delete(a.active, code) } } func deriveOperatingState(snapshot telemetrySnapshot, panel panelStateSnapshot, alarms []alarmState) operatingState { for _, alarm := range alarms { if strings.EqualFold(alarm.Level, "critical") { return operatingStateFault } } if !snapshot.Valid { return operatingStateFault } if panel.HasSwitch && panel.Switch == "off" { return operatingStateOff } mainsState := strings.ToLower(strings.TrimSpace(snapshot.LEDs["led_mains"])) inverterState := strings.ToLower(strings.TrimSpace(snapshot.LEDs["led_inverter"])) mainsOn := mainsState == "on" || mainsState == "blink" inverterOn := inverterState == "on" || inverterState == "blink" charging := snapshot.BatteryCurrent > 0.2 if inverterOn && !mainsOn { return operatingStateInverter } if mainsOn && snapshot.OutputPower > 20 { if charging { return operatingStateCharger } return operatingStatePassthru } if charging { return operatingStateCharger } if snapshot.OutputPower > 20 { return operatingStateInverter } return operatingStateOff } func publishOrchestrationTopics(client mqtt.Client, config Config, snapshot telemetrySnapshot, panel panelStateSnapshot, summary historySummary, alarms []alarmState, state operatingState) error { deviceRoot := deviceTopicRoot(config) payload := orchestrationState{ DeviceID: normalizedDeviceID(config), InstanceID: config.InstanceID, Phase: strings.ToUpper(strings.TrimSpace(config.Phase)), PhaseGroup: strings.TrimSpace(config.PhaseGroup), Timestamp: snapshot.Timestamp, OperatingState: string(state), Telemetry: snapshot, PanelState: panel, History: summary, Alarms: alarms, } if err := publishJSON(client, fmt.Sprintf("%s/state", deviceRoot), payload, 0, false); err != nil { return err } if err := publishJSON(client, fmt.Sprintf("%s/history/summary", deviceRoot), summary, 0, false); err != nil { return err } if err := publishActiveAlarms(client, config, alarms); err != nil { return err } groupPhase := strings.ToUpper(strings.TrimSpace(config.Phase)) if groupPhase == "" { groupPhase = "L1" } groupTopic := fmt.Sprintf("%s/groups/%s/%s/state", mqttTopicRoot(config.Topic), normalizeID(config.PhaseGroup), groupPhase) if err := publishJSON(client, groupTopic, payload, 0, false); err != nil { return err } return nil } func publishActiveAlarms(client mqtt.Client, config Config, alarms []alarmState) error { return publishJSON(client, fmt.Sprintf("%s/alarms/active", deviceTopicRoot(config)), alarms, 1, true) } func publishDiagnostics(client mqtt.Client, config Config, driverDiag mk2driver.DriverDiagnostics, commands []mk2driver.CommandEvent) error { diag := diagnosticsBundle{ DeviceID: normalizedDeviceID(config), InstanceID: config.InstanceID, Phase: strings.ToUpper(strings.TrimSpace(config.Phase)), PhaseGroup: strings.TrimSpace(config.PhaseGroup), GeneratedAt: time.Now().UTC(), HealthScore: driverDiag.HealthScore, Driver: &driverDiag, Commands: commands, } return publishJSON(client, fmt.Sprintf("%s/diagnostics", deviceTopicRoot(config)), diag, 1, false) } func publishVenusServiceInfo(client mqtt.Client, config Config) error { if !config.Venus.Enabled { return nil } if err := publishVenusValue(client, config, "Connected", 1, true); err != nil { return err } if err := publishVenusValue(client, config, "ProductName", "invertergui", true); err != nil { return err } if err := publishVenusValue(client, config, "DeviceInstance", config.InstanceID, true); err != nil { return err } return nil } func publishVenusTelemetry(client mqtt.Client, config Config, snapshot telemetrySnapshot, panel panelStateSnapshot, alarms []alarmState, state operatingState, history historySummary, ess essControlSnapshot) error { phase := strings.ToUpper(strings.TrimSpace(config.Phase)) if phase == "" { phase = "L1" } if err := publishVenusValue(client, config, "Connected", 1, true); err != nil { return err } if err := publishVenusValue(client, config, "State", string(state), true); err != nil { return err } if err := publishVenusValue(client, config, "Dc/0/Voltage", snapshot.BatteryVoltage, true); err != nil { return err } if err := publishVenusValue(client, config, "Dc/0/Current", snapshot.BatteryCurrent, true); err != nil { return err } if err := publishVenusValue(client, config, "Dc/0/Power", snapshot.BatteryPower, true); err != nil { return err } if err := publishVenusValue(client, config, "Soc", snapshot.BatteryCharge, true); err != nil { return err } if err := publishVenusValue(client, config, fmt.Sprintf("Ac/ActiveIn/%s/V", phase), snapshot.InputVoltage, true); err != nil { return err } if err := publishVenusValue(client, config, fmt.Sprintf("Ac/ActiveIn/%s/I", phase), snapshot.InputCurrent, true); err != nil { return err } if err := publishVenusValue(client, config, fmt.Sprintf("Ac/ActiveIn/%s/F", phase), snapshot.InputFrequency, true); err != nil { return err } if err := publishVenusValue(client, config, fmt.Sprintf("Ac/ActiveIn/%s/P", phase), snapshot.InputPower, true); err != nil { return err } if err := publishVenusValue(client, config, fmt.Sprintf("Ac/Out/%s/V", phase), snapshot.OutputVoltage, true); err != nil { return err } if err := publishVenusValue(client, config, fmt.Sprintf("Ac/Out/%s/I", phase), snapshot.OutputCurrent, true); err != nil { return err } if err := publishVenusValue(client, config, fmt.Sprintf("Ac/Out/%s/F", phase), snapshot.OutputFrequency, true); err != nil { return err } if err := publishVenusValue(client, config, fmt.Sprintf("Ac/Out/%s/P", phase), snapshot.OutputPower, true); err != nil { return err } if err := publishVenusValue(client, config, "History/EnergyIn", history.EnergyInWh, true); err != nil { return err } if err := publishVenusValue(client, config, "History/EnergyOut", history.EnergyOutWh, true); err != nil { return err } if err := publishVenusValue(client, config, "History/FaultCount", history.FaultCount, true); err != nil { return err } if err := publishVenusESSState(client, config, ess); err != nil { return err } if panel.HasSwitch { if err := publishVenusValue(client, config, "Mode", panel.Switch, true); err != nil { return err } } if panel.HasCurrent { if err := publishVenusValue(client, config, "Ac/ActiveIn/CurrentLimit", panel.CurrentLimit, true); err != nil { return err } } if panel.HasStandby { if err := publishVenusValue(client, config, "Settings/Standby", panel.Standby, true); err != nil { return err } } alarmLevels := venusAlarmLevels(alarms) if err := publishVenusValue(client, config, "Alarms/LowBattery", alarmLevels["battery_low"], true); err != nil { return err } if err := publishVenusValue(client, config, "Alarms/HighTemperature", alarmLevels["over_temperature"], true); err != nil { return err } if err := publishVenusValue(client, config, "Alarms/Overload", alarmLevels["overload"], true); err != nil { return err } if err := publishVenusValue(client, config, "Alarms/Communication", alarmLevels["invalid_data"], true); err != nil { return err } if err := publishVenusValue(client, config, "Alarms/Command", alarmLevels["command_failure"], true); err != nil { return err } return nil } func publishVenusControlState(client mqtt.Client, config Config, cmd writeCommand, panel panelStateSnapshot) error { switch cmd.Kind { case commandKindPanel: if panel.HasSwitch { if err := publishVenusValue(client, config, "Mode", panel.Switch, true); err != nil { return err } } if panel.HasCurrent { if err := publishVenusValue(client, config, "Ac/ActiveIn/CurrentLimit", panel.CurrentLimit, true); err != nil { return err } } case commandKindStandby: if panel.HasStandby { if err := publishVenusValue(client, config, "Settings/Standby", panel.Standby, true); err != nil { return err } } } return nil } func publishVenusESSState(client mqtt.Client, config Config, ess essControlSnapshot) error { if !config.Venus.GuideCompat { return nil } if ess.HasSetpoint { if err := publishVenusValueWithService(client, config, "settings/0", "Settings/CGwacs/AcPowerSetPoint", ess.SetpointW, true); err != nil { return err } } if ess.HasMaxCharge { if err := publishVenusValueWithService(client, config, "settings/0", "Settings/CGwacs/MaxChargePower", ess.MaxChargeW, true); err != nil { return err } } if ess.HasMaxDischarge { if err := publishVenusValueWithService(client, config, "settings/0", "Settings/CGwacs/MaxDischargePower", ess.MaxDischargeW, true); err != nil { return err } } if ess.HasBatteryLife { if err := publishVenusValueWithService(client, config, "settings/0", "Settings/CGwacs/BatteryLife/State", ess.BatteryLifeState, true); err != nil { return err } } return nil } func venusAlarmLevels(alarms []alarmState) map[string]int { levels := map[string]int{ "battery_low": 0, "over_temperature": 0, "overload": 0, "invalid_data": 0, "command_failure": 0, } for _, alarm := range alarms { level := 1 if strings.EqualFold(alarm.Level, "critical") { level = 2 } if existing, ok := levels[alarm.Code]; ok && level > existing { levels[alarm.Code] = level } } return levels } func publishVenusValue(client mqtt.Client, config Config, path string, value any, retained bool) error { topic := venusNotifyTopic(config, path) payload := venusValueEnvelope{Value: value} return publishJSON(client, topic, payload, 1, retained) } func publishVenusValueWithService(client mqtt.Client, config Config, service, path string, value any, retained bool) error { topic := venusNotifyTopicWithService(config, service, path) payload := venusValueEnvelope{Value: value} return publishJSON(client, topic, payload, 1, retained) } func publishJSON(client mqtt.Client, topic string, payload any, qos byte, retained bool) error { if topic == "" { return errors.New("topic is empty") } data, err := json.Marshal(payload) if err != nil { return fmt.Errorf("could not marshal payload: %w", err) } t := client.Publish(topic, qos, retained, data) t.Wait() if t.Error() != nil { return t.Error() } log.WithFields(logrus.Fields{ "topic": topic, "qos": qos, "retained": retained, "bytes": len(data), }).Debug("Published JSON message") return nil } func publishString(client mqtt.Client, topic, payload string, qos byte, retained bool) error { if topic == "" { return errors.New("topic is empty") } t := client.Publish(topic, qos, retained, payload) t.Wait() if t.Error() != nil { return t.Error() } log.WithFields(logrus.Fields{ "topic": topic, "qos": qos, "retained": retained, "payload": payload, }).Debug("Published string message") return nil } func mqttTopicRoot(topic string) string { t := strings.Trim(strings.TrimSpace(topic), "/") if t == "" { return "invertergui" } if strings.HasSuffix(t, "/updates") { root := strings.TrimSuffix(t, "/updates") if root != "" { return root } } return t } func normalizedDeviceID(config Config) string { deviceID := normalizeID(config.DeviceID) if deviceID == "" { deviceID = normalizeID(config.ClientID) } if deviceID == "" { return "invertergui" } return deviceID } func deviceTopicRoot(config Config) string { return fmt.Sprintf("%s/devices/%s", mqttTopicRoot(config.Topic), normalizedDeviceID(config)) } func venusPortalID(config Config) string { portal := normalizeID(config.Venus.PortalID) if portal == "" { return normalizedDeviceID(config) } return portal } func venusService(config Config) string { service := strings.Trim(strings.TrimSpace(config.Venus.Service), "/") if service == "" { return "vebus/257" } return service } func venusNotifyTopic(config Config, path string) string { return venusNotifyTopicWithService(config, venusService(config), path) } func venusNotifyTopicWithService(config Config, service, path string) string { cleanPath := strings.Trim(strings.TrimSpace(path), "/") cleanService := strings.Trim(strings.TrimSpace(service), "/") base := fmt.Sprintf("N/%s/%s/%s", venusPortalID(config), cleanService, cleanPath) return venusPrefixedTopic(config, base) } func venusWriteTopicRoot(config Config) string { return venusWriteTopicRootWithService(config, venusService(config)) } func venusWriteTopicRootWithService(config Config, service string) string { cleanService := strings.Trim(strings.TrimSpace(service), "/") base := fmt.Sprintf("W/%s/%s", venusPortalID(config), cleanService) return venusPrefixedTopic(config, base) } func venusWriteTopicWildcard(config Config) string { return fmt.Sprintf("%s/#", venusWriteTopicRoot(config)) } func venusWriteTopicWildcardWithService(config Config, service string) string { return fmt.Sprintf("%s/#", venusWriteTopicRootWithService(config, service)) } func venusPrefixedTopic(config Config, topic string) string { prefix := strings.Trim(strings.TrimSpace(config.Venus.TopicPrefix), "/") cleanTopic := strings.Trim(strings.TrimSpace(topic), "/") if prefix == "" { return cleanTopic } return fmt.Sprintf("%s/%s", prefix, cleanTopic) } func stripVenusTopicPrefix(config Config, topic string) string { clean := strings.Trim(strings.TrimSpace(topic), "/") prefix := strings.Trim(strings.TrimSpace(config.Venus.TopicPrefix), "/") if prefix == "" { return clean } if strings.HasPrefix(clean, prefix+"/") { return strings.TrimPrefix(clean, prefix+"/") } return clean } func haAvailabilityTopic(config Config) string { return fmt.Sprintf("%s/homeassistant/availability", mqttTopicRoot(config.Topic)) } func haPanelSwitchStateTopic(config Config) string { return fmt.Sprintf("%s/homeassistant/remote_panel_mode/state", mqttTopicRoot(config.Topic)) } func haCurrentLimitStateTopic(config Config) string { return fmt.Sprintf("%s/homeassistant/remote_panel_current_limit/state", mqttTopicRoot(config.Topic)) } func haStandbyStateTopic(config Config) string { return fmt.Sprintf("%s/homeassistant/remote_panel_standby/state", mqttTopicRoot(config.Topic)) } func haDiscoveryPrefix(config Config) string { prefix := strings.Trim(strings.TrimSpace(config.HomeAssistant.DiscoveryPrefix), "/") if prefix == "" { return "homeassistant" } return prefix } func haNodeID(config Config) string { nodeID := normalizeID(config.HomeAssistant.NodeID) if nodeID == "" { nodeID = normalizeID(config.ClientID) } if nodeID == "" { return "invertergui" } return nodeID } func haDeviceName(config Config) string { name := strings.TrimSpace(config.HomeAssistant.DeviceName) if name == "" { return "Victron Inverter" } return name } func normalizeID(in string) string { trimmed := strings.TrimSpace(strings.ToLower(in)) if trimmed == "" { return "" } var b strings.Builder lastUnderscore := false for _, r := range trimmed { valid := (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' || r == '_' if valid { b.WriteRune(r) lastUnderscore = false continue } if !lastUnderscore { b.WriteRune('_') lastUnderscore = true } } return strings.Trim(b.String(), "_") } func copyBoolPtr(in *bool) *bool { if in == nil { return nil } value := *in return &value } func float64Ptr(in float64) *float64 { return &in } func getOpts(config Config) *mqtt.ClientOptions { opts := mqtt.NewClientOptions() opts.AddBroker(config.Broker) opts.SetClientID(config.ClientID) if config.Username != "" { opts.SetUsername(config.Username) } if config.Password != "" { opts.SetPassword(config.Password) } if config.HomeAssistant.Enabled { opts.SetWill(haAvailabilityTopic(config), "offline", 1, true) } else if config.Venus.Enabled { opts.SetWill(venusNotifyTopic(config, "Connected"), `{"value":0}`, 1, true) } opts.SetKeepAlive(keepAlive) opts.SetOnConnectHandler(func(mqtt.Client) { log.Info("Client connected to broker") }) opts.SetConnectionLostHandler(func(_ mqtt.Client, err error) { log.Errorf("Client connection to broker lost: %v", err) }) return opts }