enhance utilisation of postgres features
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-04-20 10:19:27 +10:00
parent 98e92a8264
commit 8ccf5a7009
28 changed files with 2836 additions and 422 deletions
+275 -65
View File
@@ -16,6 +16,8 @@ import (
"vctp/internal/metrics"
"vctp/internal/report"
"vctp/internal/settings"
"github.com/jmoiron/sqlx"
)
// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
@@ -34,15 +36,15 @@ func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Lo
targetTime := time.Now().AddDate(0, 0, -1)
logger.Info("Daily summary job starting", "target_date", targetTime.Format("2006-01-02"))
// Always force regeneration on the scheduled run to refresh data even if a manual run happened earlier.
return c.aggregateDailySummary(jobCtx, targetTime, true)
return c.aggregateDailySummaryWithMode(jobCtx, targetTime, true, true)
})
}
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
return c.aggregateDailySummary(ctx, date, force)
return c.aggregateDailySummaryWithMode(ctx, date, force, false)
}
func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
func (c *CronTask) aggregateDailySummaryWithMode(ctx context.Context, targetTime time.Time, force bool, scheduled bool) error {
jobStart := time.Now()
dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
dayEnd := dayStart.AddDate(0, 0, 1)
@@ -71,10 +73,31 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
}
}
// If enabled, use the Go fan-out/reduce path to parallelize aggregation.
if os.Getenv("DAILY_AGG_GO") == "1" {
if scheduled && c.scheduledAggregationEngine() == "sql" {
c.Logger.Info("scheduled_aggregation_engine=sql enabled; using canonical SQL daily aggregation path")
if err := c.aggregateDailySummarySQLCanonical(ctx, dayStart, dayEnd, summaryTable); err != nil {
c.Logger.Warn("scheduled canonical SQL daily aggregation failed; falling back to go path", "error", err)
} else {
metrics.RecordDailyAggregation(time.Since(jobStart), nil)
c.Logger.Debug("Finished daily inventory aggregation (SQL canonical path)", "summary_table", summaryTable)
return nil
}
}
// Canonical Go aggregation is the default for both scheduled and manual runs.
// Legacy SQL/union aggregation stays available as a manual fallback/backfill path.
forceGoAgg := os.Getenv("DAILY_AGG_GO") == "1"
forceSQLAgg := !scheduled && os.Getenv("DAILY_AGG_SQL") == "1"
useGoAgg := scheduled || forceGoAgg || !forceSQLAgg
if forceSQLAgg && !forceGoAgg {
c.Logger.Info("DAILY_AGG_SQL=1 enabled; using SQL fallback path for manual daily aggregation")
}
if useGoAgg {
c.Logger.Debug("Using go implementation of aggregation")
if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil {
if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force, scheduled); err != nil {
if scheduled {
return err
}
c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err)
} else {
metrics.RecordDailyAggregation(time.Since(jobStart), nil)
@@ -200,7 +223,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
reportStart := time.Now()
c.Logger.Debug("Generating daily report", "table", summaryTable)
if err := c.generateReport(ctx, summaryTable); err != nil {
if err := c.generateReportWithPolicy(ctx, summaryTable); err != nil {
c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
metrics.RecordDailyAggregation(time.Since(jobStart), err)
return err
@@ -225,34 +248,106 @@ func dailySummaryTableName(t time.Time) (string, error) {
return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
}
// aggregateDailySummarySQLCanonical performs the whole daily aggregation in SQL
// against the canonical vm_hourly_stats cache: it inserts aggregated rows into
// summaryTable, applies lifecycle creation/deletion corrections, refreshes the
// presence column and the vcenter totals cache, registers the snapshot, and
// generates the daily report. Lifecycle/refinement/registration failures are
// logged as warnings (best-effort); a failed insert, an empty result, or a
// failed report generation is a hard error returned to the caller.
func (c *CronTask) aggregateDailySummarySQLCanonical(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string) error {
	jobStart := time.Now()
	dbConn := c.Database.DB()
	if !db.TableExists(ctx, dbConn, "vm_hourly_stats") {
		return fmt.Errorf("vm_hourly_stats table not found for canonical SQL daily aggregation")
	}
	unionQuery := buildCanonicalHourlySummaryUnion(dayStart, dayEnd)
	insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
	if err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		return err
	}
	// Lifecycle corrections are best-effort: a failure degrades the accuracy of
	// creation/deletion timestamps but must not abort the aggregation.
	if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
		c.Logger.Warn("failed to apply lifecycle deletions to daily summary (SQL canonical)", "error", err, "table", summaryTable)
	} else {
		c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", applied)
	}
	if applied, err := db.ApplyLifecycleCreationToSummary(ctx, dbConn, summaryTable); err != nil {
		c.Logger.Warn("failed to apply lifecycle creations to daily summary (SQL canonical)", "error", err, "table", summaryTable)
	} else {
		c.Logger.Info("Daily aggregation creation times", "source_lifecycle_cache", applied)
	}
	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, buildHourlyCacheLifecycleUnion(dayStart, dayEnd)); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times (SQL canonical)", "error", err, "table", summaryTable)
	}
	if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, dayStart.Unix(), dayEnd.Unix()); err != nil {
		c.Logger.Warn("failed to update daily AvgIsPresent from lifecycle window (SQL canonical)", "error", err, "table", summaryTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		// BUGFIX: a failed count must not be treated as "zero rows". The insert
		// above already succeeded, so log and continue; previously this path fell
		// through to the rowCount == 0 check and aborted with a misleading
		// "no VM records aggregated" error.
		c.Logger.Warn("unable to count daily summary rows (SQL canonical)", "error", err, "table", summaryTable)
	} else if rowCount == 0 {
		return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
	}
	logMissingCreationSummary(ctx, c.Logger, c.Database, summaryTable, rowCount)
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot (SQL canonical)", "error", err, "table", summaryTable)
	}
	if refreshed, err := db.ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, summaryTable, "daily", dayStart.Unix()); err != nil {
		c.Logger.Warn("failed to refresh vcenter daily aggregate totals cache (SQL canonical)", "error", err, "table", summaryTable)
	} else {
		c.Logger.Debug("refreshed vcenter daily aggregate totals cache", "table", summaryTable, "rows", refreshed)
	}
	if err := c.generateReportWithPolicy(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report (SQL canonical)", "error", err, "table", summaryTable)
		return err
	}
	// Checkpoint is driver-dependent housekeeping (e.g. WAL checkpoint); failure
	// is only warned about since the aggregation itself is already durable.
	driver := strings.ToLower(dbConn.DriverName())
	action, checkpointErr := db.CheckpointDatabase(ctx, dbConn)
	if checkpointErr != nil {
		c.Logger.Warn("failed to run database checkpoint after daily aggregation (SQL canonical)", "driver", driver, "action", action, "error", checkpointErr)
	}
	c.Logger.Debug("Finished daily inventory aggregation (SQL canonical path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
	return nil
}
// buildCanonicalHourlySummaryUnion returns a SELECT over the canonical
// vm_hourly_stats cache restricted to the half-open window [start, end),
// projected onto the column set expected by the daily-summary insert.
// Columns the cache does not carry (InventoryId, EventKey, CloudId) are
// emitted as NULL; the remaining columns are COALESCEd to zero values, and
// templates are excluded via the shared filter predicate.
func buildCanonicalHourlySummaryUnion(start, end time.Time) string {
	const tmpl = `
SELECT
NULL AS "InventoryId",
COALESCE("Name",'') AS "Name",
COALESCE("Vcenter",'') AS "Vcenter",
COALESCE("VmId",'') AS "VmId",
NULL AS "EventKey",
NULL AS "CloudId",
COALESCE("CreationTime",0) AS "CreationTime",
COALESCE("DeletionTime",0) AS "DeletionTime",
COALESCE("ResourcePool",'') AS "ResourcePool",
COALESCE("Datacenter",'') AS "Datacenter",
COALESCE("Cluster",'') AS "Cluster",
COALESCE("Folder",'') AS "Folder",
COALESCE("ProvisionedDisk",0) AS "ProvisionedDisk",
COALESCE("VcpuCount",0) AS "VcpuCount",
COALESCE("RamGB",0) AS "RamGB",
COALESCE("IsTemplate",'') AS "IsTemplate",
COALESCE("PoweredOn",'') AS "PoweredOn",
COALESCE("SrmPlaceholder",'') AS "SrmPlaceholder",
COALESCE("VmUuid",'') AS "VmUuid",
"SnapshotTime"
FROM vm_hourly_stats
WHERE "SnapshotTime" >= %d
AND "SnapshotTime" < %d
AND %s
`
	return fmt.Sprintf(tmpl, start.Unix(), end.Unix(), templateExclusionFilter())
}
// aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel,
// reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path
// as closely as possible while improving CPU utilization on multi-core hosts.
func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error {
func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool, canonicalOnly bool) error {
jobStart := time.Now()
dbConn := c.Database.DB()
hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
if err != nil {
return err
}
hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
c.Logger.Info("Daily aggregation hourly snapshot count (go path)", "count", len(hourlySnapshots), "date", dayStart.Format("2006-01-02"))
if len(hourlySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
} else {
c.Logger.Debug("Found hourly snapshot tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots))
}
hourlyTables := make([]string, 0, len(hourlySnapshots))
for _, snapshot := range hourlySnapshots {
hourlyTables = append(hourlyTables, snapshot.TableName)
}
unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
if err != nil {
return err
}
hourlyTables := make([]string, 0, 64)
unionQuery := ""
// Clear existing summary if forcing.
if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
@@ -266,41 +361,75 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
}
}
totalSamples := len(hourlyTables)
totalSamples := 0
var (
aggMap map[dailyAggKey]*dailyAggVal
snapTimes []int64
)
if db.TableExists(ctx, dbConn, "vm_hourly_stats") {
if canonicalOnly {
if !db.TableExists(ctx, dbConn, "vm_hourly_stats") {
return fmt.Errorf("vm_hourly_stats table not found for canonical daily aggregation")
}
cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd)
if cacheErr != nil {
c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
} else if len(cacheAgg) > 0 {
c.Logger.Debug("using hourly cache for daily aggregation", "date", dayStart.Format("2006-01-02"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg))
aggMap = cacheAgg
snapTimes = cacheTimes
totalSamples = len(cacheTimes)
return cacheErr
}
}
if aggMap == nil {
var errScan error
aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots)
if errScan != nil {
return errScan
}
c.Logger.Debug("scanned hourly tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots), "vm_count", len(aggMap))
if len(aggMap) == 0 {
if len(cacheAgg) == 0 {
return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
}
// Build ordered list of snapshot times for deletion inference.
snapTimes = make([]int64, 0, len(hourlySnapshots))
for _, snap := range hourlySnapshots {
snapTimes = append(snapTimes, snap.SnapshotTime.Unix())
c.Logger.Debug("using canonical hourly cache for daily aggregation", "date", dayStart.Format("2006-01-02"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg))
aggMap = cacheAgg
snapTimes = cacheTimes
totalSamples = len(cacheTimes)
unionQuery = buildHourlyCacheLifecycleUnion(dayStart, dayEnd)
} else {
hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
if err != nil {
return err
}
hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
c.Logger.Info("Daily aggregation hourly snapshot count (go path)", "count", len(hourlySnapshots), "date", dayStart.Format("2006-01-02"))
if len(hourlySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
}
for _, snapshot := range hourlySnapshots {
hourlyTables = append(hourlyTables, snapshot.TableName)
}
unionQuery, err = buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
if err != nil {
return err
}
totalSamples = len(hourlyTables)
if db.TableExists(ctx, dbConn, "vm_hourly_stats") {
cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd)
if cacheErr != nil {
c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
} else if len(cacheAgg) > 0 {
c.Logger.Debug("using hourly cache for daily aggregation", "date", dayStart.Format("2006-01-02"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg))
aggMap = cacheAgg
snapTimes = cacheTimes
totalSamples = len(cacheTimes)
}
}
if aggMap == nil {
var errScan error
aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots)
if errScan != nil {
return errScan
}
c.Logger.Debug("scanned hourly tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots), "vm_count", len(aggMap))
if len(aggMap) == 0 {
return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
}
// Build ordered list of snapshot times for deletion inference.
snapTimes = make([]int64, 0, len(hourlySnapshots))
for _, snap := range hourlySnapshots {
snapTimes = append(snapTimes, snap.SnapshotTime.Unix())
}
slices.Sort(snapTimes)
}
slices.Sort(snapTimes)
}
lifecycleDeletions := c.applyLifecycleDeletions(ctx, aggMap, dayStart, dayEnd)
@@ -316,22 +445,36 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
c.Logger.Info("Daily aggregation creation times", "source_inventory", inventoryCreations)
// Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day.
var nextSnapshotTable string
nextSnapshotQuery := dbConn.Rebind(`
var (
nextSnapshotTable string
nextSnapshotTime int64
)
nextPresenceByVcenter := make(map[string]map[string]struct{}, 8)
if canonicalOnly {
presence, snapshotTime, err := loadNextHourlyCachePresence(ctx, dbConn, dayEnd)
if err != nil {
c.Logger.Warn("failed to load next-hourly presence from canonical cache", "error", err)
} else {
nextPresenceByVcenter = presence
nextSnapshotTime = snapshotTime
}
} else {
nextSnapshotQuery := dbConn.Rebind(`
SELECT table_name
FROM snapshot_registry
WHERE snapshot_type = 'hourly' AND snapshot_time >= ?
ORDER BY snapshot_time ASC
LIMIT 1
`)
nextSnapshotRows, nextErr := c.Database.DB().QueryxContext(ctx, nextSnapshotQuery, dayEnd.Unix())
if nextErr == nil {
if nextSnapshotRows.Next() {
if scanErr := nextSnapshotRows.Scan(&nextSnapshotTable); scanErr != nil {
nextSnapshotTable = ""
nextSnapshotRows, nextErr := c.Database.DB().QueryxContext(ctx, nextSnapshotQuery, dayEnd.Unix())
if nextErr == nil {
if nextSnapshotRows.Next() {
if scanErr := nextSnapshotRows.Scan(&nextSnapshotTable); scanErr != nil {
nextSnapshotTable = ""
}
}
nextSnapshotRows.Close()
}
nextSnapshotRows.Close()
}
// Build per-vCenter snapshot timelines from observed VM samples so deletion
@@ -362,7 +505,6 @@ LIMIT 1
vcenterSnapTimes[vcenter] = times
}
nextPresenceByVcenter := make(map[string]map[string]struct{}, 8)
if nextSnapshotTable != "" && db.TableExists(ctx, dbConn, nextSnapshotTable) {
rows, err := querySnapshotRows(ctx, dbConn, nextSnapshotTable, []string{"Vcenter", "VmId", "VmUuid", "Name"}, "")
if err == nil {
@@ -439,7 +581,7 @@ LIMIT 1
if !presentByID && !presentByUUID && !presentByName {
v.deletion = firstMiss
inferredDeletions++
c.Logger.Debug("cross-day deletion inferred from next snapshot", "vcenter", v.key.Vcenter, "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "deletion", firstMiss, "next_table", nextSnapshotTable)
c.Logger.Debug("cross-day deletion inferred from next snapshot", "vcenter", v.key.Vcenter, "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "deletion", firstMiss, "next_table", nextSnapshotTable, "next_snapshot_time", nextSnapshotTime)
}
}
if v.deletion == 0 {
@@ -521,7 +663,7 @@ LIMIT 1
}
reportStart := time.Now()
c.Logger.Debug("Generating daily report", "table", summaryTable)
if err := c.generateReport(ctx, summaryTable); err != nil {
if err := c.generateReportWithPolicy(ctx, summaryTable); err != nil {
c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
return err
}
@@ -1115,6 +1257,74 @@ WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?`
return agg, snapTimes, rows.Err()
}
func buildHourlyCacheLifecycleUnion(start, end time.Time) string {
return fmt.Sprintf(`
SELECT
"VmId","VmUuid","Name","Vcenter","CreationTime","DeletionTime","SnapshotTime"
FROM vm_hourly_stats
WHERE "SnapshotTime" >= %d AND "SnapshotTime" < %d
`, start.Unix(), end.Unix())
}
// loadNextHourlyCachePresence builds, from the canonical vm_hourly_stats cache,
// the set of VMs present in each vCenter's first hourly snapshot at or after
// dayEnd. Presence keys are prefixed three ways ("id:", "uuid:", and "name:"
// with the name lowercased) so callers can match by VmId, VmUuid, or Name.
// The int64 return is the earliest such snapshot time across all vCenters
// (0 when none was found). Rows that fail to scan or carry a blank vCenter
// are skipped silently: presence loading is deliberately best-effort.
func loadNextHourlyCachePresence(ctx context.Context, dbConn *sqlx.DB, dayEnd time.Time) (map[string]map[string]struct{}, int64, error) {
	byVcenter := make(map[string]map[string]struct{}, 8)
	stmt := dbConn.Rebind(`
WITH next_by_vcenter AS (
SELECT "Vcenter", MIN("SnapshotTime") AS snapshot_time
FROM vm_hourly_stats
WHERE "SnapshotTime" >= ?
GROUP BY "Vcenter"
)
SELECT h."Vcenter", h."VmId", h."VmUuid", h."Name", n.snapshot_time
FROM next_by_vcenter n
JOIN vm_hourly_stats h
ON h."Vcenter" = n."Vcenter"
AND h."SnapshotTime" = n.snapshot_time
`)
	rows, err := dbConn.QueryxContext(ctx, stmt, dayEnd.Unix())
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()
	var earliest int64
	for rows.Next() {
		var (
			vcenter            string
			vmID, vmUUID, name sql.NullString
			snapAt             sql.NullInt64
		)
		if scanErr := rows.Scan(&vcenter, &vmID, &vmUUID, &name, &snapAt); scanErr != nil {
			// Malformed rows must not abort presence loading.
			continue
		}
		if strings.TrimSpace(vcenter) == "" {
			continue
		}
		if snapAt.Valid && snapAt.Int64 > 0 && (earliest == 0 || snapAt.Int64 < earliest) {
			earliest = snapAt.Int64
		}
		keys, ok := byVcenter[vcenter]
		if !ok {
			keys = make(map[string]struct{}, 1024)
			byVcenter[vcenter] = keys
		}
		if id := strings.TrimSpace(vmID.String); vmID.Valid && id != "" {
			keys["id:"+id] = struct{}{}
		}
		if uuid := strings.TrimSpace(vmUUID.String); vmUUID.Valid && uuid != "" {
			keys["uuid:"+uuid] = struct{}{}
		}
		if nm := strings.TrimSpace(name.String); name.Valid && nm != "" {
			keys["name:"+strings.ToLower(nm)] = struct{}{}
		}
	}
	if err := rows.Err(); err != nil {
		return nil, 0, err
	}
	return byVcenter, earliest, nil
}
func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int, totalSamplesByVcenter map[string]int) error {
dbConn := c.Database.DB()
tx, err := dbConn.Beginx()