Files
vctp2/internal/tasks/inventorySnapshots.go
Nathan Coad a81613a8c2
Some checks failed
continuous-integration/drone/push Build was killed
fix drone and sqlc generation
2026-01-13 19:49:13 +11:00

675 lines
20 KiB
Go

package tasks
import (
"context"
"database/sql"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"time"
"vctp/db/queries"
"vctp/internal/report"
"vctp/internal/vcenter"
"github.com/jmoiron/sqlx"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
)
// inventorySnapshotRow is one row of a per-day inventory snapshot table. Its
// fields correspond one-to-one, in order, to the columns written by
// insertDailyInventoryRow.
//
// Nullable columns use the database/sql Null* wrappers. The boolean-like
// columns (IsTemplate, PoweredOn, SrmPlaceholder, IsPresent) are stored as
// text: values computed in this file are "TRUE" or "FALSE", while values
// copied from the inventory table pass through boolStringFromInterface.
type inventorySnapshotRow struct {
	InventoryId  sql.NullInt64
	Name         string
	Vcenter      string
	VmId         sql.NullString
	EventKey     sql.NullString
	CloudId      sql.NullString
	CreationTime sql.NullInt64
	DeletionTime sql.NullInt64
	ResourcePool sql.NullString
	VmType       sql.NullString
	Datacenter   sql.NullString
	Cluster      sql.NullString
	Folder       sql.NullString
	// ProvisionedDisk is the summed virtual disk capacity expressed in GiB
	// when computed from a live VM (see snapshotFromVM).
	ProvisionedDisk sql.NullFloat64
	InitialVcpus    sql.NullInt64
	InitialRam      sql.NullInt64
	IsTemplate      string
	PoweredOn       string
	SrmPlaceholder  string
	VmUuid          sql.NullString
	// SnapshotTime is the snapshot run's start time as Unix seconds.
	SnapshotTime int64
	// IsPresent records whether vCenter reported the VM during the snapshot.
	IsPresent string
}
// RunVcenterSnapshotHourly records an hourly inventory snapshot for every
// configured vCenter into the current day's snapshot table.
//
// The table is created on first use. For each vCenter one row is written per
// VM that vCenter reports (IsPresent "TRUE") and one row per inventory record
// that was not captured from vCenter (IsPresent "FALSE"). A failure affecting a
// single vCenter, VM or insert is logged and skipped so the rest is still
// recorded; only a failure to name or create the table is returned.
//
// The logger parameter is not used; messages are written to c.Logger.
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) error {
	startTime := time.Now()
	tableName, err := dailyInventoryTableName(startTime)
	if err != nil {
		return err
	}

	dbConn := c.Database.DB()
	if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil {
		return err
	}

	// Reload settings in case the vCenter list has changed.
	c.Settings.ReadYMLSettings()

	for _, url := range c.Settings.Values.Settings.VcenterAddresses {
		c.snapshotVcenterHourly(ctx, url, tableName, startTime)
	}

	c.Logger.Debug("Finished hourly vcenter snapshot")
	return nil
}

