avoid vcenter totals pages scanning whole database
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
This commit is contained in:
386
db/helpers.go
386
db/helpers.go
@@ -1121,6 +1121,277 @@ CREATE TABLE IF NOT EXISTS vcenter_totals (
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnsureVcenterLatestTotalsTable creates a compact table with one latest totals row per vCenter.
|
||||
func EnsureVcenterLatestTotalsTable(ctx context.Context, dbConn *sqlx.DB) error {
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
var ddl string
|
||||
switch driver {
|
||||
case "pgx", "postgres":
|
||||
ddl = `
|
||||
CREATE TABLE IF NOT EXISTS vcenter_latest_totals (
|
||||
"Vcenter" TEXT PRIMARY KEY,
|
||||
"SnapshotTime" BIGINT NOT NULL,
|
||||
"VmCount" BIGINT NOT NULL,
|
||||
"VcpuTotal" BIGINT NOT NULL,
|
||||
"RamTotalGB" BIGINT NOT NULL
|
||||
);`
|
||||
default:
|
||||
ddl = `
|
||||
CREATE TABLE IF NOT EXISTS vcenter_latest_totals (
|
||||
"Vcenter" TEXT PRIMARY KEY,
|
||||
"SnapshotTime" BIGINT NOT NULL,
|
||||
"VmCount" BIGINT NOT NULL,
|
||||
"VcpuTotal" BIGINT NOT NULL,
|
||||
"RamTotalGB" BIGINT NOT NULL
|
||||
);`
|
||||
}
|
||||
if _, err := execLog(ctx, dbConn, ddl); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpsertVcenterLatestTotals stores the latest totals per vCenter.
|
||||
func UpsertVcenterLatestTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime int64, vmCount, vcpuTotal, ramTotal int64) error {
|
||||
if strings.TrimSpace(vcenter) == "" {
|
||||
return fmt.Errorf("vcenter is empty")
|
||||
}
|
||||
if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := execLog(ctx, dbConn, `
|
||||
INSERT INTO vcenter_latest_totals ("Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
|
||||
VALUES ($1,$2,$3,$4,$5)
|
||||
ON CONFLICT ("Vcenter") DO UPDATE SET
|
||||
"SnapshotTime" = EXCLUDED."SnapshotTime",
|
||||
"VmCount" = EXCLUDED."VmCount",
|
||||
"VcpuTotal" = EXCLUDED."VcpuTotal",
|
||||
"RamTotalGB" = EXCLUDED."RamTotalGB"
|
||||
WHERE EXCLUDED."SnapshotTime" >= vcenter_latest_totals."SnapshotTime"
|
||||
`, vcenter, snapshotTime, vmCount, vcpuTotal, ramTotal)
|
||||
return err
|
||||
}
|
||||
|
||||
// EnsureVcenterAggregateTotalsTable creates a compact cache for hourly/daily (and optional monthly) totals.
|
||||
func EnsureVcenterAggregateTotalsTable(ctx context.Context, dbConn *sqlx.DB) error {
|
||||
ddl := `
|
||||
CREATE TABLE IF NOT EXISTS vcenter_aggregate_totals (
|
||||
"SnapshotType" TEXT NOT NULL,
|
||||
"Vcenter" TEXT NOT NULL,
|
||||
"SnapshotTime" BIGINT NOT NULL,
|
||||
"VmCount" BIGINT NOT NULL,
|
||||
"VcpuTotal" BIGINT NOT NULL,
|
||||
"RamTotalGB" BIGINT NOT NULL,
|
||||
PRIMARY KEY ("SnapshotType","Vcenter","SnapshotTime")
|
||||
);`
|
||||
if _, err := execLog(ctx, dbConn, ddl); err != nil {
|
||||
return err
|
||||
}
|
||||
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vcenter_aggregate_totals_vc_type_time_idx ON vcenter_aggregate_totals ("Vcenter","SnapshotType","SnapshotTime" DESC)`)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpsertVcenterAggregateTotal stores per-vCenter totals for a snapshot type/time.
|
||||
func UpsertVcenterAggregateTotal(ctx context.Context, dbConn *sqlx.DB, snapshotType, vcenter string, snapshotTime int64, vmCount, vcpuTotal, ramTotal int64) error {
|
||||
snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
|
||||
if snapshotType == "" {
|
||||
return fmt.Errorf("snapshot type is empty")
|
||||
}
|
||||
if strings.TrimSpace(vcenter) == "" {
|
||||
return fmt.Errorf("vcenter is empty")
|
||||
}
|
||||
if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := execLog(ctx, dbConn, `
|
||||
INSERT INTO vcenter_aggregate_totals ("SnapshotType","Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
|
||||
VALUES ($1,$2,$3,$4,$5,$6)
|
||||
ON CONFLICT ("SnapshotType","Vcenter","SnapshotTime") DO UPDATE SET
|
||||
"VmCount" = EXCLUDED."VmCount",
|
||||
"VcpuTotal" = EXCLUDED."VcpuTotal",
|
||||
"RamTotalGB" = EXCLUDED."RamTotalGB"
|
||||
`, snapshotType, vcenter, snapshotTime, vmCount, vcpuTotal, ramTotal)
|
||||
return err
|
||||
}
|
||||
|
||||
// ReplaceVcenterAggregateTotalsFromSummary recomputes one snapshot's per-vCenter totals from a summary table.
|
||||
func ReplaceVcenterAggregateTotalsFromSummary(ctx context.Context, dbConn *sqlx.DB, summaryTable, snapshotType string, snapshotTime int64) (int, error) {
|
||||
if err := ValidateTableName(summaryTable); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
|
||||
if snapshotType == "" {
|
||||
return 0, fmt.Errorf("snapshot type is empty")
|
||||
}
|
||||
if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if _, err := execLog(ctx, dbConn, `
|
||||
DELETE FROM vcenter_aggregate_totals
|
||||
WHERE "SnapshotType" = $1 AND "SnapshotTime" = $2
|
||||
`, snapshotType, snapshotTime); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
"Vcenter" AS vcenter,
|
||||
COUNT(1) AS vm_count,
|
||||
CAST(COALESCE(SUM(COALESCE("AvgVcpuCount","VcpuCount")),0) AS BIGINT) AS vcpu_total,
|
||||
CAST(COALESCE(SUM(COALESCE("AvgRamGB","RamGB")),0) AS BIGINT) AS ram_total
|
||||
FROM %s
|
||||
GROUP BY "Vcenter"
|
||||
`, summaryTable)
|
||||
var rows []struct {
|
||||
Vcenter string `db:"vcenter"`
|
||||
VmCount int64 `db:"vm_count"`
|
||||
VcpuTotal int64 `db:"vcpu_total"`
|
||||
RamTotal int64 `db:"ram_total"`
|
||||
}
|
||||
if err := selectLog(ctx, dbConn, &rows, query); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
upserted := 0
|
||||
for _, row := range rows {
|
||||
if err := UpsertVcenterAggregateTotal(ctx, dbConn, snapshotType, row.Vcenter, snapshotTime, row.VmCount, row.VcpuTotal, row.RamTotal); err != nil {
|
||||
return upserted, err
|
||||
}
|
||||
upserted++
|
||||
}
|
||||
return upserted, nil
|
||||
}
|
||||
|
||||
// SyncVcenterAggregateTotalsFromRegistry refreshes cached totals for summary snapshots listed in snapshot_registry.
|
||||
func SyncVcenterAggregateTotalsFromRegistry(ctx context.Context, dbConn *sqlx.DB, snapshotType string) (int, int, error) {
|
||||
snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
|
||||
if snapshotType == "" {
|
||||
return 0, 0, fmt.Errorf("snapshot type is empty")
|
||||
}
|
||||
if snapshotType != "daily" && snapshotType != "monthly" {
|
||||
return 0, 0, fmt.Errorf("unsupported snapshot type for summary cache sync: %s", snapshotType)
|
||||
}
|
||||
if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
query := dbConn.Rebind(`
|
||||
SELECT table_name, snapshot_time
|
||||
FROM snapshot_registry
|
||||
WHERE snapshot_type = ?
|
||||
ORDER BY snapshot_time
|
||||
`)
|
||||
var snapshots []struct {
|
||||
TableName string `db:"table_name"`
|
||||
SnapshotTime int64 `db:"snapshot_time"`
|
||||
}
|
||||
if err := selectLog(ctx, dbConn, &snapshots, query, snapshotType); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
snapshotsRefreshed := 0
|
||||
rowsUpserted := 0
|
||||
failures := 0
|
||||
for _, snapshot := range snapshots {
|
||||
if err := ValidateTableName(snapshot.TableName); err != nil {
|
||||
failures++
|
||||
slog.Warn("skipping invalid summary table in snapshot registry", "snapshot_type", snapshotType, "table", snapshot.TableName, "error", err)
|
||||
continue
|
||||
}
|
||||
upserted, err := ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, snapshot.TableName, snapshotType, snapshot.SnapshotTime)
|
||||
if err != nil {
|
||||
failures++
|
||||
slog.Warn("failed to refresh vcenter aggregate cache from summary table", "snapshot_type", snapshotType, "table", snapshot.TableName, "snapshot_time", snapshot.SnapshotTime, "error", err)
|
||||
continue
|
||||
}
|
||||
snapshotsRefreshed++
|
||||
rowsUpserted += upserted
|
||||
}
|
||||
|
||||
if failures > 0 {
|
||||
return snapshotsRefreshed, rowsUpserted, fmt.Errorf("vcenter aggregate cache sync finished with %d failed snapshot(s)", failures)
|
||||
}
|
||||
return snapshotsRefreshed, rowsUpserted, nil
|
||||
}
|
||||
|
||||
// ListVcenterAggregateTotals lists cached totals by type.
|
||||
func ListVcenterAggregateTotals(ctx context.Context, dbConn *sqlx.DB, vcenter, snapshotType string, limit int) ([]VcenterTotalRow, error) {
|
||||
snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
|
||||
if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = 200
|
||||
}
|
||||
rows := make([]VcenterTotalRow, 0, limit)
|
||||
query := `
|
||||
SELECT "Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
|
||||
FROM vcenter_aggregate_totals
|
||||
WHERE "Vcenter" = $1 AND "SnapshotType" = $2
|
||||
ORDER BY "SnapshotTime" DESC
|
||||
LIMIT $3
|
||||
`
|
||||
if err := selectLog(ctx, dbConn, &rows, query, vcenter, snapshotType, limit); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// ListVcenterAggregateTotalsSince lists cached totals by type from a lower-bound timestamp.
|
||||
func ListVcenterAggregateTotalsSince(ctx context.Context, dbConn *sqlx.DB, vcenter, snapshotType string, since time.Time) ([]VcenterTotalRow, error) {
|
||||
snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
|
||||
if err := EnsureVcenterAggregateTotalsTable(ctx, dbConn); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows := make([]VcenterTotalRow, 0, 256)
|
||||
query := `
|
||||
SELECT "Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
|
||||
FROM vcenter_aggregate_totals
|
||||
WHERE "Vcenter" = $1 AND "SnapshotType" = $2 AND "SnapshotTime" >= $3
|
||||
ORDER BY "SnapshotTime" DESC
|
||||
`
|
||||
if err := selectLog(ctx, dbConn, &rows, query, vcenter, snapshotType, since.Unix()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// SyncVcenterLatestTotalsFromHistory backfills latest totals from existing vcenter_totals history.
|
||||
func SyncVcenterLatestTotalsFromHistory(ctx context.Context, dbConn *sqlx.DB) (int, error) {
|
||||
if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
var rows []struct {
|
||||
Vcenter string `db:"Vcenter"`
|
||||
SnapshotTime int64 `db:"SnapshotTime"`
|
||||
VmCount int64 `db:"VmCount"`
|
||||
VcpuTotal int64 `db:"VcpuTotal"`
|
||||
RamTotalGB int64 `db:"RamTotalGB"`
|
||||
}
|
||||
if err := selectLog(ctx, dbConn, &rows, `
|
||||
SELECT t."Vcenter", t."SnapshotTime", t."VmCount", t."VcpuTotal", t."RamTotalGB"
|
||||
FROM vcenter_totals t
|
||||
JOIN (
|
||||
SELECT "Vcenter", MAX("SnapshotTime") AS max_snapshot_time
|
||||
FROM vcenter_totals
|
||||
GROUP BY "Vcenter"
|
||||
) latest
|
||||
ON latest."Vcenter" = t."Vcenter"
|
||||
AND latest.max_snapshot_time = t."SnapshotTime"
|
||||
`); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
upserted := 0
|
||||
for _, row := range rows {
|
||||
if err := UpsertVcenterLatestTotals(ctx, dbConn, row.Vcenter, row.SnapshotTime, row.VmCount, row.VcpuTotal, row.RamTotalGB); err != nil {
|
||||
return upserted, err
|
||||
}
|
||||
upserted++
|
||||
}
|
||||
return upserted, nil
|
||||
}
|
||||
|
||||
// InsertVcenterTotals records totals for a vcenter at a snapshot time.
|
||||
func InsertVcenterTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotTime time.Time, vmCount, vcpuTotal, ramTotal int64) error {
|
||||
if strings.TrimSpace(vcenter) == "" {
|
||||
@@ -1129,15 +1400,56 @@ func InsertVcenterTotals(ctx context.Context, dbConn *sqlx.DB, vcenter string, s
|
||||
if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := execLog(ctx, dbConn, `
|
||||
INSERT INTO vcenter_totals ("Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB")
|
||||
VALUES ($1,$2,$3,$4,$5)
|
||||
`, vcenter, snapshotTime.Unix(), vmCount, vcpuTotal, ramTotal)
|
||||
return err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := UpsertVcenterLatestTotals(ctx, dbConn, vcenter, snapshotTime.Unix(), vmCount, vcpuTotal, ramTotal); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := UpsertVcenterAggregateTotal(ctx, dbConn, "hourly", vcenter, snapshotTime.Unix(), vmCount, vcpuTotal, ramTotal); err != nil {
|
||||
slog.Warn("failed to upsert vcenter_aggregate_totals", "snapshot_type", "hourly", "vcenter", vcenter, "snapshot_time", snapshotTime.Unix(), "error", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListVcenters returns distinct vcenter URLs tracked.
|
||||
func ListVcenters(ctx context.Context, dbConn *sqlx.DB) ([]string, error) {
|
||||
if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err == nil {
|
||||
rows, err := dbConn.QueryxContext(ctx, `SELECT "Vcenter" FROM vcenter_latest_totals ORDER BY "Vcenter"`)
|
||||
if err == nil {
|
||||
defer rows.Close()
|
||||
out := make([]string, 0, 32)
|
||||
for rows.Next() {
|
||||
var v string
|
||||
if err := rows.Scan(&v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, v)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(out) > 0 {
|
||||
return out, nil
|
||||
}
|
||||
// Older installs may have vcenter_totals populated but no latest cache yet.
|
||||
if _, err := SyncVcenterLatestTotalsFromHistory(ctx, dbConn); err == nil {
|
||||
refreshed := make([]string, 0, 32)
|
||||
if err := selectLog(ctx, dbConn, &refreshed, `SELECT "Vcenter" FROM vcenter_latest_totals ORDER BY "Vcenter"`); err == nil && len(refreshed) > 0 {
|
||||
return refreshed, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback for older DBs before vcenter_latest_totals gets populated.
|
||||
if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1187,15 +1499,66 @@ LIMIT $2`
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// ListVcenterHourlyTotalsSince returns hourly totals for a vCenter from a minimum snapshot time.
|
||||
func ListVcenterHourlyTotalsSince(ctx context.Context, dbConn *sqlx.DB, vcenter string, since time.Time) ([]VcenterTotalRow, error) {
|
||||
cachedRows, cacheErr := ListVcenterAggregateTotalsSince(ctx, dbConn, vcenter, "hourly", since)
|
||||
if cacheErr == nil && len(cachedRows) > 0 {
|
||||
return cachedRows, nil
|
||||
}
|
||||
if cacheErr != nil {
|
||||
slog.Warn("failed to read hourly totals cache", "vcenter", vcenter, "since", since.Unix(), "error", cacheErr)
|
||||
}
|
||||
|
||||
if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows := make([]VcenterTotalRow, 0, 256)
|
||||
query := `
|
||||
SELECT "Vcenter","SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
|
||||
FROM vcenter_totals
|
||||
WHERE "Vcenter" = $1
|
||||
AND "SnapshotTime" >= $2
|
||||
ORDER BY "SnapshotTime" DESC`
|
||||
if err := selectLog(ctx, dbConn, &rows, query, vcenter, since.Unix()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, row := range rows {
|
||||
if err := UpsertVcenterAggregateTotal(ctx, dbConn, "hourly", row.Vcenter, row.SnapshotTime, row.VmCount, row.VcpuTotal, row.RamTotalGB); err != nil {
|
||||
slog.Warn("failed to warm hourly totals cache", "vcenter", row.Vcenter, "snapshot_time", row.SnapshotTime, "error", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// ListVcenterTotalsByType returns totals for a vcenter for the requested snapshot type (hourly, daily, monthly).
|
||||
// Hourly values come from vcenter_totals; daily/monthly are derived from the summary tables referenced in snapshot_registry.
|
||||
// Prefer vcenter_aggregate_totals cache and fallback to source tables when cache is empty.
|
||||
func ListVcenterTotalsByType(ctx context.Context, dbConn *sqlx.DB, vcenter string, snapshotType string, limit int) ([]VcenterTotalRow, error) {
|
||||
snapshotType = strings.ToLower(snapshotType)
|
||||
snapshotType = strings.ToLower(strings.TrimSpace(snapshotType))
|
||||
if snapshotType == "" {
|
||||
snapshotType = "hourly"
|
||||
}
|
||||
|
||||
cachedRows, cacheErr := ListVcenterAggregateTotals(ctx, dbConn, vcenter, snapshotType, limit)
|
||||
if cacheErr == nil && len(cachedRows) > 0 {
|
||||
return cachedRows, nil
|
||||
}
|
||||
if cacheErr != nil {
|
||||
slog.Warn("failed to read vcenter aggregate totals cache", "snapshot_type", snapshotType, "vcenter", vcenter, "error", cacheErr)
|
||||
}
|
||||
|
||||
if snapshotType == "hourly" {
|
||||
return ListVcenterTotals(ctx, dbConn, vcenter, limit)
|
||||
rows, err := ListVcenterTotals(ctx, dbConn, vcenter, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, row := range rows {
|
||||
if err := UpsertVcenterAggregateTotal(ctx, dbConn, "hourly", row.Vcenter, row.SnapshotTime, row.VmCount, row.VcpuTotal, row.RamTotalGB); err != nil {
|
||||
slog.Warn("failed to warm hourly totals cache", "vcenter", row.Vcenter, "snapshot_time", row.SnapshotTime, "error", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
if limit <= 0 {
|
||||
@@ -1240,6 +1603,12 @@ LIMIT $2
|
||||
RamTotalGB: agg.RamTotalGB,
|
||||
})
|
||||
}
|
||||
for _, row := range out {
|
||||
if err := UpsertVcenterAggregateTotal(ctx, dbConn, snapshotType, row.Vcenter, row.SnapshotTime, row.VmCount, row.VcpuTotal, row.RamTotalGB); err != nil {
|
||||
slog.Warn("failed to warm vcenter aggregate totals cache", "snapshot_type", snapshotType, "vcenter", row.Vcenter, "snapshot_time", row.SnapshotTime, "error", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
@@ -1589,6 +1958,9 @@ func SyncVcenterTotalsFromSnapshots(ctx context.Context, dbConn *sqlx.DB) error
|
||||
if err := EnsureVcenterTotalsTable(ctx, dbConn); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err != nil {
|
||||
return err
|
||||
}
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
var hourlyTables []struct {
|
||||
TableName string `db:"table_name"`
|
||||
@@ -1640,6 +2012,12 @@ WHERE NOT EXISTS (
|
||||
if _, err := execLog(ctx, dbConn, insert, a.Vcenter, ht.SnapshotTime, a.VmCount, a.VcpuTotal, a.RamTotal); err != nil {
|
||||
slog.Warn("failed to backfill vcenter_totals", "table", ht.TableName, "vcenter", a.Vcenter, "snapshot_time", ht.SnapshotTime, "error", err)
|
||||
}
|
||||
if err := UpsertVcenterLatestTotals(ctx, dbConn, a.Vcenter, ht.SnapshotTime, a.VmCount, a.VcpuTotal, a.RamTotal); err != nil {
|
||||
slog.Warn("failed to upsert vcenter_latest_totals", "table", ht.TableName, "vcenter", a.Vcenter, "snapshot_time", ht.SnapshotTime, "error", err)
|
||||
}
|
||||
if err := UpsertVcenterAggregateTotal(ctx, dbConn, "hourly", a.Vcenter, ht.SnapshotTime, a.VmCount, a.VcpuTotal, a.RamTotal); err != nil {
|
||||
slog.Warn("failed to upsert vcenter_aggregate_totals", "table", ht.TableName, "snapshot_type", "hourly", "vcenter", a.Vcenter, "snapshot_time", ht.SnapshotTime, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -162,3 +162,316 @@ func TestParseHourlySnapshotUnix(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestVcenterLatestTotalsAndListFallback(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbConn := newTestSQLiteDB(t)
|
||||
|
||||
if err := EnsureVcenterLatestTotalsTable(ctx, dbConn); err != nil {
|
||||
t.Fatalf("failed to ensure vcenter_latest_totals: %v", err)
|
||||
}
|
||||
if err := InsertVcenterTotals(ctx, dbConn, "vc-a", time.Unix(200, 0), 10, 20, 30); err != nil {
|
||||
t.Fatalf("failed to insert totals for vc-a: %v", err)
|
||||
}
|
||||
// Older snapshot should not replace latest totals.
|
||||
if err := InsertVcenterTotals(ctx, dbConn, "vc-a", time.Unix(100, 0), 1, 2, 3); err != nil {
|
||||
t.Fatalf("failed to insert older totals for vc-a: %v", err)
|
||||
}
|
||||
if err := InsertVcenterTotals(ctx, dbConn, "vc-b", time.Unix(300, 0), 11, 21, 31); err != nil {
|
||||
t.Fatalf("failed to insert totals for vc-b: %v", err)
|
||||
}
|
||||
|
||||
vcenters, err := ListVcenters(ctx, dbConn)
|
||||
if err != nil {
|
||||
t.Fatalf("ListVcenters failed: %v", err)
|
||||
}
|
||||
if len(vcenters) != 2 || vcenters[0] != "vc-a" || vcenters[1] != "vc-b" {
|
||||
t.Fatalf("unexpected vcenter list: %#v", vcenters)
|
||||
}
|
||||
|
||||
var latest struct {
|
||||
SnapshotTime int64 `db:"SnapshotTime"`
|
||||
VmCount int64 `db:"VmCount"`
|
||||
VcpuTotal int64 `db:"VcpuTotal"`
|
||||
RamTotalGB int64 `db:"RamTotalGB"`
|
||||
}
|
||||
if err := dbConn.GetContext(ctx, &latest, `
|
||||
SELECT "SnapshotTime","VmCount","VcpuTotal","RamTotalGB"
|
||||
FROM vcenter_latest_totals
|
||||
WHERE "Vcenter" = ?
|
||||
`, "vc-a"); err != nil {
|
||||
t.Fatalf("failed to query latest totals for vc-a: %v", err)
|
||||
}
|
||||
if latest.SnapshotTime != 200 || latest.VmCount != 10 || latest.VcpuTotal != 20 || latest.RamTotalGB != 30 {
|
||||
t.Fatalf("unexpected latest totals for vc-a: %#v", latest)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListVcenterHourlyTotalsSince(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbConn := newTestSQLiteDB(t)
|
||||
|
||||
base := time.Unix(1_700_000_000, 0)
|
||||
if err := InsertVcenterTotals(ctx, dbConn, "vc-a", base.AddDate(0, 0, -60), 1, 2, 3); err != nil {
|
||||
t.Fatalf("failed to insert old totals: %v", err)
|
||||
}
|
||||
if err := InsertVcenterTotals(ctx, dbConn, "vc-a", base.AddDate(0, 0, -10), 10, 20, 30); err != nil {
|
||||
t.Fatalf("failed to insert recent totals: %v", err)
|
||||
}
|
||||
if err := InsertVcenterTotals(ctx, dbConn, "vc-b", base.AddDate(0, 0, -5), 100, 200, 300); err != nil {
|
||||
t.Fatalf("failed to insert other-vcenter totals: %v", err)
|
||||
}
|
||||
|
||||
rows, err := ListVcenterHourlyTotalsSince(ctx, dbConn, "vc-a", base.AddDate(0, 0, -45))
|
||||
if err != nil {
|
||||
t.Fatalf("ListVcenterHourlyTotalsSince failed: %v", err)
|
||||
}
|
||||
if len(rows) != 1 {
|
||||
t.Fatalf("expected 1 row for vc-a since cutoff, got %d", len(rows))
|
||||
}
|
||||
if rows[0].SnapshotTime != base.AddDate(0, 0, -10).Unix() || rows[0].VmCount != 10 {
|
||||
t.Fatalf("unexpected row returned: %#v", rows[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestInsertVcenterTotalsUpsertsHourlyAggregate(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbConn := newTestSQLiteDB(t)
|
||||
|
||||
snapshotTime := time.Unix(1_700_000_500, 0)
|
||||
if err := InsertVcenterTotals(ctx, dbConn, "vc-a", snapshotTime, 12, 24, 48); err != nil {
|
||||
t.Fatalf("InsertVcenterTotals failed: %v", err)
|
||||
}
|
||||
|
||||
rows, err := ListVcenterAggregateTotals(ctx, dbConn, "vc-a", "hourly", 10)
|
||||
if err != nil {
|
||||
t.Fatalf("ListVcenterAggregateTotals failed: %v", err)
|
||||
}
|
||||
if len(rows) != 1 {
|
||||
t.Fatalf("expected 1 hourly aggregate row, got %d", len(rows))
|
||||
}
|
||||
if rows[0].SnapshotTime != snapshotTime.Unix() || rows[0].VmCount != 12 || rows[0].VcpuTotal != 24 || rows[0].RamTotalGB != 48 {
|
||||
t.Fatalf("unexpected hourly aggregate row: %#v", rows[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestListVcenterHourlyTotalsSinceUsesAggregateCache(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbConn := newTestSQLiteDB(t)
|
||||
|
||||
base := time.Unix(1_700_000_000, 0)
|
||||
if err := UpsertVcenterAggregateTotal(ctx, dbConn, "hourly", "vc-a", base.Unix(), 7, 14, 21); err != nil {
|
||||
t.Fatalf("UpsertVcenterAggregateTotal failed: %v", err)
|
||||
}
|
||||
|
||||
rows, err := ListVcenterHourlyTotalsSince(ctx, dbConn, "vc-a", base.Add(-24*time.Hour))
|
||||
if err != nil {
|
||||
t.Fatalf("ListVcenterHourlyTotalsSince failed: %v", err)
|
||||
}
|
||||
if len(rows) != 1 {
|
||||
t.Fatalf("expected 1 cached row, got %d", len(rows))
|
||||
}
|
||||
if rows[0].SnapshotTime != base.Unix() || rows[0].VmCount != 7 || rows[0].VcpuTotal != 14 || rows[0].RamTotalGB != 21 {
|
||||
t.Fatalf("unexpected cached hourly row: %#v", rows[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestReplaceVcenterAggregateTotalsFromSummary(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbConn := newTestSQLiteDB(t)
|
||||
|
||||
summaryTable := "inventory_daily_summary_20260101"
|
||||
if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`
|
||||
CREATE TABLE %s (
|
||||
"Vcenter" TEXT NOT NULL,
|
||||
"Name" TEXT,
|
||||
"VmId" TEXT,
|
||||
"VmUuid" TEXT,
|
||||
"AvgVcpuCount" REAL,
|
||||
"VcpuCount" BIGINT,
|
||||
"AvgRamGB" REAL,
|
||||
"RamGB" BIGINT
|
||||
)`, summaryTable)); err != nil {
|
||||
t.Fatalf("failed to create summary table: %v", err)
|
||||
}
|
||||
insertSQL := fmt.Sprintf(`
|
||||
INSERT INTO %s ("Vcenter","Name","VmId","VmUuid","AvgVcpuCount","AvgRamGB")
|
||||
VALUES (?,?,?,?,?,?)
|
||||
`, summaryTable)
|
||||
rows := [][]interface{}{
|
||||
{"vc-a", "vm-1", "1", "u1", 2.0, 4.0},
|
||||
{"vc-a", "vm-2", "2", "u2", 3.0, 5.0},
|
||||
{"vc-b", "vm-3", "3", "u3", 1.0, 2.0},
|
||||
}
|
||||
for _, args := range rows {
|
||||
if _, err := dbConn.ExecContext(ctx, insertSQL, args...); err != nil {
|
||||
t.Fatalf("failed to insert summary row: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
upserted, err := ReplaceVcenterAggregateTotalsFromSummary(ctx, dbConn, summaryTable, "daily", 1_700_010_000)
|
||||
if err != nil {
|
||||
t.Fatalf("ReplaceVcenterAggregateTotalsFromSummary failed: %v", err)
|
||||
}
|
||||
if upserted != 2 {
|
||||
t.Fatalf("expected 2 vcenter aggregate rows, got %d", upserted)
|
||||
}
|
||||
|
||||
vcA, err := ListVcenterAggregateTotals(ctx, dbConn, "vc-a", "daily", 10)
|
||||
if err != nil {
|
||||
t.Fatalf("ListVcenterAggregateTotals(vc-a) failed: %v", err)
|
||||
}
|
||||
if len(vcA) != 1 {
|
||||
t.Fatalf("expected 1 vc-a daily row, got %d", len(vcA))
|
||||
}
|
||||
if vcA[0].SnapshotTime != 1_700_010_000 || vcA[0].VmCount != 2 || vcA[0].VcpuTotal != 5 || vcA[0].RamTotalGB != 9 {
|
||||
t.Fatalf("unexpected vc-a daily aggregate row: %#v", vcA[0])
|
||||
}
|
||||
|
||||
vcB, err := ListVcenterAggregateTotalsSince(ctx, dbConn, "vc-b", "daily", time.Unix(1_700_009_000, 0))
|
||||
if err != nil {
|
||||
t.Fatalf("ListVcenterAggregateTotalsSince(vc-b) failed: %v", err)
|
||||
}
|
||||
if len(vcB) != 1 || vcB[0].VmCount != 1 || vcB[0].VcpuTotal != 1 || vcB[0].RamTotalGB != 2 {
|
||||
t.Fatalf("unexpected vc-b daily aggregate row: %#v", vcB)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListVcenterTotalsByTypeDailyFallbackWarmsCache(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbConn := newTestSQLiteDB(t)
|
||||
|
||||
if _, err := dbConn.ExecContext(ctx, `
|
||||
CREATE TABLE snapshot_registry (
|
||||
snapshot_type TEXT,
|
||||
table_name TEXT,
|
||||
snapshot_time BIGINT
|
||||
)`); err != nil {
|
||||
t.Fatalf("failed to create snapshot_registry: %v", err)
|
||||
}
|
||||
|
||||
summaryTable := "inventory_daily_summary_20260102"
|
||||
if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`
|
||||
CREATE TABLE %s (
|
||||
"Vcenter" TEXT NOT NULL,
|
||||
"Name" TEXT,
|
||||
"VmId" TEXT,
|
||||
"VmUuid" TEXT,
|
||||
"AvgVcpuCount" REAL,
|
||||
"VcpuCount" BIGINT,
|
||||
"AvgRamGB" REAL,
|
||||
"RamGB" BIGINT
|
||||
)`, summaryTable)); err != nil {
|
||||
t.Fatalf("failed to create summary table: %v", err)
|
||||
}
|
||||
insertSQL := fmt.Sprintf(`
|
||||
INSERT INTO %s ("Vcenter","Name","VmId","VmUuid","AvgVcpuCount","AvgRamGB")
|
||||
VALUES (?,?,?,?,?,?)
|
||||
`, summaryTable)
|
||||
for _, args := range [][]interface{}{
|
||||
{"vc-a", "vm-1", "1", "u1", 4.0, 8.0},
|
||||
{"vc-a", "vm-2", "2", "u2", 2.0, 6.0},
|
||||
} {
|
||||
if _, err := dbConn.ExecContext(ctx, insertSQL, args...); err != nil {
|
||||
t.Fatalf("failed to insert summary row: %v", err)
|
||||
}
|
||||
}
|
||||
if _, err := dbConn.ExecContext(ctx, `INSERT INTO snapshot_registry (snapshot_type, table_name, snapshot_time) VALUES (?,?,?)`, "daily", summaryTable, int64(1_700_020_000)); err != nil {
|
||||
t.Fatalf("failed to insert snapshot_registry row: %v", err)
|
||||
}
|
||||
|
||||
rows, err := ListVcenterTotalsByType(ctx, dbConn, "vc-a", "daily", 10)
|
||||
if err != nil {
|
||||
t.Fatalf("ListVcenterTotalsByType failed: %v", err)
|
||||
}
|
||||
if len(rows) != 1 {
|
||||
t.Fatalf("expected 1 daily row, got %d", len(rows))
|
||||
}
|
||||
if rows[0].SnapshotTime != 1_700_020_000 || rows[0].VmCount != 2 || rows[0].VcpuTotal != 6 || rows[0].RamTotalGB != 14 {
|
||||
t.Fatalf("unexpected daily totals row: %#v", rows[0])
|
||||
}
|
||||
|
||||
cached, err := ListVcenterAggregateTotals(ctx, dbConn, "vc-a", "daily", 10)
|
||||
if err != nil {
|
||||
t.Fatalf("ListVcenterAggregateTotals failed: %v", err)
|
||||
}
|
||||
if len(cached) != 1 || cached[0].SnapshotTime != 1_700_020_000 || cached[0].VmCount != 2 {
|
||||
t.Fatalf("expected warmed daily cache row, got %#v", cached)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncVcenterAggregateTotalsFromRegistry(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbConn := newTestSQLiteDB(t)
|
||||
|
||||
if _, err := dbConn.ExecContext(ctx, `
|
||||
CREATE TABLE snapshot_registry (
|
||||
snapshot_type TEXT,
|
||||
table_name TEXT,
|
||||
snapshot_time BIGINT
|
||||
)`); err != nil {
|
||||
t.Fatalf("failed to create snapshot_registry: %v", err)
|
||||
}
|
||||
|
||||
table1 := "inventory_daily_summary_20260103"
|
||||
table2 := "inventory_daily_summary_20260104"
|
||||
for _, table := range []string{table1, table2} {
|
||||
if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`
|
||||
CREATE TABLE %s (
|
||||
"Vcenter" TEXT NOT NULL,
|
||||
"Name" TEXT,
|
||||
"VmId" TEXT,
|
||||
"VmUuid" TEXT,
|
||||
"AvgVcpuCount" REAL,
|
||||
"VcpuCount" BIGINT,
|
||||
"AvgRamGB" REAL,
|
||||
"RamGB" BIGINT
|
||||
)`, table)); err != nil {
|
||||
t.Fatalf("failed to create summary table %s: %v", table, err)
|
||||
}
|
||||
}
|
||||
insert1 := fmt.Sprintf(`INSERT INTO %s ("Vcenter","Name","VmId","VmUuid","AvgVcpuCount","AvgRamGB") VALUES (?,?,?,?,?,?)`, table1)
|
||||
insert2 := fmt.Sprintf(`INSERT INTO %s ("Vcenter","Name","VmId","VmUuid","AvgVcpuCount","AvgRamGB") VALUES (?,?,?,?,?,?)`, table2)
|
||||
for _, args := range [][]interface{}{
|
||||
{"vc-a", "vm-1", "1", "u1", 2.0, 4.0},
|
||||
{"vc-b", "vm-2", "2", "u2", 3.0, 5.0},
|
||||
} {
|
||||
if _, err := dbConn.ExecContext(ctx, insert1, args...); err != nil {
|
||||
t.Fatalf("failed to insert row into %s: %v", table1, err)
|
||||
}
|
||||
}
|
||||
if _, err := dbConn.ExecContext(ctx, insert2, "vc-a", "vm-3", "3", "u3", 4.0, 6.0); err != nil {
|
||||
t.Fatalf("failed to insert row into %s: %v", table2, err)
|
||||
}
|
||||
if _, err := dbConn.ExecContext(ctx, `INSERT INTO snapshot_registry (snapshot_type, table_name, snapshot_time) VALUES (?,?,?)`, "daily", table1, int64(1_700_030_000)); err != nil {
|
||||
t.Fatalf("failed to insert snapshot_registry row for table1: %v", err)
|
||||
}
|
||||
if _, err := dbConn.ExecContext(ctx, `INSERT INTO snapshot_registry (snapshot_type, table_name, snapshot_time) VALUES (?,?,?)`, "daily", table2, int64(1_700_040_000)); err != nil {
|
||||
t.Fatalf("failed to insert snapshot_registry row for table2: %v", err)
|
||||
}
|
||||
|
||||
snapshotsRefreshed, rowsUpserted, err := SyncVcenterAggregateTotalsFromRegistry(ctx, dbConn, "daily")
|
||||
if err != nil {
|
||||
t.Fatalf("SyncVcenterAggregateTotalsFromRegistry failed: %v", err)
|
||||
}
|
||||
if snapshotsRefreshed != 2 {
|
||||
t.Fatalf("expected 2 snapshots refreshed, got %d", snapshotsRefreshed)
|
||||
}
|
||||
if rowsUpserted != 3 {
|
||||
t.Fatalf("expected 3 rows upserted, got %d", rowsUpserted)
|
||||
}
|
||||
|
||||
rows, err := ListVcenterAggregateTotals(ctx, dbConn, "vc-a", "daily", 10)
|
||||
if err != nil {
|
||||
t.Fatalf("ListVcenterAggregateTotals failed: %v", err)
|
||||
}
|
||||
if len(rows) != 2 {
|
||||
t.Fatalf("expected 2 daily rows for vc-a, got %d", len(rows))
|
||||
}
|
||||
if rows[0].SnapshotTime != 1_700_040_000 || rows[0].VmCount != 1 || rows[0].VcpuTotal != 4 || rows[0].RamTotalGB != 6 {
|
||||
t.Fatalf("unexpected latest vc-a daily row: %#v", rows[0])
|
||||
}
|
||||
if rows[1].SnapshotTime != 1_700_030_000 || rows[1].VmCount != 1 || rows[1].VcpuTotal != 2 || rows[1].RamTotalGB != 4 {
|
||||
t.Fatalf("unexpected older vc-a daily row: %#v", rows[1])
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user