use 0 instead of start of aggregation window for creationtime in xlsx
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -14,6 +14,8 @@ import (
|
|||||||
"vctp/db"
|
"vctp/db"
|
||||||
"vctp/internal/metrics"
|
"vctp/internal/metrics"
|
||||||
"vctp/internal/report"
|
"vctp/internal/report"
|
||||||
|
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
|
// RunVcenterDailyAggregate summarizes hourly snapshots into a daily summary table.
|
||||||
@@ -198,6 +200,8 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
|
|||||||
hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
|
hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
|
||||||
if len(hourlySnapshots) == 0 {
|
if len(hourlySnapshots) == 0 {
|
||||||
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
|
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
|
||||||
|
} else {
|
||||||
|
c.Logger.Debug("Found hourly snapshot tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots))
|
||||||
}
|
}
|
||||||
|
|
||||||
hourlyTables := make([]string, 0, len(hourlySnapshots))
|
hourlyTables := make([]string, 0, len(hourlySnapshots))
|
||||||
@@ -210,6 +214,8 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
|
|||||||
unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
|
unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
} else {
|
||||||
|
c.Logger.Debug("Built union query", "string", unionQuery)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear existing summary if forcing.
|
// Clear existing summary if forcing.
|
||||||
@@ -235,6 +241,7 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
|
|||||||
if cacheErr != nil {
|
if cacheErr != nil {
|
||||||
c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
|
c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
|
||||||
} else if len(cacheAgg) > 0 {
|
} else if len(cacheAgg) > 0 {
|
||||||
|
c.Logger.Debug("using hourly cache for daily aggregation", "date", dayStart.Format("2006-01-02"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg))
|
||||||
aggMap = cacheAgg
|
aggMap = cacheAgg
|
||||||
snapTimes = cacheTimes
|
snapTimes = cacheTimes
|
||||||
totalSamples = len(cacheTimes)
|
totalSamples = len(cacheTimes)
|
||||||
@@ -247,6 +254,7 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
|
|||||||
if errScan != nil {
|
if errScan != nil {
|
||||||
return errScan
|
return errScan
|
||||||
}
|
}
|
||||||
|
c.Logger.Debug("scanned hourly tables for daily aggregation", "date", dayStart.Format("2006-01-02"), "tables", len(hourlySnapshots), "vm_count", len(aggMap))
|
||||||
if len(aggMap) == 0 {
|
if len(aggMap) == 0 {
|
||||||
return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
|
return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
|
||||||
}
|
}
|
||||||
@@ -259,26 +267,99 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
|
|||||||
sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
|
sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, v := range aggMap {
|
// Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day.
|
||||||
if v.creation == 0 {
|
var nextSnapshotTable string
|
||||||
v.creation = v.firstSeen
|
nextSnapshotRows, nextErr := c.Database.DB().QueryxContext(ctx, `
|
||||||
|
SELECT table_name
|
||||||
|
FROM snapshot_registry
|
||||||
|
WHERE snapshot_type = 'hourly' AND snapshot_time >= ?
|
||||||
|
ORDER BY snapshot_time ASC
|
||||||
|
LIMIT 1
|
||||||
|
`, dayEnd.Unix())
|
||||||
|
if nextErr == nil {
|
||||||
|
if nextSnapshotRows.Next() {
|
||||||
|
if scanErr := nextSnapshotRows.Scan(&nextSnapshotTable); scanErr != nil {
|
||||||
|
nextSnapshotTable = ""
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Infer deletion as the first snapshot time after lastSeen where the VM is absent.
|
nextSnapshotRows.Close()
|
||||||
|
}
|
||||||
|
nextPresence := make(map[string]struct{})
|
||||||
|
if nextSnapshotTable != "" && db.TableExists(ctx, dbConn, nextSnapshotTable) {
|
||||||
|
q := fmt.Sprintf(`SELECT "VmId","VmUuid","Name" FROM %s WHERE "Vcenter" = ?`, nextSnapshotTable)
|
||||||
|
q = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), q)
|
||||||
|
rows, err := dbConn.QueryxContext(ctx, q, c.Settings.Values.Settings.VcenterAddresses[0])
|
||||||
|
if err == nil {
|
||||||
|
for rows.Next() {
|
||||||
|
var vmId, vmUuid, name sql.NullString
|
||||||
|
if err := rows.Scan(&vmId, &vmUuid, &name); err == nil {
|
||||||
|
if vmId.Valid {
|
||||||
|
nextPresence["id:"+vmId.String] = struct{}{}
|
||||||
|
}
|
||||||
|
if vmUuid.Valid {
|
||||||
|
nextPresence["uuid:"+vmUuid.String] = struct{}{}
|
||||||
|
}
|
||||||
|
if name.Valid {
|
||||||
|
nextPresence["name:"+name.String] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rows.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var maxSnap int64
|
||||||
|
if len(snapTimes) > 0 {
|
||||||
|
maxSnap = snapTimes[len(snapTimes)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range aggMap {
|
||||||
|
// Infer deletion only after seeing at least two consecutive absent snapshots after lastSeen.
|
||||||
|
if maxSnap > 0 && len(v.seen) > 0 && v.lastSeen < maxSnap {
|
||||||
|
c.Logger.Debug("inferring deletion window", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "snapshots", len(snapTimes))
|
||||||
|
}
|
||||||
|
consecutiveMisses := 0
|
||||||
|
firstMiss := int64(0)
|
||||||
for _, t := range snapTimes {
|
for _, t := range snapTimes {
|
||||||
if t <= v.lastSeen {
|
if t <= v.lastSeen {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if _, ok := v.seen[t]; !ok {
|
if _, ok := v.seen[t]; ok {
|
||||||
v.deletion = t
|
consecutiveMisses = 0
|
||||||
|
firstMiss = 0
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
consecutiveMisses++
|
||||||
|
if firstMiss == 0 {
|
||||||
|
firstMiss = t
|
||||||
|
}
|
||||||
|
if consecutiveMisses >= 2 {
|
||||||
|
v.deletion = firstMiss
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if v.deletion == 0 && firstMiss > 0 {
|
||||||
|
// Not enough consecutive misses within the day; try to use the first snapshot of the next day to confirm.
|
||||||
|
if nextSnapshotTable != "" && len(nextPresence) > 0 {
|
||||||
|
_, presentByID := nextPresence["id:"+v.key.VmId]
|
||||||
|
_, presentByUUID := nextPresence["uuid:"+v.key.VmUuid]
|
||||||
|
_, presentByName := nextPresence["name:"+v.key.Name]
|
||||||
|
if !presentByID && !presentByUUID && !presentByName {
|
||||||
|
v.deletion = firstMiss
|
||||||
|
c.Logger.Debug("cross-day deletion inferred from next snapshot", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "deletion", firstMiss, "next_table", nextSnapshotTable)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v.deletion == 0 {
|
||||||
|
c.Logger.Debug("pending deletion inference (insufficient consecutive misses)", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "first_missing_snapshot", firstMiss)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert aggregated rows.
|
// Insert aggregated rows.
|
||||||
if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil {
|
if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
c.Logger.Debug("inserted daily aggregates", "table", summaryTable, "rows", len(aggMap), "total_samples", totalSamples)
|
||||||
|
|
||||||
// Persist rollup cache for monthly aggregation.
|
// Persist rollup cache for monthly aggregation.
|
||||||
if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples); err != nil {
|
if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples); err != nil {
|
||||||
@@ -303,7 +384,13 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
|
c.Logger.Debug("Finished daily inventory aggregation (Go path)",
|
||||||
|
"summary_table", summaryTable,
|
||||||
|
"duration", time.Since(jobStart),
|
||||||
|
"tables_scanned", len(hourlyTables),
|
||||||
|
"rows_written", rowCount,
|
||||||
|
"total_samples", totalSamples,
|
||||||
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -265,6 +265,7 @@ func mergeMonthlyAgg(dst, src *monthlyAggVal) {
|
|||||||
if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
|
if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
|
||||||
dst.creation = src.creation
|
dst.creation = src.creation
|
||||||
}
|
}
|
||||||
|
// If creation is unknown in all daily summaries, leave it zero for reports (VM trace handles approximation separately).
|
||||||
if src.deletion > 0 && (dst.deletion == 0 || src.deletion < dst.deletion) {
|
if src.deletion > 0 && (dst.deletion == 0 || src.deletion < dst.deletion) {
|
||||||
dst.deletion = src.deletion
|
dst.deletion = src.deletion
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user