Files
vctp2/internal/tasks/inventoryDatabase.go
T
nathan 8ccf5a7009
continuous-integration/drone/push Build is passing
enhance utilisation of postgres features
2026-04-20 10:19:27 +10:00

382 lines
10 KiB
Go

package tasks
import (
"context"
"fmt"
"strconv"
"strings"
"vctp/db"
"github.com/jmoiron/sqlx"
)
// insertHourlyCache upserts rows into the vm_hourly_stats table, keyed by
// ("Vcenter","VmId","SnapshotTime").
//
// An empty rows slice is a no-op. db.EnsureVmHourlyStats is always called
// first. For Postgres drivers the partition for the first row's SnapshotTime
// is ensured and the work is handed to insertHourlyCachePostgresMultiRow.
// The "sqlite" driver uses INSERT OR REPLACE; any other driver uses an
// ON CONFLICT ... DO UPDATE clause. Non-Postgres rows are written one at a
// time through a prepared statement inside a single transaction, which is
// rolled back on the first failure.
func insertHourlyCache(ctx context.Context, dbConn *sqlx.DB, rows []InventorySnapshotRow) error {
	if len(rows) == 0 {
		return nil
	}
	if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil {
		return err
	}

	driverName := strings.ToLower(dbConn.DriverName())
	if isPostgresDriver(driverName) {
		// rows is non-empty at this point, so rows[0] is always valid.
		if err := db.EnsureVmHourlyStatsPartitionForSnapshot(ctx, dbConn, rows[0].SnapshotTime); err != nil {
			return err
		}
		return insertHourlyCachePostgresMultiRow(ctx, dbConn, rows)
	}

	insertVerb, upsertClause := "INSERT INTO", ""
	switch driverName {
	case "sqlite":
		insertVerb = "INSERT OR REPLACE INTO"
	default:
		upsertClause = ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET
"VmUuid"=EXCLUDED."VmUuid",
"Name"=EXCLUDED."Name",
"CreationTime"=EXCLUDED."CreationTime",
"DeletionTime"=EXCLUDED."DeletionTime",
"ResourcePool"=EXCLUDED."ResourcePool",
"Datacenter"=EXCLUDED."Datacenter",
"Cluster"=EXCLUDED."Cluster",
"Folder"=EXCLUDED."Folder",
"ProvisionedDisk"=EXCLUDED."ProvisionedDisk",
"VcpuCount"=EXCLUDED."VcpuCount",
"RamGB"=EXCLUDED."RamGB",
"IsTemplate"=EXCLUDED."IsTemplate",
"PoweredOn"=EXCLUDED."PoweredOn",
"SrmPlaceholder"=EXCLUDED."SrmPlaceholder"`
	}

	columns := []string{
		"SnapshotTime", "Vcenter", "VmId", "VmUuid", "Name", "CreationTime", "DeletionTime", "ResourcePool",
		"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder",
	}
	// One "?" per column, separated by ", ".
	marks := strings.Repeat("?, ", len(columns)-1) + "?"
	query := sqlx.Rebind(
		sqlx.BindType(dbConn.DriverName()),
		fmt.Sprintf(`%s vm_hourly_stats ("%s") VALUES (%s)%s`, insertVerb, strings.Join(columns, `","`), marks, upsertClause),
	)

	txn, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	prepared, err := txn.PreparexContext(ctx, query)
	if err != nil {
		_ = txn.Rollback()
		return err
	}
	defer prepared.Close()

	for _, item := range rows {
		_, execErr := prepared.ExecContext(ctx,
			item.SnapshotTime, item.Vcenter, item.VmId, item.VmUuid, item.Name, item.CreationTime, item.DeletionTime, item.ResourcePool,
			item.Datacenter, item.Cluster, item.Folder, item.ProvisionedDisk, item.VcpuCount, item.RamGB, item.IsTemplate, item.PoweredOn, item.SrmPlaceholder,
		)
		if execErr != nil {
			_ = txn.Rollback()
			return execErr
		}
	}
	return txn.Commit()
}
// insertHourlyCachePostgresMultiRow upserts rows into vm_hourly_stats using
// multi-row INSERT statements with an ON CONFLICT ... DO UPDATE clause.
//
// Rows are split into batches no larger than postgresMaxRowsPerStatement
// allows for the column count. All batches run in one transaction, which is
// rolled back as soon as any batch fails.
func insertHourlyCachePostgresMultiRow(ctx context.Context, dbConn *sqlx.DB, rows []InventorySnapshotRow) error {
	columns := []string{
		"SnapshotTime", "Vcenter", "VmId", "VmUuid", "Name", "CreationTime", "DeletionTime", "ResourcePool",
		"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder",
	}
	const upsertClause = ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET
"VmUuid"=EXCLUDED."VmUuid",
"Name"=EXCLUDED."Name",
"CreationTime"=EXCLUDED."CreationTime",
"DeletionTime"=EXCLUDED."DeletionTime",
"ResourcePool"=EXCLUDED."ResourcePool",
"Datacenter"=EXCLUDED."Datacenter",
"Cluster"=EXCLUDED."Cluster",
"Folder"=EXCLUDED."Folder",
"ProvisionedDisk"=EXCLUDED."ProvisionedDisk",
"VcpuCount"=EXCLUDED."VcpuCount",
"RamGB"=EXCLUDED."RamGB",
"IsTemplate"=EXCLUDED."IsTemplate",
"PoweredOn"=EXCLUDED."PoweredOn",
"SrmPlaceholder"=EXCLUDED."SrmPlaceholder"`

	txn, beginErr := dbConn.BeginTxx(ctx, nil)
	if beginErr != nil {
		return beginErr
	}

	width := len(columns)
	batchSize := postgresMaxRowsPerStatement(width)
	for offset := 0; offset < len(rows); offset += batchSize {
		batch := rows[offset:min(offset+batchSize, len(rows))]

		// Flatten the batch into one positional argument list, row by row.
		params := make([]any, 0, len(batch)*width)
		for _, item := range batch {
			params = append(params,
				item.SnapshotTime, item.Vcenter, item.VmId, item.VmUuid, item.Name, item.CreationTime, item.DeletionTime, item.ResourcePool,
				item.Datacenter, item.Cluster, item.Folder, item.ProvisionedDisk, item.VcpuCount, item.RamGB, item.IsTemplate, item.PoweredOn, item.SrmPlaceholder,
			)
		}

		query := buildPostgresMultiRowInsertSQL("vm_hourly_stats", columns, len(batch), upsertClause)
		if _, execErr := txn.ExecContext(ctx, query, params...); execErr != nil {
			_ = txn.Rollback()
			return execErr
		}
	}
	return txn.Commit()
}
// insertHourlyBatch inserts rows into the hourly snapshot table tableName.
//
// An empty rows slice is a no-op. tableName is interpolated into the SQL
// text, so the call is rejected when db.SafeTableName reports an error for
// it. Postgres drivers are delegated to insertHourlyBatchPostgresMultiRow;
// every other driver executes a prepared single-row INSERT once per row
// inside one transaction.
//
// Legacy tables may still carry an IsPresent column. The first attempt omits
// it. If that attempt cannot be prepared, or fails with an error that
// isLegacyIsPresentError recognises, its transaction is rolled back and the
// whole batch is retried with IsPresent set to "TRUE". Retrying on execution
// errors mirrors the Postgres path: a missing value for a NOT NULL column is
// reported when the statement runs, not when it is prepared.
// NOTE(review): assumes legacy tables declare IsPresent NOT NULL without a
// default — confirm against the legacy schema.
func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []InventorySnapshotRow) error {
	if len(rows) == 0 {
		return nil
	}
	if _, err := db.SafeTableName(tableName); err != nil {
		return err
	}
	if isPostgresDriver(strings.ToLower(dbConn.DriverName())) {
		return insertHourlyBatchPostgresMultiRow(ctx, dbConn, tableName, rows)
	}

	baseCols := []string{
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
		"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime",
	}

	prepareFailed, err := execHourlyBatchPrepared(ctx, dbConn, tableName, baseCols, rows, false)
	if err == nil {
		return nil
	}
	// Any prepare failure triggers the fallback (as before); other failures
	// only do so when they point at the IsPresent column.
	if !prepareFailed && !isLegacyIsPresentError(err) {
		return err
	}

	// Fallback for legacy tables that still have IsPresent.
	withLegacy := append(append([]string{}, baseCols...), "IsPresent")
	_, err = execHourlyBatchPrepared(ctx, dbConn, tableName, withLegacy, rows, true)
	return err
}

