package mqttingest import ( "sync" "time" ) type rainMode int const ( rainModeUnknown rainMode = iota rainModeCumulative rainModeIncremental ) type Latest struct { mu sync.RWMutex lastTS time.Time last *WS90Payload // Rain tracking mode rainMode lastRainMM *float64 // rolling sums built from "rain increment" values (mm) rainIncs []rainIncPoint // last 1h dailyIncs []rainIncPoint // since midnight (or since start; we’ll trim daily by midnight) } type rainIncPoint struct { ts time.Time mm float64 // incremental rainfall at this timestamp (mm) } func (l *Latest) Update(ts time.Time, p *WS90Payload) { l.mu.Lock() defer l.mu.Unlock() l.lastTS = ts l.last = p inc := l.computeRainIncrement(p.RainMM) // Track last hour increments l.rainIncs = append(l.rainIncs, rainIncPoint{ts: ts, mm: inc}) cutoff := ts.Add(-1 * time.Hour) l.rainIncs = trimBefore(l.rainIncs, cutoff) // Track daily increments: trim before local midnight l.dailyIncs = append(l.dailyIncs, rainIncPoint{ts: ts, mm: inc}) midnight := localMidnight(ts) l.dailyIncs = trimBefore(l.dailyIncs, midnight) } func trimBefore(a []rainIncPoint, cutoff time.Time) []rainIncPoint { i := 0 for ; i < len(a); i++ { if !a[i].ts.Before(cutoff) { break } } if i > 0 { return a[i:] } return a } // localMidnight returns midnight in the local timezone of the *process*. // If you want a specific timezone (e.g. Australia/Sydney) we can wire that in later. func localMidnight(t time.Time) time.Time { lt := t.Local() return time.Date(lt.Year(), lt.Month(), lt.Day(), 0, 0, 0, 0, lt.Location()) } // computeRainIncrement returns the “incremental rain” in mm for this sample, // regardless of whether the incoming rain_mm is cumulative or incremental. func (l *Latest) computeRainIncrement(rainMM float64) float64 { // First sample: we can’t infer anything yet if l.lastRainMM == nil { l.lastRainMM = &rainMM return 0 } prev := *l.lastRainMM l.lastRainMM = &rainMM // Heuristic: // - If value often stays 0 and occasionally jumps by small amounts, it might be cumulative OR incremental. // - If it monotonically increases over time (with occasional resets), that’s cumulative. // - If it is usually small per message (e.g. 0, 0.2, 0, 0, 0.2) and not trending upward, that’s incremental. // // We’ll decide based on “trendiness” and deltas: delta := rainMM - prev // Handle reset (counter rollover / daily reset / device reboot) if delta < -0.001 { // If cumulative, after reset the “increment” is 0 for that sample. // If incremental, a reset doesn’t really make sense but we still treat as 0. if l.mode == rainModeUnknown { l.mode = rainModeCumulative } return 0 } // If we already decided switch l.mode { case rainModeCumulative: if delta > 0 { return delta } return 0 case rainModeIncremental: // in incremental mode we treat the sample as “this message’s rain” if rainMM > 0 { return rainMM } return 0 } // Decide mode (unknown): // If delta is consistently positive when rainMM > 0, cumulative is likely. // If delta is ~0 while rainMM occasionally > 0, incremental is likely. // // Single-sample heuristic: // - if rainMM > 0 and delta > 0 => lean cumulative // - if rainMM > 0 and delta ~ 0 => lean incremental if rainMM > 0 { if delta > 0.0009 { l.mode = rainModeCumulative return delta } // delta near zero but rainMM nonzero suggests incremental l.mode = rainModeIncremental return rainMM } return 0 } type Snapshot struct { TS time.Time P WS90Payload RainLastHourMM float64 DailyRainMM float64 } func (l *Latest) Snapshot() (Snapshot, bool) { l.mu.RLock() defer l.mu.RUnlock() if l.last == nil || l.lastTS.IsZero() { return Snapshot{}, false } var hourSum, daySum float64 for _, rp := range l.rainIncs { hourSum += rp.mm } for _, rp := range l.dailyIncs { daySum += rp.mm } return Snapshot{ TS: l.lastTS, P: *l.last, RainLastHourMM: hourSum, DailyRainMM: daySum, }, true }