diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index 5f65bbc..f0ed3ee 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -14,8 +14,6 @@ import ( "vctp/db" "vctp/internal/metrics" "vctp/internal/report" - - "github.com/jmoiron/sqlx" ) // RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table. @@ -89,10 +87,6 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti hourlyTables := make([]string, 0, len(hourlySnapshots)) for _, snapshot := range hourlySnapshots { hourlyTables = append(hourlyTables, snapshot.TableName) - // Ensure indexes exist on historical hourly tables for faster aggregation. - if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil { - c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err) - } } unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) if err != nil { @@ -207,15 +201,10 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd hourlyTables := make([]string, 0, len(hourlySnapshots)) for _, snapshot := range hourlySnapshots { hourlyTables = append(hourlyTables, snapshot.TableName) - if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil { - c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err) - } } unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) if err != nil { return err - } else { - c.Logger.Debug("Built union query", "string", unionQuery) } // Clear existing summary if forcing. @@ -286,9 +275,7 @@ LIMIT 1 } nextPresence := make(map[string]struct{}) if nextSnapshotTable != "" && db.TableExists(ctx, dbConn, nextSnapshotTable) { - q := fmt.Sprintf(`SELECT "VmId","VmUuid","Name" FROM %s WHERE "Vcenter" = ?`, nextSnapshotTable) - q = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), q) - rows, err := dbConn.QueryxContext(ctx, q, c.Settings.Values.Settings.VcenterAddresses[0]) + rows, err := querySnapshotRows(ctx, dbConn, nextSnapshotTable, []string{"VmId", "VmUuid", "Name"}, `"Vcenter" = ?`, c.Settings.Values.Settings.VcenterAddresses[0]) if err == nil { for rows.Next() { var vmId, vmUuid, name sql.NullString diff --git a/internal/tasks/inventoryHelpers.go b/internal/tasks/inventoryHelpers.go index 52beefb..c494e8d 100644 --- a/internal/tasks/inventoryHelpers.go +++ b/internal/tasks/inventoryHelpers.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "strconv" "strings" "time" @@ -42,51 +41,62 @@ func boolStringFromInterface(value interface{}) string { } } -// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time. +// latestHourlySnapshotBefore finds the most recent hourly snapshot table prior to the given time, skipping empty tables. func latestHourlySnapshotBefore(ctx context.Context, dbConn *sqlx.DB, cutoff time.Time) (string, error) { - driver := strings.ToLower(dbConn.DriverName()) - var rows *sqlx.Rows - var err error - switch driver { - case "sqlite": - rows, err = dbConn.QueryxContext(ctx, ` -SELECT name FROM sqlite_master -WHERE type = 'table' AND name LIKE 'inventory_hourly_%' -`) - case "pgx", "postgres": - rows, err = dbConn.QueryxContext(ctx, ` -SELECT tablename FROM pg_catalog.pg_tables -WHERE schemaname = 'public' AND tablename LIKE 'inventory_hourly_%' -`) - default: - return "", fmt.Errorf("unsupported driver for snapshot lookup: %s", driver) - } + rows, err := dbConn.QueryxContext(ctx, ` +SELECT table_name, snapshot_time +FROM snapshot_registry +WHERE snapshot_type = 'hourly' AND snapshot_time < ? +ORDER BY snapshot_time DESC +`, cutoff.Unix()) if err != nil { return "", err } defer rows.Close() - var latest string - var latestTime int64 for rows.Next() { var name string - if scanErr := rows.Scan(&name); scanErr != nil { + var ts int64 + if scanErr := rows.Scan(&name, &ts); scanErr != nil { continue } - if !strings.HasPrefix(name, "inventory_hourly_") { + if err := db.ValidateTableName(name); err != nil { continue } - suffix := strings.TrimPrefix(name, "inventory_hourly_") - epoch, parseErr := strconv.ParseInt(suffix, 10, 64) - if parseErr != nil { + hasRows, err := db.TableHasRows(ctx, dbConn, name) + if err != nil { continue } - if epoch < cutoff.Unix() && epoch > latestTime { - latestTime = epoch - latest = name + if hasRows { + return name, nil } } - return latest, nil + return "", nil +} + +// HasSnapshotGap reports whether the gap between prev and curr exceeds 2x the expected interval. +func HasSnapshotGap(prevUnix, currUnix int64, expectedSeconds int64) bool { + if prevUnix == 0 || currUnix == 0 || expectedSeconds <= 0 { + return false + } + return currUnix-prevUnix > expectedSeconds*2 +} + +// querySnapshotRows builds a SELECT with proper rebind for the given table/columns/where. +func querySnapshotRows(ctx context.Context, dbConn *sqlx.DB, table string, columns []string, where string, args ...interface{}) (*sqlx.Rows, error) { + if err := db.ValidateTableName(table); err != nil { + return nil, err + } + colExpr := "*" + if len(columns) > 0 { + colExpr = `"` + strings.Join(columns, `","`) + `"` + } + query := fmt.Sprintf(`SELECT %s FROM %s`, colExpr, table) + if strings.TrimSpace(where) != "" { + query = fmt.Sprintf(`%s WHERE %s`, query, where) + } + query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) + return dbConn.QueryxContext(ctx, query, args...) } // markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now. @@ -98,9 +108,6 @@ func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, return 0 } - query := fmt.Sprintf(`SELECT "VmId","VmUuid","Name","Cluster","Datacenter","DeletionTime" FROM %s WHERE "Vcenter" = ?`, prevTable) - query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) - type prevRow struct { VmId sql.NullString `db:"VmId"` VmUuid sql.NullString `db:"VmUuid"` @@ -110,7 +117,7 @@ func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, DeletionTime sql.NullInt64 `db:"DeletionTime"` } - rows, err := dbConn.QueryxContext(ctx, query, vcenter) + rows, err := querySnapshotRows(ctx, dbConn, prevTable, []string{"VmId", "VmUuid", "Name", "Cluster", "Datacenter", "DeletionTime"}, `"Vcenter" = ?`, vcenter) if err != nil { c.Logger.Warn("failed to read previous snapshot for deletion detection", "error", err, "table", prevTable, "vcenter", vcenter) return 0 diff --git a/internal/tasks/inventoryLifecycle.go b/internal/tasks/inventoryLifecycle.go new file mode 100644 index 0000000..2b9cef8 --- /dev/null +++ b/internal/tasks/inventoryLifecycle.go @@ -0,0 +1,252 @@ +package tasks + +import ( + "context" + "database/sql" + "log/slog" + "time" + + "vctp/db" + + "github.com/jmoiron/sqlx" +) + +// presenceKeys builds lookup keys for vm presence comparison. +func presenceKeys(vmID, vmUUID, name string) []string { + keys := make([]string, 0, 3) + if vmID != "" { + keys = append(keys, "id:"+vmID) + } + if vmUUID != "" { + keys = append(keys, "uuid:"+vmUUID) + } + if name != "" { + keys = append(keys, "name:"+name) + } + return keys +} + +// backfillLifecycleDeletionsToday looks for VMs in the lifecycle cache that are not in the current inventory, +// have no DeletedAt, and determines their deletion time from today's hourly snapshots, optionally checking the next snapshot (next day) to confirm. +func backfillLifecycleDeletionsToday(ctx context.Context, logger *slog.Logger, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, present map[string]InventorySnapshotRow) error { + dayStart := truncateDate(snapshotTime) + dayEnd := dayStart.Add(24 * time.Hour) + + candidates, err := loadLifecycleCandidates(ctx, dbConn, vcenter, present) + if err != nil || len(candidates) == 0 { + return err + } + + tables, err := listHourlyTablesForDay(ctx, dbConn, dayStart, dayEnd) + if err != nil { + return err + } + if len(tables) == 0 { + return nil + } + + nextPresence := make(map[string]struct{}) + if nextTable, nextErr := nextSnapshotAfter(ctx, dbConn, dayEnd, vcenter); nextErr == nil && nextTable != "" { + nextPresence = loadPresenceKeys(ctx, dbConn, nextTable, vcenter) + } + + for _, cand := range candidates { + deletion, firstMiss := findDeletionInTables(ctx, dbConn, tables, vcenter, cand) + if deletion == 0 && len(nextPresence) > 0 && firstMiss > 0 { + if !isPresent(nextPresence, cand) { + // Single miss at end of day, confirmed by next-day absence. + deletion = firstMiss + logger.Debug("cross-day deletion inferred from next snapshot", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "deletion", deletion) + } + } + if deletion > 0 { + if err := db.MarkVmDeletedWithDetails(ctx, dbConn, vcenter, cand.vmID, cand.vmUUID, cand.name, cand.cluster, deletion); err != nil { + logger.Warn("lifecycle backfill mark deleted failed", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion, "error", err) + continue + } + logger.Debug("lifecycle backfill applied", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion) + } + } + return nil +} + +type lifecycleCandidate struct { + vmID string + vmUUID string + name string + cluster string +} + +func loadLifecycleCandidates(ctx context.Context, dbConn *sqlx.DB, vcenter string, present map[string]InventorySnapshotRow) ([]lifecycleCandidate, error) { + rows, err := dbConn.QueryxContext(ctx, ` +SELECT "VmId","VmUuid","Name","Cluster" +FROM vm_lifecycle_cache +WHERE "Vcenter" = ? AND ("DeletedAt" IS NULL OR "DeletedAt" = 0) +`, vcenter) + if err != nil { + return nil, err + } + defer rows.Close() + + var cands []lifecycleCandidate + for rows.Next() { + var vmID, vmUUID, name, cluster sql.NullString + if scanErr := rows.Scan(&vmID, &vmUUID, &name, &cluster); scanErr != nil { + continue + } + if vmID.String == "" { + continue + } + if _, ok := present[vmID.String]; ok { + continue // still present, skip + } + cands = append(cands, lifecycleCandidate{ + vmID: vmID.String, + vmUUID: vmUUID.String, + name: name.String, + cluster: cluster.String, + }) + } + return cands, nil +} + +type snapshotTable struct { + Table string `db:"table_name"` + Time int64 `db:"snapshot_time"` +} + +func listHourlyTablesForDay(ctx context.Context, dbConn *sqlx.DB, dayStart, dayEnd time.Time) ([]snapshotTable, error) { + rows, err := dbConn.QueryxContext(ctx, ` +SELECT table_name, snapshot_time +FROM snapshot_registry +WHERE snapshot_type = 'hourly' AND snapshot_time >= ? AND snapshot_time < ? +ORDER BY snapshot_time ASC +`, dayStart.Unix(), dayEnd.Unix()) + if err != nil { + return nil, err + } + defer rows.Close() + + var tables []snapshotTable + for rows.Next() { + var t snapshotTable + if err := rows.StructScan(&t); err != nil { + continue + } + if err := db.ValidateTableName(t.Table); err != nil { + continue + } + hasRows, err := db.TableHasRows(ctx, dbConn, t.Table) + if err != nil || !hasRows { + continue + } + tables = append(tables, t) + } + return tables, nil +} + +func nextSnapshotAfter(ctx context.Context, dbConn *sqlx.DB, after time.Time, vcenter string) (string, error) { + rows, err := dbConn.QueryxContext(ctx, ` +SELECT table_name +FROM snapshot_registry +WHERE snapshot_type = 'hourly' AND snapshot_time >= ? +ORDER BY snapshot_time ASC +LIMIT 1 +`, after.Unix()) + if err != nil { + return "", err + } + defer rows.Close() + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + continue + } + if err := db.ValidateTableName(name); err != nil { + continue + } + // ensure the snapshot table actually has entries for this vcenter + vrows, qerr := querySnapshotRows(ctx, dbConn, name, []string{"VmId"}, `"Vcenter" = ? LIMIT 1`, vcenter) + if qerr != nil { + continue + } + hasVcenter := vrows.Next() + vrows.Close() + if hasVcenter { + return name, nil + } + } + return "", nil +} + +func loadPresenceKeys(ctx context.Context, dbConn *sqlx.DB, table, vcenter string) map[string]struct{} { + out := make(map[string]struct{}) + rows, err := querySnapshotRows(ctx, dbConn, table, []string{"VmId", "VmUuid", "Name"}, `"Vcenter" = ?`, vcenter) + if err != nil { + return out + } + defer rows.Close() + for rows.Next() { + var vmId, vmUuid, name sql.NullString + if err := rows.Scan(&vmId, &vmUuid, &name); err == nil { + for _, k := range presenceKeys(vmId.String, vmUuid.String, name.String) { + out[k] = struct{}{} + } + } + } + return out +} + +func isPresent(presence map[string]struct{}, cand lifecycleCandidate) bool { + for _, k := range presenceKeys(cand.vmID, cand.vmUUID, cand.name) { + if _, ok := presence[k]; ok { + return true + } + } + return false +} + +func findDeletionInTables(ctx context.Context, dbConn *sqlx.DB, tables []snapshotTable, vcenter string, cand lifecycleCandidate) (int64, int64) { + var lastSeen int64 + var firstMiss int64 + for i, tbl := range tables { + rows, err := querySnapshotRows(ctx, dbConn, tbl.Table, []string{"VmId", "VmUuid", "Name", "Cluster"}, `"Vcenter" = ? AND "VmId" = ?`, vcenter, cand.vmID) + if err == nil { + if rows.Next() { + var vmId, vmUuid, name, cluster sql.NullString + if scanErr := rows.Scan(&vmId, &vmUuid, &name, &cluster); scanErr == nil { + lastSeen = tbl.Time + if cand.name == "" && name.Valid { + cand.name = name.String + } + if cand.cluster == "" && cluster.Valid { + cand.cluster = cluster.String + } + } + } + rows.Close() + } + if lastSeen > 0 && tbl.Time > lastSeen { + // first table after last seen -> first miss + if seen, _ := candSeenInTable(ctx, dbConn, tbl.Table, vcenter, cand.vmID); !seen { + firstMiss = tbl.Time + // need two consecutive misses + if i+1 < len(tables) { + if seen2, _ := candSeenInTable(ctx, dbConn, tables[i+1].Table, vcenter, cand.vmID); !seen2 { + return firstMiss, firstMiss + } + } + } + } + } + return 0, firstMiss +} + +func candSeenInTable(ctx context.Context, dbConn *sqlx.DB, table, vcenter, vmID string) (bool, error) { + rows, err := querySnapshotRows(ctx, dbConn, table, []string{"VmId"}, `"Vcenter" = ? AND "VmId" = ? LIMIT 1`, vcenter, vmID) + if err != nil { + return false, err + } + defer rows.Close() + return rows.Next(), nil +} diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index 339b86d..7372123 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -503,128 +503,6 @@ func normalizeResourcePool(value string) string { return trimmed } -// backfillLifecycleDeletionsToday looks for VMs in the lifecycle cache that are not in the current inventory, -// have no DeletedAt, and determines their deletion time from today's hourly snapshots. -func backfillLifecycleDeletionsToday(ctx context.Context, logger *slog.Logger, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, present map[string]InventorySnapshotRow) error { - dayStart := truncateDate(snapshotTime) - dayEnd := dayStart.Add(24 * time.Hour) - - // Lifecycle entries missing DeletedAt. - queryLifecycle := ` -SELECT "VmId","VmUuid","Name","Cluster" -FROM vm_lifecycle_cache -WHERE "Vcenter" = ? AND ("DeletedAt" IS NULL OR "DeletedAt" = 0) -` - rows, err := dbConn.QueryxContext(ctx, queryLifecycle, vcenter) - if err != nil { - return err - } - defer rows.Close() - - type candidate struct { - vmID string - vmUUID string - name string - cluster string - } - var cands []candidate - for rows.Next() { - var vmID, vmUUID, name, cluster sql.NullString - if scanErr := rows.Scan(&vmID, &vmUUID, &name, &cluster); scanErr != nil { - continue - } - if vmID.String == "" { - continue - } - if _, ok := present[vmID.String]; ok { - continue // still present, skip - } - cands = append(cands, candidate{ - vmID: vmID.String, - vmUUID: vmUUID.String, - name: name.String, - cluster: cluster.String, - }) - } - - if len(cands) == 0 { - return nil - } - - // Get today's hourly tables. - query := ` -SELECT table_name, snapshot_time -FROM snapshot_registry -WHERE snapshot_type = 'hourly' AND snapshot_time >= ? AND snapshot_time < ? -ORDER BY snapshot_time ASC -` - query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query) - var tables []struct { - Table string `db:"table_name"` - Time int64 `db:"snapshot_time"` - } - rowsTables, err := dbConn.QueryxContext(ctx, query, dayStart.Unix(), dayEnd.Unix()) - if err != nil { - return err - } - defer rowsTables.Close() - for rowsTables.Next() { - var t struct { - Table string `db:"table_name"` - Time int64 `db:"snapshot_time"` - } - if err := rowsTables.StructScan(&t); err != nil { - continue - } - tables = append(tables, t) - } - if len(tables) == 0 { - return nil - } - - for _, cand := range cands { - var lastSeen int64 - var deletion int64 - logger.Debug("lifecycle backfill candidate", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "tables", len(tables)) - for i, tbl := range tables { - if err := db.ValidateTableName(tbl.Table); err != nil { - continue - } - q := fmt.Sprintf(`SELECT "Name","Cluster" FROM %s WHERE "Vcenter" = ? AND "VmId" = ? LIMIT 1`, tbl.Table) - q = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), q) - var name, cluster sql.NullString - err := dbConn.QueryRowxContext(ctx, q, vcenter, cand.vmID).Scan(&name, &cluster) - if err == nil { - lastSeen = tbl.Time - if cand.name == "" && name.Valid { - cand.name = name.String - } - if cand.cluster == "" && cluster.Valid { - cand.cluster = cluster.String - } - continue - } - // Not found in this table; if previously seen today, mark deletion at this snapshot time. - if lastSeen > 0 { - deletion = tbl.Time - break - } - // If never seen today and we're at the last table, mark deletion at current snapshot time. - if i == len(tables)-1 { - deletion = tbl.Time - } - } - if deletion > 0 { - if err := db.MarkVmDeletedWithDetails(ctx, dbConn, vcenter, cand.vmID, cand.vmUUID, cand.name, cand.cluster, deletion); err != nil { - logger.Warn("lifecycle backfill mark deleted failed", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion, "error", err) - continue - } - logger.Debug("lifecycle backfill applied", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion) - } - } - return nil -} - func (c *CronTask) reportsDir() string { if c.Settings != nil && c.Settings.Values != nil { if dir := strings.TrimSpace(c.Settings.Values.Settings.ReportsDir); dir != "" { @@ -1099,8 +977,8 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim moreMissing := c.markMissingFromPrevious(ctx, dbConn, prevTableName, url, startTime, presentSnapshots, presentByUuid, presentByName, inventoryByVmID, inventoryByUuid, inventoryByName) missingCount += moreMissing // Guard against gaps: if previous snapshot is much older than expected, skip "new" detection to avoid false positives when an hourly run was missed. - snapshotPeriod := durationFromSeconds(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, time.Hour).Seconds() - if prevSnapshotTime > 0 && startTime.Unix()-prevSnapshotTime > int64(snapshotPeriod*2) { + expectedSeconds := int64(durationFromSeconds(c.Settings.Values.Settings.VcenterInventorySnapshotSeconds, time.Hour).Seconds()) + if HasSnapshotGap(prevSnapshotTime, startTime.Unix(), expectedSeconds) { c.Logger.Info("skipping new-VM detection due to gap between snapshots", "prev_table", prevTableName, "prev_snapshot_unix", prevSnapshotTime, "current_snapshot_unix", startTime.Unix()) } else { newCount = countNewFromPrevious(ctx, dbConn, prevTableName, url, presentSnapshots) diff --git a/internal/tasks/tasks.go b/internal/tasks/tasks.go deleted file mode 100644 index dd4c8ad..0000000 --- a/internal/tasks/tasks.go +++ /dev/null @@ -1,3 +0,0 @@ -package tasks - -// Legacy placeholder: type definitions moved to types.go.