Files
go-weatherstation/internal/mqttingest/latest.go
2026-01-26 12:46:06 +11:00

172 lines
4.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package mqttingest
import (
"sync"
"time"
)
// rainMode describes how the raw rain reading of a sample is interpreted.
type rainMode int

const (
	// rainModeUnknown is the zero value: no interpretation has been chosen yet.
	rainModeUnknown rainMode = iota
	// rainModeCumulative treats the reading as a running total, so rainfall is
	// the rise between consecutive readings.
	rainModeCumulative
	// rainModeIncremental treats each reading as the rainfall of that message.
	rainModeIncremental
)
// Latest holds the most recent WS90 payload together with the per-sample rain
// increments needed to report rolling rainfall totals. Update and Snapshot
// take mu before touching any other field.
type Latest struct {
	mu sync.RWMutex

	// Most recent sample handed to Update.
	lastTS time.Time
	last   *WS90Payload

	// Rain tracking.
	mode       rainMode // how the raw rain reading is interpreted
	lastRainMM *float64 // previous raw rain reading; nil until the first sample

	// Rolling sums are built from per-sample "rain increment" values (mm).
	rainIncs  []rainIncPoint // samples within one hour of the latest sample
	dailyIncs []rainIncPoint // samples since local midnight of the latest sample
}
// rainIncPoint is one sample's contribution to the rolling rain totals.
type rainIncPoint struct {
	ts time.Time // timestamp of the sample
	mm float64   // rainfall attributed to this sample, in millimetres
}
// Update records p as the most recent sample, taken at ts, and folds its rain
// reading into the last-hour and since-midnight windows.
//
// A nil p carries no reading and is ignored, leaving the previous state
// untouched. The payload pointer is stored as-is (not copied), so the caller
// should not modify *p after handing it over.
func (l *Latest) Update(ts time.Time, p *WS90Payload) {
	if p == nil {
		// Guard the p.RainMM dereference below: without it a nil payload
		// would panic while the write lock is held.
		return
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	l.lastTS = ts
	l.last = p

	// The same increment feeds both windows.
	pt := rainIncPoint{ts: ts, mm: l.computeRainIncrement(p.RainMM)}

	// Last-hour window: keep only points within one hour of this sample.
	l.rainIncs = trimBefore(append(l.rainIncs, pt), ts.Add(-time.Hour))

	// Daily window: keep only points at or after local midnight of this sample.
	l.dailyIncs = trimBefore(append(l.dailyIncs, pt), localMidnight(ts))
}
// trimBefore drops the leading run of points whose timestamp is strictly
// before cutoff and returns the remainder. Scanning stops at the first point
// that is not before cutoff, so later out-of-order points are kept. The
// result shares its backing array with a.
func trimBefore(a []rainIncPoint, cutoff time.Time) []rainIncPoint {
	keepFrom := 0
	for keepFrom < len(a) && a[keepFrom].ts.Before(cutoff) {
		keepFrom++
	}
	// Reslicing from 0 yields a itself, so no special case is needed.
	return a[keepFrom:]
}
// localMidnight reports 00:00:00 of t's calendar day as seen in the local
// time zone of the running process.
// A specific zone (e.g. Australia/Sydney) is not configurable yet.
func localMidnight(t time.Time) time.Time {
	local := t.Local()
	year, month, day := local.Date()
	return time.Date(year, month, day, 0, 0, 0, 0, local.Location())
}
// computeRainIncrement converts the raw rain reading of the current sample
// into the rainfall (mm) attributed to that sample, and remembers the reading
// for the next call. It mutates l without locking; Update calls it with l.mu
// held.
//
// The very first sample always yields 0 because there is nothing to compare
// it against. After that the result depends on l.mode:
//
//   - rainModeIncremental: the reading itself is this message's rainfall,
//     whatever the previous reading was.
//   - rainModeCumulative: the rainfall is the rise since the previous reading;
//     a drop (counter rollover, daily reset, device reboot) contributes 0.
//   - rainModeUnknown: a drop or a clear rise of a non-zero reading selects
//     cumulative mode; anything else yields 0 and leaves the mode undecided.
//
// A non-zero reading that merely repeats is NOT taken as evidence of
// incremental mode: an idle cumulative counter looks exactly like that, and
// treating it as incremental would add the whole counter value as rainfall on
// every message.
//
// NOTE(review): as a consequence nothing in this function selects
// rainModeIncremental any more. Detecting an incremental sensor reliably needs
// evidence from more than one pair of samples (or explicit configuration).
func (l *Latest) computeRainIncrement(rainMM float64) float64 {
	// Tolerances (mm) that keep floating-point noise from being read as a
	// counter reset or as rainfall.
	const (
		resetTolerance = 0.001
		riseTolerance  = 0.0009
	)

	// First sample: we can't infer anything yet.
	if l.lastRainMM == nil {
		l.lastRainMM = &rainMM
		return 0
	}
	prev := *l.lastRainMM
	l.lastRainMM = &rainMM

	// Incremental readings stand on their own, so this is checked before the
	// reset logic: a smaller reading than last time (0.4 then 0.2) is still
	// 0.2 mm of rain, not a counter reset.
	if l.mode == rainModeIncremental {
		if rainMM > 0 {
			return rainMM
		}
		return 0
	}

	delta := rainMM - prev

	// Reset (counter rollover / daily reset / device reboot): the sample
	// contributes nothing. A falling value is also the first evidence that
	// the reading is a counter, so settle on cumulative if still undecided.
	if delta < -resetTolerance {
		if l.mode == rainModeUnknown {
			l.mode = rainModeCumulative
		}
		return 0
	}

	if l.mode == rainModeCumulative {
		if delta > 0 {
			return delta
		}
		return 0
	}

	// Mode still unknown: a non-zero reading that clearly rose behaves like a
	// counter, so commit to cumulative and credit the rise.
	if rainMM > 0 && delta > riseTolerance {
		l.mode = rainModeCumulative
		return delta
	}

	// Zero or unchanged reading: no rainfall and no evidence either way.
	return 0
}
// Snapshot is a point-in-time copy of the most recent sample together with
// the rain totals derived from the tracked increments.
type Snapshot struct {
	TS             time.Time   // timestamp of the most recent sample
	P              WS90Payload // copy of the most recent payload
	RainLastHourMM float64     // sum of the last-hour increments (mm)
	DailyRainMM    float64     // sum of the since-midnight increments (mm)
}
// Snapshot returns a copy of the most recent sample along with the summed
// rain increments of the last-hour and daily windows. The boolean is false,
// and the Snapshot empty, when no sample has been recorded yet.
func (l *Latest) Snapshot() (Snapshot, bool) {
	l.mu.RLock()
	defer l.mu.RUnlock()

	if l.last == nil || l.lastTS.IsZero() {
		return Snapshot{}, false
	}

	// total adds up the increments of one window, in slice order.
	total := func(points []rainIncPoint) float64 {
		var sum float64
		for i := range points {
			sum += points[i].mm
		}
		return sum
	}

	snap := Snapshot{
		TS: l.lastTS,
		P:  *l.last, // copy, so the caller never shares the stored payload
	}
	snap.RainLastHourMM = total(l.rainIncs)
	snap.DailyRainMM = total(l.dailyIncs)
	return snap, true
}