in depth fix of deletion/creation data
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -1079,14 +1079,15 @@ type totalsPoint struct {
|
|||||||
|
|
||||||
func buildHourlyTotals(ctx context.Context, dbConn *sqlx.DB, records []SnapshotRecord) ([]totalsPoint, error) {
|
func buildHourlyTotals(ctx context.Context, dbConn *sqlx.DB, records []SnapshotRecord) ([]totalsPoint, error) {
|
||||||
type hourBucket struct {
|
type hourBucket struct {
|
||||||
samples int
|
samples int
|
||||||
vmSum float64
|
vmSum float64
|
||||||
vcpuSum float64
|
vcpuSum float64
|
||||||
ramSum float64
|
ramSum float64
|
||||||
tinSum float64
|
presenceSum float64
|
||||||
bronzeSum float64
|
tinSum float64
|
||||||
silverSum float64
|
bronzeSum float64
|
||||||
goldSum float64
|
silverSum float64
|
||||||
|
goldSum float64
|
||||||
}
|
}
|
||||||
buckets := make(map[int64]*hourBucket)
|
buckets := make(map[int64]*hourBucket)
|
||||||
|
|
||||||
@@ -1107,19 +1108,35 @@ func buildHourlyTotals(ctx context.Context, dbConn *sqlx.DB, records []SnapshotR
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
hourStart := record.SnapshotTime.Local().Truncate(time.Hour)
|
||||||
|
hourStartUnix := hourStart.Unix()
|
||||||
|
hourEndUnix := hourStart.Add(time.Hour).Unix()
|
||||||
|
durationSeconds := float64(hourEndUnix - hourStartUnix)
|
||||||
|
startExpr := `CASE WHEN "CreationTime" IS NOT NULL AND "CreationTime" > 0 AND "CreationTime" > ? THEN "CreationTime" ELSE ? END`
|
||||||
|
endExpr := `CASE WHEN "DeletionTime" IS NOT NULL AND "DeletionTime" > 0 AND "DeletionTime" < ? THEN "DeletionTime" ELSE ? END`
|
||||||
|
overlapExpr := fmt.Sprintf(`CASE WHEN %s > %s THEN (CAST((%s - %s) AS REAL) / ?) ELSE 0 END`, endExpr, startExpr, endExpr, startExpr)
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
SELECT
|
SELECT
|
||||||
COUNT(DISTINCT "VmId") AS vm_count,
|
COUNT(DISTINCT "VmId") AS vm_count,
|
||||||
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
|
COALESCE(SUM(CASE WHEN "VcpuCount" IS NOT NULL THEN "VcpuCount" ELSE 0 END), 0) AS vcpu_total,
|
||||||
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
|
COALESCE(SUM(CASE WHEN "RamGB" IS NOT NULL THEN "RamGB" ELSE 0 END), 0) AS ram_total,
|
||||||
1.0 AS presence_ratio,
|
COALESCE(SUM(presence), 0) AS presence_ratio,
|
||||||
COALESCE(SUM(CASE WHEN LOWER("ResourcePool") = 'tin' THEN 1 ELSE 0 END), 0) AS tin_total,
|
COALESCE(SUM(CASE WHEN pool = 'tin' THEN presence ELSE 0 END), 0) AS tin_total,
|
||||||
COALESCE(SUM(CASE WHEN LOWER("ResourcePool") = 'bronze' THEN 1 ELSE 0 END), 0) AS bronze_total,
|
COALESCE(SUM(CASE WHEN pool = 'bronze' THEN presence ELSE 0 END), 0) AS bronze_total,
|
||||||
COALESCE(SUM(CASE WHEN LOWER("ResourcePool") = 'silver' THEN 1 ELSE 0 END), 0) AS silver_total,
|
COALESCE(SUM(CASE WHEN pool = 'silver' THEN presence ELSE 0 END), 0) AS silver_total,
|
||||||
COALESCE(SUM(CASE WHEN LOWER("ResourcePool") = 'gold' THEN 1 ELSE 0 END), 0) AS gold_total
|
COALESCE(SUM(CASE WHEN pool = 'gold' THEN presence ELSE 0 END), 0) AS gold_total
|
||||||
FROM %s
|
FROM (
|
||||||
WHERE %s
|
SELECT
|
||||||
`, record.TableName, templateExclusionFilter())
|
"VmId",
|
||||||
|
"VcpuCount",
|
||||||
|
"RamGB",
|
||||||
|
LOWER(COALESCE("ResourcePool", '')) AS pool,
|
||||||
|
%s AS presence
|
||||||
|
FROM %s
|
||||||
|
WHERE %s
|
||||||
|
) t
|
||||||
|
`, overlapExpr, record.TableName, templateExclusionFilter())
|
||||||
|
query = dbConn.Rebind(query)
|
||||||
var row struct {
|
var row struct {
|
||||||
VmCount int64 `db:"vm_count"`
|
VmCount int64 `db:"vm_count"`
|
||||||
VcpuTotal int64 `db:"vcpu_total"`
|
VcpuTotal int64 `db:"vcpu_total"`
|
||||||
@@ -1130,10 +1147,17 @@ WHERE %s
|
|||||||
SilverTotal float64 `db:"silver_total"`
|
SilverTotal float64 `db:"silver_total"`
|
||||||
GoldTotal float64 `db:"gold_total"`
|
GoldTotal float64 `db:"gold_total"`
|
||||||
}
|
}
|
||||||
if err := dbConn.GetContext(ctx, &row, query); err != nil {
|
args := []interface{}{
|
||||||
|
hourEndUnix, hourEndUnix,
|
||||||
|
hourStartUnix, hourStartUnix,
|
||||||
|
hourEndUnix, hourEndUnix,
|
||||||
|
hourStartUnix, hourStartUnix,
|
||||||
|
durationSeconds,
|
||||||
|
}
|
||||||
|
if err := dbConn.GetContext(ctx, &row, query, args...); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
hourKey := record.SnapshotTime.Local().Truncate(time.Hour).Unix()
|
hourKey := hourStartUnix
|
||||||
bucket := buckets[hourKey]
|
bucket := buckets[hourKey]
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
bucket = &hourBucket{}
|
bucket = &hourBucket{}
|
||||||
@@ -1143,6 +1167,7 @@ WHERE %s
|
|||||||
bucket.vmSum += float64(row.VmCount)
|
bucket.vmSum += float64(row.VmCount)
|
||||||
bucket.vcpuSum += float64(row.VcpuTotal)
|
bucket.vcpuSum += float64(row.VcpuTotal)
|
||||||
bucket.ramSum += float64(row.RamTotal)
|
bucket.ramSum += float64(row.RamTotal)
|
||||||
|
bucket.presenceSum += row.PresenceRatio
|
||||||
bucket.tinSum += row.TinTotal
|
bucket.tinSum += row.TinTotal
|
||||||
bucket.bronzeSum += row.BronzeTotal
|
bucket.bronzeSum += row.BronzeTotal
|
||||||
bucket.silverSum += row.SilverTotal
|
bucket.silverSum += row.SilverTotal
|
||||||
@@ -1163,16 +1188,31 @@ WHERE %s
|
|||||||
}
|
}
|
||||||
denom := float64(bucket.samples)
|
denom := float64(bucket.samples)
|
||||||
vmAvg := bucket.vmSum / denom
|
vmAvg := bucket.vmSum / denom
|
||||||
|
presenceAvg := bucket.presenceSum / denom
|
||||||
|
tinAvg := bucket.tinSum / denom
|
||||||
|
bronzeAvg := bucket.bronzeSum / denom
|
||||||
|
silverAvg := bucket.silverSum / denom
|
||||||
|
goldAvg := bucket.goldSum / denom
|
||||||
|
slog.Debug(
|
||||||
|
"hourly totals bucket",
|
||||||
|
"hour_start", time.Unix(key, 0).Local().Format("2006-01-02 15:00"),
|
||||||
|
"samples", bucket.samples,
|
||||||
|
"presence_ratio", presenceAvg,
|
||||||
|
"tin_total", tinAvg,
|
||||||
|
"bronze_total", bronzeAvg,
|
||||||
|
"silver_total", silverAvg,
|
||||||
|
"gold_total", goldAvg,
|
||||||
|
)
|
||||||
points = append(points, totalsPoint{
|
points = append(points, totalsPoint{
|
||||||
Label: time.Unix(key, 0).Local().Format("2006-01-02 15:00"),
|
Label: time.Unix(key, 0).Local().Format("2006-01-02 15:00"),
|
||||||
VmCount: vmAvg,
|
VmCount: vmAvg,
|
||||||
VcpuTotal: bucket.vcpuSum / denom,
|
VcpuTotal: bucket.vcpuSum / denom,
|
||||||
RamTotal: bucket.ramSum / denom,
|
RamTotal: bucket.ramSum / denom,
|
||||||
PresenceRatio: vmAvg,
|
PresenceRatio: presenceAvg,
|
||||||
TinTotal: bucket.tinSum / denom,
|
TinTotal: tinAvg,
|
||||||
BronzeTotal: bucket.bronzeSum / denom,
|
BronzeTotal: bronzeAvg,
|
||||||
SilverTotal: bucket.silverSum / denom,
|
SilverTotal: silverAvg,
|
||||||
GoldTotal: bucket.goldSum / denom,
|
GoldTotal: goldAvg,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return points, nil
|
return points, nil
|
||||||
|
|||||||
@@ -212,6 +212,39 @@ func updateDeletionTimeInSnapshot(ctx context.Context, dbConn *sqlx.DB, table, v
|
|||||||
return rowsAffected, nil
|
return rowsAffected, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateDeletionTimeInHourlyCache(ctx context.Context, dbConn *sqlx.DB, vcenter, vmID, vmUUID, name string, snapshotUnix, deletionUnix int64) (int64, error) {
|
||||||
|
if snapshotUnix <= 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
matchColumn := ""
|
||||||
|
matchValue := ""
|
||||||
|
switch {
|
||||||
|
case vmID != "":
|
||||||
|
matchColumn = "VmId"
|
||||||
|
matchValue = vmID
|
||||||
|
case vmUUID != "":
|
||||||
|
matchColumn = "VmUuid"
|
||||||
|
matchValue = vmUUID
|
||||||
|
case name != "":
|
||||||
|
matchColumn = "Name"
|
||||||
|
matchValue = name
|
||||||
|
default:
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
query := fmt.Sprintf(`UPDATE vm_hourly_stats SET "DeletionTime" = ? WHERE "Vcenter" = ? AND "SnapshotTime" = ? AND "%s" = ? AND ("DeletionTime" IS NULL OR "DeletionTime" = 0 OR "DeletionTime" > ?)`, matchColumn)
|
||||||
|
query = sqlx.Rebind(sqlx.BindType(dbConn.DriverName()), query)
|
||||||
|
result, err := dbConn.ExecContext(ctx, query, deletionUnix, vcenter, snapshotUnix, matchValue, deletionUnix)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
rowsAffected, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return rowsAffected, nil
|
||||||
|
}
|
||||||
|
|
||||||
// markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now.
|
// markMissingFromPrevious marks VMs that were present in the previous snapshot but missing now.
|
||||||
func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time,
|
func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB, prevTable string, vcenter string, snapshotTime time.Time,
|
||||||
currentByID map[string]InventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{},
|
currentByID map[string]InventorySnapshotRow, currentByUuid map[string]struct{}, currentByName map[string]struct{},
|
||||||
@@ -313,6 +346,13 @@ func (c *CronTask) markMissingFromPrevious(ctx context.Context, dbConn *sqlx.DB,
|
|||||||
} else if rowsAffected > 0 {
|
} else if rowsAffected > 0 {
|
||||||
tableUpdated = true
|
tableUpdated = true
|
||||||
c.Logger.Debug("updated hourly snapshot deletion time", "table", prevTable, "vm_id", inv.VmId.String, "vm_uuid", vmUUID, "vcenter", vcenter, "deletion_time", delTime.Int64)
|
c.Logger.Debug("updated hourly snapshot deletion time", "table", prevTable, "vm_id", inv.VmId.String, "vm_uuid", vmUUID, "vcenter", vcenter, "deletion_time", delTime.Int64)
|
||||||
|
if snapUnix, ok := parseSnapshotTime(prevTable); ok {
|
||||||
|
if cacheRows, err := updateDeletionTimeInHourlyCache(ctx, dbConn, vcenter, inv.VmId.String, vmUUID, inv.Name, snapUnix, delTime.Int64); err != nil {
|
||||||
|
c.Logger.Warn("failed to update hourly cache deletion time", "error", err, "snapshot_time", snapUnix, "vm_id", inv.VmId.String, "vm_uuid", vmUUID, "vcenter", vcenter)
|
||||||
|
} else if cacheRows > 0 {
|
||||||
|
c.Logger.Debug("updated hourly cache deletion time", "snapshot_time", snapUnix, "vm_id", inv.VmId.String, "vm_uuid", vmUUID, "vcenter", vcenter, "deletion_time", delTime.Int64)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable)
|
c.Logger.Debug("Detected VM missing compared to previous snapshot", "name", inv.Name, "vm_id", inv.VmId.String, "vm_uuid", inv.VmUuid.String, "vcenter", vcenter, "snapshot_time", snapshotTime, "prev_table", prevTable)
|
||||||
missing++
|
missing++
|
||||||
|
|||||||
@@ -73,6 +73,13 @@ func backfillLifecycleDeletionsToday(ctx context.Context, logger *slog.Logger, d
|
|||||||
} else if rowsAffected > 0 {
|
} else if rowsAffected > 0 {
|
||||||
updatedTables[lastSeenTable] = struct{}{}
|
updatedTables[lastSeenTable] = struct{}{}
|
||||||
logger.Debug("lifecycle backfill updated hourly snapshot deletion time", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "table", lastSeenTable, "deletion", deletion)
|
logger.Debug("lifecycle backfill updated hourly snapshot deletion time", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "table", lastSeenTable, "deletion", deletion)
|
||||||
|
if snapUnix, ok := parseSnapshotTime(lastSeenTable); ok {
|
||||||
|
if cacheRows, err := updateDeletionTimeInHourlyCache(ctx, dbConn, vcenter, cand.vmID, cand.vmUUID, cand.name, snapUnix, deletion); err != nil {
|
||||||
|
logger.Warn("lifecycle backfill failed to update hourly cache deletion time", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "snapshot_time", snapUnix, "deletion", deletion, "error", err)
|
||||||
|
} else if cacheRows > 0 {
|
||||||
|
logger.Debug("lifecycle backfill updated hourly cache deletion time", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "snapshot_time", snapUnix, "deletion", deletion)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.Debug("lifecycle backfill applied", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion)
|
logger.Debug("lifecycle backfill applied", "vcenter", vcenter, "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "cluster", cand.cluster, "deletion", deletion)
|
||||||
|
|||||||
@@ -887,9 +887,6 @@ func prepareDeletionCandidates(ctx context.Context, log *slog.Logger, dbConn *sq
|
|||||||
if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime.Unix()); err != nil {
|
if err := db.MarkVmDeletedWithDetails(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime.Unix()); err != nil {
|
||||||
log.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "error", err)
|
log.Warn("failed to mark vm deleted in lifecycle cache", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "error", err)
|
||||||
}
|
}
|
||||||
if err := db.UpsertVmLifecycleCache(ctx, dbConn, url, inv.VmId.String, inv.VmUuid.String, inv.Name, clusterName, startTime); err != nil {
|
|
||||||
log.Warn("failed to upsert vm lifecycle cache (deletion path)", "vcenter", url, "vm_id", inv.VmId, "vm_uuid", inv.VmUuid, "name", inv.Name, "error", err)
|
|
||||||
}
|
|
||||||
missingCount++
|
missingCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1100,6 +1097,13 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
|
|||||||
reportTables[snapTable] = struct{}{}
|
reportTables[snapTable] = struct{}{}
|
||||||
deletionsMarked = true
|
deletionsMarked = true
|
||||||
log.Debug("updated hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t)
|
log.Debug("updated hourly snapshot deletion time from event", "table", snapTable, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t)
|
||||||
|
if snapUnix, ok := parseSnapshotTime(snapTable); ok {
|
||||||
|
if cacheRows, err := updateDeletionTimeInHourlyCache(ctx, dbConn, url, cand.vmID, vmUUID, name, snapUnix, delTs.Int64); err != nil {
|
||||||
|
log.Warn("failed to update hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "error", err)
|
||||||
|
} else if cacheRows > 0 {
|
||||||
|
log.Debug("updated hourly cache deletion time from event", "snapshot_time", snapUnix, "vm_id", cand.vmID, "vm_uuid", vmUUID, "vcenter", url, "event_time", t)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t)
|
log.Info("refined deletion time from vcenter event", "vm_id", cand.vmID, "vm_uuid", cand.vmUUID, "name", cand.name, "vcenter", url, "event_time", t)
|
||||||
@@ -1232,6 +1236,13 @@ func (c *CronTask) captureHourlySnapshotForVcenter(ctx context.Context, startTim
|
|||||||
reportTables[tableToUpdate] = struct{}{}
|
reportTables[tableToUpdate] = struct{}{}
|
||||||
deletionsMarked = true
|
deletionsMarked = true
|
||||||
c.Logger.Debug("count-drop: updated hourly snapshot deletion time from event", "table", tableToUpdate, "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "vcenter", url, "event_time", t)
|
c.Logger.Debug("count-drop: updated hourly snapshot deletion time from event", "table", tableToUpdate, "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "vcenter", url, "event_time", t)
|
||||||
|
if snapUnix, ok := parseSnapshotTime(tableToUpdate); ok {
|
||||||
|
if cacheRows, err := updateDeletionTimeInHourlyCache(ctx, dbConn, url, vmID, inv.VmUuid.String, inv.Name, snapUnix, delTs.Int64); err != nil {
|
||||||
|
c.Logger.Warn("count-drop: failed to update hourly cache deletion time", "snapshot_time", snapUnix, "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "vcenter", url, "error", err)
|
||||||
|
} else if cacheRows > 0 {
|
||||||
|
c.Logger.Debug("count-drop: updated hourly cache deletion time", "snapshot_time", snapUnix, "vm_id", vmID, "vm_uuid", inv.VmUuid.String, "vcenter", url, "event_time", t)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
missingCount++
|
missingCount++
|
||||||
|
|||||||
Reference in New Issue
Block a user