backfill CreationTime in daily aggregation
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -155,6 +155,11 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
|
|||||||
} else {
|
} else {
|
||||||
c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", applied)
|
c.Logger.Info("Daily aggregation deletion times", "source_lifecycle_cache", applied)
|
||||||
}
|
}
|
||||||
|
if applied, err := db.ApplyLifecycleCreationToSummary(ctx, dbConn, summaryTable); err != nil {
|
||||||
|
c.Logger.Warn("failed to apply lifecycle creations to daily summary", "error", err, "table", summaryTable)
|
||||||
|
} else {
|
||||||
|
c.Logger.Info("Daily aggregation creation times", "source_lifecycle_cache", applied)
|
||||||
|
}
|
||||||
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
|
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
|
||||||
c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
|
c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
|
||||||
}
|
}
|
||||||
@@ -297,6 +302,12 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd
|
|||||||
inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap, dayStart, dayEnd)
|
inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap, dayStart, dayEnd)
|
||||||
c.Logger.Info("Daily aggregation deletion times", "source_inventory", inventoryDeletions)
|
c.Logger.Info("Daily aggregation deletion times", "source_inventory", inventoryDeletions)
|
||||||
|
|
||||||
|
lifecycleCreations := c.applyLifecycleCreations(ctx, aggMap)
|
||||||
|
c.Logger.Info("Daily aggregation creation times", "source_lifecycle_cache", lifecycleCreations)
|
||||||
|
|
||||||
|
inventoryCreations := c.applyInventoryCreations(ctx, aggMap)
|
||||||
|
c.Logger.Info("Daily aggregation creation times", "source_inventory", inventoryCreations)
|
||||||
|
|
||||||
// Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day.
|
// Get the first hourly snapshot on/after dayEnd to help confirm deletions that happen on the last snapshot of the day.
|
||||||
var nextSnapshotTable string
|
var nextSnapshotTable string
|
||||||
nextSnapshotQuery := dbConn.Rebind(`
|
nextSnapshotQuery := dbConn.Rebind(`
|
||||||
@@ -617,6 +628,97 @@ WHERE "Vcenter" = ? AND "DeletedAt" IS NOT NULL AND "DeletedAt" > 0 AND "Deleted
|
|||||||
return totalApplied
|
return totalApplied
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *CronTask) applyLifecycleCreations(ctx context.Context, agg map[dailyAggKey]*dailyAggVal) int {
|
||||||
|
dbConn := c.Database.DB()
|
||||||
|
if !db.TableExists(ctx, dbConn, "vm_lifecycle_cache") {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
type aggIndex struct {
|
||||||
|
byID map[string]*dailyAggVal
|
||||||
|
byUUID map[string]*dailyAggVal
|
||||||
|
byName map[string]*dailyAggVal
|
||||||
|
}
|
||||||
|
indexes := make(map[string]*aggIndex, 8)
|
||||||
|
for k, v := range agg {
|
||||||
|
if k.Vcenter == "" || v.creation > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idx := indexes[k.Vcenter]
|
||||||
|
if idx == nil {
|
||||||
|
idx = &aggIndex{
|
||||||
|
byID: make(map[string]*dailyAggVal),
|
||||||
|
byUUID: make(map[string]*dailyAggVal),
|
||||||
|
byName: make(map[string]*dailyAggVal),
|
||||||
|
}
|
||||||
|
indexes[k.Vcenter] = idx
|
||||||
|
}
|
||||||
|
if k.VmId != "" {
|
||||||
|
idx.byID[k.VmId] = v
|
||||||
|
}
|
||||||
|
if k.VmUuid != "" {
|
||||||
|
idx.byUUID[k.VmUuid] = v
|
||||||
|
}
|
||||||
|
if name := strings.ToLower(strings.TrimSpace(k.Name)); name != "" {
|
||||||
|
idx.byName[name] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
totalApplied := 0
|
||||||
|
for vcenter, idx := range indexes {
|
||||||
|
query := `
|
||||||
|
SELECT "VmId","VmUuid","Name","FirstSeen"
|
||||||
|
FROM vm_lifecycle_cache
|
||||||
|
WHERE "Vcenter" = ? AND "FirstSeen" IS NOT NULL AND "FirstSeen" > 0
|
||||||
|
`
|
||||||
|
bind := dbConn.Rebind(query)
|
||||||
|
rows, err := dbConn.QueryxContext(ctx, bind, vcenter)
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Warn("failed to load lifecycle creations", "vcenter", vcenter, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
scanned := 0
|
||||||
|
applied := 0
|
||||||
|
missed := 0
|
||||||
|
for rows.Next() {
|
||||||
|
scanned++
|
||||||
|
var vmId, vmUuid, name sql.NullString
|
||||||
|
var firstSeen sql.NullInt64
|
||||||
|
if err := rows.Scan(&vmId, &vmUuid, &name, &firstSeen); err != nil {
|
||||||
|
c.Logger.Warn("failed to scan lifecycle creation", "vcenter", vcenter, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !firstSeen.Valid || firstSeen.Int64 <= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var target *dailyAggVal
|
||||||
|
if vmId.Valid {
|
||||||
|
target = idx.byID[strings.TrimSpace(vmId.String)]
|
||||||
|
}
|
||||||
|
if target == nil && vmUuid.Valid {
|
||||||
|
target = idx.byUUID[strings.TrimSpace(vmUuid.String)]
|
||||||
|
}
|
||||||
|
if target == nil && name.Valid {
|
||||||
|
target = idx.byName[strings.ToLower(strings.TrimSpace(name.String))]
|
||||||
|
}
|
||||||
|
if target == nil {
|
||||||
|
missed++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if target.creation == 0 {
|
||||||
|
target.creation = firstSeen.Int64
|
||||||
|
applied++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rows.Close()
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
c.Logger.Warn("failed to read lifecycle creations", "vcenter", vcenter, "error", err)
|
||||||
|
}
|
||||||
|
c.Logger.Debug("lifecycle cache creations applied", "vcenter", vcenter, "scanned", scanned, "applied", applied, "missed", missed)
|
||||||
|
totalApplied += applied
|
||||||
|
}
|
||||||
|
return totalApplied
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int {
|
func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAggKey]*dailyAggVal, start, end time.Time) int {
|
||||||
vcenters := make(map[string]struct{}, 8)
|
vcenters := make(map[string]struct{}, 8)
|
||||||
for k := range agg {
|
for k := range agg {
|
||||||
@@ -680,6 +782,60 @@ func (c *CronTask) applyInventoryDeletions(ctx context.Context, agg map[dailyAgg
|
|||||||
return totalApplied
|
return totalApplied
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *CronTask) applyInventoryCreations(ctx context.Context, agg map[dailyAggKey]*dailyAggVal) int {
|
||||||
|
vcenters := make(map[string]struct{}, 8)
|
||||||
|
for k := range agg {
|
||||||
|
if k.Vcenter != "" {
|
||||||
|
vcenters[k.Vcenter] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
totalApplied := 0
|
||||||
|
for vcenter := range vcenters {
|
||||||
|
inventoryRows, err := c.Database.Queries().GetInventoryByVcenter(ctx, vcenter)
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Warn("failed to load inventory for daily creation times", "vcenter", vcenter, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
byID := make(map[string]int64, len(inventoryRows))
|
||||||
|
byUUID := make(map[string]int64, len(inventoryRows))
|
||||||
|
byName := make(map[string]int64, len(inventoryRows))
|
||||||
|
for _, inv := range inventoryRows {
|
||||||
|
if !inv.CreationTime.Valid || inv.CreationTime.Int64 <= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if inv.VmId.Valid && strings.TrimSpace(inv.VmId.String) != "" {
|
||||||
|
byID[strings.TrimSpace(inv.VmId.String)] = inv.CreationTime.Int64
|
||||||
|
}
|
||||||
|
if inv.VmUuid.Valid && strings.TrimSpace(inv.VmUuid.String) != "" {
|
||||||
|
byUUID[strings.TrimSpace(inv.VmUuid.String)] = inv.CreationTime.Int64
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(inv.Name) != "" {
|
||||||
|
byName[strings.ToLower(strings.TrimSpace(inv.Name))] = inv.CreationTime.Int64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range agg {
|
||||||
|
if k.Vcenter != vcenter || v.creation > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if ts, ok := byID[k.VmId]; ok {
|
||||||
|
v.creation = ts
|
||||||
|
totalApplied++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if ts, ok := byUUID[k.VmUuid]; ok {
|
||||||
|
v.creation = ts
|
||||||
|
totalApplied++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if ts, ok := byName[strings.ToLower(k.Name)]; ok {
|
||||||
|
v.creation = ts
|
||||||
|
totalApplied++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return totalApplied
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
|
func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
|
||||||
agg := make(map[dailyAggKey]*dailyAggVal, 1024)
|
agg := make(map[dailyAggKey]*dailyAggVal, 1024)
|
||||||
mu := sync.Mutex{}
|
mu := sync.Mutex{}
|
||||||
|
|||||||
Reference in New Issue
Block a user