diff --git a/hack/docs/instancetypes_gen/main.go b/hack/docs/instancetypes_gen/main.go index 941cb6d0e27f..59b655c34771 100644 --- a/hack/docs/instancetypes_gen/main.go +++ b/hack/docs/instancetypes_gen/main.go @@ -130,6 +130,7 @@ below are the resources available with some assumptions and after the instance o subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval)) instanceTypeProvider := instancetype.NewDefaultProvider( cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval), + cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval), ec2api, subnetProvider, instancetype.NewDefaultResolver( diff --git a/hack/tools/launchtemplate_counter/main.go b/hack/tools/launchtemplate_counter/main.go index a859dfc57273..e75be08c82d2 100644 --- a/hack/tools/launchtemplate_counter/main.go +++ b/hack/tools/launchtemplate_counter/main.go @@ -58,6 +58,7 @@ func main() { subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval)) instanceTypeProvider := instancetype.NewDefaultProvider( cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval), + cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval), ec2api, subnetProvider, instancetype.NewDefaultResolver( diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 8f8c98d48c7b..6a6b5b1ad6be 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -38,6 +38,9 @@ const ( // SSMGetParametersByPathTTL is the time to drop SSM Parameters by path data. This only queries EKS Optimized AMI // releases, so we should expect this to be updated relatively infrequently. 
SSMGetParametersByPathTTL = 24 * time.Hour + // DiscoveredCapacityCacheTTL is the time to drop discovered resource capacity data per-instance type + // if it is not updated by a node creation event or refreshed during controller reconciliation + DiscoveredCapacityCacheTTL = 60 * 24 * time.Hour ) const ( diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 9f44f5670b95..cb8ba2d5d6e7 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -27,6 +27,7 @@ import ( nodeclassstatus "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status" nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination" controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype" + controllersinstancetypecapacity "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/capacity" controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" @@ -68,6 +69,7 @@ func NewControllers(ctx context.Context, mgr manager.Manager, sess *session.Sess nodeclaimtagging.NewController(kubeClient, instanceProvider), controllerspricing.NewController(pricingProvider), controllersinstancetype.NewController(instanceTypeProvider), + controllersinstancetypecapacity.NewController(kubeClient, instanceTypeProvider), status.NewController[*v1.EC2NodeClass](kubeClient, mgr.GetEventRecorderFor("karpenter")), } if options.FromContext(ctx).InterruptionQueue != "" { diff --git a/pkg/controllers/providers/instancetype/capacity/controller.go b/pkg/controllers/providers/instancetype/capacity/controller.go new file mode 100644 index 000000000000..8aa86ed0c54f --- /dev/null +++ b/pkg/controllers/providers/instancetype/capacity/controller.go @@ -0,0 +1,80 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package capacity + +import ( + "context" + "fmt" + + "github.com/awslabs/operatorpkg/reasonable" + corev1 "k8s.io/api/core/v1" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/operator/injection" + + "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" +) + +type Controller struct { + kubeClient client.Client + instancetypeProvider *instancetype.DefaultProvider +} + +func NewController(kubeClient client.Client, instancetypeProvider *instancetype.DefaultProvider) *Controller { + return &Controller{ + kubeClient: kubeClient, + instancetypeProvider: instancetypeProvider, + } +} + +func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "providers.instancetype.capacity") + if err := c.instancetypeProvider.UpdateInstanceTypeCapacityFromNode(ctx, c.kubeClient, node); err != nil { + return reconcile.Result{}, fmt.Errorf("updating discovered capacity cache, %w", err) + } + return reconcile.Result{}, nil +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named("providers.instancetype.capacity"). + For(&corev1.Node{}, builder.WithPredicates(predicate.TypedFuncs[client.Object]{ + // Only trigger reconciliation once a node becomes registered. This is an optimization to omit no-op reconciliations and reduce lock contention on the cache. + UpdateFunc: func(e event.TypedUpdateEvent[client.Object]) bool { + if e.ObjectOld.GetLabels()[karpv1.NodeRegisteredLabelKey] != "" { + return false + } + return e.ObjectNew.GetLabels()[karpv1.NodeRegisteredLabelKey] == "true" + }, + // Reconcile against all Nodes added to the informer cache in a registered state. This allows us to hydrate the discovered capacity cache on controller startup. + CreateFunc: func(e event.TypedCreateEvent[client.Object]) bool { + return e.Object.GetLabels()[karpv1.NodeRegisteredLabelKey] == "true" + }, + DeleteFunc: func(e event.TypedDeleteEvent[client.Object]) bool { return false }, + GenericFunc: func(e event.TypedGenericEvent[client.Object]) bool { return false }, + })). + WithOptions(controller.Options{ + RateLimiter: reasonable.RateLimiter(), + MaxConcurrentReconciles: 1, + }). + Complete(reconcile.AsReconciler(m.GetClient(), c)) +} diff --git a/pkg/controllers/providers/instancetype/capacity/suite_test.go b/pkg/controllers/providers/instancetype/capacity/suite_test.go new file mode 100644 index 000000000000..2fa853d7c660 --- /dev/null +++ b/pkg/controllers/providers/instancetype/capacity/suite_test.go @@ -0,0 +1,168 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package capacity_test + +import ( + "context" + "fmt" + "math" + "testing" + + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/samber/lo" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/utils/resources" + + controllersinstancetypecapacity "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/capacity" + "github.com/aws/karpenter-provider-aws/pkg/fake" + + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + + corev1 "k8s.io/api/core/v1" + coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" + coretest "sigs.k8s.io/karpenter/pkg/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "sigs.k8s.io/karpenter/pkg/test/expectations" + . "sigs.k8s.io/karpenter/pkg/utils/testing" + + "github.com/aws/karpenter-provider-aws/pkg/apis" + v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" + "github.com/aws/karpenter-provider-aws/pkg/operator/options" + "github.com/aws/karpenter-provider-aws/pkg/test" +) + +var ctx context.Context +var stop context.CancelFunc +var env *coretest.Environment +var awsEnv *test.Environment +var controller *controllersinstancetypecapacity.Controller + +var nodeClass *v1.EC2NodeClass +var nodeClaim *karpv1.NodeClaim +var node *corev1.Node + +func TestAWS(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "CapacityCache") +} + +var _ = BeforeSuite(func() { + env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...), coretest.WithFieldIndexers(coretest.NodeClaimFieldIndexer(ctx))) + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options(test.OptionsFields{ + VMMemoryOverheadPercent: lo.ToPtr[float64](0.075), + })) + ctx, stop = context.WithCancel(ctx) + awsEnv = test.NewEnvironment(ctx, env) + nodeClass = test.EC2NodeClass() + nodeClaim = coretest.NodeClaim() + node = coretest.Node() + controller = controllersinstancetypecapacity.NewController(env.Client, awsEnv.InstanceTypesProvider) +}) + +var _ = AfterSuite(func() { + stop() + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = BeforeEach(func() { + awsEnv.Reset() + ec2InstanceTypeInfo := fake.MakeInstances() + ec2Offerings := fake.MakeInstanceOfferings(ec2InstanceTypeInfo) + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{ + InstanceTypes: ec2InstanceTypeInfo, + }) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ + InstanceTypeOfferings: ec2Offerings, + }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) +}) + +var _ = AfterEach(func() { + ExpectCleanedUp(ctx, env.Client) +}) + +var _ = Describe("CapacityCache", func() { + BeforeEach(func() { + ExpectApplied(ctx, env.Client, nodeClass) + + node = coretest.Node(coretest.NodeOptions{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{ + corev1.LabelInstanceTypeStable: "t3.medium", + karpv1.NodeRegisteredLabelKey: "true", + }, + }, + Capacity: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dMi", 3840)), + }, + }) + ExpectApplied(ctx, env.Client, node) + + nodeClaim = 
&karpv1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nodeclaim", + }, + Spec: karpv1.NodeClaimSpec{ + NodeClassRef: &karpv1.NodeClassReference{ + Name: nodeClass.Name, + }, + Requirements: make([]karpv1.NodeSelectorRequirementWithMinValues, 0), + }, + Status: karpv1.NodeClaimStatus{ + NodeName: node.Name, + ImageID: nodeClass.Status.AMIs[0].ID, + }, + } + ExpectApplied(ctx, env.Client, nodeClaim) + }) + It("should update discovered capacity based on existing nodes", func() { + ExpectObjectReconciled(ctx, env.Client, controller, node) + instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + i, ok := lo.Find(instanceTypes, func(i *cloudprovider.InstanceType) bool { + return i.Name == "t3.medium" + }) + Expect(ok).To(BeTrue()) + Expect(i.Capacity.Memory().Value()).To(Equal(node.Status.Capacity.Memory().Value()), "Expected capacity to match discovered node capacity") + }) + It("should use VM_MEMORY_OVERHEAD_PERCENT calculation after AMI update", func() { + ExpectObjectReconciled(ctx, env.Client, controller, node) + + // Update NodeClass AMI and list instance-types. Cached values from prior AMI should no longer be used. + nodeClass.Status.AMIs[0].ID = "ami-new-test-id" + ExpectApplied(ctx, env.Client, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, controller, node) + instanceTypesNoCache, err := awsEnv.InstanceTypesProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + i, ok := lo.Find(instanceTypesNoCache, func(i *cloudprovider.InstanceType) bool { + return i.Name == "t3.medium" + }) + Expect(ok).To(BeTrue()) + + // Calculate memory capacity based on VM_MEMORY_OVERHEAD_PERCENT and output from DescribeInstanceType + mem := resources.Quantity(fmt.Sprintf("%dMi", 8192)) // Reported memory from fake.MakeInstances() + mem.Sub(resource.MustParse(fmt.Sprintf("%dMi", int64(math.Ceil(float64(mem.Value())*options.FromContext(ctx).VMMemoryOverheadPercent/1024/1024))))) + Expect(i.Capacity.Memory().Value()).To(Equal(mem.Value()), "Expected capacity to match VMMemoryOverheadPercent calculation") + }) +}) diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 44f394f3a5c7..2b5d9885202f 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -171,6 +171,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont ) instanceTypeProvider := instancetype.NewDefaultProvider( cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval), + cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval), ec2api, subnetProvider, instancetype.NewDefaultResolver(*sess.Config.Region, pricingProvider, unavailableOfferingsCache), diff --git a/pkg/operator/options/options.go b/pkg/operator/options/options.go index 4a5ef2c0f7d5..9731b925d263 100644 --- a/pkg/operator/options/options.go +++ b/pkg/operator/options/options.go @@ -48,7 +48,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) { fs.StringVar(&o.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "[REQUIRED] The kubernetes cluster name for resource discovery.") fs.StringVar(&o.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with. If not specified, will discover the cluster endpoint using DescribeCluster API.") fs.BoolVarWithEnv(&o.IsolatedVPC, "isolated-vpc", "ISOLATED_VPC", false, "If true, then assume we can't reach AWS services which don't have a VPC endpoint. 
This also has the effect of disabling look-ups to the AWS on-demand pricing endpoint.") - fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.") + fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types when cached information is unavailable.") fs.StringVar(&o.InterruptionQueue, "interruption-queue", env.WithDefaultString("INTERRUPTION_QUEUE", ""), "Interruption queue is the name of the SQS queue used for processing interruption events from EC2. Interruption handling is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.") fs.IntVar(&o.ReservedENIs, "reserved-enis", env.WithDefaultInt("RESERVED_ENIS", 0), "Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.") } diff --git a/pkg/providers/instancetype/instancetype.go b/pkg/providers/instancetype/instancetype.go index dca16eb8bb29..b0c28a8eecd0 100644 --- a/pkg/providers/instancetype/instancetype.go +++ b/pkg/providers/instancetype/instancetype.go @@ -20,6 +20,13 @@ import ( "sync" "sync/atomic" + "k8s.io/apimachinery/pkg/api/resource" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/karpenter/pkg/scheduling" + nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" + + "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/mitchellh/hashstructure/v2" "github.com/patrickmn/go-cache" "github.com/prometheus/client_golang/prometheus" @@ -60,27 +67,30 @@ type DefaultProvider struct { muInstanceTypesOfferings sync.RWMutex instanceTypesOfferings map[string]sets.Set[string] - instanceTypesCache *cache.Cache - cm *pretty.ChangeMonitor + instanceTypesCache *cache.Cache + discoveredCapacityCache *cache.Cache + cm *pretty.ChangeMonitor // instanceTypesSeqNum is a monotonically increasing change counter used to avoid the expensive hashing operation on instance types instanceTypesSeqNum uint64 // instanceTypesOfferingsSeqNum is a monotonically increasing change counter used to avoid the expensive hashing operation on instance types instanceTypesOfferingsSeqNum uint64 } -func NewDefaultProvider(instanceTypesCache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider subnet.Provider, instanceTypesResolver Resolver) *DefaultProvider { +func NewDefaultProvider(instanceTypesCache *cache.Cache, discoveredCapacityCache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider subnet.Provider, instanceTypesResolver Resolver) *DefaultProvider { return &DefaultProvider{ - ec2api: ec2api, - subnetProvider: subnetProvider, - instanceTypesInfo: []*ec2.InstanceTypeInfo{}, - instanceTypesOfferings: map[string]sets.Set[string]{}, - instanceTypesResolver: instanceTypesResolver, - instanceTypesCache: instanceTypesCache, - cm: pretty.NewChangeMonitor(), - instanceTypesSeqNum: 0, + ec2api: ec2api, + subnetProvider: subnetProvider, + instanceTypesInfo: []*ec2.InstanceTypeInfo{}, + instanceTypesOfferings: map[string]sets.Set[string]{}, + instanceTypesResolver: instanceTypesResolver, + 
instanceTypesCache: instanceTypesCache, + discoveredCapacityCache: discoveredCapacityCache, + cm: pretty.NewChangeMonitor(), + instanceTypesSeqNum: 0, } } +//nolint:gocyclo func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1.EC2NodeClass) ([]*cloudprovider.InstanceType, error) { p.muInstanceTypesInfo.RLock() p.muInstanceTypesOfferings.RLock() @@ -104,9 +114,13 @@ func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1.EC2NodeClass) // Compute fully initialized instance types hash key subnetZonesHash, _ := hashstructure.Hash(subnetZones, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) - key := fmt.Sprintf("%d-%d-%016x-%s", + // Compute hash key against node class AMIs (used to force cache rebuild when AMIs change) + amiHash, _ := hashstructure.Hash(nodeClass.Status.AMIs, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) + + key := fmt.Sprintf("%d-%d-%016x-%016x-%016x", p.instanceTypesSeqNum, p.instanceTypesOfferingsSeqNum, + amiHash, subnetZonesHash, p.instanceTypesResolver.CacheKey(nodeClass), ) @@ -153,6 +167,9 @@ func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1.EC2NodeClass) }) it := p.instanceTypesResolver.Resolve(ctx, i, zoneData, nodeClass) + if cached, ok := p.discoveredCapacityCache.Get(fmt.Sprintf("%s-%016x", it.Name, amiHash)); ok { + it.Capacity[corev1.ResourceMemory] = cached.(resource.Quantity) + } for _, of := range it.Offerings { instanceTypeOfferingAvailable.With(prometheus.Labels{ instanceTypeLabel: it.Name, @@ -242,8 +259,45 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error return nil } +func (p *DefaultProvider) UpdateInstanceTypeCapacityFromNode(ctx context.Context, kubeClient client.Client, node *corev1.Node) error { + nodeClaim, err := nodeutils.NodeClaimForNode(ctx, kubeClient, node) + if err != nil { + return fmt.Errorf("failed to get nodeclaim for node, %w", err) + } + + nodeClass := &v1.EC2NodeClass{} + if err = kubeClient.Get(ctx, client.ObjectKey{Name: nodeClaim.Spec.NodeClassRef.Name}, nodeClass); err != nil { + return fmt.Errorf("failed to get ec2nodeclass, %w", err) + } + + // Get mappings for most recent AMIs + instanceTypeName := node.Labels[corev1.LabelInstanceTypeStable] + amiMap := amifamily.MapToInstanceTypes([]*cloudprovider.InstanceType{{ + Name: instanceTypeName, + Requirements: scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...), + }}, nodeClass.Status.AMIs) + // Ensure NodeClaim AMI is current + if !lo.ContainsBy(amiMap[nodeClaim.Status.ImageID], func(i *cloudprovider.InstanceType) bool { + return i.Name == instanceTypeName + }) { + return nil + } + + amiHash, _ := hashstructure.Hash(nodeClass.Status.AMIs, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) + key := fmt.Sprintf("%s-%016x", instanceTypeName, amiHash) + + // Update cache if non-existent or actual capacity is less than or equal to cached value + actualCapacity := node.Status.Capacity.Memory() + if cachedCapacity, ok := p.discoveredCapacityCache.Get(key); !ok || actualCapacity.Cmp(cachedCapacity.(resource.Quantity)) < 1 { + log.FromContext(ctx).WithValues("memory-capacity", actualCapacity, "instance-type", instanceTypeName).V(1).Info("updating discovered capacity cache") + p.discoveredCapacityCache.SetDefault(key, *actualCapacity) + } + return nil +} + func (p *DefaultProvider) Reset() { p.instanceTypesInfo = []*ec2.InstanceTypeInfo{} p.instanceTypesOfferings = map[string]sets.Set[string]{} 
p.instanceTypesCache.Flush() + p.discoveredCapacityCache.Flush() } diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 698ae0df7c41..d531369c0bde 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -71,6 +71,7 @@ type Environment struct { SecurityGroupCache *cache.Cache InstanceProfileCache *cache.Cache SSMCache *cache.Cache + DiscoveredCapacityCache *cache.Cache // Providers InstanceTypesResolver *instancetype.DefaultResolver @@ -100,6 +101,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment ec2Cache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) kubernetesVersionCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) instanceTypeCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) + discoveredCapacityCache := cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval) unavailableOfferingsCache := awscache.NewUnavailableOfferings() launchTemplateCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) subnetCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) @@ -120,7 +122,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment amiProvider := amifamily.NewDefaultProvider(clock, versionProvider, ssmProvider, ec2api, ec2Cache) amiResolver := amifamily.NewDefaultResolver() instanceTypesResolver := instancetype.NewDefaultResolver(fake.DefaultRegion, pricingProvider, unavailableOfferingsCache) - instanceTypesProvider := instancetype.NewDefaultProvider(instanceTypeCache, ec2api, subnetProvider, instanceTypesResolver) + instanceTypesProvider := instancetype.NewDefaultProvider(instanceTypeCache, discoveredCapacityCache, ec2api, subnetProvider, instanceTypesResolver) launchTemplateProvider := launchtemplate.NewDefaultProvider( ctx, @@ -164,6 +166,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment InstanceProfileCache: instanceProfileCache, UnavailableOfferingsCache: unavailableOfferingsCache, SSMCache: ssmCache, + DiscoveredCapacityCache: discoveredCapacityCache, InstanceTypesResolver: instanceTypesResolver, InstanceTypesProvider: instanceTypesProvider, @@ -199,6 +202,7 @@ func (env *Environment) Reset() { env.SecurityGroupCache.Flush() env.InstanceProfileCache.Flush() env.SSMCache.Flush() + env.DiscoveredCapacityCache.Flush() mfs, err := crmetrics.Registry.Gather() if err != nil { for _, mf := range mfs { diff --git a/website/content/en/preview/reference/settings.md b/website/content/en/preview/reference/settings.md index f39eebf3b8d9..e62984e05ac3 100644 --- a/website/content/en/preview/reference/settings.md +++ b/website/content/en/preview/reference/settings.md @@ -33,7 +33,7 @@ Karpenter surfaces environment variables and CLI parameters to allow you to conf | MEMORY_LIMIT | \-\-memory-limit | Memory limit on the container running the controller. The GC soft memory limit is set to 90% of this value. (default = -1)| | METRICS_PORT | \-\-metrics-port | The port the metric endpoint binds to for operating metrics about the controller itself (default = 8080)| | RESERVED_ENIS | \-\-reserved-enis | Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html. 
(default = 0)|
-| VM_MEMORY_OVERHEAD_PERCENT | \-\-vm-memory-overhead-percent | The VM memory overhead as a percent that will be subtracted from the total memory for all instance types. (default = 0.075)|
+| VM_MEMORY_OVERHEAD_PERCENT | \-\-vm-memory-overhead-percent | The VM memory overhead as a percent that will be subtracted from the total memory for all instance types when cached information is unavailable. (default = 0.075, which equals 7.5%) |

 [comment]: <> (end docs generated content from hack/docs/configuration_gen_docs.go)
diff --git a/website/content/en/preview/troubleshooting.md b/website/content/en/preview/troubleshooting.md
index ee9d4e572672..89f48766e8d5 100644
--- a/website/content/en/preview/troubleshooting.md
+++ b/website/content/en/preview/troubleshooting.md
@@ -310,25 +310,34 @@ then the following solution(s) may resolve your issue.

 ### Karpenter incorrectly computes available resources for a node

-When creating nodes, the allocatable resources Karpenter computed (as seen in logs and `nodeClaim.status.allocatable`) do not always match the allocatable resources on the created node (`node.status.allocatable`).
-Karpenter uses the results from `ec2:DescribeInstanceTypes` to determine the resources available on a node launched with a given instance type.
+When creating nodes, the allocatable resources Karpenter computed (as seen in logs and `nodeClaim.status.allocatable`) do not always match the allocatable resources on the created node (`node.status.allocatable`) due to some amount of memory being reserved for the hypervisor and underlying OS.
+Karpenter uses the results from `ec2:DescribeInstanceTypes` along with a cache for tracking observed memory capacity to determine the resources available on a node launched with a given instance type.
 The following computation is used to determine allocatable CPU, memory, and ephemeral storage based on the results returned from `ec2:DescribeInstanceTypes`.

 ```
+### cpu
 nodeClaim.allocatable.cpu = instance.cpu - kubeReserved.cpu - systemReserved.cpu
-nodeClaim.allocatable.memory = (instance.memory * (1.0 - VM_MEMORY_OVERHEAD_PERCENT)) - kubeReserved.memory - systemReserved.memory - max(evictionSoft.memory.available, evictionHard.memory.available)
+
+### memory
+# If first time launching this instance-type + AMI pair
+nodeClaim.allocatable.memory = (instance.memory * (1.0 - VM_MEMORY_OVERHEAD_PERCENT)) - kubeReserved.memory - systemReserved.memory - max(evictionSoft.memory.available, evictionHard.memory.available)
+# For subsequent nodes where cached instance-type capacity is available
+nodeClaim.allocatable.memory = cached.instance.memory - kubeReserved.memory - systemReserved.memory - max(evictionSoft.memory.available, evictionHard.memory.available)
+
+### ephemeral-storage
 nodeClaim.allocatable.ephemeralStorage = instance.storage - kubeReserved.ephemeralStorage - systemReserved.ephemeralStorage - max(evictionSoft.nodefs.available, evictionHard.nodefs.available)
 ```

 Most of these factors directly model user configuration (i.e. the KubeletConfiguration options).
 On the other hand, `VM_MEMORY_OVERHEAD_PERCENT` models an implicit reduction of available memory that varies by instance type and AMI.
-Karpenter can't compute the exact value being modeled, so `VM_MEMORY_OVERHEAD_PERCENT` is a [global setting]({{< ref "./reference/settings.md" >}}) used across all instance type and AMI combinations.
+However, once a node is created, the actual memory capacity on that node (`node.status.capacity.memory`) is checked by the controller.
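In code terms, the discovered-capacity mechanism works roughly as sketched below. This is a minimal, illustrative sketch distilled from the `pkg/providers/instancetype/instancetype.go` changes in this diff, not an exact excerpt; `recordAndLookup` is a hypothetical helper name and the imports are assumptions.

```go
package capacitysketch

import (
	"fmt"

	cache "github.com/patrickmn/go-cache"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// recordAndLookup (hypothetical helper) shows how observed memory capacity is keyed
// and cached per instance-type/AMI pair, and how a cached value can later be preferred
// over the VM_MEMORY_OVERHEAD_PERCENT estimate.
func recordAndLookup(c *cache.Cache, node *corev1.Node, instanceTypeName string, amiHash uint64) (resource.Quantity, bool) {
	// Key by instance type plus a hash of the EC2NodeClass's AMIs, so publishing a
	// new AMI invalidates previously discovered capacity.
	key := fmt.Sprintf("%s-%016x", instanceTypeName, amiHash)

	// Record the node's reported capacity, keeping the smallest value observed so the
	// cache never overestimates what a real node provides.
	actual := node.Status.Capacity.Memory()
	if cached, ok := c.Get(key); !ok || actual.Cmp(cached.(resource.Quantity)) < 1 {
		c.SetDefault(key, *actual)
	}

	// List() can then substitute the discovered value for the estimated one.
	if cached, ok := c.Get(key); ok {
		return cached.(resource.Quantity), true
	}
	return resource.Quantity{}, false
}
```

Keeping the minimum observed value is a conservative choice: a slightly low cached entry only underestimates allocatable memory, which is safer than overestimating and launching nodes that are too small for the pending pods.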
+The controller caches the observed memory for any subsequent nodes launched with the same AMI and instance type pair, improving accuracy for future nodes.
+For new combinations of AMI and instance type (i.e., when this pair is launched for the first time), Karpenter will still use the `VM_MEMORY_OVERHEAD_PERCENT` value as a fallback for estimating allocatable memory.
+This fallback is necessary because Karpenter can't compute the exact value being modeled ahead of time, so `VM_MEMORY_OVERHEAD_PERCENT` is a [global setting]({{< ref "./reference/settings.md" >}}) used across all instance type and AMI combinations.
 The default value (`7.5%`) has been tuned to closely match reality for the majority of instance types while not overestimating.
-As a result, Karpenter will typically underestimate the memory availble on a node for a given instance type.
+As a result, Karpenter will typically underestimate the memory available on a node for a given instance type.
 If you know the real `VM_MEMORY_OVERHEAD_PERCENT` for the specific instances you're provisioning in your cluster, you can tune this value to tighten the bound.
 However, this should be done with caution.
 A `VM_MEMORY_OVERHEAD_PERCENT` which results in Karpenter overestimating the memory available on a node can result in Karpenter launching nodes which are too small for your workload.
-In the worst case, this can result in an instance launch loop and your workload remaining unschedulable indefinitely. To detect instances of Karpenter overestimating resource availability, the following status condition can be monitored: