first commit

This commit is contained in:
2026-01-26 12:40:47 +11:00
commit adaa57f9e2
17 changed files with 1382 additions and 0 deletions

34
Dockerfile Normal file
View File

@@ -0,0 +1,34 @@
# ---- build stage ----
FROM golang:1.25-alpine AS build
WORKDIR /src
# Git CA certs (for go modules / HTTPS)
RUN apk add --no-cache ca-certificates
# Copy mod files first for better caching
COPY go.mod go.sum ./
RUN go mod download
# Copy the rest of the source
COPY . .
# Build a static-ish binary (alpine musl)
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
go build -trimpath -ldflags="-s -w" -o /out/ingestd ./cmd/ingestd
# ---- runtime stage ----
FROM alpine:3.22
WORKDIR /app
RUN apk add --no-cache ca-certificates && \
adduser -D -H -u 10001 appuser
# Copy binary + schema file used at startup
COPY --from=build /out/ingestd /app/ingestd
COPY internal/db/schema.sql /app/internal/db/schema.sql
USER appuser
# default config path inside container
ENTRYPOINT ["/app/ingestd"]
CMD ["-config", "/app/config.yaml"]

19
README.md Normal file
View File

@@ -0,0 +1,19 @@
# go-weatherstation
Starter go-weatherstationrology data pipeline:
- MQTT ingest of WS90 payloads -> TimescaleDB
- Baseline forecast polling (Open-Meteo) -> TimescaleDB
## Run
1) Start services:
docker compose up -d
2) Copy config:
cp config.example.yaml config.yaml
# edit mqtt topic/broker + site lat/lon
3) Run:
go run ./cmd/ingestd -config config.yaml
## Publish a test WS90 payload
mosquitto_pub -h localhost -t ecowitt/ws90 -m '{"model":"Fineoffset-WS90","id":70618,"battery_ok":1,"battery_mV":3180,"temperature_C":24.2,"humidity":60,"wind_dir_deg":129,"wind_avg_m_s":0,"wind_max_m_s":0,"uvi":0,"light_lux":0,"flags":130,"rain_mm":0,"rain_start":0,"supercap_V":0.5,"firmware":160,"data":"3fff000000------0000ff7ff70000","mic":"CRC","protocol":"Fine Offset Electronics WS90 weather station","rssi":-44,"duration":32996}'

207
cmd/ingestd/main.go Normal file
View File

