Skip to content


Create pod instead of job (kubeflow#344)
Browse files Browse the repository at this point in the history
This PR is a part of kubeflow#325:

rename jobName() to genName()
create Pod instead of Job

TODOs (in another PR):

use controller.PodControlInterface and CreatePodsWithControllerRef to create Pod
Listen Pod CRUD and update TFJob status which descried in kubeflow#314
  • Loading branch information
ScorpioCPH authored and Jiayu Liu committed Mar 7, 2018
1 parent e63c759 commit de82ac0
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 137 deletions.
297 changes: 179 additions & 118 deletions pkg/trainer/replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (

log ""
batch ""
log ""
k8s_errors ""
meta_v1 ""
Expand Down Expand Up @@ -118,123 +116,97 @@ func (s *TFReplicaSet) Labels() KubernetesLabels {

func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error {
for index := int32(0); index < *s.Spec.Replicas; index++ {
taskLabels := s.Labels()
taskLabels["task_index"] = fmt.Sprintf("%v", index)

// Create the service.
service := &v1.Service{
ObjectMeta: meta_v1.ObjectMeta{
Name: s.jobName(index),
Labels: taskLabels,
OwnerReferences: []meta_v1.OwnerReference{
Spec: v1.ServiceSpec{
Selector: taskLabels,
Ports: []v1.ServicePort{
Name: "tf-port",
Port: *s.Spec.TFPort,

log.Infof("Creating Service: %v", service.ObjectMeta.Name)
createdService, err := s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Create(service)
// Create services
err := s.SyncServices()
if err != nil {
return err

// If the job already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Service %v already exists.", s.jobName(index))
} else {
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating service %v returned error.", createdService.ObjectMeta.Name), err})
} else {
s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created service: %v", createdService.Name)
// Create pods
return s.SyncPods()

// Configure the TFCONFIG environment variable.
tfConfig := TFConfig{
Cluster: s.Job.ClusterSpec(),
Task: TaskSpec{
Type: strings.ToLower(string(s.Spec.TFReplicaType)),
Index: int(index),
// CreateServiceWithIndex will create a new service with specify index
func (s *TFReplicaSet) CreateServiceWithIndex(index int32) (*v1.Service, error) {
taskLabels := s.Labels()
taskLabels["task_index"] = fmt.Sprintf("%v", index)

// Create the service.
service := &v1.Service{
ObjectMeta: meta_v1.ObjectMeta{
Name: s.genName(index),
Labels: taskLabels,
OwnerReferences: []meta_v1.OwnerReference{
// We need to set environment to cloud otherwise it will default to local which isn't what we want.
Environment: "cloud",

tfConfigJson, err := json.Marshal(tfConfig)
if err != nil {
log.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
return err

// Make a copy of the template because we will modify it below. .
newPodSpecTemplate := s.Spec.Template.DeepCopy()

newJ := &batch.Job{
ObjectMeta: meta_v1.ObjectMeta{
Name: s.jobName(index),
Labels: taskLabels,
OwnerReferences: []meta_v1.OwnerReference{
Spec: v1.ServiceSpec{
Selector: taskLabels,
Ports: []v1.ServicePort{
Name: "tf-port",
Port: *s.Spec.TFPort,
Spec: batch.JobSpec{
Completions: proto.Int32(1),
Parallelism: proto.Int32(1),
Template: *newPodSpecTemplate,

if newJ.Spec.Template.ObjectMeta.Labels == nil {
newJ.Spec.Template.ObjectMeta.Labels = make(map[string]string)

// Pods need to be tagged with the labels.
for k, v := range taskLabels {
newJ.Spec.Template.ObjectMeta.Labels[k] = v
log.Infof("Creating service: %v", service.ObjectMeta.Name)
return s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Create(service)

// Add TF_CONFIG environment variable.
for i, _ := range newJ.Spec.Template.Spec.Containers {
// We can't get c in the loop variable because that would be by value so our modifications
// wouldn't have any effect.
c := &newJ.Spec.Template.Spec.Containers[i]
if tfv1alpha1.ContainerName(c.Name) != tfv1alpha1.TENSORFLOW {
if len(c.Env) == 0 {
c.Env = make([]v1.EnvVar, 0)
c.Env = append(c.Env, v1.EnvVar{
Name: "TF_CONFIG",
Value: string(tfConfigJson),
// CreatePodWithIndex will create a new pod with specify index
func (s *TFReplicaSet) CreatePodWithIndex(index int32) (*v1.Pod, error) {
taskLabels := s.Labels()
taskLabels["task_index"] = fmt.Sprintf("%v", index)

pod := &v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: s.genPodName(index),
Labels: taskLabels,
OwnerReferences: []meta_v1.OwnerReference{
Spec: *s.Spec.Template.Spec.DeepCopy(),

log.Infof("Creating Job: %v", newJ.ObjectMeta.Name)
createdJob, err := s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).Create(newJ)
// Configure the TFCONFIG environment variable.
tfConfig := TFConfig{
Cluster: s.Job.ClusterSpec(),
Task: TaskSpec{
Type: strings.ToLower(string(s.Spec.TFReplicaType)),
Index: int(index),
// We need to set environment to cloud otherwise it will default to local which isn't what we want.
Environment: "cloud",

// If the job already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("%v already exists.", s.jobName(index))
tfConfigJson, err := json.Marshal(tfConfig)
if err != nil {
log.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
return nil, err

} else {
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating Job %v returned error.", createdJob.ObjectMeta.Name), err})
} else {
s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created job: %v", createdJob.Name)
// Add TF_CONFIG environment variable.
for i, _ := range pod.Spec.Containers {
// We can't get c in the loop variable because that would be by value so our modifications
// wouldn't have any effect.
c := &pod.Spec.Containers[i]
if tfv1alpha1.ContainerName(c.Name) != tfv1alpha1.TENSORFLOW {
if len(c.Env) == 0 {
c.Env = make([]v1.EnvVar, 0)
c.Env = append(c.Env, v1.EnvVar{
Name: "TF_CONFIG",
Value: string(tfConfigJson),
return nil

log.Infof("Creating pod: %v", pod.ObjectMeta.Name)
return s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Create(pod)

// Delete deletes the replicas
Expand All @@ -250,8 +222,8 @@ func (s *TFReplicaSet) Delete() error {
LabelSelector: selector,

log.Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)
log.V(1).Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)

if err != nil {
log.Errorf("There was a problem deleting the jobs; %v", err)
Expand All @@ -270,11 +242,11 @@ func (s *TFReplicaSet) Delete() error {
// Services doesn't support DeleteCollection so we delete them individually.
// TODO(jlewi): We should check if this has changed with K8s 1.8 or other releases.
for index := int32(0); index < *s.Spec.Replicas; index++ {
log.Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.jobName((index)))
err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.jobName(index), &meta_v1.DeleteOptions{})
log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index)))
err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(index), &meta_v1.DeleteOptions{})

if err != nil {
log.Errorf("Error deleting service %v; %v", s.jobName(index), err)
log.Errorf("Error deleting service %v; %v", s.genName(index), err)
failures = true
Expand Down Expand Up @@ -304,7 +276,6 @@ func (s *TFReplicaSet) Delete() error {

// replicaStatusFromPodList returns a status from a list of pods for a job.
func replicaStatusFromPodList(l v1.PodList, name tfv1alpha1.ContainerName) tfv1alpha1.ReplicaState {
log.Infof("Get replicaStatus from PodList: %v", util.Pformat(l))
var latest *v1.Pod
for _, i := range l.Items {
if latest == nil {
Expand Down Expand Up @@ -359,13 +330,13 @@ func replicaStatusFromPodList(l v1.PodList, name tfv1alpha1.ContainerName) tfv1a

func (s *TFReplicaSet) GetSingleReplicaStatus(index int32) tfv1alpha1.ReplicaState {
j, err := s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).Get(s.jobName(index), meta_v1.GetOptions{})
p, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})

if err != nil {
return tfv1alpha1.ReplicaStateUnknown

if j.Status.Succeeded >= 1 {
if v1.PodSucceeded == p.Status.Phase {
return tfv1alpha1.ReplicaStateSucceeded

Expand Down Expand Up @@ -436,14 +407,104 @@ func (s *TFReplicaSet) GetStatus() (tfv1alpha1.TFReplicaStatus, error) {
return status, nil

func (s *TFReplicaSet) jobName(index int32) string {
// SyncPods will try to check current pods for this TFReplicaSet and try to make it as desired.
func (s *TFReplicaSet) SyncPods() error {
for index := int32(0); index < *s.Spec.Replicas; index++ {

// Label to get all pods of this TFReplicaType + index
labels := s.Labels()
labels["task_index"] = fmt.Sprintf("%v", index)

labelSelector, err := labels.ToSelector()
if err != nil {
return err

// Filter the unactive pods
fieldSelector := "status.phase!=" + string(v1.PodFailed) +

options := meta_v1.ListOptions{
LabelSelector: labelSelector,
FieldSelector: fieldSelector,

// List to get pods
pl, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).List(options)

if len(pl.Items) == 0 {
log.Infof("Pod not found, create new one.")
// Create the pod
createdPod, err := s.CreatePodWithIndex(index)

// If the pod already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Pod: %v already exists.", createdPod.ObjectMeta.Name)
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating pod %v returned error.", createdPod.ObjectMeta.Name), err})

s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created pod: %v", createdPod.Name)

if err != nil {
// TODO: handing this error

return nil

// SyncServices will try to check current services for this TFReplicaSet and try to make it as desired.
func (s *TFReplicaSet) SyncServices() error {
for index := int32(0); index < *s.Spec.Replicas; index++ {
_, err := s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})
if err != nil && k8s_errors.IsNotFound(err) {
log.Infof("Service: %v not found, create new one.", s.genName(index))
// Create the service
createdService, err := s.CreateServiceWithIndex(index)

// If the service already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Service: %v already exists.", s.genName(index))
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating Service %v returned error.", createdService.ObjectMeta.Name), err})

s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created Service: %v", createdService.Name)

if err != nil {
// TODO: handing this error

return nil

func (s *TFReplicaSet) genName(index int32) string {
// Truncate tfjob name to 40 characters
// The whole job name should be compliant with the DNS_LABEL spec, up to a max length of 63 characters
// Thus jobname(40 chars)-replicaType(6 chars)-runtimeId(4 chars)-index(4 chars), also leaving some spaces
// Thus genName(40 chars)-replicaType(6 chars)-runtimeId(4 chars)-index(4 chars), also leaving some spaces
// See
return fmt.Sprintf("%v-%v-%v-%v", fmt.Sprintf("%.40s", s.Job.job.ObjectMeta.Name), strings.ToLower(string(s.Spec.TFReplicaType)), s.Job.job.Spec.RuntimeId, index)

func (s *TFReplicaSet) genPodName(index int32) string {
// Generate a new pod name with random string
return s.genName(index) + "-" + util.RandString(5)

func (s *TFReplicaSet) defaultPSConfigMapName() string {
return fmt.Sprintf("cm-ps-%v", s.Job.job.Spec.RuntimeId)

0 comments on commit de82ac0

Please sign in to comment.