Files
vctp2/internal/tasks/dailyAggregate.go
Nathan Coad e186644db7
All checks were successful
continuous-integration/drone/push Build is passing
add repair functionality
2026-01-17 12:51:11 +11:00

777 lines
25 KiB
Go

package tasks
import (
"context"
"database/sql"
"fmt"
"log/slog"
"os"
"runtime"
"sort"
"strings"
"sync"
"time"
"vctp/db"
"vctp/internal/metrics"
"vctp/internal/report"
)
// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute)
return c.runAggregateJob(ctx, "daily_aggregate", jobTimeout, func(jobCtx context.Context) error {
startedAt := time.Now()
defer func() {
logger.Info("Daily summary job finished", "duration", time.Since(startedAt))
}()
targetTime := time.Now().Add(-time.Minute)
// Always force regeneration on the scheduled run to refresh data even if a manual run happened earlier.
return c.aggregateDailySummary(jobCtx, targetTime, true)
})
}
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
return c.aggregateDailySummary(ctx, date, force)
}
func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
jobStart := time.Now()
dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
dayEnd := dayStart.AddDate(0, 0, 1)
summaryTable, err := dailySummaryTableName(targetTime)
if err != nil {
return err
}
dbConn := c.Database.DB()
db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil {
return err
}
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
return err
}
if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
return err
} else if rowsExist && !force {
c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
return nil
} else if rowsExist && force {
if err := clearTable(ctx, dbConn, summaryTable); err != nil {
return err
}
}
// If enabled, use the Go fan-out/reduce path to parallelize aggregation.
if os.Getenv("DAILY_AGG_GO") == "1" {
c.Logger.Debug("Using go implementation of aggregation")
if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil {
c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err)
} else {
metrics.RecordDailyAggregation(time.Since(jobStart), nil)
c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable)
return nil
}
}
hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
if err != nil {
return err
}
hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
if len(hourlySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
}
hourlyTables := make([]string, 0, len(hourlySnapshots))
for _, snapshot := range hourlySnapshots {
hourlyTables = append(hourlyTables, snapshot.TableName)
// Ensure indexes exist on historical hourly tables for faster aggregation.
if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil {
c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err)
}
}
unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
if err != nil {
return err
}
currentTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
if err != nil {
c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02"))
} else {
c.Logger.Info("Daily snapshot totals",
"date", dayStart.Format("2006-01-02"),
"vm_count", currentTotals.VmCount,
"vcpu_total", currentTotals.VcpuTotal,
"ram_total_gb", currentTotals.RamTotal,
"disk_total_gb", currentTotals.DiskTotal,
)
}
prevStart := dayStart.AddDate(0, 0, -1)
prevEnd := dayStart
prevSnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", prevStart, prevEnd)
if err == nil && len(prevSnapshots) > 0 {
prevSnapshots = filterRecordsInRange(prevSnapshots, prevStart, prevEnd)
prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots)
prevTables := make([]string, 0, len(prevSnapshots))
for _, snapshot := range prevSnapshots {
prevTables = append(prevTables, snapshot.TableName)
}
prevUnion, err := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter())
if err == nil {
prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion)
if err != nil {
c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02"))
} else {
c.Logger.Info("Daily snapshot comparison",
"current_date", dayStart.Format("2006-01-02"),
"previous_date", prevStart.Format("2006-01-02"),
"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
)
}
} else {
c.Logger.Warn("unable to build previous day union", "error", err)
}
}
insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
if err != nil {
return err
}
if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
return err
}
// Backfill missing creation times to the start of the day for rows where vCenter had no creation info.
if _, err := dbConn.ExecContext(ctx,
`UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
dayStart.Unix(),
); err != nil {
c.Logger.Warn("failed to normalize creation times for daily summary", "error", err, "table", summaryTable)
}
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
}
db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
if err != nil {
c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
}
if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
}
if err := c.generateReport(ctx, summaryTable); err != nil {
c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
metrics.RecordDailyAggregation(time.Since(jobStart), err)
return err
}
c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
metrics.RecordDailyAggregation(time.Since(jobStart), nil)
return nil
}
func dailySummaryTableName(t time.Time) (string, error) {
return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
}
// aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel,
// reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path
// as closely as possible while improving CPU utilization on multi-core hosts.
func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error {
jobStart := time.Now()
dbConn := c.Database.DB()
hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
if err != nil {
return err
}
hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
if len(hourlySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
}
hourlyTables := make([]string, 0, len(hourlySnapshots))
for _, snapshot := range hourlySnapshots {
hourlyTables = append(hourlyTables, snapshot.TableName)
if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil {
c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err)
}
}
unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
if err != nil {
return err
}
// Clear existing summary if forcing.
if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
return err
} else if rowsExist && !force {
c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable)
return nil
} else if rowsExist && force {
if err := clearTable(ctx, dbConn, summaryTable); err != nil {
return err
}
}
totalSamples := len(hourlyTables)
var (
aggMap map[dailyAggKey]*dailyAggVal
snapTimes []int64
)
if db.TableExists(ctx, dbConn, "vm_hourly_stats") {
cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd)
if cacheErr != nil {
c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
} else if len(cacheAgg) > 0 {
aggMap = cacheAgg
snapTimes = cacheTimes
totalSamples = len(cacheTimes)
}
}
if aggMap == nil {
var errScan error
aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots)
if errScan != nil {
return errScan
}
if len(aggMap) == 0 {
return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
}
// Build ordered list of snapshot times for deletion inference.
snapTimes = make([]int64, 0, len(hourlySnapshots))
for _, snap := range hourlySnapshots {
snapTimes = append(snapTimes, snap.SnapshotTime.Unix())
}
sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
}
for _, v := range aggMap {
if v.creation == 0 {
v.creation = v.firstSeen
}
// Infer deletion as the first snapshot time after lastSeen where the VM is absent.
for _, t := range snapTimes {
if t <= v.lastSeen {
continue
}
if _, ok := v.seen[t]; !ok {
v.deletion = t
break
}
}
}
// Insert aggregated rows.
if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil {
return err
}
// Persist rollup cache for monthly aggregation.
if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples); err != nil {
c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayStart.Format("2006-01-02"))
}
// Refine lifecycle with existing SQL helper to pick up first-after deletions.
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
}
db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
if err != nil {
c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
}
if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
}
if err := c.generateReport(ctx, summaryTable); err != nil {
c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
return err
}
c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
return nil
}
type dailyAggKey struct {
Vcenter string
VmId string
VmUuid string
Name string
}
type dailyAggVal struct {
key dailyAggKey
resourcePool string
datacenter string
cluster string
folder string
isTemplate string
poweredOn string
srmPlaceholder string
creation int64
firstSeen int64
lastSeen int64
lastDisk float64
lastVcpu int64
lastRam int64
sumVcpu int64
sumRam int64
sumDisk float64
samples int64
tinHits int64
bronzeHits int64
silverHits int64
goldHits int64
seen map[int64]struct{}
deletion int64
}
func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
agg := make(map[dailyAggKey]*dailyAggVal, 1024)
mu := sync.Mutex{}
workers := runtime.NumCPU()
if workers < 2 {
workers = 2
}
if workers > len(snapshots) {
workers = len(snapshots)
}
jobs := make(chan report.SnapshotRecord, len(snapshots))
wg := sync.WaitGroup{}
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for snap := range jobs {
rows, err := c.scanHourlyTable(ctx, snap)
if err != nil {
c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err)
continue
}
mu.Lock()
for k, v := range rows {
if existing, ok := agg[k]; ok {
mergeDailyAgg(existing, v)
} else {
agg[k] = v
}
}
mu.Unlock()
}
}()
}
for _, snap := range snapshots {
jobs <- snap
}
close(jobs)
wg.Wait()
return agg, nil
}
func mergeDailyAgg(dst, src *dailyAggVal) {
if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
dst.creation = src.creation
}
if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) {
dst.firstSeen = src.firstSeen
}
if src.lastSeen > dst.lastSeen {
dst.lastSeen = src.lastSeen
dst.resourcePool = src.resourcePool
dst.datacenter = src.datacenter
dst.cluster = src.cluster
dst.folder = src.folder
dst.isTemplate = src.isTemplate
dst.poweredOn = src.poweredOn
dst.srmPlaceholder = src.srmPlaceholder
dst.lastDisk = src.lastDisk
dst.lastVcpu = src.lastVcpu
dst.lastRam = src.lastRam
}
dst.sumVcpu += src.sumVcpu
dst.sumRam += src.sumRam
dst.sumDisk += src.sumDisk
dst.samples += src.samples
dst.tinHits += src.tinHits
dst.bronzeHits += src.bronzeHits
dst.silverHits += src.silverHits
dst.goldHits += src.goldHits
if dst.seen == nil {
dst.seen = make(map[int64]struct{}, len(src.seen))
}
for t := range src.seen {
dst.seen[t] = struct{}{}
}
}
func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
dbConn := c.Database.DB()
query := fmt.Sprintf(`
SELECT
"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
COALESCE("ProvisionedDisk",0) AS disk,
COALESCE("VcpuCount",0) AS vcpu,
COALESCE("RamGB",0) AS ram,
COALESCE("CreationTime",0) AS creation,
COALESCE("DeletionTime",0) AS deletion,
COALESCE("IsTemplate",'FALSE') AS is_template,
COALESCE("PoweredOn",'FALSE') AS powered_on,
COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder,
COALESCE("SnapshotTime",0) AS snapshot_time
FROM %s
`, snap.TableName)
rows, err := dbConn.QueryxContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
out := make(map[dailyAggKey]*dailyAggVal, 256)
for rows.Next() {
var (
name, vcenter, vmId, vmUuid, resourcePool string
dc, cluster, folder sql.NullString
disk sql.NullFloat64
vcpu, ram sql.NullInt64
creation, deletion, snapshotTime sql.NullInt64
isTemplate, poweredOn, srmPlaceholder sql.NullString
)
if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
continue
}
if vcenter == "" {
continue
}
// Skip templates.
if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
continue
}
key := dailyAggKey{
Vcenter: vcenter,
VmId: strings.TrimSpace(vmId),
VmUuid: strings.TrimSpace(vmUuid),
Name: strings.TrimSpace(name),
}
if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
continue
}
if key.VmId == "" {
key.VmId = key.VmUuid
}
pool := strings.ToLower(strings.TrimSpace(resourcePool))
hitTin := btoi(pool == "tin")
hitBronze := btoi(pool == "bronze")
hitSilver := btoi(pool == "silver")
hitGold := btoi(pool == "gold")
row := &dailyAggVal{
key: key,
resourcePool: resourcePool,
datacenter: dc.String,
cluster: cluster.String,
folder: folder.String,
isTemplate: isTemplate.String,
poweredOn: poweredOn.String,
srmPlaceholder: srmPlaceholder.String,
creation: int64OrZero(creation),
firstSeen: int64OrZero(snapshotTime),
lastSeen: int64OrZero(snapshotTime),
lastDisk: disk.Float64,
lastVcpu: vcpu.Int64,
lastRam: ram.Int64,
sumVcpu: vcpu.Int64,
sumRam: ram.Int64,
sumDisk: disk.Float64,
samples: 1,
tinHits: hitTin,
bronzeHits: hitBronze,
silverHits: hitSilver,
goldHits: hitGold,
seen: map[int64]struct{}{int64OrZero(snapshotTime): {}},
}
out[key] = row
}
return out, nil
}
// scanHourlyCache aggregates directly from vm_hourly_stats when available.
func (c *CronTask) scanHourlyCache(ctx context.Context, start, end time.Time) (map[dailyAggKey]*dailyAggVal, []int64, error) {
dbConn := c.Database.DB()
query := `
SELECT
"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
COALESCE("ProvisionedDisk",0) AS disk,
COALESCE("VcpuCount",0) AS vcpu,
COALESCE("RamGB",0) AS ram,
COALESCE("CreationTime",0) AS creation,
COALESCE("DeletionTime",0) AS deletion,
COALESCE("IsTemplate",'') AS is_template,
COALESCE("PoweredOn",'') AS powered_on,
COALESCE("SrmPlaceholder",'') AS srm_placeholder,
"SnapshotTime"
FROM vm_hourly_stats
WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?`
q := dbConn.Rebind(query)
rows, err := dbConn.QueryxContext(ctx, q, start.Unix(), end.Unix())
if err != nil {
return nil, nil, err
}
defer rows.Close()
agg := make(map[dailyAggKey]*dailyAggVal, 512)
timeSet := make(map[int64]struct{}, 64)
for rows.Next() {
var (
name, vcenter, vmId, vmUuid, resourcePool string
dc, cluster, folder sql.NullString
disk sql.NullFloat64
vcpu, ram sql.NullInt64
creation, deletion, snapshotTime sql.NullInt64
isTemplate, poweredOn, srmPlaceholder sql.NullString
)
if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
continue
}
if vcenter == "" {
continue
}
if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
continue
}
key := dailyAggKey{
Vcenter: vcenter,
VmId: strings.TrimSpace(vmId),
VmUuid: strings.TrimSpace(vmUuid),
Name: strings.TrimSpace(name),
}
if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
continue
}
if key.VmId == "" {
key.VmId = key.VmUuid
}
pool := strings.ToLower(strings.TrimSpace(resourcePool))
hitTin := btoi(pool == "tin")
hitBronze := btoi(pool == "bronze")
hitSilver := btoi(pool == "silver")
hitGold := btoi(pool == "gold")
snapTs := int64OrZero(snapshotTime)
timeSet[snapTs] = struct{}{}
row := &dailyAggVal{
key: key,
resourcePool: resourcePool,
datacenter: dc.String,
cluster: cluster.String,
folder: folder.String,
isTemplate: isTemplate.String,
poweredOn: poweredOn.String,
srmPlaceholder: srmPlaceholder.String,
creation: int64OrZero(creation),
firstSeen: snapTs,
lastSeen: snapTs,
lastDisk: disk.Float64,
lastVcpu: vcpu.Int64,
lastRam: ram.Int64,
sumVcpu: vcpu.Int64,
sumRam: ram.Int64,
sumDisk: disk.Float64,
samples: 1,
tinHits: hitTin,
bronzeHits: hitBronze,
silverHits: hitSilver,
goldHits: hitGold,
seen: map[int64]struct{}{snapTs: {}},
}
if deletion.Valid && deletion.Int64 > 0 {
row.deletion = deletion.Int64
}
if existing, ok := agg[key]; ok {
mergeDailyAgg(existing, row)
} else {
agg[key] = row
}
}
snapTimes := make([]int64, 0, len(timeSet))
for t := range timeSet {
snapTimes = append(snapTimes, t)
}
sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
return agg, snapTimes, rows.Err()
}
func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
dbConn := c.Database.DB()
tx, err := dbConn.Beginx()
if err != nil {
return err
}
defer tx.Rollback()
driver := strings.ToLower(dbConn.DriverName())
placeholders := makePlaceholders(driver, 30)
insert := fmt.Sprintf(`
INSERT INTO %s (
"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
"ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder",
"CreationTime","DeletionTime","SnapshotTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk",
"AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold"
) VALUES (%s)
`, table, placeholders)
for _, v := range agg {
if v.samples == 0 {
continue
}
avgVcpu := float64(v.sumVcpu) / float64(v.samples)
avgRam := float64(v.sumRam) / float64(v.samples)
avgDisk := v.sumDisk / float64(v.samples)
total := float64(totalSamples)
avgPresent := 0.0
tinPct := 0.0
bronzePct := 0.0
silverPct := 0.0
goldPct := 0.0
if total > 0 {
avgPresent = float64(v.samples) / total
}
if v.samples > 0 {
tinPct = float64(v.tinHits) * 100 / float64(v.samples)
bronzePct = float64(v.bronzeHits) * 100 / float64(v.samples)
silverPct = float64(v.silverHits) * 100 / float64(v.samples)
goldPct = float64(v.goldHits) * 100 / float64(v.samples)
}
args := []interface{}{
v.key.Name,
v.key.Vcenter,
nullIfEmpty(v.key.VmId),
nullIfEmpty(v.key.VmUuid),
v.resourcePool,
nullIfEmpty(v.datacenter),
nullIfEmpty(v.cluster),
nullIfEmpty(v.folder),
v.lastDisk,
v.lastVcpu,
v.lastRam,
v.isTemplate,
v.poweredOn,
v.srmPlaceholder,
v.creation,
v.deletion,
v.lastSeen,
v.samples,
avgVcpu,
avgRam,
avgDisk,
avgPresent,
tinPct, bronzePct, silverPct, goldPct,
float64(v.tinHits), float64(v.bronzeHits), float64(v.silverHits), float64(v.goldHits),
}
if driver != "sqlite" {
// Postgres expects primitive types, nulls are handled by pq via nil.
for i, a := range args {
if s, ok := a.(string); ok && s == "" {
args[i] = nil
}
}
}
if _, err := tx.ExecContext(ctx, insert, args...); err != nil {
return err
}
}
return tx.Commit()
}
func int64OrZero(v sql.NullInt64) int64 {
if v.Valid {
return v.Int64
}
return 0
}
func nullIfEmpty(s string) interface{} {
if strings.TrimSpace(s) == "" {
return nil
}
return s
}
func makePlaceholders(driver string, n int) string {
if driver == "sqlite" {
parts := make([]string, n)
for i := 0; i < n; i++ {
parts[i] = "?"
}
return strings.Join(parts, ",")
}
parts := make([]string, n)
for i := 0; i < n; i++ {
parts[i] = fmt.Sprintf("$%d", i+1)
}
return strings.Join(parts, ",")
}
func btoi(b bool) int64 {
if b {
return 1
}
return 0
}
// persistDailyRollup stores per-day aggregates into vm_daily_rollup to speed monthly aggregation.
func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
dbConn := c.Database.DB()
for _, v := range agg {
if strings.EqualFold(strings.TrimSpace(v.isTemplate), "true") || v.isTemplate == "1" {
continue
}
row := db.VmDailyRollupRow{
Vcenter: v.key.Vcenter,
VmId: v.key.VmId,
VmUuid: v.key.VmUuid,
Name: v.key.Name,
CreationTime: v.creation,
DeletionTime: v.deletion,
SamplesPresent: v.samples,
TotalSamples: int64(totalSamples),
SumVcpu: float64(v.sumVcpu),
SumRam: float64(v.sumRam),
SumDisk: v.sumDisk,
TinHits: v.tinHits,
BronzeHits: v.bronzeHits,
SilverHits: v.silverHits,
GoldHits: v.goldHits,
LastResourcePool: v.resourcePool,
LastDatacenter: v.datacenter,
LastCluster: v.cluster,
LastFolder: v.folder,
LastProvisionedDisk: v.lastDisk,
LastVcpuCount: v.lastVcpu,
LastRamGB: v.lastRam,
IsTemplate: v.isTemplate,
PoweredOn: v.poweredOn,
SrmPlaceholder: v.srmPlaceholder,
}
if err := db.UpsertVmDailyRollup(ctx, dbConn, dayUnix, row); err != nil {
return err
}
}
return nil
}