speed up vm trace pages
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-02-09 14:19:24 +11:00
parent c4097ca608
commit 59b16db04f
12 changed files with 702 additions and 208 deletions

View File

@@ -605,6 +605,7 @@ CREATE TABLE IF NOT EXISTS vm_hourly_stats (
}
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmuuid_time_idx ON vm_hourly_stats ("VmUuid","SnapshotTime")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmid_time_idx ON vm_hourly_stats ("VmId","SnapshotTime")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_name_time_idx ON vm_hourly_stats (lower("Name"),"SnapshotTime")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_snapshottime_idx ON vm_hourly_stats ("SnapshotTime")`)
return nil
}
@@ -627,6 +628,8 @@ CREATE TABLE IF NOT EXISTS vm_lifecycle_cache (
return err
}
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmuuid_idx ON vm_lifecycle_cache ("VmUuid")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmid_idx ON vm_lifecycle_cache ("VmId")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_name_idx ON vm_lifecycle_cache (lower("Name"))`)
return nil
}
@@ -947,6 +950,9 @@ CREATE TABLE IF NOT EXISTS vm_daily_rollup (
}
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_daily_rollup_date_idx ON vm_daily_rollup ("Date")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_daily_rollup_vcenter_date_idx ON vm_daily_rollup ("Vcenter","Date")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_daily_rollup_vmid_date_idx ON vm_daily_rollup ("VmId","Date")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_daily_rollup_vmuuid_date_idx ON vm_daily_rollup ("VmUuid","Date")`)
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_daily_rollup_name_date_idx ON vm_daily_rollup (lower("Name"),"Date")`)
return nil
}
@@ -1666,6 +1672,22 @@ type VmLifecycle struct {
DeletionTime int64
}
func vmLookupPredicate(vmID, vmUUID, name string) (string, []interface{}, bool) {
vmID = strings.TrimSpace(vmID)
vmUUID = strings.TrimSpace(vmUUID)
name = strings.TrimSpace(name)
switch {
case vmID != "":
return `"VmId" = ?`, []interface{}{vmID}, true
case vmUUID != "":
return `"VmUuid" = ?`, []interface{}{vmUUID}, true
case name != "":
return `lower("Name") = ?`, []interface{}{strings.ToLower(name)}, true
default:
return "", nil, false
}
}
// FetchVmTrace returns combined hourly snapshot records for a VM (by id/uuid/name) ordered by snapshot time.
// It prefers the shared vm_hourly_stats history table and falls back to per-snapshot tables.
func FetchVmTrace(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
@@ -1681,24 +1703,142 @@ func FetchVmTrace(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name strin
return fetchVmTraceFromSnapshotTables(ctx, dbConn, vmID, vmUUID, name)
}
// FetchVmTraceDaily returns one row per day for a VM, preferring vm_daily_rollup cache.
func FetchVmTraceDaily(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
if TableExists(ctx, dbConn, "vm_daily_rollup") {
if err := EnsureVmDailyRollup(ctx, dbConn); err != nil {
slog.Warn("failed to ensure vm_daily_rollup indexes", "error", err)
}
rows, err := fetchVmTraceDailyFromRollup(ctx, dbConn, vmID, vmUUID, name)
if err != nil {
slog.Warn("vm daily trace cache query failed; falling back to daily summary tables", "error", err)
} else if len(rows) > 0 {
slog.Debug("vm daily trace loaded from daily rollup cache", "row_count", len(rows))
return rows, nil
}
}
return fetchVmTraceDailyFromSummaryTables(ctx, dbConn, vmID, vmUUID, name)
}
func fetchVmTraceDailyFromRollup(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
if !ok {
return nil, nil
}
query := fmt.Sprintf(`
SELECT "Date" AS "SnapshotTime",
COALESCE("Name",'') AS "Name",
COALESCE("Vcenter",'') AS "Vcenter",
COALESCE("VmId",'') AS "VmId",
COALESCE("VmUuid",'') AS "VmUuid",
COALESCE("LastResourcePool",'') AS "ResourcePool",
CAST(CASE
WHEN COALESCE("SamplesPresent",0) > 0 THEN ROUND(1.0 * COALESCE("SumVcpu",0) / "SamplesPresent")
ELSE COALESCE("LastVcpuCount",0)
END AS BIGINT) AS "VcpuCount",
CAST(CASE
WHEN COALESCE("SamplesPresent",0) > 0 THEN ROUND(1.0 * COALESCE("SumRam",0) / "SamplesPresent")
ELSE COALESCE("LastRamGB",0)
END AS BIGINT) AS "RamGB",
COALESCE("LastProvisionedDisk",0) AS "ProvisionedDisk",
COALESCE("CreationTime",0) AS "CreationTime",
COALESCE("DeletionTime",0) AS "DeletionTime"
FROM vm_daily_rollup
WHERE %s
ORDER BY "Date"
`, matchWhere)
query = dbConn.Rebind(query)
rows := make([]VmTraceRow, 0, 128)
if err := selectLog(ctx, dbConn, &rows, query, args...); err != nil {
return nil, err
}
return rows, nil
}
func fetchVmTraceDailyFromSummaryTables(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
if !ok {
return nil, nil
}
if !TableExists(ctx, dbConn, "snapshot_registry") {
return nil, nil
}
var tables []struct {
TableName string `db:"table_name"`
SnapshotTime int64 `db:"snapshot_time"`
}
if err := selectLog(ctx, dbConn, &tables, `
SELECT table_name, snapshot_time
FROM snapshot_registry
WHERE snapshot_type = 'daily'
ORDER BY snapshot_time
`); err != nil {
return nil, err
}
if len(tables) == 0 {
return nil, nil
}
rows := make([]VmTraceRow, 0, len(tables))
for _, t := range tables {
if err := ValidateTableName(t.TableName); err != nil {
continue
}
query := fmt.Sprintf(`
SELECT %d AS "SnapshotTime",
COALESCE("Name",'') AS "Name",
COALESCE("Vcenter",'') AS "Vcenter",
COALESCE("VmId",'') AS "VmId",
COALESCE("VmUuid",'') AS "VmUuid",
COALESCE("ResourcePool",'') AS "ResourcePool",
CAST(COALESCE("AvgVcpuCount","VcpuCount",0) AS BIGINT) AS "VcpuCount",
CAST(COALESCE("AvgRamGB","RamGB",0) AS BIGINT) AS "RamGB",
COALESCE("AvgProvisionedDisk","ProvisionedDisk",0) AS "ProvisionedDisk",
COALESCE("CreationTime",0) AS "CreationTime",
COALESCE("DeletionTime",0) AS "DeletionTime"
FROM %s
WHERE %s
`, t.SnapshotTime, t.TableName, matchWhere)
query = dbConn.Rebind(query)
var tmp []VmTraceRow
if err := selectLog(ctx, dbConn, &tmp, query, args...); err != nil {
continue
}
rows = append(rows, tmp...)
}
sort.Slice(rows, func(i, j int) bool {
return rows[i].SnapshotTime < rows[j].SnapshotTime
})
return rows, nil
}
func fetchVmTraceFromHourlyCache(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
query := `
matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
if !ok {
return nil, nil
}
query := fmt.Sprintf(`
SELECT "SnapshotTime","Name","Vcenter","VmId","VmUuid","ResourcePool","VcpuCount","RamGB","ProvisionedDisk",
COALESCE("CreationTime",0) AS "CreationTime",
COALESCE("DeletionTime",0) AS "DeletionTime"
FROM vm_hourly_stats
WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
WHERE %s
ORDER BY "SnapshotTime"
`
`, matchWhere)
query = dbConn.Rebind(query)
var rows []VmTraceRow
if err := selectLog(ctx, dbConn, &rows, query, vmID, vmUUID, name); err != nil {
if err := selectLog(ctx, dbConn, &rows, query, args...); err != nil {
return nil, err
}
return rows, nil
}
func fetchVmTraceFromSnapshotTables(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) ([]VmTraceRow, error) {
matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
if !ok {
return nil, nil
}
var tables []struct {
TableName string `db:"table_name"`
SnapshotTime int64 `db:"snapshot_time"`
@@ -1716,7 +1856,6 @@ ORDER BY snapshot_time
}
rows := make([]VmTraceRow, 0, len(tables))
driver := strings.ToLower(dbConn.DriverName())
slog.Debug("vm trace scanning tables", "table_count", len(tables), "vm_id", vmID, "vm_uuid", vmUUID, "name", name)
@@ -1731,14 +1870,9 @@ SELECT %d AS "SnapshotTime",
COALESCE("CreationTime",0) AS "CreationTime",
COALESCE("DeletionTime",0) AS "DeletionTime"
FROM %s
WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
`, t.SnapshotTime, t.TableName)
args := []interface{}{vmID, vmUUID, name}
if driver != "sqlite" {
query = strings.Replace(query, "?", "$1", 1)
query = strings.Replace(query, "?", "$2", 1)
query = strings.Replace(query, "?", "$3", 1)
}
WHERE %s
`, t.SnapshotTime, t.TableName, matchWhere)
query = dbConn.Rebind(query)
var tmp []VmTraceRow
if err := selectLog(ctx, dbConn, &tmp, query, args...); err != nil {
slog.Warn("vm trace query failed for table", "table", t.TableName, "error", err)
@@ -1778,6 +1912,10 @@ func FetchVmLifecycle(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name s
}
func fetchVmLifecycleFromHourlyCache(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, bool, error) {
matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
if !ok {
return VmLifecycle{}, false, nil
}
var row struct {
Rows int64 `db:"rows"`
Creation sql.NullInt64 `db:"creation_time"`
@@ -1793,10 +1931,10 @@ SELECT
MAX("SnapshotTime") AS last_seen,
MIN(NULLIF("DeletionTime",0)) AS deletion_time
FROM vm_hourly_stats
WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
WHERE ` + matchWhere + `
`
query = dbConn.Rebind(query)
if err := getLog(ctx, dbConn, &row, query, vmID, vmUUID, name); err != nil {
if err := getLog(ctx, dbConn, &row, query, args...); err != nil {
return VmLifecycle{}, false, err
}
if row.Rows == 0 {
@@ -1820,6 +1958,10 @@ WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
}
func fetchVmLifecycleFromLifecycleCache(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, bool, error) {
matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
if !ok {
return VmLifecycle{}, false, nil
}
var row struct {
Rows int64 `db:"rows"`
FirstSeen sql.NullInt64 `db:"first_seen"`
@@ -1833,10 +1975,10 @@ SELECT
MAX(NULLIF("LastSeen",0)) AS last_seen,
MIN(NULLIF("DeletedAt",0)) AS deletion_time
FROM vm_lifecycle_cache
WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
WHERE ` + matchWhere + `
`
query = dbConn.Rebind(query)
if err := getLog(ctx, dbConn, &row, query, vmID, vmUUID, name); err != nil {
if err := getLog(ctx, dbConn, &row, query, args...); err != nil {
return VmLifecycle{}, false, err
}
if row.Rows == 0 {
@@ -1884,6 +2026,10 @@ func mergeVmLifecycle(base, overlay VmLifecycle) VmLifecycle {
func fetchVmLifecycleFromSnapshotTables(ctx context.Context, dbConn *sqlx.DB, vmID, vmUUID, name string) (VmLifecycle, error) {
var lifecycle VmLifecycle
matchWhere, args, ok := vmLookupPredicate(vmID, vmUUID, name)
if !ok {
return lifecycle, nil
}
var tables []struct {
TableName string `db:"table_name"`
SnapshotTime int64 `db:"snapshot_time"`
@@ -1896,7 +2042,6 @@ ORDER BY snapshot_time
`); err != nil {
return lifecycle, err
}
driver := strings.ToLower(dbConn.DriverName())
minCreation := int64(0)
consecutiveMissing := 0
@@ -1907,14 +2052,9 @@ ORDER BY snapshot_time
query := fmt.Sprintf(`
SELECT MIN(NULLIF("CreationTime",0)) AS min_creation, COUNT(1) AS cnt
FROM %s
WHERE ("VmId" = ? OR "VmUuid" = ? OR lower("Name") = lower(?))
`, t.TableName)
args := []interface{}{vmID, vmUUID, name}
if driver != "sqlite" {
query = strings.Replace(query, "?", "$1", 1)
query = strings.Replace(query, "?", "$2", 1)
query = strings.Replace(query, "?", "$3", 1)
}
WHERE %s
`, t.TableName, matchWhere)
query = dbConn.Rebind(query)
var probe struct {
MinCreation sql.NullInt64 `db:"min_creation"`
Cnt int64 `db:"cnt"`

View File

@@ -126,6 +126,20 @@ INSERT INTO vm_hourly_stats (
if traceRows[0].SnapshotTime != 1000 || traceRows[1].SnapshotTime != 2000 {
t.Fatalf("trace rows are not sorted by snapshot time: %#v", traceRows)
}
traceRowsByName, err := FetchVmTrace(ctx, dbConn, "", "", "DEMO-VM")
if err != nil {
t.Fatalf("FetchVmTrace by name failed: %v", err)
}
if len(traceRowsByName) != 2 {
t.Fatalf("expected 2 trace rows by name, got %d", len(traceRowsByName))
}
emptyTraceRows, err := FetchVmTrace(ctx, dbConn, "", "", "")
if err != nil {
t.Fatalf("FetchVmTrace with empty identifier failed: %v", err)
}
if len(emptyTraceRows) != 0 {
t.Fatalf("expected 0 trace rows for empty identifier, got %d", len(emptyTraceRows))
}
lifecycle, err := FetchVmLifecycle(ctx, dbConn, "vm-1", "", "")
if err != nil {
@@ -143,6 +157,125 @@ INSERT INTO vm_hourly_stats (
if lifecycle.DeletionTime != 2500 {
t.Fatalf("expected DeletionTime=2500 from lifecycle cache, got %d", lifecycle.DeletionTime)
}
lifecycleByName, err := FetchVmLifecycle(ctx, dbConn, "", "", "DEMO-VM")
if err != nil {
t.Fatalf("FetchVmLifecycle by name failed: %v", err)
}
if lifecycleByName.FirstSeen != 900 || lifecycleByName.LastSeen != 2000 {
t.Fatalf("unexpected lifecycle for name lookup: %#v", lifecycleByName)
}
emptyLifecycle, err := FetchVmLifecycle(ctx, dbConn, "", "", "")
if err != nil {
t.Fatalf("FetchVmLifecycle with empty identifier failed: %v", err)
}
if emptyLifecycle.FirstSeen != 0 || emptyLifecycle.LastSeen != 0 || emptyLifecycle.CreationTime != 0 || emptyLifecycle.DeletionTime != 0 {
t.Fatalf("expected empty lifecycle for empty identifier, got %#v", emptyLifecycle)
}
}
func TestFetchVmTraceDailyFromRollup(t *testing.T) {
ctx := context.Background()
dbConn := newTestSQLiteDB(t)
if err := EnsureVmDailyRollup(ctx, dbConn); err != nil {
t.Fatalf("failed to ensure vm_daily_rollup: %v", err)
}
if err := UpsertVmDailyRollup(ctx, dbConn, 1700000000, VmDailyRollupRow{
Vcenter: "vc-a",
VmId: "vm-1",
VmUuid: "uuid-1",
Name: "demo-vm",
CreationTime: 1699999000,
SamplesPresent: 8,
SumVcpu: 32,
SumRam: 64,
LastVcpuCount: 4,
LastRamGB: 8,
LastResourcePool: "Tin",
}); err != nil {
t.Fatalf("failed to insert daily rollup row 1: %v", err)
}
if err := UpsertVmDailyRollup(ctx, dbConn, 1700086400, VmDailyRollupRow{
Vcenter: "vc-a",
VmId: "vm-1",
VmUuid: "uuid-1",
Name: "demo-vm",
CreationTime: 1699999000,
SamplesPresent: 4,
SumVcpu: 20,
SumRam: 36,
LastVcpuCount: 5,
LastRamGB: 9,
LastResourcePool: "Gold",
LastProvisionedDisk: 150.5,
}); err != nil {
t.Fatalf("failed to insert daily rollup row 2: %v", err)
}
rows, err := FetchVmTraceDaily(ctx, dbConn, "vm-1", "", "")
if err != nil {
t.Fatalf("FetchVmTraceDaily failed: %v", err)
}
if len(rows) != 2 {
t.Fatalf("expected 2 daily trace rows, got %d", len(rows))
}
if rows[0].SnapshotTime != 1700000000 || rows[0].VcpuCount != 4 || rows[0].RamGB != 8 {
t.Fatalf("unexpected first daily row: %#v", rows[0])
}
if rows[1].SnapshotTime != 1700086400 || rows[1].VcpuCount != 5 || rows[1].RamGB != 9 || rows[1].ProvisionedDisk != 150.5 {
t.Fatalf("unexpected second daily row: %#v", rows[1])
}
}
func TestFetchVmTraceDailyFallbackToSummaryTables(t *testing.T) {
ctx := context.Background()
dbConn := newTestSQLiteDB(t)
if _, err := dbConn.ExecContext(ctx, `
CREATE TABLE snapshot_registry (
snapshot_type TEXT,
table_name TEXT,
snapshot_time BIGINT
)`); err != nil {
t.Fatalf("failed to create snapshot_registry: %v", err)
}
summaryTable := "inventory_daily_summary_20260106"
if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`
CREATE TABLE %s (
"Name" TEXT,
"Vcenter" TEXT,
"VmId" TEXT,
"VmUuid" TEXT,
"ResourcePool" TEXT,
"AvgVcpuCount" REAL,
"AvgRamGB" REAL,
"AvgProvisionedDisk" REAL,
"CreationTime" BIGINT,
"DeletionTime" BIGINT
)`, summaryTable)); err != nil {
t.Fatalf("failed to create summary table: %v", err)
}
if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO %s ("Name","Vcenter","VmId","VmUuid","ResourcePool","AvgVcpuCount","AvgRamGB","AvgProvisionedDisk","CreationTime","DeletionTime")
VALUES (?,?,?,?,?,?,?,?,?,?)
`, summaryTable), "demo-vm", "vc-a", "vm-1", "uuid-1", "Silver", 3.2, 6.7, 123.4, int64(1699999000), int64(0)); err != nil {
t.Fatalf("failed to insert summary row: %v", err)
}
if _, err := dbConn.ExecContext(ctx, `INSERT INTO snapshot_registry (snapshot_type, table_name, snapshot_time) VALUES (?,?,?)`, "daily", summaryTable, int64(1700500000)); err != nil {
t.Fatalf("failed to insert snapshot_registry row: %v", err)
}
rows, err := FetchVmTraceDaily(ctx, dbConn, "", "uuid-1", "")
if err != nil {
t.Fatalf("FetchVmTraceDaily fallback failed: %v", err)
}
if len(rows) != 1 {
t.Fatalf("expected 1 fallback daily row, got %d", len(rows))
}
if rows[0].SnapshotTime != 1700500000 || rows[0].VcpuCount != 3 || rows[0].RamGB != 6 {
t.Fatalf("unexpected fallback daily row: %#v", rows[0])
}
}
func TestParseHourlySnapshotUnix(t *testing.T) {