From eecf227cd8afbb431841ca1f9185ac0cca5a126b Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Fri, 13 Sep 2024 12:58:22 +1000 Subject: [PATCH] change endpoint to store cloudevent in events table --- db/migrations/20240912012927_init.sql | 4 +- db/migrations/20240913021038_events.sql | 21 ++++ db/queries/models.go | 18 ++- db/queries/query.sql | 24 +++- db/queries/query.sql.go | 142 ++++++++++++++++++++++++ internal/tasks/processEvents.go | 3 + internal/vcenter/vcenter.go | 2 +- server/handler/vmCreate.go | 109 ++++++++++-------- 8 files changed, 270 insertions(+), 53 deletions(-) create mode 100644 db/migrations/20240913021038_events.sql create mode 100644 internal/tasks/processEvents.go diff --git a/db/migrations/20240912012927_init.sql b/db/migrations/20240912012927_init.sql index acf5cf8..e391e1f 100644 --- a/db/migrations/20240912012927_init.sql +++ b/db/migrations/20240912012927_init.sql @@ -1,7 +1,7 @@ -- +goose Up -- +goose StatementBegin CREATE TABLE IF NOT EXISTS "Inventory" ( - "Iid" INTEGER UNIQUE, + "Iid" INTEGER PRIMARY KEY AUTOINCREMENT, "Name" TEXT NOT NULL, "Vcenter" TEXT NOT NULL, "VmId" TEXT, @@ -21,7 +21,7 @@ CREATE TABLE IF NOT EXISTS "Inventory" ( ); CREATE TABLE IF NOT EXISTS "Updates" ( - "Uid" INTEGER UNIQUE, + "Uid" INTEGER PRIMARY KEY AUTOINCREMENT, "InventoryId" INTEGER, "UpdateTime" INTEGER, "UpdateType" TEXT NOT NULL, diff --git a/db/migrations/20240913021038_events.sql b/db/migrations/20240913021038_events.sql new file mode 100644 index 0000000..6a16364 --- /dev/null +++ b/db/migrations/20240913021038_events.sql @@ -0,0 +1,21 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS "Events" ( + "Eid" INTEGER PRIMARY KEY AUTOINCREMENT, + "CloudId" TEXT NOT NULL, + "Source" TEXT NOT NULL, + "EventTime" INTEGER, + "ChainId" TEXT NOT NULL, + "VmId" TEXT, + "EventKey" TEXT, + "Datacenter" TEXT, + "ComputeResource" TEXT, + "UserName" TEXT, + "Processed" INTEGER NOT NULL DEFAULT 0 +); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE "Events"; +-- +goose StatementEnd \ No newline at end of file diff --git a/db/queries/models.go b/db/queries/models.go index 35b29ae..a6792a6 100644 --- a/db/queries/models.go +++ b/db/queries/models.go @@ -8,8 +8,22 @@ import ( "database/sql" ) +type Events struct { + Eid int64 + CloudId string + Source string + EventTime sql.NullInt64 + ChainId string + VmId sql.NullString + EventKey sql.NullString + Datacenter sql.NullString + ComputeResource sql.NullString + UserName sql.NullString + Processed int64 +} + type Inventory struct { - Iid sql.NullInt64 + Iid int64 Name string Vcenter string VmId sql.NullString @@ -29,7 +43,7 @@ type Inventory struct { } type Updates struct { - Uid sql.NullInt64 + Uid int64 InventoryId sql.NullInt64 UpdateTime sql.NullInt64 UpdateType string diff --git a/db/queries/query.sql b/db/queries/query.sql index 10949f2..7f23c4f 100644 --- a/db/queries/query.sql +++ b/db/queries/query.sql @@ -28,4 +28,26 @@ INSERT INTO "Updates" ( ) VALUES( ?, ?, ?, ?, ?, ?, ?, ? ) -RETURNING *; \ No newline at end of file +RETURNING *; + +-- name: CreateEvent :one +INSERT INTO "Events" ( + "CloudId", "Source", "EventTime", "ChainId", "VmId", "EventKey", "Datacenter", "ComputeResource", "UserName" +) VALUES( + ?, ?, ?, ?, ?, ?, ?, ?, ? +) +RETURNING *; + +-- name: ListEvents :many +SELECT * FROM "Events" +ORDER BY "EventTime"; + +-- name: ListUnprocessedEvents :many +SELECT * FROM "Events" +WHERE "Processed" = 0 +ORDER BY "EventTime"; + +-- name: UpdateEventsProcessed :exec +UPDATE "Events" +SET "Processed" = 1 +WHERE "Eid" = sqlc.arg('eid'); \ No newline at end of file diff --git a/db/queries/query.sql.go b/db/queries/query.sql.go index d09f797..186500c 100644 --- a/db/queries/query.sql.go +++ b/db/queries/query.sql.go @@ -10,6 +10,56 @@ import ( "database/sql" ) +const createEvent = `-- name: CreateEvent :one +INSERT INTO "Events" ( + "CloudId", "Source", "EventTime", "ChainId", "VmId", "EventKey", "Datacenter", "ComputeResource", "UserName" +) VALUES( + ?, ?, ?, ?, ?, ?, ?, ?, ? +) +RETURNING Eid, CloudId, Source, EventTime, ChainId, VmId, EventKey, Datacenter, ComputeResource, UserName, Processed +` + +type CreateEventParams struct { + CloudId string + Source string + EventTime sql.NullInt64 + ChainId string + VmId sql.NullString + EventKey sql.NullString + Datacenter sql.NullString + ComputeResource sql.NullString + UserName sql.NullString +} + +func (q *Queries) CreateEvent(ctx context.Context, arg CreateEventParams) (Events, error) { + row := q.db.QueryRowContext(ctx, createEvent, + arg.CloudId, + arg.Source, + arg.EventTime, + arg.ChainId, + arg.VmId, + arg.EventKey, + arg.Datacenter, + arg.ComputeResource, + arg.UserName, + ) + var i Events + err := row.Scan( + &i.Eid, + &i.CloudId, + &i.Source, + &i.EventTime, + &i.ChainId, + &i.VmId, + &i.EventKey, + &i.Datacenter, + &i.ComputeResource, + &i.UserName, + &i.Processed, + ) + return i, err +} + const createInventory = `-- name: CreateInventory :one INSERT INTO "Inventory" ( "Name", "Vcenter", "VmId", "EventKey", "EventId", "CreationTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "InitialRam", "SrmPlaceholder" @@ -230,6 +280,46 @@ func (q *Queries) GetInventoryVmId(ctx context.Context, vmid sql.NullString) (In return i, err } +const listEvents = `-- name: ListEvents :many +SELECT Eid, CloudId, Source, EventTime, ChainId, VmId, EventKey, Datacenter, ComputeResource, UserName, Processed FROM "Events" +ORDER BY "EventTime" +` + +func (q *Queries) ListEvents(ctx context.Context) ([]Events, error) { + rows, err := q.db.QueryContext(ctx, listEvents) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Events + for rows.Next() { + var i Events + if err := rows.Scan( + &i.Eid, + &i.CloudId, + &i.Source, + &i.EventTime, + &i.ChainId, + &i.VmId, + &i.EventKey, + &i.Datacenter, + &i.ComputeResource, + &i.UserName, + &i.Processed, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listInventory = `-- name: ListInventory :many SELECT Iid, Name, Vcenter, VmId, EventKey, EventId, CreationTime, DeletionTime, ResourcePool, VmType, Datacenter, Cluster, Folder, ProvisionedDisk, InitialVcpus, InitialRam, SrmPlaceholder FROM "Inventory" ORDER BY "Name" @@ -275,3 +365,55 @@ func (q *Queries) ListInventory(ctx context.Context) ([]Inventory, error) { } return items, nil } + +const listUnprocessedEvents = `-- name: ListUnprocessedEvents :many +SELECT Eid, CloudId, Source, EventTime, ChainId, VmId, EventKey, Datacenter, ComputeResource, UserName, Processed FROM "Events" +WHERE "Processed" = 0 +ORDER BY "EventTime" +` + +func (q *Queries) ListUnprocessedEvents(ctx context.Context) ([]Events, error) { + rows, err := q.db.QueryContext(ctx, listUnprocessedEvents) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Events + for rows.Next() { + var i Events + if err := rows.Scan( + &i.Eid, + &i.CloudId, + &i.Source, + &i.EventTime, + &i.ChainId, + &i.VmId, + &i.EventKey, + &i.Datacenter, + &i.ComputeResource, + &i.UserName, + &i.Processed, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const updateEventsProcessed = `-- name: UpdateEventsProcessed :exec +UPDATE "Events" +SET "Processed" = 1 +WHERE "Eid" = ?1 +` + +func (q *Queries) UpdateEventsProcessed(ctx context.Context, eid int64) error { + _, err := q.db.ExecContext(ctx, updateEventsProcessed, eid) + return err +} diff --git a/internal/tasks/processEvents.go b/internal/tasks/processEvents.go new file mode 100644 index 0000000..5cfe120 --- /dev/null +++ b/internal/tasks/processEvents.go @@ -0,0 +1,3 @@ +package tasks + +// use gocron to check events in the Events table diff --git a/internal/vcenter/vcenter.go b/internal/vcenter/vcenter.go index d5ef3cf..5a045b9 100644 --- a/internal/vcenter/vcenter.go +++ b/internal/vcenter/vcenter.go @@ -220,7 +220,7 @@ func (v *Vcenter) FindVMByIDWithDatacenter(vmID string, dcID string) (*VmPropert } else if _, ok := err.(*find.NotFoundError); !ok { // If the error is not a NotFoundError, return it //return nil, fmt.Errorf("failed to retrieve VM with ID %s in datacenter %s: %w", vmID, dc.Name(), err) - v.Logger.Debug("Couldn't find vm in datacenter", "vm_id", vmID, "datacenter_name", datacenter.Name()) + v.Logger.Debug("Couldn't find vm in datacenter", "vm_id", vmID, "datacenter_name", dcName) } else if err != nil { return nil, fmt.Errorf("failed to retrieve VM: %w", err) } diff --git a/server/handler/vmCreate.go b/server/handler/vmCreate.go index e49d020..e642f9f 100644 --- a/server/handler/vmCreate.go +++ b/server/handler/vmCreate.go @@ -11,7 +11,6 @@ import ( "strconv" "time" queries "vctp/db/queries" - "vctp/internal/vcenter" models "vctp/server/models" ) @@ -35,14 +34,14 @@ func (h *Handler) VmCreate(w http.ResponseWriter, r *http.Request) { } // Decode the JSON body into vmModel struct - var vm models.CloudEventReceived - if err := json.Unmarshal(reqBody, &vm); err != nil { + var event models.CloudEventReceived + if err := json.Unmarshal(reqBody, &event); err != nil { h.Logger.Error("unable to decode json", "error", err) http.Error(w, "Invalid JSON body", http.StatusBadRequest) return } else { h.Logger.Debug("successfully decoded JSON") - prettyPrint(vm) + prettyPrint(event) } // Convert vmModel to CreateInventoryParams using the utility function @@ -50,7 +49,7 @@ func (h *Handler) VmCreate(w http.ResponseWriter, r *http.Request) { //db.ConvertToSQLParams(&vm, ¶ms) // Parse the datetime string to a time.Time object - eventTime, err := time.Parse(time.RFC3339, vm.CloudEvent.Time) + eventTime, err := time.Parse(time.RFC3339, event.CloudEvent.Time) if err != nil { h.Logger.Warn("unable to convert cloud event time to timestamp", "error", err) unixTimestamp = time.Now().Unix() @@ -59,58 +58,74 @@ func (h *Handler) VmCreate(w http.ResponseWriter, r *http.Request) { unixTimestamp = eventTime.Unix() } - // TODO - initiate govmomi query of source vcenter to discover related data - h.Logger.Debug("connecting to vcenter") - vc := vcenter.New(h.Logger) - vc.Login(vm.CloudEvent.Source) - //vmObject, err := vc.FindVMByName(vm.CloudEvent.Data.VM.Name) - //vmObject, err := vc.FindVMByID(vm.CloudEvent.Data.VM.VM.Value) - vmObject, err := vc.FindVMByIDWithDatacenter(vm.CloudEvent.Data.VM.VM.Value, vm.CloudEvent.Data.Datacenter.Datacenter.Value) + /* + // TODO - initiate govmomi query of source vcenter to discover related data + h.Logger.Debug("connecting to vcenter") + vc := vcenter.New(h.Logger) + vc.Login(event.CloudEvent.Source) + //vmObject, err := vc.FindVMByName(vm.CloudEvent.Data.VM.Name) + //vmObject, err := vc.FindVMByID(vm.CloudEvent.Data.VM.VM.Value) + vmObject, err := vc.FindVMByIDWithDatacenter(event.CloudEvent.Data.VM.VM.Value, event.CloudEvent.Data.Datacenter.Datacenter.Value) - if err != nil { - h.Logger.Error("Can't locate vm in vCenter", "vmID", vm.CloudEvent.Data.VM.VM.Value, "error", err) - } else if vmObject == nil { - h.Logger.Debug("didn't find VM", "vm_id", vm.CloudEvent.Data.VM.VM.Value) - numRam = 0 - numVcpus = 0 - datacenter = vm.CloudEvent.Data.Datacenter.Name - } else { - h.Logger.Debug("found VM") - //prettyPrint(vmObject) - - // calculate VM properties we want to store - if vmObject.Vm.Config != nil { - numRam = vmObject.Vm.Config.Hardware.MemoryMB - numVcpus = vmObject.Vm.Config.Hardware.NumCPU * vmObject.Vm.Config.Hardware.NumCoresPerSocket + if err != nil { + h.Logger.Error("Can't locate vm in vCenter", "vmID", event.CloudEvent.Data.VM.VM.Value, "error", err) + } else if vmObject == nil { + h.Logger.Debug("didn't find VM", "vm_id", event.CloudEvent.Data.VM.VM.Value) + numRam = 0 + numVcpus = 0 + datacenter = event.CloudEvent.Data.Datacenter.Name } else { - h.Logger.Error("Empty VM config") - } + h.Logger.Debug("found VM") + //prettyPrint(vmObject) - } - err = vc.Logout() - if err != nil { - h.Logger.Error("unable to logout of vcenter", "error", err) - } + // calculate VM properties we want to store + if vmObject.Vm.Config != nil { + numRam = vmObject.Vm.Config.Hardware.MemoryMB + numVcpus = vmObject.Vm.Config.Hardware.NumCPU * vmObject.Vm.Config.Hardware.NumCoresPerSocket + } else { + h.Logger.Error("Empty VM config") + } + + } + err = vc.Logout() + if err != nil { + h.Logger.Error("unable to logout of vcenter", "error", err) + } + */ // Create an instance of CreateInventoryParams h.Logger.Debug("Creating database parameters") - params := queries.CreateInventoryParams{ - Name: vm.CloudEvent.Data.VM.Name, - Vcenter: vm.CloudEvent.Source, - EventId: sql.NullString{String: vm.CloudEvent.ID, Valid: vm.CloudEvent.ID != ""}, - EventKey: sql.NullString{String: strconv.Itoa(vm.CloudEvent.Data.Key), Valid: strconv.Itoa(vm.CloudEvent.Data.Key) != ""}, - VmId: sql.NullString{String: vm.CloudEvent.Data.VM.VM.Value, Valid: vm.CloudEvent.Data.VM.VM.Value != ""}, - Datacenter: sql.NullString{String: datacenter, Valid: datacenter != ""}, - Cluster: sql.NullString{String: vm.CloudEvent.Data.ComputeResource.Name, Valid: vm.CloudEvent.Data.ComputeResource.Name != ""}, - CreationTime: sql.NullInt64{Int64: unixTimestamp, Valid: unixTimestamp > 0}, - InitialVcpus: sql.NullInt64{Int64: int64(numVcpus), Valid: numVcpus > 0}, - InitialRam: sql.NullInt64{Int64: int64(numRam), Valid: numRam > 0}, + /* + params := queries.CreateInventoryParams{ + Name: event.CloudEvent.Data.VM.Name, + Vcenter: event.CloudEvent.Source, + EventId: sql.NullString{String: event.CloudEvent.ID, Valid: event.CloudEvent.ID != ""}, + EventKey: sql.NullString{String: strconv.Itoa(event.CloudEvent.Data.Key), Valid: strconv.Itoa(event.CloudEvent.Data.Key) != ""}, + VmId: sql.NullString{String: event.CloudEvent.Data.VM.VM.Value, Valid: event.CloudEvent.Data.VM.VM.Value != ""}, + Datacenter: sql.NullString{String: datacenter, Valid: datacenter != ""}, + Cluster: sql.NullString{String: event.CloudEvent.Data.ComputeResource.Name, Valid: event.CloudEvent.Data.ComputeResource.Name != ""}, + CreationTime: sql.NullInt64{Int64: unixTimestamp, Valid: unixTimestamp > 0}, + InitialVcpus: sql.NullInt64{Int64: int64(numVcpus), Valid: numVcpus > 0}, + InitialRam: sql.NullInt64{Int64: int64(numRam), Valid: numRam > 0}, + } + */ + + params2 := queries.CreateEventParams{ + Source: event.CloudEvent.Source, + CloudId: event.CloudEvent.ID, + EventTime: sql.NullInt64{Int64: unixTimestamp, Valid: unixTimestamp > 0}, + ChainId: strconv.Itoa(event.CloudEvent.Data.ChainID), + VmId: sql.NullString{String: event.CloudEvent.Data.VM.VM.Value, Valid: event.CloudEvent.Data.VM.VM.Value != ""}, + EventKey: sql.NullString{String: strconv.Itoa(event.CloudEvent.Data.Key), Valid: strconv.Itoa(event.CloudEvent.Data.Key) != ""}, + Datacenter: sql.NullString{String: event.CloudEvent.Data.Datacenter.Name, Valid: event.CloudEvent.Data.Datacenter.Name != ""}, + ComputeResource: sql.NullString{String: event.CloudEvent.Data.ComputeResource.Name, Valid: event.CloudEvent.Data.ComputeResource.Name != ""}, + UserName: sql.NullString{String: event.CloudEvent.Data.UserName, Valid: event.CloudEvent.Data.UserName != ""}, } - h.Logger.Debug("database params", "params", params) + h.Logger.Debug("database params", "params", params2) // Insert the new inventory record into the database - result, err := h.Database.Queries().CreateInventory(context.Background(), params) + result, err := h.Database.Queries().CreateEvent(context.Background(), params2) if err != nil { h.Logger.Error("unable to perform database insert", "error", err) w.WriteHeader(http.StatusInternalServerError)