@@ -0,0 +1,207 @@
package main
import (
"context"
"flag"
"log"
"os"
"os/signal"
"syscall"
"time"
"go-weatherstation/internal/config"
"go-weatherstation/internal/db"
"go-weatherstation/internal/mqttingest"
"go-weatherstation/internal/providers"
)
func main() {
var cfgPath string
flag.StringVar(&cfgPath, "config", "config.yaml", "path to config yaml")
flag.Parse()
cfg, err := config.Load(cfgPath)
if err != nil {
log.Fatalf("config load: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Shutdown handling
go func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
cancel()
}()
latest := &mqttingest.Latest{}
d, err := db.Open(ctx, cfg.DB.ConnString)
if err != nil {
log.Fatalf("db open: %v", err)
}
defer d.Close()
site := providers.Site{
Name: cfg.Site.Name,
Latitude: cfg.Site.Latitude,
Longitude: cfg.Site.Longitude,
}
// Start Open-Meteo poller (optional)
if cfg.Pollers.OpenMeteo.Enabled {
go runOpenMeteoPoller(ctx, d, site, cfg.Pollers.OpenMeteo.Model, cfg.Pollers.OpenMeteo.Interval)
}
if cfg.Wunderground.Enabled {
go runWundergroundUploader(ctx, latest, cfg.Wunderground.StationID, cfg.Wunderground.StationKey, cfg.Wunderground.Interval)
}
// MQTT subscriber (blocks until ctx done)
err = mqttingest.RunSubscriber(ctx, mqttingest.MQTTConfig{
Broker: cfg.MQTT.Broker,
ClientID: cfg.MQTT.ClientID,
Username: cfg.MQTT.Username,
Password: cfg.MQTT.Password,
Topic: cfg.MQTT.Topic,
QoS: cfg.MQTT.QoS,
}, func(ctx context.Context, topic string, payload []byte) error {
p, raw, err := mqttingest.ParseWS90(payload)
if err != nil {
log.Printf("ws90 parse error topic=%s err=%v payload=%s", topic, err, string(payload))
return nil
}
// Use receive time as observation ts (WS90 payload doesn't include a timestamp).
ts := time.Now().UTC()
latest.Update(ts, p)
if err := d.InsertWS90(ctx, db.InsertWS90Params{
TS: ts,
Site: cfg.Site.Name,
StationID: p.ID,
Model: p.Model,
BatteryOK: p.BatteryOK,
BatteryMV: p.BatteryMV,
TempC: p.TemperatureC,
Humidity: p.Humidity,
WindDirDeg: p.WindDirDeg,
WindAvgMS: p.WindAvgMS,
WindMaxMS: p.WindMaxMS,
UVI: p.UVI,
LightLux: p.LightLux,
Flags: p.Flags,
RainMM: p.RainMM,
RainStart: p.RainStart,
SupercapV: p.SupercapV,
Firmware: p.Firmware,
RawData: p.Data,
MIC: p.MIC,
Protocol: p.Protocol,
RSSI: p.RSSI,
Duration: p.Duration,
Payload: raw,
}); err != nil {
log.Printf("db insert ws90 error: %v", err)
}
return nil
})
if err != nil {
log.Fatalf("mqtt subscriber: %v", err)
}
}
func runOpenMeteoPoller(ctx context.Context, d *db.DB, site providers.Site, model string, interval time.Duration) {
p := &providers.OpenMeteo{}
t := time.NewTicker(interval)
defer t.Stop()
// poll immediately at startup
pollOnce(ctx, d, p, site, model)
for {
select {
case <-ctx.Done():
return
case <-t.C:
pollOnce(ctx, d, p, site, model)
}
}
}
func runWundergroundUploader(ctx context.Context, latest *mqttingest.Latest, stationID, stationKey string, interval time.Duration) {
client := &providers.WundergroundClient{}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
snap, ok := latest.Snapshot()
if !ok {
log.Printf("wunderground: no data yet")
continue
}
up := providers.WUUpload{
StationID: stationID,
StationKey: stationKey,
TempC: snap.P.TemperatureC,
Humidity: snap.P.Humidity,
WindDirDeg: snap.P.WindDirDeg,
WindAvgMS: snap.P.WindAvgMS,
WindMaxMS: snap.P.WindMaxMS,
UVI: snap.P.UVI,
RainLastHourMM: snap.RainLastHourMM,
DailyRainMM: snap.DailyRainMM,
DateUTC: "now",
}
resp, err := client.Upload(ctx, up)
if err != nil {
log.Printf("wunderground upload failed: %v", err)
continue
}
log.Printf("wunderground upload ok: %s", resp)
}
}
}
func pollOnce(ctx context.Context, d *db.DB, p providers.Provider, site providers.Site, model string) {
res, err := p.Fetch(ctx.Done(), site, model)
if err != nil {
log.Printf("forecast fetch error provider=%s err=%v", p.Name(), err)
return
}
for _, pt := range res.Hourly {
err := d.UpsertOpenMeteoHourly(ctx, db.InsertOpenMeteoHourlyParams{
TS: pt.TS,
RetrievedAt: res.RetrievedAt,
Site: site.Name,
Model: res.Model,
TempC: pt.TempC,
RH: pt.RH,
PressureMSLH: pt.PressureMSLH,
WindMS: pt.WindMS,
WindGustMS: pt.WindGustMS,
WindDirDeg: pt.WindDirDeg,
PrecipMM: pt.PrecipMM,
PrecipProb: pt.PrecipProb,
CloudCover: pt.CloudCover,
SourcePayload: res.Raw,
})
if err != nil {
log.Printf("forecast upsert error ts=%s err=%v", pt.TS.Format(time.RFC3339), err)
}
}
log.Printf("forecast stored provider=%s model=%s points=%d retrieved_at=%s",
p.Name(), res.Model, len(res.Hourly), res.RetrievedAt.Format(time.RFC3339))
}

26
config.yaml Normal file
View File

@@ -0,0 +1,26 @@
mqtt:
broker: "tcp://mosquitto:1883"
client_id: "go-weatherstation-ingestd"
topic: "ecowitt/ws90"
qos: 1
db:
conn_string: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable"
site:
name: "home"
latitude: -33.8688
longitude: 151.2093
elevation_m: 50
pollers:
open_meteo:
enabled: true
interval: "30m"
model: "ecmwf"
wunderground:
enabled: true
station_id: "ISSYDWXYZ123"
station_key: "your_station_key_here"
interval: "60s"

191
db/init/001_schema.sql Normal file
View File

