commit adaa57f9e27ceb2a4acdb4a2c70247f1b8c9a0f1 Author: Nathan Coad Date: Mon Jan 26 12:40:47 2026 +1100 first commit diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..37aa52b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,34 @@ +# ---- build stage ---- +FROM golang:1.25-alpine AS build +WORKDIR /src + +# Git CA certs (for go modules / HTTPS) +RUN apk add --no-cache ca-certificates + +# Copy mod files first for better caching +COPY go.mod go.sum ./ +RUN go mod download + +# Copy the rest of the source +COPY . . + +# Build a static-ish binary (alpine musl) +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build -trimpath -ldflags="-s -w" -o /out/ingestd ./cmd/ingestd + +# ---- runtime stage ---- +FROM alpine:3.22 +WORKDIR /app + +RUN apk add --no-cache ca-certificates && \ + adduser -D -H -u 10001 appuser + +# Copy binary + schema file used at startup +COPY --from=build /out/ingestd /app/ingestd +COPY internal/db/schema.sql /app/internal/db/schema.sql + +USER appuser + +# default config path inside container +ENTRYPOINT ["/app/ingestd"] +CMD ["-config", "/app/config.yaml"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..03ab2a3 --- /dev/null +++ b/README.md @@ -0,0 +1,19 @@ +# go-weatherstation + +Starter go-weatherstationrology data pipeline: +- MQTT ingest of WS90 payloads -> TimescaleDB +- Baseline forecast polling (Open-Meteo) -> TimescaleDB + +## Run +1) Start services: + docker compose up -d + +2) Copy config: + cp config.example.yaml config.yaml + # edit mqtt topic/broker + site lat/lon + +3) Run: + go run ./cmd/ingestd -config config.yaml + +## Publish a test WS90 payload +mosquitto_pub -h localhost -t ecowitt/ws90 -m '{"model":"Fineoffset-WS90","id":70618,"battery_ok":1,"battery_mV":3180,"temperature_C":24.2,"humidity":60,"wind_dir_deg":129,"wind_avg_m_s":0,"wind_max_m_s":0,"uvi":0,"light_lux":0,"flags":130,"rain_mm":0,"rain_start":0,"supercap_V":0.5,"firmware":160,"data":"3fff000000------0000ff7ff70000","mic":"CRC","protocol":"Fine Offset Electronics WS90 weather station","rssi":-44,"duration":32996}' diff --git a/cmd/ingestd/main.go b/cmd/ingestd/main.go new file mode 100644 index 0000000..350dea6 --- /dev/null +++ b/cmd/ingestd/main.go @@ -0,0 +1,207 @@ +package main + +import ( + "context" + "flag" + "log" + "os" + "os/signal" + "syscall" + "time" + + "go-weatherstation/internal/config" + "go-weatherstation/internal/db" + "go-weatherstation/internal/mqttingest" + "go-weatherstation/internal/providers" +) + +func main() { + var cfgPath string + flag.StringVar(&cfgPath, "config", "config.yaml", "path to config yaml") + flag.Parse() + + cfg, err := config.Load(cfgPath) + if err != nil { + log.Fatalf("config load: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Shutdown handling + go func() { + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + <-ch + cancel() + }() + + latest := &mqttingest.Latest{} + + d, err := db.Open(ctx, cfg.DB.ConnString) + if err != nil { + log.Fatalf("db open: %v", err) + } + defer d.Close() + + site := providers.Site{ + Name: cfg.Site.Name, + Latitude: cfg.Site.Latitude, + Longitude: cfg.Site.Longitude, + } + + // Start Open-Meteo poller (optional) + if cfg.Pollers.OpenMeteo.Enabled { + go runOpenMeteoPoller(ctx, d, site, cfg.Pollers.OpenMeteo.Model, cfg.Pollers.OpenMeteo.Interval) + } + + if cfg.Wunderground.Enabled { + go runWundergroundUploader(ctx, latest, cfg.Wunderground.StationID, cfg.Wunderground.StationKey, cfg.Wunderground.Interval) + } + + // MQTT subscriber (blocks until ctx done) + err = mqttingest.RunSubscriber(ctx, mqttingest.MQTTConfig{ + Broker: cfg.MQTT.Broker, + ClientID: cfg.MQTT.ClientID, + Username: cfg.MQTT.Username, + Password: cfg.MQTT.Password, + Topic: cfg.MQTT.Topic, + QoS: cfg.MQTT.QoS, + }, func(ctx context.Context, topic string, payload []byte) error { + p, raw, err := mqttingest.ParseWS90(payload) + if err != nil { + log.Printf("ws90 parse error topic=%s err=%v payload=%s", topic, err, string(payload)) + return nil + } + + // Use receive time as observation ts (WS90 payload doesn't include a timestamp). + ts := time.Now().UTC() + + latest.Update(ts, p) + + if err := d.InsertWS90(ctx, db.InsertWS90Params{ + TS: ts, + Site: cfg.Site.Name, + StationID: p.ID, + Model: p.Model, + BatteryOK: p.BatteryOK, + BatteryMV: p.BatteryMV, + TempC: p.TemperatureC, + Humidity: p.Humidity, + WindDirDeg: p.WindDirDeg, + WindAvgMS: p.WindAvgMS, + WindMaxMS: p.WindMaxMS, + UVI: p.UVI, + LightLux: p.LightLux, + Flags: p.Flags, + RainMM: p.RainMM, + RainStart: p.RainStart, + SupercapV: p.SupercapV, + Firmware: p.Firmware, + RawData: p.Data, + MIC: p.MIC, + Protocol: p.Protocol, + RSSI: p.RSSI, + Duration: p.Duration, + Payload: raw, + }); err != nil { + log.Printf("db insert ws90 error: %v", err) + } + + return nil + }) + if err != nil { + log.Fatalf("mqtt subscriber: %v", err) + } +} + +func runOpenMeteoPoller(ctx context.Context, d *db.DB, site providers.Site, model string, interval time.Duration) { + p := &providers.OpenMeteo{} + t := time.NewTicker(interval) + defer t.Stop() + + // poll immediately at startup + pollOnce(ctx, d, p, site, model) + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + pollOnce(ctx, d, p, site, model) + } + } +} + +func runWundergroundUploader(ctx context.Context, latest *mqttingest.Latest, stationID, stationKey string, interval time.Duration) { + client := &providers.WundergroundClient{} + t := time.NewTicker(interval) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + snap, ok := latest.Snapshot() + if !ok { + log.Printf("wunderground: no data yet") + continue + } + + up := providers.WUUpload{ + StationID: stationID, + StationKey: stationKey, + TempC: snap.P.TemperatureC, + Humidity: snap.P.Humidity, + WindDirDeg: snap.P.WindDirDeg, + WindAvgMS: snap.P.WindAvgMS, + WindMaxMS: snap.P.WindMaxMS, + UVI: snap.P.UVI, + RainLastHourMM: snap.RainLastHourMM, + DailyRainMM: snap.DailyRainMM, + DateUTC: "now", + } + + resp, err := client.Upload(ctx, up) + if err != nil { + log.Printf("wunderground upload failed: %v", err) + continue + } + log.Printf("wunderground upload ok: %s", resp) + } + } +} + +func pollOnce(ctx context.Context, d *db.DB, p providers.Provider, site providers.Site, model string) { + res, err := p.Fetch(ctx.Done(), site, model) + if err != nil { + log.Printf("forecast fetch error provider=%s err=%v", p.Name(), err) + return + } + + for _, pt := range res.Hourly { + err := d.UpsertOpenMeteoHourly(ctx, db.InsertOpenMeteoHourlyParams{ + TS: pt.TS, + RetrievedAt: res.RetrievedAt, + Site: site.Name, + Model: res.Model, + TempC: pt.TempC, + RH: pt.RH, + PressureMSLH: pt.PressureMSLH, + WindMS: pt.WindMS, + WindGustMS: pt.WindGustMS, + WindDirDeg: pt.WindDirDeg, + PrecipMM: pt.PrecipMM, + PrecipProb: pt.PrecipProb, + CloudCover: pt.CloudCover, + SourcePayload: res.Raw, + }) + if err != nil { + log.Printf("forecast upsert error ts=%s err=%v", pt.TS.Format(time.RFC3339), err) + } + } + + log.Printf("forecast stored provider=%s model=%s points=%d retrieved_at=%s", + p.Name(), res.Model, len(res.Hourly), res.RetrievedAt.Format(time.RFC3339)) +} diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..36d3f05 --- /dev/null +++ b/config.yaml @@ -0,0 +1,26 @@ +mqtt: + broker: "tcp://mosquitto:1883" + client_id: "go-weatherstation-ingestd" + topic: "ecowitt/ws90" + qos: 1 + +db: + conn_string: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable" + +site: + name: "home" + latitude: -33.8688 + longitude: 151.2093 + elevation_m: 50 + +pollers: + open_meteo: + enabled: true + interval: "30m" + model: "ecmwf" + +wunderground: + enabled: true + station_id: "ISSYDWXYZ123" + station_key: "your_station_key_here" + interval: "60s" diff --git a/db/init/001_schema.sql b/db/init/001_schema.sql new file mode 100644 index 0000000..3e8a29b --- /dev/null +++ b/db/init/001_schema.sql @@ -0,0 +1,191 @@ +CREATE EXTENSION IF NOT EXISTS timescaledb; + +-- WS90 observations (as received) +CREATE TABLE IF NOT EXISTS observations_ws90 ( + ts TIMESTAMPTZ NOT NULL, + received_at TIMESTAMPTZ NOT NULL DEFAULT now(), + site TEXT NOT NULL, + station_id BIGINT NOT NULL, + + model TEXT, + battery_ok SMALLINT, + battery_mv INT, + + temperature_c DOUBLE PRECISION, + humidity DOUBLE PRECISION, + + wind_dir_deg DOUBLE PRECISION, + wind_avg_m_s DOUBLE PRECISION, + wind_max_m_s DOUBLE PRECISION, + + uvi DOUBLE PRECISION, + light_lux DOUBLE PRECISION, + + flags INT, + rain_mm DOUBLE PRECISION, + rain_start BIGINT, + + supercap_v DOUBLE PRECISION, + firmware INT, + raw_data TEXT, + mic TEXT, + protocol TEXT, + rssi INT, + duration BIGINT, + + payload_json JSONB +); + +SELECT create_hypertable('observations_ws90', 'ts', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS idx_observations_ws90_site_ts ON observations_ws90(site, ts DESC); +CREATE INDEX IF NOT EXISTS idx_observations_ws90_station_ts ON observations_ws90(station_id, ts DESC); + +-- Open-Meteo hourly baseline forecasts +CREATE TABLE IF NOT EXISTS forecast_openmeteo_hourly ( + ts TIMESTAMPTZ NOT NULL, + retrieved_at TIMESTAMPTZ NOT NULL, + site TEXT NOT NULL, + model TEXT NOT NULL, + + temp_c DOUBLE PRECISION, + rh DOUBLE PRECISION, + pressure_msl_hpa DOUBLE PRECISION, + + wind_m_s DOUBLE PRECISION, + wind_gust_m_s DOUBLE PRECISION, + wind_dir_deg DOUBLE PRECISION, + + precip_mm DOUBLE PRECISION, + precip_prob DOUBLE PRECISION, + cloud_cover DOUBLE PRECISION, + + source_json JSONB, + + PRIMARY KEY (site, model, retrieved_at, ts) +); + +SELECT create_hypertable('forecast_openmeteo_hourly', 'ts', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS idx_forecast_openmeteo_site_ts ON forecast_openmeteo_hourly(site, ts DESC); + +-- Raw retention: 90 days +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM timescaledb_information.jobs j + WHERE j.proc_name = 'policy_retention' + AND j.hypertable_name = 'observations_ws90' + ) THEN + PERFORM add_retention_policy('observations_ws90', INTERVAL '90 days'); + END IF; +END$$; + +-- Compression after 7 days (recommended) +ALTER TABLE observations_ws90 SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'site,station_id' +); + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM timescaledb_information.jobs + WHERE proc_name = 'policy_compression' + AND hypertable_name = 'observations_ws90' + ) THEN + PERFORM add_compression_policy('observations_ws90', INTERVAL '7 days'); + END IF; +END$$; + +-- 1-minute continuous aggregate +CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_1m +WITH (timescaledb.continuous) AS +SELECT + time_bucket(INTERVAL '1 minute', ts) AS bucket, + site, + station_id, + + avg(temperature_c) AS temp_c_avg, + min(temperature_c) AS temp_c_min, + max(temperature_c) AS temp_c_max, + + avg(humidity) AS rh_avg, + min(humidity) AS rh_min, + max(humidity) AS rh_max, + + avg(wind_avg_m_s) AS wind_avg_ms_avg, + max(wind_max_m_s) AS wind_gust_ms_max, + avg(wind_dir_deg) AS wind_dir_deg_avg, + + max(uvi) AS uvi_max, + max(light_lux) AS light_lux_max, + + min(rain_mm) AS rain_mm_min, + max(rain_mm) AS rain_mm_max + +FROM observations_ws90 +GROUP BY 1,2,3; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM timescaledb_information.jobs + WHERE proc_name = 'policy_refresh_continuous_aggregate' + AND hypertable_name = 'cagg_ws90_1m' + ) THEN + PERFORM add_continuous_aggregate_policy('cagg_ws90_1m', + start_offset => INTERVAL '2 hours', + end_offset => INTERVAL '2 minutes', + schedule_interval => INTERVAL '5 minutes' + ); + END IF; +END$$; + +-- 5-minute continuous aggregate (optional but useful) +CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_5m +WITH (timescaledb.continuous) AS +SELECT + time_bucket(INTERVAL '5 minutes', ts) AS bucket, + site, + station_id, + + avg(temperature_c) AS temp_c_avg, + min(temperature_c) AS temp_c_min, + max(temperature_c) AS temp_c_max, + + avg(humidity) AS rh_avg, + min(humidity) AS rh_min, + max(humidity) AS rh_max, + + avg(wind_avg_m_s) AS wind_avg_ms_avg, + max(wind_max_m_s) AS wind_gust_ms_max, + avg(wind_dir_deg) AS wind_dir_deg_avg, + + max(uvi) AS uvi_max, + max(light_lux) AS light_lux_max, + + min(rain_mm) AS rain_mm_min, + max(rain_mm) AS rain_mm_max + +FROM observations_ws90 +GROUP BY 1,2,3; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM timescaledb_information.jobs + WHERE proc_name = 'policy_refresh_continuous_aggregate' + AND hypertable_name = 'cagg_ws90_5m' + ) THEN + PERFORM add_continuous_aggregate_policy('cagg_ws90_5m', + start_offset => INTERVAL '24 hours', + end_offset => INTERVAL '10 minutes', + schedule_interval => INTERVAL '15 minutes' + ); + END IF; +END$$; diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..70f770f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,26 @@ +services: + timescaledb: + image: timescale/timescaledb:latest-pg16 + environment: + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + POSTGRES_DB: micrometeo + ports: + - "5432:5432" + volumes: + - tsdata:/var/lib/postgresql/data + # runs on first DB initialization only + - ./db/init:/docker-entrypoint-initdb.d:ro + + ingestd: + build: + context: . + dockerfile: Dockerfile + depends_on: + - timescaledb + restart: unless-stopped + volumes: + - ./config.yaml:/app/config.yaml:ro + +volumes: + tsdata: \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..65ee7ac --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module go-weatherstation + +go 1.25 + +require ( + github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/jackc/pgx/v5 v5.8.0 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/gorilla/websocket v1.5.3 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..563ff1f --- /dev/null +++ b/go.sum @@ -0,0 +1,41 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..290827d --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,86 @@ +package config + +import ( + "errors" + "os" + "time" + + "gopkg.in/yaml.v3" +) + +type Config struct { + LogLevel string `yaml:"log_level"` + + MQTT struct { + Broker string `yaml:"broker"` + ClientID string `yaml:"client_id"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Topic string `yaml:"topic"` + QoS byte `yaml:"qos"` + } `yaml:"mqtt"` + + DB struct { + ConnString string `yaml:"conn_string"` + } `yaml:"db"` + + Site struct { + Name string `yaml:"name"` + Latitude float64 `yaml:"latitude"` + Longitude float64 `yaml:"longitude"` + ElevationM float64 `yaml:"elevation_m"` + } `yaml:"site"` + + Pollers struct { + OpenMeteo struct { + Enabled bool `yaml:"enabled"` + Interval time.Duration `yaml:"interval"` + Model string `yaml:"model"` + } `yaml:"open_meteo"` + } `yaml:"pollers"` + + Wunderground struct { + Enabled bool `yaml:"enabled"` + StationID string `yaml:"station_id"` + StationKey string `yaml:"station_key"` + Interval time.Duration `yaml:"interval"` + } `yaml:"wunderground"` +} + +func Load(path string) (*Config, error) { + b, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + var c Config + if err := yaml.Unmarshal(b, &c); err != nil { + return nil, err + } + + // Minimal validation + if c.MQTT.Broker == "" || c.MQTT.Topic == "" { + return nil, errors.New("mqtt broker and topic are required") + } + if c.DB.ConnString == "" { + return nil, errors.New("db conn_string is required") + } + if c.Site.Name == "" { + c.Site.Name = "default" + } + if c.Pollers.OpenMeteo.Model == "" { + c.Pollers.OpenMeteo.Model = "ecmwf" + } + if c.Pollers.OpenMeteo.Interval == 0 { + c.Pollers.OpenMeteo.Interval = 30 * time.Minute + } + if c.Wunderground.Interval == 0 { + c.Wunderground.Interval = 60 * time.Second + } + // If enabled, require creds + if c.Wunderground.Enabled && (c.Wunderground.StationID == "" || c.Wunderground.StationKey == "") { + return nil, errors.New("wunderground enabled but station_id/station_key not set") + } + + return &c, nil +} diff --git a/internal/db/db.go b/internal/db/db.go new file mode 100644 index 0000000..0d7c985 --- /dev/null +++ b/internal/db/db.go @@ -0,0 +1,41 @@ +package db + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type DB struct { + Pool *pgxpool.Pool +} + +func Open(ctx context.Context, connString string) (*DB, error) { + cfg, err := pgxpool.ParseConfig(connString) + if err != nil { + return nil, err + } + cfg.MaxConns = 10 + cfg.MinConns = 1 + cfg.MaxConnIdleTime = 5 * time.Minute + + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, err + } + + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("db ping failed: %w", err) + } + + return &DB{Pool: pool}, nil +} + +func (d *DB) Close() { + if d.Pool != nil { + d.Pool.Close() + } +} diff --git a/internal/db/queries.go b/internal/db/queries.go new file mode 100644 index 0000000..db46145 --- /dev/null +++ b/internal/db/queries.go @@ -0,0 +1,128 @@ +package db + +import ( + "context" + "encoding/json" + "time" + + "github.com/jackc/pgx/v5/pgtype" +) + +type InsertWS90Params struct { + TS time.Time + Site string + StationID int64 + Model string + BatteryOK int + BatteryMV int + TempC float64 + Humidity float64 + WindDirDeg float64 + WindAvgMS float64 + WindMaxMS float64 + UVI float64 + LightLux float64 + Flags int + RainMM float64 + RainStart int64 + SupercapV float64 + Firmware int + RawData string + MIC string + Protocol string + RSSI int + Duration int64 + Payload map[string]any +} + +func (d *DB) InsertWS90(ctx context.Context, p InsertWS90Params) error { + b, _ := json.Marshal(p.Payload) + + var payloadJSON pgtype.JSONB + _ = payloadJSON.Set(b) + + _, err := d.Pool.Exec(ctx, ` + INSERT INTO observations_ws90 ( + ts, site, station_id, model, battery_ok, battery_mv, + temperature_c, humidity, + wind_dir_deg, wind_avg_m_s, wind_max_m_s, + uvi, light_lux, flags, + rain_mm, rain_start, + supercap_v, firmware, raw_data, mic, protocol, + rssi, duration, payload_json + ) VALUES ( + $1,$2,$3,$4,$5,$6, + $7,$8, + $9,$10,$11, + $12,$13,$14, + $15,$16, + $17,$18,$19,$20,$21, + $22,$23,$24 + ) + `, p.TS, p.Site, p.StationID, p.Model, p.BatteryOK, p.BatteryMV, + p.TempC, p.Humidity, + p.WindDirDeg, p.WindAvgMS, p.WindMaxMS, + p.UVI, p.LightLux, p.Flags, + p.RainMM, p.RainStart, + p.SupercapV, p.Firmware, p.RawData, p.MIC, p.Protocol, + p.RSSI, p.Duration, payloadJSON) + + return err +} + +type InsertOpenMeteoHourlyParams struct { + TS time.Time + RetrievedAt time.Time + Site string + Model string + TempC *float64 + RH *float64 + PressureMSLH *float64 + WindMS *float64 + WindGustMS *float64 + WindDirDeg *float64 + PrecipMM *float64 + PrecipProb *float64 + CloudCover *float64 + SourcePayload map[string]any +} + +func (d *DB) UpsertOpenMeteoHourly(ctx context.Context, p InsertOpenMeteoHourlyParams) error { + b, _ := json.Marshal(p.SourcePayload) + + var sourceJSON pgtype.JSONB + _ = sourceJSON.Set(b) + + _, err := d.Pool.Exec(ctx, ` + INSERT INTO forecast_openmeteo_hourly ( + ts, retrieved_at, site, model, + temp_c, rh, pressure_msl_hpa, + wind_m_s, wind_gust_m_s, wind_dir_deg, + precip_mm, precip_prob, cloud_cover, + source_json + ) VALUES ( + $1,$2,$3,$4, + $5,$6,$7, + $8,$9,$10, + $11,$12,$13, + $14 + ) + ON CONFLICT (site, model, retrieved_at, ts) DO UPDATE SET + temp_c = EXCLUDED.temp_c, + rh = EXCLUDED.rh, + pressure_msl_hpa = EXCLUDED.pressure_msl_hpa, + wind_m_s = EXCLUDED.wind_m_s, + wind_gust_m_s = EXCLUDED.wind_gust_m_s, + wind_dir_deg = EXCLUDED.wind_dir_deg, + precip_mm = EXCLUDED.precip_mm, + precip_prob = EXCLUDED.precip_prob, + cloud_cover = EXCLUDED.cloud_cover, + source_json = EXCLUDED.source_json + `, p.TS, p.RetrievedAt, p.Site, p.Model, + p.TempC, p.RH, p.PressureMSLH, + p.WindMS, p.WindGustMS, p.WindDirDeg, + p.PrecipMM, p.PrecipProb, p.CloudCover, + sourceJSON) + + return err +} diff --git a/internal/mqttingest/latest.go b/internal/mqttingest/latest.go new file mode 100644 index 0000000..9403e8a --- /dev/null +++ b/internal/mqttingest/latest.go @@ -0,0 +1,171 @@ +package mqttingest + +import ( + "sync" + "time" +) + +type rainMode int + +const ( + rainModeUnknown rainMode = iota + rainModeCumulative + rainModeIncremental +) + +type Latest struct { + mu sync.RWMutex + + lastTS time.Time + last *WS90Payload + + // Rain tracking + mode rainMode + lastRainMM *float64 + + // rolling sums built from "rain increment" values (mm) + rainIncs []rainIncPoint // last 1h + dailyIncs []rainIncPoint // since midnight (or since start; we’ll trim daily by midnight) +} + +type rainIncPoint struct { + ts time.Time + mm float64 // incremental rainfall at this timestamp (mm) +} + +func (l *Latest) Update(ts time.Time, p *WS90Payload) { + l.mu.Lock() + defer l.mu.Unlock() + + l.lastTS = ts + l.last = p + + inc := l.computeRainIncrement(ts, p.RainMM) + + // Track last hour increments + l.rainIncs = append(l.rainIncs, rainIncPoint{ts: ts, mm: inc}) + cutoff := ts.Add(-1 * time.Hour) + l.rainIncs = trimBefore(l.rainIncs, cutoff) + + // Track daily increments: trim before local midnight + l.dailyIncs = append(l.dailyIncs, rainIncPoint{ts: ts, mm: inc}) + midnight := localMidnight(ts) + l.dailyIncs = trimBefore(l.dailyIncs, midnight) +} + +func trimBefore(a []rainIncPoint, cutoff time.Time) []rainIncPoint { + i := 0 + for ; i < len(a); i++ { + if !a[i].ts.Before(cutoff) { + break + } + } + if i > 0 { + return a[i:] + } + return a +} + +// localMidnight returns midnight in the local timezone of the *process*. +// If you want a specific timezone (e.g. Australia/Sydney) we can wire that in later. +func localMidnight(t time.Time) time.Time { + lt := t.Local() + return time.Date(lt.Year(), lt.Month(), lt.Day(), 0, 0, 0, 0, lt.Location()) +} + +// computeRainIncrement returns the “incremental rain” in mm for this sample, +// regardless of whether the incoming rain_mm is cumulative or incremental. +func (l *Latest) computeRainIncrement(ts time.Time, rainMM float64) float64 { + // First sample: we can’t infer anything yet + if l.lastRainMM == nil { + l.lastRainMM = &rainMM + return 0 + } + + prev := *l.lastRainMM + l.lastRainMM = &rainMM + + // Heuristic: + // - If value often stays 0 and occasionally jumps by small amounts, it might be cumulative OR incremental. + // - If it monotonically increases over time (with occasional resets), that’s cumulative. + // - If it is usually small per message (e.g. 0, 0.2, 0, 0, 0.2) and not trending upward, that’s incremental. + // + // We’ll decide based on “trendiness” and deltas: + delta := rainMM - prev + + // Handle reset (counter rollover / daily reset / device reboot) + if delta < -0.001 { + // If cumulative, after reset the “increment” is 0 for that sample. + // If incremental, a reset doesn’t really make sense but we still treat as 0. + if l.mode == rainModeUnknown { + l.mode = rainModeCumulative + } + return 0 + } + + // If we already decided + switch l.mode { + case rainModeCumulative: + if delta > 0 { + return delta + } + return 0 + case rainModeIncremental: + // in incremental mode we treat the sample as “this message’s rain” + if rainMM > 0 { + return rainMM + } + return 0 + } + + // Decide mode (unknown): + // If delta is consistently positive when rainMM > 0, cumulative is likely. + // If delta is ~0 while rainMM occasionally > 0, incremental is likely. + // + // Single-sample heuristic: + // - if rainMM > 0 and delta > 0 => lean cumulative + // - if rainMM > 0 and delta ~ 0 => lean incremental + if rainMM > 0 { + if delta > 0.0009 { + l.mode = rainModeCumulative + return delta + } + // delta near zero but rainMM nonzero suggests incremental + l.mode = rainModeIncremental + return rainMM + } + + return 0 +} + +type Snapshot struct { + TS time.Time + P WS90Payload + + RainLastHourMM float64 + DailyRainMM float64 +} + +func (l *Latest) Snapshot() (Snapshot, bool) { + l.mu.RLock() + defer l.mu.RUnlock() + + if l.last == nil || l.lastTS.IsZero() { + return Snapshot{}, false + } + + var hourSum, daySum float64 + for _, rp := range l.rainIncs { + hourSum += rp.mm + } + for _, rp := range l.dailyIncs { + daySum += rp.mm + } + + return Snapshot{ + TS: l.lastTS, + P: *l.last, + RainLastHourMM: hourSum, + DailyRainMM: daySum, + }, true +} diff --git a/internal/mqttingest/mqtt.go b/internal/mqttingest/mqtt.go new file mode 100644 index 0000000..bdcd961 --- /dev/null +++ b/internal/mqttingest/mqtt.go @@ -0,0 +1,53 @@ +package mqttingest + +import ( + "context" + "fmt" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type MQTTConfig struct { + Broker string + ClientID string + Username string + Password string + Topic string + QoS byte +} + +type Handler func(ctx context.Context, topic string, payload []byte) error + +func RunSubscriber(ctx context.Context, cfg MQTTConfig, h Handler) error { + opts := mqtt.NewClientOptions(). + AddBroker(cfg.Broker). + SetClientID(cfg.ClientID). + SetAutoReconnect(true). + SetConnectRetry(true). + SetConnectRetryInterval(5 * time.Second) + + if cfg.Username != "" { + opts.SetUsername(cfg.Username) + opts.SetPassword(cfg.Password) + } + + client := mqtt.NewClient(opts) + if tok := client.Connect(); tok.Wait() && tok.Error() != nil { + return fmt.Errorf("mqtt connect: %w", tok.Error()) + } + + // Subscribe + if tok := client.Subscribe(cfg.Topic, cfg.QoS, func(_ mqtt.Client, msg mqtt.Message) { + // Keep callback short; do work with context + _ = h(ctx, msg.Topic(), msg.Payload()) + }); tok.Wait() && tok.Error() != nil { + client.Disconnect(250) + return fmt.Errorf("mqtt subscribe: %w", tok.Error()) + } + + // Block until ctx cancelled + <-ctx.Done() + client.Disconnect(250) + return nil +} diff --git a/internal/mqttingest/ws90.go b/internal/mqttingest/ws90.go new file mode 100644 index 0000000..6e4f242 --- /dev/null +++ b/internal/mqttingest/ws90.go @@ -0,0 +1,39 @@ +package mqttingest + +import "encoding/json" + +// WS90Payload matches your JSON keys. +type WS90Payload struct { + Model string `json:"model"` + ID int64 `json:"id"` + BatteryOK int `json:"battery_ok"` + BatteryMV int `json:"battery_mV"` + TemperatureC float64 `json:"temperature_C"` + Humidity float64 `json:"humidity"` + WindDirDeg float64 `json:"wind_dir_deg"` + WindAvgMS float64 `json:"wind_avg_m_s"` + WindMaxMS float64 `json:"wind_max_m_s"` + UVI float64 `json:"uvi"` + LightLux float64 `json:"light_lux"` + Flags int `json:"flags"` + RainMM float64 `json:"rain_mm"` + RainStart int64 `json:"rain_start"` + SupercapV float64 `json:"supercap_V"` + Firmware int `json:"firmware"` + Data string `json:"data"` + MIC string `json:"mic"` + Protocol string `json:"protocol"` + RSSI int `json:"rssi"` + Duration int64 `json:"duration"` +} + +func ParseWS90(b []byte) (*WS90Payload, map[string]any, error) { + var p WS90Payload + if err := json.Unmarshal(b, &p); err != nil { + return nil, nil, err + } + // Keep the full payload as JSONB too. + var raw map[string]any + _ = json.Unmarshal(b, &raw) + return &p, raw, nil +} diff --git a/internal/providers/openmeteo.go b/internal/providers/openmeteo.go new file mode 100644 index 0000000..8e5867f --- /dev/null +++ b/internal/providers/openmeteo.go @@ -0,0 +1,166 @@ +package providers + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" +) + +type OpenMeteo struct { + Client *http.Client +} + +func (o *OpenMeteo) Name() string { return "open_meteo" } + +func (o *OpenMeteo) Fetch(ctxDone <-chan struct{}, site Site, model string) (*ForecastResult, error) { + if o.Client == nil { + o.Client = &http.Client{Timeout: 15 * time.Second} + } + + // Hourly fields that are useful for bias-correction / training + hourly := []string{ + "temperature_2m", + "relative_humidity_2m", + "pressure_msl", + "wind_speed_10m", + "wind_gusts_10m", + "wind_direction_10m", + "precipitation", + "precipitation_probability", + "cloud_cover", + } + + u, _ := url.Parse("https://api.open-meteo.com/v1/forecast") + q := u.Query() + q.Set("latitude", fmt.Sprintf("%.6f", site.Latitude)) + q.Set("longitude", fmt.Sprintf("%.6f", site.Longitude)) + q.Set("hourly", join(hourly)) + q.Set("timezone", "UTC") + q.Set("models", model) // e.g. "ecmwf" + q.Set("forecast_days", "7") // keep it short; you can increase later + u.RawQuery = q.Encode() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { <-ctxDone; cancel() }() + + req, _ := http.NewRequestWithContext(ctx, "GET", u.String(), nil) + resp, err := o.Client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("open-meteo HTTP %d", resp.StatusCode) + } + + var raw map[string]any + if err := json.NewDecoder(resp.Body).Decode(&raw); err != nil { + return nil, err + } + + // Parse the hourly arrays + hr, ok := raw["hourly"].(map[string]any) + if !ok { + return nil, fmt.Errorf("unexpected open-meteo payload: missing hourly") + } + + times, err := parseTimeArray(hr["time"]) + if err != nil { + return nil, err + } + + // Helpers pull float arrays (may be absent) + temp := floatArray(hr["temperature_2m"]) + rh := floatArray(hr["relative_humidity_2m"]) + msl := floatArray(hr["pressure_msl"]) + ws := floatArray(hr["wind_speed_10m"]) + gust := floatArray(hr["wind_gusts_10m"]) + wd := floatArray(hr["wind_direction_10m"]) + precip := floatArray(hr["precipitation"]) + pprob := floatArray(hr["precipitation_probability"]) + cloud := floatArray(hr["cloud_cover"]) + + points := make([]HourlyForecastPoint, 0, len(times)) + for i := range times { + points = append(points, HourlyForecastPoint{ + TS: times[i], + TempC: idx(temp, i), + RH: idx(rh, i), + PressureMSLH: idx(msl, i), + WindMS: idx(ws, i), + WindGustMS: idx(gust, i), + WindDirDeg: idx(wd, i), + PrecipMM: idx(precip, i), + PrecipProb: idx(pprob, i), + CloudCover: idx(cloud, i), + }) + } + + return &ForecastResult{ + RetrievedAt: time.Now().UTC(), + Model: model, + Hourly: points, + Raw: raw, + }, nil +} + +func join(items []string) string { + out := "" + for i, s := range items { + if i > 0 { + out += "," + } + out += s + } + return out +} + +func parseTimeArray(v any) ([]time.Time, error) { + arr, ok := v.([]any) + if !ok { + return nil, fmt.Errorf("hourly.time not array") + } + out := make([]time.Time, 0, len(arr)) + for _, x := range arr { + s, _ := x.(string) + t, err := time.Parse("2006-01-02T15:04", s) // open-meteo uses ISO without seconds + if err != nil { + return nil, err + } + out = append(out, t.UTC()) + } + return out, nil +} + +func floatArray(v any) []float64 { + arr, ok := v.([]any) + if !ok { + return nil + } + out := make([]float64, 0, len(arr)) + for _, x := range arr { + switch n := x.(type) { + case float64: + out = append(out, n) + case int: + out = append(out, float64(n)) + default: + // if null or unexpected, use NaN sentinel? We'll instead skip by storing nil via idx() + out = append(out, 0) + } + } + return out +} + +func idx(a []float64, i int) *float64 { + if a == nil || i < 0 || i >= len(a) { + return nil + } + v := a[i] + return &v +} diff --git a/internal/providers/types.go b/internal/providers/types.go new file mode 100644 index 0000000..ef15ae5 --- /dev/null +++ b/internal/providers/types.go @@ -0,0 +1,34 @@ +package providers + +import "time" + +type Site struct { + Name string + Latitude float64 + Longitude float64 +} + +type HourlyForecastPoint struct { + TS time.Time + TempC *float64 + RH *float64 + PressureMSLH *float64 + WindMS *float64 + WindGustMS *float64 + WindDirDeg *float64 + PrecipMM *float64 + PrecipProb *float64 + CloudCover *float64 +} + +type ForecastResult struct { + RetrievedAt time.Time + Model string + Hourly []HourlyForecastPoint + Raw map[string]any +} + +type Provider interface { + Name() string + Fetch(ctxDone <-chan struct{}, site Site, model string) (*ForecastResult, error) +} diff --git a/internal/providers/wunderground.go b/internal/providers/wunderground.go new file mode 100644 index 0000000..1e14faa --- /dev/null +++ b/internal/providers/wunderground.go @@ -0,0 +1,99 @@ +package providers + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strconv" + "time" +) + +// WU PWS upload protocol endpoint. :contentReference[oaicite:1]{index=1} +const wuEndpoint = "https://rtupdate.wunderground.com/weatherstation/updateweatherstation.php" + +type WundergroundClient struct { + HTTP *http.Client +} + +type WUUpload struct { + StationID string + StationKey string + + // Units required by WU: + // tempf (F), wind mph, baromin (inHg), rain inches. + TempC float64 + Humidity float64 + WindDirDeg float64 + WindAvgMS float64 + WindMaxMS float64 + UVI float64 + SolarWm2 *float64 // your payload is lux; leave nil unless you add W/m^2 later + + RainLastHourMM float64 + DailyRainMM float64 + + DateUTC string // "now" recommended +} + +func (c *WundergroundClient) Upload(ctx context.Context, u WUUpload) (string, error) { + if c.HTTP == nil { + c.HTTP = &http.Client{Timeout: 10 * time.Second} + } + + q := url.Values{} + q.Set("ID", u.StationID) + q.Set("PASSWORD", u.StationKey) + q.Set("action", "updateraw") + q.Set("dateutc", u.DateUTC) // "now" ok :contentReference[oaicite:2]{index=2} + + // Required-ish observation fields (only set what we have) + q.Set("tempf", fmt.Sprintf("%.2f", cToF(u.TempC))) + q.Set("humidity", fmt.Sprintf("%.0f", u.Humidity)) + + q.Set("winddir", fmt.Sprintf("%.0f", u.WindDirDeg)) + q.Set("windspeedmph", fmt.Sprintf("%.2f", msToMph(u.WindAvgMS))) + q.Set("windgustmph", fmt.Sprintf("%.2f", msToMph(u.WindMaxMS))) + + // Rain (inches) + q.Set("rainin", fmt.Sprintf("%.4f", mmToIn(u.RainLastHourMM))) + q.Set("dailyrainin", fmt.Sprintf("%.4f", mmToIn(u.DailyRainMM))) + + // UV index + q.Set("UV", fmt.Sprintf("%.2f", u.UVI)) + + // NOTE: your WS90 payload provides light in lux, not W/m^2. + // WU expects solarradiation in W/m^2, so we omit it unless you add a conversion/actual sensor field. + if u.SolarWm2 != nil { + q.Set("solarradiation", fmt.Sprintf("%.2f", *u.SolarWm2)) + } + + // Optional: a software identifier + q.Set("softwaretype", "go-weatherstation-go") + + reqURL := wuEndpoint + "?" + q.Encode() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) + if err != nil { + return "", err + } + + resp, err := c.HTTP.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + // WU typically returns plain text like "success" + if resp.StatusCode/100 != 2 { + return "", fmt.Errorf("wunderground upload HTTP %d", resp.StatusCode) + } + + // Read small body + buf := make([]byte, 2048) + n, _ := resp.Body.Read(buf) + return strconv.Quote(string(buf[:n])), nil +} + +func cToF(c float64) float64 { return (c * 9.0 / 5.0) + 32.0 } +func msToMph(ms float64) float64 { return ms * 2.2369362920544 } +func mmToIn(mm float64) float64 { return mm / 25.4 }