update daily aggregation to use hourly intervals
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-01-23 14:33:22 +11:00
parent e6c7596239
commit 0d509179aa
3 changed files with 121 additions and 86 deletions

View File

@@ -6,6 +6,7 @@ vCTP is a vSphere Chargeback Tracking Platform, designed for a specific customer
- Daily summaries aggregate the hourly snapshots for the day; monthly summaries aggregate daily summaries for the month (or hourly snapshots if configured).
- Snapshots are registered in `snapshot_registry` so regeneration via `/api/snapshots/aggregate` can locate the correct tables (fallback scanning is also supported).
- Reports (XLSX with totals/charts) are generated automatically after hourly, daily, and monthly jobs and written to a reports directory.
- Hourly totals in reports are interval-based: each row represents `[HH:00, HH+1:00)` and uses the first snapshot at or after the hour end (including cross-day snapshots) to prorate VM presence by creation/deletion overlap.
- Prometheus metrics are exposed at `/metrics`:
- Snapshots/aggregations: `vctp_hourly_snapshots_total`, `vctp_hourly_snapshots_failed_total`, `vctp_hourly_snapshot_last_unix`, `vctp_hourly_snapshot_last_rows`, `vctp_daily_aggregations_total`, `vctp_daily_aggregations_failed_total`, `vctp_daily_aggregation_duration_seconds`, `vctp_monthly_aggregations_total`, `vctp_monthly_aggregations_failed_total`, `vctp_monthly_aggregation_duration_seconds`, `vctp_reports_available`
- vCenter health/perf: `vctp_vcenter_connect_failures_total{vcenter}`, `vctp_vcenter_snapshot_duration_seconds{vcenter}`, `vctp_vcenter_inventory_size{vcenter}`

View File

