Skip to content

Commit

Permalink
status: Add test cases for failure (#643)
Browse files Browse the repository at this point in the history
* pkg: Update

Signed-off-by: Ce Gao <[email protected]>

* status: Add unit test when pod is failed

Signed-off-by: Ce Gao <[email protected]>

* status: Test chief and ps/worker

Signed-off-by: Ce Gao <[email protected]>
  • Loading branch information
gaocegege authored and k8s-ci-robot committed Jun 13, 2018
1 parent 2456e44 commit a310cb9
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/controller.v2/controller_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (tc *TFJobController) reconcilePods(
}
}

return tc.updateStatus(tfjob, rtype, replicas)
return updateStatus(tfjob, rtype, replicas)
}

// getPodSlices returns a slice whose elements are slices of pods.
Expand Down
16 changes: 8 additions & 8 deletions pkg/controller.v2/controller_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
)

// updateStatus updates the status of the tfjob.
func (tc *TFJobController) updateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType, replicas int) error {
func updateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType, replicas int) error {
// Expect to have `replicas - succeeded` pods alive.
expected := replicas - int(tfjob.Status.TFReplicaStatuses[rtype].Succeeded)
running := int(tfjob.Status.TFReplicaStatuses[rtype].Active)
Expand All @@ -52,7 +52,7 @@ func (tc *TFJobController) updateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha
if rtype == tfv1alpha2.TFReplicaTypeChief {
if running > 0 {
msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
Expand All @@ -62,7 +62,7 @@ func (tc *TFJobController) updateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha
msg := fmt.Sprintf("TFJob %s is successfully completed.", tfjob.Name)
now := metav1.Now()
tfjob.Status.CompletionTime = &now
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
Expand All @@ -72,7 +72,7 @@ func (tc *TFJobController) updateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha
// Some workers or PSs have failed; leave a failed condition.
if failed > 0 {
msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
Expand All @@ -85,7 +85,7 @@ func (tc *TFJobController) updateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha
// Some workers are still running, leave a running condition.
if running > 0 {
msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
Expand All @@ -97,7 +97,7 @@ func (tc *TFJobController) updateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha
msg := fmt.Sprintf("TFJob %s is successfully completed.", tfjob.Name)
now := metav1.Now()
tfjob.Status.CompletionTime = &now
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
Expand All @@ -107,7 +107,7 @@ func (tc *TFJobController) updateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha
// Some workers or PSs have failed; leave a failed condition.
if failed > 0 {
msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
Expand All @@ -126,7 +126,7 @@ func (tc *TFJobController) updateTFJobStatus(tfjob *tfv1alpha2.TFJob) error {
}

// updateTFJobConditions updates the conditions of the given tfjob.
func (tc *TFJobController) updateTFJobConditions(tfjob *tfv1alpha2.TFJob, conditionType tfv1alpha2.TFJobConditionType, reason, message string) error {
func updateTFJobConditions(tfjob *tfv1alpha2.TFJob, conditionType tfv1alpha2.TFJobConditionType, reason, message string) error {
condition := newCondition(conditionType, reason, message)
setCondition(&tfjob.Status, condition)
return nil
Expand Down
303 changes: 303 additions & 0 deletions pkg/controller.v2/controller_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package controller provides a Kubernetes controller for a TFJob resource.
package controller

import (
"testing"

"k8s.io/api/core/v1"

tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
)

func TestFailed(t *testing.T) {
tfJob := newTFJob(3, 0)
initializeTFReplicaStatuses(tfJob, tfv1alpha2.TFReplicaTypeWorker)
pod := newBasePod("pod", tfJob, t)
pod.Status.Phase = v1.PodFailed
updateTFJobReplicaStatuses(tfJob, tfv1alpha2.TFReplicaTypeWorker, pod)
if tfJob.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypeWorker].Failed != 1 {
t.Errorf("Failed to set the failed to 1")
}
err := updateStatus(tfJob, tfv1alpha2.TFReplicaTypeWorker, 3)
if err != nil {
t.Errorf("Expected error %v to be nil", err)
}
found := false
for _, condition := range tfJob.Status.Conditions {
if condition.Type == tfv1alpha2.TFJobFailed {
found = true
}
}
if !found {
t.Errorf("Failed condition is not found")
}
}

// TestStatus is a table-driven test for updateStatus.
//
// Each case seeds the PS, worker and chief replica statuses of a fresh TFJob
// with a given number of failed, succeeded and active pods, then runs
// updateStatus and checks that a condition of the wanted type is present.
// updateStatus is invoked for the chief replica type (with 1 replica) when the
// job spec declares a chief, and for the worker replica type (with the
// spec's worker replica count) otherwise.
func TestStatus(t *testing.T) {
	// replicaCounts is the number of pods to report in each phase for one
	// replica type. Omitted fields default to zero pods.
	type replicaCounts struct {
		failed    int32
		succeeded int32
		active    int32
	}

	type testCase struct {
		description string
		tfJob       *tfv1alpha2.TFJob

		// Pod counts used to seed the replica statuses before updateStatus.
		ps     replicaCounts
		worker replicaCounts
		chief  replicaCounts

		// Condition type that must be present after updateStatus.
		expectedType tfv1alpha2.TFJobConditionType
	}

	// NOTE(review): the arguments of newTFJob / newTFJobWithChief are
	// presumably the worker and PS replica counts; confirm against the helpers.
	cases := []testCase{
		{
			description:  "Chief worker is succeeded",
			tfJob:        newTFJobWithChief(1, 0),
			worker:       replicaCounts{succeeded: 1},
			chief:        replicaCounts{succeeded: 1},
			expectedType: tfv1alpha2.TFJobSucceeded,
		},
		{
			description:  "Chief worker is running",
			tfJob:        newTFJobWithChief(1, 0),
			chief:        replicaCounts{active: 1},
			expectedType: tfv1alpha2.TFJobRunning,
		},
		{
			description:  "Chief worker is failed",
			tfJob:        newTFJobWithChief(1, 0),
			chief:        replicaCounts{failed: 1},
			expectedType: tfv1alpha2.TFJobFailed,
		},
		{
			description:  "(No chief worker) Worker is failed",
			tfJob:        newTFJob(1, 0),
			worker:       replicaCounts{failed: 1},
			expectedType: tfv1alpha2.TFJobFailed,
		},
		{
			description:  "(No chief worker) Worker is succeeded",
			tfJob:        newTFJob(1, 0),
			worker:       replicaCounts{succeeded: 1},
			expectedType: tfv1alpha2.TFJobSucceeded,
		},
		{
			description:  "(No chief worker) Worker is running",
			tfJob:        newTFJob(1, 0),
			worker:       replicaCounts{active: 1},
			expectedType: tfv1alpha2.TFJobRunning,
		},
		{
			description:  "(No chief worker) 2 workers are succeeded, 2 workers are active",
			tfJob:        newTFJob(4, 2),
			ps:           replicaCounts{active: 2},
			worker:       replicaCounts{succeeded: 2, active: 2},
			expectedType: tfv1alpha2.TFJobRunning,
		},
		{
			description:  "(No chief worker) 2 workers are running, 2 workers are failed",
			tfJob:        newTFJob(4, 2),
			ps:           replicaCounts{active: 2},
			worker:       replicaCounts{failed: 2, active: 2},
			expectedType: tfv1alpha2.TFJobFailed,
		},
		{
			description:  "(No chief worker) 2 workers are succeeded, 2 workers are failed",
			tfJob:        newTFJob(4, 2),
			ps:           replicaCounts{active: 2},
			worker:       replicaCounts{failed: 2, succeeded: 2},
			expectedType: tfv1alpha2.TFJobFailed,
		},
		{
			description:  "Chief is running, workers are failed",
			tfJob:        newTFJobWithChief(4, 2),
			ps:           replicaCounts{active: 2},
			worker:       replicaCounts{failed: 4},
			chief:        replicaCounts{active: 1},
			expectedType: tfv1alpha2.TFJobRunning,
		},
		{
			description:  "Chief is running, workers are succeeded",
			tfJob:        newTFJobWithChief(4, 2),
			ps:           replicaCounts{active: 2},
			worker:       replicaCounts{succeeded: 4},
			chief:        replicaCounts{active: 1},
			expectedType: tfv1alpha2.TFJobRunning,
		},
		{
			description:  "Chief is failed, workers are succeeded",
			tfJob:        newTFJobWithChief(4, 2),
			ps:           replicaCounts{active: 2},
			worker:       replicaCounts{succeeded: 4},
			chief:        replicaCounts{failed: 1},
			expectedType: tfv1alpha2.TFJobFailed,
		},
		{
			description:  "Chief is succeeded, workers are failed",
			tfJob:        newTFJobWithChief(4, 2),
			ps:           replicaCounts{active: 2},
			worker:       replicaCounts{failed: 4},
			chief:        replicaCounts{succeeded: 1},
			expectedType: tfv1alpha2.TFJobSucceeded,
		},
	}

	for _, tc := range cases {
		job := tc.tfJob

		initializeTFReplicaStatuses(job, tfv1alpha2.TFReplicaTypeWorker)
		initializeTFReplicaStatuses(job, tfv1alpha2.TFReplicaTypeChief)
		initializeTFReplicaStatuses(job, tfv1alpha2.TFReplicaTypePS)

		setStatusForTest(job, tfv1alpha2.TFReplicaTypePS, tc.ps.failed, tc.ps.succeeded, tc.ps.active, t)
		setStatusForTest(job, tfv1alpha2.TFReplicaTypeWorker, tc.worker.failed, tc.worker.succeeded, tc.worker.active, t)
		setStatusForTest(job, tfv1alpha2.TFReplicaTypeChief, tc.chief.failed, tc.chief.succeeded, tc.chief.active, t)

		// Evaluate the chief (single replica) when the spec declares one;
		// otherwise fall back to the workers and their declared replica count.
		rtype := tfv1alpha2.TFReplicaTypeChief
		replicas := 1
		if _, hasChief := job.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeChief]; !hasChief {
			rtype = tfv1alpha2.TFReplicaTypeWorker
			replicas = int(*job.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].Replicas)
		}
		if err := updateStatus(job, rtype, replicas); err != nil {
			t.Errorf("%s: Expected error %v to be nil", tc.description, err)
		}

		// The wanted condition type must appear among the job's conditions.
		matched := false
		for i := range job.Status.Conditions {
			if job.Status.Conditions[i].Type == tc.expectedType {
				matched = true
				break
			}
		}
		if !matched {
			t.Errorf("%s: Condition %s is not found", tc.description, tc.expectedType)
		}
	}
}

// setStatusForTest feeds updateTFJobReplicaStatuses with a single base pod
// reported `failed` times in the Failed phase, then `succeeded` times in the
// Succeeded phase, then `active` times in the Running phase, all for replica
// type typ of tfJob.
func setStatusForTest(tfJob *tfv1alpha2.TFJob, typ tfv1alpha2.TFReplicaType, failed, succeeded, active int32, t *testing.T) {
	pod := newBasePod("pod", tfJob, t)

	// Phases are applied in this fixed order: failed, succeeded, running.
	steps := []struct {
		phase v1.PodPhase
		count int32
	}{
		{v1.PodFailed, failed},
		{v1.PodSucceeded, succeeded},
		{v1.PodRunning, active},
	}

	for _, step := range steps {
		for n := int32(0); n < step.count; n++ {
			// The same pod object is reused; only its phase changes.
			pod.Status.Phase = step.phase
			updateTFJobReplicaStatuses(tfJob, typ, pod)
		}
	}
}
2 changes: 1 addition & 1 deletion pkg/controller.v2/controller_tfjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (tc *TFJobController) addTFJob(obj interface{}) {
log.Info(msg)

// Add a created condition.
err = tc.updateTFJobConditions(tfJob, tfv1alpha2.TFJobCreated, tfJobCreatedReason, msg)
err = updateTFJobConditions(tfJob, tfv1alpha2.TFJobCreated, tfJobCreatedReason, msg)
if err != nil {
log.Infof("Append tfJob condition error: %v", err)
return
Expand Down
Loading

0 comments on commit a310cb9

Please sign in to comment.