Enhance NodeIPAM to support multiple ClusterCIDRs #109090
Conversation
Hi @sarveshr7. Thanks for your PR. I'm waiting for a kubernetes member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test. Once the patch is verified, the new status will be reflected by the ok-to-test label. I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
This PR may require API review. If so, when the changes are ready, complete the pre-review checklist and request an API review. Status of requested reviews is tracked in the API Review project.
/cc thockin
So, we have 2 API objects we have to reconcile: ClusterCIDRConfigs and Nodes. The key of the reconciliation is the ClusterCIDRConfig key. The operations we have to do are handle creations, updates and deletions of ClusterCIDRConfigs, and map Node events back to the ClusterCIDRConfig they belong to.
In addition, we have to deal with a bootstrap process that depends on the apiserver. I think that all the code is valid, but we can use it as a level-based controller like this:
type Controller struct {
client clientset.Interface
// informers for nodes and clusterCIDRConfig
nodeLister corelisters.NodeLister
nodesSynced cache.InformerSynced
clusterCIDRConfigLister networkinglisters.ClusterCIDRConfigLister
clusterCIDRConfigSynced cache.InformerSynced
// internal structures
pq PriorityQueue
CIDRMap map[string][]*cidrset.ClusterCIDR
// queue is where incoming work is placed to de-dup and to allow "easy"
// rate limited requeues on errors
queue workqueue.RateLimitingInterface
}
func NewController(client, informers, ....) *Controller {
c := &Controller{
}
// register event handlers to fill the queue with clusterCIDRConfig creations, updates and deletions
clusterCIDRConfigInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
c.queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
c.queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
c.queue.Add(key)
}
},
})
// register event handlers to fill the queue with Node creations, updates and deletions;
// the handlers map the Node object to the corresponding ClusterCIDRConfig key
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// function that returns the ClusterCIDRConfig key given a node object
key, err := getClusterCIDRForNode(obj)
if err == nil {
c.queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := getClusterCIDRForNode(new)
if err == nil {
c.queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use a
// key function that handles tombstones.
// key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
key, err := getClusterCIDRForDeletedNode(obj)
if err == nil {
c.queue.Add(key)
}
},
})
return c
}
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
// don't let panics crash the process
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()
klog.Infof("Starting CIDR allocator controller")
// wait for your secondary caches to fill before starting your work
if !cache.WaitForCacheSync(stopCh, c.nodesSynced, c.clusterCIDRConfigSynced) {
return
}
// BOOTSTRAP: the caches are synced, so we have the information we need
// in the informer cache
bootStrap()
// start up your worker threads based on threadiness. Some controllers
// have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will
// then rekick the worker after one second
go wait.Until(c.runWorker, time.Second, stopCh)
}
// wait until we're told to stop
<-stopCh
klog.Infof("Shutting down <NAME> controller")
}
func (c *Controller) runWorker() {
// hot loop until we're told to stop. processNextWorkItem will
// automatically wait until there's work available, so we don't worry
// about secondary waits
for c.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false
// when it's time to quit.
func (c *Controller) processNextWorkItem() bool {
// pull the next work item from queue. It should be a key we use to lookup
// something in a cache
key, quit := c.queue.Get()
if quit {
return false
}
// you always have to indicate to the queue that you've completed a piece of
// work
defer c.queue.Done(key)
// do your work on the key. This method contains your "do stuff" logic
err := c.syncHandler(key.(string))
if err == nil {
// if you had no error, tell the queue to stop tracking history for your
// key. This will reset things like failure counts for per-item rate
// limiting
c.queue.Forget(key)
return true
}
// there was a failure so be sure to report it. This method allows for
// pluggable error handling which can be used for things like
// cluster-monitoring
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
// since we failed, we should requeue the item to work on later. This
// method will add a backoff to avoid hotlooping on particular items
// (they're probably still not going to work right away) and overall
// controller protection (everything I've done is broken, this controller
// needs to calm down or it can starve other useful work) cases.
c.queue.AddRateLimited(key)
return true
}
func (c *Controller) syncHandler(key string) error {
// ClusterCIDRConfig is cluster scoped, so the key is just the object name
clusterCIDRConfig, err := c.clusterCIDRConfigLister.Get(key)
if apierrors.IsNotFound(err) {
// delete process
return nil
}
if err != nil {
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}
// create/update process for clusterCIDRConfig
_ = clusterCIDRConfig
return nil
}
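getClusterCIDRForNode is referenced in the handlers above but never defined. A minimal sketch of what it could look like, written as a method so it can reach the lister; the Spec.NodeSelector field (as a plain label map) and the defaultClusterCIDRConfigKey fallback are assumptions for illustration, not the PR's actual API:
// getClusterCIDRForNode returns the queue key of the ClusterCIDRConfig whose
// node selector matches the given node's labels.
func (c *Controller) getClusterCIDRForNode(obj interface{}) (string, error) {
node, ok := obj.(*v1.Node)
if !ok {
return "", fmt.Errorf("expected *v1.Node, got %T", obj)
}
configs, err := c.clusterCIDRConfigLister.List(labels.Everything())
if err != nil {
return "", err
}
for _, cfg := range configs {
// assumed field: Spec.NodeSelector as a map[string]string of required labels
sel := labels.SelectorFromSet(cfg.Spec.NodeSelector)
if sel.Matches(labels.Set(node.Labels)) {
return cache.MetaNamespaceKeyFunc(cfg)
}
}
// no selector matched: fall back to a well-known default key (assumption)
return defaultClusterCIDRConfigKey, nil
}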
/remove-sig api-machinery
Review comment on this hunk:
return err
}(data)

r.removeNodeFromProcessing(data.nodeName)
is this being used?
Review comment on this hunk:
return nil
}

if !r.insertNodeToProcessing(node.Name) {
this makes sense if we use the channel
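For context, insertNodeToProcessing/removeNodeFromProcessing act as an in-flight guard so two workers never allocate a CIDR for the same node at once. A minimal sketch of that pattern, reusing the names from the hunks above (the allocator's real implementation may differ):
type nodeProcessingGuard struct {
lock sync.Mutex
inFlight map[string]struct{} // node names currently being processed
}

// insertNodeToProcessing reports whether the caller won the right to process
// the node; false means another worker already holds it.
func (g *nodeProcessingGuard) insertNodeToProcessing(name string) bool {
g.lock.Lock()
defer g.lock.Unlock()
if g.inFlight == nil {
g.inFlight = map[string]struct{}{}
}
if _, busy := g.inFlight[name]; busy {
return false
}
g.inFlight[name] = struct{}{}
return true
}

// removeNodeFromProcessing releases the node once allocation finishes or fails.
func (g *nodeProcessingGuard) removeNodeFromProcessing(name string) {
g.lock.Lock()
defer g.lock.Unlock()
delete(g.inFlight, name)
}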
/lgtm -- I am ok with the flag disablement for alpha.
/remove-hold
/lgtm
MultiCIDRRangeAllocator is a new Range Allocator which makes using multiple ClusterCIDRs possible. It consists of two controllers, one for reconciling the ClusterCIDR API objects and the other for allocating Pod CIDRs to the nodes. The allocation is based on the rules defined in https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/2593-multiple-cluster-cidrs
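To make the allocation side concrete: once a node is matched to a ClusterCIDR, the allocator carves a per-node Pod CIDR out of that range. A toy sketch of the arithmetic only (IPv4 only; nthPodCIDR is an illustrative name, not the PR's code):
func nthPodCIDR(clusterCIDR *net.IPNet, perNodeMaskSize, n int) *net.IPNet {
base := binary.BigEndian.Uint32(clusterCIDR.IP.To4())
blockSize := uint32(1) << (32 - perNodeMaskSize)
ip := make(net.IP, 4)
binary.BigEndian.PutUint32(ip, base+uint32(n)*blockSize)
return &net.IPNet{IP: ip, Mask: net.CIDRMask(perNodeMaskSize, 32)}
}
For 10.0.0.0/16 with /24 per node, slot 3 comes out as 10.0.3.0/24; the real cidrset additionally tracks which slots are occupied so released CIDRs can be reused.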
/test pull-kubernetes-e2e-kind
/lgtm This lgtm for alpha ... but there are still a lot of things to do for beta :)
@sarveshr7: The following test failed, say /retest to rerun all failed tests:
Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.
/test pull-kubernetes-conformance-kind-ga-only-parallel (unrelated)
/triage accepted
What type of PR is this?
/kind feature
What this PR does / why we need it:
This PR implements kubernetes/enhancements#2593
Adds the following components:
- MultiCIDRRangeAllocator, a new Range Allocator that makes using multiple ClusterCIDRs possible
- a controller for reconciling the ClusterCIDR API objects and a controller for allocating Pod CIDRs to the nodes
Which issue(s) this PR fixes:
NONE
Special notes for your reviewer:
Please note that this PR is rebased over an open API PR #111123; please review commits from "Add cidrset to support multiple CIDRs" onwards for this PR.
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.:
/sig network