diff --git a/internal/tasks/dailyAggregate.go b/internal/tasks/dailyAggregate.go index ffec5fb..f8bb4e9 100644 --- a/internal/tasks/dailyAggregate.go +++ b/internal/tasks/dailyAggregate.go @@ -243,7 +243,7 @@ func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd if aggMap == nil { var errScan error - aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots, totalSamples) + aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots) if errScan != nil { return errScan } @@ -341,7 +341,7 @@ type dailyAggVal struct { deletion int64 } -func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord, totalSamples int) (map[dailyAggKey]*dailyAggVal, error) { +func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) { agg := make(map[dailyAggKey]*dailyAggVal, 1024) mu := sync.Mutex{} workers := runtime.NumCPU() diff --git a/internal/tasks/inventorySnapshots.go b/internal/tasks/inventorySnapshots.go index 0e74ee1..a969cac 100644 --- a/internal/tasks/inventorySnapshots.go +++ b/internal/tasks/inventorySnapshots.go @@ -470,6 +470,17 @@ var summaryUnionColumns = []string{ `"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`, } +// monthlyUnionColumns are the fields needed from daily summaries for monthly aggregation/refinement. +var monthlyUnionColumns = []string{ + `"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`, + `"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`, + `"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`, + `"SrmPlaceholder"`, `"VmUuid"`, + `"SamplesPresent"`, `"AvgVcpuCount"`, `"AvgRamGB"`, `"AvgProvisionedDisk"`, `"AvgIsPresent"`, + `"PoolTinPct"`, `"PoolBronzePct"`, `"PoolSilverPct"`, `"PoolGoldPct"`, + `"Tin"`, `"Bronze"`, `"Silver"`, `"Gold"`, +} + func ensureSnapshotRowID(ctx context.Context, dbConn *sqlx.DB, tableName string) error { driver := strings.ToLower(dbConn.DriverName()) switch driver { diff --git a/internal/tasks/monitorVcenter.go b/internal/tasks/monitorVcenter.go index bb97006..15b021c 100644 --- a/internal/tasks/monitorVcenter.go +++ b/internal/tasks/monitorVcenter.go @@ -130,8 +130,6 @@ func (c *CronTask) UpdateVmInventory(vmObj *mo.VirtualMachine, vc *vcenter.Vcent existingUpdateFound bool ) - // TODO - how to prevent creating a new record every polling cycle? - params := queries.CreateUpdateParams{ InventoryId: sql.NullInt64{Int64: dbVm.Iid, Valid: dbVm.Iid > 0}, } @@ -181,12 +179,8 @@ func (c *CronTask) UpdateVmInventory(vmObj *mo.VirtualMachine, vc *vcenter.Vcent } } - // TODO - should we bother to check if disk space has changed? - if updateType != "unknown" { - - // TODO query updates table to see if there is already an update of this type and the new value - + // Check if we already have an existing update record for this same change checkParams := queries.GetVmUpdatesParams{ InventoryId: sql.NullInt64{Int64: dbVm.Iid, Valid: dbVm.Iid > 0}, UpdateType: updateType, @@ -241,7 +235,6 @@ func (c *CronTask) UpdateVmInventory(vmObj *mo.VirtualMachine, vc *vcenter.Vcent // add sleep to slow down mass VM additions utils.SleepWithContext(ctx, (10 * time.Millisecond)) } - } return nil diff --git a/internal/tasks/monthlyAggregate.go b/internal/tasks/monthlyAggregate.go index b02df3e..495d6a6 100644 --- a/internal/tasks/monthlyAggregate.go +++ b/internal/tasks/monthlyAggregate.go @@ -77,7 +77,7 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time // Optional Go-based aggregation path. if os.Getenv("MONTHLY_AGG_GO") == "1" { c.Logger.Debug("Using go implementation of monthly aggregation") - if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, dailySnapshots, force); err != nil { + if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, dailySnapshots); err != nil { c.Logger.Warn("go-based monthly aggregation failed, falling back to SQL path", "error", err) } else { metrics.RecordMonthlyAggregation(time.Since(jobStart), nil) @@ -90,7 +90,7 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time for _, snapshot := range dailySnapshots { dailyTables = append(dailyTables, snapshot.TableName) } - unionQuery, err := buildUnionQuery(dailyTables, summaryUnionColumns, templateExclusionFilter()) + unionQuery, err := buildUnionQuery(dailyTables, monthlyUnionColumns, templateExclusionFilter()) if err != nil { return err } @@ -151,7 +151,7 @@ func monthlySummaryTableName(t time.Time) (string, error) { // aggregateMonthlySummaryGo mirrors the SQL-based monthly aggregation but performs the work in Go, // reading daily summaries in parallel and reducing them to a single monthly summary table. -func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, dailySnapshots []report.SnapshotRecord, force bool) error { +func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, dailySnapshots []report.SnapshotRecord) error { jobStart := time.Now() dbConn := c.Database.DB() @@ -164,7 +164,7 @@ func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, mo for _, snapshot := range dailySnapshots { dailyTables = append(dailyTables, snapshot.TableName) } - unionQuery, err := buildUnionQuery(dailyTables, summaryUnionColumns, templateExclusionFilter()) + unionQuery, err := buildUnionQuery(dailyTables, monthlyUnionColumns, templateExclusionFilter()) if err != nil { return err } @@ -189,9 +189,13 @@ func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, mo return err } - // Refine creation/deletion using SQL helper. - if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { - c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable) + // Refine creation/deletion using SQL helper (requires SnapshotTime in union). + if strings.Contains(unionQuery, `"SnapshotTime"`) { + if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil { + c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable) + } + } else { + c.Logger.Debug("Skipping lifecycle refinement for monthly aggregation (no SnapshotTime in union)") } // Backfill missing creation times to the start of the month for rows lacking creation info. @@ -363,17 +367,18 @@ FROM %s result := make(map[monthlyAggKey]*monthlyAggVal, 256) for rows.Next() { var ( - inventoryId sql.NullInt64 - name, vcenter, vmId, vmUuid, eventKey, cloudId string - resourcePool, datacenter, cluster, folder string - isTemplate, poweredOn, srmPlaceholder string - disk, avgVcpu, avgRam, avgDisk sql.NullFloat64 - avgIsPresent sql.NullFloat64 - poolTin, poolBronze, poolSilver, poolGold sql.NullFloat64 - tinPct, bronzePct, silverPct, goldPct sql.NullFloat64 - vcpu, ram sql.NullInt64 - creation, deletion sql.NullInt64 - samplesPresent sql.NullInt64 + inventoryId sql.NullInt64 + name, vcenter, vmId, vmUuid string + eventKey, cloudId sql.NullString + resourcePool, datacenter, cluster, folder sql.NullString + isTemplate, poweredOn, srmPlaceholder sql.NullString + disk, avgVcpu, avgRam, avgDisk sql.NullFloat64 + avgIsPresent sql.NullFloat64 + poolTin, poolBronze, poolSilver, poolGold sql.NullFloat64 + tinPct, bronzePct, silverPct, goldPct sql.NullFloat64 + vcpu, ram sql.NullInt64 + creation, deletion sql.NullInt64 + samplesPresent sql.NullInt64 ) if err := rows.Scan( @@ -389,7 +394,8 @@ FROM %s continue } - if strings.EqualFold(strings.TrimSpace(isTemplate), "true") || strings.EqualFold(strings.TrimSpace(isTemplate), "1") { + templateVal := strings.TrimSpace(isTemplate.String) + if strings.EqualFold(templateVal, "true") || templateVal == "1" { continue } @@ -397,15 +403,15 @@ FROM %s agg := &monthlyAggVal{ key: key, inventoryId: inventoryId.Int64, - eventKey: eventKey, - cloudId: cloudId, - resourcePool: resourcePool, - datacenter: datacenter, - cluster: cluster, - folder: folder, - isTemplate: isTemplate, - poweredOn: poweredOn, - srmPlaceholder: srmPlaceholder, + eventKey: eventKey.String, + cloudId: cloudId.String, + resourcePool: resourcePool.String, + datacenter: datacenter.String, + cluster: cluster.String, + folder: folder.String, + isTemplate: isTemplate.String, + poweredOn: poweredOn.String, + srmPlaceholder: srmPlaceholder.String, provisioned: disk.Float64, vcpuCount: vcpu.Int64, ramGB: ram.Int64, @@ -492,7 +498,8 @@ WHERE "Date" >= ? AND "Date" < ? ); err != nil { continue } - if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" { + templateVal := strings.TrimSpace(isTemplate.String) + if strings.EqualFold(templateVal, "true") || templateVal == "1" { continue } key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name}