// execHourlyBatchPrepared performs one all-or-nothing attempt at inserting
// rows into tableName through a prepared single-row INSERT over cols. When
// includeLegacyIsPresent is true, the literal "TRUE" is appended to each
// row's arguments, so cols must then end with the IsPresent column. The
// returned bool is true only when the error came from preparing the
// statement. The transaction is rolled back on any failure.
func execHourlyBatchPrepared(ctx context.Context, dbConn *sqlx.DB, tableName string, cols []string, rows []InventorySnapshotRow, includeLegacyIsPresent bool) (bool, error) {
	tx, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return false, err
	}

	colList := `"` + strings.Join(cols, `", "`) + `"`
	placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ")
	query := sqlx.Rebind(
		sqlx.BindType(dbConn.DriverName()),
		fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, tableName, colList, placeholders),
	)

	stmt, err := tx.PreparexContext(ctx, query)
	if err != nil {
		_ = tx.Rollback()
		return true, err
	}
	defer stmt.Close()

	for _, row := range rows {
		args := []any{
			row.InventoryId,
			row.Name,
			row.Vcenter,
			row.VmId,
			row.EventKey,
			row.CloudId,
			row.CreationTime,
			row.DeletionTime,
			row.ResourcePool,
			row.Datacenter,
			row.Cluster,
			row.Folder,
			row.ProvisionedDisk,
			row.VcpuCount,
			row.RamGB,
			row.IsTemplate,
			row.PoweredOn,
			row.SrmPlaceholder,
			row.VmUuid,
			row.SnapshotTime,
		}
		if includeLegacyIsPresent {
			args = append(args, "TRUE")
		}
		if _, err := stmt.ExecContext(ctx, args...); err != nil {
			_ = tx.Rollback()
			return false, err
		}
	}
	return false, tx.Commit()
}
// insertHourlyBatchPostgresMultiRow inserts rows into tableName on Postgres
// via execHourlySnapshotInsertPostgres.
//
// The first attempt uses the current column set. If it fails with an error
// that isLegacyIsPresentError recognises, the insert is retried once with the
// extra IsPresent column; any other error is returned unchanged.
func insertHourlyBatchPostgresMultiRow(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []InventorySnapshotRow) error {
	columns := []string{
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
		"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
		"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime",
	}

	firstErr := execHourlySnapshotInsertPostgres(ctx, dbConn, tableName, columns, rows, false)
	switch {
	case firstErr == nil:
		return nil
	case !isLegacyIsPresentError(firstErr):
		return firstErr
	}

	// Build the legacy column list in a fresh slice so columns stays untouched.
	legacyColumns := make([]string, 0, len(columns)+1)
	legacyColumns = append(legacyColumns, columns...)
	legacyColumns = append(legacyColumns, "IsPresent")
	return execHourlySnapshotInsertPostgres(ctx, dbConn, tableName, legacyColumns, rows, true)
}
// execHourlySnapshotInsertPostgres inserts rows into tableName with multi-row
// INSERT statements inside a single transaction.
//
// cols names the target columns. Each row contributes its twenty snapshot
// fields in a fixed order, plus the literal "TRUE" when
// includeLegacyIsPresent is set, so cols must match that layout. Rows are
// split into batches sized by postgresMaxRowsPerStatement. The transaction is
// rolled back on the first failing batch.
func execHourlySnapshotInsertPostgres(ctx context.Context, dbConn *sqlx.DB, tableName string, cols []string, rows []InventorySnapshotRow, includeLegacyIsPresent bool) error {
	txn, beginErr := dbConn.BeginTxx(ctx, nil)
	if beginErr != nil {
		return beginErr
	}

	width := len(cols)
	batchSize := postgresMaxRowsPerStatement(width)
	for offset := 0; offset < len(rows); offset += batchSize {
		batch := rows[offset:min(offset+batchSize, len(rows))]

		// Flatten the batch into one positional argument list, row by row.
		params := make([]any, 0, len(batch)*width)
		for _, item := range batch {
			params = append(params,
				item.InventoryId,
				item.Name,
				item.Vcenter,
				item.VmId,
				item.EventKey,
				item.CloudId,
				item.CreationTime,
				item.DeletionTime,
				item.ResourcePool,
				item.Datacenter,
				item.Cluster,
				item.Folder,
				item.ProvisionedDisk,
				item.VcpuCount,
				item.RamGB,
				item.IsTemplate,
				item.PoweredOn,
				item.SrmPlaceholder,
				item.VmUuid,
				item.SnapshotTime,
			)
			if includeLegacyIsPresent {
				params = append(params, "TRUE")
			}
		}

		query := buildPostgresMultiRowInsertSQL(tableName, cols, len(batch), "")
		if _, execErr := txn.ExecContext(ctx, query, params...); execErr != nil {
			_ = txn.Rollback()
			return execErr
		}
	}
	return txn.Commit()
}
// isPostgresDriver reports whether driver names a Postgres driver. The name
// is compared after trimming surrounding whitespace and lower-casing; only
// "pgx" and "postgres" match.
func isPostgresDriver(driver string) bool {
	normalized := strings.ToLower(strings.TrimSpace(driver))
	return normalized == "pgx" || normalized == "postgres"
}
// postgresMaxRowsPerStatement returns how many rows of colCount columns fit
// in one statement without exceeding 65535 bind parameters. The result is
// never below 1: non-positive column counts, and counts too large for even
// one row to fit, all yield 1.
func postgresMaxRowsPerStatement(colCount int) int {
	const maxBindParams = 65535
	switch {
	case colCount <= 0, colCount > maxBindParams:
		return 1
	default:
		return maxBindParams / colCount
	}
}
// buildPostgresMultiRowInsertSQL renders a multi-row INSERT statement for
// tableName with rowCount value tuples over cols, using sequential $1, $2, ...
// placeholders numbered row by row. Column names are double-quoted;
// tableName and suffix are written verbatim, with suffix appended after the
// last tuple. It returns "" when rowCount is not positive.
func buildPostgresMultiRowInsertSQL(tableName string, cols []string, rowCount int, suffix string) string {
	if rowCount <= 0 {
		return ""
	}

	width := len(cols)
	sql := []byte(`INSERT INTO ` + tableName + ` ("` + strings.Join(cols, `","`) + `") VALUES `)
	for r := 0; r < rowCount; r++ {
		if r > 0 {
			sql = append(sql, ',')
		}
		sql = append(sql, '(')
		for c := 0; c < width; c++ {
			if c > 0 {
				sql = append(sql, ',')
			}
			sql = append(sql, '$')
			// Placeholders are 1-based and continue across rows.
			sql = strconv.AppendInt(sql, int64(r*width+c+1), 10)
		}
		sql = append(sql, ')')
	}
	sql = append(sql, suffix...)
	return string(sql)
}
// isLegacyIsPresentError reports whether err's message mentions "ispresent",
// compared case-insensitively. A nil error never matches.
func isLegacyIsPresentError(err error) bool {
	if err != nil {
		message := strings.ToLower(err.Error())
		return strings.Contains(message, "ispresent")
	}
	return false
}
// dropSnapshotTable drops table if it exists.
//
// The table name is interpolated into the SQL text, so the call is rejected
// when db.SafeTableName reports an error for it. A failure of the DROP
// statement is wrapped with the table name, matching clearTable; the
// underlying error stays reachable through errors.Is / errors.As.
func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
	if _, err := db.SafeTableName(table); err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table)); err != nil {
		return fmt.Errorf("failed to drop table %s: %w", table, err)
	}
	return nil
}
// clearTable deletes every row from table.
//
// The table name is interpolated into the SQL text, so the call is rejected
// when db.SafeTableName reports an error for it. A failure of the DELETE
// statement is returned wrapped with the table name.
func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
	_, nameErr := db.SafeTableName(table)
	if nameErr != nil {
		return nameErr
	}

	query := fmt.Sprintf("DELETE FROM %s", table)
	if _, execErr := dbConn.ExecContext(ctx, query); execErr != nil {
		return fmt.Errorf("failed to clear table %s: %w", table, execErr)
	}
	return nil
}