From ca8b39ba0e127315b4d6c46375211cd91780413c Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Wed, 14 Jan 2026 10:03:04 +1100 Subject: [PATCH] improve rollup reporting --- internal/report/snapshots.go | 124 ++++++++- internal/settings/settings.go | 1 + internal/tasks/inventorySnapshots.go | 383 +++++++++++++++++---------- src/vctp.yml | 1 + 4 files changed, 365 insertions(+), 144 deletions(-) diff --git a/internal/report/snapshots.go b/internal/report/snapshots.go index a1dbaf0..ddbd13c 100644 --- a/internal/report/snapshots.go +++ b/internal/report/snapshots.go @@ -6,6 +6,7 @@ import ( "database/sql" "fmt" "log/slog" + "strconv" "strings" "time" "vctp/db" @@ -187,6 +188,63 @@ ORDER BY snapshot_time DESC, table_name DESC return records, rows.Err() } +func ListSnapshotsByRange(ctx context.Context, database db.Database, snapshotType string, start time.Time, end time.Time) ([]SnapshotRecord, error) { + dbConn := database.DB() + driver := strings.ToLower(dbConn.DriverName()) + + startUnix := start.Unix() + endUnix := end.Unix() + + var rows *sqlx.Rows + var err error + + switch driver { + case "sqlite": + rows, err = dbConn.QueryxContext(ctx, ` +SELECT table_name, snapshot_time, snapshot_type +FROM snapshot_registry +WHERE snapshot_type = ? + AND snapshot_time >= ? + AND snapshot_time < ? +ORDER BY snapshot_time ASC, table_name ASC +`, snapshotType, startUnix, endUnix) + case "pgx", "postgres": + rows, err = dbConn.QueryxContext(ctx, ` +SELECT table_name, snapshot_time, snapshot_type +FROM snapshot_registry +WHERE snapshot_type = $1 + AND snapshot_time >= $2 + AND snapshot_time < $3 +ORDER BY snapshot_time ASC, table_name ASC +`, snapshotType, startUnix, endUnix) + default: + return nil, fmt.Errorf("unsupported driver for listing snapshots: %s", driver) + } + + if err != nil { + return nil, err + } + defer rows.Close() + + records := make([]SnapshotRecord, 0) + for rows.Next() { + var ( + tableName string + snapshotTime int64 + recordType string + ) + if err := rows.Scan(&tableName, &snapshotTime, &recordType); err != nil { + return nil, err + } + records = append(records, SnapshotRecord{ + TableName: tableName, + SnapshotTime: time.Unix(snapshotTime, 0), + SnapshotType: recordType, + }) + } + return records, rows.Err() +} + func FormatSnapshotLabel(snapshotType string, snapshotTime time.Time, tableName string) string { switch snapshotType { case "hourly": @@ -214,6 +272,23 @@ func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Co return nil, fmt.Errorf("no columns found for table %s", tableName) } + humanizeTimes := strings.HasPrefix(tableName, "inventory_daily_summary_") || strings.HasPrefix(tableName, "inventory_monthly_summary_") + type columnSpec struct { + Name string + SourceIndex int + Humanize bool + } + specs := make([]columnSpec, 0, len(columns)+2) + for i, columnName := range columns { + specs = append(specs, columnSpec{Name: columnName, SourceIndex: i}) + if humanizeTimes && columnName == "CreationTime" { + specs = append(specs, columnSpec{Name: "CreationTimeReadable", SourceIndex: i, Humanize: true}) + } + if humanizeTimes && columnName == "DeletionTime" { + specs = append(specs, columnSpec{Name: "DeletionTimeReadable", SourceIndex: i, Humanize: true}) + } + } + query := fmt.Sprintf(`SELECT * FROM %s`, tableName) orderBy := snapshotOrderBy(columns) if orderBy != "" { @@ -240,12 +315,12 @@ func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Co logger.Error("Error setting document properties", "error", err, "sheet_name", sheetName) } - for i, columnName := range columns { + for i, spec := range specs { cell := fmt.Sprintf("%s1", string(rune('A'+i))) - xlsx.SetCellValue(sheetName, cell, columnName) + xlsx.SetCellValue(sheetName, cell, spec.Name) } - if endCell, err := excelize.CoordinatesToCellName(len(columns), 1); err == nil { + if endCell, err := excelize.CoordinatesToCellName(len(specs), 1); err == nil { filterRange := "A1:" + endCell if err := xlsx.AutoFilter(sheetName, filterRange, nil); err != nil { logger.Error("Error setting autofilter", "error", err) @@ -269,9 +344,14 @@ func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Co if err != nil { return nil, err } - for colIndex, value := range values { + for colIndex, spec := range specs { cell := fmt.Sprintf("%s%d", string(rune('A'+colIndex)), rowIndex) - xlsx.SetCellValue(sheetName, cell, normalizeCellValue(value)) + value := values[spec.SourceIndex] + if spec.Humanize { + xlsx.SetCellValue(sheetName, cell, formatEpochHuman(value)) + } else { + xlsx.SetCellValue(sheetName, cell, normalizeCellValue(value)) + } } rowIndex++ } @@ -410,3 +490,37 @@ func normalizeCellValue(value interface{}) interface{} { return v } } + +func formatEpochHuman(value interface{}) string { + var epoch int64 + switch v := value.(type) { + case nil: + return "" + case int64: + epoch = v + case int32: + epoch = int64(v) + case int: + epoch = int64(v) + case float64: + epoch = int64(v) + case []byte: + parsed, err := strconv.ParseInt(string(v), 10, 64) + if err != nil { + return "" + } + epoch = parsed + case string: + parsed, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return "" + } + epoch = parsed + default: + return "" + } + if epoch <= 0 { + return "" + } + return time.Unix(epoch, 0).Local().Format("Mon 02 Jan 2006 15:04:05 MST") +} diff --git a/internal/settings/settings.go b/internal/settings/settings.go index e930918..b4ef49f 100644 --- a/internal/settings/settings.go +++ b/internal/settings/settings.go @@ -36,6 +36,7 @@ type SettingsYML struct { VcenterInventoryPollingSeconds int `yaml:"vcenter_inventory_polling_seconds"` VcenterInventorySnapshotSeconds int `yaml:"vcenter_inventory_snapshot_seconds"` VcenterInventoryAggregateSeconds int `yaml:"vcenter_inventory_aggregate_seconds"` + HourlySnapshotConcurrency int `yaml:"hourly_snapshot_concurrency"` HourlySnapshotMaxAgeDays int `yaml:"hourly_snapshot_max_age_days"` DailySnapshotMaxAgeMonths int `yaml:"daily_snapshot_max_age_months"` SnapshotCleanupCron string `yaml:"snapshot_cleanup_cron"` diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index 6d92bb6..39fc622 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -5,7 +5,10 @@ import ( "database/sql" "fmt" "log/slog" + "strconv" "strings" + "sync" + "sync/atomic" "time" "vctp/db/queries" "vctp/internal/report" @@ -63,124 +66,33 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo // reload settings in case vcenter list has changed c.Settings.ReadYMLSettings() + var wg sync.WaitGroup + var errCount int64 + concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency + var sem chan struct{} + if concurrencyLimit > 0 { + sem = make(chan struct{}, concurrencyLimit) + } for _, url := range c.Settings.Values.Settings.VcenterAddresses { - c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url) - vc := vcenter.New(c.Logger, c.VcCreds) - if err := vc.Login(url); err != nil { - c.Logger.Error("unable to connect to vcenter for hourly snapshot", "error", err, "url", url) - continue + url := url + if sem != nil { + sem <- struct{}{} } - - vcVms, err := vc.GetAllVmReferences() - if err != nil { - c.Logger.Error("unable to get VMs from vcenter", "error", err, "url", url) - vc.Logout() - continue - } - canDetectMissing := len(vcVms) > 0 - if !canDetectMissing { - c.Logger.Warn("no VMs returned from vcenter; skipping missing VM detection", "url", url) - } - - inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url) - if err != nil { - c.Logger.Error("unable to query inventory table", "error", err, "url", url) - vc.Logout() - continue - } - - inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows)) - for _, inv := range inventoryRows { - if inv.VmId.Valid { - inventoryByVmID[inv.VmId.String] = inv + wg.Add(1) + go func() { + defer wg.Done() + if sem != nil { + defer func() { <-sem }() } - } - - presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms)) - totals := snapshotTotals{} - for _, vm := range vcVms { - if strings.HasPrefix(vm.Name(), "vCLS-") { - continue + if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, url); err != nil { + atomic.AddInt64(&errCount, 1) + c.Logger.Error("hourly snapshot failed", "error", err, "url", url) } - - vmObj, err := vc.ConvertObjToMoVM(vm) - if err != nil { - c.Logger.Error("failed to read VM details", "vm_id", vm.Reference().Value, "error", err) - continue - } - if vmObj.Config != nil && vmObj.Config.Template { - continue - } - - var inv *queries.Inventory - if existing, ok := inventoryByVmID[vm.Reference().Value]; ok { - existingCopy := existing - inv = &existingCopy - } - - row, err := snapshotFromVM(vmObj, vc, startTime, inv) - if err != nil { - c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err) - continue - } - row.IsPresent = "TRUE" - presentSnapshots[vm.Reference().Value] = row - - totals.VmCount++ - totals.VcpuTotal += nullInt64ToInt(row.VcpuCount) - totals.RamTotal += nullInt64ToInt(row.RamGB) - totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk) - } - - for _, row := range presentSnapshots { - if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil { - c.Logger.Error("failed to insert hourly snapshot", "error", err, "vm_id", row.VmId.String) - } - } - - if !canDetectMissing { - vc.Logout() - continue - } - - for _, inv := range inventoryRows { - if strings.HasPrefix(inv.Name, "vCLS-") { - continue - } - vmID := inv.VmId.String - if vmID != "" { - if _, ok := presentSnapshots[vmID]; ok { - continue - } - } - - row := snapshotFromInventory(inv, startTime) - row.IsPresent = "FALSE" - if !row.DeletionTime.Valid { - deletionTime := startTime.Unix() - row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true} - if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ - DeletionTime: row.DeletionTime, - VmId: inv.VmId, - DatacenterName: inv.Datacenter, - }); err != nil { - c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String) - } - } - if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil { - c.Logger.Error("failed to insert missing VM snapshot", "error", err, "vm_id", row.VmId.String) - } - } - - vc.Logout() - - c.Logger.Info("Hourly snapshot summary", - "vcenter", url, - "vm_count", totals.VmCount, - "vcpu_total", totals.VcpuTotal, - "ram_total_gb", totals.RamTotal, - "disk_total_gb", totals.DiskTotal, - ) + }() + } + wg.Wait() + if errCount > 0 { + return fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount) } c.Logger.Debug("Finished hourly vcenter snapshot") @@ -190,10 +102,8 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo // RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table. func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) error { targetTime := time.Now().Add(-time.Minute) - sourceTable, err := hourlyInventoryTableName(targetTime) - if err != nil { - return err - } + dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location()) + dayEnd := dayStart.AddDate(0, 0, 1) summaryTable, err := dailySummaryTableName(targetTime) if err != nil { return err @@ -206,13 +116,38 @@ func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Lo if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { return err } + if rowsExist, err := tableHasRows(ctx, dbConn, summaryTable); err != nil { + return err + } else if rowsExist { + c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable) + return nil + } - currentTotals, err := snapshotTotalsForTable(ctx, dbConn, sourceTable) + hourlySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", dayStart, dayEnd) if err != nil { - c.Logger.Warn("unable to calculate daily totals", "error", err, "table", sourceTable) + return err + } + if len(hourlySnapshots) == 0 { + return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02")) + } + + hourlyTables := make([]string, 0, len(hourlySnapshots)) + for _, snapshot := range hourlySnapshots { + hourlyTables = append(hourlyTables, snapshot.TableName) + } + unionQuery := buildUnionQuery(hourlyTables, []string{ + `"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`, + `"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`, + `"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`, + `"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`, + }) + + currentTotals, err := snapshotTotalsForUnion(ctx, dbConn, unionQuery) + if err != nil { + c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02")) } else { c.Logger.Info("Daily snapshot totals", - "table", sourceTable, + "date", dayStart.Format("2006-01-02"), "vm_count", currentTotals.VmCount, "vcpu_total", currentTotals.VcpuTotal, "ram_total_gb", currentTotals.RamTotal, @@ -220,15 +155,27 @@ func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Lo ) } - prevTable, _ := hourlyInventoryTableName(targetTime.AddDate(0, 0, -1)) - if prevTable != "" && tableExists(ctx, dbConn, prevTable) { - prevTotals, err := snapshotTotalsForTable(ctx, dbConn, prevTable) + prevStart := dayStart.AddDate(0, 0, -1) + prevEnd := dayStart + prevSnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", prevStart, prevEnd) + if err == nil && len(prevSnapshots) > 0 { + prevTables := make([]string, 0, len(prevSnapshots)) + for _, snapshot := range prevSnapshots { + prevTables = append(prevTables, snapshot.TableName) + } + prevUnion := buildUnionQuery(prevTables, []string{ + `"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`, + `"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`, + `"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`, + `"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`, + }) + prevTotals, err := snapshotTotalsForUnion(ctx, dbConn, prevUnion) if err != nil { - c.Logger.Warn("unable to calculate previous day totals", "error", err, "table", prevTable) + c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02")) } else { c.Logger.Info("Daily snapshot comparison", - "current_table", sourceTable, - "previous_table", prevTable, + "current_date", dayStart.Format("2006-01-02"), + "previous_date", prevStart.Format("2006-01-02"), "vm_delta", currentTotals.VmCount-prevTotals.VmCount, "vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal, "ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal, @@ -262,22 +209,24 @@ SELECT / NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolSilverPct", 100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END) / NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolGoldPct" -FROM %s +FROM ( +%s +) snapshots GROUP BY "InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid"; -`, summaryTable, sourceTable) +`, summaryTable, unionQuery) if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil { - c.Logger.Error("failed to aggregate daily inventory", "error", err, "source_table", sourceTable) + c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02")) return err } - if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, targetTime); err != nil { + if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart); err != nil { c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable) } - c.Logger.Debug("Finished daily inventory aggregation", "source_table", sourceTable, "summary_table", summaryTable) + c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable) return nil } @@ -287,13 +236,18 @@ func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog. firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location()) targetMonth := firstOfThisMonth.AddDate(0, -1, 0) - monthPrefix := fmt.Sprintf("inventory_hourly_%s", targetMonth.Format("200601")) - dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, monthPrefix) + if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { + return err + } + + monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location()) + monthEnd := monthStart.AddDate(0, 1, 0) + dailySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", monthStart, monthEnd) if err != nil { return err } - if len(dailyTables) == 0 { - return fmt.Errorf("no daily snapshot tables found for %s", targetMonth.Format("2006-01")) + if len(dailySnapshots) == 0 { + return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01")) } monthlyTable, err := monthlySummaryTableName(targetMonth) @@ -305,10 +259,17 @@ func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog. if err := ensureMonthlySummaryTable(ctx, dbConn, monthlyTable); err != nil { return err } - if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil { + if rowsExist, err := tableHasRows(ctx, dbConn, monthlyTable); err != nil { return err + } else if rowsExist { + c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable) + return nil } + dailyTables := make([]string, 0, len(dailySnapshots)) + for _, snapshot := range dailySnapshots { + dailyTables = append(dailyTables, snapshot.TableName) + } unionQuery := buildUnionQuery(dailyTables, []string{ `"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`, `"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`, @@ -398,7 +359,7 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) if strings.HasPrefix(table, "inventory_daily_summary_") { continue } - tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "2006010215") + tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "epoch") if !ok { continue } @@ -446,7 +407,7 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger) } func hourlyInventoryTableName(t time.Time) (string, error) { - return safeTableName(fmt.Sprintf("inventory_hourly_%s", t.Format("2006010215"))) + return safeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix())) } func dailySummaryTableName(t time.Time) (string, error) { @@ -617,6 +578,13 @@ func parseSnapshotDate(table string, prefix string, layout string) (time.Time, b return time.Time{}, false } suffix := strings.TrimPrefix(table, prefix) + if layout == "epoch" { + epoch, err := strconv.ParseInt(suffix, 10, 64) + if err != nil { + return time.Time{}, false + } + return time.Unix(epoch, 0), true + } parsed, err := time.Parse(layout, suffix) if err != nil { return time.Time{}, false @@ -636,6 +604,21 @@ func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error return err } +func tableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) { + if _, err := safeTableName(table); err != nil { + return false, err + } + query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table) + var exists int + if err := dbConn.GetContext(ctx, &exists, query); err != nil { + if err == sql.ErrNoRows { + return false, nil + } + return false, err + } + return true, nil +} + type snapshotTotals struct { VmCount int64 VcpuTotal int64 @@ -938,6 +921,128 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); return err } +func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error { + c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url) + vc := vcenter.New(c.Logger, c.VcCreds) + if err := vc.Login(url); err != nil { + return fmt.Errorf("unable to connect to vcenter: %w", err) + } + defer vc.Logout() + + vcVms, err := vc.GetAllVmReferences() + if err != nil { + return fmt.Errorf("unable to get VMs from vcenter: %w", err) + } + canDetectMissing := len(vcVms) > 0 + if !canDetectMissing { + c.Logger.Warn("no VMs returned from vcenter; skipping missing VM detection", "url", url) + } + + inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url) + if err != nil { + return fmt.Errorf("unable to query inventory table: %w", err) + } + + inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows)) + for _, inv := range inventoryRows { + if inv.VmId.Valid { + inventoryByVmID[inv.VmId.String] = inv + } + } + + dbConn := c.Database.DB() + presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms)) + totals := snapshotTotals{} + for _, vm := range vcVms { + if strings.HasPrefix(vm.Name(), "vCLS-") { + continue + } + + vmObj, err := vc.ConvertObjToMoVM(vm) + if err != nil { + c.Logger.Error("failed to read VM details", "vm_id", vm.Reference().Value, "error", err) + continue + } + if vmObj.Config != nil && vmObj.Config.Template { + continue + } + + var inv *queries.Inventory + if existing, ok := inventoryByVmID[vm.Reference().Value]; ok { + existingCopy := existing + inv = &existingCopy + } + + row, err := snapshotFromVM(vmObj, vc, startTime, inv) + if err != nil { + c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err) + continue + } + row.IsPresent = "TRUE" + presentSnapshots[vm.Reference().Value] = row + + totals.VmCount++ + totals.VcpuTotal += nullInt64ToInt(row.VcpuCount) + totals.RamTotal += nullInt64ToInt(row.RamGB) + totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk) + } + + for _, row := range presentSnapshots { + if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil { + c.Logger.Error("failed to insert hourly snapshot", "error", err, "vm_id", row.VmId.String) + } + } + + if !canDetectMissing { + c.Logger.Info("Hourly snapshot summary", + "vcenter", url, + "vm_count", totals.VmCount, + "vcpu_total", totals.VcpuTotal, + "ram_total_gb", totals.RamTotal, + "disk_total_gb", totals.DiskTotal, + ) + return nil + } + + for _, inv := range inventoryRows { + if strings.HasPrefix(inv.Name, "vCLS-") { + continue + } + vmID := inv.VmId.String + if vmID != "" { + if _, ok := presentSnapshots[vmID]; ok { + continue + } + } + + row := snapshotFromInventory(inv, startTime) + row.IsPresent = "FALSE" + if !row.DeletionTime.Valid { + deletionTime := startTime.Unix() + row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true} + if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{ + DeletionTime: row.DeletionTime, + VmId: inv.VmId, + DatacenterName: inv.Datacenter, + }); err != nil { + c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String) + } + } + if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil { + c.Logger.Error("failed to insert missing VM snapshot", "error", err, "vm_id", row.VmId.String) + } + } + + c.Logger.Info("Hourly snapshot summary", + "vcenter", url, + "vm_count", totals.VmCount, + "vcpu_total", totals.VcpuTotal, + "ram_total_gb", totals.RamTotal, + "disk_total_gb", totals.DiskTotal, + ) + return nil +} + func boolStringFromInterface(value interface{}) string { switch v := value.(type) { case nil: diff --git a/src/vctp.yml b/src/vctp.yml index 64d3f20..0d47021 100644 --- a/src/vctp.yml +++ b/src/vctp.yml @@ -15,6 +15,7 @@ settings: vcenter_inventory_polling_seconds: 7200 vcenter_inventory_snapshot_seconds: 3600 vcenter_inventory_aggregate_seconds: 86400 + hourly_snapshot_concurrency: 0 hourly_snapshot_max_age_days: 60 daily_snapshot_max_age_months: 12 snapshot_cleanup_cron: "30 2 * * *"