diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index e718114..5f65bbc 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -14,6 +14,8 @@ import ( "vctp/db" "vctp/internal/metrics" "vctp/internal/report" + + "github.com/jmoiron/sqlx" ) // RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table. @@ -198,6 +200,8 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots) if len(hourlySnapshots) == 0 { return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02")) + } else { + c.Logger.Debug("Found hourly snapshot tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots)) } hourlyTables := make([]string, 0, len(hourlySnapshots)) @@ -210,6 +214,8 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter()) if err != nil { return err + } else { + c.Logger.Debug("Built union query", "string", unionQuery) } // Clear existing summary if forcing. @@ -235,6 +241,7 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd if cacheErr != nil { c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr) } else if len(cacheAgg) > 0 { + c.Logger.Debug("using hourly cache for daily aggregation", "date", dayStart.Format("2006-01-02"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg)) aggMap = cacheAgg snapTimes = cacheTimes totalSamples = len(cacheTimes) @@ -247,6 +254,7 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd if errScan != nil { return errScan } + c.Logger.Debug("scanned hourly tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots), "vm_count", len(aggMap)) if len(aggMap) == 0 { return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02")) } @@ -259,26 +267,99 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] }) } - for _, v := range aggMap { - if v.creation == 0 { - v.creation = v.firstSeen + // Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day. + var nextSnapshotTable string + nextSnapshotRows, nextErr := c.Database.DB().QueryxContext(ctx, ` +SELECT table_name +FROM snapshot_registry +WHERE snapshot_type = 'hourly' AND snapshot_time >= ? +ORDER BY snapshot_time ASC +LIMIT 1 +`, dayEnd.Unix()) + if nextErr == nil { + if nextSnapshotRows.Next() { + if scanErr := nextSnapshotRows.Scan(&nextSnapshotTable); scanErr != nil { + nextSnapshotTable = "" + } } - // Infer deletion as the first snapshot time after lastSeen where the VM is absent. + nextSnapshotRows.Close() + } + nextPresence := make(map[string]struct{}) + if nextSnapshotTable != "" && db.TableExists(ctx, dbConn, nextSnapshotTable) { + q := fmt.Sprintf(`SELECT "VmId","VmUuid","Name" FROM %s WHERE "Vcenter" = ?`, nextSnapshotTable) + q = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), q) + rows, err := dbConn.QueryxContext(ctx, q, c.Settings.Values.Settings.VcenterAddresses[0]) + if err == nil { + for rows.Next() { + var vmId, vmUuid, name sql.NullString + if err := rows.Scan(&vmId, &vmUuid, &name); err == nil { + if vmId.Valid { + nextPresence["id:"+vmId.String] = struct{}{} + } + if vmUuid.Valid { + nextPresence["uuid:"+vmUuid.String] = struct{}{} + } + if name.Valid { + nextPresence["name:"+name.String] = struct{}{} + } + } + } + rows.Close() + } + } + + var maxSnap int64 + if len(snapTimes) > 0 { + maxSnap = snapTimes[len(snapTimes)-1] + } + + for _, v := range aggMap { + // Infer deletion only after seeing at least two consecutive absent snapshots after lastSeen. + if maxSnap > 0 && len(v.seen) > 0 && v.lastSeen < maxSnap { + c.Logger.Debug("inferring deletion window", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "snapshots", len(snapTimes)) + } + consecutiveMisses := 0 + firstMiss := int64(0) for _, t := range snapTimes { if t <= v.lastSeen { continue } - if _, ok := v.seen[t]; !ok { - v.deletion = t + if _, ok := v.seen[t]; ok { + consecutiveMisses = 0 + firstMiss = 0 + continue + } + consecutiveMisses++ + if firstMiss == 0 { + firstMiss = t + } + if consecutiveMisses >= 2 { + v.deletion = firstMiss break } } + if v.deletion == 0 && firstMiss > 0 { + // Not enough consecutive misses within the day; try to use the first snapshot of the next day to confirm. + if nextSnapshotTable != "" && len(nextPresence) > 0 { + _, presentByID := nextPresence["id:"+v.key.VmId] + _, presentByUUID := nextPresence["uuid:"+v.key.VmUuid] + _, presentByName := nextPresence["name:"+v.key.Name] + if !presentByID && !presentByUUID && !presentByName { + v.deletion = firstMiss + c.Logger.Debug("cross-day deletion inferred from next snapshot", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "deletion", firstMiss, "next_table", nextSnapshotTable) + } + } + if v.deletion == 0 { + c.Logger.Debug("pending deletion inference (insufficient consecutive misses)", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "first_missing_snapshot", firstMiss) + } + } } // Insert aggregated rows. if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil { return err } + c.Logger.Debug("inserted daily aggregates", "table", summaryTable, "rows", len(aggMap), "total_samples", totalSamples) // Persist rollup cache for monthly aggregation. if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples); err != nil { @@ -303,7 +384,13 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd return err } - c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart)) + c.Logger.Debug("Finished daily inventory aggregation (Go path)", + "summary_table", summaryTable, + "duration", time.Since(jobStart), + "tables_scanned", len(hourlyTables), + "rows_written", rowCount, + "total_samples", totalSamples, + ) return nil } diff --git a/internal/tasks/monthlyAggregate.go b/internal/tasks/monthlyAggregate.go index 001f0c7..535ae89 100644 --- a/internal/tasks/monthlyAggregate.go +++ b/internal/tasks/monthlyAggregate.go @@ -265,6 +265,7 @@ func mergeMonthlyAgg(dst, src *monthlyAggVal) { if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) { dst.creation = src.creation } + // If creation is unknown in all daily summaries, leave it zero for reports (VM trace handles approximation separately). if src.deletion > 0 && (dst.deletion == 0 || src.deletion < dst.deletion) { dst.deletion = src.deletion }