improve aggregation logic
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-01-14 14:10:28 +11:00
parent b9ab34db0a
commit cfc4efee0e
2 changed files with 69 additions and 14 deletions

View File

@@ -664,6 +664,21 @@ func validateTableName(name string) error {
return nil
}
func tableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, error) {
if err := validateTableName(table); err != nil {
return false, err
}
query := fmt.Sprintf(`SELECT 1 FROM %s LIMIT 1`, table)
var exists int
if err := dbConn.GetContext(ctx, &exists, query); err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
return true, nil
}
func tableColumns(ctx context.Context, dbConn *sqlx.DB, tableName string) ([]string, error) {
driver := strings.ToLower(dbConn.DriverName())
switch driver {
@@ -908,6 +923,9 @@ func buildHourlyTotals(ctx context.Context, dbConn *sqlx.DB, records []SnapshotR
if err := validateTableName(record.TableName); err != nil {
return nil, err
}
if rowsExist, err := tableHasRows(ctx, dbConn, record.TableName); err != nil || !rowsExist {
continue
}
query := fmt.Sprintf(`
SELECT
COUNT(DISTINCT "VmId") AS vm_count,
@@ -955,6 +973,9 @@ func buildDailyTotals(ctx context.Context, dbConn *sqlx.DB, records []SnapshotRe
if err := validateTableName(record.TableName); err != nil {
return nil, err
}
if rowsExist, err := tableHasRows(ctx, dbConn, record.TableName); err != nil || !rowsExist {
continue
}
query := fmt.Sprintf(`
SELECT
COUNT(DISTINCT "VmId") AS vm_count,

View File

@@ -170,6 +170,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
if err != nil {
return err
}
hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
if len(hourlySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
}
@@ -202,6 +203,7 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
prevEnd := dayStart
prevSnapshots, err := report.ListSnapshotsByRange(ctx, c.Database, "hourly", prevStart, prevEnd)
if err == nil && len(prevSnapshots) > 0 {
prevSnapshots = filterSnapshotsWithRows(ctx, dbConn, prevSnapshots)
prevTables := make([]string, 0, len(prevSnapshots))
for _, snapshot := range prevSnapshots {
prevTables = append(prevTables, snapshot.TableName)
@@ -228,6 +230,9 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
}
insertQuery := fmt.Sprintf(`
WITH snapshots AS (
%s
)
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
@@ -240,7 +245,16 @@ SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
COALESCE(NULLIF("CreationTime", 0), MIN(CASE WHEN "IsPresent" = 'TRUE' THEN "SnapshotTime" END), 0) AS "CreationTime",
"DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
(
SELECT s2."ResourcePool"
FROM snapshots s2
WHERE s2."VmId" = snapshots."VmId"
AND s2."Vcenter" = snapshots."Vcenter"
AND s2."IsPresent" = 'TRUE'
ORDER BY s2."SnapshotTime" DESC
LIMIT 1
) AS "ResourcePool",
"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END) AS "SamplesPresent",
AVG(CASE WHEN "IsPresent" = 'TRUE' AND "VcpuCount" IS NOT NULL THEN "VcpuCount" END) AS "AvgVcpuCount",
@@ -263,14 +277,12 @@ SELECT
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "Silver",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "Gold"
FROM (
%s
) snapshots
FROM snapshots
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
`, summaryTable, unionQuery)
`, unionQuery, summaryTable)
if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
c.Logger.Error("failed to aggregate daily inventory", "error", err, "date", dayStart.Format("2006-01-02"))
@@ -311,6 +323,9 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
if err != nil {
return err
}
dbConn := c.Database.DB()
dailySnapshots = filterSnapshotsWithRows(ctx, dbConn, dailySnapshots)
if len(dailySnapshots) == 0 {
return fmt.Errorf("no hourly snapshot tables found for %s", targetMonth.Format("2006-01"))
}
@@ -320,7 +335,6 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
return err
}
dbConn := c.Database.DB()
if err := ensureMonthlySummaryTable(ctx, dbConn, monthlyTable); err != nil {
return err
}
@@ -349,7 +363,7 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
`"InventoryId"`, `"Name"`, `"Vcenter"`, `"VmId"`, `"EventKey"`, `"CloudId"`, `"CreationTime"`,
`"DeletionTime"`, `"ResourcePool"`, `"Datacenter"`, `"Cluster"`, `"Folder"`,
`"ProvisionedDisk"`, `"VcpuCount"`, `"RamGB"`, `"IsTemplate"`, `"PoweredOn"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"IsPresent"`,
`"SrmPlaceholder"`, `"VmUuid"`, `"SnapshotTime"`, `"IsPresent"`,
}, templateExclusionFilter())
if strings.TrimSpace(unionQuery) == "" {
return fmt.Errorf("no valid daily snapshot tables found for %s", targetMonth.Format("2006-01"))
@@ -369,6 +383,9 @@ func (c *CronTask) aggregateMonthlySummary(ctx context.Context, targetMonth time
}
insertQuery := fmt.Sprintf(`
WITH snapshots AS (
%s
)
INSERT INTO %s (
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
@@ -379,7 +396,16 @@ INSERT INTO %s (
)
SELECT
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
(
SELECT s2."ResourcePool"
FROM snapshots s2
WHERE s2."VmId" = snapshots."VmId"
AND s2."Vcenter" = snapshots."Vcenter"
AND s2."IsPresent" = 'TRUE'
ORDER BY s2."SnapshotTime" DESC
LIMIT 1
) AS "ResourcePool",
"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid",
AVG(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" END) AS "AvgVcpuCount",
AVG(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" END) AS "AvgRamGB",
@@ -401,14 +427,12 @@ SELECT
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "Silver",
100.0 * SUM(CASE WHEN "IsPresent" = 'TRUE' AND LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END)
/ NULLIF(SUM(CASE WHEN "IsPresent" = 'TRUE' THEN 1 ELSE 0 END), 0) AS "Gold"
FROM (
%s
) snapshots
FROM snapshots
GROUP BY
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid";
`, monthlyTable, unionQuery)
`, unionQuery, monthlyTable)
if _, err := dbConn.ExecContext(ctx, insertQuery); err != nil {
c.Logger.Error("failed to aggregate monthly inventory", "error", err, "month", targetMonth.Format("2006-01"))
@@ -870,6 +894,16 @@ func tableHasRows(ctx context.Context, dbConn *sqlx.DB, table string) (bool, err
return true, nil
}
func filterSnapshotsWithRows(ctx context.Context, dbConn *sqlx.DB, snapshots []report.SnapshotRecord) []report.SnapshotRecord {
filtered := snapshots[:0]
for _, snapshot := range snapshots {
if rowsExist, err := tableHasRows(ctx, dbConn, snapshot.TableName); err == nil && rowsExist {
filtered = append(filtered, snapshot)
}
}
return filtered
}
type snapshotTotals struct {
VmCount int64 `db:"vm_count"`
VcpuTotal int64 `db:"vcpu_total"`