Files
vctp2/internal/tasks/inventorySnapshots.go
Nathan Coad 0a2c529111
All checks were successful
continuous-integration/drone/push Build is passing
code refactor
2026-01-21 14:40:37 +11:00

1210 lines
44 KiB
Go

package tasks
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"vctp/db"
"vctp/db/queries"
"vctp/internal/metrics"
"vctp/internal/report"
"vctp/internal/utils"
"vctp/internal/vcenter"
"github.com/jmoiron/sqlx"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
)
// ctxLoggerKey is the context key under which a *slog.Logger is stored.
// captureHourlySnapshotForVcenter stores a per-vCenter logger with it and
// loggerFromCtx reads it back.
type ctxLoggerKey struct{}
// deletionCandidate describes an inventory VM that was not found in the
// current vCenter listing. prepareDeletionCandidates builds one per missing VM
// so the deletion time can later be refined from vCenter deletion events.
type deletionCandidate struct {
// vmID is the inventory VmId string; deletion events are looked up by this value.
vmID string
// vmUUID is the inventory VmUuid string, or "" when the inventory value is not valid.
vmUUID string
// name is the inventory VM name.
name string
// cluster is the inventory cluster name, or "" when the inventory value is not valid.
cluster string
// datacenter is the inventory datacenter; it is passed as DatacenterName when marking the VM deleted.
datacenter sql.NullString
}
// vcenterResources bundles the data initVcenterResources fetches from one vCenter.
// Any of the lookups may be nil when building it failed; consumers must tolerate that.
type vcenterResources struct {
// vms is the VM list returned by GetAllVMsWithProps.
vms []mo.VirtualMachine
// hostLookup is indexed by the host managed-object reference value (see snapshotFromVM).
hostLookup map[string]vcenter.HostLookup
// folderLookup is handed to GetVMFolderPathFromLookup to resolve a VM's folder path.
folderLookup map[string]string
// rpLookup maps a resource pool managed-object reference value to the pool name.
rpLookup map[string]string
}
// loggerFromCtx returns the *slog.Logger stored in ctx under ctxLoggerKey.
// The fallback logger is returned when ctx is nil, when no logger is stored,
// or when the stored logger is nil.
func loggerFromCtx(ctx context.Context, fallback *slog.Logger) *slog.Logger {
	if ctx != nil {
		stored, isLogger := ctx.Value(ctxLoggerKey{}).(*slog.Logger)
		if isLogger && stored != nil {
			return stored
		}
	}
	return fallback
}
// RunVcenterSnapshotHourly captures an inventory snapshot for every configured vCenter
// and writes the rows into a table named from the run's start time (see hourlyInventoryTableName).
// If force is true, any in-progress marker will be cleared before starting (useful for manual recovery).
//
// The whole run is bounded by HourlyJobTimeoutSeconds (default 20 minutes) and each vCenter
// capture by HourlySnapshotTimeoutSeconds (default 10 minutes).
//
// It returns nil when the run is skipped (a previous run is still active, a forced clear
// fails, or the first-run check finds a recent snapshot). It returns a non-nil error when
// setup fails or when at least one vCenter capture fails. The named result err is what the
// cron tracker's done callback receives.
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger, force bool) (err error) {
jobCtx := ctx
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.HourlyJobTimeoutSeconds, 20*time.Minute)
if jobTimeout > 0 {
var cancel context.CancelFunc
jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
defer cancel()
}
// snapshotFreq is only used for the "next_run_estimated" field of the finish log below.
snapshotFreq := durationFromSeconds(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, time.Hour)
tracker := NewCronTracker(c.Database)
// Clear stale marker for this job only (short timeout to avoid blocking).
// staleCtx is detached from ctx and shared by ClearStale and the forced clear below.
staleCtx, cancelStale := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelStale()
if err := tracker.ClearStale(staleCtx, "hourly_snapshot", jobTimeout); err != nil {
logger.Warn("failed to clear stale cron status", "error", err)
}
if force {
// NOTE(review): ClearAllInProgress takes no job name, so it presumably clears markers
// for every job, not just this one — confirm against CronTracker.
if err := tracker.ClearAllInProgress(staleCtx); err != nil {
logger.Warn("failed to clear in-progress flag (force run)", "error", err)
} else {
logger.Info("force run cleared in-progress marker before starting")
}
}
startedAt := time.Now()
defer func() {
// gocron logs the next run on its side, but log here for quick visibility.
// This also fires when the run is skipped or fails, since it is deferred before those returns.
logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt), "next_run_estimated", time.Now().Add(snapshotFreq))
}()
// Start yields a completion callback, a skip flag (another run is active) and an error.
done, skip, err := tracker.Start(jobCtx, "hourly_snapshot")
if err != nil {
return err
}
if skip {
if force {
// A forced run gets one more attempt: clear the marker again and retry Start once.
logger.Info("Force run requested; clearing in-progress marker and retrying")
if err := tracker.ClearAllInProgress(jobCtx); err != nil {
logger.Warn("failed to clear in-progress flag for force run", "error", err)
return nil
}
done, skip, err = tracker.Start(jobCtx, "hourly_snapshot")
if err != nil {
return err
}
if skip {
logger.Warn("Hourly snapshot still marked active after force clear; skipping")
return nil
}
} else {
logger.Warn("Hourly snapshot skipped because a previous run is still active", "force", force)
return nil
}
}
// done receives the final value of the named result err when the function returns.
defer func() { done(err) }()
// From here on ctx is a cancellable child of jobCtx, so it inherits the job timeout.
ctx, cancel := context.WithCancel(jobCtx)
defer cancel()
startTime := time.Now()
if err := db.CheckMigrationState(ctx, c.Database.DB()); err != nil {
return err
}
if err := db.EnsureSnapshotRunTable(ctx, c.Database.DB()); err != nil {
return err
}
// Best-effort cleanup of legacy IsPresent columns to simplify inserts.
c.dropLegacyIsPresentColumns(jobCtx)
// reload settings in case vcenter list has changed
c.Settings.ReadYMLSettings()
// On the first run only: skip when the latest registered hourly snapshot is younger than
// half the configured snapshot interval. The flag is cleared whether or not the run is skipped.
if c.FirstHourlySnapshotCheck {
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
return err
}
lastSnapshot, err := report.LatestSnapshotTime(ctx, c.Database, "hourly")
if err != nil {
return err
}
minIntervalSeconds := intWithDefault(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, 3600) / 2
if !lastSnapshot.IsZero() && startTime.Sub(lastSnapshot) < time.Duration(minIntervalSeconds)*time.Second {
c.Logger.Info("Skipping hourly snapshot, last snapshot too recent",
"last_snapshot", lastSnapshot,
"min_interval_seconds", minIntervalSeconds,
)
c.FirstHourlySnapshotCheck = false
return nil
}
c.FirstHourlySnapshotCheck = false
}
tableName, err := hourlyInventoryTableName(startTime)
if err != nil {
return err
}
dbConn := c.Database.DB()
db.ApplySQLiteTuning(ctx, dbConn)
if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil {
return err
}
var wg sync.WaitGroup
var errCount int64
// Concurrency limit comes from settings; a non-negative VCTP_HOURLY_SNAPSHOT_CONCURRENCY overrides it.
// A limit of 0 (or less) leaves sem nil, i.e. all vCenters are captured concurrently.
concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency
if override, ok := utils.EnvInt("VCTP_HOURLY_SNAPSHOT_CONCURRENCY"); ok && override >= 0 {
concurrencyLimit = override
}
var sem chan struct{}
if concurrencyLimit > 0 {
sem = make(chan struct{}, concurrencyLimit)
}
c.Logger.Info("Starting hourly snapshots", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "concurrency_limit", concurrencyLimit)
// One goroutine per vCenter; failures are counted rather than aborting the other captures.
for _, url := range c.Settings.Values.Settings.VcenterAddresses {
wg.Add(1)
go func(url string) {
defer wg.Done()
waitStarted := time.Now()
// NOTE(review): vcStart is taken before the semaphore wait, so the logged "duration"
// includes the queue wait — confirm this is intended.
vcStart := time.Now()
if sem != nil {
// NOTE(review): this send does not observe ctx cancellation while waiting for a slot.
sem <- struct{}{}
defer func() { <-sem }()
}
waitDuration := time.Since(waitStarted)
// The per-vCenter timeout starts after the semaphore slot has been acquired.
timeout := durationFromSeconds(c.Settings.Values.Settings.HourlySnapshotTimeoutSeconds, 10*time.Minute)
runCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
c.Logger.Info("Starting hourly snapshot for vcenter", "url", url)
if err := c.captureHourlySnapshotForVcenter(runCtx, startTime, tableName, url); err != nil {
atomic.AddInt64(&errCount, 1)
c.Logger.Error("hourly snapshot failed", "error", err, "url", url)
} else {
c.Logger.Info("Finished hourly snapshot for vcenter",
"url", url,
"queue_wait", waitDuration,
"duration", time.Since(vcStart),
"timeout", timeout,
)
}
}(url)
}
wg.Wait()
// Any failed vCenter fails the whole run; registration and report generation are skipped.
if errCount > 0 {
err = fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount)
return err
}
// This assigns the named result err; a count failure is only logged and is reset by the
// final "return nil".
rowCount, err := db.TableRowCount(ctx, dbConn, tableName)
if err != nil {
c.Logger.Warn("unable to count hourly snapshot rows", "error", err, "table", tableName)
}
if err := report.RegisterSnapshot(ctx, c.Database, "hourly", tableName, startTime, rowCount); err != nil {
c.Logger.Warn("failed to register hourly snapshot", "error", err, "table", tableName)
}
// err here is the row-count error from above; the RegisterSnapshot error is scoped to its own if.
metrics.RecordHourlySnapshot(startTime, rowCount, err)
if err := c.generateReport(ctx, tableName); err != nil {
c.Logger.Warn("failed to generate hourly report", "error", err, "table", tableName)
}
c.Logger.Debug("Finished hourly vcenter snapshot", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "table", tableName, "row_count", rowCount)
return nil
}
// dropLegacyIsPresentColumns attempts to remove the old IsPresent column from hourly tables.
// This keeps inserts simple and avoids keeping unused data around.
//
// The cleanup is best-effort: every failure is logged at debug level and the
// remaining tables are still processed. Table names come from the snapshot
// registry and are validated with db.SafeTableName before being interpolated
// into the ALTER TABLE statement, because identifiers cannot be bound as
// query parameters.
func (c *CronTask) dropLegacyIsPresentColumns(ctx context.Context) {
	dbConn := c.Database.DB()
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		c.Logger.Debug("skip IsPresent cleanup; registry unavailable", "error", err)
		return
	}
	records, err := report.ListSnapshots(ctx, c.Database, "hourly")
	if err != nil {
		c.Logger.Debug("skip IsPresent cleanup; unable to list hourly snapshots", "error", err)
		return
	}
	for _, r := range records {
		// Never build DDL from a name that has not been validated.
		safeName, err := db.SafeTableName(r.TableName)
		if err != nil {
			c.Logger.Debug("skip IsPresent cleanup; invalid table name", "table", r.TableName, "error", err)
			continue
		}
		hasColumn, err := db.ColumnExists(ctx, dbConn, r.TableName, "IsPresent")
		if err != nil {
			c.Logger.Debug("unable to check for legacy IsPresent column", "table", r.TableName, "error", err)
			continue
		}
		if !hasColumn {
			continue
		}
		if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IsPresent"`, safeName)); err != nil {
			c.Logger.Debug("unable to drop legacy IsPresent column", "table", r.TableName, "error", err)
			continue
		}
		c.Logger.Info("dropped legacy IsPresent column", "table", r.TableName)
	}
}
// RunHourlySnapshotRetry re-runs the hourly capture for vCenters whose earlier
// snapshot run is recorded as failed, limited to runs that have not yet
// exceeded HourlySnapshotMaxRetries (3 when the setting is not positive).
//
// Each retry writes into the table derived from the original snapshot time.
// A failing retry is only logged; the function returns an error solely when
// the snapshot-run table cannot be ensured or the failed runs cannot be listed.
func (c *CronTask) RunHourlySnapshotRetry(ctx context.Context, logger *slog.Logger) (err error) {
	began := time.Now()
	defer func() {
		logger.Info("Hourly snapshot retry finished", "duration", time.Since(began))
	}()

	attemptLimit := c.Settings.Values.Settings.HourlySnapshotMaxRetries
	if attemptLimit <= 0 {
		attemptLimit = 3
	}

	conn := c.Database.DB()
	if err = db.EnsureSnapshotRunTable(ctx, conn); err != nil {
		return err
	}
	pending, err := db.ListFailedSnapshotRuns(ctx, conn, attemptLimit)
	if err != nil {
		return err
	}
	if len(pending) == 0 {
		logger.Debug("No failed hourly snapshots to retry")
		return nil
	}

	for _, run := range pending {
		snapTime := time.Unix(run.SnapshotTime, 0)
		table, nameErr := hourlyInventoryTableName(snapTime)
		if nameErr != nil {
			logger.Warn("unable to derive table name for retry", "error", nameErr, "snapshot_time", snapTime, "vcenter", run.Vcenter)
			continue
		}
		logger.Info("Retrying hourly snapshot", "vcenter", run.Vcenter, "snapshot_time", snapTime, "attempt", run.Attempts+1)
		retryErr := c.captureHourlySnapshotForVcenter(ctx, snapTime, table, run.Vcenter)
		if retryErr != nil {
			logger.Warn("retry failed", "vcenter", run.Vcenter, "error", retryErr)
		}
	}
	return nil
}
// RunSnapshotCleanup drops hourly and daily snapshot tables older than retention.
//
// Hourly tables (inventory_hourly_<unix seconds>) are dropped once they are
// older than HourlySnapshotMaxAgeDays (default 60). Daily summary tables
// (inventory_daily_summary_<YYYYMMDD>) are dropped once they are older than
// DailySnapshotMaxAgeMonths (default 12). Both cutoffs are truncated to the
// start of the day with truncateDate. Tables whose suffix cannot be parsed
// are left alone.
//
// The whole job, including listing, dropping and registry cleanup, runs under
// a context limited by CleanupJobTimeoutSeconds (default 10 minutes). A
// failure to drop one table is logged and does not stop the job. The function
// returns an error only when the tracker cannot start, the migration check
// fails, or a table listing fails. It returns nil without doing anything when
// a previous cleanup is still marked active.
func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) {
	jobCtx := ctx
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.CleanupJobTimeoutSeconds, 10*time.Minute)
	if jobTimeout > 0 {
		var cancel context.CancelFunc
		jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
		defer cancel()
	}

	tracker := NewCronTracker(c.Database)
	done, skip, err := tracker.Start(jobCtx, "snapshot_cleanup")
	if err != nil {
		return err
	}
	if skip {
		logger.Warn("Snapshot cleanup skipped because a previous run is still active")
		return nil
	}
	// done receives the final value of the named result err.
	defer func() { done(err) }()

	if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil {
		return err
	}

	startedAt := time.Now()
	defer func() {
		logger.Info("Snapshot cleanup job finished", "duration", time.Since(startedAt))
	}()

	now := time.Now()
	hourlyMaxDays := intWithDefault(c.Settings.Values.Settings.HourlySnapshotMaxAgeDays, 60)
	dailyMaxMonths := intWithDefault(c.Settings.Values.Settings.DailySnapshotMaxAgeMonths, 12)
	// The cutoffs do not change per table, so truncate them once up front.
	hourlyLimit := truncateDate(now.AddDate(0, 0, -hourlyMaxDays))
	dailyLimit := truncateDate(now.AddDate(0, -dailyMaxMonths, 0))
	dbConn := c.Database.DB()

	// All database work below uses jobCtx so that the cleanup timeout applies to it.
	hourlyTables, err := report.ListTablesByPrefix(jobCtx, c.Database, "inventory_hourly_")
	if err != nil {
		return err
	}
	removedHourly := 0
	for _, table := range hourlyTables {
		if strings.HasPrefix(table, "inventory_daily_summary_") {
			continue
		}
		tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "epoch")
		if !ok || !tableDate.Before(hourlyLimit) {
			continue
		}
		if err := dropSnapshotTable(jobCtx, dbConn, table); err != nil {
			c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table)
			continue
		}
		removedHourly++
		if err := report.DeleteSnapshotRecord(jobCtx, c.Database, table); err != nil {
			c.Logger.Warn("failed to remove hourly snapshot registry entry", "error", err, "table", table)
		}
	}

	dailyTables, err := report.ListTablesByPrefix(jobCtx, c.Database, "inventory_daily_summary_")
	if err != nil {
		return err
	}
	removedDaily := 0
	for _, table := range dailyTables {
		tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102")
		if !ok || !tableDate.Before(dailyLimit) {
			continue
		}
		if err := dropSnapshotTable(jobCtx, dbConn, table); err != nil {
			c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table)
			continue
		}
		removedDaily++
		if err := report.DeleteSnapshotRecord(jobCtx, c.Database, table); err != nil {
			c.Logger.Warn("failed to remove daily snapshot registry entry", "error", err, "table", table)
		}
	}

	c.Logger.Info("Finished snapshot cleanup",
		"removed_hourly_tables", removedHourly,
		"removed_daily_tables", removedDaily,
		"hourly_max_age_days", hourlyMaxDays,
		"daily_max_age_months", dailyMaxMonths,
	)
	return nil
}
// hourlyInventoryTableName derives the hourly snapshot table name for t:
// "inventory_hourly_" followed by t's Unix seconds, validated by db.SafeTableName.
func hourlyInventoryTableName(t time.Time) (string, error) {
	candidate := "inventory_hourly_" + strconv.FormatInt(t.Unix(), 10)
	return db.SafeTableName(candidate)
}
// ensureDailyInventoryTable makes sure the snapshot table tableName exists,
// has its row-id column where the driver requires one (see ensureSnapshotRowID),
// and carries the VcpuCount and RamGB columns. The first failing step's error
// is returned and later steps are not attempted.
func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	err := db.EnsureSnapshotTable(ctx, dbConn, tableName)
	if err == nil {
		err = ensureSnapshotRowID(ctx, dbConn, tableName)
	}
	if err != nil {
		return err
	}
	requiredColumns := []db.ColumnDef{
		{Name: "VcpuCount", Type: "BIGINT"},
		{Name: "RamGB", Type: "BIGINT"},
	}
	return db.EnsureColumns(ctx, dbConn, tableName, requiredColumns)
}
// buildUnionQuery builds one SELECT per table over the given columns and joins
// them with UNION ALL (separated by newlines).
//
// Every table name is validated with db.SafeTableName; the first invalid name
// aborts the build and its error is returned. columns and whereClause are
// inserted verbatim, so callers must pass trusted, already-quoted SQL. When
// whereClause is non-empty it is appended to every SELECT as "WHERE <clause>".
// An error is returned when tables or columns is empty.
func buildUnionQuery(tables []string, columns []string, whereClause string) (string, error) {
	if len(tables) == 0 {
		return "", fmt.Errorf("no tables provided for union")
	}
	if len(columns) == 0 {
		return "", fmt.Errorf("no columns provided for union")
	}

	// The column list and the filter are identical for every table, so build them once.
	selectPrefix := "SELECT " + strings.Join(columns, ", ") + " FROM "
	whereSuffix := ""
	if whereClause != "" {
		whereSuffix = " WHERE " + whereClause
	}

	selects := make([]string, 0, len(tables))
	for _, table := range tables {
		safeName, err := db.SafeTableName(table)
		if err != nil {
			return "", err
		}
		selects = append(selects, selectPrefix+safeName+whereSuffix)
	}
	// tables is non-empty and every iteration either returned or appended,
	// so selects holds at least one query here.
	return strings.Join(selects, "\nUNION ALL\n"), nil
}
// templateExclusionFilter returns the SQL predicate that keeps only rows whose
// "IsTemplate" column, cast to text, is none of 'TRUE', 'true' or '1'.
// NULL values are coalesced to the empty string and therefore kept.
func templateExclusionFilter() string {
	const notTemplate = `COALESCE(CAST("IsTemplate" AS TEXT), '') NOT IN ('TRUE', 'true', '1')`
	return notTemplate
}
// parseSnapshotDate extracts the timestamp encoded in a snapshot table name.
//
// table must start with prefix; the remainder is interpreted according to
// layout. The special layout "epoch" parses the remainder as base-10 Unix
// seconds; any other value is used as a time.Parse layout. The second result
// is false, and the time is the zero value, when the prefix does not match or
// the remainder cannot be parsed.
func parseSnapshotDate(table string, prefix string, layout string) (time.Time, bool) {
	suffix, hasPrefix := strings.CutPrefix(table, prefix)
	if !hasPrefix {
		return time.Time{}, false
	}

	var (
		stamp time.Time
		err   error
	)
	if layout == "epoch" {
		var secs int64
		secs, err = strconv.ParseInt(suffix, 10, 64)
		stamp = time.Unix(secs, 0)
	} else {
		stamp, err = time.Parse(layout, suffix)
	}
	if err != nil {
		return time.Time{}, false
	}
	return stamp, true
}
// truncateDate returns midnight (00:00:00.000000000) of t's calendar day,
// expressed in t's own location.
func truncateDate(t time.Time) time.Time {
	year, month, day := t.Date()
	return time.Date(year, month, day, 0, 0, 0, 0, t.Location())
}
// filterSnapshotsWithRows returns the snapshots whose table currently holds at
// least one row, in their original order.
//
// A snapshot is dropped from the result when db.TableHasRows reports an error
// for its table, exactly as if the table were empty. The result is a newly
// allocated slice: the caller's snapshots slice is not modified, so it can
// safely be reused or filtered again afterwards.
func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord {
	filtered := make([]report.SnapshotRecord, 0, len(snapshots))
	for _, snapshot := range snapshots {
		rowsExist, err := db.TableHasRows(ctx, dbConn, snapshot.TableName)
		if err != nil || !rowsExist {
			continue
		}
		filtered = append(filtered, snapshot)
	}
	return filtered
}
// filterRecordsInRange returns the records whose SnapshotTime lies in the
// half-open interval [start, end), in their original order.
//
// The result is a newly allocated slice. Filtering in place over records[:0]
// would overwrite the caller's backing array, corrupting the input for any
// caller that filters the same records against a second range.
func filterRecordsInRange(records []report.SnapshotRecord, start, end time.Time) []report.SnapshotRecord {
	filtered := make([]report.SnapshotRecord, 0, len(records))
	for _, r := range records {
		if r.SnapshotTime.Before(start) || !r.SnapshotTime.Before(end) {
			continue
		}
		filtered = append(filtered, r)
	}
	return filtered
}
// summaryUnionColumns lists the double-quoted column names read from snapshot tables,
// ending with "SnapshotTime".
// NOTE(review): presumably passed as the columns argument of buildUnionQuery when
// building summaries — the call site is not in this part of the file; confirm.
var summaryUnionColumns = []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`,
}
// monthlyUnionColumns are the fields needed from daily summaries for monthly aggregation/refinement.
// The list holds the same identity and sizing columns as summaryUnionColumns, plus the
// per-day aggregate columns (sample count, averages, pool percentages and pool totals).
// Names are already double-quoted for direct use in SQL.
var monthlyUnionColumns = []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`,
`"SamplesPresent"`, `"AvgVcpuCount"`, `"AvgRamGB"`, `"AvgProvisionedDisk"`, `"AvgIsPresent"`,
`"PoolTinPct"`, `"PoolBronzePct"`, `"PoolSilverPct"`, `"PoolGoldPct"`,
`"Tin"`, `"Bronze"`, `"Silver"`, `"Gold"`, `"SnapshotTime"`,
}
// ensureSnapshotRowID makes sure a snapshot table has a populated "RowId"
// column when the connection uses the "pgx" or "postgres" driver (compared
// case-insensitively). For those drivers it adds a BIGSERIAL "RowId" column
// if the table lacks one and then backfills it. For every other driver,
// including "sqlite", it does nothing and returns nil.
func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
	driverName := strings.ToLower(dbConn.DriverName())
	if driverName != "pgx" && driverName != "postgres" {
		return nil
	}

	exists, err := db.ColumnExists(ctx, dbConn, tableName, "RowId")
	if err != nil {
		return err
	}
	if !exists {
		rowIDColumn := db.ColumnDef{Name: "RowId", Type: "BIGSERIAL"}
		if addErr := db.AddColumnIfMissing(ctx, dbConn, tableName, rowIDColumn); addErr != nil {
			return addErr
		}
	}
	if fillErr := db.BackfillSerialColumn(ctx, dbConn, tableName, "RowId"); fillErr != nil {
		return fillErr
	}
	return nil
}
// nullInt64ToInt unwraps a sql.NullInt64, mapping SQL NULL (Valid == false) to 0.
func nullInt64ToInt(value sql.NullInt64) int64 {
	if !value.Valid {
		return 0
	}
	return value.Int64
}
// nullFloat64ToFloat unwraps a sql.NullFloat64, mapping SQL NULL (Valid == false) to 0.
func nullFloat64ToFloat(value sql.NullFloat64) float64 {
	if !value.Valid {
		return 0
	}
	return value.Float64
}
// intWithDefault returns value when it is strictly positive and fallback otherwise.
func intWithDefault(value int, fallback int) int {
	if value > 0 {
		return value
	}
	return fallback
}
// durationFromSeconds converts a whole number of seconds into a time.Duration.
// Zero and negative inputs yield fallback instead.
func durationFromSeconds(seconds int, fallback time.Duration) time.Duration {
	if seconds <= 0 {
		return fallback
	}
	return time.Second * time.Duration(seconds)
}
// normalizeResourcePool trims surrounding whitespace from a resource pool name
// and maps the names tin, bronze, silver and gold (compared after lowercasing)
// to their canonical capitalised spelling. Any other name is returned trimmed
// but otherwise unchanged; blank input yields "".
func normalizeResourcePool(value string) string {
	name := strings.TrimSpace(value)
	switch strings.ToLower(name) {
	case "":
		return ""
	case "tin":
		return "Tin"
	case "bronze":
		return "Bronze"
	case "silver":
		return "Silver"
	case "gold":
		return "Gold"
	default:
		return name
	}
}
// reportsDir returns the directory reports are written to: the trimmed
// ReportsDir setting when settings are loaded and the value is not blank,
// otherwise "/var/lib/vctp/reports".
func (c *CronTask) reportsDir() string {
	const defaultDir = "/var/lib/vctp/reports"
	if c.Settings == nil || c.Settings.Values == nil {
		return defaultDir
	}
	configured := strings.TrimSpace(c.Settings.Values.Settings.ReportsDir)
	if configured == "" {
		return defaultDir
	}
	return configured
}
// generateReport saves a report for tableName into the directory given by
// reportsDir. The first value returned by report.SaveTableReport is discarded;
// only its error is passed on.
func (c *CronTask) generateReport(ctx context.Context, tableName string) error {
	outputDir := c.reportsDir()
	if _, err := report.SaveTableReport(c.Logger, c.Database, ctx, tableName, outputDir); err != nil {
		return err
	}
	return nil
}
// snapshotFromVM builds the snapshot row for one VM as seen in vCenter.
//
// Values are taken in this order of preference: the live vCenter object, then the
// matching inventory record inv (may be nil), then the pre-built lookups, and finally
// per-VM calls on vc for cluster and datacenter. hostLookup and rpLookup may be nil.
//
// vc is dereferenced unconditionally and must not be nil. The only error returned is
// for a nil vmObject.
func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup vcenter.FolderLookup, rpLookup map[string]string) (InventorySnapshotRow, error) {
if vmObject == nil {
return InventorySnapshotRow{}, fmt.Errorf("missing VM object")
}
row := InventorySnapshotRow{
Name: vmObject.Name,
Vcenter: vc.Vurl,
VmId: sql.NullString{String: vmObject.Reference().Value, Valid: vmObject.Reference().Value != ""},
SnapshotTime: snapshotTime.Unix(),
}
// Identity fields that only the inventory record can provide.
if inv != nil {
row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}
row.EventKey = inv.EventKey
row.CloudId = inv.CloudId
row.DeletionTime = inv.DeletionTime
}
// Sizing and flags from the VM's config, when vCenter returned one.
if vmObject.Config != nil {
row.VmUuid = sql.NullString{String: vmObject.Config.Uuid, Valid: vmObject.Config.Uuid != ""}
if !vmObject.Config.CreateDate.IsZero() {
row.CreationTime = sql.NullInt64{Int64: vmObject.Config.CreateDate.Unix(), Valid: true}
}
row.VcpuCount = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.NumCPU), Valid: vmObject.Config.Hardware.NumCPU > 0}
// Integer division: memory is rounded down to whole GB (e.g. 512 MB is stored as 0 but still Valid).
row.RamGB = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.MemoryMB) / 1024, Valid: vmObject.Config.Hardware.MemoryMB > 0}
totalDiskBytes := int64(0)
for _, device := range vmObject.Config.Hardware.Device {
if disk, ok := device.(*types.VirtualDisk); ok {
totalDiskBytes += disk.CapacityInBytes
}
}
if totalDiskBytes > 0 {
// NOTE(review): the division happens on the integer before the float conversion, so
// the stored value is whole GiB rounded down — confirm the truncation is intended.
row.ProvisionedDisk = sql.NullFloat64{Float64: float64(totalDiskBytes / 1024 / 1024 / 1024), Valid: true}
}
if vmObject.Config.ManagedBy != nil && vmObject.Config.ManagedBy.ExtensionKey == "com.vmware.vcDr" && vmObject.Config.ManagedBy.Type == "placeholderVm" {
row.SrmPlaceholder = "TRUE"
} else {
row.SrmPlaceholder = "FALSE"
}
if vmObject.Config.Template {
row.IsTemplate = "TRUE"
} else {
row.IsTemplate = "FALSE"
}
}
// Every power state other than "poweredOff" is recorded as powered on.
if vmObject.Runtime.PowerState == "poweredOff" {
row.PoweredOn = "FALSE"
} else {
row.PoweredOn = "TRUE"
}
// Placement comes from inventory; the remaining assignments only fill fields vCenter left unset.
if inv != nil {
if inv.ResourcePool.Valid {
row.ResourcePool = sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: true}
}
row.Datacenter = inv.Datacenter
row.Cluster = inv.Cluster
row.Folder = inv.Folder
if !row.CreationTime.Valid {
row.CreationTime = inv.CreationTime
}
if !row.ProvisionedDisk.Valid {
row.ProvisionedDisk = inv.ProvisionedDisk
}
if !row.VcpuCount.Valid {
row.VcpuCount = inv.InitialVcpus
}
if !row.RamGB.Valid && inv.InitialRam.Valid {
row.RamGB = sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Int64 > 0}
}
// IsTemplate and SrmPlaceholder are only empty here when vmObject.Config was nil.
if row.IsTemplate == "" {
row.IsTemplate = boolStringFromInterface(inv.IsTemplate)
}
// PoweredOn is always set above, so this fallback never applies.
if row.PoweredOn == "" {
row.PoweredOn = boolStringFromInterface(inv.PoweredOn)
}
if row.SrmPlaceholder == "" {
row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder)
}
if !row.VmUuid.Valid {
row.VmUuid = inv.VmUuid
}
}
// Resource pool: fall back to the lookup by the VM's resource pool reference.
if row.ResourcePool.String == "" && vmObject.ResourcePool != nil {
if rpLookup != nil {
if rpName, ok := rpLookup[vmObject.ResourcePool.Value]; ok {
row.ResourcePool = sql.NullString{String: normalizeResourcePool(rpName), Valid: rpName != ""}
}
}
}
// Folder: fall back to the folder path lookup.
if row.Folder.String == "" {
if folderPath, ok := vc.GetVMFolderPathFromLookup(*vmObject, folderLookup); ok {
row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""}
} else {
// Unable to resolve folder path from lookup; leave empty.
}
}
// Cluster and datacenter: first try the host lookup keyed by the VM's host reference.
if vmObject.Runtime.Host != nil && hostLookup != nil {
if lookup, ok := hostLookup[vmObject.Runtime.Host.Value]; ok {
if row.Cluster.String == "" && lookup.Cluster != "" {
row.Cluster = sql.NullString{String: lookup.Cluster, Valid: true}
}
if row.Datacenter.String == "" && lookup.Datacenter != "" {
row.Datacenter = sql.NullString{String: lookup.Datacenter, Valid: true}
}
}
}
// Last resort for cluster: ask vc for the host's cluster; a failure is logged, not returned.
if row.Cluster.String == "" && vmObject.Runtime.Host != nil {
if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil {
row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""}
} else if vc.Logger != nil {
vc.Logger.Warn("failed to resolve cluster from host", "vm_id", vmObject.Reference().Value, "error", err)
}
}
// Last resort for datacenter: ask vc; a failure is silently ignored and the field stays empty.
if row.Datacenter.String == "" {
if dcName, err := vc.GetDatacenterForVM(*vmObject); err == nil {
row.Datacenter = sql.NullString{String: dcName, Valid: dcName != ""}
}
}
return row, nil
}
// snapshotFromInventory builds a snapshot row purely from an inventory record,
// stamped with snapshotTime (stored as Unix seconds).
//
// Most fields are copied unchanged. The resource pool name is normalised,
// InitialRam is divided by 1024 (integer division) and is only marked valid
// when the source is valid and positive, and the boolean-like inventory
// fields are converted with boolStringFromInterface.
func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) InventorySnapshotRow {
	var row InventorySnapshotRow

	// Identity.
	row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}
	row.Name = inv.Name
	row.Vcenter = inv.Vcenter
	row.VmId = inv.VmId
	row.VmUuid = inv.VmUuid
	row.EventKey = inv.EventKey
	row.CloudId = inv.CloudId

	// Lifecycle.
	row.CreationTime = inv.CreationTime
	row.DeletionTime = inv.DeletionTime
	row.SnapshotTime = snapshotTime.Unix()

	// Placement.
	row.ResourcePool = sql.NullString{
		String: normalizeResourcePool(inv.ResourcePool.String),
		Valid:  inv.ResourcePool.Valid,
	}
	row.Datacenter = inv.Datacenter
	row.Cluster = inv.Cluster
	row.Folder = inv.Folder

	// Sizing.
	row.ProvisionedDisk = inv.ProvisionedDisk
	row.VcpuCount = inv.InitialVcpus
	row.RamGB = sql.NullInt64{
		Int64: inv.InitialRam.Int64 / 1024,
		Valid: inv.InitialRam.Valid && inv.InitialRam.Int64 > 0,
	}

	// Flags.
	row.IsTemplate = boolStringFromInterface(inv.IsTemplate)
	row.PoweredOn = boolStringFromInterface(inv.PoweredOn)
	row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder)

	return row
}
// loadInventoryMaps loads every inventory record for the vCenter url and
// indexes them three ways.
//
// It returns, in order: the raw rows, a map keyed by VmId (valid ids only),
// a map keyed by VmUuid (valid UUIDs only) and a map keyed by Name (non-empty
// names only). When several rows share a key the last row wins. A query
// failure is wrapped and returned with all other results nil.
func loadInventoryMaps(ctx context.Context, dbConn *sqlx.DB, url string) ([]queries.Inventory, map[string]queries.Inventory, map[string]queries.Inventory, map[string]queries.Inventory, error) {
	rows, err := queries.New(dbConn).GetInventoryByVcenter(ctx, url)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("unable to query inventory table: %w", err)
	}

	var (
		byVmID = make(map[string]queries.Inventory, len(rows))
		byUUID = make(map[string]queries.Inventory, len(rows))
		byName = make(map[string]queries.Inventory, len(rows))
	)
	for i := range rows {
		record := rows[i]
		if record.VmId.Valid {
			byVmID[record.VmId.String] = record
		}
		if record.VmUuid.Valid {
			byUUID[record.VmUuid.String] = record
		}
		if record.Name != "" {
			byName[record.Name] = record
		}
	}
	return rows, byVmID, byUUID, byName, nil
}
// prepareDeletionCandidates finds inventory VMs that are absent from the current vCenter
// listing and records them as deleted.
//
// An inventory row counts as present when its VmId is a key of presentSnapshots, or its
// UUID is in presentByUuid, or its name is in presentByName (checked in that order).
// Rows whose name starts with "vCLS-" are ignored.
//
// For every missing VM it marks the inventory record deleted at startTime (only when no
// deletion time is set yet), appends a deletionCandidate, and updates the lifecycle cache.
// All database failures are logged as warnings and do not stop the loop.
//
// It returns the number of missing VMs, whether any inventory record was newly given a
// deletion time, and the candidates.
func prepareDeletionCandidates(ctx context.Context, log *slog.Logger, dbConn *sqlx.DB, url string, inventoryRows []queries.Inventory,
presentSnapshots map[string]InventorySnapshotRow, presentByUuid, presentByName map[string]struct{}, startTime time.Time) (int, bool, []deletionCandidate) {
candidates := make([]deletionCandidate, 0)
missingCount := 0
deletionsMarked := false
for _, inv := range inventoryRows {
log.Debug("checking inventory for deletions", "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "name", inv.Name)
if strings.HasPrefix(inv.Name, "vCLS-") {
continue
}
vmID := inv.VmId.String
uuid := ""
if inv.VmUuid.Valid {
uuid = inv.VmUuid.String
}
name := inv.Name
// Match by VM id first, then UUID, then name; the first hit means the VM still exists.
found := false
if vmID != "" {
if _, ok := presentSnapshots[vmID]; ok {
found = true
}
}
if !found && uuid != "" {
if _, ok := presentByUuid[uuid]; ok {
found = true
}
}
if !found && name != "" {
if _, ok := presentByName[name]; ok {
found = true
}
}
if found {
continue
}
// row is only used to check for an existing deletion time and for the warning's vm_id field.
row := snapshotFromInventory(inv, startTime)
if !row.DeletionTime.Valid {
deletionTime := startTime.Unix()
row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true}
if err := queries.New(dbConn).InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
DeletionTime: row.DeletionTime,
VmId: inv.VmId,
DatacenterName: inv.Datacenter,
}); err != nil {
log.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String)
}
// NOTE(review): the debug line and deletionsMarked are set even when the update above
// failed — confirm that is intended.
log.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime)
deletionsMarked = true
}
clusterName := ""
if inv.Cluster.Valid {
clusterName = inv.Cluster.String
}
candidates = append(candidates, deletionCandidate{
vmID: vmID,
vmUUID: uuid,
name: name,
cluster: clusterName,
datacenter: inv.Datacenter,
})
// The lifecycle cache is updated for every missing VM, including ones that already
// carried a deletion time in inventory.
if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime.Unix()); err != nil {
log.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "error", err)
}
if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime); err != nil {
log.Warn("failed to upsert vm lifecycle cache (deletion path)", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "name", inv.Name, "error", err)
}
missingCount++
}
return missingCount, deletionsMarked, candidates
}
// buildPresentSnapshots converts vCenter VM objects into snapshot rows and aggregates totals.
//
// VMs whose name starts with "vCLS-" and VMs flagged as templates are skipped. Each
// remaining VM is paired with its inventory record (matched by managed-object reference
// value, if any), turned into a row by snapshotFromVM, and written to the VM identity
// tables and the lifecycle cache. Failures of those writes are logged and ignored.
//
// It returns the rows keyed by VM reference value, the set of valid VM UUIDs seen, the
// set of non-empty VM names seen, and the totals (VM count, vCPU, RAM and disk sums).
// The logger comes from ctx when one is stored there, otherwise c.Logger.
func (c *CronTask) buildPresentSnapshots(ctx context.Context, dbConn *sqlx.DB, vc *vcenter.Vcenter, vcVms []mo.VirtualMachine, startTime time.Time, url string,
inventoryByVmID map[string]queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup map[string]string, rpLookup map[string]string) (map[string]InventorySnapshotRow, map[string]struct{}, map[string]struct{}, snapshotTotals) {
log := loggerFromCtx(ctx, c.Logger)
presentSnapshots := make(map[string]InventorySnapshotRow, len(vcVms))
presentByUuid := make(map[string]struct{}, len(vcVms))
presentByName := make(map[string]struct{}, len(vcVms))
totals := snapshotTotals{}
for _, vm := range vcVms {
if strings.HasPrefix(vm.Name, "vCLS-") {
continue
}
if vm.Config != nil && vm.Config.Template {
continue
}
// Copy the map value so a pointer to it can be handed to snapshotFromVM.
var inv *queries.Inventory
if existing, ok := inventoryByVmID[vm.Reference().Value]; ok {
existingCopy := existing
inv = &existingCopy
}
row, err := snapshotFromVM(&vm, vc, startTime, inv, hostLookup, folderLookup, rpLookup)
if err != nil {
log.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err)
continue
}
if err := db.UpsertVmIdentity(ctx, dbConn, url, row.VmId, row.VmUuid, row.Name, row.Cluster, startTime); err != nil {
log.Warn("failed to upsert vm identity", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err)
}
clusterName := ""
if row.Cluster.Valid {
clusterName = row.Cluster.String
}
if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, row.VmId.String, row.VmUuid.String, row.Name, clusterName, startTime); err != nil {
log.Warn("failed to upsert vm lifecycle cache", "vcenter", url, "vm_id", row.VmId, "vm_uuid", row.VmUuid, "name", row.Name, "error", err)
}
// The row is recorded and counted even when the upserts above failed.
presentSnapshots[vm.Reference().Value] = row
if row.VmUuid.Valid {
presentByUuid[row.VmUuid.String] = struct{}{}
}
if row.Name != "" {
presentByName[row.Name] = struct{}{}
}
totals.VmCount++
totals.VcpuTotal += nullInt64ToInt(row.VcpuCount)
totals.RamTotal += nullInt64ToInt(row.RamGB)
totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk)
}
return presentSnapshots, presentByUuid, presentByName, totals
}
// initVcenterResources logs into vCenter, fetches VMs, builds lookups, and returns a cleanup function for logout.
//
// On success it returns the connected client, the fetched resources, and a non-nil cleanup
// that the caller must run; cleanup logs out using its own 5-second context so it still
// works after ctx has been cancelled.
//
// When login or the VM fetch fails, the failure is recorded in metrics and as a failed
// snapshot run for (url, startTime), and the function returns a nil client, a nil cleanup
// and a wrapped error. After a failed VM fetch the session is logged out before returning.
//
// A failure to build the host, folder or resource pool lookup is only logged; the
// corresponding lookup is returned as nil. started is used to compute the duration
// reported to metrics.
func (c *CronTask) initVcenterResources(ctx context.Context, log *slog.Logger, url string, startTime, started time.Time) (*vcenter.Vcenter, vcenterResources, func(), error) {
res := vcenterResources{}
vc := vcenter.New(c.Logger, c.VcCreds)
// Login takes no context here, so ctx is not passed to this call.
if err := vc.Login(url); err != nil {
metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err)
if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil {
log.Warn("failed to record snapshot run", "url", url, "error", upErr)
}
return nil, res, nil, fmt.Errorf("unable to connect to vcenter: %w", err)
}
cleanup := func() {
// Detached from ctx on purpose so logout is still attempted after cancellation or timeout.
logCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := vc.Logout(logCtx); err != nil {
log.Warn("vcenter logout failed", "url", url, "error", err)
} else {
log.Debug("vcenter logout succeeded", "url", url)
}
}
vms, err := vc.GetAllVMsWithProps()
if err != nil {
metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err)
if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil {
log.Warn("failed to record snapshot run", "url", url, "error", upErr)
}
// The caller receives a nil cleanup on this path, so log out here.
cleanup()
return nil, res, nil, fmt.Errorf("unable to get VMs from vcenter: %w", err)
}
log.Debug("retrieved VMs from vcenter", "url", url, "vm_count", len(vms))
if err := db.EnsureVmIdentityTables(ctx, c.Database.DB()); err != nil {
log.Warn("failed to ensure vm identity tables", "error", err)
}
// The three lookups are optional: on failure the lookup is reset to nil and the run continues.
hostLookup, err := vc.BuildHostLookup()
if err != nil {
log.Warn("failed to build host lookup", "url", url, "error", err)
hostLookup = nil
} else {
log.Debug("built host lookup", "url", url, "hosts", len(hostLookup))
}
folderLookup, err := vc.BuildFolderPathLookup()
if err != nil {
log.Warn("failed to build folder lookup", "url", url, "error", err)
folderLookup = nil
} else {
log.Debug("built folder lookup", "url", url, "folders", len(folderLookup))
}
rpLookup, err := vc.BuildResourcePoolLookup()
if err != nil {
log.Warn("failed to build resource pool lookup", "url", url, "error", err)
rpLookup = nil
} else {
log.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup))
}
res.vms = vms
res.hostLookup = hostLookup
res.folderLookup = folderLookup
res.rpLookup = rpLookup
return vc, res, cleanup, nil
}
// captureHourlySnapshotForVcenter captures one hourly inventory snapshot for a
// single vCenter (url) and writes the rows into tableName.
//
// The work happens in this order:
//  1. connect through initVcenterResources and load the inventory maps;
//  2. build the snapshot rows and totals for the VMs currently present;
//  3. call prepareDeletionCandidates and, when it reports missing VMs, refine
//     their deletion time from vCenter deletion events;
//  4. insert the rows into the hourly cache and the hourly table, then record
//     the per-vCenter totals;
//  5. compare against the previous hourly snapshot; if the VM count dropped
//     versus the last recorded totals and nothing was flagged as missing,
//     correlate vCenter deletion events against inventory/snapshot rows;
//  6. backfill lifecycle deletions, log a summary, record metrics and the
//     snapshot run, and regenerate the hourly report when deletions were marked.
//
// An error is returned only when connecting to vCenter, loading the inventory
// maps, or inserting the hourly batch fails. Every other failure is logged as a
// warning and the run continues (best effort).
func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error {
	log := c.Logger.With("vcenter", url)
	// Make the per-vCenter logger available to helpers that use loggerFromCtx.
	ctx = context.WithValue(ctx, ctxLoggerKey{}, log)
	started := time.Now()

	log.Debug("connecting to vcenter for hourly snapshot", "url", url)
	vc, resources, cleanup, err := c.initVcenterResources(ctx, log, url, startTime, started)
	if err != nil {
		return err
	}
	defer cleanup()

	dbConn := c.Database.DB()
	inventoryRows, inventoryByVmID, inventoryByUuid, inventoryByName, err := loadInventoryMaps(ctx, dbConn, url)
	if err != nil {
		return err
	}

	presentSnapshots, presentByUuid, presentByName, totals := c.buildPresentSnapshots(ctx, dbConn, vc, resources.vms, startTime, url, inventoryByVmID, resources.hostLookup, resources.folderLookup, resources.rpLookup)

	// Most recent recorded VM count for this vCenter; stays invalid when no
	// totals row exists yet or the lookup fails.
	var prevVmCount sql.NullInt64
	countQuery := `SELECT "VmCount" FROM vcenter_totals WHERE "Vcenter" = ? ORDER BY "SnapshotTime" DESC LIMIT 1`
	countQuery = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), countQuery)
	if err := dbConn.QueryRowContext(ctx, countQuery, url).Scan(&prevVmCount); err != nil && !errors.Is(err, sql.ErrNoRows) {
		c.Logger.Warn("failed to read previous vcenter totals", "vcenter", url, "error", err)
	}
	// True when fewer VMs are present now than in the last recorded totals.
	vmCountDropped := prevVmCount.Valid && prevVmCount.Int64 > int64(totals.VmCount)

	// Configured snapshot cadence, used to size the deletion-event lookback
	// windows below; falls back to one hour when unset or non-positive.
	snapshotInterval := time.Duration(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) * time.Second
	if snapshotInterval <= 0 {
		snapshotInterval = time.Hour
	}

	c.Logger.Debug("hourly snapshot rows prepared", "vcenter", url, "rows", len(presentSnapshots))
	// Only present VMs are written to the hourly table.
	batch := make([]InventorySnapshotRow, 0, len(presentSnapshots))
	for _, row := range presentSnapshots {
		batch = append(batch, row)
	}

	log.Debug("checking inventory for missing VMs")
	missingCount, deletionsMarked, candidates := prepareDeletionCandidates(ctx, log, dbConn, url, inventoryRows, presentSnapshots, presentByUuid, presentByName, startTime)

	// If deletions detected, refine deletion time using vCenter events in a small window.
	if missingCount > 0 {
		begin := startTime.Add(-4 * snapshotInterval)
		end := startTime
		events, err := vc.FindVmDeletionEvents(ctx, begin, end)
		if err != nil {
			log.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err)
		} else {
			log.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", begin, "window_end_local", end, "window_minutes", end.Sub(begin).Minutes(), "window_start_utc", begin.UTC(), "window_end_utc", end.UTC())
			for _, cand := range candidates {
				t, ok := events[cand.vmID]
				if !ok {
					continue
				}
				delTs := sql.NullInt64{Int64: t.Unix(), Valid: true}
				if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
					DeletionTime:   delTs,
					VmId:           sql.NullString{String: cand.vmID, Valid: cand.vmID != ""},
					DatacenterName: cand.datacenter,
				}); err != nil {
					log.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err)
				}
				if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, cand.vmID, cand.vmUUID, cand.name, cand.cluster, t.Unix()); err != nil {
					log.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err)
				}
				log.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t)
			}
		}
	}

	// If VM count dropped vs prior totals but we didn't mark missing, still look for events (best-effort logging).
	if missingCount == 0 && vmCountDropped {
		begin := startTime.Add(-2 * snapshotInterval)
		end := startTime
		events, err := vc.FindVmDeletionEvents(ctx, begin, end)
		if err != nil {
			log.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount)
		} else {
			log.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start", begin, "window_end", end)
		}
	}

	log.Debug("inserting hourly snapshot batch", "vcenter", url, "rows", len(batch))
	// The cache insert is best effort; only the hourly table insert is fatal.
	if err := insertHourlyCache(ctx, dbConn, batch); err != nil {
		log.Warn("failed to insert hourly cache rows", "vcenter", url, "error", err)
	}
	if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil {
		metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err)
		if upErr := db.UpsertSnapshotRun(ctx, dbConn, url, startTime, false, err.Error()); upErr != nil {
			log.Warn("failed to record snapshot run", "url", url, "error", upErr)
		}
		return err
	}

	// Record per-vCenter totals snapshot.
	if err := db.InsertVcenterTotals(ctx, dbConn, url, startTime, totals.VmCount, totals.VcpuTotal, totals.RamTotal); err != nil {
		log.Warn("failed to insert vcenter totals", "vcenter", url, "snapshot_time", startTime.Unix(), "error", err)
	}

	// Discover previous snapshots once per run (serial) to avoid concurrent probes across vCenters.
	var (
		prevTableName string
		newCount      int
	)
	prevTableName, newCount, missingCount = c.compareWithPreviousSnapshot(ctx, dbConn, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName, missingCount)

	// If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window.
	if missingCount == 0 && vmCountDropped {
		// Fallback: compare against latest registered snapshot table.
		if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime, loggerFromCtx(ctx, c.Logger)); err == nil && prevTable != "" {
			moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
			if moreMissing > 0 {
				missingCount += moreMissing
			}
			// Reuse this table name for later snapshot lookups when correlating deletion events.
			prevTableName = prevTable
		}
		begin := startTime.Add(-4 * snapshotInterval)
		end := startTime
		events, err := vc.FindVmDeletionEvents(ctx, begin, end)
		if err != nil {
			c.Logger.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount)
		} else {
			c.Logger.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start_local", begin, "window_end_local", end, "window_start_utc", begin.UTC(), "window_end_utc", end.UTC(), "window_minutes", end.Sub(begin).Minutes())
			for vmID, t := range events {
				// Skip if VM is still present.
				if _, ok := presentSnapshots[vmID]; ok {
					continue
				}
				inv, ok := inventoryByVmID[vmID]
				var snapRow InventorySnapshotRow
				if !ok {
					// Not in inventory: fall back to a lookup in the hourly snapshots.
					var found bool
					snapRow, found = findVMInHourlySnapshots(ctx, dbConn, url, vmID, prevTableName)
					if !found {
						c.Logger.Debug("count-drop: deletion event has no snapshot match", "vm_id", vmID, "vcenter", url, "event_time", t)
						continue
					}
					inv = queries.Inventory{
						VmId:       snapRow.VmId,
						VmUuid:     snapRow.VmUuid,
						Name:       snapRow.Name,
						Datacenter: snapRow.Datacenter,
					}
					c.Logger.Info("count-drop: correlated deletion via snapshot lookup", "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "name", inv.Name, "vcenter", url, "event_time", t, "snapshot_table", prevTableName)
				}
				// Prefer UUID from snapshot if inventory entry lacks it.
				if !inv.VmUuid.Valid && snapRow.VmUuid.Valid {
					inv.VmUuid = snapRow.VmUuid
				}
				delTs := sql.NullInt64{Int64: t.Unix(), Valid: true}
				if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
					DeletionTime:   delTs,
					VmId:           inv.VmId,
					DatacenterName: inv.Datacenter,
				}); err != nil {
					c.Logger.Warn("count-drop: failed to update inventory deletion time from event", "vm_id", vmID, "vcenter", url, "error", err)
				} else {
					c.Logger.Info("count-drop: correlated deletion event to inventory", "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "name", inv.Name, "vcenter", url, "event_time", t, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount)
				}
				clusterName := ""
				if inv.Cluster.Valid {
					clusterName = inv.Cluster.String
				}
				if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, vmID, inv.VmUuid.String, inv.Name, clusterName, t.Unix()); err != nil {
					c.Logger.Warn("count-drop: failed to refine lifecycle cache deletion time", "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "vcenter", url, "error", err)
				}
				missingCount++
				deletionsMarked = true
			}
		}
	}

	// Backfill lifecycle deletions for VMs missing from inventory and without DeletedAt.
	if err := backfillLifecycleDeletionsToday(ctx, log, dbConn, url, startTime, presentSnapshots); err != nil {
		log.Warn("failed to backfill lifecycle deletions for today", "vcenter", url, "error", err)
	}

	// "missing_marked" and "deleted_since_prev" intentionally report the same counter.
	log.Info("Hourly snapshot summary",
		"vcenter", url,
		"vm_count", totals.VmCount,
		"vcpu_total", totals.VcpuTotal,
		"ram_total_gb", totals.RamTotal,
		"disk_total_gb", totals.DiskTotal,
		"missing_marked", missingCount,
		"created_since_prev", newCount,
		"deleted_since_prev", missingCount,
	)
	metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil)
	if upErr := db.UpsertSnapshotRun(ctx, dbConn, url, startTime, true, ""); upErr != nil {
		log.Warn("failed to record snapshot run", "url", url, "error", upErr)
	}

	// The report is regenerated only when this run marked deletions.
	if deletionsMarked {
		if err := c.generateReport(ctx, tableName); err != nil {
			log.Warn("failed to regenerate hourly report after deletions", "error", err, "table", tableName)
		} else {
			log.Debug("Regenerated hourly report after deletions", "table", tableName)
		}
	}
	return nil
}
// compareWithPreviousSnapshot diffs the current hourly snapshot against the
// latest hourly snapshot table that precedes startTime.
//
// When a previous table exists, markMissingFromPrevious is run and its result
// is added to missingCount; new VMs are then counted (and their names logged)
// unless SnapshotTooSoon reports that the two snapshots are closer together
// than half the configured snapshot interval. When no previous table is found,
// every present VM is counted as new.
//
// It returns the previous table name (empty when none was found), the number
// of new VMs, and the updated missing count.
func (c *CronTask) compareWithPreviousSnapshot(
	ctx context.Context,
	dbConn *sqlx.DB,
	url string,
	startTime time.Time,
	presentSnapshots map[string]InventorySnapshotRow,
	presentByUuid map[string]struct{},
	presentByName map[string]struct{},
	inventoryByVmID map[string]queries.Inventory,
	inventoryByUuid map[string]queries.Inventory,
	inventoryByName map[string]queries.Inventory,
	missingCount int,
) (string, int, int) {
	prevTable, lookupErr := latestHourlySnapshotBefore(ctx, dbConn, startTime, loggerFromCtx(ctx, c.Logger))
	if lookupErr != nil {
		c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", lookupErr, "url", url)
	}
	// The parse error is deliberately ignored, as in the original flow.
	prevUnix, _ := parseSnapshotTime(prevTable)

	// No earlier snapshot to diff against: treat everything present as new.
	if prevTable == "" {
		return prevTable, len(presentSnapshots), missingCount
	}

	missingCount += c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)

	created := 0
	currentUnix := startTime.Unix()
	// Skip only if snapshots are closer together than half the configured cadence
	halfCadence := int64(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds) / 2
	if !SnapshotTooSoon(prevUnix, currentUnix, halfCadence) {
		created = countNewFromPrevious(ctx, dbConn, prevTable, url, presentSnapshots)
		if created > 0 {
			added := listNewFromPrevious(ctx, dbConn, prevTable, url, presentSnapshots)
			labels := make([]string, 0, len(added))
			for _, vm := range added {
				// Prefer the VM name; fall back to its ID when the name is empty.
				switch {
				case vm.Name != "":
					labels = append(labels, vm.Name)
				case vm.VmId.Valid:
					labels = append(labels, vm.VmId.String)
				}
			}
			c.Logger.Info("new VMs since previous snapshot", "prev_table", prevTable, "count", created, "names", labels)
		}
	} else {
		c.Logger.Info("skipping new-VM detection because snapshots are too close together", "prev_table", prevTable, "prev_snapshot_unix", prevUnix, "current_snapshot_unix", currentUnix, "expected_interval_seconds", halfCadence)
	}

	c.Logger.Debug("compared with previous snapshot", "prev_table", prevTable, "new_since_prev", created, "missing_since_prev", missingCount)
	return prevTable, created, missingCount
}