package tasks

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"vctp/db"
	"vctp/internal/metrics"
	"vctp/internal/report"
)

// RunVcenterMonthlyAggregate summarizes the previous month's snapshots into a
// monthly summary table.
func (c *CronTask) RunVcenterMonthlyAggregate(ctx context.Context, logger *slog.Logger) error {
	jobTimeout := durationFromSeconds(c.Settings.Values.Settings.MonthlyJobTimeoutSeconds, 20*time.Minute)
	return c.runAggregateJob(ctx, "monthly_aggregate", jobTimeout, func(jobCtx context.Context) error {
		startedAt := time.Now()
		defer func() {
			logger.Info("Monthly summary job finished", "duration", time.Since(startedAt))
		}()
		now := time.Now()
		firstOfThisMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
		targetMonth := firstOfThisMonth.AddDate(0, -1, 0)
		return c.aggregateMonthlySummary(jobCtx, targetMonth, false)
	})
}
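
// AggregateMonthlySummary aggregates the given month's snapshots into its monthly
// summary table on demand; when force is true an existing summary is rebuilt.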
func (c *CronTask) AggregateMonthlySummary(ctx context.Context, month time.Time, force bool) error {
	return c.aggregateMonthlySummary(ctx, month, force)
}
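
// aggregateMonthlySummary builds the monthly summary table for targetMonth from that
// month's hourly or daily snapshots (per the configured granularity), using either a
// single SQL union insert or an optional Go aggregation path, then registers the
// snapshot and generates the monthly report.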
func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time.Time, force bool) error {
	jobStart := time.Now()
	if err := report.EnsureSnapshotRegistry(ctx, c.Database); err != nil {
		return err
	}

	granularity := strings.ToLower(strings.TrimSpace(c.Settings.Values.Settings.MonthlyAggregationGranularity))
	if granularity == "" {
		granularity = "hourly"
	}
	if granularity != "hourly" && granularity != "daily" {
		c.Logger.Warn("unknown monthly aggregation granularity; defaulting to hourly", "granularity", granularity)
		granularity = "hourly"
	}

	monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, targetMonth.Location())
	monthEnd := monthStart.AddDate(0, 1, 0)
	dbConn := c.Database.DB()
	db.SetPostgresWorkMem(ctx, dbConn, c.Settings.Values.Settings.PostgresWorkMemMB)
	driver := strings.ToLower(dbConn.DriverName())
	useGoAgg := os.Getenv("MONTHLY_AGG_GO") == "1"
	if !useGoAgg && granularity == "hourly" && driver == "sqlite" {
		c.Logger.Warn("SQL monthly aggregation is slow on sqlite; overriding to Go path", "granularity", granularity)
		useGoAgg = true
	}
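
	// Collect the month's snapshot tables (registry first, table-name fallback),
	// clamp them to the target window, and drop tables that hold no rows.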
	var snapshots []report.SnapshotRecord
	var unionColumns []string
	if granularity == "daily" {
		dailySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "daily", "inventory_daily_summary_", "20060102", monthStart, monthEnd)
		if err != nil {
			return err
		}
		dailySnapshots = filterRecordsInRange(dailySnapshots, monthStart, monthEnd)
		dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots)
		snapshots = dailySnapshots
		unionColumns = monthlyUnionColumns
	} else {
		hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", monthStart, monthEnd)
		if err != nil {
			return err
		}
		hourlySnapshots = filterRecordsInRange(hourlySnapshots, monthStart, monthEnd)
		hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
		snapshots = hourlySnapshots
		unionColumns = summaryUnionColumns
	}
	if len(snapshots) == 0 {
		return fmt.Errorf("no %s snapshot tables found for %s", granularity, targetMonth.Format("2006-01"))
	}

	monthlyTable, err := monthlySummaryTableName(targetMonth)
	if err != nil {
		return err
	}

	if err := db.EnsureSummaryTable(ctx, dbConn, monthlyTable); err != nil {
		return err
	}
	if rowsExist, err := db.TableHasRows(ctx, dbConn, monthlyTable); err != nil {
		return err
	} else if rowsExist && !force {
		c.Logger.Debug("Monthly summary already exists, skipping aggregation", "summary_table", monthlyTable)
		return nil
	} else if rowsExist && force {
		if err := clearTable(ctx, dbConn, monthlyTable); err != nil {
			return err
		}
	}

	// Optional Go-based aggregation path; on failure it falls through to the SQL path.
	if useGoAgg {
		if granularity == "daily" {
			c.Logger.Debug("Using Go implementation of monthly aggregation (daily)")
			if err := c.aggregateMonthlySummaryGo(ctx, monthStart, monthEnd, monthlyTable, snapshots); err != nil {
				c.Logger.Warn("Go-based monthly aggregation failed, falling back to SQL path", "error", err)
			} else {
				metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
				c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
				return nil
			}
		} else if granularity == "hourly" {
			c.Logger.Debug("Using Go implementation of monthly aggregation (hourly)")
			if err := c.aggregateMonthlySummaryGoHourly(ctx, monthStart, monthEnd, monthlyTable, snapshots); err != nil {
				c.Logger.Warn("Go-based monthly aggregation failed, falling back to SQL path", "error", err)
			} else {
				metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
				c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", monthlyTable)
				return nil
			}
		} else {
			c.Logger.Warn("MONTHLY_AGG_GO is set but granularity is unsupported; using SQL path", "granularity", granularity)
		}
	}
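
	// SQL path: UNION ALL the remaining snapshot tables and aggregate them into the
	// monthly summary in a single insert statement.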
	tables := make([]string, 0, len(snapshots))
	for _, snapshot := range snapshots {
		tables = append(tables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(tables, unionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	monthlyTotals, err := db.SnapshotTotalsForUnion(ctx, dbConn, unionQuery)
	if err != nil {
		c.Logger.Warn("unable to calculate monthly totals", "error", err, "month", targetMonth.Format("2006-01"))
	} else {
		c.Logger.Info("Monthly snapshot totals",
			"month", targetMonth.Format("2006-01"),
			"vm_count", monthlyTotals.VmCount,
			"vcpu_total", monthlyTotals.VcpuTotal,
			"ram_total_gb", monthlyTotals.RamTotal,
			"disk_total_gb", monthlyTotals.DiskTotal,
		)
	}

	// Daily inputs are pre-averaged summaries, so they go through the monthly insert
	// builder; raw hourly snapshots go through the daily builder.
	var insertQuery string
	if granularity == "daily" {
		insertQuery, err = db.BuildMonthlySummaryInsert(monthlyTable, unionQuery)
	} else {
		insertQuery, err = db.BuildDailySummaryInsert(monthlyTable, unionQuery)
	}
	if err != nil {
		return err
	}

	if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
		c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01"))
		return err
	}
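
	// Overlay deletion times from the lifecycle cache and recompute per-VM presence
	// over the month window before registering the snapshot.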
	if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, monthlyTable, monthStart.Unix(), monthEnd.Unix()); err != nil {
		c.Logger.Warn("failed to apply lifecycle deletions to monthly summary", "error", err, "table", monthlyTable)
	} else {
		c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied)
	}
	if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, monthlyTable, monthStart.Unix(), monthEnd.Unix()); err != nil {
		c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window", "error", err, "table", monthlyTable)
	}
	rowCount, err := db.TableRowCount(ctx, dbConn, monthlyTable)
	if err != nil {
		c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", monthlyTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "monthly", monthlyTable, targetMonth, rowCount); err != nil {
		c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", monthlyTable)
	}

	db.AnalyzeTableIfPostgres(ctx, dbConn, monthlyTable)

	if err := c.generateReport(ctx, monthlyTable); err != nil {
		c.Logger.Warn("failed to generate monthly report", "error", err, "table", monthlyTable)
		metrics.RecordMonthlyAggregation(time.Since(jobStart), err)
		return err
	}

	c.Logger.Debug("Finished monthly inventory aggregation", "summary_table", monthlyTable)
	metrics.RecordMonthlyAggregation(time.Since(jobStart), nil)
	return nil
}
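
// monthlySummaryTableName returns the validated inventory_monthly_summary_YYYYMM
// table name for the given month.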
func monthlySummaryTableName(t time.Time) (string, error) {
	return db.SafeTableName(fmt.Sprintf("inventory_monthly_summary_%s", t.Format("200601")))
}

// aggregateMonthlySummaryGoHourly aggregates hourly snapshots directly into the monthly summary table.
func (c *CronTask) aggregateMonthlySummaryGoHourly(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, hourlySnapshots []report.SnapshotRecord) error {
	jobStart := time.Now()
	dbConn := c.Database.DB()

	if err := clearTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}

	if len(hourlySnapshots) == 0 {
		return fmt.Errorf("no hourly snapshot tables found for %s", monthStart.Format("2006-01"))
	}

	totalSamples := len(hourlySnapshots)
	var (
		aggMap    map[dailyAggKey]*dailyAggVal
		snapTimes []int64
	)

	// Prefer the pre-aggregated hourly cache when it exists and has data; otherwise
	// fall back to scanning the raw hourly tables in parallel.
	if db.TableExists(ctx, dbConn, "vm_hourly_stats") {
		cacheAgg, cacheTimes, cacheErr := c.scanHourlyCache(ctx, monthStart, monthEnd)
		if cacheErr != nil {
			c.Logger.Warn("failed to use hourly cache, falling back to table scans", "error", cacheErr)
		} else if len(cacheAgg) > 0 {
			c.Logger.Debug("using hourly cache for monthly aggregation", "month", monthStart.Format("2006-01"), "snapshots", len(cacheTimes), "vm_count", len(cacheAgg))
			aggMap = cacheAgg
			snapTimes = cacheTimes
			totalSamples = len(cacheTimes)
		}
	}

	if aggMap == nil {
		var errScan error
		aggMap, errScan = c.scanHourlyTablesParallel(ctx, hourlySnapshots)
		if errScan != nil {
			return errScan
		}
		c.Logger.Debug("scanned hourly tables for monthly aggregation", "month", monthStart.Format("2006-01"), "tables", len(hourlySnapshots), "vm_count", len(aggMap))
		if len(aggMap) == 0 {
			return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01"))
		}

		snapTimes = make([]int64, 0, len(hourlySnapshots))
		for _, snap := range hourlySnapshots {
			snapTimes = append(snapTimes, snap.SnapshotTime.Unix())
		}
		sort.Slice(snapTimes, func(i, j int) bool { return snapTimes[i] < snapTimes[j] })
	}

	lifecycleDeletions := c.applyLifecycleDeletions(ctx, aggMap, monthStart, monthEnd)
	c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", lifecycleDeletions)

	inventoryDeletions := c.applyInventoryDeletions(ctx, aggMap, monthStart, monthEnd)
	c.Logger.Info("Monthly aggregation deletion times", "source_inventory", inventoryDeletions)
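
	// Infer deletions the caches missed: a VM absent from two consecutive snapshots
	// after its last sighting is treated as deleted at the first missed snapshot.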
	if len(snapTimes) > 0 {
		maxSnap := snapTimes[len(snapTimes)-1]
		inferredDeletions := 0
		for _, v := range aggMap {
			if v.deletion != 0 {
				continue
			}
			consecutiveMisses := 0
			firstMiss := int64(0)
			for _, t := range snapTimes {
				if t <= v.lastSeen {
					continue
				}
				if _, ok := v.seen[t]; ok {
					consecutiveMisses = 0
					firstMiss = 0
					continue
				}
				consecutiveMisses++
				if firstMiss == 0 {
					firstMiss = t
				}
				if consecutiveMisses >= 2 {
					v.deletion = firstMiss
					inferredDeletions++
					break
				}
			}
			if v.deletion == 0 && v.lastSeen < maxSnap && firstMiss > 0 {
				c.Logger.Debug("pending deletion inference (insufficient consecutive misses)", "vm_id", v.key.VmId, "vm_uuid", v.key.VmUuid, "name", v.key.Name, "last_seen", v.lastSeen, "first_missing_snapshot", firstMiss)
			}
		}
		c.Logger.Info("Monthly aggregation deletion times", "source_inferred", inferredDeletions)
	}

	totalSamplesByVcenter := sampleCountsByVcenter(aggMap)
	if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples, totalSamplesByVcenter); err != nil {
		return err
	}
	if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil {
		c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window (Go hourly)", "error", err, "table", summaryTable)
	}

	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count monthly summary rows (Go hourly)", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil {
		c.Logger.Warn("failed to register monthly snapshot (Go hourly)", "error", err, "table", summaryTable)
	}
	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate monthly report (Go hourly)", "error", err, "table", summaryTable)
		return err
	}

	c.Logger.Debug("Finished monthly inventory aggregation (Go hourly)",
		"summary_table", summaryTable,
		"duration", time.Since(jobStart),
		"tables_scanned", len(hourlySnapshots),
		"rows_written", rowCount,
		"total_samples", totalSamples,
	)
	return nil
}

// aggregateMonthlySummaryGo mirrors the SQL-based monthly aggregation but performs the work in Go,
// reading daily summaries in parallel and reducing them to a single monthly summary table.
func (c *CronTask) aggregateMonthlySummaryGo(ctx context.Context, monthStart, monthEnd time.Time, summaryTable string, dailySnapshots []report.SnapshotRecord) error {
	jobStart := time.Now()
	dbConn := c.Database.DB()

	if err := clearTable(ctx, dbConn, summaryTable); err != nil {
		return err
	}

	// Build the union query up front; it backs creation/deletion refinement after the inserts.
	dailyTables := make([]string, 0, len(dailySnapshots))
	for _, snapshot := range dailySnapshots {
		dailyTables = append(dailyTables, snapshot.TableName)
	}
	unionQuery, err := buildUnionQuery(dailyTables, monthlyUnionColumns, templateExclusionFilter())
	if err != nil {
		return err
	}

	aggMap, err := c.scanDailyTablesParallel(ctx, dailySnapshots)
	if err != nil {
		return err
	}
	if len(aggMap) == 0 {
		cacheAgg, cacheErr := c.scanDailyRollup(ctx, monthStart, monthEnd)
		if cacheErr == nil && len(cacheAgg) > 0 {
			aggMap = cacheAgg
		} else if cacheErr != nil {
			c.Logger.Warn("failed to read daily rollup cache; using table scan", "error", cacheErr)
		}
	}
	if len(aggMap) == 0 {
		return fmt.Errorf("no VM records aggregated for %s", monthStart.Format("2006-01"))
	}

	if err := c.insertMonthlyAggregates(ctx, summaryTable, aggMap); err != nil {
		return err
	}

	if applied, err := db.ApplyLifecycleDeletionToSummary(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil {
		c.Logger.Warn("failed to apply lifecycle deletions to monthly summary (Go)", "error", err, "table", summaryTable)
	} else {
		c.Logger.Info("Monthly aggregation deletion times", "source_lifecycle_cache", applied)
	}

	if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
		c.Logger.Warn("failed to refine creation/deletion times (monthly Go)", "error", err, "table", summaryTable)
	}
	if err := db.UpdateSummaryPresenceByWindow(ctx, dbConn, summaryTable, monthStart.Unix(), monthEnd.Unix()); err != nil {
		c.Logger.Warn("failed to update monthly AvgIsPresent from lifecycle window (Go)", "error", err, "table", summaryTable)
	}

	db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
	rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
	if err != nil {
		c.Logger.Warn("unable to count monthly summary rows", "error", err, "table", summaryTable)
	}
	if err := report.RegisterSnapshot(ctx, c.Database, "monthly", summaryTable, monthStart, rowCount); err != nil {
		c.Logger.Warn("failed to register monthly snapshot", "error", err, "table", summaryTable)
	}
	if err := c.generateReport(ctx, summaryTable); err != nil {
		c.Logger.Warn("failed to generate monthly report (Go)", "error", err, "table", summaryTable)
		return err
	}

	c.Logger.Debug("Finished monthly inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
	return nil
}
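
// scanDailyTablesParallel fans the daily summary tables out to a worker pool sized to
// NumCPU (at least two, capped at the table count) and merges each table's per-VM
// aggregates into a single map under a mutex.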
func (c *CronTask) scanDailyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
	agg := make(map[monthlyAggKey]*monthlyAggVal, 1024)
	mu := sync.Mutex{}
	workers := runtime.NumCPU()
	if workers < 2 {
		workers = 2
	}
	if workers > len(snapshots) {
		workers = len(snapshots)
	}

	jobs := make(chan report.SnapshotRecord, len(snapshots))
	wg := sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for snap := range jobs {
				rows, err := c.scanDailyTable(ctx, snap)
				if err != nil {
					c.Logger.Warn("failed to scan daily summary", "table", snap.TableName, "error", err)
					continue
				}
				mu.Lock()
				for k, v := range rows {
					if existing, ok := agg[k]; ok {
						mergeMonthlyAgg(existing, v)
					} else {
						agg[k] = v
					}
				}
				mu.Unlock()
			}
		}()
	}
	for _, snap := range snapshots {
		jobs <- snap
	}
	close(jobs)
	wg.Wait()
	return agg, nil
}
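
// mergeMonthlyAgg folds src into dst: the earliest known creation and deletion times
// win, the newest snapshot supplies the identity and "last seen" fields, and the
// sample-weighted sums accumulate.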
func mergeMonthlyAgg(dst, src *monthlyAggVal) {
	if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
		dst.creation = src.creation
	}
	// If creation is unknown in all daily summaries, leave it zero for reports (VM trace handles approximation separately).
	if src.deletion > 0 && (dst.deletion == 0 || src.deletion < dst.deletion) {
		dst.deletion = src.deletion
	}
	if src.lastSnapshot.After(dst.lastSnapshot) {
		dst.lastSnapshot = src.lastSnapshot
		if src.inventoryId != 0 {
			dst.inventoryId = src.inventoryId
		}
		dst.resourcePool = src.resourcePool
		dst.datacenter = src.datacenter
		dst.cluster = src.cluster
		dst.folder = src.folder
		dst.isTemplate = src.isTemplate
		dst.poweredOn = src.poweredOn
		dst.srmPlaceholder = src.srmPlaceholder
		dst.provisioned = src.provisioned
		dst.vcpuCount = src.vcpuCount
		dst.ramGB = src.ramGB
		dst.eventKey = src.eventKey
		dst.cloudId = src.cloudId
	}

	dst.samplesPresent += src.samplesPresent
	dst.totalSamples += src.totalSamples
	dst.sumVcpu += src.sumVcpu
	dst.sumRam += src.sumRam
	dst.sumDisk += src.sumDisk
	dst.tinWeighted += src.tinWeighted
	dst.bronzeWeighted += src.bronzeWeighted
	dst.silverWeighted += src.silverWeighted
	dst.goldWeighted += src.goldWeighted
}
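
// scanDailyTable reads one daily summary table and converts its per-VM averages back
// into sample-weighted sums so that days with different sample counts merge
// proportionally.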
func (c *CronTask) scanDailyTable(ctx context.Context, snap report.SnapshotRecord) (map[monthlyAggKey]*monthlyAggVal, error) {
	dbConn := c.Database.DB()
	query := fmt.Sprintf(`
		SELECT
			"InventoryId",
			"Name","Vcenter","VmId","VmUuid","EventKey","CloudId","ResourcePool","Datacenter","Cluster","Folder",
			COALESCE("ProvisionedDisk",0) AS disk,
			COALESCE("VcpuCount",0) AS vcpu,
			COALESCE("RamGB",0) AS ram,
			COALESCE("CreationTime",0) AS creation,
			COALESCE("DeletionTime",0) AS deletion,
			COALESCE("SamplesPresent",0) AS samples_present,
			"AvgVcpuCount","AvgRamGB","AvgProvisionedDisk","AvgIsPresent",
			"PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct",
			"Tin","Bronze","Silver","Gold","IsTemplate","PoweredOn","SrmPlaceholder"
		FROM %s
	`, snap.TableName)

	rows, err := dbConn.QueryxContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := make(map[monthlyAggKey]*monthlyAggVal, 256)
	for rows.Next() {
		var (
			inventoryId                               sql.NullInt64
			name, vcenter, vmId, vmUuid               string
			eventKey, cloudId                         sql.NullString
			resourcePool, datacenter, cluster, folder sql.NullString
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
			disk, avgVcpu, avgRam, avgDisk            sql.NullFloat64
			avgIsPresent                              sql.NullFloat64
			poolTin, poolBronze, poolSilver, poolGold sql.NullFloat64
			tinPct, bronzePct, silverPct, goldPct     sql.NullFloat64
			vcpu, ram                                 sql.NullInt64
			creation, deletion                        sql.NullInt64
			samplesPresent                            sql.NullInt64
		)

		if err := rows.Scan(
			&inventoryId,
			&name, &vcenter, &vmId, &vmUuid, &eventKey, &cloudId, &resourcePool, &datacenter, &cluster, &folder,
			&disk, &vcpu, &ram, &creation, &deletion, &samplesPresent,
			&avgVcpu, &avgRam, &avgDisk, &avgIsPresent,
			&poolTin, &poolBronze, &poolSilver, &poolGold,
			&tinPct, &bronzePct, &silverPct, &goldPct,
			&isTemplate, &poweredOn, &srmPlaceholder,
		); err != nil {
			c.Logger.Warn("failed to scan daily summary row", "table", snap.TableName, "error", err)
			continue
		}

		// Skip template rows, matching the SQL path's template exclusion filter.
		templateVal := strings.TrimSpace(isTemplate.String)
		if strings.EqualFold(templateVal, "true") || templateVal == "1" {
			continue
		}

		key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name}
		agg := &monthlyAggVal{
			key:            key,
			inventoryId:    inventoryId.Int64,
			eventKey:       eventKey.String,
			cloudId:        cloudId.String,
			resourcePool:   resourcePool.String,
			datacenter:     datacenter.String,
			cluster:        cluster.String,
			folder:         folder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			provisioned:    disk.Float64,
			vcpuCount:      vcpu.Int64,
			ramGB:          ram.Int64,
			creation:       creation.Int64,
			deletion:       deletion.Int64,
			lastSnapshot:   snap.SnapshotTime,
			samplesPresent: samplesPresent.Int64,
		}

		// Recover the day's total sample count from presence:
		// SamplesPresent = AvgIsPresent * TotalSamples.
		totalSamplesDay := float64(samplesPresent.Int64)
		if avgIsPresent.Valid && avgIsPresent.Float64 > 0 {
			totalSamplesDay = float64(samplesPresent.Int64) / avgIsPresent.Float64
		}
		agg.totalSamples = totalSamplesDay
		if avgVcpu.Valid {
			agg.sumVcpu = avgVcpu.Float64 * totalSamplesDay
		}
		if avgRam.Valid {
			agg.sumRam = avgRam.Float64 * totalSamplesDay
		}
		if avgDisk.Valid {
			agg.sumDisk = avgDisk.Float64 * totalSamplesDay
		}
		if poolTin.Valid {
			agg.tinWeighted = (poolTin.Float64 / 100.0) * totalSamplesDay
		}
		if poolBronze.Valid {
			agg.bronzeWeighted = (poolBronze.Float64 / 100.0) * totalSamplesDay
		}
		if poolSilver.Valid {
			agg.silverWeighted = (poolSilver.Float64 / 100.0) * totalSamplesDay
		}
		if poolGold.Valid {
			agg.goldWeighted = (poolGold.Float64 / 100.0) * totalSamplesDay
		}

		result[key] = agg
	}
	return result, rows.Err()
}

// scanDailyRollup aggregates monthly data from the vm_daily_rollup cache.
func (c *CronTask) scanDailyRollup(ctx context.Context, start, end time.Time) (map[monthlyAggKey]*monthlyAggVal, error) {
	dbConn := c.Database.DB()
	if !db.TableExists(ctx, dbConn, "vm_daily_rollup") {
		return map[monthlyAggKey]*monthlyAggVal{}, nil
	}
	query := `
		SELECT
			"Date","Vcenter","VmId","VmUuid","Name","CreationTime","DeletionTime",
			"SamplesPresent","TotalSamples","SumVcpu","SumRam","SumDisk",
			"TinHits","BronzeHits","SilverHits","GoldHits",
			"LastResourcePool","LastDatacenter","LastCluster","LastFolder",
			"LastProvisionedDisk","LastVcpuCount","LastRamGB","IsTemplate","PoweredOn","SrmPlaceholder"
		FROM vm_daily_rollup
		WHERE "Date" >= ? AND "Date" < ?
	`
	bind := dbConn.Rebind(query)
	rows, err := dbConn.QueryxContext(ctx, bind, start.Unix(), end.Unix())
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	agg := make(map[monthlyAggKey]*monthlyAggVal, 512)
	for rows.Next() {
		var (
			date                                      sql.NullInt64
			vcenter, vmId, vmUuid, name               string
			creation, deletion                        sql.NullInt64
			samplesPresent, totalSamples              sql.NullInt64
			sumVcpu, sumRam, sumDisk                  sql.NullFloat64
			tinHits, bronzeHits, silverHits, goldHits sql.NullInt64
			lastPool, lastDc, lastCluster, lastFolder sql.NullString
			lastDisk, lastVcpu, lastRam               sql.NullFloat64
			isTemplate, poweredOn, srmPlaceholder     sql.NullString
		)
		if err := rows.Scan(
			&date, &vcenter, &vmId, &vmUuid, &name, &creation, &deletion,
			&samplesPresent, &totalSamples, &sumVcpu, &sumRam, &sumDisk,
			&tinHits, &bronzeHits, &silverHits, &goldHits,
			&lastPool, &lastDc, &lastCluster, &lastFolder,
			&lastDisk, &lastVcpu, &lastRam, &isTemplate, &poweredOn, &srmPlaceholder,
		); err != nil {
			// Skip unparsable rows rather than aborting the whole cache read.
			continue
		}
		templateVal := strings.TrimSpace(isTemplate.String)
		if strings.EqualFold(templateVal, "true") || templateVal == "1" {
			continue
		}
		key := monthlyAggKey{Vcenter: vcenter, VmId: vmId, VmUuid: vmUuid, Name: name}
		val := &monthlyAggVal{
			key:            key,
			resourcePool:   lastPool.String,
			datacenter:     lastDc.String,
			cluster:        lastCluster.String,
			folder:         lastFolder.String,
			isTemplate:     isTemplate.String,
			poweredOn:      poweredOn.String,
			srmPlaceholder: srmPlaceholder.String,
			provisioned:    lastDisk.Float64,
			vcpuCount:      int64(lastVcpu.Float64),
			ramGB:          int64(lastRam.Float64),
			creation:       creation.Int64,
			deletion:       deletion.Int64,
			lastSnapshot:   time.Unix(date.Int64, 0),
			samplesPresent: samplesPresent.Int64,
			totalSamples:   float64(totalSamples.Int64),
			sumVcpu:        sumVcpu.Float64,
			sumRam:         sumRam.Float64,
			sumDisk:        sumDisk.Float64,
			tinWeighted:    float64(tinHits.Int64),
			bronzeWeighted: float64(bronzeHits.Int64),
			silverWeighted: float64(silverHits.Int64),
			goldWeighted:   float64(goldHits.Int64),
		}
		if existing, ok := agg[key]; ok {
			mergeMonthlyAgg(existing, val)
		} else {
			agg[key] = val
		}
	}
	return agg, rows.Err()
}
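
// insertMonthlyAggregates writes the reduced per-VM aggregates into the summary table
// in a single prepared-statement transaction, deriving the Avg* and Pool*Pct columns
// from the accumulated weighted sums.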
func (c *CronTask) insertMonthlyAggregates(ctx context.Context, summaryTable string, aggMap map[monthlyAggKey]*monthlyAggVal) error {
	dbConn := c.Database.DB()
	columns := []string{
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
		"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SamplesPresent",
		"AvgVcpuCount", "AvgRamGB", "AvgProvisionedDisk", "AvgIsPresent",
		"PoolTinPct", "PoolBronzePct", "PoolSilverPct", "PoolGoldPct",
		"Tin", "Bronze", "Silver", "Gold",
	}
	placeholders := make([]string, len(columns))
	for i := range columns {
		placeholders[i] = "?"
	}
	stmtText := fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, summaryTable, strings.Join(columns, ","), strings.Join(placeholders, ","))
	stmtText = dbConn.Rebind(stmtText)

	tx, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	stmt, err := tx.PreparexContext(ctx, stmtText)
	if err != nil {
		tx.Rollback()
		return err
	}
	defer stmt.Close()

	for _, v := range aggMap {
		inventoryVal := sql.NullInt64{}
		if v.inventoryId != 0 {
			inventoryVal = sql.NullInt64{Int64: v.inventoryId, Valid: true}
		}
		avgVcpu := sql.NullFloat64{}
		avgRam := sql.NullFloat64{}
		avgDisk := sql.NullFloat64{}
		avgIsPresent := sql.NullFloat64{}
		tinPct := sql.NullFloat64{}
		bronzePct := sql.NullFloat64{}
		silverPct := sql.NullFloat64{}
		goldPct := sql.NullFloat64{}

		if v.totalSamples > 0 {
			avgVcpu = sql.NullFloat64{Float64: v.sumVcpu / v.totalSamples, Valid: true}
			avgRam = sql.NullFloat64{Float64: v.sumRam / v.totalSamples, Valid: true}
			avgDisk = sql.NullFloat64{Float64: v.sumDisk / v.totalSamples, Valid: true}
			avgIsPresent = sql.NullFloat64{Float64: float64(v.samplesPresent) / v.totalSamples, Valid: true}
			tinPct = sql.NullFloat64{Float64: 100.0 * v.tinWeighted / v.totalSamples, Valid: true}
			bronzePct = sql.NullFloat64{Float64: 100.0 * v.bronzeWeighted / v.totalSamples, Valid: true}
			silverPct = sql.NullFloat64{Float64: 100.0 * v.silverWeighted / v.totalSamples, Valid: true}
			goldPct = sql.NullFloat64{Float64: 100.0 * v.goldWeighted / v.totalSamples, Valid: true}
		}

		// The Tin/Bronze/Silver/Gold columns receive the same values as the
		// Pool*Pct columns, hence the repeated arguments below.
		if _, err := stmt.ExecContext(ctx,
			inventoryVal,
			v.key.Name, v.key.Vcenter, v.key.VmId, v.eventKey, v.cloudId, v.creation, v.deletion,
			v.resourcePool, v.datacenter, v.cluster, v.folder, v.provisioned, v.vcpuCount, v.ramGB,
			v.isTemplate, v.poweredOn, v.srmPlaceholder, v.key.VmUuid, v.samplesPresent,
			avgVcpu, avgRam, avgDisk, avgIsPresent,
			tinPct, bronzePct, silverPct, goldPct,
			tinPct, bronzePct, silverPct, goldPct,
		); err != nil {
			tx.Rollback()
			return err
		}
	}

	return tx.Commit()
}