This commit is contained in:
@@ -14,6 +14,12 @@ import (
|
||||
"github.com/xuri/excelize/v2"
|
||||
)
|
||||
|
||||
type SnapshotRecord struct {
|
||||
TableName string
|
||||
SnapshotTime time.Time
|
||||
SnapshotType string
|
||||
}
|
||||
|
||||
func ListTablesByPrefix(ctx context.Context, database db.Database, prefix string) ([]string, error) {
|
||||
dbConn := database.DB()
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
@@ -59,26 +65,139 @@ ORDER BY tablename DESC
|
||||
return tables, rows.Err()
|
||||
}
|
||||
|
||||
func FormatSnapshotLabel(prefix string, tableName string) (string, bool) {
|
||||
if !strings.HasPrefix(tableName, prefix) {
|
||||
return "", false
|
||||
func EnsureSnapshotRegistry(ctx context.Context, database db.Database) error {
|
||||
dbConn := database.DB()
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
switch driver {
|
||||
case "sqlite":
|
||||
_, err := dbConn.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS snapshot_registry (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
snapshot_type TEXT NOT NULL,
|
||||
table_name TEXT NOT NULL UNIQUE,
|
||||
snapshot_time BIGINT NOT NULL
|
||||
)
|
||||
`)
|
||||
return err
|
||||
case "pgx", "postgres":
|
||||
_, err := dbConn.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS snapshot_registry (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
snapshot_type TEXT NOT NULL,
|
||||
table_name TEXT NOT NULL UNIQUE,
|
||||
snapshot_time BIGINT NOT NULL
|
||||
)
|
||||
`)
|
||||
return err
|
||||
default:
|
||||
return fmt.Errorf("unsupported driver for snapshot registry: %s", driver)
|
||||
}
|
||||
suffix := strings.TrimPrefix(tableName, prefix)
|
||||
switch prefix {
|
||||
case "inventory_daily_":
|
||||
if t, err := time.Parse("20060102", suffix); err == nil {
|
||||
return t.Format("2006-01-02"), true
|
||||
}
|
||||
case "inventory_daily_summary_":
|
||||
if t, err := time.Parse("20060102", suffix); err == nil {
|
||||
return t.Format("2006-01-02"), true
|
||||
}
|
||||
case "inventory_monthly_summary_":
|
||||
if t, err := time.Parse("200601", suffix); err == nil {
|
||||
return t.Format("2006-01"), true
|
||||
}
|
||||
}
|
||||
|
||||
func RegisterSnapshot(ctx context.Context, database db.Database, snapshotType string, tableName string, snapshotTime time.Time) error {
|
||||
if snapshotType == "" || tableName == "" {
|
||||
return fmt.Errorf("snapshot type or table name is empty")
|
||||
}
|
||||
dbConn := database.DB()
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
switch driver {
|
||||
case "sqlite":
|
||||
_, err := dbConn.ExecContext(ctx, `
|
||||
INSERT OR IGNORE INTO snapshot_registry (snapshot_type, table_name, snapshot_time)
|
||||
VALUES (?, ?, ?)
|
||||
`, snapshotType, tableName, snapshotTime.Unix())
|
||||
return err
|
||||
case "pgx", "postgres":
|
||||
_, err := dbConn.ExecContext(ctx, `
|
||||
INSERT INTO snapshot_registry (snapshot_type, table_name, snapshot_time)
|
||||
VALUES ($1, $2, $3)
|
||||
ON CONFLICT (table_name) DO NOTHING
|
||||
`, snapshotType, tableName, snapshotTime.Unix())
|
||||
return err
|
||||
default:
|
||||
return fmt.Errorf("unsupported driver for snapshot registry: %s", driver)
|
||||
}
|
||||
}
|
||||
|
||||
func DeleteSnapshotRecord(ctx context.Context, database db.Database, tableName string) error {
|
||||
if tableName == "" {
|
||||
return nil
|
||||
}
|
||||
dbConn := database.DB()
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
switch driver {
|
||||
case "sqlite":
|
||||
_, err := dbConn.ExecContext(ctx, `DELETE FROM snapshot_registry WHERE table_name = ?`, tableName)
|
||||
return err
|
||||
case "pgx", "postgres":
|
||||
_, err := dbConn.ExecContext(ctx, `DELETE FROM snapshot_registry WHERE table_name = $1`, tableName)
|
||||
return err
|
||||
default:
|
||||
return fmt.Errorf("unsupported driver for snapshot registry: %s", driver)
|
||||
}
|
||||
}
|
||||
|
||||
func ListSnapshots(ctx context.Context, database db.Database, snapshotType string) ([]SnapshotRecord, error) {
|
||||
dbConn := database.DB()
|
||||
driver := strings.ToLower(dbConn.DriverName())
|
||||
|
||||
var rows *sqlx.Rows
|
||||
var err error
|
||||
|
||||
switch driver {
|
||||
case "sqlite":
|
||||
rows, err = dbConn.QueryxContext(ctx, `
|
||||
SELECT table_name, snapshot_time, snapshot_type
|
||||
FROM snapshot_registry
|
||||
WHERE snapshot_type = ?
|
||||
ORDER BY snapshot_time DESC, table_name DESC
|
||||
`, snapshotType)
|
||||
case "pgx", "postgres":
|
||||
rows, err = dbConn.QueryxContext(ctx, `
|
||||
SELECT table_name, snapshot_time, snapshot_type
|
||||
FROM snapshot_registry
|
||||
WHERE snapshot_type = $1
|
||||
ORDER BY snapshot_time DESC, table_name DESC
|
||||
`, snapshotType)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported driver for listing snapshots: %s", driver)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
records := make([]SnapshotRecord, 0)
|
||||
for rows.Next() {
|
||||
var (
|
||||
tableName string
|
||||
snapshotTime int64
|
||||
recordType string
|
||||
)
|
||||
if err := rows.Scan(&tableName, &snapshotTime, &recordType); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
records = append(records, SnapshotRecord{
|
||||
TableName: tableName,
|
||||
SnapshotTime: time.Unix(snapshotTime, 0),
|
||||
SnapshotType: recordType,
|
||||
})
|
||||
}
|
||||
return records, rows.Err()
|
||||
}
|
||||
|
||||
func FormatSnapshotLabel(snapshotType string, snapshotTime time.Time, tableName string) string {
|
||||
switch snapshotType {
|
||||
case "hourly":
|
||||
return snapshotTime.Format("2006-01-02 15:00")
|
||||
case "daily":
|
||||
return snapshotTime.Format("2006-01-02")
|
||||
case "monthly":
|
||||
return snapshotTime.Format("2006-01")
|
||||
default:
|
||||
return tableName
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
func CreateTableReport(logger *slog.Logger, Database db.Database, ctx context.Context, tableName string) ([]byte, error) {
|
||||
|
||||
Reference in New Issue
Block a user