@@ -63,6 +63,7 @@ templ Index(info BuildInfo) {
<li>Daily summaries aggregate the hourly snapshots for the day; monthly summaries aggregate daily summaries for the month (or hourly snapshots if configured).</li>
<li>Snapshots are registered in `snapshot_registry` so regeneration via `/api/snapshots/aggregate` can locate the correct tables (fallback scanning is also supported).</li>
<li>Reports (XLSX with totals/charts) are generated automatically after hourly, daily, and monthly jobs and written to a reports directory.</li>
<li>Hourly totals are interval-based: each row represents [HH:00, HH+1:00) and uses the first snapshot at or after the hour end (including cross-day snapshots) to prorate VM presence.</li>
</ul>
</div>
<div class="web2-card">

View File

@@ -805,11 +805,11 @@ func addTotalsChartSheet(logger *slog.Logger, database db.Database, ctx context.
if err := EnsureSnapshotRegistry(ctx, database); err != nil {
return
}
records, err := SnapshotRecordsWithFallback(ctx, database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
records, err := SnapshotRecordsWithFallback(ctx, database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd.Add(2*time.Hour))
if err != nil || len(records) == 0 {
return
}
points, err := buildHourlyTotals(ctx, logger, database.DB(), records)
points, err := buildHourlyTotals(ctx, logger, database.DB(), records, dayStart, dayEnd)
if err != nil || len(points) == 0 {
return
}
@@ -1077,43 +1077,77 @@ type totalsPoint struct {
GoldTotal float64
}
func buildHourlyTotals(ctx context.Context, logger *slog.Logger, dbConn *sqlx.DB, records []SnapshotRecord) ([]totalsPoint, error) {
func buildHourlyTotals(ctx context.Context, logger *slog.Logger, dbConn *sqlx.DB, records []SnapshotRecord, windowStart, windowEnd time.Time) ([]totalsPoint, error) {
if logger == nil {
logger = slog.Default()
}
type hourBucket struct {
samples int
vmSum float64
vcpuSum float64
ramSum float64
presenceSum float64
tinSum float64
bronzeSum float64
silverSum float64
goldSum float64
if windowEnd.Before(windowStart) {
return nil, fmt.Errorf("hourly totals window end is before start")
}
sort.Slice(records, func(i, j int) bool {
return records[i].SnapshotTime.Before(records[j].SnapshotTime)
})
expectedInterval := estimateSnapshotInterval(records)
maxLag := expectedInterval
if maxLag <= 0 {
maxLag = time.Hour
}
buckets := make(map[int64]*hourBucket)
for _, record := range records {
if err := db.ValidateTableName(record.TableName); err != nil {
return nil, err
}
if record.SnapshotCount == 0 {
logger.Debug("hourly totals skipping empty snapshot", "table", record.TableName, "snapshot_time", record.SnapshotTime)
continue
}
if record.SnapshotCount < 0 {
rowsExist, err := db.TableHasRows(ctx, dbConn, record.TableName)
if err != nil {
logger.Debug("hourly totals snapshot probe failed", "table", record.TableName, "snapshot_time", record.SnapshotTime, "error", err)
}
if err != nil || !rowsExist {
points := make([]totalsPoint, 0, 24)
hourStart := windowStart.Truncate(time.Hour)
if hourStart.Before(windowStart) {
hourStart = hourStart.Add(time.Hour)
}
recordIndex := 0
for hourEnd := hourStart.Add(time.Hour); !hourEnd.After(windowEnd); hourEnd = hourEnd.Add(time.Hour) {
hourWindowStart := hourEnd.Add(-time.Hour)
var selected *SnapshotRecord
selectedIndex := recordIndex
for selectedIndex < len(records) {
record := records[selectedIndex]
if record.SnapshotTime.Before(hourEnd) {
selectedIndex++
continue
}
if record.SnapshotTime.After(hourEnd.Add(maxLag)) {
break
}
if err := db.ValidateTableName(record.TableName); err != nil {
return nil, err
}
if record.SnapshotCount == 0 {
logger.Debug("hourly totals skipping empty snapshot", "table", record.TableName, "snapshot_time", record.SnapshotTime)
selectedIndex++
continue
}
if record.SnapshotCount < 0 {
rowsExist, err := db.TableHasRows(ctx, dbConn, record.TableName)
if err != nil {
logger.Debug("hourly totals snapshot probe failed", "table", record.TableName, "snapshot_time", record.SnapshotTime, "error", err)
}
if err != nil || !rowsExist {
selectedIndex++
continue
}
}
selected = &record
break
}
hourStart := record.SnapshotTime.Local().Truncate(time.Hour)
hourStartUnix := hourStart.Unix()
hourEndUnix := hourStart.Add(time.Hour).Unix()
if selected == nil {
logger.Debug(
"hourly totals missing snapshot for interval",
"interval_start", hourWindowStart.Format("2006-01-02 15:04"),
"interval_end", hourEnd.Format("2006-01-02 15:04"),
"max_lag_seconds", int64(maxLag.Seconds()),
)
recordIndex = selectedIndex
continue
}
recordIndex = selectedIndex
hourStartUnix := hourWindowStart.Unix()
hourEndUnix := hourEnd.Unix()
durationSeconds := float64(hourEndUnix - hourStartUnix)
startExpr := `CASE WHEN "CreationTime" IS NOT NULL AND "CreationTime" > 0 AND "CreationTime" > ? THEN "CreationTime" ELSE ? END`
endExpr := `CASE WHEN "DeletionTime" IS NOT NULL AND "DeletionTime" > 0 AND "DeletionTime" < ? THEN "DeletionTime" ELSE ? END`
@@ -1176,7 +1210,7 @@ SELECT
diag.presence_under_zero,
diag.base_presence_sum
FROM diag
`, vmKeyExpr, overlapExpr, record.TableName, templateExclusionFilter())
`, vmKeyExpr, overlapExpr, selected.TableName, templateExclusionFilter())
query = dbConn.Rebind(query)
var row struct {
VmCount int64 `db:"vm_count"`
@@ -1207,11 +1241,15 @@ FROM diag
if err := dbConn.GetContext(ctx, &row, query, args...); err != nil {
return nil, err
}
snapshotLag := selected.SnapshotTime.Sub(hourEnd)
duplicateRows := row.RowCount - row.DistinctKeys
logger.Debug(
"hourly totals snapshot diagnostics",
"table", record.TableName,
"hour_start", hourStart.Format("2006-01-02 15:00"),
"table", selected.TableName,
"snapshot_time", selected.SnapshotTime.Format(time.RFC3339),
"snapshot_lag_seconds", int64(snapshotLag.Seconds()),
"interval_start", hourWindowStart.Format("2006-01-02 15:04"),
"interval_end", hourEnd.Format("2006-01-02 15:04"),
"row_count", row.RowCount,
"distinct_keys", row.DistinctKeys,
"duplicate_rows", duplicateRows,
@@ -1225,67 +1263,62 @@ FROM diag
"presence_ratio", row.PresenceRatio,
"vm_count", row.VmCount,
)
hourKey := hourStartUnix
bucket := buckets[hourKey]
if bucket == nil {
bucket = &hourBucket{}
buckets[hourKey] = bucket
}
bucket.samples++
bucket.vmSum += float64(row.VmCount)
bucket.vcpuSum += float64(row.VcpuTotal)
bucket.ramSum += float64(row.RamTotal)
bucket.presenceSum += row.PresenceRatio
bucket.tinSum += row.TinTotal
bucket.bronzeSum += row.BronzeTotal
bucket.silverSum += row.SilverTotal
bucket.goldSum += row.GoldTotal
}
keys := make([]int64, 0, len(buckets))
for key := range buckets {
keys = append(keys, key)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
points := make([]totalsPoint, 0, len(keys))
for _, key := range keys {
bucket := buckets[key]
if bucket.samples == 0 {
continue
}
denom := float64(bucket.samples)
vmAvg := bucket.vmSum / denom
presenceAvg := bucket.presenceSum / denom
tinAvg := bucket.tinSum / denom
bronzeAvg := bucket.bronzeSum / denom
silverAvg := bucket.silverSum / denom
goldAvg := bucket.goldSum / denom
label := formatHourIntervalLabel(hourWindowStart, hourEnd)
logger.Debug(
"hourly totals bucket",
"hour_start", time.Unix(key, 0).Local().Format("2006-01-02 15:00"),
"samples", bucket.samples,
"presence_ratio", presenceAvg,
"tin_total", tinAvg,
"bronze_total", bronzeAvg,
"silver_total", silverAvg,
"gold_total", goldAvg,
"interval_start", hourWindowStart.Format("2006-01-02 15:04"),
"interval_end", hourEnd.Format("2006-01-02 15:04"),
"presence_ratio", row.PresenceRatio,
"tin_total", row.TinTotal,
"bronze_total", row.BronzeTotal,
"silver_total", row.SilverTotal,
"gold_total", row.GoldTotal,
)
points = append(points, totalsPoint{
Label: time.Unix(key, 0).Local().Format("2006-01-02 15:00"),
VmCount: vmAvg,
VcpuTotal: bucket.vcpuSum / denom,
RamTotal: bucket.ramSum / denom,
PresenceRatio: presenceAvg,
TinTotal: tinAvg,
BronzeTotal: bronzeAvg,
SilverTotal: silverAvg,
GoldTotal: goldAvg,
Label: label,
VmCount: float64(row.VmCount),
VcpuTotal: float64(row.VcpuTotal),
RamTotal: float64(row.RamTotal),
PresenceRatio: row.PresenceRatio,
TinTotal: row.TinTotal,
BronzeTotal: row.BronzeTotal,
SilverTotal: row.SilverTotal,
GoldTotal: row.GoldTotal,
})
}
return points, nil
}
// estimateSnapshotInterval returns the typical spacing between consecutive
// snapshot records, used by the hourly-totals builder as the tolerance when
// matching a snapshot to an hour boundary. It takes the median of all
// positive gaps (truncated to whole seconds) between successive records and
// falls back to one hour when fewer than two records exist or no positive
// gap can be measured.
//
// NOTE(review): assumes records are already sorted by SnapshotTime (the
// caller sorts before invoking) — negative gaps from unsorted input are
// silently dropped rather than treated as an error.
func estimateSnapshotInterval(records []SnapshotRecord) time.Duration {
	const fallback = time.Hour
	if len(records) < 2 {
		return fallback
	}
	gaps := make([]int64, 0, len(records)-1)
	prev := records[0].SnapshotTime
	for _, rec := range records[1:] {
		if delta := rec.SnapshotTime.Sub(prev); delta > 0 {
			gaps = append(gaps, int64(delta.Seconds()))
		}
		prev = rec.SnapshotTime
	}
	if len(gaps) == 0 {
		return fallback
	}
	sort.Slice(gaps, func(a, b int) bool { return gaps[a] < gaps[b] })
	// Upper-middle element for even-length inputs, matching the original.
	if mid := gaps[len(gaps)/2]; mid > 0 {
		return time.Duration(mid) * time.Second
	}
	return fallback
}
func formatHourIntervalLabel(start, end time.Time) string {
startLabel := start.Format("2006-01-02 15:04")
if start.Year() == end.Year() && start.YearDay() == end.YearDay() {
return fmt.Sprintf("%s to %s", startLabel, end.Format("15:04"))
}
return fmt.Sprintf("%s to %s", startLabel, end.Format("2006-01-02 15:04"))
}
func buildDailyTotals(ctx context.Context, dbConn *sqlx.DB, records []SnapshotRecord, prorateByAvg bool) ([]totalsPoint, error) {
points := make([]totalsPoint, 0, len(records))
tinExpr := `COALESCE(SUM(CASE WHEN "Tin" IS NOT NULL THEN "Tin" ELSE 0 END) / 100.0, 0)`