Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix executor tries to create same name deployment #1082

Merged
merged 9 commits into from
Jan 30, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion executor/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (executor *Executor) getServiceForFunction(m *metav1.ObjectMeta) (string, e
if resp.err != nil {
return "", resp.err
}
executor.fsCache.IncreaseColdStarts(m.Name, string(m.UID))
return resp.funcSvc.Address, resp.err
}

Expand Down
57 changes: 9 additions & 48 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/fission/fission"
"github.com/fission/fission/cache"
"github.com/fission/fission/crd"
"github.com/fission/fission/executor/fscache"
"github.com/fission/fission/executor/newdeploy"
Expand All @@ -41,9 +40,9 @@ import (

type (
Executor struct {
gpm *poolmgr.GenericPoolManager
ndm *newdeploy.NewDeploy
functionEnv *cache.Cache
gpm *poolmgr.GenericPoolManager
ndm *newdeploy.NewDeploy

fissionClient *crd.FissionClient
fsCache *fscache.FunctionServiceCache

Expand All @@ -65,7 +64,6 @@ func MakeExecutor(gpm *poolmgr.GenericPoolManager, ndm *newdeploy.NewDeploy, fis
executor := &Executor{
gpm: gpm,
ndm: ndm,
functionEnv: cache.MakeCache(10*time.Second, 0),
fissionClient: fissionClient,
fsCache: fsCache,

Expand Down Expand Up @@ -154,58 +152,21 @@ func (executor *Executor) createServiceForFunction(meta *metav1.ObjectMeta) (*fs
case fission.ExecutorTypeNewdeploy:
fsvc, fsvcErr = executor.ndm.GetFuncSvc(meta)
default:
// from Func -> get Env
log.Printf("[%v] getting environment for function", meta.Name)
env, err := executor.getFunctionEnv(meta)
if err != nil {
return nil, err
}

pool, err := executor.gpm.GetPool(env)
if err != nil {
return nil, err
}
// from GenericPool -> get one function container
// (this also adds to the cache)
log.Printf("[%v] getting function service from pool", meta.Name)
fsvc, fsvcErr = pool.GetFuncSvc(meta)
fsvc, fsvcErr = executor.gpm.GetFuncSvc(meta)
}

if fsvcErr != nil {
fsvcErr = errors.Wrap(fsvcErr, fmt.Sprintf("[%v] Error creating service for function", meta.Name))
log.Print(fsvcErr)
}

return fsvc, fsvcErr
}

func (executor *Executor) getFunctionEnv(m *metav1.ObjectMeta) (*crd.Environment, error) {
var env *crd.Environment

// Cached ?
result, err := executor.functionEnv.Get(crd.CacheKey(m))
if err == nil {
env = result.(*crd.Environment)
return env, nil
}

// Cache miss -- get func from controller
f, err := executor.fissionClient.Functions(m.Namespace).Get(m.Name)
if err != nil {
return nil, err
}

// Get env from metadata
log.Printf("[%v] getting env", m)
env, err = executor.fissionClient.Environments(f.Spec.Environment.Namespace).Get(f.Spec.Environment.Name)
executor.fsCache.IncreaseColdStarts(meta.Name, string(meta.UID))
_, err = executor.fsCache.Add(*fsvc)
if err != nil {
return nil, err
}

// cache for future lookups
executor.functionEnv.Set(crd.CacheKey(m), env)

return env, nil
return fsvc, fsvcErr
}

// isValidAddress invokes isValidService or isValidPod depending on the type of executor
Expand Down Expand Up @@ -255,11 +216,11 @@ func StartExecutor(fissionNamespace string, functionNamespace string, envBuilder

gpm := poolmgr.MakeGenericPoolManager(
fissionClient, kubernetesClient,
functionNamespace, fsCache, poolID)
functionNamespace, poolID)

ndm := newdeploy.MakeNewDeploy(
fissionClient, kubernetesClient, restClient,
functionNamespace, fsCache, poolID)
functionNamespace, poolID)

api := MakeExecutor(gpm, ndm, fissionClient, fsCache)

Expand Down
4 changes: 2 additions & 2 deletions executor/fscache/functionServiceCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ func (fsc *FunctionServiceCache) GetByFunctionUID(uid types.UID) (*FuncSvc, erro
func (fsc *FunctionServiceCache) Add(fsvc FuncSvc) (*FuncSvc, error) {
err, existing := fsc.byFunction.Set(crd.CacheKey(fsvc.Function), &fsvc)
if err != nil {
if existing != nil {
if IsNameExistError(err) {
f := existing.(*FuncSvc)
err2 := fsc.TouchByAddress(f.Address)
if err2 != nil {
return nil, err2
}
fCopy := *f
return &fCopy, err
return &fCopy, nil
}
return nil, err
}
Expand Down
11 changes: 5 additions & 6 deletions executor/newdeploy/newdeploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,8 @@ func (deploy *NewDeploy) setupRBACObjs(deployNamespace string, fn *crd.Function)
return nil
}

func (deploy *NewDeploy) getDeployment(fn *crd.Function) (*v1beta1.Deployment, error) {
deployName := deploy.getObjName(fn)
return deploy.kubernetesClient.ExtensionsV1beta1().Deployments(fn.Metadata.Namespace).Get(deployName, metav1.GetOptions{})
func (deploy *NewDeploy) getDeployment(ns, name string) (*v1beta1.Deployment, error) {
return deploy.kubernetesClient.ExtensionsV1beta1().Deployments(ns).Get(name, metav1.GetOptions{})
}

func (deploy *NewDeploy) updateDeployment(deployment *v1beta1.Deployment, ns string) error {
Expand Down Expand Up @@ -428,9 +427,8 @@ func (deploy *NewDeploy) createOrGetHpa(hpaName string, execStrategy *fission.Ex

}

func (deploy *NewDeploy) getHpa(ns string, fn *crd.Function) (*asv1.HorizontalPodAutoscaler, error) {
hpaName := deploy.getObjName(fn)
return deploy.kubernetesClient.AutoscalingV1().HorizontalPodAutoscalers(ns).Get(hpaName, metav1.GetOptions{})
func (deploy *NewDeploy) getHpa(ns, name string) (*asv1.HorizontalPodAutoscaler, error) {
return deploy.kubernetesClient.AutoscalingV1().HorizontalPodAutoscalers(ns).Get(name, metav1.GetOptions{})
}

func (deploy *NewDeploy) updateHpa(hpa *asv1.HorizontalPodAutoscaler) error {
Expand Down Expand Up @@ -510,6 +508,7 @@ func (deploy *NewDeploy) waitForDeploy(depl *v1beta1.Deployment, replicas int32)
return nil, errors.New("failed to create deployment within timeout window")
}

// cleanupNewdeploy cleans all kubernetes objects related to function
func (deploy *NewDeploy) cleanupNewdeploy(ns string, name string) error {
var multierr *multierror.Error

Expand Down
Loading