@@ -0,0 +1,191 @@
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- WS90 observations (as received)
CREATE TABLE IF NOT EXISTS observations_ws90 (
ts TIMESTAMPTZ NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT now(),
site TEXT NOT NULL,
station_id BIGINT NOT NULL,
model TEXT,
battery_ok SMALLINT,
battery_mv INT,
temperature_c DOUBLE PRECISION,
humidity DOUBLE PRECISION,
wind_dir_deg DOUBLE PRECISION,
wind_avg_m_s DOUBLE PRECISION,
wind_max_m_s DOUBLE PRECISION,
uvi DOUBLE PRECISION,
light_lux DOUBLE PRECISION,
flags INT,
rain_mm DOUBLE PRECISION,
rain_start BIGINT,
supercap_v DOUBLE PRECISION,
firmware INT,
raw_data TEXT,
mic TEXT,
protocol TEXT,
rssi INT,
duration BIGINT,
payload_json JSONB
);
SELECT create_hypertable('observations_ws90', 'ts', if_not_exists => TRUE);
CREATE INDEX IF NOT EXISTS idx_observations_ws90_site_ts ON observations_ws90(site, ts DESC);
CREATE INDEX IF NOT EXISTS idx_observations_ws90_station_ts ON observations_ws90(station_id, ts DESC);
-- Open-Meteo hourly baseline forecasts
CREATE TABLE IF NOT EXISTS forecast_openmeteo_hourly (
ts TIMESTAMPTZ NOT NULL,
retrieved_at TIMESTAMPTZ NOT NULL,
site TEXT NOT NULL,
model TEXT NOT NULL,
temp_c DOUBLE PRECISION,
rh DOUBLE PRECISION,
pressure_msl_hpa DOUBLE PRECISION,
wind_m_s DOUBLE PRECISION,
wind_gust_m_s DOUBLE PRECISION,
wind_dir_deg DOUBLE PRECISION,
precip_mm DOUBLE PRECISION,
precip_prob DOUBLE PRECISION,
cloud_cover DOUBLE PRECISION,
source_json JSONB,
PRIMARY KEY (site, model, retrieved_at, ts)
);
SELECT create_hypertable('forecast_openmeteo_hourly', 'ts', if_not_exists => TRUE);
CREATE INDEX IF NOT EXISTS idx_forecast_openmeteo_site_ts ON forecast_openmeteo_hourly(site, ts DESC);
-- Raw retention: 90 days
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM timescaledb_information.jobs j
WHERE j.proc_name = 'policy_retention'
AND j.hypertable_name = 'observations_ws90'
) THEN
PERFORM add_retention_policy('observations_ws90', INTERVAL '90 days');
END IF;
END$$;
-- Compression after 7 days (recommended)
ALTER TABLE observations_ws90 SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'site,station_id'
);
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM timescaledb_information.jobs
WHERE proc_name = 'policy_compression'
AND hypertable_name = 'observations_ws90'
) THEN
PERFORM add_compression_policy('observations_ws90', INTERVAL '7 days');
END IF;
END$$;
-- 1-minute continuous aggregate
CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_1m
WITH (timescaledb.continuous) AS
SELECT
time_bucket(INTERVAL '1 minute', ts) AS bucket,
site,
station_id,
avg(temperature_c) AS temp_c_avg,
min(temperature_c) AS temp_c_min,
max(temperature_c) AS temp_c_max,
avg(humidity) AS rh_avg,
min(humidity) AS rh_min,
max(humidity) AS rh_max,
avg(wind_avg_m_s) AS wind_avg_ms_avg,
max(wind_max_m_s) AS wind_gust_ms_max,
avg(wind_dir_deg) AS wind_dir_deg_avg,
max(uvi) AS uvi_max,
max(light_lux) AS light_lux_max,
min(rain_mm) AS rain_mm_min,
max(rain_mm) AS rain_mm_max
FROM observations_ws90
GROUP BY 1,2,3;
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate'
AND hypertable_name = 'cagg_ws90_1m'
) THEN
PERFORM add_continuous_aggregate_policy('cagg_ws90_1m',
start_offset => INTERVAL '2 hours',
end_offset => INTERVAL '2 minutes',
schedule_interval => INTERVAL '5 minutes'
);
END IF;
END$$;
-- 5-minute continuous aggregate (optional but useful)
CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_5m
WITH (timescaledb.continuous) AS
SELECT
time_bucket(INTERVAL '5 minutes', ts) AS bucket,
site,
station_id,
avg(temperature_c) AS temp_c_avg,
min(temperature_c) AS temp_c_min,
max(temperature_c) AS temp_c_max,
avg(humidity) AS rh_avg,
min(humidity) AS rh_min,
max(humidity) AS rh_max,
avg(wind_avg_m_s) AS wind_avg_ms_avg,
max(wind_max_m_s) AS wind_gust_ms_max,
avg(wind_dir_deg) AS wind_dir_deg_avg,
max(uvi) AS uvi_max,
max(light_lux) AS light_lux_max,
min(rain_mm) AS rain_mm_min,
max(rain_mm) AS rain_mm_max
FROM observations_ws90
GROUP BY 1,2,3;
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate'
AND hypertable_name = 'cagg_ws90_5m'
) THEN
PERFORM add_continuous_aggregate_policy('cagg_ws90_5m',
start_offset => INTERVAL '24 hours',
end_offset => INTERVAL '10 minutes',
schedule_interval => INTERVAL '15 minutes'
);
END IF;
END$$;

