Files
vctp2/internal/tasks/dailyAggregate.go
Nathan Coad 7ea02be91a
All checks were successful
continuous-integration/drone/push Build is passing
refactor code and improve daily cache handling of deleted VMs
2026-01-20 16:46:07 +11:00

743 lines
24 KiB
Go

package tasks
import (
"context"
"database/sql"
"fmt"
"log/slog"
"os"
"runtime"
"sort"
"strings"
"sync"
"time"
"vctp/db"
"vctp/internal/metrics"
"vctp/internal/report"
)
// RunVcenterDailyAggregate is the scheduled entry point for the daily summary
// job. It hands the aggregation to runAggregateJob under the job name
// "daily_aggregate", using a timeout derived from the DailyJobTimeoutSeconds
// setting (15 minutes is passed to durationFromSeconds as the second
// argument, presumably the fallback when the setting is unset).
//
// The day to aggregate is the one containing "now minus one minute", and the
// run always forces regeneration so that a scheduled run refreshes the data
// even if a manual run already produced a summary earlier. The elapsed time is
// written to logger when the aggregation callback returns.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
	timeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute)

	aggregate := func(jobCtx context.Context) error {
		began := time.Now()
		defer func() {
			logger.Info("Daily summary job finished", "duration", time.Since(began))
		}()

		oneMinuteAgo := time.Now().Add(-time.Minute)
		// force=true: the scheduled run always rebuilds the summary.
		return c.aggregateDailySummary(jobCtx, oneMinuteAgo, true)
	}

	return c.runAggregateJob(ctx, "daily_aggregate", timeout, aggregate)
}
// AggregateDailySummary is the exported entry point for building the daily
// summary of the calendar day that contains date. It forwards its arguments
// unchanged to aggregateDailySummary: when force is false an already populated
// summary table is left alone, when force is true it is cleared and rebuilt.
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
return c.aggregateDailySummary(ctx, date, force)
}
// aggregateDailySummary builds (or rebuilds) the daily summary table for the
// calendar day that contains targetTime, evaluated in targetTime's location.
//
// Steps, in order:
//  1. Ensure the summary table and the snapshot registry exist.
//  2. If the summary table already has rows, return nil when force is false,
//     otherwise clear the table.
//  3. When the DAILY_AGG_GO environment variable is "1", try the Go aggregation
//     path and return on success. On failure, remove any rows it left behind
//     and continue with the SQL path.
//  4. Aggregate with a single INSERT over the union of the day's hourly
//     snapshot tables.
//  5. Post-process: normalise missing creation times, refine creation/deletion
//     times, analyze the table, register the snapshot and generate the report.
//
// Failures while ensuring indexes, computing the informational totals,
// normalising creation times, refining lifecycle data, counting rows and
// registering the snapshot are logged and do not abort the run. A failure to
// generate the report is returned to the caller.
func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
	jobStart := time.Now()
	dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
	dayEnd := dayStart.AddDate(0, 0, 1)
	dayLabel := dayStart.Format("2006-01-02")

	summaryTable, err := dailySummaryTableName(targetTime)
	if err != nil {
		return err
	}

	dbConn := c.Database.DB()
	db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
	if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}

	rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable)
	if err != nil {
		return err
	}
	if rowsExist {
		if !force {
			c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
			return nil
		}
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}

	// If enabled, use the Go fan-out/reduce path to parallelize aggregation.
	if os.Getenv("DAILY_AGG_GO") == "1" {
		c.Logger.Debug("Using go implementation of aggregation")
		goErr := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force)
		if goErr == nil {
			metrics.RecordDailyAggregation(time.Since(jobStart), nil)
			c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable)
			return nil
		}
		c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", goErr)

		// The Go path can fail after it has already committed its rows (its
		// report generation runs after the insert). Remove such leftovers so
		// the SQL insert below does not add a second copy of every VM.
		leftover, err := db.TableHasRows(ctx, dbConn, summaryTable)
		if err != nil {
			return err
		}
		if leftover {
			if err := clearTable(ctx, dbConn, summaryTable); err != nil {
				return err
			}
		}
	}

	hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
	if err != nil {
		return err
	}
	hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
	hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dayLabel)
	}

	hourlyTables := make([]string, 0, len(hourlySnapshots))
	for _, snapshot := range hourlySnapshots {
		hourlyTables = append(hourlyTables, snapshot.TableName)
		// Ensure indexes exist on historical hourly tables for faster aggregation.
		if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil {
			c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err)
		}
	}
	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	// The totals are informational only; a failure here is logged and the
	// aggregation carries on.
	currentTotals, totalsErr := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if totalsErr != nil {
		c.Logger.Warn("unable to calculate daily totals", "error", totalsErr, "date", dayLabel)
	} else {
		c.Logger.Info("Daily snapshot totals",
			"date", dayLabel,
			"vm_count", currentTotals.VmCount,
			"vcpu_total", currentTotals.VcpuTotal,
			"ram_total_gb", currentTotals.RamTotal,
			"disk_total_gb", currentTotals.DiskTotal,
		)
	}

	// Compare against the previous day only when today's totals are known;
	// otherwise the deltas would be computed from an unusable value.
	if totalsErr == nil {
		prevStart := dayStart.AddDate(0, 0, -1)
		prevEnd := dayStart
		prevSnapshots, prevErr := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", prevStart, prevEnd)
		if prevErr == nil && len(prevSnapshots) > 0 {
			prevSnapshots = filterRecordsInRange(prevSnapshots, prevStart, prevEnd)
			prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots)
			prevTables := make([]string, 0, len(prevSnapshots))
			for _, snapshot := range prevSnapshots {
				prevTables = append(prevTables, snapshot.TableName)
			}
			prevUnion, err := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter())
			if err == nil {
				prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion)
				if err != nil {
					c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02"))
				} else {
					c.Logger.Info("Daily snapshot comparison",
						"current_date", dayLabel,
						"previous_date", prevStart.Format("2006-01-02"),
						"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
						"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
						"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
						"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
					)
				}
			} else {
				c.Logger.Warn("unable to build previous day union", "error", err)
			}
		}
	}

	insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
	if err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayLabel)
		return err
	}

	// Backfill missing creation times to the start of the day for rows where vCenter had no creation info.
	if _, err := dbConn.ExecContext(ctx,
		`UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
		dayStart.Unix(),
	); err != nil {
		c.Logger.Warn("failed to normalize creation times for daily summary", "error", err, "table", summaryTable)
	}
	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)

	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	}

	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		metrics.RecordDailyAggregation(time.Since(jobStart), err)
		return err
	}

	c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
	metrics.RecordDailyAggregation(time.Since(jobStart), nil)
	return nil
}
// dailySummaryTableName returns the name of the daily summary table for the
// day of t: the prefix "inventory_daily_summary_" followed by the date
// formatted as YYYYMMDD, passed through db.SafeTableName (whose error, if any,
// is returned as is).
func dailySummaryTableName(t time.Time) (string, error) {
	const prefix = "inventory_daily_summary_"
	daySuffix := t.Format("20060102")
	return db.SafeTableName(prefix + daySuffix)
}
// aggregateDailySummaryGo is the Go implementation of the daily aggregation:
// it collects per-VM data for [dayStart, dayEnd), reduces it in memory and
// writes the result into summaryTable.
//
// Per-VM data comes from the vm_hourly_stats cache when that table exists and
// yields at least one VM; otherwise every hourly snapshot table of the day is
// scanned in parallel. For each VM a missing creation time is replaced by the
// first time it was seen, and the deletion time is set to the first snapshot
// time later than its last sighting (if there is one).
//
// If summaryTable already has rows the function returns nil when force is
// false and clears the table when force is true. Failures to ensure indexes,
// use the cache, persist the rollup cache, refine lifecycle data, count rows
// or register the snapshot are logged only; everything else is returned.
func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error {
	startedAt := time.Now()
	conn := c.Database.DB()
	dayLabel := dayStart.Format("2006-01-02")

	snapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
	if err != nil {
		return err
	}
	snapshots = filterSnapshotsWithRows(ctx, conn, filterRecordsInRange(snapshots, dayStart, dayEnd))
	if len(snapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dayLabel)
	}

	tableNames := make([]string, 0, len(snapshots))
	for i := range snapshots {
		tableName := snapshots[i].TableName
		tableNames = append(tableNames, tableName)
		if idxErr := db.EnsureSnapshotIndexes(ctx, conn, tableName); idxErr != nil {
			c.Logger.Warn("failed to ensure indexes on hourly table", "table", tableName, "error", idxErr)
		}
	}

	unionQuery, err := buildUnionQuery(tableNames, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	// An already populated summary is either kept (no force) or wiped (force).
	hasRows, err := db.TableHasRows(ctx, conn, summaryTable)
	switch {
	case err != nil:
		return err
	case hasRows && !force:
		c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable)
		return nil
	case hasRows:
		if err := clearTable(ctx, conn, summaryTable); err != nil {
			return err
		}
	}

	sampleCount := len(tableNames)
	var (
		aggregates map[dailyAggKey]*dailyAggVal
		snapTimes  []int64
	)

	// Preferred source: the hourly cache table, when present and non-empty.
	if db.TableExists(ctx, conn, "vm_hourly_stats") {
		cached, cachedTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd)
		switch {
		case cacheErr != nil:
			c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
		case len(cached) > 0:
			aggregates = cached
			snapTimes = cachedTimes
			sampleCount = len(cachedTimes)
		}
	}

	// Fallback source: scan the hourly snapshot tables themselves.
	if aggregates == nil {
		scanned, scanErr := c.scanHourlyTablesParallel(ctx, snapshots)
		if scanErr != nil {
			return scanErr
		}
		if len(scanned) == 0 {
			return fmt.Errorf("no VM records aggregated for %s", dayLabel)
		}
		aggregates = scanned

		// Ascending snapshot times, needed below to infer deletions.
		snapTimes = make([]int64, 0, len(snapshots))
		for i := range snapshots {
			snapTimes = append(snapTimes, snapshots[i].SnapshotTime.Unix())
		}
		sort.Slice(snapTimes, func(a, b int) bool { return snapTimes[a] < snapTimes[b] })
	}

	for _, vm := range aggregates {
		if vm.creation == 0 {
			vm.creation = vm.firstSeen
		}
		// Deletion is the first snapshot after the last sighting in which the VM does not appear.
		for _, ts := range snapTimes {
			if ts > vm.lastSeen {
				if _, present := vm.seen[ts]; !present {
					vm.deletion = ts
					break
				}
			}
		}
	}

	// Write the reduced rows into the summary table.
	if err := c.insertDailyAggregates(ctx, summaryTable, aggregates, sampleCount); err != nil {
		return err
	}

	// Keep the per-day rollup cache up to date for the monthly aggregation.
	if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggregates, sampleCount); err != nil {
		c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayLabel)
	}

	// Let the shared SQL helper refine creation/deletion times as well.
	if err := db.RefineCreationDeletionFromUnion(ctx, conn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
	}
	db.AnalyzeTableIfPostgres(ctx, conn, summaryTable)

	rowCount, err := db.TableRowCount(ctx, conn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	}

	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		return err
	}

	c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(startedAt))
	return nil
}
// scanHourlyTablesParallel scans the given hourly snapshot tables with a pool
// of goroutines and merges their per-VM results into a single map.
//
// The pool size is runtime.NumCPU(), raised to at least 2 and capped at the
// number of snapshots. A table that fails to scan is logged and left out of
// the result; the returned error is always nil.
func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	merged := make(map[dailyAggKey]*dailyAggVal, 1024)

	workerCount := runtime.NumCPU()
	if workerCount < 2 {
		workerCount = 2
	}
	if workerCount > len(snapshots) {
		workerCount = len(snapshots)
	}

	// The queue is buffered to hold every snapshot, so it can be filled and
	// closed up front without blocking.
	queue := make(chan report.SnapshotRecord, len(snapshots))
	for _, snap := range snapshots {
		queue <- snap
	}
	close(queue)

	// absorb folds one table's result into the shared map under the lock.
	absorb := func(partial map[dailyAggKey]*dailyAggVal) {
		mu.Lock()
		defer mu.Unlock()
		for key, val := range partial {
			existing, found := merged[key]
			if !found {
				merged[key] = val
				continue
			}
			mergeDailyAgg(existing, val)
		}
	}

	wg.Add(workerCount)
	for w := 0; w < workerCount; w++ {
		go func() {
			defer wg.Done()
			for snap := range queue {
				partial, err := c.scanHourlyTable(ctx, snap)
				if err != nil {
					c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err)
					continue
				}
				absorb(partial)
			}
		}()
	}
	wg.Wait()

	return merged, nil
}
// mergeDailyAgg folds the observations in src into dst; both describe the
// same VM.
//
// Merge rules:
//   - creation and deletion: the earliest non-zero value wins, so the result
//     does not depend on the order in which rows are merged.
//   - firstSeen: the earliest non-zero value wins.
//   - lastSeen and the "last known" attributes (resource pool, placement,
//     flags, disk/vCPU/RAM): taken from src when src was seen strictly later.
//   - sums, sample count and pool hit counters: added together.
//   - seen: the union of both sets of snapshot times.
func mergeDailyAgg(dst, src *dailyAggVal) {
	if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
		dst.creation = src.creation
	}
	// A deletion time recorded on any merged row must survive the merge;
	// previously it was kept only if it happened to be on the first row.
	if src.deletion > 0 && (dst.deletion == 0 || src.deletion < dst.deletion) {
		dst.deletion = src.deletion
	}
	if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) {
		dst.firstSeen = src.firstSeen
	}
	if src.lastSeen > dst.lastSeen {
		dst.lastSeen = src.lastSeen
		dst.resourcePool = src.resourcePool
		dst.datacenter = src.datacenter
		dst.cluster = src.cluster
		dst.folder = src.folder
		dst.isTemplate = src.isTemplate
		dst.poweredOn = src.poweredOn
		dst.srmPlaceholder = src.srmPlaceholder
		dst.lastDisk = src.lastDisk
		dst.lastVcpu = src.lastVcpu
		dst.lastRam = src.lastRam
	}

	dst.sumVcpu += src.sumVcpu
	dst.sumRam += src.sumRam
	dst.sumDisk += src.sumDisk
	dst.samples += src.samples
	dst.tinHits += src.tinHits
	dst.bronzeHits += src.bronzeHits
	dst.silverHits += src.silverHits
	dst.goldHits += src.goldHits

	if dst.seen == nil {
		dst.seen = make(map[int64]struct{}, len(src.seen))
	}
	for t := range src.seen {
		dst.seen[t] = struct{}{}
	}
}
// scanHourlyTable reads every row of one hourly snapshot table and returns one
// dailyAggVal per VM, keyed by vCenter, VM id, VM UUID and name.
//
// Rows are left out when the vCenter is empty, when the row is a template
// ("IsTemplate" equal to "true" ignoring case and surrounding space, or "1"),
// or when VM id, UUID and name are all empty. An empty VM id is replaced by
// the UUID. If a key occurs more than once in the table the last row read
// wins. Each returned value represents a single sample taken at the row's
// "SnapshotTime".
//
// All text columns are scanned as nullable, so a NULL name, id, UUID or
// resource pool yields an empty string instead of discarding the row. Rows
// that still cannot be scanned are skipped and reported in a single warning.
// A query error or an error raised while iterating is returned with a nil map.
func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
	dbConn := c.Database.DB()
	query := fmt.Sprintf(`
SELECT
	"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
	COALESCE("ProvisionedDisk",0) AS disk,
	COALESCE("VcpuCount",0) AS vcpu,
	COALESCE("RamGB",0) AS ram,
	COALESCE("CreationTime",0) AS creation,
	COALESCE("DeletionTime",0) AS deletion,
	COALESCE("IsTemplate",'FALSE') AS is_template,
	COALESCE("PoweredOn",'FALSE') AS powered_on,
	COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder,
	COALESCE("SnapshotTime",0) AS snapshot_time
FROM %s
`, snap.TableName)
	rows, err := dbConn.QueryxContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	out := make(map[dailyAggKey]*dailyAggVal, 256)
	var (
		skipped      int
		firstScanErr error
	)
	for rows.Next() {
		var (
			name, vcenter, vmId, vmUuid, resourcePool sql.NullString
			dc, cluster, folder                       sql.NullString
			disk                                      sql.NullFloat64
			vcpu, ram                                 sql.NullInt64
			creation, deletion, snapshotTime          sql.NullInt64
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
		)
		if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
			// Keep going so one unreadable row does not discard the table,
			// but remember the failure so it can be reported below.
			if firstScanErr == nil {
				firstScanErr = err
			}
			skipped++
			continue
		}
		if vcenter.String == "" {
			continue
		}
		// Skip templates.
		if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
			continue
		}
		key := dailyAggKey{
			Vcenter: vcenter.String,
			VmId:    strings.TrimSpace(vmId.String),
			VmUuid:  strings.TrimSpace(vmUuid.String),
			Name:    strings.TrimSpace(name.String),
		}
		if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
			continue
		}
		if key.VmId == "" {
			key.VmId = key.VmUuid
		}

		pool := strings.ToLower(strings.TrimSpace(resourcePool.String))
		snapTs := int64OrZero(snapshotTime)
		out[key] = &dailyAggVal{
			key:            key,
			resourcePool:   resourcePool.String,
			datacenter:     dc.String,
			cluster:        cluster.String,
			folder:         folder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			creation:       int64OrZero(creation),
			firstSeen:      snapTs,
			lastSeen:       snapTs,
			lastDisk:       disk.Float64,
			lastVcpu:       vcpu.Int64,
			lastRam:        ram.Int64,
			sumVcpu:        vcpu.Int64,
			sumRam:         ram.Int64,
			sumDisk:        disk.Float64,
			samples:        1,
			tinHits:        btoi(pool == "tin"),
			bronzeHits:     btoi(pool == "bronze"),
			silverHits:     btoi(pool == "silver"),
			goldHits:       btoi(pool == "gold"),
			seen:           map[int64]struct{}{snapTs: {}},
		}
	}
	// rows.Next also returns false when iteration fails (for example when ctx
	// is cancelled); do not present a truncated result as a complete scan.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("reading hourly table %s: %w", snap.TableName, err)
	}
	if skipped > 0 {
		c.Logger.Warn("skipped unreadable rows in hourly table", "table", snap.TableName, "rows", skipped, "error", firstScanErr)
	}
	return out, nil
}
// scanHourlyCache aggregates directly from the vm_hourly_stats table for rows
// whose "SnapshotTime" lies in [start, end) (Unix seconds).
//
// It returns the per-VM aggregates, keyed by vCenter, VM id, VM UUID and name,
// together with the distinct snapshot times found among the rows it
// aggregated, sorted ascending. Rows are left out when the vCenter is empty,
// when the row is a template ("IsTemplate" equal to "true" ignoring case and
// surrounding space, or "1"), or when VM id, UUID and name are all empty. An
// empty VM id is replaced by the UUID. Multiple rows for one key are combined
// with mergeDailyAgg.
//
// All text columns are scanned as nullable, so a NULL name, id, UUID or
// resource pool yields an empty string instead of discarding the row. Rows
// that still cannot be scanned are skipped and reported in a single warning.
// A query error or an error raised while iterating is returned with nil
// results.
func (c *CronTask) scanHourlyCache(ctx context.Context, start, end time.Time) (map[dailyAggKey]*dailyAggVal, []int64, error) {
	dbConn := c.Database.DB()
	query := `
SELECT
	"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
	COALESCE("ProvisionedDisk",0) AS disk,
	COALESCE("VcpuCount",0) AS vcpu,
	COALESCE("RamGB",0) AS ram,
	COALESCE("CreationTime",0) AS creation,
	COALESCE("DeletionTime",0) AS deletion,
	COALESCE("IsTemplate",'') AS is_template,
	COALESCE("PoweredOn",'') AS powered_on,
	COALESCE("SrmPlaceholder",'') AS srm_placeholder,
	"SnapshotTime"
FROM vm_hourly_stats
WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?`
	q := dbConn.Rebind(query)
	rows, err := dbConn.QueryxContext(ctx, q, start.Unix(), end.Unix())
	if err != nil {
		return nil, nil, err
	}
	defer rows.Close()

	agg := make(map[dailyAggKey]*dailyAggVal, 512)
	timeSet := make(map[int64]struct{}, 64)
	var (
		skipped      int
		firstScanErr error
	)
	for rows.Next() {
		var (
			name, vcenter, vmId, vmUuid, resourcePool sql.NullString
			dc, cluster, folder                       sql.NullString
			disk                                      sql.NullFloat64
			vcpu, ram                                 sql.NullInt64
			creation, deletion, snapshotTime          sql.NullInt64
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
		)
		if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
			// Keep going so one unreadable row does not discard the cache,
			// but remember the failure so it can be reported below.
			if firstScanErr == nil {
				firstScanErr = err
			}
			skipped++
			continue
		}
		if vcenter.String == "" {
			continue
		}
		if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
			continue
		}
		key := dailyAggKey{
			Vcenter: vcenter.String,
			VmId:    strings.TrimSpace(vmId.String),
			VmUuid:  strings.TrimSpace(vmUuid.String),
			Name:    strings.TrimSpace(name.String),
		}
		if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
			continue
		}
		if key.VmId == "" {
			key.VmId = key.VmUuid
		}

		pool := strings.ToLower(strings.TrimSpace(resourcePool.String))
		snapTs := int64OrZero(snapshotTime)
		timeSet[snapTs] = struct{}{}
		row := &dailyAggVal{
			key:            key,
			resourcePool:   resourcePool.String,
			datacenter:     dc.String,
			cluster:        cluster.String,
			folder:         folder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			creation:       int64OrZero(creation),
			firstSeen:      snapTs,
			lastSeen:       snapTs,
			lastDisk:       disk.Float64,
			lastVcpu:       vcpu.Int64,
			lastRam:        ram.Int64,
			sumVcpu:        vcpu.Int64,
			sumRam:         ram.Int64,
			sumDisk:        disk.Float64,
			samples:        1,
			tinHits:        btoi(pool == "tin"),
			bronzeHits:     btoi(pool == "bronze"),
			silverHits:     btoi(pool == "silver"),
			goldHits:       btoi(pool == "gold"),
			seen:           map[int64]struct{}{snapTs: {}},
		}
		if deletion.Valid && deletion.Int64 > 0 {
			row.deletion = deletion.Int64
		}
		if existing, ok := agg[key]; ok {
			mergeDailyAgg(existing, row)
		} else {
			agg[key] = row
		}
	}
	// rows.Next also returns false when iteration fails; a truncated cache
	// read must not be mistaken for the full day.
	if err := rows.Err(); err != nil {
		return nil, nil, fmt.Errorf("reading vm_hourly_stats: %w", err)
	}
	if skipped > 0 {
		c.Logger.Warn("skipped unreadable rows in hourly cache", "table", "vm_hourly_stats", "rows", skipped, "error", firstScanErr)
	}

	snapTimes := make([]int64, 0, len(timeSet))
	for t := range timeSet {
		snapTimes = append(snapTimes, t)
	}
	sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
	return agg, snapTimes, nil
}
// insertDailyAggregates writes one row per aggregated VM into table inside a
// single transaction; either every row is inserted or none is.
//
// For each VM it stores the last known attributes, creation/deletion times,
// the last time the VM was seen, the number of samples, the per-sample
// averages of vCPU, RAM and disk, the share of the day's totalSamples in
// which the VM was present, and per resource pool (tin/bronze/silver/gold)
// both the percentage of the VM's samples and the raw hit count.
// VMs without samples are not written.
//
// The transaction is started with ctx, so it is rolled back when ctx is
// cancelled. The first failing insert aborts the transaction and is returned.
func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
	dbConn := c.Database.DB()
	tx, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback after a successful Commit is a no-op that returns an error we
	// do not care about; on every other path it undoes the partial insert.
	defer func() { _ = tx.Rollback() }()

	driver := strings.ToLower(dbConn.DriverName())
	placeholders := makePlaceholders(driver, 30)
	insert := fmt.Sprintf(`
INSERT INTO %s (
	"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
	"ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder",
	"CreationTime","DeletionTime","SnapshotTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk",
	"AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold"
) VALUES (%s)
`, table, placeholders)

	total := float64(totalSamples)
	for _, v := range agg {
		if v.samples <= 0 {
			// Nothing to average; also guards the divisions below.
			continue
		}
		samples := float64(v.samples)
		avgVcpu := float64(v.sumVcpu) / samples
		avgRam := float64(v.sumRam) / samples
		avgDisk := v.sumDisk / samples

		avgPresent := 0.0
		if total > 0 {
			avgPresent = samples / total
		}
		tinPct := float64(v.tinHits) * 100 / samples
		bronzePct := float64(v.bronzeHits) * 100 / samples
		silverPct := float64(v.silverHits) * 100 / samples
		goldPct := float64(v.goldHits) * 100 / samples

		args := []interface{}{
			v.key.Name,
			v.key.Vcenter,
			nullIfEmpty(v.key.VmId),
			nullIfEmpty(v.key.VmUuid),
			v.resourcePool,
			nullIfEmpty(v.datacenter),
			nullIfEmpty(v.cluster),
			nullIfEmpty(v.folder),
			v.lastDisk,
			v.lastVcpu,
			v.lastRam,
			v.isTemplate,
			v.poweredOn,
			v.srmPlaceholder,
			v.creation,
			v.deletion,
			v.lastSeen,
			v.samples,
			avgVcpu,
			avgRam,
			avgDisk,
			avgPresent,
			tinPct, bronzePct, silverPct, goldPct,
			float64(v.tinHits), float64(v.bronzeHits), float64(v.silverHits), float64(v.goldHits),
		}
		if driver != "sqlite" {
			// Postgres expects primitive types, nulls are handled by pq via nil:
			// send every remaining empty string as NULL.
			for i, a := range args {
				if s, ok := a.(string); ok && s == "" {
					args[i] = nil
				}
			}
		}
		if _, err := tx.ExecContext(ctx, insert, args...); err != nil {
			return fmt.Errorf("inserting daily aggregate into %s: %w", table, err)
		}
	}
	return tx.Commit()
}
// int64OrZero unwraps a nullable integer: it returns the stored value when v
// is valid and 0 when v represents NULL.
func int64OrZero(v sql.NullInt64) int64 {
	if !v.Valid {
		return 0
	}
	return v.Int64
}
// nullIfEmpty maps blank strings to nil so they can be passed as SQL NULL.
// A string counts as blank when nothing is left after trimming whitespace;
// any other string is returned unchanged (it is not trimmed).
func nullIfEmpty(s string) interface{} {
	if len(strings.TrimSpace(s)) > 0 {
		return s
	}
	return nil
}
// makePlaceholders returns n comma-separated SQL bind placeholders.
// For the driver name "sqlite" these are "?" markers ("?,?,?"); for every
// other driver they are numbered from one ("$1,$2,$3").
// It returns the empty string when n is zero or negative.
func makePlaceholders(driver string, n int) string {
	if n <= 0 {
		// make() panics on a negative length; there is nothing to bind anyway.
		return ""
	}
	positional := driver == "sqlite"
	parts := make([]string, n)
	for i := range parts {
		if positional {
			parts[i] = "?"
		} else {
			parts[i] = fmt.Sprintf("$%d", i+1)
		}
	}
	return strings.Join(parts, ",")
}
// btoi converts a boolean to an integer counter increment: 1 for true and
// 0 for false.
func btoi(b bool) int64 {
	var n int64
	if b {
		n = 1
	}
	return n
}
// persistDailyRollup writes the day's per-VM aggregates to the daily rollup
// store through db.UpsertVmDailyRollup, one upsert per VM, so that monthly
// aggregation can reuse them.
//
// dayUnix identifies the day and totalSamples is recorded on every row.
// Entries flagged as templates ("true" ignoring case and surrounding space,
// or "1") are not written. The first failing upsert stops the loop and its
// error is returned; rows upserted before it stay in place.
func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
	conn := c.Database.DB()
	total := int64(totalSamples)

	for _, vm := range agg {
		templateFlag := strings.TrimSpace(vm.isTemplate)
		if vm.isTemplate == "1" || strings.EqualFold(templateFlag, "true") {
			continue
		}

		key := vm.key
		rollup := db.VmDailyRollupRow{
			// Identity
			Vcenter: key.Vcenter,
			VmId:    key.VmId,
			VmUuid:  key.VmUuid,
			Name:    key.Name,
			// Lifecycle and sample counts
			CreationTime:   vm.creation,
			DeletionTime:   vm.deletion,
			SamplesPresent: vm.samples,
			TotalSamples:   total,
			// Sums over all samples of the day
			SumVcpu: float64(vm.sumVcpu),
			SumRam:  float64(vm.sumRam),
			SumDisk: vm.sumDisk,
			// Resource pool hit counters
			TinHits:    vm.tinHits,
			BronzeHits: vm.bronzeHits,
			SilverHits: vm.silverHits,
			GoldHits:   vm.goldHits,
			// Attributes as of the last sighting
			LastResourcePool:    vm.resourcePool,
			LastDatacenter:      vm.datacenter,
			LastCluster:         vm.cluster,
			LastFolder:          vm.folder,
			LastProvisionedDisk: vm.lastDisk,
			LastVcpuCount:       vm.lastVcpu,
			LastRamGB:           vm.lastRam,
			IsTemplate:          vm.isTemplate,
			PoweredOn:           vm.poweredOn,
			SrmPlaceholder:      vm.srmPlaceholder,
		}

		upsertErr := db.UpsertVmDailyRollup(ctx, conn, dayUnix, rollup)
		if upsertErr != nil {
			return upsertErr
		}
	}
	return nil
}