package tasks

import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"vctp/db"

	"github.com/jmoiron/sqlx"
)

// hourlyCacheConflictClause is the upsert suffix appended to inserts into
// vm_hourly_stats on drivers that use ON CONFLICT. It overwrites every
// non-key column with the incoming (EXCLUDED) value.
const hourlyCacheConflictClause = ` ON CONFLICT ("Vcenter","VmId","SnapshotTime") DO UPDATE SET` +
	` "VmUuid"=EXCLUDED."VmUuid",` +
	` "Name"=EXCLUDED."Name",` +
	` "CreationTime"=EXCLUDED."CreationTime",` +
	` "DeletionTime"=EXCLUDED."DeletionTime",` +
	` "ResourcePool"=EXCLUDED."ResourcePool",` +
	` "Datacenter"=EXCLUDED."Datacenter",` +
	` "Cluster"=EXCLUDED."Cluster",` +
	` "Folder"=EXCLUDED."Folder",` +
	` "ProvisionedDisk"=EXCLUDED."ProvisionedDisk",` +
	` "VcpuCount"=EXCLUDED."VcpuCount",` +
	` "RamGB"=EXCLUDED."RamGB",` +
	` "IsTemplate"=EXCLUDED."IsTemplate",` +
	` "PoweredOn"=EXCLUDED."PoweredOn",` +
	` "SrmPlaceholder"=EXCLUDED."SrmPlaceholder"`

// hourlyCacheColumns returns a fresh slice with the vm_hourly_stats column
// names, in the order appendHourlyCacheArgs emits values.
func hourlyCacheColumns() []string {
	return []string{
		"SnapshotTime", "Vcenter", "VmId", "VmUuid", "Name",
		"CreationTime", "DeletionTime", "ResourcePool", "Datacenter",
		"Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB",
		"IsTemplate", "PoweredOn", "SrmPlaceholder",
	}
}

// appendHourlyCacheArgs appends the bind values for one vm_hourly_stats row
// to dst, in hourlyCacheColumns order, and returns the extended slice.
func appendHourlyCacheArgs(dst []any, r *InventorySnapshotRow) []any {
	return append(dst,
		r.SnapshotTime, r.Vcenter, r.VmId, r.VmUuid, r.Name,
		r.CreationTime, r.DeletionTime, r.ResourcePool, r.Datacenter,
		r.Cluster, r.Folder, r.ProvisionedDisk, r.VcpuCount, r.RamGB,
		r.IsTemplate, r.PoweredOn, r.SrmPlaceholder,
	)
}

// hourlySnapshotBaseColumns returns a fresh slice with the column names of an
// hourly snapshot table (without the legacy IsPresent column), in the order
// appendHourlySnapshotArgs emits values.
func hourlySnapshotBaseColumns() []string {
	return []string{
		"InventoryId", "Name", "Vcenter", "VmId", "EventKey", "CloudId",
		"CreationTime", "DeletionTime", "ResourcePool", "Datacenter",
		"Cluster", "Folder", "ProvisionedDisk", "VcpuCount", "RamGB",
		"IsTemplate", "PoweredOn", "SrmPlaceholder", "VmUuid", "SnapshotTime",
	}
}

// appendHourlySnapshotArgs appends the bind values for one hourly snapshot
// row to dst, in hourlySnapshotBaseColumns order. When includeLegacyIsPresent
// is true it additionally appends the literal "TRUE" for the trailing
// IsPresent column.
func appendHourlySnapshotArgs(dst []any, r *InventorySnapshotRow, includeLegacyIsPresent bool) []any {
	dst = append(dst,
		r.InventoryId, r.Name, r.Vcenter, r.VmId, r.EventKey, r.CloudId,
		r.CreationTime, r.DeletionTime, r.ResourcePool, r.Datacenter,
		r.Cluster, r.Folder, r.ProvisionedDisk, r.VcpuCount, r.RamGB,
		r.IsTemplate, r.PoweredOn, r.SrmPlaceholder, r.VmUuid, r.SnapshotTime,
	)
	if includeLegacyIsPresent {
		dst = append(dst, "TRUE")
	}
	return dst
}

