192 lines
5.0 KiB
Go
192 lines
5.0 KiB
Go
package tasks
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"vctp/db"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
)
|
|
|
|
func insertHourlyCache(ctx context.Context, dbConn *sqlx.DB, rows []InventorySnapshotRow) error {
|
|
if len(rows) == 0 {
|
|
return nil
|
|
}
|
|
if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil {
|
|
return err
|
|
}
|
|
driver := strings.ToLower(dbConn.DriverName())
|
|
conflict := ""
|
|
verb := "INSERT INTO"
|
|
if driver == "sqlite" {
|
|
verb = "INSERT OR REPLACE INTO"
|
|
} else {
|
|
conflict = ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET
|
|
"VmUuid"=EXCLUDED."VmUuid",
|
|
"Name"=EXCLUDED."Name",
|
|
"CreationTime"=EXCLUDED."CreationTime",
|
|
"DeletionTime"=EXCLUDED."DeletionTime",
|
|
"ResourcePool"=EXCLUDED."ResourcePool",
|
|
"Datacenter"=EXCLUDED."Datacenter",
|
|
"Cluster"=EXCLUDED."Cluster",
|
|
"Folder"=EXCLUDED."Folder",
|
|
"ProvisionedDisk"=EXCLUDED."ProvisionedDisk",
|
|
"VcpuCount"=EXCLUDED."VcpuCount",
|
|
"RamGB"=EXCLUDED."RamGB",
|
|
"IsTemplate"=EXCLUDED."IsTemplate",
|
|
"PoweredOn"=EXCLUDED."PoweredOn",
|
|
"SrmPlaceholder"=EXCLUDED."SrmPlaceholder"`
|
|
}
|
|
|
|
cols := []string{
|
|
"SnapshotTime", "Vcenter", "VmId", "VmUuid", "Name", "CreationTime", "DeletionTime", "ResourcePool",
|
|
"Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder",
|
|
}
|
|
bind := sqlx.BindType(dbConn.DriverName())
|
|
placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ")
|
|
stmtText := fmt.Sprintf(`%s vm_hourly_stats ("%s") VALUES (%s)%s`, verb, strings.Join(cols, `","`), placeholders, conflict)
|
|
stmtText = sqlx.Rebind(bind, stmtText)
|
|
|
|
tx, err := dbConn.BeginTxx(ctx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
stmt, err := tx.PreparexContext(ctx, stmtText)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, r := range rows {
|
|
args := []interface{}{
|
|
r.SnapshotTime, r.Vcenter, r.VmId, r.VmUuid, r.Name, r.CreationTime, r.DeletionTime, r.ResourcePool,
|
|
r.Datacenter, r.Cluster, r.Folder, r.ProvisionedDisk, r.VcpuCount, r.RamGB, r.IsTemplate, r.PoweredOn, r.SrmPlaceholder,
|
|
}
|
|
if _, err := stmt.ExecContext(ctx, args...); err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []InventorySnapshotRow) error {
|
|
if len(rows) == 0 {
|
|
return nil
|
|
}
|
|
if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil {
|
|
return err
|
|
}
|
|
tx, err := dbConn.BeginTxx(ctx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
baseCols := []string{
|
|
"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId", "CreationTime", "DeletionTime",
|
|
"ResourcePool", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "VcpuCount",
|
|
"RamGB", "IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime",
|
|
}
|
|
bind := sqlx.BindType(dbConn.DriverName())
|
|
buildStmt := func(cols []string) (*sqlx.Stmt, error) {
|
|
colList := `"` + strings.Join(cols, `", "`) + `"`
|
|
placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ")
|
|
return tx.PreparexContext(ctx, sqlx.Rebind(bind, fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, tableName, colList, placeholders)))
|
|
}
|
|
|
|
stmt, err := buildStmt(baseCols)
|
|
if err != nil {
|
|
// Fallback for legacy tables that still have IsPresent.
|
|
withLegacy := append(append([]string{}, baseCols...), "IsPresent")
|
|
stmt, err = buildStmt(withLegacy)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
defer stmt.Close()
|
|
for _, row := range rows {
|
|
args := []interface{}{
|
|
row.InventoryId,
|
|
row.Name,
|
|
row.Vcenter,
|
|
row.VmId,
|
|
row.EventKey,
|
|
row.CloudId,
|
|
row.CreationTime,
|
|
row.DeletionTime,
|
|
row.ResourcePool,
|
|
row.Datacenter,
|
|
row.Cluster,
|
|
row.Folder,
|
|
row.ProvisionedDisk,
|
|
row.VcpuCount,
|
|
row.RamGB,
|
|
row.IsTemplate,
|
|
row.PoweredOn,
|
|
row.SrmPlaceholder,
|
|
row.VmUuid,
|
|
row.SnapshotTime,
|
|
"TRUE",
|
|
}
|
|
if _, err := stmt.ExecContext(ctx, args...); err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, row := range rows {
|
|
args := []interface{}{
|
|
row.InventoryId,
|
|
row.Name,
|
|
row.Vcenter,
|
|
row.VmId,
|
|
row.EventKey,
|
|
row.CloudId,
|
|
row.CreationTime,
|
|
row.DeletionTime,
|
|
row.ResourcePool,
|
|
row.Datacenter,
|
|
row.Cluster,
|
|
row.Folder,
|
|
row.ProvisionedDisk,
|
|
row.VcpuCount,
|
|
row.RamGB,
|
|
row.IsTemplate,
|
|
row.PoweredOn,
|
|
row.SrmPlaceholder,
|
|
row.VmUuid,
|
|
row.SnapshotTime,
|
|
}
|
|
if _, err := stmt.ExecContext(ctx, args...); err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
|
|
if _, err := db.SafeTableName(table); err != nil {
|
|
return err
|
|
}
|
|
_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table))
|
|
return err
|
|
}
|
|
|
|
func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
|
|
if _, err := db.SafeTableName(table); err != nil {
|
|
return err
|
|
}
|
|
_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s", table))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to clear table %s: %w", table, err)
|
|
}
|
|
return nil
|
|
}
|