package tasks import ( "context" "database/sql" "fmt" "log/slog" "os" "strconv" "strings" "time" "vctp/db/queries" "vctp/internal/report" "vctp/internal/vcenter" "github.com/jmoiron/sqlx" "github.com/vmware/govmomi/vim25/mo" "github.com/vmware/govmomi/vim25/types" ) type inventorySnapshotRow struct { InventoryId sql.NullInt64 Name string Vcenter string VmId sql.NullString EventKey sql.NullString CloudId sql.NullString CreationTime sql.NullInt64 DeletionTime sql.NullInt64 ResourcePool sql.NullString VmType sql.NullString Datacenter sql.NullString Cluster sql.NullString Folder sql.NullString ProvisionedDisk sql.NullFloat64 InitialVcpus sql.NullInt64 InitialRam sql.NullInt64 IsTemplate string PoweredOn string SrmPlaceholder string VmUuid sql.NullString SnapshotTime int64 IsPresent string } // RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table. func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) error { startTime := time.Now() tableName, err := dailyInventoryTableName(startTime) if err != nil { return err } dbConn := c.Database.DB() if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil { return err } // reload settings in case vcenter list has changed c.Settings.ReadYMLSettings() for _, url := range c.Settings.Values.Settings.VcenterAddresses { c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url) vc := vcenter.New(c.Logger, c.VcCreds) vc.Login(url) vcVms, err := vc.GetAllVmReferences() if err != nil { c.Logger.Error("unable to get VMs from vcenter", "error", err, "url", url) vc.Logout() continue } inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url) if err != nil { c.Logger.Error("unable to query inventory table", "error", err, "url", url) vc.Logout() continue } inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows)) for _, inv := range inventoryRows { if inv.VmId.Valid { inventoryByVmID[inv.VmId.String] = inv } } presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms)) for _, vm := range vcVms { if strings.HasPrefix(vm.Name(), "vCLS-") { continue } vmObj, err := vc.ConvertObjToMoVM(vm) if err != nil { c.Logger.Error("failed to read VM details", "vm_id", vm.Reference().Value, "error", err) continue } if vmObj.Config != nil && vmObj.Config.Template { continue } var inv *queries.Inventory if existing, ok := inventoryByVmID[vm.Reference().Value]; ok { existingCopy := existing inv = &existingCopy } row, err := snapshotFromVM(vmObj, vc, startTime, inv) if err != nil { c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err) continue } row.IsPresent = "TRUE" presentSnapshots[vm.Reference().Value] = row } for _, row := range presentSnapshots { if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil { c.Logger.Error("failed to insert hourly snapshot", "error", err, "vm_id", row.VmId.String) } } for _, inv := range inventoryRows { vmID := inv.VmId.String if vmID != "" { if _, ok := presentSnapshots[vmID]; ok { continue } } row := snapshotFromInventory(inv, startTime) row.IsPresent = "FALSE" if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil { c.Logger.Error("failed to insert missing VM snapshot", "error", err, "vm_id", row.VmId.String) } } vc.Logout() } c.Logger.Debug("Finished hourly vcenter snapshot") return nil } // RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table. func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) error { targetTime := time.Now().Add(-time.Minute) sourceTable, err := dailyInventoryTableName(targetTime) if err != nil { return err } summaryTable, err := dailySummaryTableName(targetTime) if err != nil { return err } dbConn := c.Database.DB() if err := ensureDailySummaryTable(ctx, dbConn, summaryTable); err != nil { return err } insertQuery := fmt.Sprintf(` INSERT INTO %s ( "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent" ) SELECT "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "SamplesPresent" FROM %s GROUP BY "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid"; `, summaryTable, sourceTable) if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { c.Logger.Error("failed to aggregate daily inventory", "error", err, "source_table", sourceTable) return err } c.Logger.Debug("Finished daily inventory aggregation", "source_table", sourceTable, "summary_table", summaryTable) return nil } // RunVcenterMonthlyAggregate summarizes the previous month's daily snapshots. func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) error { now := time.Now() firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location()) targetMonth := firstOfThisMonth.AddDate(0, -1, 0) monthPrefix := fmt.Sprintf("inventory_daily_%s", targetMonth.Format("200601")) dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, monthPrefix) if err != nil { return err } if len(dailyTables) == 0 { return fmt.Errorf("no daily snapshot tables found for %s", targetMonth.Format("2006-01")) } monthlyTable, err := monthlySummaryTableName(targetMonth) if err != nil { return err } dbConn := c.Database.DB() if err := ensureMonthlySummaryTable(ctx, dbConn, monthlyTable); err != nil { return err } unionQuery := buildUnionQuery(dailyTables, []string{ `"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`, `"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`, `"ProvisionedDisk"`, `"InitialVcpus"`, `"InitialRam"`, `"IsTemplate"`, `"PoweredOn"`, `"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`, }) if strings.TrimSpace(unionQuery) == "" { return fmt.Errorf("no valid daily snapshot tables found for %s", targetMonth.Format("2006-01")) } insertQuery := fmt.Sprintf(` INSERT INTO %s ( "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "AvgVcpus", "AvgRam", "AvgIsPresent" ) SELECT "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", AVG(CASE WHEN "InitialVcpus" IS NOT NULL THEN "InitialVcpus" END) AS "AvgVcpus", AVG(CASE WHEN "InitialRam" IS NOT NULL THEN "InitialRam" END) AS "AvgRam", AVG(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "AvgIsPresent" FROM ( %s ) snapshots GROUP BY "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid"; `, monthlyTable, unionQuery) if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01")) return err } c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", monthlyTable) return nil } // RunSnapshotCleanup drops hourly and daily snapshot tables older than retention. func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) error { now := time.Now() hourlyMaxDays := getEnvInt("HOURLY_SNAPSHOT_MAX_AGE_DAYS", 60) dailyMaxMonths := getEnvInt("DAILY_SNAPSHOT_MAX_AGE_MONTHS", 12) hourlyCutoff := now.AddDate(0, 0, -hourlyMaxDays) dailyCutoff := now.AddDate(0, -dailyMaxMonths, 0) dbConn := c.Database.DB() hourlyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_") if err != nil { return err } for _, table := range hourlyTables { if strings.HasPrefix(table, "inventory_daily_summary_") { continue } tableDate, ok := parseSnapshotDate(table, "inventory_daily_", "20060102") if !ok { continue } if tableDate.Before(truncateDate(hourlyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table) } } } dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_summary_") if err != nil { return err } for _, table := range dailyTables { tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102") if !ok { continue } if tableDate.Before(truncateDate(dailyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table) } } } c.Logger.Debug("Finished snapshot cleanup") return nil } func dailyInventoryTableName(t time.Time) (string, error) { return safeTableName(fmt.Sprintf("inventory_daily_%s", t.Format("20060102"))) } func dailySummaryTableName(t time.Time) (string, error) { return safeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102"))) } func monthlySummaryTableName(t time.Time) (string, error) { return safeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601"))) } func safeTableName(name string) (string, error) { for _, r := range name { if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' { continue } return "", fmt.Errorf("invalid table name: %s", name) } return name, nil } func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error { ddl := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( "InventoryId" BIGINT, "Name" TEXT NOT NULL, "Vcenter" TEXT NOT NULL, "VmId" TEXT, "EventKey" TEXT, "CloudId" TEXT, "CreationTime" BIGINT, "DeletionTime" BIGINT, "ResourcePool" TEXT, "VmType" TEXT, "Datacenter" TEXT, "Cluster" TEXT, "Folder" TEXT, "ProvisionedDisk" REAL, "InitialVcpus" BIGINT, "InitialRam" BIGINT, "IsTemplate" TEXT, "PoweredOn" TEXT, "SrmPlaceholder" TEXT, "VmUuid" TEXT, "SnapshotTime" BIGINT NOT NULL, "IsPresent" TEXT NOT NULL );`, tableName) _, err := dbConn.ExecContext(ctx, ddl) return err } func ensureDailySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error { ddl := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( "InventoryId" BIGINT, "Name" TEXT NOT NULL, "Vcenter" TEXT NOT NULL, "VmId" TEXT, "EventKey" TEXT, "CloudId" TEXT, "CreationTime" BIGINT, "DeletionTime" BIGINT, "ResourcePool" TEXT, "VmType" TEXT, "Datacenter" TEXT, "Cluster" TEXT, "Folder" TEXT, "ProvisionedDisk" REAL, "InitialVcpus" BIGINT, "InitialRam" BIGINT, "IsTemplate" TEXT, "PoweredOn" TEXT, "SrmPlaceholder" TEXT, "VmUuid" TEXT, "SamplesPresent" BIGINT NOT NULL );`, tableName) _, err := dbConn.ExecContext(ctx, ddl) return err } func ensureMonthlySummaryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error { ddl := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( "InventoryId" BIGINT, "Name" TEXT NOT NULL, "Vcenter" TEXT NOT NULL, "VmId" TEXT, "EventKey" TEXT, "CloudId" TEXT, "CreationTime" BIGINT, "DeletionTime" BIGINT, "ResourcePool" TEXT, "VmType" TEXT, "Datacenter" TEXT, "Cluster" TEXT, "Folder" TEXT, "ProvisionedDisk" REAL, "InitialVcpus" BIGINT, "InitialRam" BIGINT, "IsTemplate" TEXT, "PoweredOn" TEXT, "SrmPlaceholder" TEXT, "VmUuid" TEXT, "AvgVcpus" REAL, "AvgRam" REAL, "AvgIsPresent" REAL );`, tableName) _, err := dbConn.ExecContext(ctx, ddl) return err } func buildUnionQuery(tables []string, columns []string) string { queries := make([]string, 0, len(tables)) columnList := strings.Join(columns, ", ") for _, table := range tables { if _, err := safeTableName(table); err != nil { continue } queries = append(queries, fmt.Sprintf("SELECT %s FROM %s", columnList, table)) } return strings.Join(queries, "\nUNION ALL\n") } func parseSnapshotDate(table string, prefix string, layout string) (time.Time, bool) { if !strings.HasPrefix(table, prefix) { return time.Time{}, false } suffix := strings.TrimPrefix(table, prefix) parsed, err := time.Parse(layout, suffix) if err != nil { return time.Time{}, false } return parsed, true } func truncateDate(t time.Time) time.Time { return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) } func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error { if _, err := safeTableName(table); err != nil { return err } _, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table)) return err } func getEnvInt(key string, fallback int) int { raw := strings.TrimSpace(os.Getenv(key)) if raw == "" { return fallback } value, err := strconv.Atoi(raw) if err != nil || value < 0 { return fallback } return value } func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory) (inventorySnapshotRow, error) { if vmObject == nil { return inventorySnapshotRow{}, fmt.Errorf("missing VM object") } row := inventorySnapshotRow{ Name: vmObject.Name, Vcenter: vc.Vurl, VmId: sql.NullString{String: vmObject.Reference().Value, Valid: vmObject.Reference().Value != ""}, SnapshotTime: snapshotTime.Unix(), } if inv != nil { row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0} row.EventKey = inv.EventKey row.CloudId = inv.CloudId row.DeletionTime = inv.DeletionTime row.VmType = inv.VmType } if vmObject.Config != nil { row.VmUuid = sql.NullString{String: vmObject.Config.Uuid, Valid: vmObject.Config.Uuid != ""} if !vmObject.Config.CreateDate.IsZero() { row.CreationTime = sql.NullInt64{Int64: vmObject.Config.CreateDate.Unix(), Valid: true} } row.InitialVcpus = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.NumCPU), Valid: vmObject.Config.Hardware.NumCPU > 0} row.InitialRam = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.MemoryMB), Valid: vmObject.Config.Hardware.MemoryMB > 0} totalDiskBytes := int64(0) for _, device := range vmObject.Config.Hardware.Device { if disk, ok := device.(*types.VirtualDisk); ok { totalDiskBytes += disk.CapacityInBytes } } if totalDiskBytes > 0 { row.ProvisionedDisk = sql.NullFloat64{Float64: float64(totalDiskBytes / 1024 / 1024 / 1024), Valid: true} } if vmObject.Config.ManagedBy != nil && vmObject.Config.ManagedBy.ExtensionKey == "com.vmware.vcDr" && vmObject.Config.ManagedBy.Type == "placeholderVm" { row.SrmPlaceholder = "TRUE" } else { row.SrmPlaceholder = "FALSE" } if vmObject.Config.Template { row.IsTemplate = "TRUE" } else { row.IsTemplate = "FALSE" } } if vmObject.Runtime.PowerState == "poweredOff" { row.PoweredOn = "FALSE" } else { row.PoweredOn = "TRUE" } if inv != nil { row.ResourcePool = inv.ResourcePool row.Datacenter = inv.Datacenter row.Cluster = inv.Cluster row.Folder = inv.Folder if !row.CreationTime.Valid { row.CreationTime = inv.CreationTime } if !row.ProvisionedDisk.Valid { row.ProvisionedDisk = inv.ProvisionedDisk } if !row.InitialVcpus.Valid { row.InitialVcpus = inv.InitialVcpus } if !row.InitialRam.Valid { row.InitialRam = inv.InitialRam } if row.IsTemplate == "" { row.IsTemplate = boolStringFromInterface(inv.IsTemplate) } if row.PoweredOn == "" { row.PoweredOn = boolStringFromInterface(inv.PoweredOn) } if row.SrmPlaceholder == "" { row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder) } if !row.VmUuid.Valid { row.VmUuid = inv.VmUuid } } if row.ResourcePool.String == "" { if rpName, err := vc.GetVmResourcePool(*vmObject); err == nil { row.ResourcePool = sql.NullString{String: rpName, Valid: rpName != ""} } } if row.Folder.String == "" { if folderPath, err := vc.GetVMFolderPath(*vmObject); err == nil { row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""} } } if row.Cluster.String == "" { if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil { row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""} } } if row.Datacenter.String == "" { if dcName, err := vc.GetDatacenterForVM(*vmObject); err == nil { row.Datacenter = sql.NullString{String: dcName, Valid: dcName != ""} } } return row, nil } func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) inventorySnapshotRow { return inventorySnapshotRow{ InventoryId: sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}, Name: inv.Name, Vcenter: inv.Vcenter, VmId: inv.VmId, EventKey: inv.EventKey, CloudId: inv.CloudId, CreationTime: inv.CreationTime, DeletionTime: inv.DeletionTime, ResourcePool: inv.ResourcePool, VmType: inv.VmType, Datacenter: inv.Datacenter, Cluster: inv.Cluster, Folder: inv.Folder, ProvisionedDisk: inv.ProvisionedDisk, InitialVcpus: inv.InitialVcpus, InitialRam: inv.InitialRam, IsTemplate: boolStringFromInterface(inv.IsTemplate), PoweredOn: boolStringFromInterface(inv.PoweredOn), SrmPlaceholder: boolStringFromInterface(inv.SrmPlaceholder), VmUuid: inv.VmUuid, SnapshotTime: snapshotTime.Unix(), } } func insertDailyInventoryRow(ctx context.Context, dbConn *sqlx.DB, tableName string, row inventorySnapshotRow) error { query := fmt.Sprintf(` INSERT INTO %s ( "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "InitialRam", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime", "IsPresent" ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); `, tableName) query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) _, err := dbConn.ExecContext(ctx, query, row.InventoryId, row.Name, row.Vcenter, row.VmId, row.EventKey, row.CloudId, row.CreationTime, row.DeletionTime, row.ResourcePool, row.VmType, row.Datacenter, row.Cluster, row.Folder, row.ProvisionedDisk, row.InitialVcpus, row.InitialRam, row.IsTemplate, row.PoweredOn, row.SrmPlaceholder, row.VmUuid, row.SnapshotTime, row.IsPresent, ) return err } func boolStringFromInterface(value interface{}) string { switch v := value.(type) { case nil: return "" case string: return v case []byte: return string(v) case bool: if v { return "TRUE" } return "FALSE" case int: if v != 0 { return "TRUE" } return "FALSE" case int64: if v != 0 { return "TRUE" } return "FALSE" default: return fmt.Sprint(v) } }