fix aggregation logic

2026-01-23 09:38:08 +11:00
parent 8a3481b966
commit 3e2d95d3b9
16 changed files with 384 additions and 168 deletions

View File

@@ -464,6 +464,8 @@ func recordsFromTableNames(ctx context.Context, database db.Database, snapshotTy
TableName: table,
SnapshotTime: ts,
SnapshotType: snapshotType,
// Unknown row count when snapshot_registry isn't available.
SnapshotCount: -1,
})
}
}

View File

@@ -150,11 +150,6 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
return err
}
if applied, err := db.ApplyLifecycleCreationToSummary(ctx, dbConn, summaryTable); err != nil {
c.Logger.Warn("failed to apply lifecycle creation times to daily summary", "error", err, "table", summaryTable)
} else {
c.Logger.Info("Daily aggregation creation times", "source_lifecycle_cache", applied)
}
if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
c.Logger.Warn("failed to apply lifecycle deletions to daily summary", "error", err, "table", summaryTable)
} else {
@@ -375,20 +370,18 @@ LIMIT 1
}
c.Logger.Debug("inserted daily aggregates", "table", summaryTable, "rows", len(aggMap), "total_samples", totalSamples)
if applied, err := db.ApplyLifecycleCreationToSummary(ctx, dbConn, summaryTable); err != nil {
c.Logger.Warn("failed to apply lifecycle creation times to daily summary (Go)", "error", err, "table", summaryTable)
} else {
c.Logger.Info("Daily aggregation creation times", "source_lifecycle_cache", applied)
}
// Persist rollup cache for monthly aggregation.
if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples, totalSamplesByVcenter); err != nil {
c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayStart.Format("2006-01-02"))
} else {
c.Logger.Debug("persisted daily rollup cache", "date", dayStart.Format("2006-01-02"))
}
// Refine lifecycle with existing SQL helper to pick up first-after deletions.
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
} else {
c.Logger.Debug("refined creation/deletion times", "table", summaryTable)
}
db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)

View File

@@ -3,6 +3,7 @@ package tasks
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"strconv"
@@ -15,6 +16,17 @@ import (
"github.com/jmoiron/sqlx"
)
var snapshotProbeLimiter = make(chan struct{}, 1)
func acquireSnapshotProbe(ctx context.Context) (func(), error) {
select {
case snapshotProbeLimiter <- struct{}{}:
return func() { <-snapshotProbeLimiter }, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
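
Note: snapshotProbeLimiter is a buffered channel of capacity 1 used as a binary semaphore, so at most one probe query runs at a time and waiters give up when their context expires. A minimal standalone sketch of the same pattern (names here are illustrative, not from this repo):

package main

import (
	"context"
	"fmt"
	"time"
)

// sem is a binary semaphore: sending acquires, receiving releases.
var sem = make(chan struct{}, 1)

// acquire blocks until the semaphore is free or the context is done.
func acquire(ctx context.Context) (func(), error) {
	select {
	case sem <- struct{}{}:
		return func() { <-sem }, nil
	case <-ctx.Done():
		return nil, ctx.Err() // timed out or cancelled while waiting
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	release, err := acquire(ctx)
	if err != nil {
		fmt.Println("could not acquire:", err)
		return
	}
	defer release()
	fmt.Println("acquired; at most one probe runs concurrently")
}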
func boolStringFromInterface(value interface{}) string {
switch v := value.(type) {
case nil:
@@ -98,30 +110,41 @@ LIMIT ?
}
if count.Valid && count.Int64 == 0 {
if logger != nil {
logger.Debug("skipping snapshot table with zero count", "table", name, "snapshot_time", ts)
logger.Debug("skipping snapshot table with zero count", "table", name, "snapshot_time", ts, "vcenter", vcenter)
}
continue
}
probed := false
var probeErr error
probeTimeout := false
// If count is known and >0, trust it; if NULL, accept optimistically to avoid heavy probes.
hasRows := !count.Valid || count.Int64 > 0
start := time.Now()
if vcenter != "" && hasRows {
probed = true
vrows, qerr := querySnapshotRows(ctx, dbConn, name, []string{"VmId"}, `"Vcenter" = ? LIMIT 1`, vcenter)
if qerr == nil {
hasRows = vrows.Next()
vrows.Close()
} else {
probeCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
release, err := acquireSnapshotProbe(probeCtx)
if err != nil {
probeErr = err
hasRows = false
if logger != nil {
logger.Debug("snapshot vcenter filter probe failed", "table", name, "vcenter", vcenter, "error", qerr)
cancel()
} else {
vrows, qerr := querySnapshotRows(probeCtx, dbConn, name, []string{"VmId"}, `"Vcenter" = ? LIMIT 1`, vcenter)
if qerr == nil {
hasRows = vrows.Next()
vrows.Close()
} else {
probeErr = qerr
hasRows = false
}
release()
cancel()
}
probeTimeout = errors.Is(probeErr, context.DeadlineExceeded) || errors.Is(probeErr, context.Canceled)
}
elapsed := time.Since(start)
if logger != nil {
logger.Debug("evaluated snapshot table", "table", name, "snapshot_time", ts, "snapshot_count", count, "probed", probed, "has_rows", hasRows, "elapsed", elapsed)
logger.Debug("evaluated snapshot table", "table", name, "snapshot_time", ts, "snapshot_count", count, "probed", probed, "has_rows", hasRows, "elapsed", elapsed, "vcenter", vcenter, "probe_error", probeErr, "probe_timeout", probeTimeout)
}
if !hasRows {
continue

View File

@@ -117,6 +117,7 @@ type snapshotTable struct {
}
func listHourlyTablesForDay(ctx context.Context, dbConn *sqlx.DB, dayStart, dayEnd time.Time) ([]snapshotTable, error) {
log := loggerFromCtx(ctx, nil)
rows, err := dbConn.QueryxContext(ctx, `
SELECT table_name, snapshot_time, snapshot_count
FROM snapshot_registry
@@ -139,6 +140,9 @@ ORDER BY snapshot_time ASC
}
// Trust snapshot_count if present; otherwise optimistically include to avoid long probes.
if t.Count.Valid && t.Count.Int64 <= 0 {
if log != nil {
log.Debug("skipping snapshot table with zero count", "table", t.Table, "snapshot_time", t.Time)
}
continue
}
tables = append(tables, t)

View File

@@ -417,23 +417,60 @@ func buildUnionQuery(tables []string, columns []string, whereClause string) (str
return "", fmt.Errorf("no columns provided for union")
}
queries := make([]string, 0, len(tables))
columnList := strings.Join(columns, ", ")
for _, table := range tables {
safeName, err := db.SafeTableName(table)
if err != nil {
return "", err
const maxCompoundTerms = 450
if len(tables) <= maxCompoundTerms {
queries := make([]string, 0, len(tables))
for _, table := range tables {
safeName, err := db.SafeTableName(table)
if err != nil {
return "", err
}
query := fmt.Sprintf("SELECT %s FROM %s", columnList, safeName)
if whereClause != "" {
query = fmt.Sprintf("%s WHERE %s", query, whereClause)
}
queries = append(queries, query)
}
query := fmt.Sprintf("SELECT %s FROM %s", columnList, safeName)
if whereClause != "" {
query = fmt.Sprintf("%s WHERE %s", query, whereClause)
if len(queries) == 0 {
return "", fmt.Errorf("no valid tables provided for union")
}
queries = append(queries, query)
union := strings.Join(queries, "\nUNION ALL\n")
return fmt.Sprintf("SELECT * FROM (%s) AS union_all", union), nil
}
if len(queries) == 0 {
batches := make([]string, 0, (len(tables)/maxCompoundTerms)+1)
batchIndex := 0
for start := 0; start < len(tables); start += maxCompoundTerms {
end := start + maxCompoundTerms
if end > len(tables) {
end = len(tables)
}
queries := make([]string, 0, end-start)
for _, table := range tables[start:end] {
safeName, err := db.SafeTableName(table)
if err != nil {
return "", err
}
query := fmt.Sprintf("SELECT %s FROM %s", columnList, safeName)
if whereClause != "" {
query = fmt.Sprintf("%s WHERE %s", query, whereClause)
}
queries = append(queries, query)
}
if len(queries) == 0 {
continue
}
subUnion := strings.Join(queries, "\nUNION ALL\n")
batches = append(batches, fmt.Sprintf("SELECT * FROM (%s) AS batch_%d", subUnion, batchIndex))
batchIndex++
}
if len(batches) == 0 {
return "", fmt.Errorf("no valid tables provided for union")
}
return strings.Join(queries, "\nUNION ALL\n"), nil
outerUnion := strings.Join(batches, "\nUNION ALL\n")
return fmt.Sprintf("SELECT * FROM (%s) AS union_batches", outerUnion), nil
}
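
The 450-term cap appears chosen to stay under SQLite's default limit of 500 terms in a compound SELECT; above the cap, buildUnionQuery nests each batch as a subquery and unions the batches. A toy sketch of the shape it emits (illustrative table names and column; maxTerms is 450 in the real code):

package main

import (
	"fmt"
	"strings"
)

func main() {
	tables := []string{"snap_a", "snap_b", "snap_c"}
	const maxTerms = 2 // stand-in for the real 450

	var batches []string
	for start := 0; start < len(tables); start += maxTerms {
		end := start + maxTerms
		if end > len(tables) {
			end = len(tables)
		}
		parts := make([]string, 0, end-start)
		for _, t := range tables[start:end] {
			parts = append(parts, fmt.Sprintf("SELECT VmId FROM %s", t))
		}
		// Each batch becomes one compound SELECT wrapped as a subquery.
		batches = append(batches, fmt.Sprintf("SELECT * FROM (%s) AS batch_%d",
			strings.Join(parts, " UNION ALL "), len(batches)))
	}
	fmt.Println(strings.Join(batches, "\nUNION ALL\n"))
}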
func templateExclusionFilter() string {
@@ -465,9 +502,21 @@ func truncateDate(t time.Time) time.Time {
func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord {
filtered := snapshots[:0]
log := loggerFromCtx(ctx, nil)
for _, snapshot := range snapshots {
if rowsExist, err := db.TableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist {
switch {
case snapshot.SnapshotCount > 0:
filtered = append(filtered, snapshot)
case snapshot.SnapshotCount == 0:
if log != nil {
log.Debug("skipping snapshot table with zero count", "table", snapshot.TableName, "snapshot_time", snapshot.SnapshotTime)
}
default:
if rowsExist, err := db.TableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist {
filtered = append(filtered, snapshot)
} else if log != nil {
log.Debug("snapshot table probe empty or failed", "table", snapshot.TableName, "snapshot_time", snapshot.SnapshotTime, "error", err)
}
}
}
return filtered
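
Distilled, the filter now treats SnapshotCount as a three-way signal and only falls back to a live probe for the -1 "unknown" sentinel written by recordsFromTableNames above. A hypothetical helper showing just the decision (the real loop also logs skips and probe failures):

package main

import "fmt"

// keep reports whether a snapshot table should be included: a positive
// registry count is trusted, zero is skipped, and an unknown count
// (the -1 sentinel) falls back to probing the table for any row.
func keep(count int64, probe func() (bool, error)) bool {
	switch {
	case count > 0:
		return true
	case count == 0:
		return false
	default:
		hasRows, err := probe()
		return err == nil && hasRows
	}
}

func main() {
	probe := func() (bool, error) { return true, nil } // stand-in for db.TableHasRows
	fmt.Println(keep(12, probe), keep(0, probe), keep(-1, probe)) // true false true
}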
@@ -1072,14 +1121,16 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
// If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window.
if missingCount == 0 && prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount) {
// Fallback: compare against latest registered snapshot table.
if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime, loggerFromCtx(ctx, c.Logger)); err == nil && prevTable != "" {
moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
if moreMissing > 0 {
missingCount += moreMissing
// Fallback: locate a previous table only if we didn't already find one.
if prevTableName == "" {
if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime, loggerFromCtx(ctx, c.Logger)); err == nil && prevTable != "" {
moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
if moreMissing > 0 {
missingCount += moreMissing
}
// Reuse this table name for later snapshot lookups when correlating deletion events.
prevTableName = prevTable
}
// Reuse this table name for later snapshot lookups when correlating deletion events.
prevTableName = prevTable
}
freq := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second
if freq <= 0 {

View File

@@ -7,6 +7,7 @@ import (
"log/slog"
"os"
"runtime"
"sort"
"strings"
"sync"
"time"
@@ -53,6 +54,12 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
monthEnd := monthStart.AddDate(0, 1, 0)
dbConn := c.Database.DB()
db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
driver := strings.ToLower(dbConn.DriverName())
useGoAgg := os.Getenv("MONTHLY_AGG_GO") == "1"
if !useGoAgg && granularity == "hourly" && driver == "sqlite" {
c.Logger.Warn("SQL monthly aggregation is slow on sqlite; overriding to Go path", "granularity", granularity)
useGoAgg = true
}
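
The gate distilled into a hypothetical helper (not in the repo): the Go path runs when opted in via MONTHLY_AGG_GO=1, and is forced for hourly granularity on sqlite where the SQL path is slow:

package main

import (
	"fmt"
	"strings"
)

// useGoAggregation mirrors the gating above: explicit opt-in wins,
// otherwise hourly aggregation on sqlite is redirected to the Go path.
func useGoAggregation(driver, granularity string, optIn bool) bool {
	if optIn {
		return true
	}
	return granularity == "hourly" && strings.ToLower(driver) == "sqlite"
}

func main() {
	fmt.Println(useGoAggregation("sqlite", "hourly", false))   // true: overridden
	fmt.Println(useGoAggregation("postgres", "hourly", false)) // false: SQL path
}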
var snapshots []report.SnapshotRecord
var unionColumns []string
@@ -99,17 +106,28 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
}
// Optional Go-based aggregation path.
if os.Getenv("MONTHLY_AGG_GO") == "1" && granularity == "daily" {
c.Logger.Debug("Using go implementation of monthly aggregation")
if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, snapshots); err != nil {
c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err)
if useGoAgg {
if granularity == "daily" {
c.Logger.Debug("Using go implementation of monthly aggregation (daily)")
if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, snapshots); err != nil {
c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err)
} else {
metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
return nil
}
} else if granularity == "hourly" {
c.Logger.Debug("Using go implementation of monthly aggregation (hourly)")
if err := c.aggregateMonthlySummaryGoHourly(ctx, monthStart, monthEnd, monthlyTable, snapshots); err != nil {
c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err)
} else {
metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
return nil
}
} else {
metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
return nil
c.Logger.Warn("MONTHLY_AGG_GO is set but granularity is unsupported; using SQL path", "granularity", granularity)
}
} else if os.Getenv("MONTHLY_AGG_GO") == "1" && granularity != "daily" {
c.Logger.Warn("MONTHLY_AGG_GO is set but only daily granularity supports Go aggregation; using SQL path", "granularity", granularity)
}
tables := make([]string, 0, len(snapshots))
@@ -148,11 +166,6 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01"))
return err
}
if applied, err := db.ApplyLifecycleCreationToSummary(ctx, dbConn, monthlyTable); err != nil {
c.Logger.Warn("failed to apply lifecycle creation times to monthly summary", "error", err, "table", monthlyTable)
} else {
c.Logger.Info("Monthly aggregation creation times", "source_lifecycle_cache", applied)
}
if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, monthlyTable, monthStart.Unix(), monthEnd.Unix()); err != nil {
c.Logger.Warn("failed to apply lifecycle deletions to monthly summary", "error", err, "table", monthlyTable)
} else {
@@ -183,6 +196,124 @@ func monthlySummaryTableName(t time.Time) (string, error) {
return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601")))
}
// aggregateMonthlySummaryGoHourly aggregates hourly snapshots directly into the monthly summary table.
func (c *CronTask) aggregateMonthlySummaryGoHourly(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, hourlySnapshots []report.SnapshotRecord) error {
jobStart := time.Now()
dbConn := c.Database.DB()
if err := clearTable(ctx, dbConn, summaryTable); err != nil {
return err
}
if len(hourlySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", monthStart.Format("2006-01"))
}
totalSamples := len(hourlySnapshots)
var (
aggMap map[dailyAggKey]*dailyAggVal
snapTimes []int64
)
if db.TableExists(ctx, dbConn, "vm_hourly_stats") {
cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, monthStart, monthEnd)
if cacheErr != nil {
c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
} else if len(cacheAgg) > 0 {
c.Logger.Debug("using hourly cache for monthly aggregation", "month", monthStart.Format("2006-01"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg))
aggMap = cacheAgg
snapTimes = cacheTimes
totalSamples = len(cacheTimes)
}
}
if aggMap == nil {
var errScan error
aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots)
if errScan != nil {
return errScan
}
c.Logger.Debug("scanned hourly tables for monthly aggregation", "month", monthStart.Format("2006-01"), "tables", len(hourlySnapshots), "vm_count", len(aggMap))
if len(aggMap) == 0 {
return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01"))
}
snapTimes = make([]int64, 0, len(hourlySnapshots))
for _, snap := range hourlySnapshots {
snapTimes = append(snapTimes, snap.SnapshotTime.Unix())
}
sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
}
lifecycleDeletions := c.applyLifecycleDeletions(ctx, aggMap, monthStart, monthEnd)
c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", lifecycleDeletions)
inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap, monthStart, monthEnd)
c.Logger.Info("Monthly aggregation deletion times", "source_inventory", inventoryDeletions)
if len(snapTimes) > 0 {
maxSnap := snapTimes[len(snapTimes)-1]
inferredDeletions := 0
for _, v := range aggMap {
if v.deletion != 0 {
continue
}
consecutiveMisses := 0
firstMiss := int64(0)
for _, t := range snapTimes {
if t <= v.lastSeen {
continue
}
if _, ok := v.seen[t]; ok {
consecutiveMisses = 0
firstMiss = 0
continue
}
consecutiveMisses++
if firstMiss == 0 {
firstMiss = t
}
if consecutiveMisses >= 2 {
v.deletion = firstMiss
inferredDeletions++
break
}
}
if v.deletion == 0 && v.lastSeen < maxSnap && firstMiss > 0 {
c.Logger.Debug("pending deletion inference (insufficient consecutive misses)", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "first_missing_snapshot", firstMiss)
}
}
c.Logger.Info("Monthly aggregation deletion times", "source_inferred", inferredDeletions)
}
totalSamplesByVcenter := sampleCountsByVcenter(aggMap)
if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples, totalSamplesByVcenter); err != nil {
return err
}
db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
if err != nil {
c.Logger.Warn("unable to count monthly summary rows (Go hourly)", "error", err, "table", summaryTable)
}
if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil {
c.Logger.Warn("failed to register monthly snapshot (Go hourly)", "error", err, "table", summaryTable)
}
if err := c.generateReport(ctx, summaryTable); err != nil {
c.Logger.Warn("failed to generate monthly report (Go hourly)", "error", err, "table", summaryTable)
return err
}
c.Logger.Debug("Finished monthly inventory aggregation (Go hourly)",
"summary_table", summaryTable,
"duration", time.Since(jobStart),
"tables_scanned", len(hourlySnapshots),
"rows_written", rowCount,
"total_samples", totalSamples,
)
return nil
}
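
A worked example of the two-consecutive-misses inference above: a VM last seen at t=300 that is absent from the next two snapshots is inferred deleted at the first missing snapshot time, while a single blip (absent once, then present again) resets the streak:

package main

import "fmt"

func main() {
	snapTimes := []int64{100, 200, 300, 400, 500} // sorted snapshot times
	seen := map[int64]bool{100: true, 200: true, 300: true}
	lastSeen := int64(300)

	var deletion int64
	misses, firstMiss := 0, int64(0)
	for _, t := range snapTimes {
		if t <= lastSeen {
			continue
		}
		if seen[t] {
			misses, firstMiss = 0, 0 // reappearance: streak resets
			continue
		}
		misses++
		if firstMiss == 0 {
			firstMiss = t
		}
		if misses >= 2 {
			deletion = firstMiss // absent twice in a row
			break
		}
	}
	fmt.Println(deletion) // 400
}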
// aggregateMonthlySummaryGo mirrors the SQL-based monthly aggregation but performs the work in Go,
// reading daily summaries in parallel and reducing them to a single monthly summary table.
func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, dailySnapshots []report.SnapshotRecord) error {
@@ -223,11 +354,6 @@ func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, mo
return err
}
if applied, err := db.ApplyLifecycleCreationToSummary(ctx, dbConn, summaryTable); err != nil {
c.Logger.Warn("failed to apply lifecycle creation times to monthly summary (Go)", "error", err, "table", summaryTable)
} else {
c.Logger.Info("Monthly aggregation creation times", "source_lifecycle_cache", applied)
}
if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil {
c.Logger.Warn("failed to apply lifecycle deletions to monthly summary (Go)", "error", err, "table", summaryTable)
} else {

View File

@@ -208,30 +208,41 @@ func (v *Vcenter) FindVmDeletionEvents(ctx context.Context, begin, end time.Time
endUTC := end.UTC()
mgr := event.NewManager(v.client.Client)
recordDeletion := func(vmID string, ts time.Time) {
if vmID == "" {
return
}
if prev, ok := result[vmID]; !ok || ts.Before(prev) {
result[vmID] = ts
}
}
isDeletionMessage := func(msg string) bool {
msg = strings.ToLower(msg)
return strings.Contains(msg, "destroy") || strings.Contains(msg, "deleted")
}
processEvents := func(evts []types.BaseEvent) {
for _, ev := range evts {
switch e := ev.(type) {
case *types.VmRemovedEvent:
if e.Vm != nil {
vmID := e.Vm.Vm.Value
if vmID != "" {
result[vmID] = e.CreatedTime
}
recordDeletion(vmID, e.CreatedTime)
}
case *types.TaskEvent:
// Fallback for destroy task events.
if e.Info.Entity != nil {
vmID := e.Info.Entity.Value
msg := strings.ToLower(e.GetEvent().FullFormattedMessage)
if vmID != "" && (strings.Contains(msg, "destroy") || strings.Contains(msg, "deleted")) {
result[vmID] = e.CreatedTime
if vmID != "" && isDeletionMessage(e.GetEvent().FullFormattedMessage) {
recordDeletion(vmID, e.CreatedTime)
}
}
case *types.VmEvent:
if e.Vm != nil {
vmID := e.Vm.Vm.Value
if vmID != "" {
result[vmID] = e.CreatedTime
if vmID != "" && isDeletionMessage(e.GetEvent().FullFormattedMessage) {
recordDeletion(vmID, e.CreatedTime)
}
}
}
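
recordDeletion keeps the earliest timestamp per VM, so replaying overlapping event windows can only move a deletion time earlier, never later. A distilled sketch of that invariant (standalone; names illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	result := map[string]time.Time{}
	record := func(vmID string, ts time.Time) {
		if vmID == "" {
			return
		}
		if prev, ok := result[vmID]; !ok || ts.Before(prev) {
			result[vmID] = ts // first sighting, or an earlier event
		}
	}
	base := time.Date(2026, 1, 23, 0, 0, 0, 0, time.UTC)
	record("vm-42", base.Add(2*time.Hour))
	record("vm-42", base.Add(1*time.Hour)) // earlier event wins
	record("vm-42", base.Add(3*time.Hour)) // later duplicate is ignored
	fmt.Println(result["vm-42"].Format(time.RFC3339)) // 2026-01-23T01:00:00Z
}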