Compare commits
1 Commits
7b7afbf1d5
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| b4c52e296c |
@@ -1017,8 +1017,11 @@ WITH snapshots AS (
|
|||||||
MIN(s."SnapshotTime") AS first_present,
|
MIN(s."SnapshotTime") AS first_present,
|
||||||
MAX(s."SnapshotTime") AS last_present,
|
MAX(s."SnapshotTime") AS last_present,
|
||||||
COUNT(*) AS samples_present,
|
COUNT(*) AS samples_present,
|
||||||
s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount",
|
s."Datacenter", s."Cluster", s."Folder",
|
||||||
s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid",
|
AVG(COALESCE(s."ProvisionedDisk",0)) AS avg_disk,
|
||||||
|
AVG(COALESCE(s."VcpuCount",0)) AS avg_vcpu_raw,
|
||||||
|
AVG(COALESCE(s."RamGB",0)) AS avg_ram_raw,
|
||||||
|
s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid",
|
||||||
SUM(CASE WHEN s."VcpuCount" IS NOT NULL THEN s."VcpuCount" ELSE 0 END) AS sum_vcpu,
|
SUM(CASE WHEN s."VcpuCount" IS NOT NULL THEN s."VcpuCount" ELSE 0 END) AS sum_vcpu,
|
||||||
SUM(CASE WHEN s."RamGB" IS NOT NULL THEN s."RamGB" ELSE 0 END) AS sum_ram,
|
SUM(CASE WHEN s."RamGB" IS NOT NULL THEN s."RamGB" ELSE 0 END) AS sum_ram,
|
||||||
SUM(CASE WHEN s."ProvisionedDisk" IS NOT NULL THEN s."ProvisionedDisk" ELSE 0 END) AS sum_disk,
|
SUM(CASE WHEN s."ProvisionedDisk" IS NOT NULL THEN s."ProvisionedDisk" ELSE 0 END) AS sum_disk,
|
||||||
@@ -1030,8 +1033,8 @@ WITH snapshots AS (
|
|||||||
LEFT JOIN inventory inv ON inv."VmId" = s."VmId" AND inv."Vcenter" = s."Vcenter"
|
LEFT JOIN inventory inv ON inv."VmId" = s."VmId" AND inv."Vcenter" = s."Vcenter"
|
||||||
GROUP BY
|
GROUP BY
|
||||||
s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
|
s."InventoryId", s."Name", s."Vcenter", s."VmId", s."EventKey", s."CloudId",
|
||||||
s."Datacenter", s."Cluster", s."Folder", s."ProvisionedDisk", s."VcpuCount",
|
s."Datacenter", s."Cluster", s."Folder",
|
||||||
s."RamGB", s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid"
|
s."IsTemplate", s."PoweredOn", s."SrmPlaceholder", s."VmUuid"
|
||||||
)
|
)
|
||||||
INSERT INTO %s (
|
INSERT INTO %s (
|
||||||
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
|
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
|
||||||
@@ -1062,8 +1065,32 @@ SELECT
|
|||||||
ORDER BY s2."SnapshotTime" DESC
|
ORDER BY s2."SnapshotTime" DESC
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
) AS "ResourcePool",
|
) AS "ResourcePool",
|
||||||
agg."Datacenter", agg."Cluster", agg."Folder", agg."ProvisionedDisk", agg."VcpuCount",
|
agg."Datacenter", agg."Cluster", agg."Folder",
|
||||||
agg."RamGB", agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
|
(
|
||||||
|
SELECT s2."ProvisionedDisk"
|
||||||
|
FROM snapshots s2
|
||||||
|
WHERE s2."VmId" = agg."VmId"
|
||||||
|
AND s2."Vcenter" = agg."Vcenter"
|
||||||
|
ORDER BY s2."SnapshotTime" DESC
|
||||||
|
LIMIT 1
|
||||||
|
) AS "ProvisionedDisk",
|
||||||
|
(
|
||||||
|
SELECT s2."VcpuCount"
|
||||||
|
FROM snapshots s2
|
||||||
|
WHERE s2."VmId" = agg."VmId"
|
||||||
|
AND s2."Vcenter" = agg."Vcenter"
|
||||||
|
ORDER BY s2."SnapshotTime" DESC
|
||||||
|
LIMIT 1
|
||||||
|
) AS "VcpuCount",
|
||||||
|
(
|
||||||
|
SELECT s2."RamGB"
|
||||||
|
FROM snapshots s2
|
||||||
|
WHERE s2."VmId" = agg."VmId"
|
||||||
|
AND s2."Vcenter" = agg."Vcenter"
|
||||||
|
ORDER BY s2."SnapshotTime" DESC
|
||||||
|
LIMIT 1
|
||||||
|
) AS "RamGB",
|
||||||
|
agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
|
||||||
agg.samples_present AS "SamplesPresent",
|
agg.samples_present AS "SamplesPresent",
|
||||||
CASE WHEN totals.total_samples > 0
|
CASE WHEN totals.total_samples > 0
|
||||||
THEN 1.0 * agg.sum_vcpu / totals.total_samples
|
THEN 1.0 * agg.sum_vcpu / totals.total_samples
|
||||||
@@ -1105,8 +1132,8 @@ FROM agg
|
|||||||
CROSS JOIN totals
|
CROSS JOIN totals
|
||||||
GROUP BY
|
GROUP BY
|
||||||
agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
|
agg."InventoryId", agg."Name", agg."Vcenter", agg."VmId", agg."EventKey", agg."CloudId",
|
||||||
agg."Datacenter", agg."Cluster", agg."Folder", agg."ProvisionedDisk", agg."VcpuCount",
|
agg."Datacenter", agg."Cluster", agg."Folder",
|
||||||
agg."RamGB", agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
|
agg."IsTemplate", agg."PoweredOn", agg."SrmPlaceholder", agg."VmUuid",
|
||||||
agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
|
agg.any_creation, agg.any_deletion, agg.first_present, agg.last_present,
|
||||||
totals.total_samples;
|
totals.total_samples;
|
||||||
`, unionQuery, tableName)
|
`, unionQuery, tableName)
|
||||||
|
|||||||
@@ -2,13 +2,8 @@ package tasks
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
|
||||||
"runtime"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
"vctp/db"
|
"vctp/db"
|
||||||
"vctp/internal/metrics"
|
"vctp/internal/metrics"
|
||||||
@@ -61,17 +56,6 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If enabled, use the Go fan-out/reduce path to parallelize aggregation.
|
|
||||||
if os.Getenv("DAILY_AGG_GO") == "1" {
|
|
||||||
if err := c.aggregateDailySummaryGo(ctx, dayStart, dayEnd, summaryTable, force); err != nil {
|
|
||||||
c.Logger.Warn("go-based daily aggregation failed, falling back to SQL path", "error", err)
|
|
||||||
} else {
|
|
||||||
metrics.RecordDailyAggregation(time.Since(jobStart), nil)
|
|
||||||
c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
|
hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -180,379 +164,3 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
|
|||||||
func dailySummaryTableName(t time.Time) (string, error) {
|
func dailySummaryTableName(t time.Time) (string, error) {
|
||||||
return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
|
return db.SafeTableName(fmt.Sprintf("inventory_daily_summary_%s", t.Format("20060102")))
|
||||||
}
|
}
|
||||||
|
|
||||||
// aggregateDailySummaryGo performs daily aggregation by reading hourly tables in parallel,
|
|
||||||
// reducing in Go, and writing the summary table. It mirrors the outputs of the SQL path
|
|
||||||
// as closely as possible while improving CPU utilization on multi-core hosts.
|
|
||||||
func (c *CronTask) aggregateDailySummaryGo(ctx context.Context, dayStart, dayEnd time.Time, summaryTable string, force bool) error {
|
|
||||||
jobStart := time.Now()
|
|
||||||
dbConn := c.Database.DB()
|
|
||||||
|
|
||||||
hourlySnapshots, err := report.SnapshotRecordsWithFallback(ctx, c.Database, "hourly", "inventory_hourly_", "epoch", dayStart, dayEnd)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
hourlySnapshots = filterRecordsInRange(hourlySnapshots, dayStart, dayEnd)
|
|
||||||
hourlySnapshots = filterSnapshotsWithRows(ctx, dbConn, hourlySnapshots)
|
|
||||||
if len(hourlySnapshots) == 0 {
|
|
||||||
return fmt.Errorf("no hourly snapshot tables found for %s", dayStart.Format("2006-01-02"))
|
|
||||||
}
|
|
||||||
|
|
||||||
hourlyTables := make([]string, 0, len(hourlySnapshots))
|
|
||||||
for _, snapshot := range hourlySnapshots {
|
|
||||||
hourlyTables = append(hourlyTables, snapshot.TableName)
|
|
||||||
if err := db.EnsureSnapshotIndexes(ctx, dbConn, snapshot.TableName); err != nil {
|
|
||||||
c.Logger.Warn("failed to ensure indexes on hourly table", "table", snapshot.TableName, "error", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
unionQuery, err := buildUnionQuery(hourlyTables, summaryUnionColumns, templateExclusionFilter())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear existing summary if forcing.
|
|
||||||
if rowsExist, err := db.TableHasRows(ctx, dbConn, summaryTable); err != nil {
|
|
||||||
return err
|
|
||||||
} else if rowsExist && !force {
|
|
||||||
c.Logger.Debug("Daily summary already exists, skipping aggregation (Go path)", "summary_table", summaryTable)
|
|
||||||
return nil
|
|
||||||
} else if rowsExist && force {
|
|
||||||
if err := clearTable(ctx, dbConn, summaryTable); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
totalSamples := len(hourlyTables)
|
|
||||||
aggMap, err := c.scanHourlyTablesParallel(ctx, hourlySnapshots, totalSamples)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if len(aggMap) == 0 {
|
|
||||||
return fmt.Errorf("no VM records aggregated for %s", dayStart.Format("2006-01-02"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert aggregated rows.
|
|
||||||
if err := c.insertDailyAggregates(ctx, summaryTable, aggMap, totalSamples); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Refine lifecycle with existing SQL helper to pick up first-after deletions.
|
|
||||||
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
|
|
||||||
c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
|
|
||||||
}
|
|
||||||
|
|
||||||
db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
|
|
||||||
rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
|
|
||||||
if err != nil {
|
|
||||||
c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
|
|
||||||
}
|
|
||||||
if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
|
|
||||||
c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
|
|
||||||
}
|
|
||||||
if err := c.generateReport(ctx, summaryTable); err != nil {
|
|
||||||
c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Logger.Debug("Finished daily inventory aggregation (Go path)", "summary_table", summaryTable, "duration", time.Since(jobStart))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type dailyAggKey struct {
|
|
||||||
Vcenter string
|
|
||||||
VmId string
|
|
||||||
VmUuid string
|
|
||||||
Name string
|
|
||||||
}
|
|
||||||
|
|
||||||
type dailyAggVal struct {
|
|
||||||
key dailyAggKey
|
|
||||||
resourcePool string
|
|
||||||
datacenter string
|
|
||||||
cluster string
|
|
||||||
folder string
|
|
||||||
isTemplate string
|
|
||||||
poweredOn string
|
|
||||||
srmPlaceholder string
|
|
||||||
creation int64
|
|
||||||
firstSeen int64
|
|
||||||
lastSeen int64
|
|
||||||
sumVcpu int64
|
|
||||||
sumRam int64
|
|
||||||
sumDisk float64
|
|
||||||
samples int64
|
|
||||||
tinHits int64
|
|
||||||
bronzeHits int64
|
|
||||||
silverHits int64
|
|
||||||
goldHits int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CronTask) scanHourlyTablesParallel(ctx context.Context, snapshots []report.SnapshotRecord, totalSamples int) (map[dailyAggKey]*dailyAggVal, error) {
|
|
||||||
agg := make(map[dailyAggKey]*dailyAggVal, 1024)
|
|
||||||
mu := sync.Mutex{}
|
|
||||||
workers := runtime.NumCPU()
|
|
||||||
if workers < 2 {
|
|
||||||
workers = 2
|
|
||||||
}
|
|
||||||
if workers > len(snapshots) {
|
|
||||||
workers = len(snapshots)
|
|
||||||
}
|
|
||||||
|
|
||||||
jobs := make(chan report.SnapshotRecord, len(snapshots))
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
for i := 0; i < workers; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
for snap := range jobs {
|
|
||||||
rows, err := c.scanHourlyTable(ctx, snap)
|
|
||||||
if err != nil {
|
|
||||||
c.Logger.Warn("failed to scan hourly table", "table", snap.TableName, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
mu.Lock()
|
|
||||||
for k, v := range rows {
|
|
||||||
if existing, ok := agg[k]; ok {
|
|
||||||
mergeDailyAgg(existing, v)
|
|
||||||
} else {
|
|
||||||
agg[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
mu.Unlock()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
for _, snap := range snapshots {
|
|
||||||
jobs <- snap
|
|
||||||
}
|
|
||||||
close(jobs)
|
|
||||||
wg.Wait()
|
|
||||||
return agg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func mergeDailyAgg(dst, src *dailyAggVal) {
|
|
||||||
if src.creation > 0 && (dst.creation == 0 || src.creation < dst.creation) {
|
|
||||||
dst.creation = src.creation
|
|
||||||
}
|
|
||||||
if dst.firstSeen == 0 || (src.firstSeen > 0 && src.firstSeen < dst.firstSeen) {
|
|
||||||
dst.firstSeen = src.firstSeen
|
|
||||||
}
|
|
||||||
if src.lastSeen > dst.lastSeen {
|
|
||||||
dst.lastSeen = src.lastSeen
|
|
||||||
dst.resourcePool = src.resourcePool
|
|
||||||
dst.datacenter = src.datacenter
|
|
||||||
dst.cluster = src.cluster
|
|
||||||
dst.folder = src.folder
|
|
||||||
dst.isTemplate = src.isTemplate
|
|
||||||
dst.poweredOn = src.poweredOn
|
|
||||||
dst.srmPlaceholder = src.srmPlaceholder
|
|
||||||
}
|
|
||||||
dst.sumVcpu += src.sumVcpu
|
|
||||||
dst.sumRam += src.sumRam
|
|
||||||
dst.sumDisk += src.sumDisk
|
|
||||||
dst.samples += src.samples
|
|
||||||
dst.tinHits += src.tinHits
|
|
||||||
dst.bronzeHits += src.bronzeHits
|
|
||||||
dst.silverHits += src.silverHits
|
|
||||||
dst.goldHits += src.goldHits
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CronTask) scanHourlyTable(ctx context.Context, snap report.SnapshotRecord) (map[dailyAggKey]*dailyAggVal, error) {
|
|
||||||
dbConn := c.Database.DB()
|
|
||||||
query := fmt.Sprintf(`
|
|
||||||
SELECT
|
|
||||||
"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
|
|
||||||
COALESCE("ProvisionedDisk",0) AS disk,
|
|
||||||
COALESCE("VcpuCount",0) AS vcpu,
|
|
||||||
COALESCE("RamGB",0) AS ram,
|
|
||||||
COALESCE("CreationTime",0) AS creation,
|
|
||||||
COALESCE("DeletionTime",0) AS deletion,
|
|
||||||
COALESCE("IsTemplate",'FALSE') AS is_template,
|
|
||||||
COALESCE("PoweredOn",'FALSE') AS powered_on,
|
|
||||||
COALESCE("SrmPlaceholder",'FALSE') AS srm_placeholder,
|
|
||||||
COALESCE("SnapshotTime",0) AS snapshot_time
|
|
||||||
FROM %s
|
|
||||||
`, snap.TableName)
|
|
||||||
rows, err := dbConn.QueryxContext(ctx, query)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
out := make(map[dailyAggKey]*dailyAggVal, 256)
|
|
||||||
for rows.Next() {
|
|
||||||
var (
|
|
||||||
name, vcenter, vmId, vmUuid, resourcePool string
|
|
||||||
dc, cluster, folder sql.NullString
|
|
||||||
disk sql.NullFloat64
|
|
||||||
vcpu, ram sql.NullInt64
|
|
||||||
creation, deletion, snapshotTime sql.NullInt64
|
|
||||||
isTemplate, poweredOn, srmPlaceholder sql.NullString
|
|
||||||
)
|
|
||||||
if err := rows.Scan(&name, &vcenter, &vmId, &vmUuid, &resourcePool, &dc, &cluster, &folder, &disk, &vcpu, &ram, &creation, &deletion, &isTemplate, &poweredOn, &srmPlaceholder, &snapshotTime); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if vcenter == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Skip templates.
|
|
||||||
if strings.EqualFold(strings.TrimSpace(isTemplate.String), "true") || isTemplate.String == "1" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
key := dailyAggKey{
|
|
||||||
Vcenter: vcenter,
|
|
||||||
VmId: strings.TrimSpace(vmId),
|
|
||||||
VmUuid: strings.TrimSpace(vmUuid),
|
|
||||||
Name: strings.TrimSpace(name),
|
|
||||||
}
|
|
||||||
if key.VmId == "" && key.VmUuid == "" && key.Name == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if key.VmId == "" {
|
|
||||||
key.VmId = key.VmUuid
|
|
||||||
}
|
|
||||||
pool := strings.ToLower(strings.TrimSpace(resourcePool))
|
|
||||||
hitTin := btoi(pool == "tin")
|
|
||||||
hitBronze := btoi(pool == "bronze")
|
|
||||||
hitSilver := btoi(pool == "silver")
|
|
||||||
hitGold := btoi(pool == "gold")
|
|
||||||
row := &dailyAggVal{
|
|
||||||
key: key,
|
|
||||||
resourcePool: resourcePool,
|
|
||||||
datacenter: dc.String,
|
|
||||||
cluster: cluster.String,
|
|
||||||
folder: folder.String,
|
|
||||||
isTemplate: isTemplate.String,
|
|
||||||
poweredOn: poweredOn.String,
|
|
||||||
srmPlaceholder: srmPlaceholder.String,
|
|
||||||
creation: int64OrZero(creation),
|
|
||||||
firstSeen: int64OrZero(snapshotTime),
|
|
||||||
lastSeen: int64OrZero(snapshotTime),
|
|
||||||
sumVcpu: vcpu.Int64,
|
|
||||||
sumRam: ram.Int64,
|
|
||||||
sumDisk: disk.Float64,
|
|
||||||
samples: 1,
|
|
||||||
tinHits: hitTin,
|
|
||||||
bronzeHits: hitBronze,
|
|
||||||
silverHits: hitSilver,
|
|
||||||
goldHits: hitGold,
|
|
||||||
}
|
|
||||||
out[key] = row
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CronTask) insertDailyAggregates(ctx context.Context, table string, agg map[dailyAggKey]*dailyAggVal, totalSamples int) error {
|
|
||||||
dbConn := c.Database.DB()
|
|
||||||
tx, err := dbConn.Beginx()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer tx.Rollback()
|
|
||||||
|
|
||||||
driver := strings.ToLower(dbConn.DriverName())
|
|
||||||
placeholders := makePlaceholders(driver, 29)
|
|
||||||
insert := fmt.Sprintf(`
|
|
||||||
INSERT INTO %s (
|
|
||||||
"Name","Vcenter","VmId","VmUuid","ResourcePool","Datacenter","Cluster","Folder",
|
|
||||||
"ProvisionedDisk","VcpuCount","RamGB","IsTemplate","PoweredOn","SrmPlaceholder",
|
|
||||||
"CreationTime","DeletionTime","SamplesPresent","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk",
|
|
||||||
"AvgIsPresent","PoolTinPct","PoolBronzePct","PoolSilverPct","PoolGoldPct","Tin","Bronze","Silver","Gold"
|
|
||||||
) VALUES (%s)
|
|
||||||
`, table, placeholders)
|
|
||||||
|
|
||||||
for _, v := range agg {
|
|
||||||
if v.samples == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
avgVcpu := float64(v.sumVcpu) / float64(v.samples)
|
|
||||||
avgRam := float64(v.sumRam) / float64(v.samples)
|
|
||||||
avgDisk := v.sumDisk / float64(v.samples)
|
|
||||||
total := float64(totalSamples)
|
|
||||||
avgPresent := 0.0
|
|
||||||
tinPct := 0.0
|
|
||||||
bronzePct := 0.0
|
|
||||||
silverPct := 0.0
|
|
||||||
goldPct := 0.0
|
|
||||||
if total > 0 {
|
|
||||||
avgPresent = float64(v.samples) / total
|
|
||||||
tinPct = float64(v.tinHits) * 100 / total
|
|
||||||
bronzePct = float64(v.bronzeHits) * 100 / total
|
|
||||||
silverPct = float64(v.silverHits) * 100 / total
|
|
||||||
goldPct = float64(v.goldHits) * 100 / total
|
|
||||||
}
|
|
||||||
args := []interface{}{
|
|
||||||
v.key.Name,
|
|
||||||
v.key.Vcenter,
|
|
||||||
nullIfEmpty(v.key.VmId),
|
|
||||||
nullIfEmpty(v.key.VmUuid),
|
|
||||||
v.resourcePool,
|
|
||||||
nullIfEmpty(v.datacenter),
|
|
||||||
nullIfEmpty(v.cluster),
|
|
||||||
nullIfEmpty(v.folder),
|
|
||||||
v.sumDisk,
|
|
||||||
v.sumVcpu,
|
|
||||||
v.sumRam,
|
|
||||||
v.isTemplate,
|
|
||||||
v.poweredOn,
|
|
||||||
v.srmPlaceholder,
|
|
||||||
v.creation,
|
|
||||||
int64(0), // deletion time refined later
|
|
||||||
v.samples,
|
|
||||||
avgVcpu,
|
|
||||||
avgRam,
|
|
||||||
avgDisk,
|
|
||||||
avgPresent,
|
|
||||||
tinPct, bronzePct, silverPct, goldPct,
|
|
||||||
float64(v.tinHits), float64(v.bronzeHits), float64(v.silverHits), float64(v.goldHits),
|
|
||||||
}
|
|
||||||
if driver != "sqlite" {
|
|
||||||
// Postgres expects primitive types, nulls are handled by pq via nil.
|
|
||||||
for i, a := range args {
|
|
||||||
if s, ok := a.(string); ok && s == "" {
|
|
||||||
args[i] = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if _, err := tx.ExecContext(ctx, insert, args...); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return tx.Commit()
|
|
||||||
}
|
|
||||||
|
|
||||||
func int64OrZero(v sql.NullInt64) int64 {
|
|
||||||
if v.Valid {
|
|
||||||
return v.Int64
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func nullIfEmpty(s string) interface{} {
|
|
||||||
if strings.TrimSpace(s) == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func makePlaceholders(driver string, n int) string {
|
|
||||||
if driver == "sqlite" {
|
|
||||||
parts := make([]string, n)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
parts[i] = "?"
|
|
||||||
}
|
|
||||||
return strings.Join(parts, ",")
|
|
||||||
}
|
|
||||||
parts := make([]string, n)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
parts[i] = fmt.Sprintf("$%d", i+1)
|
|
||||||
}
|
|
||||||
return strings.Join(parts, ",")
|
|
||||||
}
|
|
||||||
|
|
||||||
func btoi(b bool) int64 {
|
|
||||||
if b {
|
|
||||||
return 1
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user