Compare commits
1 commit
main...7b7afbf1d5

| Author | SHA1 | Date |
|---|---|---|
| | 7b7afbf1d5 | |
@@ -2,8 +2,13 @@ package tasks

 import (
     "context"
+    "database/sql"
     "fmt"
     "log/slog"
+    "os"
+    "runtime"
+    "strings"
+    "sync"
     "time"
     "vctp/db"
     "vctp/internal/metrics"
@@ -56,6 +61,17 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
         }
     }

+    // If enabled, use the Go fan-out/reduce path to parallelize aggregation.
+    if os.Getenv("DAILY_AGG_GO") == "1" {
+        if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil {
+            c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err)
+        } else {
+            metrics.RecordDailyAggregation(time.Since(jobStart), nil)
+            c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable)
+            return nil
+        }
+    }
+
     hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
     if err != nil {
         return err
@@ -164,3 +180,379 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
 func dailySummaryTableName(t time.Time) (string, error) {
     return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
 }
+
+// aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel,
+// reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path
+// as closely as possible while improving CPU utilization on multi-core hosts.
+func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error {
+    jobStart := time.Now()
+    dbConn := c.Database.DB()
+
+    hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
+    if err != nil {
+        return err
+    }
+    hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
+    hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
+    if len(hourlySnapshots) == 0 {
+        return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
+    }
+
+    hourlyTables := make([]string, 0, len(hourlySnapshots))
+    for _, snapshot := range hourlySnapshots {
+        hourlyTables = append(hourlyTables, snapshot.TableName)
+        if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil {
+            c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err)
+        }
+    }
+    unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
+    if err != nil {
+        return err
+    }
+
+    // Clear existing summary if forcing.
+    if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
+        return err
+    } else if rowsExist && !force {
+        c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable)
+        return nil
+    } else if rowsExist && force {
+        if err := clearTable(ctx, dbConn, summaryTable); err != nil {
+            return err
+        }
+    }
+
+    totalSamples := len(hourlyTables)
+    aggMap, err := c.scanHourlyTablesParallel(ctx, hourlySnapshots, totalSamples)
+    if err != nil {
+        return err
+    }
+    if len(aggMap) == 0 {
+        return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
+    }
+
+    // Insert aggregated rows.
+    if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil {
+        return err
+    }
+
+    // Refine lifecycle with existing SQL helper to pick up first-after deletions.
+    if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
+        c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
+    }
+
+    db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
+    rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
+    if err != nil {
+        c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
+    }
+    if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
+        c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
+    }
+    if err := c.generateReport(ctx, summaryTable); err != nil {
+        c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
+        return err
+    }
+
+    c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
+    return nil
+}
+
+type dailyAggKey struct {
+    Vcenter string
+    VmId    string
+    VmUuid  string
+    Name    string
+}
+
+type dailyAggVal struct {
+    key            dailyAggKey
+    resourcePool   string
+    datacenter     string
+    cluster        string
+    folder         string
+    isTemplate     string
+    poweredOn      string
+    srmPlaceholder string
+    creation       int64
+    firstSeen      int64
+    lastSeen       int64
+    sumVcpu        int64
+    sumRam         int64
+    sumDisk        float64
+    samples        int64
+    tinHits        int64
+    bronzeHits     int64
+    silverHits     int64
+    goldHits       int64
+}
+
+func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord, totalSamples int) (map[dailyAggKey]*dailyAggVal, error) {
+    agg := make(map[dailyAggKey]*dailyAggVal, 1024)
+    mu := sync.Mutex{}
+    workers := runtime.NumCPU()
+    if workers < 2 {
+        workers = 2
+    }
+    if workers > len(snapshots) {
+        workers = len(snapshots)
+    }
+
+    jobs := make(chan report.SnapshotRecord, len(snapshots))
+    wg := sync.WaitGroup{}
+    for i := 0; i < workers; i++ {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            for snap := range jobs {
+                rows, err := c.scanHourlyTable(ctx, snap)
+                if err != nil {
+                    c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err)
+                    continue
+                }
+                mu.Lock()
+                for k, v := range rows {
+                    if existing, ok := agg[k]; ok {
+                        mergeDailyAgg(existing, v)
+                    } else {
+                        agg[k] = v
+                    }
+                }
+                mu.Unlock()
+            }
+        }()
+    }
+    for _, snap := range snapshots {
+        jobs <- snap
+    }
+    close(jobs)
+    wg.Wait()
+    return agg, nil
+}
+
+func mergeDailyAgg(dst, src *dailyAggVal) {
+    if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
+        dst.creation = src.creation
+    }
+    if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) {
+        dst.firstSeen = src.firstSeen
+    }
+    if src.lastSeen > dst.lastSeen {
+        dst.lastSeen = src.lastSeen
+        dst.resourcePool = src.resourcePool
+        dst.datacenter = src.datacenter
+        dst.cluster = src.cluster
+        dst.folder = src.folder
+        dst.isTemplate = src.isTemplate
+        dst.poweredOn = src.poweredOn
+        dst.srmPlaceholder = src.srmPlaceholder
+    }
+    dst.sumVcpu += src.sumVcpu
+    dst.sumRam += src.sumRam
+    dst.sumDisk += src.sumDisk
+    dst.samples += src.samples
+    dst.tinHits += src.tinHits
+    dst.bronzeHits += src.bronzeHits
+    dst.silverHits += src.silverHits
+    dst.goldHits += src.goldHits
+}
+
+func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
+    dbConn := c.Database.DB()
+    query := fmt.Sprintf(`
+        SELECT
+            "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
+            COALESCE("ProvisionedDisk",0) AS disk,
+            COALESCE("VcpuCount",0) AS vcpu,
+            COALESCE("RamGB",0) AS ram,
+            COALESCE("CreationTime",0) AS creation,
+            COALESCE("DeletionTime",0) AS deletion,
+            COALESCE("IsTemplate",'FALSE') AS is_template,
+            COALESCE("PoweredOn",'FALSE') AS powered_on,
+            COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder,
+            COALESCE("SnapshotTime",0) AS snapshot_time
+        FROM %s
+    `, snap.TableName)
+    rows, err := dbConn.QueryxContext(ctx, query)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+
+    out := make(map[dailyAggKey]*dailyAggVal, 256)
+    for rows.Next() {
+        var (
+            name, vcenter, vmId, vmUuid, resourcePool string
+            dc, cluster, folder                       sql.NullString
+            disk                                      sql.NullFloat64
+            vcpu, ram                                 sql.NullInt64
+            creation, deletion, snapshotTime          sql.NullInt64
+            isTemplate, poweredOn, srmPlaceholder     sql.NullString
+        )
+        if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
+            continue
+        }
+        if vcenter == "" {
+            continue
+        }
+        // Skip templates.
+        if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
+            continue
+        }
+        key := dailyAggKey{
+            Vcenter: vcenter,
+            VmId:    strings.TrimSpace(vmId),
+            VmUuid:  strings.TrimSpace(vmUuid),
+            Name:    strings.TrimSpace(name),
+        }
+        if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
+            continue
+        }
+        if key.VmId == "" {
+            key.VmId = key.VmUuid
+        }
+        pool := strings.ToLower(strings.TrimSpace(resourcePool))
+        hitTin := btoi(pool == "tin")
+        hitBronze := btoi(pool == "bronze")
+        hitSilver := btoi(pool == "silver")
+        hitGold := btoi(pool == "gold")
+        row := &dailyAggVal{
+            key:            key,
+            resourcePool:   resourcePool,
+            datacenter:     dc.String,
+            cluster:        cluster.String,
+            folder:         folder.String,
+            isTemplate:     isTemplate.String,
+            poweredOn:      poweredOn.String,
+            srmPlaceholder: srmPlaceholder.String,
+            creation:       int64OrZero(creation),
+            firstSeen:      int64OrZero(snapshotTime),
+            lastSeen:       int64OrZero(snapshotTime),
+            sumVcpu:        vcpu.Int64,
+            sumRam:         ram.Int64,
+            sumDisk:        disk.Float64,
+            samples:        1,
+            tinHits:        hitTin,
+            bronzeHits:     hitBronze,
+            silverHits:     hitSilver,
+            goldHits:       hitGold,
+        }
+        out[key] = row
+    }
+    return out, nil
+}
+
+func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
+    dbConn := c.Database.DB()
+    tx, err := dbConn.Beginx()
+    if err != nil {
+        return err
+    }
+    defer tx.Rollback()
+
+    driver := strings.ToLower(dbConn.DriverName())
+    placeholders := makePlaceholders(driver, 29)
+    insert := fmt.Sprintf(`
+        INSERT INTO %s (
+            "Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
+            "ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder",
+            "CreationTime","DeletionTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk",
+            "AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold"
+        ) VALUES (%s)
+    `, table, placeholders)
+
+    for _, v := range agg {
+        if v.samples == 0 {
+            continue
+        }
+        avgVcpu := float64(v.sumVcpu) / float64(v.samples)
+        avgRam := float64(v.sumRam) / float64(v.samples)
+        avgDisk := v.sumDisk / float64(v.samples)
+        total := float64(totalSamples)
+        avgPresent := 0.0
+        tinPct := 0.0
+        bronzePct := 0.0
+        silverPct := 0.0
+        goldPct := 0.0
+        if total > 0 {
+            avgPresent = float64(v.samples) / total
+            tinPct = float64(v.tinHits) * 100 / total
+            bronzePct = float64(v.bronzeHits) * 100 / total
+            silverPct = float64(v.silverHits) * 100 / total
+            goldPct = float64(v.goldHits) * 100 / total
+        }
+        args := []interface{}{
+            v.key.Name,
+            v.key.Vcenter,
+            nullIfEmpty(v.key.VmId),
+            nullIfEmpty(v.key.VmUuid),
+            v.resourcePool,
+            nullIfEmpty(v.datacenter),
+            nullIfEmpty(v.cluster),
+            nullIfEmpty(v.folder),
+            v.sumDisk,
+            v.sumVcpu,
+            v.sumRam,
+            v.isTemplate,
+            v.poweredOn,
+            v.srmPlaceholder,
+            v.creation,
+            int64(0), // deletion time refined later
+            v.samples,
+            avgVcpu,
+            avgRam,
+            avgDisk,
+            avgPresent,
+            tinPct, bronzePct, silverPct, goldPct,
+            float64(v.tinHits), float64(v.bronzeHits), float64(v.silverHits), float64(v.goldHits),
+        }
+        if driver != "sqlite" {
+            // Postgres expects primitive types, nulls are handled by pq via nil.
+            for i, a := range args {
+                if s, ok := a.(string); ok && s == "" {
+                    args[i] = nil
+                }
+            }
+        }
+        if _, err := tx.ExecContext(ctx, insert, args...); err != nil {
+            return err
+        }
+    }
+    return tx.Commit()
+}
+
+func int64OrZero(v sql.NullInt64) int64 {
+    if v.Valid {
+        return v.Int64
+    }
+    return 0
+}
+
+func nullIfEmpty(s string) interface{} {
+    if strings.TrimSpace(s) == "" {
+        return nil
+    }
+    return s
+}
+
+func makePlaceholders(driver string, n int) string {
+    if driver == "sqlite" {
+        parts := make([]string, n)
+        for i := 0; i < n; i++ {
+            parts[i] = "?"
+        }
+        return strings.Join(parts, ",")
+    }
+    parts := make([]string, n)
+    for i := 0; i < n; i++ {
+        parts[i] = fmt.Sprintf("$%d", i+1)
+    }
+    return strings.Join(parts, ",")
+}
+
+func btoi(b bool) int64 {
+    if b {
+        return 1
+    }
+    return 0
+}
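Review note: the merge rules in mergeDailyAgg (earliest creation and firstSeen win, the latest lastSeen carries its dimension fields, counters are summed) lend themselves to a table-driven test. A minimal in-package sketch, not part of this change, using only the types and function introduced in the diff; the field values are illustrative:

```go
package tasks

import "testing"

// Sketch test for mergeDailyAgg: verifies lifecycle fields, dimension
// carry-over from the most recent sample, and summed accumulators.
func TestMergeDailyAggKeepsEarliestAndLatest(t *testing.T) {
	dst := &dailyAggVal{
		creation:  200,
		firstSeen: 100,
		lastSeen:  100,
		cluster:   "old-cluster",
		sumVcpu:   2,
		sumRam:    4,
		sumDisk:   10,
		samples:   1,
	}
	src := &dailyAggVal{
		creation:  150, // earlier creation should win
		firstSeen: 50,  // earlier firstSeen should win
		lastSeen:  300, // later lastSeen wins and brings its dimensions along
		cluster:   "new-cluster",
		sumVcpu:   4,
		sumRam:    8,
		sumDisk:   20,
		samples:   1,
		goldHits:  1,
	}

	mergeDailyAgg(dst, src)

	if dst.creation != 150 || dst.firstSeen != 50 || dst.lastSeen != 300 {
		t.Fatalf("unexpected lifecycle fields: %+v", dst)
	}
	if dst.cluster != "new-cluster" {
		t.Fatalf("expected dimensions from the most recent sample, got %q", dst.cluster)
	}
	if dst.sumVcpu != 6 || dst.sumRam != 12 || dst.sumDisk != 30 || dst.samples != 2 || dst.goldHits != 1 {
		t.Fatalf("unexpected accumulators: %+v", dst)
	}
}
```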
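The driver-specific placeholder builder can be checked the same way; a small sketch covering the two branches the new code distinguishes ("sqlite" versus the PostgreSQL-style default), again not part of the change itself:

```go
package tasks

import "testing"

// Sketch test for makePlaceholders: "?" placeholders for sqlite,
// numbered "$n" placeholders for everything else.
func TestMakePlaceholders(t *testing.T) {
	if got := makePlaceholders("sqlite", 3); got != "?,?,?" {
		t.Fatalf("sqlite placeholders = %q", got)
	}
	if got := makePlaceholders("postgres", 3); got != "$1,$2,$3" {
		t.Fatalf("postgres placeholders = %q", got)
	}
}
```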
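For readers new to the fan-out/reduce shape used by scanHourlyTablesParallel: a bounded set of workers drains a buffered channel, each worker produces a partial map, and partials are merged into one result under a mutex. A self-contained sketch of the same pattern with placeholder inputs (nothing below comes from this repository):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// Fan-out/reduce sketch: workers drain a buffered channel of inputs,
// build partial maps, and merge them into a shared result under a mutex.
func main() {
	inputs := []string{"table_a", "table_b", "table_c", "table_d"}

	workers := runtime.NumCPU()
	if workers > len(inputs) {
		workers = len(inputs)
	}

	jobs := make(chan string, len(inputs))
	result := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for name := range jobs {
				partial := map[string]int{name: len(name)} // stand-in for a per-table scan
				mu.Lock()
				for k, v := range partial {
					result[k] += v
				}
				mu.Unlock()
			}
		}()
	}
	for _, name := range inputs {
		jobs <- name
	}
	close(jobs)
	wg.Wait()

	fmt.Println(result)
}
```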