// snapshotVcenterHourly writes the hourly snapshot rows of a single vCenter
// into tableName. Errors are logged instead of returned so that one failing
// vCenter does not prevent the others from being recorded.
func (c *CronTask) snapshotVcenterHourly(ctx context.Context, url, tableName string, startTime time.Time) {
	dbConn := c.Database.DB()

	c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url)
	vc := vcenter.New(c.Logger, c.VcCreds)
	// NOTE(review): any result of Login is not inspected, matching the previous
	// behavior; a failed login is expected to surface from GetAllVmReferences —
	// confirm.
	vc.Login(url)
	// Deferred so the session is released on every exit path.
	defer vc.Logout()

	vcVms, err := vc.GetAllVmReferences()
	if err != nil {
		c.Logger.Error("unable to get VMs from vcenter", "error", err, "url", url)
		return
	}

	inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url)
	if err != nil {
		c.Logger.Error("unable to query inventory table", "error", err, "url", url)
		return
	}

	inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows))
	for _, inv := range inventoryRows {
		if inv.VmId.Valid {
			inventoryByVmID[inv.VmId.String] = inv
		}
	}

	presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms))
	// unreadable holds VMs that vCenter listed but whose details could not be
	// turned into a row. They exist, so they must not be reported as absent.
	unreadable := make(map[string]struct{})

	// NOTE(review): vCLS VMs and templates are skipped below without being
	// remembered, so an inventory record for one of them is still written as
	// IsPresent "FALSE" further down — confirm that is intended.
	for _, vm := range vcVms {
		if strings.HasPrefix(vm.Name(), "vCLS-") {
			continue
		}
		vmID := vm.Reference().Value

		vmObj, err := vc.ConvertObjToMoVM(vm)
		if err != nil {
			c.Logger.Error("failed to read VM details", "vm_id", vmID, "error", err)
			unreadable[vmID] = struct{}{}
			continue
		}
		if vmObj != nil && vmObj.Config != nil && vmObj.Config.Template {
			continue
		}

		var inv *queries.Inventory
		if existing, ok := inventoryByVmID[vmID]; ok {
			// existing is already a private copy of the map value.
			inv = &existing
		}

		row, err := snapshotFromVM(vmObj, vc, startTime, inv)
		if err != nil {
			c.Logger.Error("unable to build snapshot for VM", "vm_id", vmID, "error", err)
			unreadable[vmID] = struct{}{}
			continue
		}
		row.IsPresent = "TRUE"
		presentSnapshots[vmID] = row
	}

	for _, row := range presentSnapshots {
		if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
			c.Logger.Error("failed to insert hourly snapshot", "error", err, "vm_id", row.VmId.String)
		}
	}

	// Record inventory entries that vCenter did not report as absent.
	for _, inv := range inventoryRows {
		if vmID := inv.VmId.String; vmID != "" {
			if _, ok := presentSnapshots[vmID]; ok {
				continue
			}
			if _, ok := unreadable[vmID]; ok {
				// Listed by vCenter but unreadable this run: presence is
				// known, so writing "FALSE" would be wrong.
				continue
			}
		}
		row := snapshotFromInventory(inv, startTime)
		row.IsPresent = "FALSE"
		if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
			c.Logger.Error("failed to insert missing VM snapshot", "error", err, "vm_id", row.VmId.String)
		}
	}
}
// RunVcenterDailyAggregate rolls one day's hourly snapshot rows up into that
// day's summary table, creating the summary table if needed.
//
// Rows of the source table are grouped by every descriptive column, and each
// group gets a "SamplesPresent" count of its rows whose "IsPresent" is 'TRUE'.
// The logger parameter is not used; messages are written to c.Logger.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) error {
	// The target day is derived from one minute ago, presumably so that a run
	// scheduled right after midnight still addresses the day that just ended —
	// confirm against the cron schedule.
	day := time.Now().Add(-time.Minute)

	hourlyTable, err := dailyInventoryTableName(day)
	if err != nil {
		return err
	}
	summaryTable, err := dailySummaryTableName(day)
	if err != nil {
		return err
	}

	db := c.Database.DB()
	if err = ensureDailySummaryTable(ctx, db, summaryTable); err != nil {
		return err
	}

	// First %s is the summary (destination) table, second the hourly source.
	const aggregateTemplate = `
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent"
)
SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "SamplesPresent"
FROM %s
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
`
	statement := fmt.Sprintf(aggregateTemplate, summaryTable, hourlyTable)

	_, err = db.ExecContext(ctx, statement)
	if err != nil {
		c.Logger.Error("failed to aggregate daily inventory", "error", err, "source_table", hourlyTable)
		return err
	}

	c.Logger.Debug("Finished daily inventory aggregation", "source_table", hourlyTable, "summary_table", summaryTable)
	return nil
}
// RunVcenterMonthlyAggregate builds the summary table for the previous
// calendar month from that month's inventory_daily_YYYYMMDD snapshot tables.
//
// All matching tables are combined with UNION ALL, grouped by every descriptive
// column, and averaged into "AvgVcpus", "AvgRam" and "AvgIsPresent". It returns
// an error when no snapshot table exists for the month. The logger parameter is
// not used; messages are written to c.Logger.
func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) error {
	// Anchor on the first of the current month before stepping back, so the
	// result is always the first day of the previous month.
	today := time.Now()
	previousMonth := time.Date(today.Year(), today.Month(), 1, 0, 0, 0, 0, today.Location()).AddDate(0, -1, 0)
	monthLabel := previousMonth.Format("2006-01")

	sourceTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_"+previousMonth.Format("200601"))
	if err != nil {
		return err
	}
	if len(sourceTables) == 0 {
		return fmt.Errorf("no daily snapshot tables found for %s", monthLabel)
	}

	summaryTable, err := monthlySummaryTableName(previousMonth)
	if err != nil {
		return err
	}

	db := c.Database.DB()
	if err = ensureMonthlySummaryTable(ctx, db, summaryTable); err != nil {
		return err
	}

	snapshotColumns := []string{
		`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
		`"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
		`"ProvisionedDisk"`, `"InitialVcpus"`, `"InitialRam"`, `"IsTemplate"`, `"PoweredOn"`,
		`"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`,
	}
	allSnapshots := buildUnionQuery(sourceTables, snapshotColumns)
	if strings.TrimSpace(allSnapshots) == "" {
		return fmt.Errorf("no valid daily snapshot tables found for %s", monthLabel)
	}

	// First %s is the monthly summary table, second the UNION ALL subquery.
	// NOTE(review): "InitialVcpus" and "InitialRam" are also GROUP BY columns,
	// so their averages equal the group's own value — confirm this is intended.
	const aggregateTemplate = `
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
"AvgVcpus", "AvgRam", "AvgIsPresent"
)
SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
AVG(CASE WHEN "InitialVcpus" IS NOT NULL THEN "InitialVcpus" END) AS "AvgVcpus",
AVG(CASE WHEN "InitialRam" IS NOT NULL THEN "InitialRam" END) AS "AvgRam",
AVG(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "AvgIsPresent"
FROM (
%s
) snapshots
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
`
	statement := fmt.Sprintf(aggregateTemplate, summaryTable, allSnapshots)

	_, err = db.ExecContext(ctx, statement)
	if err != nil {
		c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", monthLabel)
		return err
	}

	c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", summaryTable)
	return nil
}
// RunSnapshotCleanup drops hourly and daily snapshot tables older than retention.
//
// Retention is read from the environment, with defaults:
//   - HOURLY_SNAPSHOT_MAX_AGE_DAYS (60) for inventory_daily_YYYYMMDD tables
//   - DAILY_SNAPSHOT_MAX_AGE_MONTHS (12) for inventory_daily_summary_YYYYMMDD tables
//
// A table is dropped when the date in its name is strictly before the cutoff
// date. Tables whose suffix is not a valid date are ignored. A failed drop is
// logged and does not stop the run; only a failure to list tables is returned.
// The logger parameter is not used; messages are written to c.Logger.
func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) error {
	now := time.Now()
	hourlyMaxDays := getEnvInt("HOURLY_SNAPSHOT_MAX_AGE_DAYS", 60)
	dailyMaxMonths := getEnvInt("DAILY_SNAPSHOT_MAX_AGE_MONTHS", 12)

	// parseSnapshotDate yields midnight UTC for the date embedded in a table
	// name, so each cutoff is expressed as midnight UTC of its own calendar
	// date. Comparing against local midnight instead would, in zones west of
	// UTC, drop the table dated exactly on the cutoff day one day too early.
	cutoffDate := func(t time.Time) time.Time {
		year, month, day := t.Date()
		return time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
	}
	hourlyCutoff := cutoffDate(now.AddDate(0, 0, -hourlyMaxDays))
	dailyCutoff := cutoffDate(now.AddDate(0, -dailyMaxMonths, 0))

	dbConn := c.Database.DB()

	hourlyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_")
	if err != nil {
		return err
	}
	for _, table := range hourlyTables {
		// The hourly prefix also matches the summary tables; those are
		// handled by the second pass with their own retention.
		if strings.HasPrefix(table, "inventory_daily_summary_") {
			continue
		}
		tableDate, ok := parseSnapshotDate(table, "inventory_daily_", "20060102")
		if !ok || !tableDate.Before(hourlyCutoff) {
			continue
		}
		if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
			c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table)
		}
	}

	dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_summary_")
	if err != nil {
		return err
	}
	for _, table := range dailyTables {
		tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102")
		if !ok || !tableDate.Before(dailyCutoff) {
			continue
		}
		if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
			c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table)
		}
	}

	c.Logger.Debug("Finished snapshot cleanup")
	return nil
}
// dailyInventoryTableName returns the name of the table that holds the hourly
// snapshots taken on t's calendar day, "inventory_daily_YYYYMMDD". The date is
// rendered in t's own location and the result is validated by safeTableName.
func dailyInventoryTableName(t time.Time) (string, error) {
	name := "inventory_daily_" + t.Format("20060102")
	return safeTableName(name)
}
// dailySummaryTableName returns the name of the per-day summary table for t's
// calendar day, "inventory_daily_summary_YYYYMMDD". The date is rendered in
// t's own location and the result is validated by safeTableName.
func dailySummaryTableName(t time.Time) (string, error) {
	name := "inventory_daily_summary_" + t.Format("20060102")
	return safeTableName(name)
}
// monthlySummaryTableName returns the name of the summary table for t's
// calendar month, "inventory_monthly_summary_YYYYMM". The month is rendered in
// t's own location and the result is validated by safeTableName.
func monthlySummaryTableName(t time.Time) (string, error) {
	name := "inventory_monthly_summary_" + t.Format("200601")
	return safeTableName(name)
}
// safeTableName validates a table name before it is interpolated into SQL.
//
// A name is accepted only if it is non-empty and consists solely of the
// characters a-z, 0-9 and '_'. On success the name is returned unchanged; on
// failure the returned string is empty and the error describes the rejection.
func safeTableName(name string) (string, error) {
	// An empty name would otherwise pass the character check below and yield
	// malformed SQL such as "DROP TABLE IF EXISTS ".
	if name == "" {
		return "", fmt.Errorf("invalid table name: empty")
	}
	for _, r := range name {
		switch {
		case r >= 'a' && r <= 'z', r >= '0' && r <= '9', r == '_':
			// allowed character
		default:
			return "", fmt.Errorf("invalid table name: %s", name)
		}
	}
	return name, nil
}
// ensureDailyInventoryTable creates the hourly snapshot table tableName if it
// does not exist yet. Its columns match the fields of inventorySnapshotRow.
//
// tableName is interpolated directly into the statement, so callers must pass
// a validated name (see safeTableName).
func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	const createTemplate = `CREATE TABLE IF NOT EXISTS %s (
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"VmType" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"InitialVcpus" BIGINT,
"InitialRam" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"SnapshotTime" BIGINT NOT NULL,
"IsPresent" TEXT NOT NULL
);`
	if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(createTemplate, tableName)); err != nil {
		return err
	}
	return nil
}
// ensureDailySummaryTable creates the per-day summary table tableName if it
// does not exist yet. It carries the descriptive snapshot columns plus a
// mandatory "SamplesPresent" count.
//
// tableName is interpolated directly into the statement, so callers must pass
// a validated name (see safeTableName).
func ensureDailySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	const createTemplate = `CREATE TABLE IF NOT EXISTS %s (
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"VmType" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"InitialVcpus" BIGINT,
"InitialRam" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"SamplesPresent" BIGINT NOT NULL
);`
	if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(createTemplate, tableName)); err != nil {
		return err
	}
	return nil
}
// ensureMonthlySummaryTable creates the monthly summary table tableName if it
// does not exist yet. It carries the descriptive snapshot columns plus the
// "AvgVcpus", "AvgRam" and "AvgIsPresent" aggregates.
//
// tableName is interpolated directly into the statement, so callers must pass
// a validated name (see safeTableName).
func ensureMonthlySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	const createTemplate = `CREATE TABLE IF NOT EXISTS %s (
"InventoryId" BIGINT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"CloudId" TEXT,
"CreationTime" BIGINT,
"DeletionTime" BIGINT,
"ResourcePool" TEXT,
"VmType" TEXT,
"Datacenter" TEXT,
"Cluster" TEXT,
"Folder" TEXT,
"ProvisionedDisk" REAL,
"InitialVcpus" BIGINT,
"InitialRam" BIGINT,
"IsTemplate" TEXT,
"PoweredOn" TEXT,
"SrmPlaceholder" TEXT,
"VmUuid" TEXT,
"AvgVcpus" REAL,
"AvgRam" REAL,
"AvgIsPresent" REAL
);`
	if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(createTemplate, tableName)); err != nil {
		return err
	}
	return nil
}
// buildUnionQuery returns one "SELECT <columns> FROM <table>" statement per
// table, joined with UNION ALL. Tables whose names fail safeTableName are left
// out; if none remain the result is the empty string. The column expressions
// are inserted as given, joined with ", ".
func buildUnionQuery(tables []string, columns []string) string {
	projection := strings.Join(columns, ", ")

	selects := make([]string, 0, len(tables))
	for i := range tables {
		name := tables[i]
		if _, err := safeTableName(name); err == nil {
			selects = append(selects, "SELECT "+projection+" FROM "+name)
		}
	}
	return strings.Join(selects, "\nUNION ALL\n")
}
// parseSnapshotDate extracts the date encoded in a snapshot table name.
//
// table must start with prefix, and the remainder must parse with layout. On
// success it returns the parsed time and true; otherwise the zero time and
// false. A layout without zone information yields a time in UTC.
func parseSnapshotDate(table string, prefix string, layout string) (time.Time, bool) {
	var none time.Time
	if !strings.HasPrefix(table, prefix) {
		return none, false
	}
	if stamp, err := time.Parse(layout, table[len(prefix):]); err == nil {
		return stamp, true
	}
	return none, false
}
// truncateDate returns midnight at the start of t's calendar day, in t's own
// location.
func truncateDate(t time.Time) time.Time {
	year, month, day := t.Date()
	return time.Date(year, month, day, 0, 0, 0, 0, t.Location())
}
// dropSnapshotTable drops table if it exists. The name is checked with
// safeTableName first, because it is interpolated into the statement; an
// invalid name is returned as an error without touching the database.
func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
	_, err := safeTableName(table)
	if err == nil {
		_, err = dbConn.ExecContext(ctx, "DROP TABLE IF EXISTS "+table)
	}
	return err
}
// getEnvInt reads the environment variable key as a non-negative integer.
// Surrounding whitespace is ignored. fallback is returned when the variable is
// unset, blank, not an integer, or negative.
func getEnvInt(key string, fallback int) int {
	text := strings.TrimSpace(os.Getenv(key))
	// Atoi rejects the empty string, so unset and blank values also end up
	// on the fallback path.
	if parsed, err := strconv.Atoi(text); err == nil && parsed >= 0 {
		return parsed
	}
	return fallback
}
// snapshotFromVM builds the snapshot row for a VM that vCenter currently
// reports.
//
// Values read from the live VM object take precedence. When inv is non-nil the
// matching inventory record supplies the fields only the inventory holds
// (InventoryId, EventKey, CloudId, DeletionTime, VmType), the placement fields
// (ResourcePool, Datacenter, Cluster, Folder), and a fallback for anything the
// VM object did not provide. Placement fields that are still empty afterwards
// are looked up in vCenter through vc; a failed lookup leaves the field unset.
//
// IsPresent is left empty for the caller to fill in. An error is returned only
// when vmObject is nil.
func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory) (inventorySnapshotRow, error) {
	if vmObject == nil {
		return inventorySnapshotRow{}, fmt.Errorf("missing VM object")
	}

	const bytesPerGiB = 1024 * 1024 * 1024

	vmID := vmObject.Reference().Value
	row := inventorySnapshotRow{
		Name:         vmObject.Name,
		Vcenter:      vc.Vurl,
		VmId:         sql.NullString{String: vmID, Valid: vmID != ""},
		SnapshotTime: snapshotTime.Unix(),
	}

	if cfg := vmObject.Config; cfg != nil {
		row.VmUuid = sql.NullString{String: cfg.Uuid, Valid: cfg.Uuid != ""}

		// NOTE(review): if CreateDate is a pointer in the govmomi version in
		// use, a nil value would panic here — confirm and guard if so.
		if !cfg.CreateDate.IsZero() {
			row.CreationTime = sql.NullInt64{Int64: cfg.CreateDate.Unix(), Valid: true}
		}

		row.InitialVcpus = sql.NullInt64{Int64: int64(cfg.Hardware.NumCPU), Valid: cfg.Hardware.NumCPU > 0}
		row.InitialRam = sql.NullInt64{Int64: int64(cfg.Hardware.MemoryMB), Valid: cfg.Hardware.MemoryMB > 0}

		totalDiskBytes := int64(0)
		for _, device := range cfg.Hardware.Device {
			if disk, ok := device.(*types.VirtualDisk); ok {
				totalDiskBytes += disk.CapacityInBytes
			}
		}
		if totalDiskBytes > 0 {
			// Convert to float before dividing: integer division would drop
			// the fractional GiB and store sub-GiB totals as a valid 0.
			row.ProvisionedDisk = sql.NullFloat64{Float64: float64(totalDiskBytes) / bytesPerGiB, Valid: true}
		}

		row.SrmPlaceholder = "FALSE"
		if managedBy := cfg.ManagedBy; managedBy != nil && managedBy.ExtensionKey == "com.vmware.vcDr" && managedBy.Type == "placeholderVm" {
			row.SrmPlaceholder = "TRUE"
		}

		row.IsTemplate = "FALSE"
		if cfg.Template {
			row.IsTemplate = "TRUE"
		}
	}

	// Every power state other than "poweredOff" is recorded as powered on.
	if vmObject.Runtime.PowerState == "poweredOff" {
		row.PoweredOn = "FALSE"
	} else {
		row.PoweredOn = "TRUE"
	}

	if inv != nil {
		// Fields that only the inventory record holds.
		row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}
		row.EventKey = inv.EventKey
		row.CloudId = inv.CloudId
		row.DeletionTime = inv.DeletionTime
		row.VmType = inv.VmType

		// Placement is taken from the inventory first; gaps are filled from
		// vCenter below.
		row.ResourcePool = inv.ResourcePool
		row.Datacenter = inv.Datacenter
		row.Cluster = inv.Cluster
		row.Folder = inv.Folder

		// Fallbacks for values the VM object did not provide. PoweredOn is
		// always set from the runtime state above, so it needs none.
		if !row.CreationTime.Valid {
			row.CreationTime = inv.CreationTime
		}
		if !row.ProvisionedDisk.Valid {
			row.ProvisionedDisk = inv.ProvisionedDisk
		}
		if !row.InitialVcpus.Valid {
			row.InitialVcpus = inv.InitialVcpus
		}
		if !row.InitialRam.Valid {
			row.InitialRam = inv.InitialRam
		}
		if row.IsTemplate == "" {
			row.IsTemplate = boolStringFromInterface(inv.IsTemplate)
		}
		if row.SrmPlaceholder == "" {
			row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder)
		}
		if !row.VmUuid.Valid {
			row.VmUuid = inv.VmUuid
		}
	}

	// Ask vCenter for any placement field that is still empty. Lookup errors
	// are deliberately ignored: the field simply stays unset.
	if row.ResourcePool.String == "" {
		if rpName, err := vc.GetVmResourcePool(*vmObject); err == nil {
			row.ResourcePool = sql.NullString{String: rpName, Valid: rpName != ""}
		}
	}
	if row.Folder.String == "" {
		if folderPath, err := vc.GetVMFolderPath(*vmObject); err == nil {
			row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""}
		}
	}
	if row.Cluster.String == "" {
		if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil {
			row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""}
		}
	}
	if row.Datacenter.String == "" {
		if dcName, err := vc.GetDatacenterForVM(*vmObject); err == nil {
			row.Datacenter = sql.NullString{String: dcName, Valid: dcName != ""}
		}
	}

	return row, nil
}
// snapshotFromInventory builds a snapshot row purely from an inventory record,
// for use when no live VM data is available. Every column is copied from inv;
// the boolean-like columns are converted with boolStringFromInterface and
// SnapshotTime is snapshotTime in Unix seconds. IsPresent is left empty for
// the caller to fill in.
func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) inventorySnapshotRow {
	var row inventorySnapshotRow

	// Identity.
	row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}
	row.Name = inv.Name
	row.Vcenter = inv.Vcenter
	row.VmId = inv.VmId
	row.VmUuid = inv.VmUuid
	row.EventKey = inv.EventKey
	row.CloudId = inv.CloudId
	row.VmType = inv.VmType

	// Lifecycle.
	row.CreationTime = inv.CreationTime
	row.DeletionTime = inv.DeletionTime
	row.SnapshotTime = snapshotTime.Unix()

	// Placement.
	row.ResourcePool = inv.ResourcePool
	row.Datacenter = inv.Datacenter
	row.Cluster = inv.Cluster
	row.Folder = inv.Folder

	// Sizing.
	row.ProvisionedDisk = inv.ProvisionedDisk
	row.InitialVcpus = inv.InitialVcpus
	row.InitialRam = inv.InitialRam

	// Flags.
	row.IsTemplate = boolStringFromInterface(inv.IsTemplate)
	row.PoweredOn = boolStringFromInterface(inv.PoweredOn)
	row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder)

	return row
}
// insertDailyInventoryRow inserts row into the snapshot table tableName.
//
// The statement is written with "?" placeholders and rebound to the
// placeholder style of the connection's driver. tableName is interpolated
// directly, so callers must pass a validated name (see safeTableName).
func insertDailyInventoryRow(ctx context.Context, dbConn *sqlx.DB, tableName string, row inventorySnapshotRow) error {
	const insertTemplate = `
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus",
"InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime", "IsPresent"
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
`
	bindType := sqlx.BindType(dbConn.DriverName())
	statement := sqlx.Rebind(bindType, fmt.Sprintf(insertTemplate, tableName))

	// Values in the same order as the column list above.
	values := []interface{}{
		row.InventoryId,
		row.Name,
		row.Vcenter,
		row.VmId,
		row.EventKey,
		row.CloudId,
		row.CreationTime,
		row.DeletionTime,
		row.ResourcePool,
		row.VmType,
		row.Datacenter,
		row.Cluster,
		row.Folder,
		row.ProvisionedDisk,
		row.InitialVcpus,
		row.InitialRam,
		row.IsTemplate,
		row.PoweredOn,
		row.SrmPlaceholder,
		row.VmUuid,
		row.SnapshotTime,
		row.IsPresent,
	}

	_, err := dbConn.ExecContext(ctx, statement, values...)
	return err
}
// boolStringFromInterface converts a loosely typed boolean column value to the
// text form stored in snapshot tables.
//
//   - nil yields the empty string
//   - string and []byte values are returned as text, unchanged
//   - bool yields "TRUE" or "FALSE"
//   - int and int64 yield "TRUE" when non-zero, otherwise "FALSE"
//   - any other type is rendered with fmt.Sprint
func boolStringFromInterface(value interface{}) string {
	flag := func(set bool) string {
		if set {
			return "TRUE"
		}
		return "FALSE"
	}

	switch typed := value.(type) {
	case nil:
		return ""
	case string:
		return typed
	case []byte:
		return string(typed)
	case bool:
		return flag(typed)
	case int:
		return flag(typed != 0)
	case int64:
		return flag(typed != 0)
	}
	return fmt.Sprint(value)
}