add code for VmBeingModified endpoint
This commit is contained in:
9
db/migrations/20240915015710_events_type.sql
Normal file
9
db/migrations/20240915015710_events_type.sql
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
-- +goose Up
|
||||||
|
-- +goose StatementBegin
|
||||||
|
ALTER TABLE "Events" ADD COLUMN EventType TEXT;
|
||||||
|
-- +goose StatementEnd
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
-- +goose StatementBegin
|
||||||
|
ALTER TABLE "Updates" DROP COLUMN EventType;
|
||||||
|
-- +goose StatementEnd
|
11
internal/tasks/monitorVcenter.go
Normal file
11
internal/tasks/monitorVcenter.go
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package tasks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// use gocron to check vcenters for VMs or updates we don't know about
|
||||||
|
func (c *CronTask) RunVcenterPoll(ctx context.Context, logger *slog.Logger) error {
|
||||||
|
return nil
|
||||||
|
}
|
@@ -21,7 +21,6 @@ type CronTask struct {
|
|||||||
// use gocron to check events in the Events table
|
// use gocron to check events in the Events table
|
||||||
func (c *CronTask) RunVmCheck(ctx context.Context, logger *slog.Logger) error {
|
func (c *CronTask) RunVmCheck(ctx context.Context, logger *slog.Logger) error {
|
||||||
var (
|
var (
|
||||||
//unixTimestamp int64
|
|
||||||
numVcpus int32
|
numVcpus int32
|
||||||
numRam int32
|
numRam int32
|
||||||
totalDiskGB float64
|
totalDiskGB float64
|
||||||
@@ -66,7 +65,6 @@ func (c *CronTask) RunVmCheck(ctx context.Context, logger *slog.Logger) error {
|
|||||||
// calculate VM properties we want to store
|
// calculate VM properties we want to store
|
||||||
if vmObject.Vm.Config != nil {
|
if vmObject.Vm.Config != nil {
|
||||||
numRam = vmObject.Vm.Config.Hardware.MemoryMB
|
numRam = vmObject.Vm.Config.Hardware.MemoryMB
|
||||||
//numVcpus = vmObject.Vm.Config.Hardware.NumCPU * vmObject.Vm.Config.Hardware.NumCoresPerSocket
|
|
||||||
numVcpus = vmObject.Vm.Config.Hardware.NumCPU
|
numVcpus = vmObject.Vm.Config.Hardware.NumCPU
|
||||||
|
|
||||||
// Calculate the total disk allocated in GB
|
// Calculate the total disk allocated in GB
|
||||||
@@ -94,7 +92,7 @@ func (c *CronTask) RunVmCheck(ctx context.Context, logger *slog.Logger) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if foundVm {
|
if foundVm {
|
||||||
c.Logger.Debug("Simulate adding to Inventory", "vm_name", evt.VmName.String, "vcpus", numVcpus, "ram", numRam, "dc", evt.DatacenterId.String)
|
c.Logger.Debug("Adding to Inventory table", "vm_name", evt.VmName.String, "vcpus", numVcpus, "ram", numRam, "dc", evt.DatacenterId.String)
|
||||||
|
|
||||||
params := queries.CreateInventoryParams{
|
params := queries.CreateInventoryParams{
|
||||||
Name: vmObject.Vm.Name,
|
Name: vmObject.Vm.Name,
|
||||||
|
22
main.go
22
main.go
@@ -78,7 +78,7 @@ func main() {
|
|||||||
bindPort = "9443"
|
bindPort = "9443"
|
||||||
}
|
}
|
||||||
bindAddress := fmt.Sprint(bindIP, ":", bindPort)
|
bindAddress := fmt.Sprint(bindIP, ":", bindPort)
|
||||||
slog.Info("Will listen on address", "ip", bindIP, "port", bindPort)
|
//logger.Info("Will listen on address", "ip", bindIP, "port", bindPort)
|
||||||
|
|
||||||
// Determine bind disable TLS
|
// Determine bind disable TLS
|
||||||
bindDisableTlsEnv := os.Getenv("BIND_DISABLE_TLS")
|
bindDisableTlsEnv := os.Getenv("BIND_DISABLE_TLS")
|
||||||
@@ -103,7 +103,7 @@ func main() {
|
|||||||
|
|
||||||
// Generate certificate if required
|
// Generate certificate if required
|
||||||
if !(utils.FileExists(tlsCertFilename) && utils.FileExists(tlsKeyFilename)) {
|
if !(utils.FileExists(tlsCertFilename) && utils.FileExists(tlsKeyFilename)) {
|
||||||
slog.Warn("Specified TLS certificate or private key do not exist", "certificate", tlsCertFilename, "tls-key", tlsKeyFilename)
|
logger.Warn("Specified TLS certificate or private key do not exist", "certificate", tlsCertFilename, "tls-key", tlsKeyFilename)
|
||||||
utils.GenerateCerts(tlsCertFilename, tlsKeyFilename)
|
utils.GenerateCerts(tlsCertFilename, tlsKeyFilename)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -119,7 +119,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
logger.Debug("Setting VM polling cronjob frequency to", "frequency", cronFrequency)
|
logger.Debug("Setting VM polling cronjob frequency to", "frequency", cronFrequency)
|
||||||
|
|
||||||
// start background processing
|
// start background processing for events stored in events table
|
||||||
startsAt := time.Now().Add(time.Second * 10)
|
startsAt := time.Now().Add(time.Second * 10)
|
||||||
job, err := s.NewJob(
|
job, err := s.NewJob(
|
||||||
gocron.DurationJob(cronFrequency),
|
gocron.DurationJob(cronFrequency),
|
||||||
@@ -132,9 +132,23 @@ func main() {
|
|||||||
logger.Error("failed to start cron jobs", "error", err)
|
logger.Error("failed to start cron jobs", "error", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
logger.Debug("Created event processing cron job", "job", job.ID())
|
||||||
|
|
||||||
slog.Debug("Created cron job", "job", job)
|
startsAt2 := time.Now().Add(time.Second * 10)
|
||||||
|
job2, err := s.NewJob(
|
||||||
|
gocron.DurationJob(cronFrequency),
|
||||||
|
gocron.NewTask(func() {
|
||||||
|
c.RunVcenterPoll(ctx, logger)
|
||||||
|
}), gocron.WithSingletonMode(gocron.LimitModeReschedule),
|
||||||
|
gocron.WithStartAt(gocron.WithStartDateTime(startsAt2)),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to start cron jobs", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
logger.Debug("Created vcenter polling cron job", "job", job2.ID())
|
||||||
|
|
||||||
|
// start cron scheduler
|
||||||
s.Start()
|
s.Start()
|
||||||
|
|
||||||
// Start server
|
// Start server
|
||||||
|
@@ -33,7 +33,7 @@ func (h *Handler) VmCreate(w http.ResponseWriter, r *http.Request) {
|
|||||||
h.Logger.Debug("received input data", "length", len(reqBody))
|
h.Logger.Debug("received input data", "length", len(reqBody))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode the JSON body into vmModel struct
|
// Decode the JSON body into CloudEventReceived struct
|
||||||
var event models.CloudEventReceived
|
var event models.CloudEventReceived
|
||||||
if err := json.Unmarshal(reqBody, &event); err != nil {
|
if err := json.Unmarshal(reqBody, &event); err != nil {
|
||||||
h.Logger.Error("unable to decode json", "error", err)
|
h.Logger.Error("unable to decode json", "error", err)
|
||||||
|
@@ -1,21 +1,96 @@
|
|||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
models "vctp/server/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
// VmModify receives the CloudEvent for a VM modification or move
|
// VmModify receives the CloudEvent for a VM modification or move
|
||||||
func (h *Handler) VmModify(w http.ResponseWriter, r *http.Request) {
|
func (h *Handler) VmModify(w http.ResponseWriter, r *http.Request) {
|
||||||
reqBody, err := io.ReadAll(r.Body)
|
reqBody, err := io.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
h.Logger.Error("Invalid data received", "error", err)
|
||||||
fmt.Fprintf(w, "Invalid data received")
|
fmt.Fprintf(w, "Invalid data received")
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
} else {
|
||||||
|
h.Logger.Debug("received input data", "length", len(reqBody))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Decode the JSON body into CloudEventReceived struct
|
||||||
|
var event models.CloudEventReceived
|
||||||
|
if err := json.Unmarshal(reqBody, &event); err != nil {
|
||||||
|
h.Logger.Error("unable to decode json", "error", err)
|
||||||
|
http.Error(w, "Invalid JSON body", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
h.Logger.Debug("successfully decoded JSON")
|
||||||
|
prettyPrint(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (models.ConfigChangesReceived{} == event.CloudEvent.Data.ConfigChanges) {
|
||||||
|
h.Logger.Debug("Received event contains no config change")
|
||||||
|
} else {
|
||||||
|
h.Logger.Debug("Received event contains config change info")
|
||||||
|
config := h.processConfigChanges(event.CloudEvent.Data.ConfigChanges.Modified)
|
||||||
|
prettyPrint(config)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
// Map to hold the JSON data
|
||||||
|
var config map[string]interface{}
|
||||||
|
|
||||||
|
// Unmarshal the JSON into the map
|
||||||
|
if len(*event.CloudEvent.Data.ConfigSpec) > 0 {
|
||||||
|
err := json.Unmarshal([]byte(*event.CloudEvent.Data.ConfigSpec), &config)
|
||||||
|
if err != nil {
|
||||||
|
h.Logger.Error("unable to decode json", "error", err)
|
||||||
|
http.Error(w, "Invalid JSON body", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
h.Logger.Debug("received update request", "body", string(reqBody))
|
h.Logger.Debug("received update request", "body", string(reqBody))
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
fmt.Fprintf(w, "Update Request (%d): %v\n", len(reqBody), string(reqBody))
|
fmt.Fprintf(w, "Update Request (%d): %v\n", len(reqBody), string(reqBody))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handler) processConfigChanges(configChanges string) []map[string]string {
|
||||||
|
// Split the string on one or more consecutive newline characters
|
||||||
|
changes := regexp.MustCompile(`\n+`).Split(configChanges, -1)
|
||||||
|
|
||||||
|
// Regular expression to match config type and the new value after '->' or '<-'
|
||||||
|
re := regexp.MustCompile(`(?P<type>[^\s]+): \d+ -[><] (?P<newValue>\d+)`)
|
||||||
|
|
||||||
|
// Result will hold a list of changes with type and new value
|
||||||
|
var result []map[string]string
|
||||||
|
|
||||||
|
for _, change := range changes {
|
||||||
|
// Trim any extra spaces and skip empty lines
|
||||||
|
change = strings.TrimSpace(change)
|
||||||
|
h.Logger.Debug("Processing config change element", "substring", change)
|
||||||
|
if change == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the matches using the regex
|
||||||
|
match := re.FindStringSubmatch(change)
|
||||||
|
if len(match) > 0 {
|
||||||
|
// Create a map with 'type' and 'newValue'
|
||||||
|
changeMap := map[string]string{
|
||||||
|
"type": match[1], // config type
|
||||||
|
"newValue": match[2], // new value after -> or <-
|
||||||
|
}
|
||||||
|
h.Logger.Debug("Adding new entry to output", "map", changeMap)
|
||||||
|
result = append(result, changeMap)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
@@ -1,6 +1,9 @@
|
|||||||
package models
|
package models
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
type CloudEventReceived struct {
|
type CloudEventReceived struct {
|
||||||
CloudEvent struct {
|
CloudEvent struct {
|
||||||
@@ -55,10 +58,12 @@ type CloudEventReceived struct {
|
|||||||
Value string `json:"Value"`
|
Value string `json:"Value"`
|
||||||
} `json:"Vm"`
|
} `json:"Vm"`
|
||||||
} `json:"Vm"`
|
} `json:"Vm"`
|
||||||
ConfigSpec any `json:"configSpec"`
|
ConfigSpec *json.RawMessage `json:"configSpec"`
|
||||||
ConfigChanges struct {
|
ConfigChanges ConfigChangesReceived `json:"configChanges"`
|
||||||
Modified string `json:"modified"`
|
|
||||||
} `json:"configChanges"`
|
|
||||||
} `json:"data"`
|
} `json:"data"`
|
||||||
} `json:"cloudEvent"`
|
} `json:"cloudEvent"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ConfigChangesReceived struct {
|
||||||
|
Modified string `json:"modified"`
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user