improve rollup reporting
Some checks failed
continuous-integration/drone/push Build was killed

This commit is contained in:
2026-01-14 10:03:04 +11:00
parent 7400e08c54
commit ca8b39ba0e
4 changed files with 365 additions and 144 deletions

View File

@@ -5,7 +5,10 @@ import (
"database/sql"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"vctp/db/queries"
"vctp/internal/report"
@@ -63,124 +66,33 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
// reload settings in case vcenter list has changed
c.Settings.ReadYMLSettings()
var wg sync.WaitGroup
var errCount int64
concurrencyLimit := c.Settings.Values.Settings.HourlySnapshotConcurrency
var sem chan struct{}
if concurrencyLimit > 0 {
sem = make(chan struct{}, concurrencyLimit)
}
for _, url := range c.Settings.Values.Settings.VcenterAddresses {
c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url)
vc := vcenter.New(c.Logger, c.VcCreds)
if err := vc.Login(url); err != nil {
c.Logger.Error("unable to connect to vcenter for hourly snapshot", "error", err, "url", url)
continue
url := url
if sem != nil {
sem <- struct{}{}
}
vcVms, err := vc.GetAllVmReferences()
if err != nil {
c.Logger.Error("unable to get VMs from vcenter", "error", err, "url", url)
vc.Logout()
continue
}
canDetectMissing := len(vcVms) > 0
if !canDetectMissing {
c.Logger.Warn("no VMs returned from vcenter; skipping missing VM detection", "url", url)
}
inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url)
if err != nil {
c.Logger.Error("unable to query inventory table", "error", err, "url", url)
vc.Logout()
continue
}
inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows))
for _, inv := range inventoryRows {
if inv.VmId.Valid {
inventoryByVmID[inv.VmId.String] = inv
wg.Add(1)
go func() {
defer wg.Done()
if sem != nil {
defer func() { <-sem }()
}
}
presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms))
totals := snapshotTotals{}
for _, vm := range vcVms {
if strings.HasPrefix(vm.Name(), "vCLS-") {
continue
if err := c.captureHourlySnapshotForVcenter(ctx, startTime, tableName, url); err != nil {
atomic.AddInt64(&errCount, 1)
c.Logger.Error("hourly snapshot failed", "error", err, "url", url)
}
vmObj, err := vc.ConvertObjToMoVM(vm)
if err != nil {
c.Logger.Error("failed to read VM details", "vm_id", vm.Reference().Value, "error", err)
continue
}
if vmObj.Config != nil && vmObj.Config.Template {
continue
}
var inv *queries.Inventory
if existing, ok := inventoryByVmID[vm.Reference().Value]; ok {
existingCopy := existing
inv = &existingCopy
}
row, err := snapshotFromVM(vmObj, vc, startTime, inv)
if err != nil {
c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err)
continue
}
row.IsPresent = "TRUE"
presentSnapshots[vm.Reference().Value] = row
totals.VmCount++
totals.VcpuTotal += nullInt64ToInt(row.VcpuCount)
totals.RamTotal += nullInt64ToInt(row.RamGB)
totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk)
}
for _, row := range presentSnapshots {
if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
c.Logger.Error("failed to insert hourly snapshot", "error", err, "vm_id", row.VmId.String)
}
}
if !canDetectMissing {
vc.Logout()
continue
}
for _, inv := range inventoryRows {
if strings.HasPrefix(inv.Name, "vCLS-") {
continue
}
vmID := inv.VmId.String
if vmID != "" {
if _, ok := presentSnapshots[vmID]; ok {
continue
}
}
row := snapshotFromInventory(inv, startTime)
row.IsPresent = "FALSE"
if !row.DeletionTime.Valid {
deletionTime := startTime.Unix()
row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true}
if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
DeletionTime: row.DeletionTime,
VmId: inv.VmId,
DatacenterName: inv.Datacenter,
}); err != nil {
c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String)
}
}
if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
c.Logger.Error("failed to insert missing VM snapshot", "error", err, "vm_id", row.VmId.String)
}
}
vc.Logout()
c.Logger.Info("Hourly snapshot summary",
"vcenter", url,
"vm_count", totals.VmCount,
"vcpu_total", totals.VcpuTotal,
"ram_total_gb", totals.RamTotal,
"disk_total_gb", totals.DiskTotal,
)
}()
}
wg.Wait()
if errCount > 0 {
return fmt.Errorf("hourly snapshot failed for %d vcenter(s)", errCount)
}
c.Logger.Debug("Finished hourly vcenter snapshot")
@@ -190,10 +102,8 @@ func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Lo
// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Logger) error {
targetTime := time.Now().Add(-time.Minute)
sourceTable, err := hourlyInventoryTableName(targetTime)
if err != nil {
return err
}
dayStart := time.Date(targetTime.Year(), targetTime.Month(), targetTime.Day(), 0, 0, 0, 0, targetTime.Location())
dayEnd := dayStart.AddDate(0, 0, 1)
summaryTable, err := dailySummaryTableName(targetTime)
if err != nil {
return err
@@ -206,13 +116,38 @@ func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Lo
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
return err
}
if rowsExist, err := tableHasRows(ctx, dbConn, summaryTable); err != nil {
return err
} else if rowsExist {
c.Logger.Debug("Daily summary already exists, skipping aggregation", "summary_table", summaryTable)
return nil
}
currentTotals, err := snapshotTotalsForTable(ctx, dbConn, sourceTable)
hourlySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", dayStart, dayEnd)
if err != nil {
c.Logger.Warn("unable to calculate daily totals", "error", err, "table", sourceTable)
return err
}
if len(hourlySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
}
hourlyTables := make([]string, 0, len(hourlySnapshots))
for _, snapshot := range hourlySnapshots {
hourlyTables = append(hourlyTables, snapshot.TableName)
}
unionQuery := buildUnionQuery(hourlyTables, []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`,
})
currentTotals, err := snapshotTotalsForUnion(ctx, dbConn, unionQuery)
if err != nil {
c.Logger.Warn("unable to calculate daily totals", "error", err, "date", dayStart.Format("2006-01-02"))
} else {
c.Logger.Info("Daily snapshot totals",
"table", sourceTable,
"date", dayStart.Format("2006-01-02"),
"vm_count", currentTotals.VmCount,
"vcpu_total", currentTotals.VcpuTotal,
"ram_total_gb", currentTotals.RamTotal,
@@ -220,15 +155,27 @@ func (c *CronTask) RunVcenterDailyAggregate(ctx context.Context, logger *slog.Lo
)
}
prevTable, _ := hourlyInventoryTableName(targetTime.AddDate(0, 0, -1))
if prevTable != "" && tableExists(ctx, dbConn, prevTable) {
prevTotals, err := snapshotTotalsForTable(ctx, dbConn, prevTable)
prevStart := dayStart.AddDate(0, 0, -1)
prevEnd := dayStart
prevSnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", prevStart, prevEnd)
if err == nil && len(prevSnapshots) > 0 {
prevTables := make([]string, 0, len(prevSnapshots))
for _, snapshot := range prevSnapshots {
prevTables = append(prevTables, snapshot.TableName)
}
prevUnion := buildUnionQuery(prevTables, []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`,
})
prevTotals, err := snapshotTotalsForUnion(ctx, dbConn, prevUnion)
if err != nil {
c.Logger.Warn("unable to calculate previous day totals", "error", err, "table", prevTable)
c.Logger.Warn("unable to calculate previous day totals", "error", err, "date", prevStart.Format("2006-01-02"))
} else {
c.Logger.Info("Daily snapshot comparison",
"current_table", sourceTable,
"previous_table", prevTable,
"current_date", dayStart.Format("2006-01-02"),
"previous_date", prevStart.Format("2006-01-02"),
"vm_delta", currentTotals.VmCount-prevTotals.VmCount,
"vcpu_delta", currentTotals.VcpuTotal-prevTotals.VcpuTotal,
"ram_delta_gb", currentTotals.RamTotal-prevTotals.RamTotal,
@@ -262,22 +209,24 @@ SELECT
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolSilverPct",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "PoolGoldPct"
FROM %s
FROM (
%s
) snapshots
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
`, summaryTable, sourceTable)
`, summaryTable, unionQuery)
if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
c.Logger.Error("failed to aggregate daily inventory", "error", err, "source_table", sourceTable)
c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
return err
}
if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, targetTime); err != nil {
if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart); err != nil {
c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
}
c.Logger.Debug("Finished daily inventory aggregation", "source_table", sourceTable, "summary_table", summaryTable)
c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
return nil
}
@@ -287,13 +236,18 @@ func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.
firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
targetMonth := firstOfThisMonth.AddDate(0, -1, 0)
monthPrefix := fmt.Sprintf("inventory_hourly_%s", targetMonth.Format("200601"))
dailyTables, err := report.ListTablesByPrefix(ctx, c.Database, monthPrefix)
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
return err
}
monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location())
monthEnd := monthStart.AddDate(0, 1, 0)
dailySnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", monthStart, monthEnd)
if err != nil {
return err
}
if len(dailyTables) == 0 {
return fmt.Errorf("no daily snapshot tables found for %s", targetMonth.Format("2006-01"))
if len(dailySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01"))
}
monthlyTable, err := monthlySummaryTableName(targetMonth)
@@ -305,10 +259,17 @@ func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.
if err := ensureMonthlySummaryTable(ctx, dbConn, monthlyTable); err != nil {
return err
}
if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
if rowsExist, err := tableHasRows(ctx, dbConn, monthlyTable); err != nil {
return err
} else if rowsExist {
c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable)
return nil
}
dailyTables := make([]string, 0, len(dailySnapshots))
for _, snapshot := range dailySnapshots {
dailyTables = append(dailyTables, snapshot.TableName)
}
unionQuery := buildUnionQuery(dailyTables, []string{
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"VmType"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
@@ -398,7 +359,7 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger)
if strings.HasPrefix(table, "inventory_daily_summary_") {
continue
}
tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "2006010215")
tableDate, ok := parseSnapshotDate(table, "inventory_hourly_", "epoch")
if !ok {
continue
}
@@ -446,7 +407,7 @@ func (c *CronTask) RunSnapshotCleanup(ctx context.Context, logger *slog.Logger)
}
func hourlyInventoryTableName(t time.Time) (string, error) {
return safeTableName(fmt.Sprintf("inventory_hourly_%s", t.Format("2006010215")))
return safeTableName(fmt.Sprintf("inventory_hourly_%d", t.Unix()))
}
func dailySummaryTableName(t time.Time) (string, error) {
@@ -617,6 +578,13 @@ func parseSnapshotDate(table string, prefix string, layout string) (time.Time, b
return time.Time{}, false
}
suffix := strings.TrimPrefix(table, prefix)
if layout == "epoch" {
epoch, err := strconv.ParseInt(suffix, 10, 64)
if err != nil {
return time.Time{}, false
}
return time.Unix(epoch, 0), true
}
parsed, err := time.Parse(layout, suffix)
if err != nil {
return time.Time{}, false
@@ -636,6 +604,21 @@ func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error
return err
}
func tableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) {
if _, err := safeTableName(table); err != nil {
return false, err
}
query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
var exists int
if err := dbConn.GetContext(ctx, &exists, query); err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
return true, nil
}
type snapshotTotals struct {
VmCount int64
VcpuTotal int64
@@ -938,6 +921,128 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
return err
}
func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error {
c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url)
vc := vcenter.New(c.Logger, c.VcCreds)
if err := vc.Login(url); err != nil {
return fmt.Errorf("unable to connect to vcenter: %w", err)
}
defer vc.Logout()
vcVms, err := vc.GetAllVmReferences()
if err != nil {
return fmt.Errorf("unable to get VMs from vcenter: %w", err)
}
canDetectMissing := len(vcVms) > 0
if !canDetectMissing {
c.Logger.Warn("no VMs returned from vcenter; skipping missing VM detection", "url", url)
}
inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url)
if err != nil {
return fmt.Errorf("unable to query inventory table: %w", err)
}
inventoryByVmID := make(map[string]queries.Inventory, len(inventoryRows))
for _, inv := range inventoryRows {
if inv.VmId.Valid {
inventoryByVmID[inv.VmId.String] = inv
}
}
dbConn := c.Database.DB()
presentSnapshots := make(map[string]inventorySnapshotRow, len(vcVms))
totals := snapshotTotals{}
for _, vm := range vcVms {
if strings.HasPrefix(vm.Name(), "vCLS-") {
continue
}
vmObj, err := vc.ConvertObjToMoVM(vm)
if err != nil {
c.Logger.Error("failed to read VM details", "vm_id", vm.Reference().Value, "error", err)
continue
}
if vmObj.Config != nil && vmObj.Config.Template {
continue
}
var inv *queries.Inventory
if existing, ok := inventoryByVmID[vm.Reference().Value]; ok {
existingCopy := existing
inv = &existingCopy
}
row, err := snapshotFromVM(vmObj, vc, startTime, inv)
if err != nil {
c.Logger.Error("unable to build snapshot for VM", "vm_id", vm.Reference().Value, "error", err)
continue
}
row.IsPresent = "TRUE"
presentSnapshots[vm.Reference().Value] = row
totals.VmCount++
totals.VcpuTotal += nullInt64ToInt(row.VcpuCount)
totals.RamTotal += nullInt64ToInt(row.RamGB)
totals.DiskTotal += nullFloat64ToFloat(row.ProvisionedDisk)
}
for _, row := range presentSnapshots {
if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
c.Logger.Error("failed to insert hourly snapshot", "error", err, "vm_id", row.VmId.String)
}
}
if !canDetectMissing {
c.Logger.Info("Hourly snapshot summary",
"vcenter", url,
"vm_count", totals.VmCount,
"vcpu_total", totals.VcpuTotal,
"ram_total_gb", totals.RamTotal,
"disk_total_gb", totals.DiskTotal,
)
return nil
}
for _, inv := range inventoryRows {
if strings.HasPrefix(inv.Name, "vCLS-") {
continue
}
vmID := inv.VmId.String
if vmID != "" {
if _, ok := presentSnapshots[vmID]; ok {
continue
}
}
row := snapshotFromInventory(inv, startTime)
row.IsPresent = "FALSE"
if !row.DeletionTime.Valid {
deletionTime := startTime.Unix()
row.DeletionTime = sql.NullInt64{Int64: deletionTime, Valid: true}
if err := c.Database.Queries().InventoryMarkDeleted(ctx, queries.InventoryMarkDeletedParams{
DeletionTime: row.DeletionTime,
VmId: inv.VmId,
DatacenterName: inv.Datacenter,
}); err != nil {
c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String)
}
}
if err := insertDailyInventoryRow(ctx, dbConn, tableName, row); err != nil {
c.Logger.Error("failed to insert missing VM snapshot", "error", err, "vm_id", row.VmId.String)
}
}
c.Logger.Info("Hourly snapshot summary",
"vcenter", url,
"vm_count", totals.VmCount,
"vcpu_total", totals.VcpuTotal,
"ram_total_gb", totals.RamTotal,
"disk_total_gb", totals.DiskTotal,
)
return nil
}
func boolStringFromInterface(value interface{}) string {
switch v := value.(type) {
case nil: