package tasks import ( "context" "database/sql" "fmt" "log/slog" "strconv" "strings" "sync" "sync/atomic" "time" "vctp/db" "vctp/db/queries" "vctp/internal/metrics" "vctp/internal/report" "vctp/internal/utils" "vctp/internal/vcenter" "github.com/jmoiron/sqlx" "github.com/vmware/govmomi/vim25/mo" "github.com/vmware/govmomi/vim25/types" ) type inventorySnapshotRow struct { InventoryId sql.NullInt64 Name string Vcenter string VmId sql.NullString EventKey sql.NullString CloudId sql.NullString CreationTime sql.NullInt64 DeletionTime sql.NullInt64 ResourcePool sql.NullString Datacenter sql.NullString Cluster sql.NullString Folder sql.NullString ProvisionedDisk sql.NullFloat64 VcpuCount sql.NullInt64 RamGB sql.NullInt64 IsTemplate string PoweredOn string SrmPlaceholder string VmUuid sql.NullString SnapshotTime int64 IsPresent string } type snapshotTotals = db.SnapshotTotals // RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table. func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) (err error) { jobCtx := ctx jobTimeout := durationFromSeconds(c.Settings.Values.Settings.HourlyJobTimeoutSeconds, 20*time.Minute) if jobTimeout > 0 { var cancel context.CancelFunc jobCtx, cancel = context.WithTimeout(ctx, jobTimeout) defer cancel() } startedAt := time.Now() defer func() { logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt)) }() tracker := NewCronTracker(c.Database) done, skip, err := tracker.Start(jobCtx, "hourly_snapshot") if err != nil { return err } if skip { logger.Warn("Hourly snapshot skipped because a previous run is still active") return nil } defer func() { done(err) }() ctx, cancel := context.WithCancel(jobCtx) defer cancel() startTime := time.Now() if err := db.CheckMigrationState(ctx, c.Database.DB()); err != nil { return err } // reload settings in case vcenter list has changed c.Settings.ReadYMLSettings() if c.FirstHourlySnapshotCheck { if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { return err } lastSnapshot, err := report.LatestSnapshotTime(ctx, c.Database, "hourly") if err != nil { return err } minIntervalSeconds := intWithDefault(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, 3600) if !lastSnapshot.IsZero() && startTime.Sub(lastSnapshot) < time.Duration(minIntervalSeconds)*time.Second { c.Logger.Info("Skipping hourly snapshot, last snapshot too recent", "last_snapshot", lastSnapshot, "min_interval_seconds", minIntervalSeconds, ) c.FirstHourlySnapshotCheck = false return nil } c.FirstHourlySnapshotCheck = false } tableName, err := hourlyInventoryTableName(startTime) if err != nil { return err } dbConn := c.Database.DB() db.ApplySQLiteTuning(ctx, dbConn) if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil { return err } var wg sync.WaitGroup var errCount int64 concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency if override, ok := utils.EnvInt("VCTP_HOURLY_SNAPSHOT_CONCURRENCY"); ok && override >= 0 { concurrencyLimit = override } var sem chan struct{} if concurrencyLimit > 0 { sem = make(chan struct{}, concurrencyLimit) } c.Logger.Info("Starting hourly snapshots", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "concurrency_limit", concurrencyLimit) for _, url := range c.Settings.Values.Settings.VcenterAddresses { wg.Add(1) go func(url string) { defer wg.Done() waitStarted := time.Now() vcStart := time.Now() if sem != nil { sem <- struct{}{} defer func() { <-sem }() } waitDuration := time.Since(waitStarted) timeout := durationFromSeconds(c.Settings.Values.Settings.HourlySnapshotTimeoutSeconds, 10*time.Minute) runCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() c.Logger.Info("Starting hourly snapshot for vcenter", "url", url) if err := c.captureHourlySnapshotForVcenter(runCtx, startTime, tableName, url); err != nil { atomic.AddInt64(&errCount, 1) c.Logger.Error("hourly snapshot failed", "error", err, "url", url) } else { c.Logger.Info("Finished hourly snapshot for vcenter", "url", url, "queue_wait", waitDuration, "duration", time.Since(vcStart), "timeout", timeout, ) } }(url) } wg.Wait() if errCount > 0 { err = fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount) return err } rowCount, err := db.TableRowCount(ctx, dbConn, tableName) if err != nil { c.Logger.Warn("unable to count hourly snapshot rows", "error", err, "table", tableName) } if err := report.RegisterSnapshot(ctx, c.Database, "hourly", tableName, startTime, rowCount); err != nil { c.Logger.Warn("failed to register hourly snapshot", "error", err, "table", tableName) } metrics.RecordHourlySnapshot(startTime, rowCount, err) if err := c.generateReport(ctx, tableName); err != nil { c.Logger.Warn("failed to generate hourly report", "error", err, "table", tableName) } c.Logger.Debug("Finished hourly vcenter snapshot", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "table", tableName, "row_count", rowCount) return nil } // RunSnapshotCleanup drops hourly and daily snapshot tables older than retention. func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) { jobCtx := ctx jobTimeout := durationFromSeconds(c.Settings.Values.Settings.CleanupJobTimeoutSeconds, 10*time.Minute) if jobTimeout > 0 { var cancel context.CancelFunc jobCtx, cancel = context.WithTimeout(ctx, jobTimeout) defer cancel() } tracker := NewCronTracker(c.Database) done, skip, err := tracker.Start(jobCtx, "snapshot_cleanup") if err != nil { return err } if skip { logger.Warn("Snapshot cleanup skipped because a previous run is still active") return nil } defer func() { done(err) }() if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil { return err } startedAt := time.Now() defer func() { logger.Info("Snapshot cleanup job finished", "duration", time.Since(startedAt)) }() now := time.Now() hourlyMaxDays := intWithDefault(c.Settings.Values.Settings.HourlySnapshotMaxAgeDays, 60) dailyMaxMonths := intWithDefault(c.Settings.Values.Settings.DailySnapshotMaxAgeMonths, 12) hourlyCutoff := now.AddDate(0, 0, -hourlyMaxDays) dailyCutoff := now.AddDate(0, -dailyMaxMonths, 0) dbConn := c.Database.DB() hourlyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_hourly_") if err != nil { return err } removedHourly := 0 for _, table := range hourlyTables { if strings.HasPrefix(table, "inventory_daily_summary_") { continue } tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "epoch") if !ok { continue } if tableDate.Before(truncateDate(hourlyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table) } else { removedHourly++ if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil { c.Logger.Warn("failed to remove hourly snapshot registry entry", "error", err, "table", table) } } } } dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_summary_") if err != nil { return err } removedDaily := 0 for _, table := range dailyTables { tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102") if !ok { continue } if tableDate.Before(truncateDate(dailyCutoff)) { if err := dropSnapshotTable(ctx, dbConn, table); err != nil { c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table) } else { removedDaily++ if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil { c.Logger.Warn("failed to remove daily snapshot registry entry", "error", err, "table", table) } } } } c.Logger.Info("Finished snapshot cleanup", "removed_hourly_tables", removedHourly, "removed_daily_tables", removedDaily, "hourly_max_age_days", hourlyMaxDays, "daily_max_age_months", dailyMaxMonths, ) return nil } func hourlyInventoryTableName(t time.Time) (string, error) { return db.SafeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix())) } func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error { if err := db.EnsureSnapshotTable(ctx, dbConn, tableName); err != nil { return err } if err := ensureSnapshotRowID(ctx, dbConn, tableName); err != nil { return err } return db.EnsureColumns(ctx, dbConn, tableName, []db.ColumnDef{ {Name: "VcpuCount", Type: "BIGINT"}, {Name: "RamGB", Type: "BIGINT"}, }) } func buildUnionQuery(tables []string, columns []string, whereClause string) (string, error) { if len(tables) == 0 { return "", fmt.Errorf("no tables provided for union") } if len(columns) == 0 { return "", fmt.Errorf("no columns provided for union") } queries := make([]string, 0, len(tables)) columnList := strings.Join(columns, ", ") for _, table := range tables { safeName, err := db.SafeTableName(table) if err != nil { return "", err } query := fmt.Sprintf("SELECT %s FROM %s", columnList, safeName) if whereClause != "" { query = fmt.Sprintf("%s WHERE %s", query, whereClause) } queries = append(queries, query) } if len(queries) == 0 { return "", fmt.Errorf("no valid tables provided for union") } return strings.Join(queries, "\nUNION ALL\n"), nil } func templateExclusionFilter() string { return `COALESCE(CAST("IsTemplate" AS TEXT), '') NOT IN ('TRUE', 'true', '1')` } func parseSnapshotDate(table string, prefix string, layout string) (time.Time, bool) { if !strings.HasPrefix(table, prefix) { return time.Time{}, false } suffix := strings.TrimPrefix(table, prefix) if layout == "epoch" { epoch, err := strconv.ParseInt(suffix, 10, 64) if err != nil { return time.Time{}, false } return time.Unix(epoch, 0), true } parsed, err := time.Parse(layout, suffix) if err != nil { return time.Time{}, false } return parsed, true } func truncateDate(t time.Time) time.Time { return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) } func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error { if _, err := db.SafeTableName(table); err != nil { return err } _, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table)) return err } func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error { if _, err := db.SafeTableName(table); err != nil { return err } _, err := dbConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s", table)) if err != nil { return fmt.Errorf("failed to clear table %s: %w", table, err) } return nil } func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord { filtered := snapshots[:0] for _, snapshot := range snapshots { if rowsExist, err := db.TableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist { filtered = append(filtered, snapshot) } } return filtered } func filterRecordsInRange(records []report.SnapshotRecord, start, end time.Time) []report.SnapshotRecord { filtered := records[:0] for _, r := range records { if !r.SnapshotTime.Before(start) && r.SnapshotTime.Before(end) { filtered = append(filtered, r) } } return filtered } type columnDef struct { Name string Type string } var summaryUnionColumns = []string{ `"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`, `"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`, `"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`, `"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`, `"IsPresent"`, } func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string) error { driver := strings.ToLower(dbConn.DriverName()) switch driver { case "pgx", "postgres": hasColumn, err := db.ColumnExists(ctx, dbConn, tableName, "RowId") if err != nil { return err } if !hasColumn { if err := db.AddColumnIfMissing(ctx, dbConn, tableName, db.ColumnDef{Name: "RowId", Type: "BIGSERIAL"}); err != nil { return err } } if err := db.BackfillSerialColumn(ctx, dbConn, tableName, "RowId"); err != nil { return err } case "sqlite": return nil } return nil } func nullInt64ToInt(value sql.NullInt64) int64 { if value.Valid { return value.Int64 } return 0 } func nullFloat64ToFloat(value sql.NullFloat64) float64 { if value.Valid { return value.Float64 } return 0 } func intWithDefault(value int, fallback int) int { if value <= 0 { return fallback } return value } func durationFromSeconds(seconds int, fallback time.Duration) time.Duration { if seconds > 0 { return time.Duration(seconds) * time.Second } return fallback } func normalizeResourcePool(value string) string { trimmed := strings.TrimSpace(value) if trimmed == "" { return "" } lower := strings.ToLower(trimmed) canonical := map[string]string{ "tin": "Tin", "bronze": "Bronze", "silver": "Silver", "gold": "Gold", } if val, ok := canonical[lower]; ok { return val } return trimmed } func (c *CronTask) reportsDir() string { if c.Settings != nil && c.Settings.Values != nil { if dir := strings.TrimSpace(c.Settings.Values.Settings.ReportsDir); dir != "" { return dir } } return "/var/lib/vctp/reports" } func (c *CronTask) generateReport(ctx context.Context, tableName string) error { dest := c.reportsDir() _, err := report.SaveTableReport(c.Logger, c.Database, ctx, tableName, dest) return err } func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup vcenter.FolderLookup, rpLookup map[string]string) (inventorySnapshotRow, error) { if vmObject == nil { return inventorySnapshotRow{}, fmt.Errorf("missing VM object") } row := inventorySnapshotRow{ Name: vmObject.Name, Vcenter: vc.Vurl, VmId: sql.NullString{String: vmObject.Reference().Value, Valid: vmObject.Reference().Value != ""}, SnapshotTime: snapshotTime.Unix(), } if inv != nil { row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0} row.EventKey = inv.EventKey row.CloudId = inv.CloudId row.DeletionTime = inv.DeletionTime } if vmObject.Config != nil { row.VmUuid = sql.NullString{String: vmObject.Config.Uuid, Valid: vmObject.Config.Uuid != ""} if !vmObject.Config.CreateDate.IsZero() { row.CreationTime = sql.NullInt64{Int64: vmObject.Config.CreateDate.Unix(), Valid: true} } row.VcpuCount = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.NumCPU), Valid: vmObject.Config.Hardware.NumCPU > 0} row.RamGB = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.MemoryMB) / 1024, Valid: vmObject.Config.Hardware.MemoryMB > 0} totalDiskBytes := int64(0) for _, device := range vmObject.Config.Hardware.Device { if disk, ok := device.(*types.VirtualDisk); ok { totalDiskBytes += disk.CapacityInBytes } } if totalDiskBytes > 0 { row.ProvisionedDisk = sql.NullFloat64{Float64: float64(totalDiskBytes / 1024 / 1024 / 1024), Valid: true} } if vmObject.Config.ManagedBy != nil && vmObject.Config.ManagedBy.ExtensionKey == "com.vmware.vcDr" && vmObject.Config.ManagedBy.Type == "placeholderVm" { row.SrmPlaceholder = "TRUE" } else { row.SrmPlaceholder = "FALSE" } if vmObject.Config.Template { row.IsTemplate = "TRUE" } else { row.IsTemplate = "FALSE" } } if vmObject.Runtime.PowerState == "poweredOff" { row.PoweredOn = "FALSE" } else { row.PoweredOn = "TRUE" } if inv != nil { if inv.ResourcePool.Valid { row.ResourcePool = sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: true} } row.Datacenter = inv.Datacenter row.Cluster = inv.Cluster row.Folder = inv.Folder if !row.CreationTime.Valid { row.CreationTime = inv.CreationTime } if !row.ProvisionedDisk.Valid { row.ProvisionedDisk = inv.ProvisionedDisk } if !row.VcpuCount.Valid { row.VcpuCount = inv.InitialVcpus } if !row.RamGB.Valid && inv.InitialRam.Valid { row.RamGB = sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Int64 > 0} } if row.IsTemplate == "" { row.IsTemplate = boolStringFromInterface(inv.IsTemplate) } if row.PoweredOn == "" { row.PoweredOn = boolStringFromInterface(inv.PoweredOn) } if row.SrmPlaceholder == "" { row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder) } if !row.VmUuid.Valid { row.VmUuid = inv.VmUuid } } if row.ResourcePool.String == "" && vmObject.ResourcePool != nil { if rpLookup != nil { if rpName, ok := rpLookup[vmObject.ResourcePool.Value]; ok { row.ResourcePool = sql.NullString{String: normalizeResourcePool(rpName), Valid: rpName != ""} } } if !row.ResourcePool.Valid { if rpName, err := vc.GetVmResourcePool(*vmObject); err == nil { row.ResourcePool = sql.NullString{String: normalizeResourcePool(rpName), Valid: rpName != ""} } } } if row.Folder.String == "" { if folderPath, ok := vc.GetVMFolderPathFromLookup(*vmObject, folderLookup); ok { row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""} } else if folderPath, err := vc.GetVMFolderPath(*vmObject); err == nil { row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""} } } if vmObject.Runtime.Host != nil && hostLookup != nil { if lookup, ok := hostLookup[vmObject.Runtime.Host.Value]; ok { if row.Cluster.String == "" && lookup.Cluster != "" { row.Cluster = sql.NullString{String: lookup.Cluster, Valid: true} } if row.Datacenter.String == "" && lookup.Datacenter != "" { row.Datacenter = sql.NullString{String: lookup.Datacenter, Valid: true} } } } if row.Cluster.String == "" { if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil { row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""} } } if row.Datacenter.String == "" { if dcName, err := vc.GetDatacenterForVM(*vmObject); err == nil { row.Datacenter = sql.NullString{String: dcName, Valid: dcName != ""} } } return row, nil } func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) inventorySnapshotRow { return inventorySnapshotRow{ InventoryId: sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}, Name: inv.Name, Vcenter: inv.Vcenter, VmId: inv.VmId, EventKey: inv.EventKey, CloudId: inv.CloudId, CreationTime: inv.CreationTime, DeletionTime: inv.DeletionTime, ResourcePool: sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: inv.ResourcePool.Valid}, Datacenter: inv.Datacenter, Cluster: inv.Cluster, Folder: inv.Folder, ProvisionedDisk: inv.ProvisionedDisk, VcpuCount: inv.InitialVcpus, RamGB: sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Valid && inv.InitialRam.Int64 > 0}, IsTemplate: boolStringFromInterface(inv.IsTemplate), PoweredOn: boolStringFromInterface(inv.PoweredOn), SrmPlaceholder: boolStringFromInterface(inv.SrmPlaceholder), VmUuid: inv.VmUuid, SnapshotTime: snapshotTime.Unix(), } } func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []inventorySnapshotRow) error { if len(rows) == 0 { return nil } tx, err := dbConn.BeginTxx(ctx, nil) if err != nil { return err } stmt, err := tx.PreparexContext(ctx, sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), fmt.Sprintf(` INSERT INTO %s ( "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime", "IsPresent" ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, tableName))) if err != nil { tx.Rollback() return err } defer stmt.Close() for _, row := range rows { if _, err := stmt.ExecContext(ctx, row.InventoryId, row.Name, row.Vcenter, row.VmId, row.EventKey, row.CloudId, row.CreationTime, row.DeletionTime, row.ResourcePool, row.Datacenter, row.Cluster, row.Folder, row.ProvisionedDisk, row.VcpuCount, row.RamGB, row.IsTemplate, row.PoweredOn, row.SrmPlaceholder, row.VmUuid, row.SnapshotTime, row.IsPresent, ); err != nil { tx.Rollback() return err } } return tx.Commit() } func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error { started := time.Now() c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url) vc := vcenter.New(c.Logger, c.VcCreds) if err := vc.Login(url); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) return fmt.Errorf("unable to connect to vcenter: %w", err) } defer func() { if err := vc.Logout(); err != nil { c.Logger.Warn("vcenter logout failed", "url", url, "error", err) } }() vcVms, err := vc.GetAllVMsWithProps() if err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) return fmt.Errorf("unable to get VMs from vcenter: %w", err) } canDetectMissing := len(vcVms) > 0 if !canDetectMissing { c.Logger.Warn("no VMs returned from vcenter; skipping missing VM detection", "url", url) } hostLookup, err := vc.BuildHostLookup() if err != nil { c.Logger.Warn("failed to build host lookup", "url", url, "error", err) hostLookup = nil } else { c.Logger.Debug("built host lookup", "url", url, "hosts", len(hostLookup)) } folderLookup, err := vc.BuildFolderPathLookup() if err != nil { c.Logger.Warn("failed to build folder lookup", "url", url, "error", err) folderLookup = nil } else { c.Logger.Debug("built folder lookup", "url", url, "folders", len(folderLookup)) } rpLookup, err := vc.BuildResourcePoolLookup() if err != nil { c.Logger.Warn("failed to build resource pool lookup", "url", url, "error", err) rpLookup = nil } else { c.Logger.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup)) } inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url) if err != nil { return fmt.Errorf("unable to query inventory table: %w", err) } inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows)) for _, inv := range inventoryRows { if inv.VmId.Valid { inventoryByVmID[inv.VmId.String] = inv } } dbConn := c.Database.DB() presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms)) totals := snapshotTotals{} for _, vm := range vcVms { if strings.HasPrefix(vm.Name, "vCLS-") { continue } if vm.Config != nil && vm.Config.Template { continue } var inv *queries.Inventory if existing, ok := inventoryByVmID[vm.Reference().Value]; ok { existingCopy := existing inv = &existingCopy } row, err := snapshotFromVM(&vm, vc, startTime, inv, hostLookup, folderLookup, rpLookup) if err != nil { c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err) continue } row.IsPresent = "TRUE" presentSnapshots[vm.Reference().Value] = row totals.VmCount++ totals.VcpuTotal += nullInt64ToInt(row.VcpuCount) totals.RamTotal += nullInt64ToInt(row.RamGB) totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk) } batch := make([]inventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows)) for _, row := range presentSnapshots { batch = append(batch, row) } if !canDetectMissing { c.Logger.Info("Hourly snapshot summary", "vcenter", url, "vm_count", totals.VmCount, "vcpu_total", totals.VcpuTotal, "ram_total_gb", totals.RamTotal, "disk_total_gb", totals.DiskTotal, ) return nil } for _, inv := range inventoryRows { if strings.HasPrefix(inv.Name, "vCLS-") { continue } vmID := inv.VmId.String if vmID != "" { if _, ok := presentSnapshots[vmID]; ok { continue } } row := snapshotFromInventory(inv, startTime) row.IsPresent = "FALSE" if !row.DeletionTime.Valid { deletionTime := startTime.Unix() row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true} if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ DeletionTime: row.DeletionTime, VmId: inv.VmId, DatacenterName: inv.Datacenter, }); err != nil { c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String) } } batch = append(batch, row) } if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err) return err } c.Logger.Info("Hourly snapshot summary", "vcenter", url, "vm_count", totals.VmCount, "vcpu_total", totals.VcpuTotal, "ram_total_gb", totals.RamTotal, "disk_total_gb", totals.DiskTotal, ) metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil) return nil } func boolStringFromInterface(value interface{}) string { switch v := value.(type) { case nil: return "" case string: return v case []byte: return string(v) case bool: if v { return "TRUE" } return "FALSE" case int: if v != 0 { return "TRUE" } return "FALSE" case int64: if v != 0 { return "TRUE" } return "FALSE" default: return fmt.Sprint(v) } }