Files
vctp2/internal/tasks/inventorySnapshots.go
Nathan Coad 5130d37632
Some checks failed
continuous-integration/drone/push Build was killed
ensure we dont collect hourly snapshot too soon after startup
2026-01-14 10:27:24 +11:00

1153 lines
37 KiB
Go

package tasks
import (
"context"
"database/sql"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"vctp/db/queries"
"vctp/internal/report"
"vctp/internal/vcenter"
"github.com/jmoiron/sqlx"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
)
type inventorySnapshotRow struct {
InventoryId sql.NullInt64
Name string
Vcenter string
VmId sql.NullString
EventKey sql.NullString
CloudId sql.NullString
CreationTime sql.NullInt64
DeletionTime sql.NullInt64
ResourcePool sql.NullString
VmType sql.NullString
Datacenter sql.NullString
Cluster sql.NullString
Folder sql.NullString
ProvisionedDisk sql.NullFloat64
VcpuCount sql.NullInt64
RamGB sql.NullInt64
IsTemplate string
PoweredOn string
SrmPlaceholder string
VmUuid sql.NullString
SnapshotTime int64
IsPresent string
}
// RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table.
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) error {
startedAt := time.Now()
defer func() {
logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt))
}()
startTime := time.Now()
// reload settings in case vcenter list has changed
c.Settings.ReadYMLSettings()
if c.FirstHourlySnapshotCheck {
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
return err
}
lastSnapshot, err := report.LatestSnapshotTime(ctx, c.Database, "hourly")
if err != nil {
return err
}
minIntervalSeconds := intWithDefault(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, 3600)
if !lastSnapshot.IsZero() && startTime.Sub(lastSnapshot) < time.Duration(minIntervalSeconds)*time.Second {
c.Logger.Info("Skipping hourly snapshot, last snapshot too recent",
"last_snapshot", lastSnapshot,
"min_interval_seconds", minIntervalSeconds,
)
c.FirstHourlySnapshotCheck = false
return nil
}
c.FirstHourlySnapshotCheck = false
}
tableName, err := hourlyInventoryTableName(startTime)
if err != nil {
return err
}
dbConn := c.Database.DB()
if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil {
return err
}
if err := report.RegisterSnapshot(ctx, c.Database, "hourly", tableName, startTime); err != nil {
c.Logger.Warn("failed to register hourly snapshot", "error", err, "table", tableName)
}
var wg sync.WaitGroup
var errCount int64
concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency
var sem chan struct{}
if concurrencyLimit > 0 {
sem = make(chan struct{}, concurrencyLimit)
}
for _, url := range c.Settings.Values.Settings.VcenterAddresses {
url := url
if sem != nil {
sem <- struct{}{}
}
wg.Add(1)
go func() {
defer wg.Done()
if sem != nil {
defer func() { <-sem }()
}
if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, url); err != nil {
atomic.AddInt64(&errCount, 1)
c.Logger.Error("hourly snapshot failed", "error", err, "url", url)
}
}()
}
wg.Wait()
if errCount > 0 {
return fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount)
}
c.Logger.Debug("Finished hourly vcenter snapshot")
return nil
}
// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) error {
startedAt := time.Now()
defer func() {
logger.Info("Daily summary job finished", "duration", time.Since(startedAt))
}()
targetTime := time.Now().Add(-time.Minute)
return c.aggregateDailySummary(ctx, targetTime, false)
}
func (c *CronTask) AggregateDailySummary(ctx context.Context, date time.Time, force bool) error {
return c.aggregateDailySummary(ctx, date, force)
}
func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Time, force bool) error {
dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
dayEnd := dayStart.AddDate(0, 0, 1)
summaryTable, err := dailySummaryTableName(targetTime)
if err != nil {
return err
}
dbConn := c.Database.DB()
if err := ensureDailySummaryTable(ctx, dbConn, summaryTable); err != nil {
return err
}
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
return err
}
if rowsExist, err := tableHasRows(ctx, dbConn, summaryTable); err != nil {
return err
} else if rowsExist && !force {
c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
return nil
} else if rowsExist && force {
if err := clearTable(ctx, dbConn, summaryTable); err != nil {
return err
}
}
if rowsExist, err := tableHasRows(ctx, dbConn, summaryTable); err != nil {
return err
} else if rowsExist {
c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
return nil
}
hourlySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", dayStart, dayEnd)
if err != nil {
return err
}
if len(hourlySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
}
hourlyTables := make([]string, 0, len(hourlySnapshots))
for _, snapshot := range hourlySnapshots {
hourlyTables = append(hourlyTables, snapshot.TableName)
}
unionQuery := buildUnionQuery(hourlyTables, []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`,
})
currentTotals, err := snapshotTotalsForUnion(ctx, dbConn, unionQuery)
if err != nil {
c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02"))
} else {
c.Logger.Info("Daily snapshot totals",
"date", dayStart.Format("2006-01-02"),
"vm_count", currentTotals.VmCount,
"vcpu_total", currentTotals.VcpuTotal,
"ram_total_gb", currentTotals.RamTotal,
"disk_total_gb", currentTotals.DiskTotal,
)
}
prevStart := dayStart.AddDate(0, 0, -1)
prevEnd := dayStart
prevSnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", prevStart, prevEnd)
if err == nil && len(prevSnapshots) > 0 {
prevTables := make([]string, 0, len(prevSnapshots))
for _, snapshot := range prevSnapshots {
prevTables = append(prevTables, snapshot.TableName)
}
prevUnion := buildUnionQuery(prevTables, []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`,
})
prevTotals, err := snapshotTotalsForUnion(ctx, dbConn, prevUnion)
if err != nil {
c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02"))
} else {
c.Logger.Info("Daily snapshot comparison",
"current_date", dayStart.Format("2006-01-02"),
"previous_date", prevStart.Format("2006-01-02"),
"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
"disk_delta_gb", currentTotals.DiskTotal-prevTotals.DiskTotal,
)
}
}
insertQuery := fmt.Sprintf(`
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
"SamplesPresent", "AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct"
)
SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "SamplesPresent",
AVG(CASE WHEN "IsPresent" = 'TRUE' AND "VcpuCount" IS NOT NULL THEN "VcpuCount" END) AS "AvgVcpuCount",
AVG(CASE WHEN "IsPresent" = 'TRUE' AND "RamGB" IS NOT NULL THEN "RamGB" END) AS "AvgRamGB",
AVG(CASE WHEN "IsPresent" = 'TRUE' AND "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" END) AS "AvgProvisionedDisk",
AVG(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "AvgIsPresent",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'tin' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolTinPct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'bronze' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolBronzePct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'silver' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolSilverPct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolGoldPct"
FROM (
%s
) snapshots
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
`, summaryTable, unionQuery)
if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
return err
}
if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart); err != nil {
c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
}
c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
return nil
}
// RunVcenterMonthlyAggregate summarizes the previous month's daily snapshots.
func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) error {
startedAt := time.Now()
defer func() {
logger.Info("Monthly summary job finished", "duration", time.Since(startedAt))
}()
now := time.Now()
firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
targetMonth := firstOfThisMonth.AddDate(0, -1, 0)
return c.aggregateMonthlySummary(ctx, targetMonth, false)
}
func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error {
return c.aggregateMonthlySummary(ctx, month, force)
}
func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time.Time, force bool) error {
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
return err
}
monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location())
monthEnd := monthStart.AddDate(0, 1, 0)
dailySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", monthStart, monthEnd)
if err != nil {
return err
}
if len(dailySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01"))
}
monthlyTable, err := monthlySummaryTableName(targetMonth)
if err != nil {
return err
}
dbConn := c.Database.DB()
if err := ensureMonthlySummaryTable(ctx, dbConn, monthlyTable); err != nil {
return err
}
if rowsExist, err := tableHasRows(ctx, dbConn, monthlyTable); err != nil {
return err
} else if rowsExist && !force {
c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable)
return nil
} else if rowsExist && force {
if err := clearTable(ctx, dbConn, monthlyTable); err != nil {
return err
}
}
if rowsExist, err := tableHasRows(ctx, dbConn, monthlyTable); err != nil {
return err
} else if rowsExist {
c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable)
return nil
}
dailyTables := make([]string, 0, len(dailySnapshots))
for _, snapshot := range dailySnapshots {
dailyTables = append(dailyTables, snapshot.TableName)
}
unionQuery := buildUnionQuery(dailyTables, []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`,
})
if strings.TrimSpace(unionQuery) == "" {
return fmt.Errorf("no valid daily snapshot tables found for %s", targetMonth.Format("2006-01"))
}
monthlyTotals, err := snapshotTotalsForUnion(ctx, dbConn, unionQuery)
if err != nil {
c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", targetMonth.Format("2006-01"))
} else {
c.Logger.Info("Monthly snapshot totals",
"month", targetMonth.Format("2006-01"),
"vm_count", monthlyTotals.VmCount,
"vcpu_total", monthlyTotals.VcpuTotal,
"ram_total_gb", monthlyTotals.RamTotal,
"disk_total_gb", monthlyTotals.DiskTotal,
)
}
insertQuery := fmt.Sprintf(`
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
"AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct"
)
SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
AVG(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" END) AS "AvgVcpuCount",
AVG(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" END) AS "AvgRamGB",
AVG(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" END) AS "AvgProvisionedDisk",
AVG(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "AvgIsPresent",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'tin' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolTinPct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'bronze' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolBronzePct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'silver' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolSilverPct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolGoldPct"
FROM (
%s
) snapshots
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
`, monthlyTable, unionQuery)
if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01"))
return err
}
if err := report.RegisterSnapshot(ctx, c.Database, "monthly", monthlyTable, targetMonth); err != nil {
c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", monthlyTable)
}
c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", monthlyTable)
return nil
}
// RunSnapshotCleanup drops hourly and daily snapshot tables older than retention.
func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) error {
startedAt := time.Now()
defer func() {
logger.Info("Snapshot cleanup job finished", "duration", time.Since(startedAt))
}()
now := time.Now()
hourlyMaxDays := intWithDefault(c.Settings.Values.Settings.HourlySnapshotMaxAgeDays, 60)
dailyMaxMonths := intWithDefault(c.Settings.Values.Settings.DailySnapshotMaxAgeMonths, 12)
hourlyCutoff := now.AddDate(0, 0, -hourlyMaxDays)
dailyCutoff := now.AddDate(0, -dailyMaxMonths, 0)
dbConn := c.Database.DB()
hourlyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_hourly_")
if err != nil {
return err
}
removedHourly := 0
for _, table := range hourlyTables {
if strings.HasPrefix(table, "inventory_daily_summary_") {
continue
}
tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "epoch")
if !ok {
continue
}
if tableDate.Before(truncateDate(hourlyCutoff)) {
if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table)
} else {
removedHourly++
if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil {
c.Logger.Warn("failed to remove hourly snapshot registry entry", "error", err, "table", table)
}
}
}
}
dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_summary_")
if err != nil {
return err
}
removedDaily := 0
for _, table := range dailyTables {
tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102")
if !ok {
continue
}
if tableDate.Before(truncateDate(dailyCutoff)) {
if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table)
} else {
removedDaily++
if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil {
c.Logger.Warn("failed to remove daily snapshot registry entry", "error", err, "table", table)
}
}
}
}
c.Logger.Info("Finished snapshot cleanup",
"removed_hourly_tables", removedHourly,
"removed_daily_tables", removedDaily,
"hourly_max_age_days", hourlyMaxDays,
"daily_max_age_months", dailyMaxMonths,
)
return nil
}
func hourlyInventoryTableName(t time.Time) (string, error) {
return safeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix()))
}
func dailySummaryTableName(t time.Time) (string, error) {
return safeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
}
func monthlySummaryTableName(t time.Time) (string, error) {
return safeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601")))
}
func safeTableName(name string) (string, error) {
for _, r := range name {
if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' {
continue
}
return "", fmt.Errorf("invalid table name: %s", name)
}
return name, nil
}
func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
ddl := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"VmType" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"VcpuCount" BIGINT,
"RamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"SnapshotTime" BIGINT NOT NULL,
"IsPresent" TEXT NOT NULL
);`, tableName)
if _, err := dbConn.ExecContext(ctx, ddl); err != nil {
return err
}
return ensureSnapshotColumns(ctx, dbConn, tableName, []columnDef{
{Name: "VcpuCount", Type: "BIGINT"},
{Name: "RamGB", Type: "BIGINT"},
})
}
func ensureDailySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
ddl := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"VmType" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"VcpuCount" BIGINT,
"RamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"SamplesPresent" BIGINT NOT NULL,
"AvgVcpuCount" REAL,
"AvgRamGB" REAL,
"AvgProvisionedDisk" REAL,
"AvgIsPresent" REAL,
"PoolTinPct" REAL,
"PoolBronzePct" REAL,
"PoolSilverPct" REAL,
"PoolGoldPct" REAL
);`, tableName)
if _, err := dbConn.ExecContext(ctx, ddl); err != nil {
return err
}
return ensureSnapshotColumns(ctx, dbConn, tableName, []columnDef{
{Name: "AvgVcpuCount", Type: "REAL"},
{Name: "AvgRamGB", Type: "REAL"},
{Name: "AvgProvisionedDisk", Type: "REAL"},
{Name: "AvgIsPresent", Type: "REAL"},
{Name: "PoolTinPct", Type: "REAL"},
{Name: "PoolBronzePct", Type: "REAL"},
{Name: "PoolSilverPct", Type: "REAL"},
{Name: "PoolGoldPct", Type: "REAL"},
})
}
func ensureMonthlySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
ddl := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"VmType" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"VcpuCount" BIGINT,
"RamGB" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"AvgVcpuCount" REAL,
"AvgRamGB" REAL,
"AvgProvisionedDisk" REAL,
"AvgIsPresent" REAL,
"PoolTinPct" REAL,
"PoolBronzePct" REAL,
"PoolSilverPct" REAL,
"PoolGoldPct" REAL
);`, tableName)
if _, err := dbConn.ExecContext(ctx, ddl); err != nil {
return err
}
return ensureSnapshotColumns(ctx, dbConn, tableName, []columnDef{
{Name: "AvgVcpuCount", Type: "REAL"},
{Name: "AvgRamGB", Type: "REAL"},
{Name: "AvgProvisionedDisk", Type: "REAL"},
{Name: "AvgIsPresent", Type: "REAL"},
{Name: "PoolTinPct", Type: "REAL"},
{Name: "PoolBronzePct", Type: "REAL"},
{Name: "PoolSilverPct", Type: "REAL"},
{Name: "PoolGoldPct", Type: "REAL"},
})
}
func buildUnionQuery(tables []string, columns []string) string {
queries := make([]string, 0, len(tables))
columnList := strings.Join(columns, ", ")
for _, table := range tables {
if _, err := safeTableName(table); err != nil {
continue
}
queries = append(queries, fmt.Sprintf("SELECT %s FROM %s", columnList, table))
}
return strings.Join(queries, "\nUNION ALL\n")
}
func parseSnapshotDate(table string, prefix string, layout string) (time.Time, bool) {
if !strings.HasPrefix(table, prefix) {
return time.Time{}, false
}
suffix := strings.TrimPrefix(table, prefix)
if layout == "epoch" {
epoch, err := strconv.ParseInt(suffix, 10, 64)
if err != nil {
return time.Time{}, false
}
return time.Unix(epoch, 0), true
}
parsed, err := time.Parse(layout, suffix)
if err != nil {
return time.Time{}, false
}
return parsed, true
}
func truncateDate(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
if _, err := safeTableName(table); err != nil {
return err
}
_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table))
return err
}
func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
if _, err := safeTableName(table); err != nil {
return err
}
_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s", table))
if err != nil {
return fmt.Errorf("failed to clear table %s: %w", table, err)
}
return nil
}
func tableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) {
if _, err := safeTableName(table); err != nil {
return false, err
}
query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
var exists int
if err := dbConn.GetContext(ctx, &exists, query); err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
return true, nil
}
type snapshotTotals struct {
VmCount int64
VcpuTotal int64
RamTotal int64
DiskTotal float64
}
type columnDef struct {
Name string
Type string
}
func ensureSnapshotColumns(ctx context.Context, dbConn *sqlx.DB, tableName string, columns []columnDef) error {
if _, err := safeTableName(tableName); err != nil {
return err
}
for _, column := range columns {
if err := addColumnIfMissing(ctx, dbConn, tableName, column); err != nil {
return err
}
}
return nil
}
func addColumnIfMissing(ctx context.Context, dbConn *sqlx.DB, tableName string, column columnDef) error {
query := fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "%s" %s`, tableName, column.Name, column.Type)
if _, err := dbConn.ExecContext(ctx, query); err != nil {
errText := strings.ToLower(err.Error())
if strings.Contains(errText, "duplicate column") || strings.Contains(errText, "already exists") {
return nil
}
return err
}
return nil
}
func snapshotTotalsForTable(ctx context.Context, dbConn *sqlx.DB, table string) (snapshotTotals, error) {
if _, err := safeTableName(table); err != nil {
return snapshotTotals{}, err
}
query := fmt.Sprintf(`
SELECT
COUNT(DISTINCT "VmId") AS vm_count,
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
FROM %s
WHERE "IsPresent" = 'TRUE'
`, table)
var totals snapshotTotals
if err := dbConn.GetContext(ctx, &totals, query); err != nil {
return snapshotTotals{}, err
}
return totals, nil
}
func snapshotTotalsForUnion(ctx context.Context, dbConn *sqlx.DB, unionQuery string) (snapshotTotals, error) {
query := fmt.Sprintf(`
SELECT
COUNT(DISTINCT "VmId") AS vm_count,
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
COALESCE(SUM(CASE WHEN "ProvisionedDisk" IS NOT NULL THEN "ProvisionedDisk" ELSE 0 END), 0) AS disk_total
FROM (
%s
) snapshots
WHERE "IsPresent" = 'TRUE'
`, unionQuery)
var totals snapshotTotals
if err := dbConn.GetContext(ctx, &totals, query); err != nil {
return snapshotTotals{}, err
}
return totals, nil
}
func tableExists(ctx context.Context, dbConn *sqlx.DB, table string) bool {
driver := strings.ToLower(dbConn.DriverName())
switch driver {
case "sqlite":
var count int
err := dbConn.GetContext(ctx, &count, `
SELECT COUNT(1)
FROM sqlite_master
WHERE type = 'table' AND name = ?
`, table)
return err == nil && count > 0
case "pgx", "postgres":
var count int
err := dbConn.GetContext(ctx, &count, `
SELECT COUNT(1)
FROM pg_catalog.pg_tables
WHERE schemaname = 'public' AND tablename = $1
`, table)
return err == nil && count > 0
default:
return false
}
}
func nullInt64ToInt(value sql.NullInt64) int64 {
if value.Valid {
return value.Int64
}
return 0
}
func nullFloat64ToFloat(value sql.NullFloat64) float64 {
if value.Valid {
return value.Float64
}
return 0
}
func intWithDefault(value int, fallback int) int {
if value <= 0 {
return fallback
}
return value
}
func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory) (inventorySnapshotRow, error) {
if vmObject == nil {
return inventorySnapshotRow{}, fmt.Errorf("missing VM object")
}
row := inventorySnapshotRow{
Name: vmObject.Name,
Vcenter: vc.Vurl,
VmId: sql.NullString{String: vmObject.Reference().Value, Valid: vmObject.Reference().Value != ""},
SnapshotTime: snapshotTime.Unix(),
}
if inv != nil {
row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}
row.EventKey = inv.EventKey
row.CloudId = inv.CloudId
row.DeletionTime = inv.DeletionTime
row.VmType = inv.VmType
}
if vmObject.Config != nil {
row.VmUuid = sql.NullString{String: vmObject.Config.Uuid, Valid: vmObject.Config.Uuid != ""}
if !vmObject.Config.CreateDate.IsZero() {
row.CreationTime = sql.NullInt64{Int64: vmObject.Config.CreateDate.Unix(), Valid: true}
}
row.VcpuCount = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.NumCPU), Valid: vmObject.Config.Hardware.NumCPU > 0}
row.RamGB = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.MemoryMB) / 1024, Valid: vmObject.Config.Hardware.MemoryMB > 0}
totalDiskBytes := int64(0)
for _, device := range vmObject.Config.Hardware.Device {
if disk, ok := device.(*types.VirtualDisk); ok {
totalDiskBytes += disk.CapacityInBytes
}
}
if totalDiskBytes > 0 {
row.ProvisionedDisk = sql.NullFloat64{Float64: float64(totalDiskBytes / 1024 / 1024 / 1024), Valid: true}
}
if vmObject.Config.ManagedBy != nil && vmObject.Config.ManagedBy.ExtensionKey == "com.vmware.vcDr" && vmObject.Config.ManagedBy.Type == "placeholderVm" {
row.SrmPlaceholder = "TRUE"
} else {
row.SrmPlaceholder = "FALSE"
}
if vmObject.Config.Template {
row.IsTemplate = "TRUE"
} else {
row.IsTemplate = "FALSE"
}
}
if vmObject.Runtime.PowerState == "poweredOff" {
row.PoweredOn = "FALSE"
} else {
row.PoweredOn = "TRUE"
}
if inv != nil {
row.ResourcePool = inv.ResourcePool
row.Datacenter = inv.Datacenter
row.Cluster = inv.Cluster
row.Folder = inv.Folder
if !row.CreationTime.Valid {
row.CreationTime = inv.CreationTime
}
if !row.ProvisionedDisk.Valid {
row.ProvisionedDisk = inv.ProvisionedDisk
}
if !row.VcpuCount.Valid {
row.VcpuCount = inv.InitialVcpus
}
if !row.RamGB.Valid && inv.InitialRam.Valid {
row.RamGB = sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Int64 > 0}
}
if row.IsTemplate == "" {
row.IsTemplate = boolStringFromInterface(inv.IsTemplate)
}
if row.PoweredOn == "" {
row.PoweredOn = boolStringFromInterface(inv.PoweredOn)
}
if row.SrmPlaceholder == "" {
row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder)
}
if !row.VmUuid.Valid {
row.VmUuid = inv.VmUuid
}
}
if row.ResourcePool.String == "" {
if rpName, err := vc.GetVmResourcePool(*vmObject); err == nil {
row.ResourcePool = sql.NullString{String: rpName, Valid: rpName != ""}
}
}
if row.Folder.String == "" {
if folderPath, err := vc.GetVMFolderPath(*vmObject); err == nil {
row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""}
}
}
if row.Cluster.String == "" {
if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil {
row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""}
}
}
if row.Datacenter.String == "" {
if dcName, err := vc.GetDatacenterForVM(*vmObject); err == nil {
row.Datacenter = sql.NullString{String: dcName, Valid: dcName != ""}
}
}
return row, nil
}
func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) inventorySnapshotRow {
return inventorySnapshotRow{
InventoryId: sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0},
Name: inv.Name,
Vcenter: inv.Vcenter,
VmId: inv.VmId,
EventKey: inv.EventKey,
CloudId: inv.CloudId,
CreationTime: inv.CreationTime,
DeletionTime: inv.DeletionTime,
ResourcePool: inv.ResourcePool,
VmType: inv.VmType,
Datacenter: inv.Datacenter,
Cluster: inv.Cluster,
Folder: inv.Folder,
ProvisionedDisk: inv.ProvisionedDisk,
VcpuCount: inv.InitialVcpus,
RamGB: sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Valid && inv.InitialRam.Int64 > 0},
IsTemplate: boolStringFromInterface(inv.IsTemplate),
PoweredOn: boolStringFromInterface(inv.PoweredOn),
SrmPlaceholder: boolStringFromInterface(inv.SrmPlaceholder),
VmUuid: inv.VmUuid,
SnapshotTime: snapshotTime.Unix(),
}
}
func insertDailyInventoryRow(ctx context.Context, dbConn *sqlx.DB, tableName string, row inventorySnapshotRow) error {
query := fmt.Sprintf(`
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime", "IsPresent"
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
`, tableName)
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
_, err := dbConn.ExecContext(ctx, query,
row.InventoryId,
row.Name,
row.Vcenter,
row.VmId,
row.EventKey,
row.CloudId,
row.CreationTime,
row.DeletionTime,
row.ResourcePool,
row.VmType,
row.Datacenter,
row.Cluster,
row.Folder,
row.ProvisionedDisk,
row.VcpuCount,
row.RamGB,
row.IsTemplate,
row.PoweredOn,
row.SrmPlaceholder,
row.VmUuid,
row.SnapshotTime,
row.IsPresent,
)
return err
}
func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error {
c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url)
vc := vcenter.New(c.Logger, c.VcCreds)
if err := vc.Login(url); err != nil {
return fmt.Errorf("unable to connect to vcenter: %w", err)
}
defer vc.Logout()
vcVms, err := vc.GetAllVmReferences()
if err != nil {
return fmt.Errorf("unable to get VMs from vcenter: %w", err)
}
canDetectMissing := len(vcVms) > 0
if !canDetectMissing {
c.Logger.Warn("no VMs returned from vcenter; skipping missing VM detection", "url", url)
}
inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url)
if err != nil {
return fmt.Errorf("unable to query inventory table: %w", err)
}
inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows))
for _, inv := range inventoryRows {
if inv.VmId.Valid {
inventoryByVmID[inv.VmId.String] = inv
}
}
dbConn := c.Database.DB()
presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms))
totals := snapshotTotals{}
for _, vm := range vcVms {
if strings.HasPrefix(vm.Name(), "vCLS-") {
continue
}
vmObj, err := vc.ConvertObjToMoVM(vm)
if err != nil {
c.Logger.Error("failed to read VM details", "vm_id", vm.Reference().Value, "error", err)
continue
}
if vmObj.Config != nil && vmObj.Config.Template {
continue
}
var inv *queries.Inventory
if existing, ok := inventoryByVmID[vm.Reference().Value]; ok {
existingCopy := existing
inv = &existingCopy
}
row, err := snapshotFromVM(vmObj, vc, startTime, inv)
if err != nil {
c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err)
continue
}
row.IsPresent = "TRUE"
presentSnapshots[vm.Reference().Value] = row
totals.VmCount++
totals.VcpuTotal += nullInt64ToInt(row.VcpuCount)
totals.RamTotal += nullInt64ToInt(row.RamGB)
totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk)
}
for _, row := range presentSnapshots {
if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
c.Logger.Error("failed to insert hourly snapshot", "error", err, "vm_id", row.VmId.String)
}
}
if !canDetectMissing {
c.Logger.Info("Hourly snapshot summary",
"vcenter", url,
"vm_count", totals.VmCount,
"vcpu_total", totals.VcpuTotal,
"ram_total_gb", totals.RamTotal,
"disk_total_gb", totals.DiskTotal,
)
return nil
}
for _, inv := range inventoryRows {
if strings.HasPrefix(inv.Name, "vCLS-") {
continue
}
vmID := inv.VmId.String
if vmID != "" {
if _, ok := presentSnapshots[vmID]; ok {
continue
}
}
row := snapshotFromInventory(inv, startTime)
row.IsPresent = "FALSE"
if !row.DeletionTime.Valid {
deletionTime := startTime.Unix()
row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true}
if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
DeletionTime: row.DeletionTime,
VmId: inv.VmId,
DatacenterName: inv.Datacenter,
}); err != nil {
c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String)
}
}
if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
c.Logger.Error("failed to insert missing VM snapshot", "error", err, "vm_id", row.VmId.String)
}
}
c.Logger.Info("Hourly snapshot summary",
"vcenter", url,
"vm_count", totals.VmCount,
"vcpu_total", totals.VcpuTotal,
"ram_total_gb", totals.RamTotal,
"disk_total_gb", totals.DiskTotal,
)
return nil
}
func boolStringFromInterface(value interface{}) string {
switch v := value.(type) {
case nil:
return ""
case string:
return v
case []byte:
return string(v)
case bool:
if v {
return "TRUE"
}
return "FALSE"
case int:
if v != 0 {
return "TRUE"
}
return "FALSE"
case int64:
if v != 0 {
return "TRUE"
}
return "FALSE"
default:
return fmt.Sprint(v)
}
}