Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
clientgocorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -161,11 +163,45 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
if err != nil {
return fmt.Errorf("failed to create event recorder: %v", err)
}
// Data gatherers are loaded depending on what the Kubernetes API supports.
// First, let's do a /api discovery to see what the API supports.
discoveryClient, err := k8s.NewDiscoveryClient("")
if err != nil {
return fmt.Errorf("failed to create a discovery client: %v", err)
}

_, apiResourceLists, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
return fmt.Errorf("failed to get server resources: %v", err)
}
availableGVRs := sets.NewString()
for _, arl := range apiResourceLists {
gv, err := schema.ParseGroupVersion(arl.GroupVersion)
if err != nil {
log.Error(err, "Failed to parse group version")
continue
}
for _, ar := range arl.APIResources {
availableGVRs.Insert(gv.WithResource(ar.Name).String())
}
}
log.V(logs.Debug).Info("API discovery complete", "found", availableGVRs.List())
dataGatherers := map[string]datagatherer.DataGatherer{}

// load datagatherer config and boot each one
for _, dgConfig := range config.DataGatherers {
if c, ok := dgConfig.Config.(*k8s.ConfigDynamic); ok {
gvr := c.GroupVersionResource
if !availableGVRs.Has(gvr.String()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the challenge is that we want to start the DataGatherer when the CRD is installed at a later point in time.

log.Info(
"Skipping DataGatherer",
"name", dgConfig.Name,
"reason", "GroupVersionResource not available",
"groupVersionResource", gvr,
)
continue
}
}
kind := dgConfig.Kind
if dgConfig.DataPath != "" {
kind = "local"
Expand Down Expand Up @@ -213,17 +249,16 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
// too, it will backoff and retry of its own accord. Initial boot
// will only be delayed by a max of 5 seconds.
bootCtx, bootCancel := context.WithTimeout(gctx, 5*time.Second)
defer bootCancel()
for _, dgConfig := range config.DataGatherers {
dg := dataGatherers[dgConfig.Name]
for dgName, dg := range dataGatherers {
// wait for the informer to complete an initial sync, we do this to
// attempt to have an initial set of data for the first upload of
// the run.
if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil {
// log sync failure, this might recover in future
log.Error(err, "Failed to complete initial sync of DataGatherer", "kind", dgConfig.Kind, "name", dgConfig.Name)
log.Error(err, "Failed to complete initial sync of DataGatherer", "name", dgName)
}
}
bootCancel()

// begin the datagathering loop, periodically sending data to the
// configured output using data in datagatherer caches or refreshing from
Expand Down