26
docker-compose.yml Normal file
View File

@@ -0,0 +1,26 @@
services:
timescaledb:
image: timescale/timescaledb:latest-pg16
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
POSTGRES_DB: micrometeo
ports:
- "5432:5432"
volumes:
- tsdata:/var/lib/postgresql/data
# runs on first DB initialization only
- ./db/init:/docker-entrypoint-initdb.d:ro
ingestd:
build:
context: .
dockerfile: Dockerfile
depends_on:
- timescaledb
restart: unless-stopped
volumes:
- ./config.yaml:/app/config.yaml:ro
volumes:
tsdata:

21
go.mod Normal file
View File

@@ -0,0 +1,21 @@
module go-weatherstation
go 1.25
require (
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/jackc/pgx/v5 v5.8.0
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/gorilla/websocket v1.5.3 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/text v0.29.0 // indirect
)

41
go.sum Normal file
View File

@@ -0,0 +1,41 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

86
internal/config/config.go Normal file
View File

@@ -0,0 +1,86 @@
package config
import (
"errors"
"os"
"time"
"gopkg.in/yaml.v3"
)
type Config struct {
LogLevel string `yaml:"log_level"`
MQTT struct {
Broker string `yaml:"broker"`
ClientID string `yaml:"client_id"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Topic string `yaml:"topic"`
QoS byte `yaml:"qos"`
} `yaml:"mqtt"`
DB struct {
ConnString string `yaml:"conn_string"`
} `yaml:"db"`
Site struct {
Name string `yaml:"name"`
Latitude float64 `yaml:"latitude"`
Longitude float64 `yaml:"longitude"`
ElevationM float64 `yaml:"elevation_m"`
} `yaml:"site"`
Pollers struct {
OpenMeteo struct {
Enabled bool `yaml:"enabled"`
Interval time.Duration `yaml:"interval"`
Model string `yaml:"model"`
} `yaml:"open_meteo"`
} `yaml:"pollers"`
Wunderground struct {
Enabled bool `yaml:"enabled"`
StationID string `yaml:"station_id"`
StationKey string `yaml:"station_key"`
Interval time.Duration `yaml:"interval"`
} `yaml:"wunderground"`
}
func Load(path string) (*Config, error) {
b, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var c Config
if err := yaml.Unmarshal(b, &c); err != nil {
return nil, err
}
// Minimal validation
if c.MQTT.Broker == "" || c.MQTT.Topic == "" {
return nil, errors.New("mqtt broker and topic are required")
}
if c.DB.ConnString == "" {
return nil, errors.New("db conn_string is required")
}
if c.Site.Name == "" {
c.Site.Name = "default"
}
if c.Pollers.OpenMeteo.Model == "" {
c.Pollers.OpenMeteo.Model = "ecmwf"
}
if c.Pollers.OpenMeteo.Interval == 0 {
c.Pollers.OpenMeteo.Interval = 30 * time.Minute
}
if c.Wunderground.Interval == 0 {
c.Wunderground.Interval = 60 * time.Second
}
// If enabled, require creds
if c.Wunderground.Enabled && (c.Wunderground.StationID == "" || c.Wunderground.StationKey == "") {
return nil, errors.New("wunderground enabled but station_id/station_key not set")
}
return &c, nil
}

41
internal/db/db.go Normal file
View File

@@ -0,0 +1,41 @@
package db
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type DB struct {
Pool *pgxpool.Pool
}
func Open(ctx context.Context, connString string) (*DB, error) {
cfg, err := pgxpool.ParseConfig(connString)
if err != nil {
return nil, err
}
cfg.MaxConns = 10
cfg.MinConns = 1
cfg.MaxConnIdleTime = 5 * time.Minute
pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err != nil {
return nil, err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("db ping failed: %w", err)
}
return &DB{Pool: pool}, nil
}
func (d *DB) Close() {
if d.Pool != nil {
d.Pool.Close()
}
}

128
internal/db/queries.go Normal file
View File

@@ -0,0 +1,128 @@
package db
import (
"context"
"encoding/json"
"time"
"github.com/jackc/pgx/v5/pgtype"
)
type InsertWS90Params struct {
TS time.Time
Site string
StationID int64
Model string
BatteryOK int
BatteryMV int
TempC float64
Humidity float64
WindDirDeg float64
WindAvgMS float64
WindMaxMS float64
UVI float64
LightLux float64
Flags int
RainMM float64
RainStart int64
SupercapV float64
Firmware int
RawData string
MIC string
Protocol string
RSSI int
Duration int64
Payload map[string]any
}
func (d *DB) InsertWS90(ctx context.Context, p InsertWS90Params) error {
b, _ := json.Marshal(p.Payload)
var payloadJSON pgtype.JSONB
_ = payloadJSON.Set(b)
_, err := d.Pool.Exec(ctx, `
INSERT INTO observations_ws90 (
ts, site, station_id, model, battery_ok, battery_mv,
temperature_c, humidity,
wind_dir_deg, wind_avg_m_s, wind_max_m_s,
uvi, light_lux, flags,
rain_mm, rain_start,
supercap_v, firmware, raw_data, mic, protocol,
rssi, duration, payload_json
) VALUES (
$1,$2,$3,$4,$5,$6,
$7,$8,
$9,$10,$11,
$12,$13,$14,
$15,$16,
$17,$18,$19,$20,$21,
$22,$23,$24
)
`, p.TS, p.Site, p.StationID, p.Model, p.BatteryOK, p.BatteryMV,
p.TempC, p.Humidity,
p.WindDirDeg, p.WindAvgMS, p.WindMaxMS,
p.UVI, p.LightLux, p.Flags,
p.RainMM, p.RainStart,
p.SupercapV, p.Firmware, p.RawData, p.MIC, p.Protocol,
p.RSSI, p.Duration, payloadJSON)
return err
}
type InsertOpenMeteoHourlyParams struct {
TS time.Time
RetrievedAt time.Time
Site string
Model string
TempC *float64
RH *float64
PressureMSLH *float64
WindMS *float64
WindGustMS *float64
WindDirDeg *float64
PrecipMM *float64
PrecipProb *float64
CloudCover *float64
SourcePayload map[string]any
}
func (d *DB) UpsertOpenMeteoHourly(ctx context.Context, p InsertOpenMeteoHourlyParams) error {
b, _ := json.Marshal(p.SourcePayload)
var sourceJSON pgtype.JSONB
_ = sourceJSON.Set(b)
_, err := d.Pool.Exec(ctx, `
INSERT INTO forecast_openmeteo_hourly (
ts, retrieved_at, site, model,
temp_c, rh, pressure_msl_hpa,
wind_m_s, wind_gust_m_s, wind_dir_deg,
precip_mm, precip_prob, cloud_cover,
source_json
) VALUES (
$1,$2,$3,$4,
$5,$6,$7,
$8,$9,$10,
$11,$12,$13,
$14
)
ON CONFLICT (site, model, retrieved_at, ts) DO UPDATE SET
temp_c = EXCLUDED.temp_c,
rh = EXCLUDED.rh,
pressure_msl_hpa = EXCLUDED.pressure_msl_hpa,
wind_m_s = EXCLUDED.wind_m_s,
wind_gust_m_s = EXCLUDED.wind_gust_m_s,
wind_dir_deg = EXCLUDED.wind_dir_deg,
precip_mm = EXCLUDED.precip_mm,
precip_prob = EXCLUDED.precip_prob,
cloud_cover = EXCLUDED.cloud_cover,
source_json = EXCLUDED.source_json
`, p.TS, p.RetrievedAt, p.Site, p.Model,
p.TempC, p.RH, p.PressureMSLH,
p.WindMS, p.WindGustMS, p.WindDirDeg,
p.PrecipMM, p.PrecipProb, p.CloudCover,
sourceJSON)
return err
}

View File

@@ -0,0 +1,171 @@
package mqttingest
import (
"sync"
"time"
)
type rainMode int
const (
rainModeUnknown rainMode = iota
rainModeCumulative
rainModeIncremental
)
type Latest struct {
mu sync.RWMutex
lastTS time.Time
last *WS90Payload
// Rain tracking
mode rainMode
lastRainMM *float64
// rolling sums built from "rain increment" values (mm)
rainIncs []rainIncPoint // last 1h
dailyIncs []rainIncPoint // since midnight (or since start; well trim daily by midnight)
}
type rainIncPoint struct {
ts time.Time
mm float64 // incremental rainfall at this timestamp (mm)
}
func (l *Latest) Update(ts time.Time, p *WS90Payload) {
l.mu.Lock()
defer l.mu.Unlock()
l.lastTS = ts
l.last = p
inc := l.computeRainIncrement(ts, p.RainMM)
// Track last hour increments
l.rainIncs = append(l.rainIncs, rainIncPoint{ts: ts, mm: inc})
cutoff := ts.Add(-1 * time.Hour)
l.rainIncs = trimBefore(l.rainIncs, cutoff)
// Track daily increments: trim before local midnight
l.dailyIncs = append(l.dailyIncs, rainIncPoint{ts: ts, mm: inc})
midnight := localMidnight(ts)
l.dailyIncs = trimBefore(l.dailyIncs, midnight)
}
func trimBefore(a []rainIncPoint, cutoff time.Time) []rainIncPoint {
i := 0
for ; i < len(a); i++ {
if !a[i].ts.Before(cutoff) {
break
}
}
if i > 0 {
return a[i:]
}
return a
}
// localMidnight returns midnight in the local timezone of the *process*.
// If you want a specific timezone (e.g. Australia/Sydney) we can wire that in later.
func localMidnight(t time.Time) time.Time {
lt := t.Local()
return time.Date(lt.Year(), lt.Month(), lt.Day(), 0, 0, 0, 0, lt.Location())
}
// computeRainIncrement returns the “incremental rain” in mm for this sample,
// regardless of whether the incoming rain_mm is cumulative or incremental.
func (l *Latest) computeRainIncrement(ts time.Time, rainMM float64) float64 {
// First sample: we cant infer anything yet
if l.lastRainMM == nil {
l.lastRainMM = &rainMM
return 0
}
prev := *l.lastRainMM
l.lastRainMM = &rainMM
// Heuristic:
// - If value often stays 0 and occasionally jumps by small amounts, it might be cumulative OR incremental.
// - If it monotonically increases over time (with occasional resets), thats cumulative.
// - If it is usually small per message (e.g. 0, 0.2, 0, 0, 0.2) and not trending upward, thats incremental.
//
// Well decide based on “trendiness” and deltas:
delta := rainMM - prev
// Handle reset (counter rollover / daily reset / device reboot)
if delta < -0.001 {
// If cumulative, after reset the “increment” is 0 for that sample.
// If incremental, a reset doesnt really make sense but we still treat as 0.
if l.mode == rainModeUnknown {
l.mode = rainModeCumulative
}
return 0
}
// If we already decided
switch l.mode {
case rainModeCumulative:
if delta > 0 {
return delta
}
return 0
case rainModeIncremental:
// in incremental mode we treat the sample as “this messages rain”
if rainMM > 0 {
return rainMM
}
return 0
}
// Decide mode (unknown):
// If delta is consistently positive when rainMM > 0, cumulative is likely.
// If delta is ~0 while rainMM occasionally > 0, incremental is likely.
//
// Single-sample heuristic:
// - if rainMM > 0 and delta > 0 => lean cumulative
// - if rainMM > 0 and delta ~ 0 => lean incremental
if rainMM > 0 {
if delta > 0.0009 {
l.mode = rainModeCumulative
return delta
}
// delta near zero but rainMM nonzero suggests incremental
l.mode = rainModeIncremental
return rainMM
}
return 0
}
type Snapshot struct {
TS time.Time
P WS90Payload
RainLastHourMM float64
DailyRainMM float64
}
func (l *Latest) Snapshot() (Snapshot, bool) {
l.mu.RLock()
defer l.mu.RUnlock()
if l.last == nil || l.lastTS.IsZero() {
return Snapshot{}, false
}
var hourSum, daySum float64
for _, rp := range l.rainIncs {
hourSum += rp.mm
}
for _, rp := range l.dailyIncs {
daySum += rp.mm
}
return Snapshot{
TS: l.lastTS,
P: *l.last,
RainLastHourMM: hourSum,
DailyRainMM: daySum,
}, true
}

View File

@@ -0,0 +1,53 @@
package mqttingest
import (
"context"
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type MQTTConfig struct {
Broker string
ClientID string
Username string
Password string
Topic string
QoS byte
}
type Handler func(ctx context.Context, topic string, payload []byte) error
func RunSubscriber(ctx context.Context, cfg MQTTConfig, h Handler) error {
opts := mqtt.NewClientOptions().
AddBroker(cfg.Broker).
SetClientID(cfg.ClientID).
SetAutoReconnect(true).
SetConnectRetry(true).
SetConnectRetryInterval(5 * time.Second)
if cfg.Username != "" {
opts.SetUsername(cfg.Username)
opts.SetPassword(cfg.Password)
}
client := mqtt.NewClient(opts)
if tok := client.Connect(); tok.Wait() && tok.Error() != nil {
return fmt.Errorf("mqtt connect: %w", tok.Error())
}
// Subscribe
if tok := client.Subscribe(cfg.Topic, cfg.QoS, func(_ mqtt.Client, msg mqtt.Message) {
// Keep callback short; do work with context
_ = h(ctx, msg.Topic(), msg.Payload())
}); tok.Wait() && tok.Error() != nil {
client.Disconnect(250)
return fmt.Errorf("mqtt subscribe: %w", tok.Error())
}
// Block until ctx cancelled
<-ctx.Done()
client.Disconnect(250)
return nil
}

View File

@@ -0,0 +1,39 @@
package mqttingest
import "encoding/json"
// WS90Payload matches your JSON keys.
type WS90Payload struct {
Model string `json:"model"`
ID int64 `json:"id"`
BatteryOK int `json:"battery_ok"`
BatteryMV int `json:"battery_mV"`
TemperatureC float64 `json:"temperature_C"`
Humidity float64 `json:"humidity"`
WindDirDeg float64 `json:"wind_dir_deg"`
WindAvgMS float64 `json:"wind_avg_m_s"`
WindMaxMS float64 `json:"wind_max_m_s"`
UVI float64 `json:"uvi"`
LightLux float64 `json:"light_lux"`
Flags int `json:"flags"`
RainMM float64 `json:"rain_mm"`
RainStart int64 `json:"rain_start"`
SupercapV float64 `json:"supercap_V"`
Firmware int `json:"firmware"`
Data string `json:"data"`
MIC string `json:"mic"`
Protocol string `json:"protocol"`
RSSI int `json:"rssi"`
Duration int64 `json:"duration"`
}
func ParseWS90(b []byte) (*WS90Payload, map[string]any, error) {
var p WS90Payload
if err := json.Unmarshal(b, &p); err != nil {
return nil, nil, err
}
// Keep the full payload as JSONB too.
var raw map[string]any
_ = json.Unmarshal(b, &raw)
return &p, raw, nil
}

View File

@@ -0,0 +1,166 @@
package providers
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"
)
type OpenMeteo struct {
Client *http.Client
}
func (o *OpenMeteo) Name() string { return "open_meteo" }
func (o *OpenMeteo) Fetch(ctxDone <-chan struct{}, site Site, model string) (*ForecastResult, error) {
if o.Client == nil {
o.Client = &http.Client{Timeout: 15 * time.Second}
}
// Hourly fields that are useful for bias-correction / training
hourly := []string{
"temperature_2m",
"relative_humidity_2m",
"pressure_msl",
"wind_speed_10m",
"wind_gusts_10m",
"wind_direction_10m",
"precipitation",
"precipitation_probability",
"cloud_cover",
}
u, _ := url.Parse("https://api.open-meteo.com/v1/forecast")
q := u.Query()
q.Set("latitude", fmt.Sprintf("%.6f", site.Latitude))
q.Set("longitude", fmt.Sprintf("%.6f", site.Longitude))
q.Set("hourly", join(hourly))
q.Set("timezone", "UTC")
q.Set("models", model) // e.g. "ecmwf"
q.Set("forecast_days", "7") // keep it short; you can increase later
u.RawQuery = q.Encode()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { <-ctxDone; cancel() }()
req, _ := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
resp, err := o.Client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("open-meteo HTTP %d", resp.StatusCode)
}
var raw map[string]any
if err := json.NewDecoder(resp.Body).Decode(&raw); err != nil {
return nil, err
}
// Parse the hourly arrays
hr, ok := raw["hourly"].(map[string]any)
if !ok {
return nil, fmt.Errorf("unexpected open-meteo payload: missing hourly")
}
times, err := parseTimeArray(hr["time"])
if err != nil {
return nil, err
}
// Helpers pull float arrays (may be absent)
temp := floatArray(hr["temperature_2m"])
rh := floatArray(hr["relative_humidity_2m"])
msl := floatArray(hr["pressure_msl"])
ws := floatArray(hr["wind_speed_10m"])
gust := floatArray(hr["wind_gusts_10m"])
wd := floatArray(hr["wind_direction_10m"])
precip := floatArray(hr["precipitation"])
pprob := floatArray(hr["precipitation_probability"])
cloud := floatArray(hr["cloud_cover"])
points := make([]HourlyForecastPoint, 0, len(times))
for i := range times {
points = append(points, HourlyForecastPoint{
TS: times[i],
TempC: idx(temp, i),
RH: idx(rh, i),
PressureMSLH: idx(msl, i),
WindMS: idx(ws, i),
WindGustMS: idx(gust, i),
WindDirDeg: idx(wd, i),
PrecipMM: idx(precip, i),
PrecipProb: idx(pprob, i),
CloudCover: idx(cloud, i),
})
}
return &ForecastResult{
RetrievedAt: time.Now().UTC(),
Model: model,
Hourly: points,
Raw: raw,
}, nil
}
func join(items []string) string {
out := ""
for i, s := range items {
if i > 0 {
out += ","
}
out += s
}
return out
}
func parseTimeArray(v any) ([]time.Time, error) {
arr, ok := v.([]any)
if !ok {
return nil, fmt.Errorf("hourly.time not array")
}
out := make([]time.Time, 0, len(arr))
for _, x := range arr {
s, _ := x.(string)
t, err := time.Parse("2006-01-02T15:04", s) // open-meteo uses ISO without seconds
if err != nil {
return nil, err
}
out = append(out, t.UTC())
}
return out, nil
}
func floatArray(v any) []float64 {
arr, ok := v.([]any)
if !ok {
return nil
}
out := make([]float64, 0, len(arr))
for _, x := range arr {
switch n := x.(type) {
case float64:
out = append(out, n)
case int:
out = append(out, float64(n))
default:
// if null or unexpected, use NaN sentinel? We'll instead skip by storing nil via idx()
out = append(out, 0)
}
}
return out
}
func idx(a []float64, i int) *float64 {
if a == nil || i < 0 || i >= len(a) {
return nil
}
v := a[i]
return &v
}

View File

@@ -0,0 +1,34 @@
package providers
import "time"
type Site struct {
Name string
Latitude float64
Longitude float64
}
type HourlyForecastPoint struct {
TS time.Time
TempC *float64
RH *float64
PressureMSLH *float64
WindMS *float64
WindGustMS *float64
WindDirDeg *float64
PrecipMM *float64
PrecipProb *float64
CloudCover *float64
}
type ForecastResult struct {
RetrievedAt time.Time
Model string
Hourly []HourlyForecastPoint
Raw map[string]any
}
type Provider interface {
Name() string
Fetch(ctxDone <-chan struct{}, site Site, model string) (*ForecastResult, error)
}

View File

@@ -0,0 +1,99 @@
package providers
import (
"context"
"fmt"
"net/http"
"net/url"
"strconv"
"time"
)
// WU PWS upload protocol endpoint. :contentReference[oaicite:1]{index=1}
const wuEndpoint = "https://rtupdate.wunderground.com/weatherstation/updateweatherstation.php"
type WundergroundClient struct {
HTTP *http.Client
}
type WUUpload struct {
StationID string
StationKey string
// Units required by WU:
// tempf (F), wind mph, baromin (inHg), rain inches.
TempC float64
Humidity float64
WindDirDeg float64
WindAvgMS float64
WindMaxMS float64
UVI float64
SolarWm2 *float64 // your payload is lux; leave nil unless you add W/m^2 later
RainLastHourMM float64
DailyRainMM float64
DateUTC string // "now" recommended
}
func (c *WundergroundClient) Upload(ctx context.Context, u WUUpload) (string, error) {
if c.HTTP == nil {
c.HTTP = &http.Client{Timeout: 10 * time.Second}
}
q := url.Values{}
q.Set("ID", u.StationID)
q.Set("PASSWORD", u.StationKey)
q.Set("action", "updateraw")
q.Set("dateutc", u.DateUTC) // "now" ok :contentReference[oaicite:2]{index=2}
// Required-ish observation fields (only set what we have)
q.Set("tempf", fmt.Sprintf("%.2f", cToF(u.TempC)))
q.Set("humidity", fmt.Sprintf("%.0f", u.Humidity))
q.Set("winddir", fmt.Sprintf("%.0f", u.WindDirDeg))
q.Set("windspeedmph", fmt.Sprintf("%.2f", msToMph(u.WindAvgMS)))
q.Set("windgustmph", fmt.Sprintf("%.2f", msToMph(u.WindMaxMS)))
// Rain (inches)
q.Set("rainin", fmt.Sprintf("%.4f", mmToIn(u.RainLastHourMM)))
q.Set("dailyrainin", fmt.Sprintf("%.4f", mmToIn(u.DailyRainMM)))
// UV index
q.Set("UV", fmt.Sprintf("%.2f", u.UVI))
// NOTE: your WS90 payload provides light in lux, not W/m^2.
// WU expects solarradiation in W/m^2, so we omit it unless you add a conversion/actual sensor field.
if u.SolarWm2 != nil {
q.Set("solarradiation", fmt.Sprintf("%.2f", *u.SolarWm2))
}
// Optional: a software identifier
q.Set("softwaretype", "go-weatherstation-go")
reqURL := wuEndpoint + "?" + q.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
if err != nil {
return "", err
}
resp, err := c.HTTP.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
// WU typically returns plain text like "success"
if resp.StatusCode/100 != 2 {
return "", fmt.Errorf("wunderground upload HTTP %d", resp.StatusCode)
}
// Read small body
buf := make([]byte, 2048)
n, _ := resp.Body.Read(buf)
return strconv.Quote(string(buf[:n])), nil
}
func cToF(c float64) float64 { return (c * 9.0 / 5.0) + 32.0 }
func msToMph(ms float64) float64 { return ms * 2.2369362920544 }
func mmToIn(mm float64) float64 { return mm / 25.4 }