Skip to content

Commit

Permalink
Dynamically serve on demand configuration for Cluster discovery per r…
Browse files Browse the repository at this point in the history
…oute
  • Loading branch information
ffilippopoulos committed Jan 17, 2025
1 parent 95eac4c commit 207d42b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ spec:
admin:
address:
socket_address: { address: 127.0.0.1, port_value: 9901 }
dynamic_resources:
cds_config:
api_config_source:
api_type: DELTA_GRPC
grpc_services:
- envoy_grpc:
cluster_name: xds_cluster
static_resources:
listeners:
- name: listener_0
Expand All @@ -68,18 +61,6 @@ spec:
- envoy_grpc:
cluster_name: xds_cluster
http_filters:
- name: envoy.filters.http.on_demand
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.on_demand.v3.OnDemand
odcds:
source:
resource_api_version: V3
api_config_source:
api_type: DELTA_GRPC
transport_api_version: V3
grpc_services:
- envoy_grpc:
cluster_name: xds_cluster
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
Expand Down
48 changes: 47 additions & 1 deletion xds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
on_demandv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/on_demand/v3"
routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
managerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
Expand Down Expand Up @@ -249,6 +250,47 @@ func patchClusterDeltaEDS(cluster *clusterv3.Cluster) *clusterv3.Cluster {
return cluster
}

// patchVirtualHostForOnDemandDiscovery will patch VirtualHosts config to
// configure lazy cluster discovery per route
func patchVirtualHostForOnDemandDiscovery(vh *routev3.VirtualHost) (*routev3.VirtualHost, error) {
onDemandCds := &on_demandv3.OnDemandCds{
Source: &corev3.ConfigSource{
ResourceApiVersion: corev3.ApiVersion_V3,
ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{
ApiConfigSource: &corev3.ApiConfigSource{
ApiType: corev3.ApiConfigSource_DELTA_GRPC,
TransportApiVersion: corev3.ApiVersion_V3,
GrpcServices: []*corev3.GrpcService{
{
TargetSpecifier: &corev3.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &corev3.GrpcService_EnvoyGrpc{
ClusterName: "xds_cluster",
},
},
},
},
},
},
},
}
// Create PerRouteConfig with OnDemandCds
perRouteConfig := &on_demandv3.OnDemand{
Odcds: onDemandCds,
}
// Convert PerRouteConfig to protobuf.Any
typedConfig, err := anypb.New(perRouteConfig)
if err != nil {
return nil, fmt.Errorf("Cannot create on demand typedConfig: %v", err)
}

for _, route := range vh.Routes {
route.TypedPerFilterConfig = map[string]*anypb.Any{
"envoy.filters.http.on_demand": typedConfig,
}
}
return vh, nil
}

// servicesToResources will return a set of listener, routeConfiguration and
// cluster for each service port
func servicesToResources(serviceStore XdsServiceStore, authority string) ([]types.Resource, []types.Resource, []types.Resource, error) {
Expand Down Expand Up @@ -285,7 +327,11 @@ func servicesToResourcesWithNames(serviceStore XdsServiceStore, authority string
for _, port := range s.Service.Spec.Ports {
vh := makeVirtualHost(s.Service.Name, s.Service.Namespace, authority, port.Port, s.Retry, s.RingHashPolicies)
vh.Name = fmt.Sprintf("kube_dynamic/%s", vh.Name) // patch virtual host name to prefix with kube_dynamic as requests will be expected based on route config name
vhds[vh.Name] = vh
patchedVH, err := patchVirtualHostForOnDemandDiscovery(vh)
if err != nil {
log.Logger.Warn("Failed to patch on demand discovery configuration, skipping VirtualHost", "name", vh.Name, "error", err)
}
vhds[vh.Name] = patchedVH
cluster := makeCluster(s.Service.Name, s.Service.Namespace, authority, port.Port, s.Policy, s.RingHash)
cls[cluster.Name] = patchClusterDeltaEDS(cluster)
}
Expand Down

0 comments on commit 207d42b

Please sign in to comment.