change endpoint to store cloudevent in events table
Some checks failed
CI / Lint (push) Waiting to run
CI / Test (push) Waiting to run
CI / End-to-End (push) Waiting to run
CI / Publish Docker (push) Blocked by required conditions
continuous-integration/drone/push Build is failing

This commit is contained in:
2024-09-13 12:58:22 +10:00
parent 446cd30d8e
commit eecf227cd8
8 changed files with 270 additions and 53 deletions

View File

@@ -1,7 +1,7 @@
-- +goose Up
-- +goose StatementBegin
CREATE TABLE IF NOT EXISTS "Inventory" (
"Iid" INTEGER UNIQUE,
"Iid" INTEGER PRIMARY KEY AUTOINCREMENT,
"Name" TEXT NOT NULL,
"Vcenter" TEXT NOT NULL,
"VmId" TEXT,
@@ -21,7 +21,7 @@ CREATE TABLE IF NOT EXISTS "Inventory" (
);
CREATE TABLE IF NOT EXISTS "Updates" (
"Uid" INTEGER UNIQUE,
"Uid" INTEGER PRIMARY KEY AUTOINCREMENT,
"InventoryId" INTEGER,
"UpdateTime" INTEGER,
"UpdateType" TEXT NOT NULL,

View File

@@ -0,0 +1,21 @@
-- +goose Up
-- +goose StatementBegin
CREATE TABLE IF NOT EXISTS "Events" (
"Eid" INTEGER PRIMARY KEY AUTOINCREMENT,
"CloudId" TEXT NOT NULL,
"Source" TEXT NOT NULL,
"EventTime" INTEGER,
"ChainId" TEXT NOT NULL,
"VmId" TEXT,
"EventKey" TEXT,
"Datacenter" TEXT,
"ComputeResource" TEXT,
"UserName" TEXT,
"Processed" INTEGER NOT NULL DEFAULT 0
);
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
DROP TABLE "Events";
-- +goose StatementEnd

View File

@@ -8,8 +8,22 @@ import (
"database/sql"
)
type Events struct {
Eid int64
CloudId string
Source string
EventTime sql.NullInt64
ChainId string
VmId sql.NullString
EventKey sql.NullString
Datacenter sql.NullString
ComputeResource sql.NullString
UserName sql.NullString
Processed int64
}
type Inventory struct {
Iid sql.NullInt64
Iid int64
Name string
Vcenter string
VmId sql.NullString
@@ -29,7 +43,7 @@ type Inventory struct {
}
type Updates struct {
Uid sql.NullInt64
Uid int64
InventoryId sql.NullInt64
UpdateTime sql.NullInt64
UpdateType string

View File

@@ -28,4 +28,26 @@ INSERT INTO "Updates" (
) VALUES(
?, ?, ?, ?, ?, ?, ?, ?
)
RETURNING *;
RETURNING *;
-- name: CreateEvent :one
INSERT INTO "Events" (
"CloudId", "Source", "EventTime", "ChainId", "VmId", "EventKey", "Datacenter", "ComputeResource", "UserName"
) VALUES(
?, ?, ?, ?, ?, ?, ?, ?, ?
)
RETURNING *;
-- name: ListEvents :many
SELECT * FROM "Events"
ORDER BY "EventTime";
-- name: ListUnprocessedEvents :many
SELECT * FROM "Events"
WHERE "Processed" = 0
ORDER BY "EventTime";
-- name: UpdateEventsProcessed :exec
UPDATE "Events"
SET "Processed" = 1
WHERE "Eid" = sqlc.arg('eid');

View File

@@ -10,6 +10,56 @@ import (
"database/sql"
)
const createEvent = `-- name: CreateEvent :one
INSERT INTO "Events" (
"CloudId", "Source", "EventTime", "ChainId", "VmId", "EventKey", "Datacenter", "ComputeResource", "UserName"
) VALUES(
?, ?, ?, ?, ?, ?, ?, ?, ?
)
RETURNING Eid, CloudId, Source, EventTime, ChainId, VmId, EventKey, Datacenter, ComputeResource, UserName, Processed
`
type CreateEventParams struct {
CloudId string
Source string
EventTime sql.NullInt64
ChainId string
VmId sql.NullString
EventKey sql.NullString
Datacenter sql.NullString
ComputeResource sql.NullString
UserName sql.NullString
}
func (q *Queries) CreateEvent(ctx context.Context, arg CreateEventParams) (Events, error) {
row := q.db.QueryRowContext(ctx, createEvent,
arg.CloudId,
arg.Source,
arg.EventTime,
arg.ChainId,
arg.VmId,
arg.EventKey,
arg.Datacenter,
arg.ComputeResource,
arg.UserName,
)
var i Events
err := row.Scan(
&i.Eid,
&i.CloudId,
&i.Source,
&i.EventTime,
&i.ChainId,
&i.VmId,
&i.EventKey,
&i.Datacenter,
&i.ComputeResource,
&i.UserName,
&i.Processed,
)
return i, err
}
const createInventory = `-- name: CreateInventory :one
INSERT INTO "Inventory" (
"Name", "Vcenter", "VmId", "EventKey", "EventId", "CreationTime", "ResourcePool", "VmType", "Datacenter", "Cluster", "Folder", "ProvisionedDisk", "InitialVcpus", "InitialRam", "SrmPlaceholder"
@@ -230,6 +280,46 @@ func (q *Queries) GetInventoryVmId(ctx context.Context, vmid sql.NullString) (In
return i, err
}
const listEvents = `-- name: ListEvents :many
SELECT Eid, CloudId, Source, EventTime, ChainId, VmId, EventKey, Datacenter, ComputeResource, UserName, Processed FROM "Events"
ORDER BY "EventTime"
`
func (q *Queries) ListEvents(ctx context.Context) ([]Events, error) {
rows, err := q.db.QueryContext(ctx, listEvents)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Events
for rows.Next() {
var i Events
if err := rows.Scan(
&i.Eid,
&i.CloudId,
&i.Source,
&i.EventTime,
&i.ChainId,
&i.VmId,
&i.EventKey,
&i.Datacenter,
&i.ComputeResource,
&i.UserName,
&i.Processed,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listInventory = `-- name: ListInventory :many
SELECT Iid, Name, Vcenter, VmId, EventKey, EventId, CreationTime, DeletionTime, ResourcePool, VmType, Datacenter, Cluster, Folder, ProvisionedDisk, InitialVcpus, InitialRam, SrmPlaceholder FROM "Inventory"
ORDER BY "Name"
@@ -275,3 +365,55 @@ func (q *Queries) ListInventory(ctx context.Context) ([]Inventory, error) {
}
return items, nil
}
const listUnprocessedEvents = `-- name: ListUnprocessedEvents :many
SELECT Eid, CloudId, Source, EventTime, ChainId, VmId, EventKey, Datacenter, ComputeResource, UserName, Processed FROM "Events"
WHERE "Processed" = 0
ORDER BY "EventTime"
`
func (q *Queries) ListUnprocessedEvents(ctx context.Context) ([]Events, error) {
rows, err := q.db.QueryContext(ctx, listUnprocessedEvents)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Events
for rows.Next() {
var i Events
if err := rows.Scan(
&i.Eid,
&i.CloudId,
&i.Source,
&i.EventTime,
&i.ChainId,
&i.VmId,
&i.EventKey,
&i.Datacenter,
&i.ComputeResource,
&i.UserName,
&i.Processed,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const updateEventsProcessed = `-- name: UpdateEventsProcessed :exec
UPDATE "Events"
SET "Processed" = 1
WHERE "Eid" = ?1
`
func (q *Queries) UpdateEventsProcessed(ctx context.Context, eid int64) error {
_, err := q.db.ExecContext(ctx, updateEventsProcessed, eid)
return err
}

View File

@@ -0,0 +1,3 @@
package tasks
// use gocron to check events in the Events table

View File

@@ -220,7 +220,7 @@ func (v *Vcenter) FindVMByIDWithDatacenter(vmID string, dcID string) (*VmPropert
} else if _, ok := err.(*find.NotFoundError); !ok {
// If the error is not a NotFoundError, return it
//return nil, fmt.Errorf("failed to retrieve VM with ID %s in datacenter %s: %w", vmID, dc.Name(), err)
v.Logger.Debug("Couldn't find vm in datacenter", "vm_id", vmID, "datacenter_name", datacenter.Name())
v.Logger.Debug("Couldn't find vm in datacenter", "vm_id", vmID, "datacenter_name", dcName)
} else if err != nil {
return nil, fmt.Errorf("failed to retrieve VM: %w", err)
}

View File

@@ -11,7 +11,6 @@ import (
"strconv"
"time"
queries "vctp/db/queries"
"vctp/internal/vcenter"
models "vctp/server/models"
)
@@ -35,14 +34,14 @@ func (h *Handler) VmCreate(w http.ResponseWriter, r *http.Request) {
}
// Decode the JSON body into vmModel struct
var vm models.CloudEventReceived
if err := json.Unmarshal(reqBody, &vm); err != nil {
var event models.CloudEventReceived
if err := json.Unmarshal(reqBody, &event); err != nil {
h.Logger.Error("unable to decode json", "error", err)
http.Error(w, "Invalid JSON body", http.StatusBadRequest)
return
} else {
h.Logger.Debug("successfully decoded JSON")
prettyPrint(vm)
prettyPrint(event)
}
// Convert vmModel to CreateInventoryParams using the utility function
@@ -50,7 +49,7 @@ func (h *Handler) VmCreate(w http.ResponseWriter, r *http.Request) {
//db.ConvertToSQLParams(&vm, &params)
// Parse the datetime string to a time.Time object
eventTime, err := time.Parse(time.RFC3339, vm.CloudEvent.Time)
eventTime, err := time.Parse(time.RFC3339, event.CloudEvent.Time)
if err != nil {
h.Logger.Warn("unable to convert cloud event time to timestamp", "error", err)
unixTimestamp = time.Now().Unix()
@@ -59,58 +58,74 @@ func (h *Handler) VmCreate(w http.ResponseWriter, r *http.Request) {
unixTimestamp = eventTime.Unix()
}
// TODO - initiate govmomi query of source vcenter to discover related data
h.Logger.Debug("connecting to vcenter")
vc := vcenter.New(h.Logger)
vc.Login(vm.CloudEvent.Source)
//vmObject, err := vc.FindVMByName(vm.CloudEvent.Data.VM.Name)
//vmObject, err := vc.FindVMByID(vm.CloudEvent.Data.VM.VM.Value)
vmObject, err := vc.FindVMByIDWithDatacenter(vm.CloudEvent.Data.VM.VM.Value, vm.CloudEvent.Data.Datacenter.Datacenter.Value)
/*
// TODO - initiate govmomi query of source vcenter to discover related data
h.Logger.Debug("connecting to vcenter")
vc := vcenter.New(h.Logger)
vc.Login(event.CloudEvent.Source)
//vmObject, err := vc.FindVMByName(vm.CloudEvent.Data.VM.Name)
//vmObject, err := vc.FindVMByID(vm.CloudEvent.Data.VM.VM.Value)
vmObject, err := vc.FindVMByIDWithDatacenter(event.CloudEvent.Data.VM.VM.Value, event.CloudEvent.Data.Datacenter.Datacenter.Value)
if err != nil {
h.Logger.Error("Can't locate vm in vCenter", "vmID", vm.CloudEvent.Data.VM.VM.Value, "error", err)
} else if vmObject == nil {
h.Logger.Debug("didn't find VM", "vm_id", vm.CloudEvent.Data.VM.VM.Value)
numRam = 0
numVcpus = 0
datacenter = vm.CloudEvent.Data.Datacenter.Name
} else {
h.Logger.Debug("found VM")
//prettyPrint(vmObject)
// calculate VM properties we want to store
if vmObject.Vm.Config != nil {
numRam = vmObject.Vm.Config.Hardware.MemoryMB
numVcpus = vmObject.Vm.Config.Hardware.NumCPU * vmObject.Vm.Config.Hardware.NumCoresPerSocket
if err != nil {
h.Logger.Error("Can't locate vm in vCenter", "vmID", event.CloudEvent.Data.VM.VM.Value, "error", err)
} else if vmObject == nil {
h.Logger.Debug("didn't find VM", "vm_id", event.CloudEvent.Data.VM.VM.Value)
numRam = 0
numVcpus = 0
datacenter = event.CloudEvent.Data.Datacenter.Name
} else {
h.Logger.Error("Empty VM config")
}
h.Logger.Debug("found VM")
//prettyPrint(vmObject)
}
err = vc.Logout()
if err != nil {
h.Logger.Error("unable to logout of vcenter", "error", err)
}
// calculate VM properties we want to store
if vmObject.Vm.Config != nil {
numRam = vmObject.Vm.Config.Hardware.MemoryMB
numVcpus = vmObject.Vm.Config.Hardware.NumCPU * vmObject.Vm.Config.Hardware.NumCoresPerSocket
} else {
h.Logger.Error("Empty VM config")
}
}
err = vc.Logout()
if err != nil {
h.Logger.Error("unable to logout of vcenter", "error", err)
}
*/
// Create an instance of CreateInventoryParams
h.Logger.Debug("Creating database parameters")
params := queries.CreateInventoryParams{
Name: vm.CloudEvent.Data.VM.Name,
Vcenter: vm.CloudEvent.Source,
EventId: sql.NullString{String: vm.CloudEvent.ID, Valid: vm.CloudEvent.ID != ""},
EventKey: sql.NullString{String: strconv.Itoa(vm.CloudEvent.Data.Key), Valid: strconv.Itoa(vm.CloudEvent.Data.Key) != ""},
VmId: sql.NullString{String: vm.CloudEvent.Data.VM.VM.Value, Valid: vm.CloudEvent.Data.VM.VM.Value != ""},
Datacenter: sql.NullString{String: datacenter, Valid: datacenter != ""},
Cluster: sql.NullString{String: vm.CloudEvent.Data.ComputeResource.Name, Valid: vm.CloudEvent.Data.ComputeResource.Name != ""},
CreationTime: sql.NullInt64{Int64: unixTimestamp, Valid: unixTimestamp > 0},
InitialVcpus: sql.NullInt64{Int64: int64(numVcpus), Valid: numVcpus > 0},
InitialRam: sql.NullInt64{Int64: int64(numRam), Valid: numRam > 0},
/*
params := queries.CreateInventoryParams{
Name: event.CloudEvent.Data.VM.Name,
Vcenter: event.CloudEvent.Source,
EventId: sql.NullString{String: event.CloudEvent.ID, Valid: event.CloudEvent.ID != ""},
EventKey: sql.NullString{String: strconv.Itoa(event.CloudEvent.Data.Key), Valid: strconv.Itoa(event.CloudEvent.Data.Key) != ""},
VmId: sql.NullString{String: event.CloudEvent.Data.VM.VM.Value, Valid: event.CloudEvent.Data.VM.VM.Value != ""},
Datacenter: sql.NullString{String: datacenter, Valid: datacenter != ""},
Cluster: sql.NullString{String: event.CloudEvent.Data.ComputeResource.Name, Valid: event.CloudEvent.Data.ComputeResource.Name != ""},
CreationTime: sql.NullInt64{Int64: unixTimestamp, Valid: unixTimestamp > 0},
InitialVcpus: sql.NullInt64{Int64: int64(numVcpus), Valid: numVcpus > 0},
InitialRam: sql.NullInt64{Int64: int64(numRam), Valid: numRam > 0},
}
*/
params2 := queries.CreateEventParams{
Source: event.CloudEvent.Source,
CloudId: event.CloudEvent.ID,
EventTime: sql.NullInt64{Int64: unixTimestamp, Valid: unixTimestamp > 0},
ChainId: strconv.Itoa(event.CloudEvent.Data.ChainID),
VmId: sql.NullString{String: event.CloudEvent.Data.VM.VM.Value, Valid: event.CloudEvent.Data.VM.VM.Value != ""},
EventKey: sql.NullString{String: strconv.Itoa(event.CloudEvent.Data.Key), Valid: strconv.Itoa(event.CloudEvent.Data.Key) != ""},
Datacenter: sql.NullString{String: event.CloudEvent.Data.Datacenter.Name, Valid: event.CloudEvent.Data.Datacenter.Name != ""},
ComputeResource: sql.NullString{String: event.CloudEvent.Data.ComputeResource.Name, Valid: event.CloudEvent.Data.ComputeResource.Name != ""},
UserName: sql.NullString{String: event.CloudEvent.Data.UserName, Valid: event.CloudEvent.Data.UserName != ""},
}
h.Logger.Debug("database params", "params", params)
h.Logger.Debug("database params", "params", params2)
// Insert the new inventory record into the database
result, err := h.Database.Queries().CreateInventory(context.Background(), params)
result, err := h.Database.Queries().CreateEvent(context.Background(), params2)
if err != nil {
h.Logger.Error("unable to perform database insert", "error", err)
w.WriteHeader(http.StatusInternalServerError)