This commit is contained in:
@@ -33,11 +33,15 @@ func TableRowCount(ctx context.Context, dbConn *sqlx.DB, table string) (int64, e
|
||||
if err := ValidateTableName(table); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
start := time.Now()
|
||||
slog.Debug("db row count start", "table", table)
|
||||
var count int64
|
||||
query := fmt.Sprintf(`SELECT COUNT(*) FROM %s`, table)
|
||||
if err := getLog(ctx, dbConn, &count, query); err != nil {
|
||||
slog.Debug("db row count failed", "table", table, "duration", time.Since(start), "error", err)
|
||||
return 0, err
|
||||
}
|
||||
slog.Debug("db row count complete", "table", table, "rows", count, "duration", time.Since(start))
|
||||
return count, nil
|
||||
}
|
||||
|
||||
@@ -422,11 +426,18 @@ func CheckpointSQLite(ctx context.Context, dbConn *sqlx.DB) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
start := time.Now()
|
||||
slog.Debug("sqlite checkpoint start")
|
||||
cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
_, err := dbConn.ExecContext(cctx, `PRAGMA wal_checkpoint(TRUNCATE);`)
|
||||
if err != nil {
|
||||
slog.Warn("sqlite checkpoint failed", "error", err, "duration", time.Since(start))
|
||||
return err
|
||||
}
|
||||
slog.Debug("sqlite checkpoint complete", "duration", time.Since(start))
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnsureVmHourlyStats creates the shared per-snapshot cache table used by Go aggregations.
|
||||
func EnsureVmHourlyStats(ctx context.Context, dbConn *sqlx.DB) error {
|
||||
@@ -1311,9 +1322,13 @@ func AnalyzeTableIfPostgres(ctx context.Context, dbConn *sqlx.DB, tableName stri
|
||||
if driver != "pgx" && driver != "postgres" {
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
slog.Debug("db analyze start", "table", tableName)
|
||||
if _, err := execLog(ctx, dbConn, fmt.Sprintf(`ANALYZE %s`, tableName)); err != nil {
|
||||
slog.Warn("failed to ANALYZE table", "table", tableName, "error", err)
|
||||
return
|
||||
}
|
||||
slog.Debug("db analyze complete", "table", tableName, "duration", time.Since(start))
|
||||
}
|
||||
|
||||
// SetPostgresWorkMem sets a per-session work_mem for heavy aggregations; no-op for other drivers.
|
||||
|
||||
@@ -258,9 +258,17 @@ func RegisterSnapshot(ctx context.Context, database db.Database, snapshotType st
|
||||
}
|
||||
dbConn := database.DB()
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
start := time.Now()
|
||||
slog.Debug("snapshot registry upsert start",
|
||||
"type", snapshotType,
|
||||
"table", tableName,
|
||||
"snapshot_time", snapshotTime.Unix(),
|
||||
"row_count", snapshotCount,
|
||||
)
|
||||
var err error
|
||||
switch driver {
|
||||
case "sqlite":
|
||||
_, err := dbConn.ExecContext(ctx, `
|
||||
_, err = dbConn.ExecContext(ctx, `
|
||||
INSERT INTO snapshot_registry (snapshot_type, table_name, snapshot_time, snapshot_count)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(table_name) DO UPDATE SET
|
||||
@@ -268,9 +276,8 @@ ON CONFLICT(table_name) DO UPDATE SET
|
||||
snapshot_type = excluded.snapshot_type,
|
||||
snapshot_count = excluded.snapshot_count
|
||||
`, snapshotType, tableName, snapshotTime.Unix(), snapshotCount)
|
||||
return err
|
||||
case "pgx", "postgres":
|
||||
_, err := dbConn.ExecContext(ctx, `
|
||||
_, err = dbConn.ExecContext(ctx, `
|
||||
INSERT INTO snapshot_registry (snapshot_type, table_name, snapshot_time, snapshot_count)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (table_name) DO UPDATE SET
|
||||
@@ -278,10 +285,24 @@ ON CONFLICT (table_name) DO UPDATE SET
|
||||
snapshot_type = EXCLUDED.snapshot_type,
|
||||
snapshot_count = EXCLUDED.snapshot_count
|
||||
`, snapshotType, tableName, snapshotTime.Unix(), snapshotCount)
|
||||
return err
|
||||
default:
|
||||
return fmt.Errorf("unsupported driver for snapshot registry: %s", driver)
|
||||
}
|
||||
if err != nil {
|
||||
slog.Warn("snapshot registry upsert failed",
|
||||
"type", snapshotType,
|
||||
"table", tableName,
|
||||
"duration", time.Since(start),
|
||||
"error", err,
|
||||
)
|
||||
return err
|
||||
}
|
||||
slog.Debug("snapshot registry upsert complete",
|
||||
"type", snapshotType,
|
||||
"table", tableName,
|
||||
"duration", time.Since(start),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
func DeleteSnapshotRecord(ctx context.Context, database db.Database, tableName string) error {
|
||||
@@ -524,6 +545,8 @@ func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Co
|
||||
if err := db.ValidateTableName(tableName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
start := time.Now()
|
||||
logger.Debug("Create table report start", "table", tableName)
|
||||
|
||||
dbConn := Database.DB()
|
||||
if strings.HasPrefix(tableName, "inventory_daily_summary_") || strings.HasPrefix(tableName, "inventory_monthly_summary_") {
|
||||
@@ -533,11 +556,13 @@ func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Co
|
||||
}
|
||||
columns, err := tableColumns(ctx, dbConn, tableName)
|
||||
if err != nil {
|
||||
logger.Warn("Failed to load report columns", "table", tableName, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
if len(columns) == 0 {
|
||||
return nil, fmt.Errorf("no columns found for table %s", tableName)
|
||||
}
|
||||
logger.Debug("Report columns loaded", "table", tableName, "columns", len(columns))
|
||||
|
||||
isHourlySnapshot := strings.HasPrefix(tableName, "inventory_hourly_")
|
||||
isDailySummary := strings.HasPrefix(tableName, "inventory_daily_summary_")
|
||||
@@ -618,9 +643,11 @@ func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Co
|
||||
if orderBy != "" {
|
||||
query = fmt.Sprintf(`%s ORDER BY "%s" %s`, query, orderBy, orderDir)
|
||||
}
|
||||
logger.Debug("Report query prepared", "table", tableName, "order_by", orderBy, "order_dir", orderDir, "template_filter", applyTemplateFilter)
|
||||
|
||||
rows, err := dbConn.QueryxContext(ctx, query)
|
||||
if err != nil {
|
||||
logger.Warn("Report query failed", "table", tableName, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
@@ -670,6 +697,7 @@ func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Co
|
||||
for rows.Next() {
|
||||
values, err := scanRowValues(rows, len(columns))
|
||||
if err != nil {
|
||||
logger.Warn("Report row scan failed", "table", tableName, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
for colIndex, spec := range specs {
|
||||
@@ -692,8 +720,11 @@ func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Co
|
||||
rowIndex++
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
logger.Warn("Report row iteration failed", "table", tableName, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
rowCount := rowIndex - 2
|
||||
logger.Debug("Report rows populated", "table", tableName, "rows", rowCount)
|
||||
|
||||
if err := xlsx.SetPanes(sheetName, &excelize.Panes{
|
||||
Freeze: true,
|
||||
@@ -718,14 +749,17 @@ func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Co
|
||||
}
|
||||
|
||||
addTotalsChartSheet(logger, Database, ctx, xlsx, tableName)
|
||||
logger.Debug("Report charts complete", "table", tableName)
|
||||
|
||||
if index, err := xlsx.GetSheetIndex(sheetName); err == nil {
|
||||
xlsx.SetActiveSheet(index)
|
||||
}
|
||||
|
||||
if err := xlsx.Write(&buffer); err != nil {
|
||||
logger.Warn("Report write failed", "table", tableName, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
logger.Debug("Create table report complete", "table", tableName, "rows", rowCount, "bytes", buffer.Len(), "duration", time.Since(start))
|
||||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
@@ -737,18 +771,26 @@ func SaveTableReport(logger *slog.Logger, Database db.Database, ctx context.Cont
|
||||
if strings.TrimSpace(destDir) == "" {
|
||||
return "", fmt.Errorf("destination directory is empty")
|
||||
}
|
||||
start := time.Now()
|
||||
logger.Debug("Save table report start", "table", tableName, "dest", destDir)
|
||||
if err := os.MkdirAll(destDir, 0o755); err != nil {
|
||||
logger.Warn("Report directory create failed", "table", tableName, "dest", destDir, "error", err)
|
||||
return "", fmt.Errorf("failed to create reports directory: %w", err)
|
||||
}
|
||||
logger.Debug("Report directory ready", "dest", destDir)
|
||||
|
||||
data, err := CreateTableReport(logger, Database, ctx, tableName)
|
||||
if err != nil {
|
||||
logger.Warn("Report render failed", "table", tableName, "error", err)
|
||||
return "", err
|
||||
}
|
||||
logger.Debug("Report rendered", "table", tableName, "bytes", len(data))
|
||||
filename := filepath.Join(destDir, fmt.Sprintf("%s.xlsx", tableName))
|
||||
if err := os.WriteFile(filename, data, 0o644); err != nil {
|
||||
logger.Warn("Report write failed", "table", tableName, "file", filename, "error", err)
|
||||
return "", err
|
||||
}
|
||||
logger.Debug("Save table report complete", "table", tableName, "file", filename, "duration", time.Since(start))
|
||||
return filename, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -158,22 +158,41 @@ func (c *CronTask) aggregateDailySummary(ctx context.Context, targetTime time.Ti
|
||||
if err := db.RefineCreationDeletionFromUnion(ctx, dbConn, summaryTable, unionQuery); err != nil {
|
||||
c.Logger.Warn("failed to refine creation/deletion times", "error", err, "table", summaryTable)
|
||||
}
|
||||
analyzeStart := time.Now()
|
||||
c.Logger.Debug("Analyzing daily summary table", "table", summaryTable)
|
||||
db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
|
||||
c.Logger.Debug("Analyzed daily summary table", "table", summaryTable, "duration", time.Since(analyzeStart))
|
||||
|
||||
rowCountStart := time.Now()
|
||||
c.Logger.Debug("Counting daily summary rows", "table", summaryTable)
|
||||
rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
|
||||
if err != nil {
|
||||
c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
|
||||
}
|
||||
c.Logger.Debug("Counted daily summary rows", "table", summaryTable, "rows", rowCount, "duration", time.Since(rowCountStart))
|
||||
|
||||
registerStart := time.Now()
|
||||
c.Logger.Debug("Registering daily snapshot", "table", summaryTable, "date", dayStart.Format("2006-01-02"), "rows", rowCount)
|
||||
if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
|
||||
c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
|
||||
} else {
|
||||
c.Logger.Debug("Registered daily snapshot", "table", summaryTable, "duration", time.Since(registerStart))
|
||||
}
|
||||
|
||||
reportStart := time.Now()
|
||||
c.Logger.Debug("Generating daily report", "table", summaryTable)
|
||||
if err := c.generateReport(ctx, summaryTable); err != nil {
|
||||
c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
|
||||
metrics.RecordDailyAggregation(time.Since(jobStart), err)
|
||||
return err
|
||||
}
|
||||
c.Logger.Debug("Generated daily report", "table", summaryTable, "duration", time.Since(reportStart))
|
||||
checkpointStart := time.Now()
|
||||
c.Logger.Debug("Checkpointing sqlite after daily aggregation", "table", summaryTable)
|
||||
if err := db.CheckpointSQLite(ctx, dbConn); err != nil {
|
||||
c.Logger.Warn("failed to checkpoint sqlite after daily aggregation", "error", err)
|
||||
} else {
|
||||
c.Logger.Debug("Checkpointed sqlite after daily aggregation", "table", summaryTable, "duration", time.Since(checkpointStart))
|
||||
}
|
||||
|
||||
c.Logger.Debug("Finished daily inventory aggregation", "summary_table", summaryTable)
|
||||
@@ -384,20 +403,39 @@ LIMIT 1
|
||||
c.Logger.Debug("refined creation/deletion times", "table", summaryTable)
|
||||
}
|
||||
|
||||
analyzeStart := time.Now()
|
||||
c.Logger.Debug("Analyzing daily summary table", "table", summaryTable)
|
||||
db.AnalyzeTableIfPostgres(ctx, dbConn, summaryTable)
|
||||
c.Logger.Debug("Analyzed daily summary table", "table", summaryTable, "duration", time.Since(analyzeStart))
|
||||
|
||||
rowCountStart := time.Now()
|
||||
c.Logger.Debug("Counting daily summary rows", "table", summaryTable)
|
||||
rowCount, err := db.TableRowCount(ctx, dbConn, summaryTable)
|
||||
if err != nil {
|
||||
c.Logger.Warn("unable to count daily summary rows", "error", err, "table", summaryTable)
|
||||
}
|
||||
c.Logger.Debug("Counted daily summary rows", "table", summaryTable, "rows", rowCount, "duration", time.Since(rowCountStart))
|
||||
|
||||
registerStart := time.Now()
|
||||
c.Logger.Debug("Registering daily snapshot", "table", summaryTable, "date", dayStart.Format("2006-01-02"), "rows", rowCount)
|
||||
if err := report.RegisterSnapshot(ctx, c.Database, "daily", summaryTable, dayStart, rowCount); err != nil {
|
||||
c.Logger.Warn("failed to register daily snapshot", "error", err, "table", summaryTable)
|
||||
} else {
|
||||
c.Logger.Debug("Registered daily snapshot", "table", summaryTable, "duration", time.Since(registerStart))
|
||||
}
|
||||
reportStart := time.Now()
|
||||
c.Logger.Debug("Generating daily report", "table", summaryTable)
|
||||
if err := c.generateReport(ctx, summaryTable); err != nil {
|
||||
c.Logger.Warn("failed to generate daily report", "error", err, "table", summaryTable)
|
||||
return err
|
||||
}
|
||||
c.Logger.Debug("Generated daily report", "table", summaryTable, "duration", time.Since(reportStart))
|
||||
checkpointStart := time.Now()
|
||||
c.Logger.Debug("Checkpointing sqlite after daily aggregation", "table", summaryTable)
|
||||
if err := db.CheckpointSQLite(ctx, dbConn); err != nil {
|
||||
c.Logger.Warn("failed to checkpoint sqlite after daily aggregation (Go path)", "error", err)
|
||||
} else {
|
||||
c.Logger.Debug("Checkpointed sqlite after daily aggregation", "table", summaryTable, "duration", time.Since(checkpointStart))
|
||||
}
|
||||
|
||||
c.Logger.Debug("Finished daily inventory aggregation (Go path)",
|
||||
|
||||
@@ -629,7 +629,14 @@ func (c *CronTask) reportsDir() string {
|
||||
|
||||
func (c *CronTask) generateReport(ctx context.Context, tableName string) error {
|
||||
dest := c.reportsDir()
|
||||
_, err := report.SaveTableReport(c.Logger, c.Database, ctx, tableName, dest)
|
||||
start := time.Now()
|
||||
c.Logger.Debug("Report generation start", "table", tableName, "dest", dest)
|
||||
filename, err := report.SaveTableReport(c.Logger, c.Database, ctx, tableName, dest)
|
||||
if err == nil {
|
||||
c.Logger.Debug("Report generation complete", "table", tableName, "file", filename, "duration", time.Since(start))
|
||||
} else {
|
||||
c.Logger.Debug("Report generation failed", "table", tableName, "duration", time.Since(start), "error", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user