Clean up pserver resources after the trainer job ends. #515

Closed
wants to merge 7 commits
3 changes: 3 additions & 0 deletions go/cmd/pservercleaner/Dockerfile
@@ -0,0 +1,3 @@
FROM ubuntu:16.04
ADD pservercleaner /usr/bin
CMD ["pservercleaner"]
225 changes: 225 additions & 0 deletions go/cmd/pservercleaner/cleaner.go
@@ -0,0 +1,225 @@
package main
Collaborator:

Why not put this function in the controller?

Collaborator Author (@gongweibao, Dec 11, 2017):

The watched resources and the functionality are both different. I plan to merge this feature into the scaler once the scaler's functionality is complete.

Collaborator:

  1. If all we need is to kill the pservers, there is no need to register a controller via the Informer interface. Just scanning all the jobs every few minutes is enough, and the code is simpler.
  2. There is no need to first implement a separate binary outside the controller. Put it all inside the controller, so a single binary is enough to start everything. Try to avoid "intermediate states" or "temporary programs".

Collaborator Author (@gongweibao, Dec 11, 2017):

> If all we need is to kill the pservers, there is no need to register a controller via the Informer interface. Just scanning all the jobs every few minutes is enough, and the code is simpler.

It is very inefficient when there are many jobs.

> There is no need to first implement a separate binary outside the controller. Put it all inside the controller, so a single binary is enough to start everything.

The watched resources are not the same; they are not the same generation of things.

> Try to avoid "intermediate states" or "temporary programs".

Agreed.

Collaborator:

> The watched resources are not the same; they are not the same generation of things.

As I understand it, we need to move the current Create/Delete Job functionality into the controller anyway, so having automatic reclamation of pserver resources in the controller should also be fine; it amounts to doing garbage collection.
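For reference, the "scan all jobs every few minutes" alternative discussed in this thread could look roughly like the sketch below. It is only an illustration against the client-go API already used in this PR; the function name `scanLoop`, the three-minute interval, and the `cleanup` callback are assumptions, not code from the PR.

```go
package main

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/pkg/api"
)

// scanLoop periodically lists all jobs and invokes cleanup for every
// paddle trainer job that has completed. Hypothetical sketch only.
func scanLoop(cs *kubernetes.Clientset, cleanup func(namespace, jobname string)) {
	for range time.Tick(3 * time.Minute) {
		jobs, err := cs.BatchV1().Jobs(api.NamespaceAll).List(metav1.ListOptions{})
		if err != nil {
			continue // transient API error: try again on the next tick
		}
		for _, j := range jobs.Items {
			name, ok := j.Labels["paddle-job"]
			if !ok || j.Status.CompletionTime == nil {
				continue // not a paddle trainer job, or not finished yet
			}
			cleanup(j.Namespace, name)
		}
	}
}
```

Whether this is efficient enough depends on how many jobs run in the cluster, which is exactly the concern raised in the reply above.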


import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/pkg/api"
	batchv1 "k8s.io/client-go/pkg/apis/batch/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	log "github.com/inconshreveable/log15"
)

type eventType int

const (
	add eventType = iota
	del
	update
)

type event struct {
	Type eventType
	Job  interface{}
Collaborator:

Why isn't this field `Job *batchv1.Job`? It might be better to convert the type when the event is created: `event{Type: x, Job: job.(*batchv1.Job)}`.
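A typed variant along the lines of this suggestion might look like the sketch below; `jobEvent` and `newJobEvent` are illustrative names, not part of the PR, and the imports of cleaner.go are assumed.

```go
// Hypothetical typed event, as suggested in the comment above: the type
// assertion happens once, when the event is created, instead of in Monitor().
type jobEvent struct {
	Type eventType
	Job  *batchv1.Job
}

func newJobEvent(t eventType, obj interface{}) jobEvent {
	return jobEvent{Type: t, Job: obj.(*batchv1.Job)}
}
```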

}

// Cleaner watches trainer jobs and cleans up their pserver resources.
type Cleaner struct {
	client    *rest.RESTClient
	clientset *kubernetes.Clientset
Collaborator:

The Kubernetes client can use go/controller/cluster.go#Cluster.

	ticker  *time.Ticker
	eventCh chan event
	jobs    map[types.UID]*batchv1.Job
}

// NewCleaner returns a new Cleaner.
func NewCleaner(c *rest.RESTClient, cs *kubernetes.Clientset) *Cleaner {
	return &Cleaner{
		client:    c,
		clientset: cs,
		ticker:    time.NewTicker(time.Second * 5),
		eventCh:   make(chan event),
		jobs:      make(map[types.UID]*batchv1.Job),
	}
}

func (c *Cleaner) startWatch(ctx context.Context) error {
	// TODO(gongwb): filter only paddle-job
	source := cache.NewListWatchFromClient(
		c.client,
		"Jobs",
		api.NamespaceAll,
		fields.Everything())

	_, informer := cache.NewInformer(
		source,
		&batchv1.Job{},

		// resyncPeriod: Every resyncPeriod, all resources in
		// the cache will retrigger events. Set to 0 to
		// disable the resync.
		0,

		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onAdd,
			UpdateFunc: c.onUpdate,
			DeleteFunc: c.onDel,
		})

	go informer.Run(ctx.Done())
	return nil
}

// Run starts watching Kubernetes events and dispatches them to the handlers.
func (c *Cleaner) Run(ctx context.Context) error {
	err := c.startWatch(ctx)
	if err != nil {
		return err
	}

	go c.Monitor()

	<-ctx.Done()
	return ctx.Err()
}

func (c *Cleaner) onAdd(obj interface{}) {
	c.eventCh <- event{Type: add, Job: obj}
}

func (c *Cleaner) onDel(obj interface{}) {
	c.eventCh <- event{Type: del, Job: obj}
}

func (c *Cleaner) onUpdate(oldObj, newObj interface{}) {
	c.eventCh <- event{Type: update, Job: newObj}
}

func getTrainerJobName(j *batchv1.Job) string {
	m := j.ObjectMeta.Labels
	if val, ok := m["paddle-job"]; ok {
		return val
	}

	return ""
}

func cleanupReplicaSets(client *kubernetes.Clientset,
	namespace string, l metav1.ListOptions) error {
	rsList, err := client.ExtensionsV1beta1().ReplicaSets(namespace).List(l)
	if err != nil {
		return err
	}

	for _, rs := range rsList.Items {
		err := client.ExtensionsV1beta1().ReplicaSets(namespace).Delete(rs.ObjectMeta.Name, nil)
		if err != nil {
			log.Error(fmt.Sprintf("delete rs namespace:%v rsname:%v err:%v", namespace, rs.Name, err))
			continue
		}

		log.Info(fmt.Sprintf("delete rs namespace:%v rsname:%v", namespace, rs.Name))
	}
	return nil
}

func cleanupPods(client *kubernetes.Clientset,
	namespace string, l metav1.ListOptions) error {
	podList, err := client.CoreV1().Pods(namespace).List(l)
	if err != nil {
		return err
	}

	for _, pod := range podList.Items {
		err := client.CoreV1().Pods(namespace).Delete(pod.ObjectMeta.Name, nil)
		if err != nil {
			log.Error(fmt.Sprintf("delete pod namespace:%v podname:%v err:%v", namespace, pod.Name, err))
			continue
		}

		log.Info(fmt.Sprintf("delete pod namespace:%v podname:%v", namespace, pod.Name))
	}
	return nil
}

func (c *Cleaner) cleanupPserver(namespace, jobname string) {
	cleanupReplicaSets(c.clientset, namespace,
		metav1.ListOptions{LabelSelector: "paddle-job-pserver=" + jobname})
	log.Info(fmt.Sprintf("delete pserver replicaset namespace:%s jobname:%s", namespace, jobname))

	// wait to delete replicaset
	time.Sleep(1 * time.Second)
Collaborator:

Can we configure the delete so that removing the replica set automatically deletes the corresponding pods? There seems to be some material on this here: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/

Collaborator Author:

OK, I will look into it.
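For reference, the cascading deletion mentioned above can be requested with a delete propagation policy, which asks the API server's garbage collector to remove the pods owned by the replica set. The sketch below is only an illustration against the client-go version used in this PR, with the imports of cleaner.go assumed; it is not code from the PR.

```go
// Hypothetical cascading delete of a pserver replica set: with foreground
// propagation the owned pods are removed by the garbage collector, so the
// explicit pod cleanup below and the one-second sleep above may become
// unnecessary (assuming the cluster's garbage collector is enabled).
func deleteReplicaSetCascading(client *kubernetes.Clientset, namespace, name string) error {
	policy := metav1.DeletePropagationForeground
	return client.ExtensionsV1beta1().ReplicaSets(namespace).Delete(
		name,
		&metav1.DeleteOptions{PropagationPolicy: &policy},
	)
}
```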


	cleanupPods(c.clientset, namespace,
		metav1.ListOptions{LabelSelector: "paddle-job-pserver=" + jobname})
	log.Info(fmt.Sprintf("delete pserver pods namespace:%s jobname:%s", namespace, jobname))
}

func (c *Cleaner) cleanup(j *batchv1.Job) {
	jobname := getTrainerJobName(j)
	if jobname == "" {
		return
	}

	c.cleanupPserver(j.ObjectMeta.Namespace, jobname)
}

// Monitor monitors the cluster paddle-job resource.
func (c *Cleaner) Monitor() {
	for {
		select {
		case <-c.ticker.C:
		case e := <-c.eventCh:
			switch e.Type {
			case add:
				j := e.Job.(*batchv1.Job)
				// Track only paddle-jobs; filtering by label here is not the best method.
				if getTrainerJobName(j) == "" {
					break
				}

				log.Info(fmt.Sprintf("add jobs namespace:%v name:%v uid:%v",
					j.ObjectMeta.Namespace, j.ObjectMeta.Name, j.ObjectMeta.UID))
				c.jobs[j.UID] = j
			case update: // handle only job completion
				j := e.Job.(*batchv1.Job)

				// not completed yet
				if j.Status.CompletionTime == nil {
					break
				}

				// not tracked by us, or already cleaned up
				if _, ok := c.jobs[j.UID]; !ok {
					break
				}

				log.Info(fmt.Sprintf("complete jobs namespace:%v name:%v uid:%v",
					j.ObjectMeta.Namespace, j.ObjectMeta.Name, j.ObjectMeta.UID))

				c.cleanup(j)
				delete(c.jobs, j.UID)
			case del:
				j := e.Job.(*batchv1.Job)
				log.Info(fmt.Sprintf("delete jobs namespace:%v name:%v uid:%v",
					j.ObjectMeta.Namespace, j.ObjectMeta.Name, j.ObjectMeta.UID))

				// already cleaned up
				if _, ok := c.jobs[j.UID]; !ok {
					break
				}

				c.cleanup(j)
				delete(c.jobs, j.UID)
			default:
				log.Error("unrecognized event", "event", e)
			}
		}
	}
}
79 changes: 79 additions & 0 deletions go/cmd/pservercleaner/main.go
@@ -0,0 +1,79 @@
package main

import (
	"context"
	"flag"
	"fmt"

	log "github.com/inconshreveable/log15"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/pkg/api"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	kubeconfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.")
	logLevel := flag.String("log_level", "info", "Log level can be debug, info, warn, error, crit.")
	flag.Parse()

	lvl, err := log.LvlFromString(*logLevel)
	if err != nil {
		panic(err)
	}

	log.Root().SetHandler(
		log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
	)

	// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
	config, err := buildConfig(*kubeconfig)
	if err != nil {
		panic(err)
	}

	// Set up some optional configuration.
	configureClient(config)

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	fmt.Printf("%v\n", config)
	client, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}

	ctx, cancelFunc := context.WithCancel(context.Background())
	cleaner := NewCleaner(client, clientset)
	if err != nil {
		panic(err)
	}

	defer cancelFunc()
	cleaner.Run(ctx)
}

func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		return clientcmd.BuildConfigFromFlags("", kubeconfig)
	}
	return rest.InClusterConfig()
}

func configureClient(config *rest.Config) {
	groupversion := schema.GroupVersion{
		Group:   "batch",
		Version: "v1",
	}

	config.GroupVersion = &groupversion
	config.APIPath = "/apis"
	config.ContentType = runtime.ContentTypeJSON
	config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
}
16 changes: 16 additions & 0 deletions go/cmd/pservercleaner/pservercleaner.yaml
@@ -0,0 +1,16 @@
apiVersion: extensions/v1beta1
Collaborator:

There is no need for a separate YAML; just deploy this together with the controller.

kind: Deployment
metadata:
  name: pservercleaner
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: pservercleaner
    spec:
      containers:
      - name: pservercleaner
        image: paddlepaddle/pservercleaner
        imagePullPolicy: Never
        command: ["pservercleaner"]