From 2483091861cb9ef306ede41cd5ad79642fc4a48b Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Wed, 21 Jan 2026 10:25:04 +1100 Subject: [PATCH] improve logging and concurrent vcenter inventory --- db/helpers.go | 28 ++++++++++- internal/tasks/dailyAggregate.go | 6 +++ internal/tasks/inventoryHelpers.go | 56 +++++++++++++++------ internal/tasks/inventoryLifecycle.go | 7 +-- internal/tasks/inventorySnapshots.go | 73 +++++++++++++++------------- 5 files changed, 115 insertions(+), 55 deletions(-) diff --git a/db/helpers.go b/db/helpers.go index 8a2f65a..db73df7 100644 --- a/db/helpers.go +++ b/db/helpers.go @@ -75,7 +75,12 @@ func getLog(ctx context.Context, dbConn *sqlx.DB, dest interface{}, query string slog.Debug("db get returned no rows", "query", strings.TrimSpace(query)) return err } - slog.Warn("db get failed", "query", strings.TrimSpace(query), "error", err) + // Soften logging for timeout/cancel scenarios commonly hit during best-effort probes. + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + slog.Debug("db get timed out", "query", strings.TrimSpace(query), "error", err) + } else { + slog.Warn("db get failed", "query", strings.TrimSpace(query), "error", err) + } } return err } @@ -83,7 +88,11 @@ func getLog(ctx context.Context, dbConn *sqlx.DB, dest interface{}, query string func selectLog(ctx context.Context, dbConn *sqlx.DB, dest interface{}, query string, args ...interface{}) error { err := dbConn.SelectContext(ctx, dest, query, args...) if err != nil { - slog.Warn("db select failed", "query", strings.TrimSpace(query), "error", err) + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + slog.Debug("db select timed out", "query", strings.TrimSpace(query), "error", err) + } else { + slog.Warn("db select failed", "query", strings.TrimSpace(query), "error", err) + } } return err } @@ -395,6 +404,7 @@ func ApplySQLiteTuning(ctx context.Context, dbConn *sqlx.DB) { `PRAGMA synchronous=NORMAL;`, `PRAGMA temp_store=MEMORY;`, `PRAGMA optimize;`, + `PRAGMA busy_timeout=5000;`, } for _, pragma := range pragmas { _, err = execLog(ctx, dbConn, pragma) @@ -404,6 +414,20 @@ func ApplySQLiteTuning(ctx context.Context, dbConn *sqlx.DB) { } } +// CheckpointSQLite forces a WAL checkpoint (truncate) when using SQLite. No-op for other drivers. +func CheckpointSQLite(ctx context.Context, dbConn *sqlx.DB) error { + if strings.ToLower(dbConn.DriverName()) != "sqlite" { + return nil + } + if ctx == nil { + ctx = context.Background() + } + cctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + _, err := dbConn.ExecContext(cctx, `PRAGMA wal_checkpoint(TRUNCATE);`) + return err +} + // EnsureVmHourlyStats creates the shared per-snapshot cache table used by Go aggregations. func EnsureVmHourlyStats(ctx context.Context, dbConn *sqlx.DB) error { ddl := ` diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index f0ed3ee..2658ac4 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -169,6 +169,9 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti metrics.RecordDailyAggregation(time.Since(jobStart), err) return err } + if err := db.CheckpointSQLite(ctx, dbConn); err != nil { + c.Logger.Warn("failed to checkpoint sqlite after daily aggregation", "error", err) + } c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable) metrics.RecordDailyAggregation(time.Since(jobStart), nil) @@ -370,6 +373,9 @@ LIMIT 1 c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable) return err } + if err := db.CheckpointSQLite(ctx, dbConn); err != nil { + c.Logger.Warn("failed to checkpoint sqlite after daily aggregation (Go path)", "error", err) + } c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, diff --git a/internal/tasks/inventoryHelpers.go b/internal/tasks/inventoryHelpers.go index e40cd62..7e5b7af 100644 --- a/internal/tasks/inventoryHelpers.go +++ b/internal/tasks/inventoryHelpers.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "log/slog" "strconv" "strings" "time" @@ -44,7 +45,7 @@ func boolStringFromInterface(value interface{}) string { // latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time, skipping empty tables. func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) { - tables, err := listLatestHourlyWithRows(ctx, dbConn, "", cutoff.Unix(), 1) + tables, err := listLatestHourlyWithRows(ctx, dbConn, "", cutoff.Unix(), 1, nil) if err != nil { return "", err } @@ -68,7 +69,7 @@ func parseSnapshotTime(table string) (int64, bool) { } // listLatestHourlyWithRows returns recent hourly snapshot tables (ordered desc by time) that have rows, optionally filtered by vcenter. -func listLatestHourlyWithRows(ctx context.Context, dbConn *sqlx.DB, vcenter string, beforeUnix int64, limit int) ([]snapshotTable, error) { +func listLatestHourlyWithRows(ctx context.Context, dbConn *sqlx.DB, vcenter string, beforeUnix int64, limit int, logger *slog.Logger) ([]snapshotTable, error) { if limit <= 0 { limit = 50 } @@ -88,31 +89,54 @@ LIMIT ? for rows.Next() { var name string var ts int64 - var count int64 + var count sql.NullInt64 if scanErr := rows.Scan(&name, &ts, &count); scanErr != nil { continue } if err := db.ValidateTableName(name); err != nil { continue } - // Use snapshot_count first; fall back to row check (and vcenter filter) only when needed. - if count == 0 { - if has, _ := db.TableHasRows(ctx, dbConn, name); !has { - continue + if count.Valid && count.Int64 == 0 { + if logger != nil { + logger.Debug("skipping snapshot table with zero count", "table", name, "snapshot_time", ts) + } + continue + } + probed := false + hasRows := true + start := time.Now() + if !count.Valid { + probed = true + if ok, err := db.TableHasRows(ctx, dbConn, name); err == nil { + hasRows = ok + } else { + hasRows = false + if logger != nil { + logger.Debug("snapshot table probe failed", "table", name, "error", err) + } } } - if vcenter != "" { + if vcenter != "" && hasRows { + probed = true vrows, qerr := querySnapshotRows(ctx, dbConn, name, []string{"VmId"}, `"Vcenter" = ? LIMIT 1`, vcenter) - if qerr != nil { - continue - } - hasVcenter := vrows.Next() - vrows.Close() - if !hasVcenter { - continue + if qerr == nil { + hasRows = vrows.Next() + vrows.Close() + } else { + hasRows = false + if logger != nil { + logger.Debug("snapshot vcenter filter probe failed", "table", name, "vcenter", vcenter, "error", qerr) + } } } - out = append(out, snapshotTable{Table: name, Time: ts}) + elapsed := time.Since(start) + if logger != nil { + logger.Debug("evaluated snapshot table", "table", name, "snapshot_time", ts, "snapshot_count", count, "probed", probed, "has_rows", hasRows, "elapsed", elapsed) + } + if !hasRows { + continue + } + out = append(out, snapshotTable{Table: name, Time: ts, Count: count}) } return out, nil } diff --git a/internal/tasks/inventoryLifecycle.go b/internal/tasks/inventoryLifecycle.go index 270ee01..0c5e4fe 100644 --- a/internal/tasks/inventoryLifecycle.go +++ b/internal/tasks/inventoryLifecycle.go @@ -111,13 +111,14 @@ WHERE "Vcenter" = ? AND ("DeletedAt" IS NULL OR "DeletedAt" = 0) } type snapshotTable struct { - Table string `db:"table_name"` - Time int64 `db:"snapshot_time"` + Table string `db:"table_name"` + Time int64 `db:"snapshot_time"` + Count sql.NullInt64 `db:"snapshot_count"` } func listHourlyTablesForDay(ctx context.Context, dbConn *sqlx.DB, dayStart, dayEnd time.Time) ([]snapshotTable, error) { rows, err := dbConn.QueryxContext(ctx, ` -SELECT table_name, snapshot_time +SELECT table_name, snapshot_time, snapshot_count FROM snapshot_registry WHERE snapshot_type = 'hourly' AND snapshot_time >= ? AND snapshot_time < ? ORDER BY snapshot_time ASC diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index ed5fd80..80ba4a9 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -23,6 +23,8 @@ import ( "github.com/vmware/govmomi/vim25/types" ) +type ctxLoggerKey struct{} + // RunVcenterSnapshotHourly records hourly inventory snapshots into a daily table. // If force is true, any in-progress marker will be cleared before starting (useful for manual recovery). func (c *CronTask) RunVcenterSnapshotHourly(ctx context.Context, logger *slog.Logger, force bool) (err error) { @@ -701,13 +703,15 @@ func snapshotFromInventory(inv queries.Inventory, snapshotTime time.Time) Invent } func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTime time.Time, tableName string, url string) error { + log := c.Logger.With("vcenter", url) + ctx = context.WithValue(ctx, ctxLoggerKey{}, log) started := time.Now() - c.Logger.Debug("connecting to vcenter for hourly snapshot", "url", url) + log.Debug("connecting to vcenter for hourly snapshot", "url", url) vc := vcenter.New(c.Logger, c.VcCreds) if err := vc.Login(url); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { - c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) + log.Warn("failed to record snapshot run", "url", url, "error", upErr) } return fmt.Errorf("unable to connect to vcenter: %w", err) } @@ -715,9 +719,9 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim logCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := vc.Logout(logCtx); err != nil { - c.Logger.Warn("vcenter logout failed", "url", url, "error", err) + log.Warn("vcenter logout failed", "url", url, "error", err) } else { - c.Logger.Debug("vcenter logout succeeded", "url", url) + log.Debug("vcenter logout succeeded", "url", url) } }() @@ -725,34 +729,34 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim if err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), 0, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { - c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) + log.Warn("failed to record snapshot run", "url", url, "error", upErr) } return fmt.Errorf("unable to get VMs from vcenter: %w", err) } - c.Logger.Debug("retrieved VMs from vcenter", "url", url, "vm_count", len(vcVms)) + log.Debug("retrieved VMs from vcenter", "url", url, "vm_count", len(vcVms)) if err := db.EnsureVmIdentityTables(ctx, c.Database.DB()); err != nil { - c.Logger.Warn("failed to ensure vm identity tables", "error", err) + log.Warn("failed to ensure vm identity tables", "error", err) } hostLookup, err := vc.BuildHostLookup() if err != nil { - c.Logger.Warn("failed to build host lookup", "url", url, "error", err) + log.Warn("failed to build host lookup", "url", url, "error", err) hostLookup = nil } else { - c.Logger.Debug("built host lookup", "url", url, "hosts", len(hostLookup)) + log.Debug("built host lookup", "url", url, "hosts", len(hostLookup)) } folderLookup, err := vc.BuildFolderPathLookup() if err != nil { - c.Logger.Warn("failed to build folder lookup", "url", url, "error", err) + log.Warn("failed to build folder lookup", "url", url, "error", err) folderLookup = nil } else { - c.Logger.Debug("built folder lookup", "url", url, "folders", len(folderLookup)) + log.Debug("built folder lookup", "url", url, "folders", len(folderLookup)) } rpLookup, err := vc.BuildResourcePoolLookup() if err != nil { - c.Logger.Warn("failed to build resource pool lookup", "url", url, "error", err) + log.Warn("failed to build resource pool lookup", "url", url, "error", err) rpLookup = nil } else { - c.Logger.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup)) + log.Debug("built resource pool lookup", "url", url, "pools", len(rpLookup)) } inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, url) @@ -844,7 +848,7 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim for _, row := range presentSnapshots { batch = append(batch, row) } - c.Logger.Debug("checking inventory for missing VMs", "vcenter", url) + log.Debug("checking inventory for missing VMs") missingCount := 0 newCount := 0 @@ -891,9 +895,9 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim VmId: inv.VmId, DatacenterName: inv.Datacenter, }); err != nil { - c.Logger.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String) + log.Warn("failed to mark inventory record deleted", "error", err, "vm_id", row.VmId.String) } - c.Logger.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime) + log.Debug("Marked VM as deleted", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", url, "snapshot_time", startTime) deletionsMarked = true } clusterName := "" @@ -909,10 +913,10 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim datacenter: inv.Datacenter, }) if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime.Unix()); err != nil { - c.Logger.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "error", err) + log.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "error", err) } if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime); err != nil { - c.Logger.Warn("failed to upsert vm lifecycle cache (deletion path)", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "name", inv.Name, "error", err) + log.Warn("failed to upsert vm lifecycle cache (deletion path)", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "name", inv.Name, "error", err) } missingCount++ } @@ -927,9 +931,9 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim end := startTime events, err := vc.FindVmDeletionEvents(ctx, begin, end) if err != nil { - c.Logger.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err) + log.Warn("failed to fetch vcenter deletion events", "vcenter", url, "error", err) } else { - c.Logger.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", begin, "window_end_local", end, "window_minutes", end.Sub(begin).Minutes(), "window_start_utc", begin.UTC(), "window_end_utc", end.UTC()) + log.Debug("fetched vcenter deletion events", "vcenter", url, "count", len(events), "window_start_local", begin, "window_end_local", end, "window_minutes", end.Sub(begin).Minutes(), "window_start_utc", begin.UTC(), "window_end_utc", end.UTC()) for _, cand := range candidates { if t, ok := events[cand.vmID]; ok { delTs := sql.NullInt64{Int64: t.Unix(), Valid: true} @@ -938,12 +942,12 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim VmId: sql.NullString{String: cand.vmID, Valid: cand.vmID != ""}, DatacenterName: cand.datacenter, }); err != nil { - c.Logger.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) + log.Warn("failed to update inventory deletion time from event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) } if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, cand.vmID, cand.vmUUID, cand.name, cand.cluster, t.Unix()); err != nil { - c.Logger.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) + log.Warn("failed to refine lifecycle cache deletion time", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "vcenter", url, "error", err) } - c.Logger.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t) + log.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t) } } } @@ -958,22 +962,22 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim end := startTime events, err := vc.FindVmDeletionEvents(ctx, begin, end) if err != nil { - c.Logger.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) + log.Warn("count-drop: failed to fetch vcenter deletion events", "vcenter", url, "error", err, "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount) } else { - c.Logger.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start", begin, "window_end", end) + log.Info("count-drop: deletion events fetched", "vcenter", url, "events", len(events), "prev_vm_count", prevVmCount.Int64, "current_vm_count", totals.VmCount, "window_start", begin, "window_end", end) } } - c.Logger.Debug("inserting hourly snapshot batch", "vcenter", url, "rows", len(batch)) + log.Debug("inserting hourly snapshot batch", "vcenter", url, "rows", len(batch)) if err := insertHourlyCache(ctx, dbConn, batch); err != nil { - c.Logger.Warn("failed to insert hourly cache rows", "vcenter", url, "error", err) + log.Warn("failed to insert hourly cache rows", "vcenter", url, "error", err) } if err := insertHourlyBatch(ctx, dbConn, tableName, batch); err != nil { metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, err) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, false, err.Error()); upErr != nil { - c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) + log.Warn("failed to record snapshot run", "url", url, "error", upErr) } return err } @@ -982,6 +986,7 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim slog.Warn("failed to insert vcenter totals", "vcenter", url, "snapshot_time", startTime.Unix(), "error", err) } + // Discover previous snapshots once per run (serial) to avoid concurrent probes across vCenters. prevTableName, newCount, missingCount = c.compareWithPreviousSnapshot(ctx, dbConn, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName, missingCount) // If VM count dropped versus totals and we still haven't marked missing, try another comparison + wider event window. @@ -1056,11 +1061,11 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim } // Backfill lifecycle deletions for VMs missing from inventory and without DeletedAt. - if err := backfillLifecycleDeletionsToday(ctx, c.Logger, dbConn, url, startTime, presentSnapshots); err != nil { - c.Logger.Warn("failed to backfill lifecycle deletions for today", "vcenter", url, "error", err) + if err := backfillLifecycleDeletionsToday(ctx, log, dbConn, url, startTime, presentSnapshots); err != nil { + log.Warn("failed to backfill lifecycle deletions for today", "vcenter", url, "error", err) } - c.Logger.Info("Hourly snapshot summary", + log.Info("Hourly snapshot summary", "vcenter", url, "vm_count", totals.VmCount, "vcpu_total", totals.VcpuTotal, @@ -1072,13 +1077,13 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim ) metrics.RecordVcenterSnapshot(url, time.Since(started), totals.VmCount, nil) if upErr := db.UpsertSnapshotRun(ctx, c.Database.DB(), url, startTime, true, ""); upErr != nil { - c.Logger.Warn("failed to record snapshot run", "url", url, "error", upErr) + log.Warn("failed to record snapshot run", "url", url, "error", upErr) } if deletionsMarked { if err := c.generateReport(ctx, tableName); err != nil { - c.Logger.Warn("failed to regenerate hourly report after deletions", "error", err, "table", tableName) + log.Warn("failed to regenerate hourly report after deletions", "error", err, "table", tableName) } else { - c.Logger.Debug("Regenerated hourly report after deletions", "table", tableName) + log.Debug("Regenerated hourly report after deletions", "table", tableName) } } return nil