This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"vctp/db/queries"
|
||||
@@ -29,6 +30,39 @@ type ColumnDef struct {
|
||||
Type string
|
||||
}
|
||||
|
||||
type ensureOnceKey struct {
|
||||
dbConn *sqlx.DB
|
||||
name string
|
||||
}
|
||||
|
||||
type ensureOnceState struct {
|
||||
mu sync.Mutex
|
||||
done bool
|
||||
}
|
||||
|
||||
var ensureOnceRegistry sync.Map
|
||||
|
||||
// ensureOncePerDB runs fn once per DB connection for a given logical key.
|
||||
// The function is considered complete only when fn returns nil.
|
||||
func ensureOncePerDB(dbConn *sqlx.DB, name string, fn func() error) error {
|
||||
if dbConn == nil {
|
||||
return fmt.Errorf("db connection is nil")
|
||||
}
|
||||
key := ensureOnceKey{dbConn: dbConn, name: name}
|
||||
stateAny, _ := ensureOnceRegistry.LoadOrStore(key, &ensureOnceState{})
|
||||
state := stateAny.(*ensureOnceState)
|
||||
state.mu.Lock()
|
||||
defer state.mu.Unlock()
|
||||
if state.done {
|
||||
return nil
|
||||
}
|
||||
if err := fn(); err != nil {
|
||||
return err
|
||||
}
|
||||
state.done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// TableRowCount returns COUNT(*) for a table.
|
||||
func TableRowCount(ctx context.Context, dbConn *sqlx.DB, table string) (int64, error) {
|
||||
if err := ValidateTableName(table); err != nil {
|
||||
@@ -600,14 +634,27 @@ CREATE TABLE IF NOT EXISTS vm_hourly_stats (
|
||||
"SrmPlaceholder" TEXT,
|
||||
PRIMARY KEY ("Vcenter","VmId","SnapshotTime")
|
||||
);`
|
||||
if _, err := execLog(ctx, dbConn, ddl); err != nil {
|
||||
return err
|
||||
}
|
||||
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmuuid_time_idx ON vm_hourly_stats ("VmUuid","SnapshotTime")`)
|
||||
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmid_time_idx ON vm_hourly_stats ("VmId","SnapshotTime")`)
|
||||
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_name_time_idx ON vm_hourly_stats (lower("Name"),"SnapshotTime")`)
|
||||
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_hourly_stats_snapshottime_idx ON vm_hourly_stats ("SnapshotTime")`)
|
||||
return nil
|
||||
return ensureOncePerDB(dbConn, "vm_hourly_stats", func() error {
|
||||
if _, err := execLog(ctx, dbConn, ddl); err != nil {
|
||||
return err
|
||||
}
|
||||
indexQueries := []string{
|
||||
`CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmuuid_time_idx ON vm_hourly_stats ("VmUuid","SnapshotTime")`,
|
||||
`CREATE INDEX IF NOT EXISTS vm_hourly_stats_vmid_time_idx ON vm_hourly_stats ("VmId","SnapshotTime")`,
|
||||
`CREATE INDEX IF NOT EXISTS vm_hourly_stats_name_time_idx ON vm_hourly_stats (lower("Name"),"SnapshotTime")`,
|
||||
`CREATE INDEX IF NOT EXISTS vm_hourly_stats_snapshottime_idx ON vm_hourly_stats ("SnapshotTime")`,
|
||||
}
|
||||
failedIndexes := 0
|
||||
for _, q := range indexQueries {
|
||||
if _, err := execLog(ctx, dbConn, q); err != nil {
|
||||
failedIndexes++
|
||||
}
|
||||
}
|
||||
if failedIndexes > 0 {
|
||||
slog.Warn("vm_hourly_stats index ensure incomplete; continuing without retries until restart", "failed_indexes", failedIndexes)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// EnsureVmLifecycleCache creates an upsert cache for first/last seen VM info.
|
||||
@@ -624,13 +671,26 @@ CREATE TABLE IF NOT EXISTS vm_lifecycle_cache (
|
||||
"DeletedAt" BIGINT,
|
||||
PRIMARY KEY ("Vcenter","VmId","VmUuid")
|
||||
);`
|
||||
if _, err := execLog(ctx, dbConn, ddl); err != nil {
|
||||
return err
|
||||
}
|
||||
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmuuid_idx ON vm_lifecycle_cache ("VmUuid")`)
|
||||
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmid_idx ON vm_lifecycle_cache ("VmId")`)
|
||||
_, _ = execLog(ctx, dbConn, `CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_name_idx ON vm_lifecycle_cache (lower("Name"))`)
|
||||
return nil
|
||||
return ensureOncePerDB(dbConn, "vm_lifecycle_cache", func() error {
|
||||
if _, err := execLog(ctx, dbConn, ddl); err != nil {
|
||||
return err
|
||||
}
|
||||
indexQueries := []string{
|
||||
`CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmuuid_idx ON vm_lifecycle_cache ("VmUuid")`,
|
||||
`CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_vmid_idx ON vm_lifecycle_cache ("VmId")`,
|
||||
`CREATE INDEX IF NOT EXISTS vm_lifecycle_cache_name_idx ON vm_lifecycle_cache (lower("Name"))`,
|
||||
}
|
||||
failedIndexes := 0
|
||||
for _, q := range indexQueries {
|
||||
if _, err := execLog(ctx, dbConn, q); err != nil {
|
||||
failedIndexes++
|
||||
}
|
||||
}
|
||||
if failedIndexes > 0 {
|
||||
slog.Warn("vm_lifecycle_cache index ensure incomplete; continuing without retries until restart", "failed_indexes", failedIndexes)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// UpsertVmLifecycleCache updates first/last seen info for a VM.
|
||||
|
||||
@@ -3,6 +3,7 @@ package db
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -32,6 +33,37 @@ func indexExists(t *testing.T, dbConn *sqlx.DB, name string) bool {
|
||||
return count > 0
|
||||
}
|
||||
|
||||
func TestEnsureOncePerDBRetriesUntilSuccess(t *testing.T) {
|
||||
dbConn := newTestSQLiteDB(t)
|
||||
attempts := 0
|
||||
run := func() error {
|
||||
attempts++
|
||||
if attempts == 1 {
|
||||
return errors.New("transient failure")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := ensureOncePerDB(dbConn, "test_once", run); err == nil {
|
||||
t.Fatal("expected first ensureOncePerDB call to fail")
|
||||
}
|
||||
if attempts != 1 {
|
||||
t.Fatalf("expected 1 attempt after first call, got %d", attempts)
|
||||
}
|
||||
if err := ensureOncePerDB(dbConn, "test_once", run); err != nil {
|
||||
t.Fatalf("expected second ensureOncePerDB call to succeed, got %v", err)
|
||||
}
|
||||
if attempts != 2 {
|
||||
t.Fatalf("expected 2 attempts after retry, got %d", attempts)
|
||||
}
|
||||
if err := ensureOncePerDB(dbConn, "test_once", run); err != nil {
|
||||
t.Fatalf("expected third ensureOncePerDB call to reuse success, got %v", err)
|
||||
}
|
||||
if attempts != 2 {
|
||||
t.Fatalf("expected no additional attempts after success, got %d", attempts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupHourlySnapshotIndexesOlderThan(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbConn := newTestSQLiteDB(t)
|
||||
|
||||
Reference in New Issue
Block a user