// insertHourlyCache upserts rows into vm_hourly_stats inside one transaction.
//
// It is a no-op for empty input. It first calls db.EnsureVmHourlyStats. On
// Postgres drivers it then calls db.EnsureVmHourlyStatsPartitionForSnapshot
// and delegates to insertHourlyCachePostgresMultiRow. On the "sqlite" driver
// it uses INSERT OR REPLACE; on any other driver it uses INSERT ... ON
// CONFLICT DO UPDATE. Any error aborts the transaction and is returned
// unchanged.
func insertHourlyCache(ctx context.Context, dbConn *sqlx.DB, rows []InventorySnapshotRow) error {
	if len(rows) == 0 {
		return nil
	}
	if err := db.EnsureVmHourlyStats(ctx, dbConn); err != nil {
		return err
	}

	driver := strings.ToLower(dbConn.DriverName())
	if isPostgresDriver(driver) {
		// rows is known to be non-empty here, so rows[0] is safe.
		// NOTE(review): only the first row's SnapshotTime is passed; this
		// presumably relies on every row in a batch sharing one snapshot
		// time — confirm against callers.
		if err := db.EnsureVmHourlyStatsPartitionForSnapshot(ctx, dbConn, rows[0].SnapshotTime); err != nil {
			return err
		}
		return insertHourlyCachePostgresMultiRow(ctx, dbConn, rows)
	}

	verb, conflict := "INSERT INTO", hourlyCacheConflictClause
	if driver == "sqlite" {
		verb, conflict = "INSERT OR REPLACE INTO", ""
	}

	cols := hourlyCacheColumns()
	placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ")
	stmtText := fmt.Sprintf(`%s vm_hourly_stats ("%s") VALUES (%s)%s`,
		verb, strings.Join(cols, `","`), placeholders, conflict)
	stmtText = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), stmtText)

	tx, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	// Rolls back on every early return; after a successful Commit the call
	// fails harmlessly because the transaction is already finished.
	defer func() { _ = tx.Rollback() }()

	stmt, err := tx.PreparexContext(ctx, stmtText)
	if err != nil {
		return err
	}
	defer stmt.Close()

	for i := range rows {
		args := appendHourlyCacheArgs(make([]any, 0, len(cols)), &rows[i])
		if _, err := stmt.ExecContext(ctx, args...); err != nil {
			return err
		}
	}
	return tx.Commit()
}

// insertHourlyCachePostgresMultiRow upserts rows into vm_hourly_stats using
// multi-row INSERT ... ON CONFLICT statements, all inside one transaction.
// Rows are split into chunks sized by postgresMaxRowsPerStatement so a single
// statement never exceeds the bind-parameter cap.
func insertHourlyCachePostgresMultiRow(ctx context.Context, dbConn *sqlx.DB, rows []InventorySnapshotRow) error {
	cols := hourlyCacheColumns()

	tx, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	// Rolls back on every early return; harmless after a successful Commit.
	defer func() { _ = tx.Rollback() }()

	maxRows := postgresMaxRowsPerStatement(len(cols))
	var (
		stmt     string
		stmtRows = -1 // row count the cached stmt text was built for
	)
	for start := 0; start < len(rows); start += maxRows {
		end := min(start+maxRows, len(rows))
		chunk := rows[start:end]

		args := make([]any, 0, len(chunk)*len(cols))
		for i := range chunk {
			args = appendHourlyCacheArgs(args, &chunk[i])
		}

		// All full-size chunks share the same SQL text; rebuild only when
		// the chunk size changes (i.e. for the final, shorter chunk).
		if len(chunk) != stmtRows {
			stmt = buildPostgresMultiRowInsertSQL("vm_hourly_stats", cols, len(chunk), hourlyCacheConflictClause)
			stmtRows = len(chunk)
		}
		if _, err := tx.ExecContext(ctx, stmt, args...); err != nil {
			return err
		}
	}
	return tx.Commit()
}

