package tasks

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"vctp/db"
	"vctp/internal/metrics"
	"vctp/internal/report"
)

// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) (err error) {
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.DailyJobTimeoutSeconds, 15*time.Minute)
	return c.runAggregateJob(ctx, "daily_aggregate", jobTimeout, func(jobCtx context.Context) error {
		startedAt := time.Now()
		defer func() {
			logger.Info("Daily summary job finished", "duration", time.Since(startedAt))
		}()
		// Aggregate the previous day to avoid partial "today" data when the job runs just after midnight.
		targetTime := time.Now().AddDate(0, 0, -1)
		logger.Info("Daily summary job starting", "target_date", targetTime.Format("2006-01-02"))
		// Always force regeneration on the scheduled run to refresh data even if a manual run happened earlier.
		return c.aggregateDailySummary(jobCtx, targetTime, true)
	})
}
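
// AggregateDailySummary is the exported entry point for regenerating the daily
// summary for a specific date; when force is true an existing summary is cleared
// and rebuilt, otherwise an already-populated summary is left untouched.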
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
	return c.aggregateDailySummary(ctx, date, force)
}
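
// aggregateDailySummary builds the per-day summary table for the day containing
// targetTime: it ensures the target table exists, unions the day's hourly snapshot
// tables (or uses the Go fan-out path when DAILY_AGG_GO=1), backfills creation and
// deletion times, registers the snapshot, and generates the daily report.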
func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
	jobStart := time.Now()
	dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
	dayEnd := dayStart.AddDate(0, 0, 1)
	c.Logger.Info("Daily aggregation window", "start", dayStart, "end", dayEnd)
	summaryTable, err := dailySummaryTableName(targetTime)
	if err != nil {
		return err
	}

	dbConn := c.Database.DB()
	db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
	if err := db.EnsureSummaryTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}
	if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
		return err
	} else if rowsExist && !force {
		c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
		return nil
	} else if rowsExist && force {
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}

	// If enabled, use the Go fan-out/reduce path to parallelize aggregation.
	if os.Getenv("DAILY_AGG_GO") == "1" {
		c.Logger.Debug("Using go implementation of aggregation")
		if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil {
			c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err)
		} else {
			metrics.RecordDailyAggregation(time.Since(jobStart), nil)
			c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable)
			return nil
		}
	}

	hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
	if err != nil {
		return err
	}
	hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
	hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
	c.Logger.Info("Daily aggregation hourly snapshot count", "count", len(hourlySnapshots), "date", dayStart.Format("2006-01-02"))
	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
	}

	hourlyTables := make([]string, 0, len(hourlySnapshots))
	for _, snapshot := range hourlySnapshots {
		hourlyTables = append(hourlyTables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	currentTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if err != nil {
		c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02"))
	} else {
		c.Logger.Info("Daily snapshot totals",
			"date", dayStart.Format("2006-01-02"),
			"vm_count", currentTotals.VmCount,
			"vcpu_total", currentTotals.VcpuTotal,
			"ram_total_gb", currentTotals.RamTotal,
			"disk_total_gb", currentTotals.DiskTotal,
		)
	}

	prevStart := dayStart.AddDate(0, 0, -1)
	prevEnd := dayStart
	prevSnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", prevStart, prevEnd)
	if err == nil && len(prevSnapshots) > 0 {
		prevSnapshots = filterRecordsInRange(prevSnapshots, prevStart, prevEnd)
		prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots)
		prevTables := make([]string, 0, len(prevSnapshots))
		for _, snapshot := range prevSnapshots {
			prevTables = append(prevTables, snapshot.TableName)
		}
		prevUnion, err := buildUnionQuery(prevTables, summaryUnionColumns, templateExclusionFilter())
		if err == nil {
			prevTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, prevUnion)
			if err != nil {
				c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02"))
			} else {
				c.Logger.Info("Daily snapshot comparison",
					"current_date", dayStart.Format("2006-01-02"),
					"previous_date", prevStart.Format("2006-01-02"),
					"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
					"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
					"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
					"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
				)
			}
		} else {
			c.Logger.Warn("unable to build previous day union", "error", err)
		}
	}

	insertQuery, err := db.BuildDailySummaryInsert(summaryTable, unionQuery)
	if err != nil {
		return err
	}

	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
		return err
	}
	// Backfill missing creation times to the start of the day for rows where vCenter had no creation info.
	if _, err := dbConn.ExecContext(ctx,
		`UPDATE `+summaryTable+` SET "CreationTime" = $1 WHERE "CreationTime" IS NULL OR "CreationTime" = 0`,
		dayStart.Unix(),
	); err != nil {
		c.Logger.Warn("failed to normalize creation times for daily summary", "error", err, "table", summaryTable)
	}
	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
	}
	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	}

	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		metrics.RecordDailyAggregation(time.Since(jobStart), err)
		return err
	}
	if err := db.CheckpointSQLite(ctx, dbConn); err != nil {
		c.Logger.Warn("failed to checkpoint sqlite after daily aggregation", "error", err)
	}

	c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
	metrics.RecordDailyAggregation(time.Since(jobStart), nil)
	return nil
}
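
// dailySummaryTableName returns the validated summary table name for the given
// date, in the form inventory_daily_summary_YYYYMMDD.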
func dailySummaryTableName(t time.Time) (string, error) {
	return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
}

// aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel,
// reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path
// as closely as possible while improving CPU utilization on multi-core hosts.
func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error {
	jobStart := time.Now()
	dbConn := c.Database.DB()

	hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
	if err != nil {
		return err
	}
	hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
	hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
	c.Logger.Info("Daily aggregation hourly snapshot count (go path)", "count", len(hourlySnapshots), "date", dayStart.Format("2006-01-02"))
	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
	} else {
		c.Logger.Debug("Found hourly snapshot tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots))
	}

	hourlyTables := make([]string, 0, len(hourlySnapshots))
	for _, snapshot := range hourlySnapshots {
		hourlyTables = append(hourlyTables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	// Clear existing summary if forcing.
	if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
		return err
	} else if rowsExist && !force {
		c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable)
		return nil
	} else if rowsExist && force {
		if err := clearTable(ctx, dbConn, summaryTable); err != nil {
			return err
		}
	}

	totalSamples := len(hourlyTables)
	var (
		aggMap    map[dailyAggKey]*dailyAggVal
		snapTimes []int64
	)

	if db.TableExists(ctx, dbConn, "vm_hourly_stats") {
		cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, dayStart, dayEnd)
		if cacheErr != nil {
			c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
		} else if len(cacheAgg) > 0 {
			c.Logger.Debug("using hourly cache for daily aggregation", "date", dayStart.Format("2006-01-02"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg))
			aggMap = cacheAgg
			snapTimes = cacheTimes
			totalSamples = len(cacheTimes)
		}
	}

	if aggMap == nil {
		var errScan error
		aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots)
		if errScan != nil {
			return errScan
		}
		c.Logger.Debug("scanned hourly tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots), "vm_count", len(aggMap))
		if len(aggMap) == 0 {
			return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
		}

		// Build ordered list of snapshot times for deletion inference.
		snapTimes = make([]int64, 0, len(hourlySnapshots))
		for _, snap := range hourlySnapshots {
			snapTimes = append(snapTimes, snap.SnapshotTime.Unix())
		}
		sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
	}

	// Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day.
	// Rebind so the "?" placeholder works for both the sqlite and postgres drivers.
	var nextSnapshotTable string
	nextSnapshotQuery := dbConn.Rebind(`
		SELECT table_name
		FROM snapshot_registry
		WHERE snapshot_type = 'hourly' AND snapshot_time >= ?
		ORDER BY snapshot_time ASC
		LIMIT 1
	`)
	nextSnapshotRows, nextErr := dbConn.QueryxContext(ctx, nextSnapshotQuery, dayEnd.Unix())
	if nextErr == nil {
		if nextSnapshotRows.Next() {
			if scanErr := nextSnapshotRows.Scan(&nextSnapshotTable); scanErr != nil {
				nextSnapshotTable = ""
			}
		}
		nextSnapshotRows.Close()
	}
	nextPresence := make(map[string]struct{})
	if nextSnapshotTable != "" && db.TableExists(ctx, dbConn, nextSnapshotTable) {
		rows, err := querySnapshotRows(ctx, dbConn, nextSnapshotTable, []string{"VmId", "VmUuid", "Name"}, `"Vcenter" = ?`, c.Settings.Values.Settings.VcenterAddresses[0])
		if err == nil {
			for rows.Next() {
				var vmId, vmUuid, name sql.NullString
				if err := rows.Scan(&vmId, &vmUuid, &name); err == nil {
					if vmId.Valid {
						nextPresence["id:"+vmId.String] = struct{}{}
					}
					if vmUuid.Valid {
						nextPresence["uuid:"+vmUuid.String] = struct{}{}
					}
					if name.Valid {
						nextPresence["name:"+name.String] = struct{}{}
					}
				}
			}
			rows.Close()
		}
	}

	var maxSnap int64
	if len(snapTimes) > 0 {
		maxSnap = snapTimes[len(snapTimes)-1]
	}

	for _, v := range aggMap {
		// Infer deletion only after seeing at least two consecutive absent snapshots after lastSeen.
		if maxSnap > 0 && len(v.seen) > 0 && v.lastSeen < maxSnap {
			c.Logger.Debug("inferring deletion window", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "snapshots", len(snapTimes))
		}
		consecutiveMisses := 0
		firstMiss := int64(0)
		for _, t := range snapTimes {
			if t <= v.lastSeen {
				continue
			}
			if _, ok := v.seen[t]; ok {
				consecutiveMisses = 0
				firstMiss = 0
				continue
			}
			consecutiveMisses++
			if firstMiss == 0 {
				firstMiss = t
			}
			if consecutiveMisses >= 2 {
				v.deletion = firstMiss
				break
			}
		}
		if v.deletion == 0 && firstMiss > 0 {
			// Not enough consecutive misses within the day; try to use the first snapshot of the next day to confirm.
			if nextSnapshotTable != "" && len(nextPresence) > 0 {
				_, presentByID := nextPresence["id:"+v.key.VmId]
				_, presentByUUID := nextPresence["uuid:"+v.key.VmUuid]
				_, presentByName := nextPresence["name:"+v.key.Name]
				if !presentByID && !presentByUUID && !presentByName {
					v.deletion = firstMiss
					c.Logger.Debug("cross-day deletion inferred from next snapshot", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "deletion", firstMiss, "next_table", nextSnapshotTable)
				}
			}
			if v.deletion == 0 {
				c.Logger.Debug("pending deletion inference (insufficient consecutive misses)", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "first_missing_snapshot", firstMiss)
			}
		}
	}

	// Insert aggregated rows.
	if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil {
		return err
	}
	c.Logger.Debug("inserted daily aggregates", "table", summaryTable, "rows", len(aggMap), "total_samples", totalSamples)

	// Persist rollup cache for monthly aggregation.
	if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples); err != nil {
		c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayStart.Format("2006-01-02"))
	}

	// Refine lifecycle with existing SQL helper to pick up first-after deletions.
	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
	}

	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
		c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
	}
	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
		return err
	}
	if err := db.CheckpointSQLite(ctx, dbConn); err != nil {
		c.Logger.Warn("failed to checkpoint sqlite after daily aggregation (Go path)", "error", err)
	}

	c.Logger.Debug("Finished daily inventory aggregation (Go path)",
		"summary_table", summaryTable,
		"duration", time.Since(jobStart),
		"tables_scanned", len(hourlyTables),
		"rows_written", rowCount,
		"total_samples", totalSamples,
	)
	return nil
}
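
// scanHourlyTablesParallel fans per-table scans out over a small worker pool
// (bounded by the CPU count and the number of snapshots) and merges each table's
// results into a single map under a mutex.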
func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
	agg := make(map[dailyAggKey]*dailyAggVal, 1024)
	mu := sync.Mutex{}
	workers := runtime.NumCPU()
	if workers < 2 {
		workers = 2
	}
	if workers > len(snapshots) {
		workers = len(snapshots)
	}

	jobs := make(chan report.SnapshotRecord, len(snapshots))
	wg := sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for snap := range jobs {
				rows, err := c.scanHourlyTable(ctx, snap)
				if err != nil {
					c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err)
					continue
				}
				mu.Lock()
				for k, v := range rows {
					if existing, ok := agg[k]; ok {
						mergeDailyAgg(existing, v)
					} else {
						agg[k] = v
					}
				}
				mu.Unlock()
			}
		}()
	}
	for _, snap := range snapshots {
		jobs <- snap
	}
	close(jobs)
	wg.Wait()
	return agg, nil
}
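
// mergeDailyAgg folds src into dst: the earliest non-zero creation and firstSeen win,
// the most recent snapshot supplies the "last known" placement and sizing fields, and
// the per-sample sums, pool hit counters, and seen-snapshot set accumulate.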
func mergeDailyAgg(dst, src *dailyAggVal) {
	if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
		dst.creation = src.creation
	}
	if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) {
		dst.firstSeen = src.firstSeen
	}
	if src.lastSeen > dst.lastSeen {
		dst.lastSeen = src.lastSeen
		dst.resourcePool = src.resourcePool
		dst.datacenter = src.datacenter
		dst.cluster = src.cluster
		dst.folder = src.folder
		dst.isTemplate = src.isTemplate
		dst.poweredOn = src.poweredOn
		dst.srmPlaceholder = src.srmPlaceholder
		dst.lastDisk = src.lastDisk
		dst.lastVcpu = src.lastVcpu
		dst.lastRam = src.lastRam
	}
	dst.sumVcpu += src.sumVcpu
	dst.sumRam += src.sumRam
	dst.sumDisk += src.sumDisk
	dst.samples += src.samples
	dst.tinHits += src.tinHits
	dst.bronzeHits += src.bronzeHits
	dst.silverHits += src.silverHits
	dst.goldHits += src.goldHits
	if dst.seen == nil {
		dst.seen = make(map[int64]struct{}, len(src.seen))
	}
	for t := range src.seen {
		dst.seen[t] = struct{}{}
	}
}
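
// scanHourlyTable reads one hourly snapshot table and returns one dailyAggVal per VM,
// skipping templates and rows that carry no usable identifier.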
func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
	dbConn := c.Database.DB()
	query := fmt.Sprintf(`
		SELECT
			"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
			COALESCE("ProvisionedDisk",0) AS disk,
			COALESCE("VcpuCount",0) AS vcpu,
			COALESCE("RamGB",0) AS ram,
			COALESCE("CreationTime",0) AS creation,
			COALESCE("DeletionTime",0) AS deletion,
			COALESCE("IsTemplate",'FALSE') AS is_template,
			COALESCE("PoweredOn",'FALSE') AS powered_on,
			COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder,
			COALESCE("SnapshotTime",0) AS snapshot_time
		FROM %s
	`, snap.TableName)
	rows, err := dbConn.QueryxContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	out := make(map[dailyAggKey]*dailyAggVal, 256)
	for rows.Next() {
		var (
			name, vcenter, vmId, vmUuid, resourcePool string
			dc, cluster, folder                       sql.NullString
			disk                                      sql.NullFloat64
			vcpu, ram                                 sql.NullInt64
			creation, deletion, snapshotTime          sql.NullInt64
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
		)
		if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
			continue
		}
		if vcenter == "" {
			continue
		}
		// Skip templates.
		if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
			continue
		}
		key := dailyAggKey{
			Vcenter: vcenter,
			VmId:    strings.TrimSpace(vmId),
			VmUuid:  strings.TrimSpace(vmUuid),
			Name:    strings.TrimSpace(name),
		}
		if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
			continue
		}
		if key.VmId == "" {
			key.VmId = key.VmUuid
		}
		pool := strings.ToLower(strings.TrimSpace(resourcePool))
		hitTin := btoi(pool == "tin")
		hitBronze := btoi(pool == "bronze")
		hitSilver := btoi(pool == "silver")
		hitGold := btoi(pool == "gold")
		row := &dailyAggVal{
			key:            key,
			resourcePool:   resourcePool,
			datacenter:     dc.String,
			cluster:        cluster.String,
			folder:         folder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			creation:       int64OrZero(creation),
			firstSeen:      int64OrZero(snapshotTime),
			lastSeen:       int64OrZero(snapshotTime),
			lastDisk:       disk.Float64,
			lastVcpu:       vcpu.Int64,
			lastRam:        ram.Int64,
			sumVcpu:        vcpu.Int64,
			sumRam:         ram.Int64,
			sumDisk:        disk.Float64,
			samples:        1,
			tinHits:        hitTin,
			bronzeHits:     hitBronze,
			silverHits:     hitSilver,
			goldHits:       hitGold,
			seen:           map[int64]struct{}{int64OrZero(snapshotTime): {}},
		}
		out[key] = row
	}
	return out, nil
}

// scanHourlyCache aggregates directly from vm_hourly_stats when available.
func (c *CronTask) scanHourlyCache(ctx context.Context, start, end time.Time) (map[dailyAggKey]*dailyAggVal, []int64, error) {
	dbConn := c.Database.DB()
	query := `
		SELECT
			"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
			COALESCE("ProvisionedDisk",0) AS disk,
			COALESCE("VcpuCount",0) AS vcpu,
			COALESCE("RamGB",0) AS ram,
			COALESCE("CreationTime",0) AS creation,
			COALESCE("DeletionTime",0) AS deletion,
			COALESCE("IsTemplate",'') AS is_template,
			COALESCE("PoweredOn",'') AS powered_on,
			COALESCE("SrmPlaceholder",'') AS srm_placeholder,
			"SnapshotTime"
		FROM vm_hourly_stats
		WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?`
	q := dbConn.Rebind(query)
	rows, err := dbConn.QueryxContext(ctx, q, start.Unix(), end.Unix())
	if err != nil {
		return nil, nil, err
	}
	defer rows.Close()

	agg := make(map[dailyAggKey]*dailyAggVal, 512)
	timeSet := make(map[int64]struct{}, 64)
	for rows.Next() {
		var (
			name, vcenter, vmId, vmUuid, resourcePool string
			dc, cluster, folder                       sql.NullString
			disk                                      sql.NullFloat64
			vcpu, ram                                 sql.NullInt64
			creation, deletion, snapshotTime          sql.NullInt64
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
		)
		if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
			continue
		}
		if vcenter == "" {
			continue
		}
		if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
			continue
		}
		key := dailyAggKey{
			Vcenter: vcenter,
			VmId:    strings.TrimSpace(vmId),
			VmUuid:  strings.TrimSpace(vmUuid),
			Name:    strings.TrimSpace(name),
		}
		if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
			continue
		}
		if key.VmId == "" {
			key.VmId = key.VmUuid
		}
		pool := strings.ToLower(strings.TrimSpace(resourcePool))
		hitTin := btoi(pool == "tin")
		hitBronze := btoi(pool == "bronze")
		hitSilver := btoi(pool == "silver")
		hitGold := btoi(pool == "gold")

		snapTs := int64OrZero(snapshotTime)
		timeSet[snapTs] = struct{}{}

		row := &dailyAggVal{
			key:            key,
			resourcePool:   resourcePool,
			datacenter:     dc.String,
			cluster:        cluster.String,
			folder:         folder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			creation:       int64OrZero(creation),
			firstSeen:      snapTs,
			lastSeen:       snapTs,
			lastDisk:       disk.Float64,
			lastVcpu:       vcpu.Int64,
			lastRam:        ram.Int64,
			sumVcpu:        vcpu.Int64,
			sumRam:         ram.Int64,
			sumDisk:        disk.Float64,
			samples:        1,
			tinHits:        hitTin,
			bronzeHits:     hitBronze,
			silverHits:     hitSilver,
			goldHits:       hitGold,
			seen:           map[int64]struct{}{snapTs: {}},
		}
		if deletion.Valid && deletion.Int64 > 0 {
			row.deletion = deletion.Int64
		}
		if existing, ok := agg[key]; ok {
			mergeDailyAgg(existing, row)
		} else {
			agg[key] = row
		}
	}
	snapTimes := make([]int64, 0, len(timeSet))
	for t := range timeSet {
		snapTimes = append(snapTimes, t)
	}
	sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
	return agg, snapTimes, rows.Err()
}
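
// insertDailyAggregates writes one summary row per aggregated VM in a single
// transaction, deriving the per-day averages, presence ratio, and resource-pool
// percentages from the accumulated sums and hit counters.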
func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
	dbConn := c.Database.DB()
	tx, err := dbConn.Beginx()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	driver := strings.ToLower(dbConn.DriverName())
	placeholders := makePlaceholders(driver, 30)
	insert := fmt.Sprintf(`
		INSERT INTO %s (
			"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
			"ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder",
			"CreationTime","DeletionTime","SnapshotTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk",
			"AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold"
		) VALUES (%s)
	`, table, placeholders)

	for _, v := range agg {
		if v.samples == 0 {
			continue
		}
		avgVcpu := float64(v.sumVcpu) / float64(v.samples)
		avgRam := float64(v.sumRam) / float64(v.samples)
		avgDisk := v.sumDisk / float64(v.samples)
		total := float64(totalSamples)
		avgPresent := 0.0
		tinPct := 0.0
		bronzePct := 0.0
		silverPct := 0.0
		goldPct := 0.0
		if total > 0 {
			avgPresent = float64(v.samples) / total
		}
		if v.samples > 0 {
			tinPct = float64(v.tinHits) * 100 / float64(v.samples)
			bronzePct = float64(v.bronzeHits) * 100 / float64(v.samples)
			silverPct = float64(v.silverHits) * 100 / float64(v.samples)
			goldPct = float64(v.goldHits) * 100 / float64(v.samples)
		}
		args := []interface{}{
			v.key.Name,
			v.key.Vcenter,
			nullIfEmpty(v.key.VmId),
			nullIfEmpty(v.key.VmUuid),
			v.resourcePool,
			nullIfEmpty(v.datacenter),
			nullIfEmpty(v.cluster),
			nullIfEmpty(v.folder),
			v.lastDisk,
			v.lastVcpu,
			v.lastRam,
			v.isTemplate,
			v.poweredOn,
			v.srmPlaceholder,
			v.creation,
			v.deletion,
			v.lastSeen,
			v.samples,
			avgVcpu,
			avgRam,
			avgDisk,
			avgPresent,
			tinPct, bronzePct, silverPct, goldPct,
			float64(v.tinHits), float64(v.bronzeHits), float64(v.silverHits), float64(v.goldHits),
		}
		if driver != "sqlite" {
			// Postgres expects primitive types, nulls are handled by pq via nil.
			for i, a := range args {
				if s, ok := a.(string); ok && s == "" {
					args[i] = nil
				}
			}
		}
		if _, err := tx.ExecContext(ctx, insert, args...); err != nil {
			return err
		}
	}
	return tx.Commit()
}
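
// int64OrZero unwraps a nullable integer, treating NULL as zero.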
func int64OrZero(v sql.NullInt64) int64 {
	if v.Valid {
		return v.Int64
	}
	return 0
}
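
// nullIfEmpty returns nil for empty or whitespace-only strings so they are stored
// as SQL NULL, otherwise the string itself.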
func nullIfEmpty(s string) interface{} {
	if strings.TrimSpace(s) == "" {
		return nil
	}
	return s
}
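
// makePlaceholders builds a comma-separated list of n bind placeholders:
// "?" style for sqlite, "$1..$n" style otherwise (postgres).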
func makePlaceholders(driver string, n int) string {
	if driver == "sqlite" {
		parts := make([]string, n)
		for i := 0; i < n; i++ {
			parts[i] = "?"
		}
		return strings.Join(parts, ",")
	}
	parts := make([]string, n)
	for i := 0; i < n; i++ {
		parts[i] = fmt.Sprintf("$%d", i+1)
	}
	return strings.Join(parts, ",")
}
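
// btoi converts a bool to 1 or 0 so boolean hits can be summed as counters.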
func btoi(b bool) int64 {
	if b {
		return 1
	}
	return 0
}

// persistDailyRollup stores per-day aggregates into vm_daily_rollup to speed monthly aggregation.
func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
	dbConn := c.Database.DB()
	for _, v := range agg {
		if strings.EqualFold(strings.TrimSpace(v.isTemplate), "true") || v.isTemplate == "1" {
			continue
		}
		row := db.VmDailyRollupRow{
			Vcenter:             v.key.Vcenter,
			VmId:                v.key.VmId,
			VmUuid:              v.key.VmUuid,
			Name:                v.key.Name,
			CreationTime:        v.creation,
			DeletionTime:        v.deletion,
			SamplesPresent:      v.samples,
			TotalSamples:        int64(totalSamples),
			SumVcpu:             float64(v.sumVcpu),
			SumRam:              float64(v.sumRam),
			SumDisk:             v.sumDisk,
			TinHits:             v.tinHits,
			BronzeHits:          v.bronzeHits,
			SilverHits:          v.silverHits,
			GoldHits:            v.goldHits,
			LastResourcePool:    v.resourcePool,
			LastDatacenter:      v.datacenter,
			LastCluster:         v.cluster,
			LastFolder:          v.folder,
			LastProvisionedDisk: v.lastDisk,
			LastVcpuCount:       v.lastVcpu,
			LastRamGB:           v.lastRam,
			IsTemplate:          v.isTemplate,
			PoweredOn:           v.poweredOn,
			SrmPlaceholder:      v.srmPlaceholder,
		}
		if err := db.UpsertVmDailyRollup(ctx, dbConn, dayUnix, row); err != nil {
			return err
		}
	}
	return nil
}