Cleanup pserver resource after trainer job end. #515
New Dockerfile (builds the pservercleaner image):
@@ -0,0 +1,3 @@
FROM ubuntu:16.04
ADD pservercleaner /usr/bin
CMD ["pservercleaner"]
New Go file (the pserver cleaner):
@@ -0,0 +1,225 @@
package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/pkg/api"
	batchv1 "k8s.io/client-go/pkg/apis/batch/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	log "github.com/inconshreveable/log15"
)

type eventType int

const (
	add eventType = iota
	del
	update
)

type event struct {
	Type eventType
	Job  interface{}

Review comment (truncated): Why is this not …?

}

// Cleaner is a struct to clean pserver.
type Cleaner struct {
	client    *rest.RESTClient
	clientset *kubernetes.Clientset

Review comment (truncated): The Kubernetes client can use …
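The comment above is cut off in this view. If the suggestion is that the typed Clientset alone is enough, the Job watch source in startWatch below could be built from its BatchV1 REST client, so a separately configured *rest.RESTClient (and the configureClient step in main) would not be needed. A minimal sketch, assuming the client-go version this PR imports and reusing this file's imports; it is not part of the PR:

	// Hedged sketch: derive the watch client from the typed clientset; the
	// BatchV1() client already targets group "batch", version "v1".
	source := cache.NewListWatchFromClient(
		c.clientset.BatchV1().RESTClient(),
		"jobs",
		api.NamespaceAll,
		fields.Everything())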
	ticker  *time.Ticker
	eventCh chan event
	jobs    map[types.UID]*batchv1.Job
}

// NewCleaner gets a cleaner struct.
func NewCleaner(c *rest.RESTClient, cs *kubernetes.Clientset) *Cleaner {
	return &Cleaner{
		client:    c,
		clientset: cs,
		ticker:    time.NewTicker(time.Second * 5),
		eventCh:   make(chan event),
		jobs:      make(map[types.UID]*batchv1.Job),
	}
}

func (c *Cleaner) startWatch(ctx context.Context) error {
	// TODO(gongwb): filter only paddle-job (see the sketch after this function)
	source := cache.NewListWatchFromClient(
		c.client,
		"Jobs",
		api.NamespaceAll,
		fields.Everything())

	_, informer := cache.NewInformer(
		source,
		&batchv1.Job{},

		// resyncPeriod: every resyncPeriod, all resources in
		// the cache will retrigger events. Set to 0 to
		// disable the resync.
		0,

		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onAdd,
			UpdateFunc: c.onUpdate,
			DeleteFunc: c.onDel,
		})

	go informer.Run(ctx.Done())
	return nil
}
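For the TODO above, the informer could be restricted to paddle jobs at the source instead of filtering inside the event handlers. This is only an illustrative sketch against the typed clientset, not code from this PR; it assumes the "paddle-job" label key that getTrainerJobName below reads, and it would need the extra imports "k8s.io/apimachinery/pkg/runtime" and "k8s.io/apimachinery/pkg/watch":

	// Illustrative sketch: a ListWatch that only returns Jobs carrying the
	// "paddle-job" label, so other Jobs never reach the handlers.
	source := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.LabelSelector = "paddle-job" // existence selector: label key present
			return c.clientset.BatchV1().Jobs(api.NamespaceAll).List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.LabelSelector = "paddle-job"
			return c.clientset.BatchV1().Jobs(api.NamespaceAll).Watch(options)
		},
	}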

// Run starts watching Kubernetes events and runs the handlers.
func (c *Cleaner) Run(ctx context.Context) error {
	err := c.startWatch(ctx)
	if err != nil {
		return err
	}

	go c.Monitor()

	<-ctx.Done()
	return ctx.Err()
}

func (c *Cleaner) onAdd(obj interface{}) {
	c.eventCh <- event{Type: add, Job: obj}
}

func (c *Cleaner) onDel(obj interface{}) {
	c.eventCh <- event{Type: del, Job: obj}
}

func (c *Cleaner) onUpdate(oldObj, newObj interface{}) {
	c.eventCh <- event{Type: update, Job: newObj}
}

func getTrainerJobName(j *batchv1.Job) string {
	m := j.ObjectMeta.Labels
	if val, ok := m["paddle-job"]; ok {
		return val
	}

	return ""
}

func cleanupReplicaSets(client *kubernetes.Clientset,
	namespace string, l metav1.ListOptions) error {
	rsList, err := client.ExtensionsV1beta1().ReplicaSets(namespace).List(l)
	if err != nil {
		return err
	}

	for _, rs := range rsList.Items {
		err := client.ExtensionsV1beta1().ReplicaSets(namespace).Delete(rs.ObjectMeta.Name, nil)
		if err != nil {
			log.Error(fmt.Sprintf("delete rs namespace:%v rsname:%v err:%v", namespace, rs.Name, err))
		}

		log.Info(fmt.Sprintf("delete rs namespace:%v rsname:%v", namespace, rs.Name))
	}
	return nil
}

func cleanupPods(client *kubernetes.Clientset,
	namespace string, l metav1.ListOptions) error {
	podList, err := client.CoreV1().Pods(namespace).List(l)
	if err != nil {
		return err
	}

	for _, pod := range podList.Items {
		err := client.CoreV1().Pods(namespace).Delete(pod.ObjectMeta.Name, nil)
		if err != nil {
			log.Error(fmt.Sprintf("delete pod namespace:%v podname:%v err:%v", namespace, pod.Name, err))
		}

		log.Info(fmt.Sprintf("delete pod namespace:%v podname:%v", namespace, pod.Name))
	}
	return nil
}

func (c *Cleaner) cleanupPserver(namespace, jobname string) {
	cleanupReplicaSets(c.clientset, namespace,
		metav1.ListOptions{LabelSelector: "paddle-job-pserver=" + jobname})
	log.Info(fmt.Sprintf("delete pserver replicaset namespace:%s jobname:%s", namespace, jobname))

	// wait to delete replicaset
	time.Sleep(1 * time.Second)

Review comment: Can deleting the ReplicaSet be set up to also delete the corresponding Pods automatically? There seems to be some material on this here: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
Reply: OK, I will look into it.
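One way to get the behavior the reviewer links to is server-side cascading deletion: passing a deletion propagation policy instead of nil lets the garbage collector remove the ReplicaSet's Pods, which could make the fixed one-second sleep and the follow-up pod sweep unnecessary. A hedged sketch of what cleanupReplicaSets could pass, assuming a cluster whose garbage collector supports foreground propagation (roughly Kubernetes 1.7+); not part of this PR:

	// Hedged sketch: cascade the deletion to the Pods owned by the
	// ReplicaSet and wait until they are gone before returning.
	policy := metav1.DeletePropagationForeground
	if err := client.ExtensionsV1beta1().ReplicaSets(namespace).Delete(
		rs.ObjectMeta.Name,
		&metav1.DeleteOptions{PropagationPolicy: &policy},
	); err != nil {
		log.Error(fmt.Sprintf("cascading delete rs namespace:%v rsname:%v err:%v", namespace, rs.Name, err))
	}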

	cleanupPods(c.clientset, namespace,
		metav1.ListOptions{LabelSelector: "paddle-job-pserver=" + jobname})
	log.Info(fmt.Sprintf("delete pserver pods namespace:%s jobname:%s", namespace, jobname))
}

func (c *Cleaner) cleanup(j *batchv1.Job) {
	jobname := getTrainerJobName(j)
	if jobname == "" {
		return
	}

	c.cleanupPserver(j.ObjectMeta.Namespace, jobname)
}

// Monitor monitors the cluster paddle-job resource.
func (c *Cleaner) Monitor() {
	for {
		select {
		case <-c.ticker.C:
		case e := <-c.eventCh:
			switch e.Type {
			case add:
				j := e.Job.(*batchv1.Job)
				// handle only paddle-job; this is not the best method.
				if getTrainerJobName(j) == "" {
					break
				}

				log.Info(fmt.Sprintf("add jobs namespace:%v name:%v uid:%v",
					j.ObjectMeta.Namespace, j.ObjectMeta.Name, j.ObjectMeta.UID))
				c.jobs[j.UID] = j
			case update: // process only completion
				j := e.Job.(*batchv1.Job)

				// not complete
				if j.Status.CompletionTime == nil {
					break
				}

				// if not under control or completed already
				if _, ok := c.jobs[j.UID]; !ok {
					break
				}

				log.Info(fmt.Sprintf("complete jobs namespace:%v name:%v uid:%v",
					j.ObjectMeta.Namespace, j.ObjectMeta.Name, j.ObjectMeta.UID))

				c.cleanup(j)
				delete(c.jobs, j.UID)
			case del:
				j := e.Job.(*batchv1.Job)
				log.Info(fmt.Sprintf("delete jobs namespace:%v name:%v uid:%v",
					j.ObjectMeta.Namespace, j.ObjectMeta.Name, j.ObjectMeta.UID))

				// deleted already
				if _, ok := c.jobs[j.UID]; !ok {
					break
				}

				c.cleanup(j)
				delete(c.jobs, j.UID)
			default:
				log.Error("unrecognized event", "event", e)
			}
		}
	}
}
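A small, hypothetical test for the label lookup above; it relies only on the "paddle-job" label key that getTrainerJobName reads, the job name "mnist-train" is made up, and the test is not part of this PR:

package main

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	batchv1 "k8s.io/client-go/pkg/apis/batch/v1"
)

// TestGetTrainerJobName checks that the trainer job name is read from the
// "paddle-job" label and that an unlabeled Job yields the empty string.
func TestGetTrainerJobName(t *testing.T) {
	labeled := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{"paddle-job": "mnist-train"},
	}}
	if got := getTrainerJobName(labeled); got != "mnist-train" {
		t.Errorf("getTrainerJobName(labeled) = %q, want %q", got, "mnist-train")
	}

	unlabeled := &batchv1.Job{}
	if got := getTrainerJobName(unlabeled); got != "" {
		t.Errorf("getTrainerJobName(unlabeled) = %q, want empty string", got)
	}
}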
New Go file (main entry point):
@@ -0,0 +1,79 @@
package main

import (
	"context"
	"flag"
	"fmt"

	log "github.com/inconshreveable/log15"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/pkg/api"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	kubeconfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.")
	logLevel := flag.String("log_level", "info", "Log level can be debug, info, warn, error, crit.")
	flag.Parse()

	lvl, err := log.LvlFromString(*logLevel)
	if err != nil {
		panic(err)
	}

	log.Root().SetHandler(
		log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
	)

	// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
	config, err := buildConfig(*kubeconfig)
	if err != nil {
		panic(err)
	}

	// setup some optional configuration
	configureClient(config)

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	fmt.Printf("%v\n", config)
	client, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}

	ctx, cancelFunc := context.WithCancel(context.Background())
	cleaner := NewCleaner(client, clientset)
	if err != nil {
		panic(err)
	}

	defer cancelFunc()
	cleaner.Run(ctx)
}

func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		return clientcmd.BuildConfigFromFlags("", kubeconfig)
	}
	return rest.InClusterConfig()
}

func configureClient(config *rest.Config) {
	groupversion := schema.GroupVersion{
		Group:   "batch",
		Version: "v1",
	}

	config.GroupVersion = &groupversion
	config.APIPath = "/apis"
	config.ContentType = runtime.ContentTypeJSON
	config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
}
New file (Deployment manifest for pservercleaner):
@@ -0,0 +1,16 @@
apiVersion: extensions/v1beta1

Review comment: A separate YAML is not needed; just deploy this together with the controller.

kind: Deployment
metadata:
  name: pservercleaner
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: pservercleaner
    spec:
      containers:
      - name: pservercleaner
        image: paddlepaddle/pservercleaner
        imagePullPolicy: Never
        command: ["pservercleaner"]
Review comment: Why not put this function in the controller?
Reply: The watched resources and the functionality are different. I plan to merge this functionality into the controller after the scaler's functionality is completed.
Comment: When there are many jobs, this is very inefficient.
Comment: The watched resources are not the same; they are not things of the same generation.
Comment: Agreed.
Comment: As I understand it, we need to move the current Create/Delete Job functionality into the controller anyway, so automatically reclaiming pserver resources in the controller should also be fine; it amounts to doing garbage collection.
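If Job creation and deletion do move into the controller, one option for the delete path (a hedged sketch, not something this PR implements) is to set an OwnerReference on the pserver ReplicaSet when the controller creates it, pointing at the trainer Job; deleting the Job then lets the built-in garbage collector remove the ReplicaSet and its Pods. The completed-but-not-deleted case would still need logic like the cleaner's. The variables rs and trainerJob are hypothetical:

	// Hedged sketch: mark the pserver ReplicaSet as owned by the trainer Job,
	// so deleting the Job cascades to the ReplicaSet via garbage collection.
	isController := true
	rs.ObjectMeta.OwnerReferences = []metav1.OwnerReference{{
		APIVersion: "batch/v1",
		Kind:       "Job",
		Name:       trainerJob.ObjectMeta.Name,
		UID:        trainerJob.ObjectMeta.UID,
		Controller: &isController,
	}}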