// insertHourlyBatch inserts rows into the hourly snapshot table tableName
// inside one transaction.
//
// It is a no-op for empty input and returns the error from db.SafeTableName
// if that call rejects tableName. Postgres drivers are delegated to
// insertHourlyBatchPostgresMultiRow; other drivers use a prepared
// single-row INSERT executed once per row.
func insertHourlyBatch(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []InventorySnapshotRow) error {
	if len(rows) == 0 {
		return nil
	}
	if _, err := db.SafeTableName(tableName); err != nil {
		return err
	}
	if isPostgresDriver(dbConn.DriverName()) {
		return insertHourlyBatchPostgresMultiRow(ctx, dbConn, tableName, rows)
	}

	tx, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	// Rolls back on every early return; harmless after a successful Commit.
	defer func() { _ = tx.Rollback() }()

	bind := sqlx.BindType(dbConn.DriverName())
	prepare := func(cols []string) (*sqlx.Stmt, error) {
		colList := `"` + strings.Join(cols, `", "`) + `"`
		placeholders := strings.TrimRight(strings.Repeat("?, ", len(cols)), ", ")
		query := fmt.Sprintf(`INSERT INTO %s (%s) VALUES (%s)`, tableName, colList, placeholders)
		return tx.PreparexContext(ctx, sqlx.Rebind(bind, query))
	}

	cols := hourlySnapshotBaseColumns()
	includeLegacyIsPresent := false
	stmt, err := prepare(cols)
	if err != nil {
		// Fallback for legacy tables that still have IsPresent.
		// NOTE(review): this retries on ANY prepare error, whereas the
		// Postgres path only retries when isLegacyIsPresentError matches.
		cols = append(cols, "IsPresent")
		includeLegacyIsPresent = true
		stmt, err = prepare(cols)
		if err != nil {
			return err
		}
	}
	defer stmt.Close()

	for i := range rows {
		args := appendHourlySnapshotArgs(make([]any, 0, len(cols)), &rows[i], includeLegacyIsPresent)
		if _, err := stmt.ExecContext(ctx, args...); err != nil {
			return err
		}
	}
	return tx.Commit()
}

// insertHourlyBatchPostgresMultiRow inserts rows into tableName with
// multi-row INSERT statements. If the first attempt fails with an error that
// isLegacyIsPresentError recognises, the whole insert is retried in a new
// transaction with the extra IsPresent column; any other error is returned
// as-is.
func insertHourlyBatchPostgresMultiRow(ctx context.Context, dbConn *sqlx.DB, tableName string, rows []InventorySnapshotRow) error {
	baseCols := hourlySnapshotBaseColumns()

	err := execHourlySnapshotInsertPostgres(ctx, dbConn, tableName, baseCols, rows, false)
	if err == nil || !isLegacyIsPresentError(err) {
		return err
	}

	withLegacy := append(baseCols, "IsPresent")
	return execHourlySnapshotInsertPostgres(ctx, dbConn, tableName, withLegacy, rows, true)
}

// execHourlySnapshotInsertPostgres inserts rows into tableName using chunked
// multi-row INSERT statements inside one transaction.
//
// cols must list the target columns in the order appendHourlySnapshotArgs
// emits values; when includeLegacyIsPresent is true, cols is expected to end
// with "IsPresent" so the column and value counts match.
func execHourlySnapshotInsertPostgres(ctx context.Context, dbConn *sqlx.DB, tableName string, cols []string, rows []InventorySnapshotRow, includeLegacyIsPresent bool) error {
	tx, err := dbConn.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	// Rolls back on every early return; harmless after a successful Commit.
	defer func() { _ = tx.Rollback() }()

	maxRows := postgresMaxRowsPerStatement(len(cols))
	var (
		stmt     string
		stmtRows = -1 // row count the cached stmt text was built for
	)
	for start := 0; start < len(rows); start += maxRows {
		end := min(start+maxRows, len(rows))
		chunk := rows[start:end]

		args := make([]any, 0, len(chunk)*len(cols))
		for i := range chunk {
			args = appendHourlySnapshotArgs(args, &chunk[i], includeLegacyIsPresent)
		}

		// Rebuild the SQL text only when the chunk size changes.
		if len(chunk) != stmtRows {
			stmt = buildPostgresMultiRowInsertSQL(tableName, cols, len(chunk), "")
			stmtRows = len(chunk)
		}
		if _, err := tx.ExecContext(ctx, stmt, args...); err != nil {
			return err
		}
	}
	return tx.Commit()
}

