fix daily aggregation sample count
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -1293,7 +1293,9 @@ func BuildDailySummaryInsert(tableName string, unionQuery string) (string, error
|
|||||||
WITH snapshots AS (
|
WITH snapshots AS (
|
||||||
%s
|
%s
|
||||||
), totals AS (
|
), totals AS (
|
||||||
SELECT COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot FROM snapshots
|
SELECT "Vcenter", COUNT(DISTINCT "SnapshotTime") AS total_samples, MAX("SnapshotTime") AS max_snapshot
|
||||||
|
FROM snapshots
|
||||||
|
GROUP BY "Vcenter"
|
||||||
), agg AS (
|
), agg AS (
|
||||||
SELECT
|
SELECT
|
||||||
s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
|
s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
|
||||||
@@ -1449,13 +1451,13 @@ SELECT
|
|||||||
THEN 100.0 * agg.gold_hits / agg.samples_present
|
THEN 100.0 * agg.gold_hits / agg.samples_present
|
||||||
ELSE NULL END AS "Gold"
|
ELSE NULL END AS "Gold"
|
||||||
FROM agg
|
FROM agg
|
||||||
CROSS JOIN totals
|
JOIN totals ON totals."Vcenter" = agg."Vcenter"
|
||||||
GROUP BY
|
GROUP BY
|
||||||
agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
|
agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
|
||||||
agg."Datacenter", agg."Cluster", agg."Folder",
|
agg."Datacenter", agg."Cluster", agg."Folder",
|
||||||
agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
|
agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
|
||||||
agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
|
agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
|
||||||
totals.total_samples;
|
totals.total_samples, totals.max_snapshot;
|
||||||
`, unionQuery, tableName)
|
`, unionQuery, tableName)
|
||||||
return insert, nil
|
return insert, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -361,14 +361,16 @@ LIMIT 1
|
|||||||
}
|
}
|
||||||
c.Logger.Info("Daily aggregation deletion times", "source_inferred", inferredDeletions)
|
c.Logger.Info("Daily aggregation deletion times", "source_inferred", inferredDeletions)
|
||||||
|
|
||||||
|
totalSamplesByVcenter := sampleCountsByVcenter(aggMap)
|
||||||
|
|
||||||
// Insert aggregated rows.
|
// Insert aggregated rows.
|
||||||
if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil {
|
if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples, totalSamplesByVcenter); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.Logger.Debug("inserted daily aggregates", "table", summaryTable, "rows", len(aggMap), "total_samples", totalSamples)
|
c.Logger.Debug("inserted daily aggregates", "table", summaryTable, "rows", len(aggMap), "total_samples", totalSamples)
|
||||||
|
|
||||||
// Persist rollup cache for monthly aggregation.
|
// Persist rollup cache for monthly aggregation.
|
||||||
if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples); err != nil {
|
if err := c.persistDailyRollup(ctx, dayStart.Unix(), aggMap, totalSamples, totalSamplesByVcenter); err != nil {
|
||||||
c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayStart.Format("2006-01-02"))
|
c.Logger.Warn("failed to persist daily rollup cache", "error", err, "date", dayStart.Format("2006-01-02"))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -736,7 +738,7 @@ WHERE "SnapshotTime" >= ? AND "SnapshotTime" < ?`
|
|||||||
return agg, snapTimes, rows.Err()
|
return agg, snapTimes, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
|
func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int, totalSamplesByVcenter map[string]int) error {
|
||||||
dbConn := c.Database.DB()
|
dbConn := c.Database.DB()
|
||||||
tx, err := dbConn.Beginx()
|
tx, err := dbConn.Beginx()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -759,7 +761,11 @@ INSERT INTO %s (
|
|||||||
if v.samples == 0 {
|
if v.samples == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
total := float64(totalSamples)
|
vcTotal := totalSamplesByVcenter[v.key.Vcenter]
|
||||||
|
if vcTotal <= 0 {
|
||||||
|
vcTotal = totalSamples
|
||||||
|
}
|
||||||
|
total := float64(vcTotal)
|
||||||
avgVcpu := 0.0
|
avgVcpu := 0.0
|
||||||
avgRam := 0.0
|
avgRam := 0.0
|
||||||
avgDisk := 0.0
|
avgDisk := 0.0
|
||||||
@@ -858,12 +864,16 @@ func btoi(b bool) int64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// persistDailyRollup stores per-day aggregates into vm_daily_rollup to speed monthly aggregation.
|
// persistDailyRollup stores per-day aggregates into vm_daily_rollup to speed monthly aggregation.
|
||||||
func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
|
func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg map[dailyAggKey]*dailyAggVal, totalSamples int, totalSamplesByVcenter map[string]int) error {
|
||||||
dbConn := c.Database.DB()
|
dbConn := c.Database.DB()
|
||||||
for _, v := range agg {
|
for _, v := range agg {
|
||||||
if strings.EqualFold(strings.TrimSpace(v.isTemplate), "true") || v.isTemplate == "1" {
|
if strings.EqualFold(strings.TrimSpace(v.isTemplate), "true") || v.isTemplate == "1" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
vcTotal := totalSamplesByVcenter[v.key.Vcenter]
|
||||||
|
if vcTotal <= 0 {
|
||||||
|
vcTotal = totalSamples
|
||||||
|
}
|
||||||
row := db.VmDailyRollupRow{
|
row := db.VmDailyRollupRow{
|
||||||
Vcenter: v.key.Vcenter,
|
Vcenter: v.key.Vcenter,
|
||||||
VmId: v.key.VmId,
|
VmId: v.key.VmId,
|
||||||
@@ -872,7 +882,7 @@ func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg ma
|
|||||||
CreationTime: v.creation,
|
CreationTime: v.creation,
|
||||||
DeletionTime: v.deletion,
|
DeletionTime: v.deletion,
|
||||||
SamplesPresent: v.samples,
|
SamplesPresent: v.samples,
|
||||||
TotalSamples: int64(totalSamples),
|
TotalSamples: int64(vcTotal),
|
||||||
SumVcpu: float64(v.sumVcpu),
|
SumVcpu: float64(v.sumVcpu),
|
||||||
SumRam: float64(v.sumRam),
|
SumRam: float64(v.sumRam),
|
||||||
SumDisk: v.sumDisk,
|
SumDisk: v.sumDisk,
|
||||||
@@ -897,3 +907,25 @@ func (c *CronTask) persistDailyRollup(ctx context.Context, dayUnix int64, agg ma
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sampleCountsByVcenter(agg map[dailyAggKey]*dailyAggVal) map[string]int {
|
||||||
|
vcenterTimes := make(map[string]map[int64]struct{}, 8)
|
||||||
|
for _, v := range agg {
|
||||||
|
if v.key.Vcenter == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
set := vcenterTimes[v.key.Vcenter]
|
||||||
|
if set == nil {
|
||||||
|
set = make(map[int64]struct{}, len(v.seen))
|
||||||
|
vcenterTimes[v.key.Vcenter] = set
|
||||||
|
}
|
||||||
|
for t := range v.seen {
|
||||||
|
set[t] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
counts := make(map[string]int, len(vcenterTimes))
|
||||||
|
for vc, set := range vcenterTimes {
|
||||||
|
counts[vc] = len(set)
|
||||||
|
}
|
||||||
|
return counts
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user