vctp2/internal/tasks/inventorySnapshots.go

package tasks
import (
"context"
"database/sql"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"vctp/db"
"vctp/db/queries"
"vctp/internal/metrics"
"vctp/internal/report"
"vctp/internal/utils"
"vctp/internal/vcenter"
"github.com/jmoiron/sqlx"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
)
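// inventorySnapshotRow is a single VM record as written to an hourly inventory snapshot table.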
type inventorySnapshotRow struct {
InventoryId sql.NullInt64
Name string
Vcenter string
VmId sql.NullString
EventKey sql.NullString
CloudId sql.NullString
CreationTime sql.NullInt64
DeletionTime sql.NullInt64
ResourcePool sql.NullString
Datacenter sql.NullString
Cluster sql.NullString
Folder sql.NullString
ProvisionedDisk sql.NullFloat64
VcpuCount sql.NullInt64
RamGB sql.NullInt64
IsTemplate string
PoweredOn string
SrmPlaceholder string
VmUuid sql.NullString
SnapshotTime int64
IsPresent string
}
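// snapshotTotals aggregates per-vCenter VM, vCPU, RAM and provisioned-disk totals for a snapshot run.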
type snapshotTotals = db.SnapshotTotals
// RunVcenterSnapshotHourly records an hourly inventory snapshot of every configured vCenter into a per-run hourly snapshot table.
func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger) (err error) {
jobCtx := ctx
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.HourlyJobTimeoutSeconds, 20*time.Minute)
if jobTimeout > 0 {
var cancel context.CancelFunc
jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
defer cancel()
}
tracker := NewCronTracker(c.Database)
// Clear any stale in-progress markers (e.g., after a crash) before attempting the run.
if err := tracker.ClearAllInProgress(jobCtx); err != nil {
logger.Warn("failed to clear stale cron status", "error", err)
}
startedAt := time.Now()
defer func() {
logger.Info("Hourly snapshot job finished", "duration", time.Since(startedAt))
}()
done, skip, err := tracker.Start(jobCtx, "hourly_snapshot")
if err != nil {
return err
}
if skip {
logger.Warn("Hourly snapshot skipped because a previous run is still active")
return nil
}
defer func() { done(err) }()
ctx, cancel := context.WithCancel(jobCtx)
defer cancel()
startTime := time.Now()
if err := db.CheckMigrationState(ctx, c.Database.DB()); err != nil {
return err
}
if err := db.EnsureSnapshotRunTable(ctx, c.Database.DB()); err != nil {
return err
}
// reload settings in case vcenter list has changed
c.Settings.ReadYMLSettings()
if c.FirstHourlySnapshotCheck {
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
return err
}
lastSnapshot, err := report.LatestSnapshotTime(ctx, c.Database, "hourly")
if err != nil {
return err
}
minIntervalSeconds := intWithDefault(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, 3600)
if !lastSnapshot.IsZero() && startTime.Sub(lastSnapshot) < time.Duration(minIntervalSeconds)*time.Second {
c.Logger.Info("Skipping hourly snapshot, last snapshot too recent",
"last_snapshot", lastSnapshot,
"min_interval_seconds", minIntervalSeconds,
)
c.FirstHourlySnapshotCheck = false
return nil
}
c.FirstHourlySnapshotCheck = false
}
tableName, err := hourlyInventoryTableName(startTime)
if err != nil {
return err
}
dbConn := c.Database.DB()
db.ApplySQLiteTuning(ctx, dbConn)
if err := ensureDailyInventoryTable(ctx, dbConn, tableName); err != nil {
return err
}
var wg sync.WaitGroup
var errCount int64
concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency
if override, ok := utils.EnvInt("VCTP_HOURLY_SNAPSHOT_CONCURRENCY"); ok && override >= 0 {
concurrencyLimit = override
}
var sem chan struct{}
if concurrencyLimit > 0 {
sem = make(chan struct{}, concurrencyLimit)
}
c.Logger.Info("Starting hourly snapshots", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "concurrency_limit", concurrencyLimit)
for _, url := range c.Settings.Values.Settings.VcenterAddresses {
wg.Add(1)
go func(url string) {
defer wg.Done()
waitStarted := time.Now()
vcStart := time.Now()
if sem != nil {
sem <- struct{}{}
defer func() { <-sem }()
}
waitDuration := time.Since(waitStarted)
timeout := durationFromSeconds(c.Settings.Values.Settings.HourlySnapshotTimeoutSeconds, 10*time.Minute)
runCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
c.Logger.Info("Starting hourly snapshot for vcenter", "url", url)
if err := c.captureHourlySnapshotForVcenter(runCtx, startTime, tableName, url); err != nil {
atomic.AddInt64(&errCount, 1)
c.Logger.Error("hourly snapshot failed", "error", err, "url", url)
} else {
c.Logger.Info("Finished hourly snapshot for vcenter",
"url", url,
"queue_wait", waitDuration,
"duration", time.Since(vcStart),
"timeout", timeout,
)
}
}(url)
}
wg.Wait()
if errCount > 0 {
err = fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount)
return err
}
rowCount, err := db.TableRowCount(ctx, dbConn, tableName)
if err != nil {
c.Logger.Warn("unable to count hourly snapshot rows", "error", err, "table", tableName)
}
if err := report.RegisterSnapshot(ctx, c.Database, "hourly", tableName, startTime, rowCount); err != nil {
c.Logger.Warn("failed to register hourly snapshot", "error", err, "table", tableName)
}
metrics.RecordHourlySnapshot(startTime, rowCount, err)
if err := c.generateReport(ctx, tableName); err != nil {
c.Logger.Warn("failed to generate hourly report", "error", err, "table", tableName)
}
c.Logger.Debug("Finished hourly vcenter snapshot", "vcenter_count", len(c.Settings.Values.Settings.VcenterAddresses), "table", tableName, "row_count", rowCount)
return nil
}
// RunHourlySnapshotRetry retries failed vCenter hourly snapshots up to a maximum attempt count.
func (c *CronTask) RunHourlySnapshotRetry(ctx context.Context, logger *slog.Logger) (err error) {
jobStart := time.Now()
defer func() {
logger.Info("Hourly snapshot retry finished", "duration", time.Since(jobStart))
}()
maxRetries := c.Settings.Values.Settings.HourlySnapshotMaxRetries
if maxRetries <= 0 {
maxRetries = 3
}
dbConn := c.Database.DB()
if err := db.EnsureSnapshotRunTable(ctx, dbConn); err != nil {
return err
}
failed, err := db.ListFailedSnapshotRuns(ctx, dbConn, maxRetries)
if err != nil {
return err
}
if len(failed) == 0 {
logger.Debug("No failed hourly snapshots to retry")
return nil
}
for _, f := range failed {
startTime := time.Unix(f.SnapshotTime, 0)
tableName, tnErr := hourlyInventoryTableName(startTime)
if tnErr != nil {
logger.Warn("unable to derive table name for retry", "error", tnErr, "snapshot_time", startTime, "vcenter", f.Vcenter)
continue
}
logger.Info("Retrying hourly snapshot", "vcenter", f.Vcenter, "snapshot_time", startTime, "attempt", f.Attempts+1)
if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, f.Vcenter); err != nil {
logger.Warn("retry failed", "vcenter", f.Vcenter, "error", err)
}
}
return nil
}
// RunSnapshotCleanup drops hourly and daily snapshot tables older than retention.
func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) (err error) {
jobCtx := ctx
jobTimeout := durationFromSeconds(c.Settings.Values.Settings.CleanupJobTimeoutSeconds, 10*time.Minute)
if jobTimeout > 0 {
var cancel context.CancelFunc
jobCtx, cancel = context.WithTimeout(ctx, jobTimeout)
defer cancel()
}
tracker := NewCronTracker(c.Database)
done, skip, err := tracker.Start(jobCtx, "snapshot_cleanup")
if err != nil {
return err
}
if skip {
logger.Warn("Snapshot cleanup skipped because a previous run is still active")
return nil
}
defer func() { done(err) }()
if err := db.CheckMigrationState(jobCtx, c.Database.DB()); err != nil {
return err
}
startedAt := time.Now()
defer func() {
logger.Info("Snapshot cleanup job finished", "duration", time.Since(startedAt))
}()
now := time.Now()
hourlyMaxDays := intWithDefault(c.Settings.Values.Settings.HourlySnapshotMaxAgeDays, 60)
dailyMaxMonths := intWithDefault(c.Settings.Values.Settings.DailySnapshotMaxAgeMonths, 12)
hourlyCutoff := now.AddDate(0, 0, -hourlyMaxDays)
dailyCutoff := now.AddDate(0, -dailyMaxMonths, 0)
dbConn := c.Database.DB()
hourlyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_hourly_")
if err != nil {
return err
}
removedHourly := 0
for _, table := range hourlyTables {
if strings.HasPrefix(table, "inventory_daily_summary_") {
continue
}
tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "epoch")
if !ok {
continue
}
if tableDate.Before(truncateDate(hourlyCutoff)) {
if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
c.Logger.Error("failed to drop hourly snapshot table", "error", err, "table", table)
} else {
removedHourly++
if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil {
c.Logger.Warn("failed to remove hourly snapshot registry entry", "error", err, "table", table)
}
}
}
}
dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, "inventory_daily_summary_")
if err != nil {
return err
}
removedDaily := 0
for _, table := range dailyTables {
tableDate, ok := parseSnapshotDate(table, "inventory_daily_summary_", "20060102")
if !ok {
continue
}
if tableDate.Before(truncateDate(dailyCutoff)) {
if err := dropSnapshotTable(ctx, dbConn, table); err != nil {
c.Logger.Error("failed to drop daily snapshot table", "error", err, "table", table)
} else {
removedDaily++
if err := report.DeleteSnapshotRecord(ctx, c.Database, table); err != nil {
c.Logger.Warn("failed to remove daily snapshot registry entry", "error", err, "table", table)
}
}
}
}
c.Logger.Info("Finished snapshot cleanup",
"removed_hourly_tables", removedHourly,
"removed_daily_tables", removedDaily,
"hourly_max_age_days", hourlyMaxDays,
"daily_max_age_months", dailyMaxMonths,
)
return nil
}
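// hourlyInventoryTableName builds the per-run table name, inventory_hourly_<unix seconds>, via db.SafeTableName.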
func hourlyInventoryTableName(t time.Time) (string, error) {
return db.SafeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix()))
}
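// ensureDailyInventoryTable creates the snapshot table if it does not already exist and ensures the RowId, VcpuCount and RamGB columns are present.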
func ensureDailyInventoryTable(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
if err := db.EnsureSnapshotTable(ctx, dbConn, tableName); err != nil {
return err
}
if err := ensureSnapshotRowID(ctx, dbConn, tableName); err != nil {
return err
}
return db.EnsureColumns(ctx, dbConn, tableName, []db.ColumnDef{
{Name: "VcpuCount", Type: "BIGINT"},
{Name: "RamGB", Type: "BIGINT"},
})
}
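// buildUnionQuery produces a UNION ALL of SELECTs over the given tables and columns, validating each table name and applying the optional WHERE clause to every table.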
func buildUnionQuery(tables []string, columns []string, whereClause string) (string, error) {
if len(tables) == 0 {
return "", fmt.Errorf("no tables provided for union")
}
if len(columns) == 0 {
return "", fmt.Errorf("no columns provided for union")
}
queries := make([]string, 0, len(tables))
columnList := strings.Join(columns, ", ")
for _, table := range tables {
safeName, err := db.SafeTableName(table)
if err != nil {
return "", err
}
query := fmt.Sprintf("SELECT %s FROM %s", columnList, safeName)
if whereClause != "" {
query = fmt.Sprintf("%s WHERE %s", query, whereClause)
}
queries = append(queries, query)
}
if len(queries) == 0 {
return "", fmt.Errorf("no valid tables provided for union")
}
return strings.Join(queries, "\nUNION ALL\n"), nil
}
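// templateExclusionFilter returns a WHERE fragment that excludes rows whose IsTemplate value marks them as templates.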
func templateExclusionFilter() string {
return `COALESCE(CAST("IsTemplate" AS TEXT), '') NOT IN ('TRUE', 'true', '1')`
}
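// parseSnapshotDate extracts the snapshot time encoded in a table name. The layout is either a Go reference layout (e.g. "20060102") or the literal "epoch" for unix-second suffixes; the boolean reports whether parsing succeeded.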
func parseSnapshotDate(table string, prefix string, layout string) (time.Time, bool) {
if !strings.HasPrefix(table, prefix) {
return time.Time{}, false
}
suffix := strings.TrimPrefix(table, prefix)
if layout == "epoch" {
epoch, err := strconv.ParseInt(suffix, 10, 64)
if err != nil {
return time.Time{}, false
}
return time.Unix(epoch, 0), true
}
parsed, err := time.Parse(layout, suffix)
if err != nil {
return time.Time{}, false
}
return parsed, true
}
func truncateDate(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
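// dropSnapshotTable validates the table name and then drops the table if it exists.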
func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
if _, err := db.SafeTableName(table); err != nil {
return err
}
_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table))
return err
}
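// clearTable validates the table name and then deletes every row from it.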
func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
if _, err := db.SafeTableName(table); err != nil {
return err
}
_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s", table))
if err != nil {
return fmt.Errorf("failed to clear table %s: %w", table, err)
}
return nil
}
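// filterSnapshotsWithRows filters the slice in place, keeping only snapshots whose tables contain at least one row.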
func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord {
filtered := snapshots[:0]
for _, snapshot := range snapshots {
if rowsExist, err := db.TableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist {
filtered = append(filtered, snapshot)
}
}
return filtered
}
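// filterRecordsInRange filters the slice in place, keeping records with SnapshotTime in [start, end).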
func filterRecordsInRange(records []report.SnapshotRecord, start, end time.Time) []report.SnapshotRecord {
filtered := records[:0]
for _, r := range records {
if !r.SnapshotTime.Before(start) && r.SnapshotTime.Before(end) {
filtered = append(filtered, r)
}
}
return filtered
}
type columnDef struct {
Name string
Type string
}
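// summaryUnionColumns is the quoted column list selected from each hourly snapshot table when tables are unioned for summary queries.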
var summaryUnionColumns = []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`, `"IsPresent"`,
}
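// ensureSnapshotRowID adds and backfills a RowId column on Postgres-backed tables; SQLite tables need no change.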
func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string) error {
driver := strings.ToLower(dbConn.DriverName())
switch driver {
case "pgx", "postgres":
hasColumn, err := db.ColumnExists(ctx, dbConn, tableName, "RowId")
if err != nil {
return err
}
if !hasColumn {
if err := db.AddColumnIfMissing(ctx, dbConn, tableName, db.ColumnDef{Name: "RowId", Type: "BIGSERIAL"}); err != nil {
return err
}
}
if err := db.BackfillSerialColumn(ctx, dbConn, tableName, "RowId"); err != nil {
return err
}
case "sqlite":
return nil
}
return nil
}
func nullInt64ToInt(value sql.NullInt64) int64 {
if value.Valid {
return value.Int64
}
return 0
}
func nullFloat64ToFloat(value sql.NullFloat64) float64 {
if value.Valid {
return value.Float64
}
return 0
}
func intWithDefault(value int, fallback int) int {
if value <= 0 {
return fallback
}
return value
}
func durationFromSeconds(seconds int, fallback time.Duration) time.Duration {
if seconds > 0 {
return time.Duration(seconds) * time.Second
}
return fallback
}
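// normalizeResourcePool trims the value and maps the known tier names (tin, bronze, silver, gold) to their canonical capitalisation; other values are returned trimmed.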
func normalizeResourcePool(value string) string {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return ""
}
lower := strings.ToLower(trimmed)
canonical := map[string]string{
"tin": "Tin",
"bronze": "Bronze",
"silver": "Silver",
"gold": "Gold",
}
if val, ok := canonical[lower]; ok {
return val
}
return trimmed
}
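// reportsDir returns the configured reports directory, defaulting to /var/lib/vctp/reports.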
func (c *CronTask) reportsDir() string {
if c.Settings != nil && c.Settings.Values != nil {
if dir := strings.TrimSpace(c.Settings.Values.Settings.ReportsDir); dir != "" {
return dir
}
}
return "/var/lib/vctp/reports"
}
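// generateReport writes a report for the given snapshot table into the reports directory.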
func (c *CronTask) generateReport(ctx context.Context, tableName string) error {
dest := c.reportsDir()
_, err := report.SaveTableReport(c.Logger, c.Database, ctx, tableName, dest)
return err
}
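// snapshotFromVM builds a snapshot row from live vCenter VM properties, filling gaps from the matching inventory record and from the host, folder and resource-pool lookups when the VM object lacks the information.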
func snapshotFromVM(vmObject *mo.VirtualMachine, vc *vcenter.Vcenter, snapshotTime time.Time, inv *queries.Inventory, hostLookup map[string]vcenter.HostLookup, folderLookup vcenter.FolderLookup, rpLookup map[string]string) (inventorySnapshotRow, error) {
if vmObject == nil {
return inventorySnapshotRow{}, fmt.Errorf("missing VM object")
}
row := inventorySnapshotRow{
Name: vmObject.Name,
Vcenter: vc.Vurl,
VmId: sql.NullString{String: vmObject.Reference().Value, Valid: vmObject.Reference().Value != ""},
SnapshotTime: snapshotTime.Unix(),
}
if inv != nil {
row.InventoryId = sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0}
row.EventKey = inv.EventKey
row.CloudId = inv.CloudId
row.DeletionTime = inv.DeletionTime
}
if vmObject.Config != nil {
row.VmUuid = sql.NullString{String: vmObject.Config.Uuid, Valid: vmObject.Config.Uuid != ""}
if !vmObject.Config.CreateDate.IsZero() {
row.CreationTime = sql.NullInt64{Int64: vmObject.Config.CreateDate.Unix(), Valid: true}
}
row.VcpuCount = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.NumCPU), Valid: vmObject.Config.Hardware.NumCPU > 0}
row.RamGB = sql.NullInt64{Int64: int64(vmObject.Config.Hardware.MemoryMB) / 1024, Valid: vmObject.Config.Hardware.MemoryMB > 0}
totalDiskBytes := int64(0)
for _, device := range vmObject.Config.Hardware.Device {
if disk, ok := device.(*types.VirtualDisk); ok {
totalDiskBytes += disk.CapacityInBytes
}
}
if totalDiskBytes > 0 {
row.ProvisionedDisk = sql.NullFloat64{Float64: float64(totalDiskBytes / 1024 / 1024 / 1024), Valid: true}
}
if vmObject.Config.ManagedBy != nil && vmObject.Config.ManagedBy.ExtensionKey == "com.vmware.vcDr" && vmObject.Config.ManagedBy.Type == "placeholderVm" {
row.SrmPlaceholder = "TRUE"
} else {
row.SrmPlaceholder = "FALSE"
}
if vmObject.Config.Template {
row.IsTemplate = "TRUE"
} else {
row.IsTemplate = "FALSE"
}
}
if vmObject.Runtime.PowerState == "poweredOff" {
row.PoweredOn = "FALSE"
} else {
row.PoweredOn = "TRUE"
}
if inv != nil {
if inv.ResourcePool.Valid {
row.ResourcePool = sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: true}
}
row.Datacenter = inv.Datacenter
row.Cluster = inv.Cluster
row.Folder = inv.Folder
if !row.CreationTime.Valid {
row.CreationTime = inv.CreationTime
}
if !row.ProvisionedDisk.Valid {
row.ProvisionedDisk = inv.ProvisionedDisk
}
if !row.VcpuCount.Valid {
row.VcpuCount = inv.InitialVcpus
}
if !row.RamGB.Valid && inv.InitialRam.Valid {
row.RamGB = sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Int64 > 0}
}
if row.IsTemplate == "" {
row.IsTemplate = boolStringFromInterface(inv.IsTemplate)
}
if row.PoweredOn == "" {
row.PoweredOn = boolStringFromInterface(inv.PoweredOn)
}
if row.SrmPlaceholder == "" {
row.SrmPlaceholder = boolStringFromInterface(inv.SrmPlaceholder)
}
if !row.VmUuid.Valid {
row.VmUuid = inv.VmUuid
}
}
if row.ResourcePool.String == "" && vmObject.ResourcePool != nil {
if rpLookup != nil {
if rpName, ok := rpLookup[vmObject.ResourcePool.Value]; ok {
row.ResourcePool = sql.NullString{String: normalizeResourcePool(rpName), Valid: rpName != ""}
}
}
if !row.ResourcePool.Valid {
if rpName, err := vc.GetVmResourcePool(*vmObject); err == nil {
row.ResourcePool = sql.NullString{String: normalizeResourcePool(rpName), Valid: rpName != ""}
}
}
}
if row.Folder.String == "" {
if folderPath, ok := vc.GetVMFolderPathFromLookup(*vmObject, folderLookup); ok {
row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""}
} else if folderPath, err := vc.GetVMFolderPath(*vmObject); err == nil {
row.Folder = sql.NullString{String: folderPath, Valid: folderPath != ""}
}
}
if vmObject.Runtime.Host != nil && hostLookup != nil {
if lookup, ok := hostLookup[vmObject.Runtime.Host.Value]; ok {
if row.Cluster.String == "" && lookup.Cluster != "" {
row.Cluster = sql.NullString{String: lookup.Cluster, Valid: true}
}
if row.Datacenter.String == "" && lookup.Datacenter != "" {
row.Datacenter = sql.NullString{String: lookup.Datacenter, Valid: true}
}
}
}
if row.Cluster.String == "" {
if clusterName, err := vc.GetClusterFromHost(vmObject.Runtime.Host); err == nil {
row.Cluster = sql.NullString{String: clusterName, Valid: clusterName != ""}
}
}
if row.Datacenter.String == "" {
if dcName, err := vc.GetDatacenterForVM(*vmObject); err == nil {
row.Datacenter = sql.NullString{String: dcName, Valid: dcName != ""}
}
}
return row, nil
}
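// snapshotFromInventory builds a snapshot row purely from an inventory record, used for VMs that are no longer visible in vCenter.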
func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) inventorySnapshotRow {
return inventorySnapshotRow{
InventoryId: sql.NullInt64{Int64: inv.Iid, Valid: inv.Iid > 0},
Name: inv.Name,
Vcenter: inv.Vcenter,
VmId: inv.VmId,
EventKey: inv.EventKey,
CloudId: inv.CloudId,
CreationTime: inv.CreationTime,
DeletionTime: inv.DeletionTime,
ResourcePool: sql.NullString{String: normalizeResourcePool(inv.ResourcePool.String), Valid: inv.ResourcePool.Valid},
Datacenter: inv.Datacenter,
Cluster: inv.Cluster,
Folder: inv.Folder,
ProvisionedDisk: inv.ProvisionedDisk,
VcpuCount: inv.InitialVcpus,
RamGB: sql.NullInt64{Int64: inv.InitialRam.Int64 / 1024, Valid: inv.InitialRam.Valid && inv.InitialRam.Int64 > 0},
IsTemplate: boolStringFromInterface(inv.IsTemplate),
PoweredOn: boolStringFromInterface(inv.PoweredOn),
SrmPlaceholder: boolStringFromInterface(inv.SrmPlaceholder),
VmUuid: inv.VmUuid,
SnapshotTime: snapshotTime.Unix(),
}
}
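// insertHourlyBatch inserts all rows into the snapshot table within a single transaction, rebinding the statement placeholders for the active driver.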
func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []inventorySnapshotRow) error {
if len(rows) == 0 {
return nil
}
tx, err := dbConn.BeginTxx(ctx, nil)
if err != nil {
return err
}
stmt, err := tx.PreparexContext(ctx, sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), fmt.Sprintf(`
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime", "IsPresent"
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, tableName)))
if err != nil {
tx.Rollback()
return err
}
defer stmt.Close()
for _, row := range rows {
if _, err := stmt.ExecContext(ctx,
row.InventoryId,
row.Name,
row.Vcenter,
row.VmId,
row.EventKey,
row.CloudId,
row.CreationTime,
row.DeletionTime,
row.ResourcePool,
row.Datacenter,
row.Cluster,
row.Folder,
row.ProvisionedDisk,
row.VcpuCount,
row.RamGB,
row.IsTemplate,
row.PoweredOn,
row.SrmPlaceholder,
row.VmUuid,
row.SnapshotTime,
row.IsPresent,
); err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}
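// captureHourlySnapshotForVcenter logs in to one vCenter, snapshots its non-template VMs into tableName, marks inventory records whose VMs have disappeared, and records run status and metrics for the attempt.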
func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error {
started := time.Now()
c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url)
vc := vcenter.New(c.Logger, c.VcCreds)
if err := vc.Login(url); err != nil {
metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err)
_ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error())
return fmt.Errorf("unable to connect to vcenter: %w", err)
}
defer func() {
if err := vc.Logout(); err != nil {
c.Logger.Warn("vcenter logout failed", "url", url, "error", err)
}
}()
vcVms, err := vc.GetAllVMsWithProps()
if err != nil {
metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err)
_ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error())
return fmt.Errorf("unable to get VMs from vcenter: %w", err)
}
hostLookup, err := vc.BuildHostLookup()
if err != nil {
c.Logger.Warn("failed to build host lookup", "url", url, "error", err)
hostLookup = nil
} else {
c.Logger.Debug("built host lookup", "url", url, "hosts", len(hostLookup))
}
folderLookup, err := vc.BuildFolderPathLookup()
if err != nil {
c.Logger.Warn("failed to build folder lookup", "url", url, "error", err)
folderLookup = nil
} else {
c.Logger.Debug("built folder lookup", "url", url, "folders", len(folderLookup))
}
rpLookup, err := vc.BuildResourcePoolLookup()
if err != nil {
c.Logger.Warn("failed to build resource pool lookup", "url", url, "error", err)
rpLookup = nil
} else {
c.Logger.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup))
}
inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url)
if err != nil {
return fmt.Errorf("unable to query inventory table: %w", err)
}
inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows))
inventoryByUuid := make(map[string]queries.Inventory, len(inventoryRows))
inventoryByName := make(map[string]queries.Inventory, len(inventoryRows))
for _, inv := range inventoryRows {
if inv.VmId.Valid {
inventoryByVmID[inv.VmId.String] = inv
}
if inv.VmUuid.Valid {
inventoryByUuid[inv.VmUuid.String] = inv
}
if inv.Name != "" {
inventoryByName[inv.Name] = inv
}
}
dbConn := c.Database.DB()
presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms))
presentByUuid := make(map[string]struct{}, len(vcVms))
presentByName := make(map[string]struct{}, len(vcVms))
totals := snapshotTotals{}
deletionsMarked := false
for _, vm := range vcVms {
if strings.HasPrefix(vm.Name, "vCLS-") {
continue
}
if vm.Config != nil && vm.Config.Template {
continue
}
var inv *queries.Inventory
if existing, ok := inventoryByVmID[vm.Reference().Value]; ok {
existingCopy := existing
inv = &existingCopy
}
row, err := snapshotFromVM(&vm, vc, startTime, inv, hostLookup, folderLookup, rpLookup)
if err != nil {
c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err)
continue
}
row.IsPresent = "TRUE"
presentSnapshots[vm.Reference().Value] = row
if row.VmUuid.Valid {
presentByUuid[row.VmUuid.String] = struct{}{}
}
if row.Name != "" {
presentByName[row.Name] = struct{}{}
}
totals.VmCount++
totals.VcpuTotal += nullInt64ToInt(row.VcpuCount)
totals.RamTotal += nullInt64ToInt(row.RamGB)
totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk)
}
batch := make([]inventorySnapshotRow, 0, len(presentSnapshots)+len(inventoryRows))
for _, row := range presentSnapshots {
batch = append(batch, row)
}
missingCount := 0
for _, inv := range inventoryRows {
if strings.HasPrefix(inv.Name, "vCLS-") {
continue
}
vmID := inv.VmId.String
uuid := ""
if inv.VmUuid.Valid {
uuid = inv.VmUuid.String
}
name := inv.Name
found := false
if vmID != "" {
if _, ok := presentSnapshots[vmID]; ok {
found = true
}
}
if !found && uuid != "" {
if _, ok := presentByUuid[uuid]; ok {
found = true
}
}
if !found && name != "" {
if _, ok := presentByName[name]; ok {
found = true
}
}
if found {
continue
}
row := snapshotFromInventory(inv, startTime)
row.IsPresent = "FALSE"
if !row.DeletionTime.Valid {
deletionTime := startTime.Unix()
row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true}
if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
DeletionTime: row.DeletionTime,
VmId: inv.VmId,
DatacenterName: inv.Datacenter,
}); err != nil {
c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String)
}
c.Logger.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime)
deletionsMarked = true
}
batch = append(batch, row)
missingCount++
}
if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil {
metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err)
_ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error())
return err
}
// Compare with previous snapshot for this vcenter to mark deletions at snapshot time.
if prevTable, err := latestHourlySnapshotBefore(ctx, dbConn, startTime); err == nil && prevTable != "" {
moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTable, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName)
missingCount += moreMissing
} else if err != nil {
c.Logger.Warn("failed to locate previous hourly snapshot for deletion comparison", "error", err, "url", url)
}
c.Logger.Info("Hourly snapshot summary",
"vcenter", url,
"vm_count", totals.VmCount,
"vcpu_total", totals.VcpuTotal,
"ram_total_gb", totals.RamTotal,
"disk_total_gb", totals.DiskTotal,
"missing_marked", missingCount,
)
metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil)
_ = db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, true, "")
if deletionsMarked {
if err := c.generateReport(ctx, tableName); err != nil {
c.Logger.Warn("failed to regenerate hourly report after deletions", "error", err, "table", tableName)
} else {
c.Logger.Debug("Regenerated hourly report after deletions", "table", tableName)
}
}
return nil
}
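// boolStringFromInterface normalises a loosely typed value (bool, int, int64, string, []byte) to "TRUE"/"FALSE" or its string form; nil becomes the empty string.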
func boolStringFromInterface(value interface{}) string {
switch v := value.(type) {
case nil:
return ""
case string:
return v
case []byte:
return string(v)
case bool:
if v {
return "TRUE"
}
return "FALSE"
case int:
if v != 0 {
return "TRUE"
}
return "FALSE"
case int64:
if v != 0 {
return "TRUE"
}
return "FALSE"
default:
return fmt.Sprint(v)
}
}
// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time.
func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) {
driver := strings.ToLower(dbConn.DriverName())
var rows *sqlx.Rows
var err error
switch driver {
case "sqlite":
rows, err = dbConn.QueryxContext(ctx, `
SELECT name FROM sqlite_master
WHERE type = 'table' AND name LIKE 'inventory_hourly_%'
`)
case "pgx", "postgres":
rows, err = dbConn.QueryxContext(ctx, `
SELECT tablename FROM pg_catalog.pg_tables
WHERE schemaname = 'public' AND tablename LIKE 'inventory_hourly_%'
`)
default:
return "", fmt.Errorf("unsupported driver for snapshot lookup: %s", driver)
}
if err != nil {
return "", err
}
defer rows.Close()
var latest string
var latestTime int64
for rows.Next() {
var name string
if scanErr := rows.Scan(&name); scanErr != nil {
continue
}
if !strings.HasPrefix(name, "inventory_hourly_") {
continue
}
suffix := strings.TrimPrefix(name, "inventory_hourly_")
epoch, parseErr := strconv.ParseInt(suffix, 10, 64)
if parseErr != nil {
continue
}
if epoch < cutoff.Unix() && epoch > latestTime {
latestTime = epoch
latest = name
}
}
return latest, nil
}
// markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now.
func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time,
currentByID map[string]inventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{},
invByID map[string]queries.Inventory, invByUuid map[string]queries.Inventory, invByName map[string]queries.Inventory) int {
if err := db.ValidateTableName(prevTable); err != nil {
return 0
}
query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Datacenter","DeletionTime" FROM %s WHERE "Vcenter" = ?`, prevTable)
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
type prevRow struct {
VmId sql.NullString `db:"VmId"`
VmUuid sql.NullString `db:"VmUuid"`
Name string `db:"Name"`
Datacenter sql.NullString `db:"Datacenter"`
DeletionTime sql.NullInt64 `db:"DeletionTime"`
}
rows, err := dbConn.QueryxContext(ctx, query, vcenter)
if err != nil {
c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter)
return 0
}
defer rows.Close()
missing := 0
for rows.Next() {
var r prevRow
if err := rows.StructScan(&r); err != nil {
continue
}
vmID := r.VmId.String
uuid := r.VmUuid.String
name := r.Name
found := false
if vmID != "" {
if _, ok := currentByID[vmID]; ok {
found = true
}
}
if !found && uuid != "" {
if _, ok := currentByUuid[uuid]; ok {
found = true
}
}
if !found && name != "" {
if _, ok := currentByName[name]; ok {
found = true
}
}
if found {
continue
}
var inv queries.Inventory
var ok bool
if vmID != "" {
inv, ok = invByID[vmID]
}
if !ok && uuid != "" {
inv, ok = invByUuid[uuid]
}
if !ok && name != "" {
inv, ok = invByName[name]
}
if !ok {
continue
}
if inv.DeletionTime.Valid {
continue
}
delTime := sql.NullInt64{Int64: snapshotTime.Unix(), Valid: true}
if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
DeletionTime: delTime,
VmId: inv.VmId,
DatacenterName: inv.Datacenter,
}); err != nil {
c.Logger.Warn("failed to mark inventory record deleted from previous snapshot", "error", err, "vm_id", inv.VmId.String)
continue
}
c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable)
missing++
}
return missing
}