// isPostgresDriver reports whether driver, compared case-insensitively and
// ignoring surrounding whitespace, is "pgx" or "postgres".
func isPostgresDriver(driver string) bool {
	switch strings.ToLower(strings.TrimSpace(driver)) {
	case "pgx", "postgres":
		return true
	default:
		return false
	}
}

// postgresMaxRowsPerStatement returns how many rows of colCount columns fit
// in one statement without exceeding 65535 bind parameters. The result is
// always at least 1, including for non-positive colCount.
func postgresMaxRowsPerStatement(colCount int) int {
	if colCount <= 0 {
		return 1
	}
	const maxBindParams = 65535
	rows := maxBindParams / colCount
	if rows <= 0 {
		return 1
	}
	return rows
}

// buildPostgresMultiRowInsertSQL renders
//
//	INSERT INTO <tableName> ("c1","c2",...) VALUES ($1,$2,...),($n,...)<suffix>
//
// with rowCount value tuples and sequential $N placeholders. It returns ""
// when rowCount is not positive. tableName and cols are interpolated
// verbatim, so callers must pass trusted identifiers only.
func buildPostgresMultiRowInsertSQL(tableName string, cols []string, rowCount int, suffix string) string {
	if rowCount <= 0 {
		return ""
	}

	var b strings.Builder
	// Rough pre-size: each placeholder is at most "$65535," (7 bytes) when
	// callers respect postgresMaxRowsPerStatement; the builder still grows
	// on demand if this estimate is short.
	b.Grow(len(tableName) + len(suffix) + 32 + rowCount*(len(cols)*7+3))

	b.WriteString(`INSERT INTO `)
	b.WriteString(tableName)
	b.WriteString(` ("`)
	b.WriteString(strings.Join(cols, `","`))
	b.WriteString(`") VALUES `)

	param := 1
	for row := 0; row < rowCount; row++ {
		if row > 0 {
			b.WriteString(`,`)
		}
		b.WriteString(`(`)
		for col := 0; col < len(cols); col++ {
			if col > 0 {
				b.WriteString(`,`)
			}
			b.WriteString(`$`)
			b.WriteString(strconv.Itoa(param))
			param++
		}
		b.WriteString(`)`)
	}
	if suffix != "" {
		b.WriteString(suffix)
	}
	return b.String()
}

// isLegacyIsPresentError reports whether err's message mentions "ispresent"
// (case-insensitive). A nil error yields false.
func isLegacyIsPresentError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(strings.ToLower(err.Error()), "ispresent")
}

// dropSnapshotTable executes DROP TABLE IF EXISTS for table. It returns the
// error from db.SafeTableName if that call rejects the name, otherwise the
// error (if any) from executing the statement.
func dropSnapshotTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
	if _, err := db.SafeTableName(table); err != nil {
		return err
	}
	_, err := dbConn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table))
	return err
}

// clearTable deletes every row from table. It returns the error from
// db.SafeTableName if that call rejects the name; a failure of the DELETE
// itself is wrapped with the table name.
func clearTable(ctx context.Context, dbConn *sqlx.DB, table string) error {
	if _, err := db.SafeTableName(table); err != nil {
		return err
	}
	if _, err := dbConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s", table)); err != nil {
		return fmt.Errorf("failed to clear table %s: %w", table, err)
	}
	return nil
}