183 lines
4.1 KiB
Go
183 lines
4.1 KiB
Go
package mqttingest
|
||
|
||
import (
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// rainMode identifies how the incoming rain_mm reading is interpreted when
// it is converted into per-sample rainfall increments.
type rainMode int

// Interpretations of the rain_mm reading. The zero value is rainModeUnknown.
const (
	// rainModeUnknown means no interpretation has been chosen yet.
	rainModeUnknown rainMode = iota
	// rainModeCumulative treats rain_mm as a running counter; the increment
	// is the rise since the previous sample.
	rainModeCumulative
	// rainModeIncremental treats rain_mm as the rainfall of the sample itself.
	rainModeIncremental
)
|
||
|
||
type Latest struct {
|
||
mu sync.RWMutex
|
||
|
||
lastTS time.Time
|
||
last *WS90Payload
|
||
baroTS time.Time
|
||
baroHPA *float64
|
||
|
||
// Rain tracking
|
||
mode rainMode
|
||
lastRainMM *float64
|
||
|
||
// rolling sums built from "rain increment" values (mm)
|
||
rainIncs []rainIncPoint // last 1h
|
||
dailyIncs []rainIncPoint // since midnight (or since start; we’ll trim daily by midnight)
|
||
}
|
||
|
||
type rainIncPoint struct {
|
||
ts time.Time
|
||
mm float64 // incremental rainfall at this timestamp (mm)
|
||
}
|
||
|
||
func (l *Latest) Update(ts time.Time, p *WS90Payload) {
|
||
l.mu.Lock()
|
||
defer l.mu.Unlock()
|
||
|
||
l.lastTS = ts
|
||
l.last = p
|
||
|
||
inc := l.computeRainIncrement(p.RainMM)
|
||
|
||
// Track last hour increments
|
||
l.rainIncs = append(l.rainIncs, rainIncPoint{ts: ts, mm: inc})
|
||
cutoff := ts.Add(-1 * time.Hour)
|
||
l.rainIncs = trimBefore(l.rainIncs, cutoff)
|
||
|
||
// Track daily increments: trim before local midnight
|
||
l.dailyIncs = append(l.dailyIncs, rainIncPoint{ts: ts, mm: inc})
|
||
midnight := localMidnight(ts)
|
||
l.dailyIncs = trimBefore(l.dailyIncs, midnight)
|
||
}
|
||
|
||
func trimBefore(a []rainIncPoint, cutoff time.Time) []rainIncPoint {
|
||
i := 0
|
||
for ; i < len(a); i++ {
|
||
if !a[i].ts.Before(cutoff) {
|
||
break
|
||
}
|
||
}
|
||
if i > 0 {
|
||
return a[i:]
|
||
}
|
||
return a
|
||
}
|
||
|
||
// localMidnight returns midnight in the local timezone of the *process*.
|
||
// If you want a specific timezone (e.g. Australia/Sydney) we can wire that in later.
|
||
func localMidnight(t time.Time) time.Time {
|
||
lt := t.Local()
|
||
return time.Date(lt.Year(), lt.Month(), lt.Day(), 0, 0, 0, 0, lt.Location())
|
||
}
|
||
|
||
// computeRainIncrement returns the “incremental rain” in mm for this sample,
|
||
// regardless of whether the incoming rain_mm is cumulative or incremental.
|
||
func (l *Latest) computeRainIncrement(rainMM float64) float64 {
|
||
// First sample: we can’t infer anything yet
|
||
if l.lastRainMM == nil {
|
||
l.lastRainMM = &rainMM
|
||
return 0
|
||
}
|
||
|
||
prev := *l.lastRainMM
|
||
l.lastRainMM = &rainMM
|
||
|
||
// Heuristic:
|
||
// - If value often stays 0 and occasionally jumps by small amounts, it might be cumulative OR incremental.
|
||
// - If it monotonically increases over time (with occasional resets), that’s cumulative.
|
||
// - If it is usually small per message (e.g. 0, 0.2, 0, 0, 0.2) and not trending upward, that’s incremental.
|
||
//
|
||
// We’ll decide based on “trendiness” and deltas:
|
||
delta := rainMM - prev
|
||
|
||
// Handle reset (counter rollover / daily reset / device reboot)
|
||
if delta < -0.001 {
|
||
// If cumulative, after reset the “increment” is 0 for that sample.
|
||
// If incremental, a reset doesn’t really make sense but we still treat as 0.
|
||
if l.mode == rainModeUnknown {
|
||
l.mode = rainModeCumulative
|
||
}
|
||
return 0
|
||
}
|
||
|
||
// If we already decided
|
||
switch l.mode {
|
||
case rainModeCumulative:
|
||
if delta > 0 {
|
||
return delta
|
||
}
|
||
return 0
|
||
case rainModeIncremental:
|
||
// in incremental mode we treat the sample as “this message’s rain”
|
||
if rainMM > 0 {
|
||
return rainMM
|
||
}
|
||
return 0
|
||
}
|
||
|
||
// Decide mode (unknown):
|
||
// Default to cumulative once we see any non-zero rain value. This avoids
|
||
// wildly overcounting when the service starts and rain_mm is a cumulative counter.
|
||
if rainMM > 0 {
|
||
l.mode = rainModeCumulative
|
||
if delta > 0.0009 {
|
||
return delta
|
||
}
|
||
}
|
||
|
||
return 0
|
||
}
|
||
|
||
type Snapshot struct {
|
||
TS time.Time
|
||
P WS90Payload
|
||
|
||
RainLastHourMM float64
|
||
DailyRainMM float64
|
||
PressureHPA *float64
|
||
}
|
||
|
||
func (l *Latest) Snapshot() (Snapshot, bool) {
|
||
l.mu.RLock()
|
||
defer l.mu.RUnlock()
|
||
|
||
if l.last == nil || l.lastTS.IsZero() {
|
||
return Snapshot{}, false
|
||
}
|
||
|
||
var hourSum, daySum float64
|
||
for _, rp := range l.rainIncs {
|
||
hourSum += rp.mm
|
||
}
|
||
for _, rp := range l.dailyIncs {
|
||
daySum += rp.mm
|
||
}
|
||
|
||
var pressure *float64
|
||
if l.baroHPA != nil {
|
||
v := *l.baroHPA
|
||
pressure = &v
|
||
}
|
||
|
||
return Snapshot{
|
||
TS: l.lastTS,
|
||
P: *l.last,
|
||
RainLastHourMM: hourSum,
|
||
DailyRainMM: daySum,
|
||
PressureHPA: pressure,
|
||
}, true
|
||
}
|
||
|
||
func (l *Latest) UpdateBarometer(ts time.Time, pressureHPA float64) {
|
||
l.mu.Lock()
|
||
defer l.mu.Unlock()
|
||
|
||
l.baroTS = ts
|
||
l.baroHPA = &pressureHPA
|
||
}
|