Skip to content

Commit

Permalink
feat: change event routing business logic (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasepe authored Jul 5, 2024
1 parent fbbacd1 commit 887fa4d
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 151 deletions.
20 changes: 13 additions & 7 deletions internal/router/advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"time"

"github.com/krateoplatformops/eventrouter/apis/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)

type advOpts struct {
httpClient *http.Client
registrationSpec v1alpha1.RegistrationSpec
eventInfo EventInfo
eventInfo corev1.Event
}

func newAdvisor(opts advOpts) *advisor {
Expand All @@ -29,7 +30,7 @@ func newAdvisor(opts advOpts) *advisor {
type advisor struct {
httpClient *http.Client
reg v1alpha1.RegistrationSpec
evt EventInfo
evt corev1.Event
}

func (c *advisor) Job() {
Expand All @@ -40,26 +41,31 @@ func (c *advisor) Job() {
}

func (c *advisor) notify() error {
compositionId := ""
if labels := c.evt.GetLabels(); len(labels) > 0 {
compositionId = labels[keyCompositionID]
}

dat, err := json.Marshal(c.evt)
if err != nil {
return fmt.Errorf("cannot encode notification (deploymentId:%s, destinationURL:%s): %w",
c.evt.DeploymentId, c.reg.Endpoint, err)
return fmt.Errorf("cannot encode notification (compositionId:%s, destinationURL:%s): %w",
compositionId, c.reg.Endpoint, err)
}

ctx, cncl := context.WithTimeout(context.Background(), time.Second*40)
defer cncl()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.reg.Endpoint, bytes.NewBuffer(dat))
if err != nil {
return fmt.Errorf("cannot create notification (deploymentId:%s, destinationURL:%s): %w",
c.evt.DeploymentId, c.reg.Endpoint, err)
return fmt.Errorf("cannot create notification (compositionId:%s, destinationURL:%s): %w",
compositionId, c.reg.Endpoint, err)
}

req.Header.Set("Content-Type", "application/json")
_, err = c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("cannot send notification (deploymentId:%s, destinationURL:%s): %w",
c.evt.DeploymentId, c.reg.Endpoint, err)
compositionId, c.reg.Endpoint, err)
}

return nil
Expand Down
57 changes: 0 additions & 57 deletions internal/router/event_info.go

This file was deleted.

33 changes: 21 additions & 12 deletions internal/router/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ type pusher struct {
func (c *pusher) Handle(evt corev1.Event) {
ref := &evt.InvolvedObject

deploymentId, err := findDeploymentID(c.objectResolver, ref)
compositionId, err := findCompositionID(c.objectResolver, ref)
if err != nil {
klog.ErrorS(err, "looking for deploymentId",
"involvedObject", ref.Name)
klog.ErrorS(err, "looking for composition id", "involvedObject", ref.Name)
return
}

Expand All @@ -65,29 +64,39 @@ func (c *pusher) Handle(evt corev1.Event) {
"kind", ref.Kind,
"apiGroup", evt.InvolvedObject.GroupVersionKind().Group,
"reason", evt.Reason,
"deploymentId", deploymentId)
"compositionId", compositionId)
}

err = patchWithLabels(c.objectResolver, &evt, deploymentId)
err = patchWithLabels(c.objectResolver, &evt, compositionId)
if err != nil {
klog.ErrorS(err, "unable to patch with labels",
"involvedObject", ref.Name)
klog.ErrorS(err, "unable to patch with labels", "involvedObject", ref.Name)
return
}

all, err := c.getAllRegistrations(context.Background())
if err != nil {
klog.ErrorS(err, "unable to list registrations",
"involvedObject", ref.Name)
klog.ErrorS(err, "unable to list registrations", "involvedObject", ref.Name)
return
}

msg := NewEventInfo(deploymentId, &evt)
if len(evt.ManagedFields) == 0 {
evt.ManagedFields = nil
}

labels := evt.GetLabels()
if labels == nil {
labels = map[string]string{
keyCompositionID: compositionId,
}
} else {
labels[keyCompositionID] = compositionId
}
evt.SetLabels(labels)

c.notifyAll(all, msg)
c.notifyAll(all, evt)
}

func (c *pusher) notifyAll(all map[string]v1alpha1.RegistrationSpec, evt EventInfo) {
func (c *pusher) notifyAll(all map[string]v1alpha1.RegistrationSpec, evt corev1.Event) {
for _, el := range all {
job := newAdvisor(advOpts{
httpClient: c.httpClient,
Expand Down
10 changes: 5 additions & 5 deletions internal/router/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ const (
keyPatchedBy = "krateo.io/patched-by"
)

func patchWithLabels(resolver *objects.ObjectResolver, evt *corev1.Event, deploymentId string) error {
func patchWithLabels(resolver *objects.ObjectResolver, evt *corev1.Event, compositionId string) error {
extras, err := createPatchData(map[string]string{
keyCompositionID: deploymentId,
keyCompositionID: compositionId,
keyPatchedBy: "krateo",
})
if err != nil {
Expand All @@ -31,8 +31,8 @@ func patchWithLabels(resolver *objects.ObjectResolver, evt *corev1.Event, deploy
PatchData: extras,
})
if err != nil {
if len(deploymentId) > 0 {
return fmt.Errorf("patching event with deploymentId '%s': %w", deploymentId, err)
if len(compositionId) > 0 {
return fmt.Errorf("patching event with composition id '%s': %w", compositionId, err)
} else {
return fmt.Errorf("patching event: %w", err)
}
Expand Down Expand Up @@ -68,7 +68,7 @@ func wasPatchedByKrateo(obj *corev1.Event) bool {
return ok
}

func findDeploymentID(resolver *objects.ObjectResolver, ref *corev1.ObjectReference) (string, error) {
func findCompositionID(resolver *objects.ObjectResolver, ref *corev1.ObjectReference) (string, error) {
obj, err := resolver.ResolveReference(context.Background(), ref)
if err != nil {
return "", err
Expand Down
8 changes: 3 additions & 5 deletions internal/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"time"

"github.com/krateoplatformops/eventrouter/internal/objects"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -110,9 +108,9 @@ func (er *EventRouter) onEvent(event *corev1.Event) {
return
}

if !objects.Accept(&event.InvolvedObject) {
return
}
// if !objects.Accept(&event.InvolvedObject) {
// return
// }

klog.V(4).InfoS("Received event",
"msg", event.Message,
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func main() {
flag.PrintDefaults()
}

klog.InitFlags(nil)

flag.Parse()

// Kubernetes configuration
Expand Down
1 change: 1 addition & 0 deletions manifests/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ spec:
args:
- --insecure=true
- --debug=true
- --v=6
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: false
Expand Down
17 changes: 0 additions & 17 deletions testdata/cloudevent.json

This file was deleted.

63 changes: 46 additions & 17 deletions testdata/event.mock.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,51 @@
apiVersion: v1
firstTimestamp: "2022-08-18T15:12:07Z"
involvedObject:
apiVersion: eks.aws.crossplane.io/v1alpha1
kind: NodeGroup
name: test-1-ng
resourceVersion: "17223"
uid: 6365c158-8ee1-4d36-a33a-ba3cc0958ee1
kind: Service
metadata:
name: fake-service-1
namespace: demo-system
labels:
krateo.io/composition-id: abcde12345
creationTimestamp: "2024-07-05T07:32:32Z"
#uid: 383b7f73-bdfe-4817-a06d-b38e6e655689
spec:
clusterIP: 10.96.139.206
clusterIPs:
- 10.96.139.206
externalTrafficPolicy: Cluster
internalTrafficPolicy: Cluster
ipFamilies:
- IPv4
ipFamilyPolicy: SingleStack
ports:
- name: sse
nodePort: 30081
port: 80
protocol: TCP
targetPort: 8181
selector:
app: eventsse
sessionAffinity: None
type: NodePort
status:
loadBalancer: {}
---
apiVersion: v1
kind: Event
lastTimestamp: "2022-08-18T15:12:09Z"
message: 'cannot create EKS node group: ResourceInUseException: Cluster: test-1 is
not in a valid state'
metadata:
annotations:
external-name: test-1-ng
name: test-2-ng.170c791ccd13d0cr
namespace: default
name: fake-event-1
namespace: demo-system
#uid: 90e8c2fe-a54f-4e74-962d-8e6abbce196d
reason: CannotCreateExternalResource
type: Warning
firstTimestamp: "2024-07-05T07:33:07Z"
lastTimestamp: "2024-07-05T07:33:09Z"
message: 'Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...'
involvedObject:
apiVersion: v1
kind: Service
name: fake-service-1
namespace: demo-system
#resourceVersion: "17223"
uid: 383b7f73-bdfe-4817-a06d-b38e6e655689
reason: LoremIpsum
source:
component: managed/nodegroup
type: Warning
component: krateo
20 changes: 0 additions & 20 deletions testdata/eventinfo.json

This file was deleted.

11 changes: 0 additions & 11 deletions testdata/labels.gotemplate

This file was deleted.

0 comments on commit 887fa4d

Please sign in to comment.