Skip to content

Commit

Permalink
Merge pull request #220 from mesos/jdef_mesos_026_proto_compat
Browse files Browse the repository at this point in the history
- mesos-0.26 protobuf compat
- authentication fixes (#216)
  • Loading branch information
jdef committed Jan 18, 2016
2 parents a457124 + 05cf673 commit 4a7554a
Show file tree
Hide file tree
Showing 39 changed files with 18,289 additions and 31,380 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ install:
script:
- ! gofmt -s -d . 2>&1 | read diff
- go build -v ./...
- go test -timeout 60s -race ./...
- go test -timeout 120s -race ./...
6 changes: 3 additions & 3 deletions detector/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ func (s *Standalone) Cancel() {
s.cancelOnce.Do(func() { close(s.done) })
}

// poll for changes to master leadership via current leader's /state.json endpoint.
// poll for changes to master leadership via current leader's /state endpoint.
// we poll the `initial` leader, aborting if none was specified.
//
// TODO(jdef) follow the leader: change who we poll based on the prior leader
// TODO(jdef) somehow determine all masters in cluster from the state.json?
// TODO(jdef) somehow determine all masters in cluster from the /state?
//
func (s *Standalone) _poller(pf fetcherFunc) {
defer func() {
Expand Down Expand Up @@ -193,7 +193,7 @@ func (s *Standalone) _poller(pf fetcherFunc) {
// assumes that address is in host:port format
func (s *Standalone) _fetchPid(ctx context.Context, address string) (*upid.UPID, error) {
//TODO(jdef) need SSL support
uri := fmt.Sprintf("http://%s/state.json", address)
uri := fmt.Sprintf("http://%s/state", address)
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, err
Expand Down
17 changes: 11 additions & 6 deletions examples/persistent_scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
"strings"

Expand Down Expand Up @@ -327,7 +328,7 @@ func (sched *ExampleScheduler) ExecutorLost(_ sched.SchedulerDriver, eid *mesos.
log.Errorf("executor %q lost on slave %q code %d", eid, sid, code)
}
func (sched *ExampleScheduler) Error(_ sched.SchedulerDriver, err string) {
log.Errorf("Scheduler received error:", err)
log.Errorf("Scheduler received error: %v", err)
}

// ----------------------- func init() ------------------------- //
Expand Down Expand Up @@ -417,15 +418,19 @@ func main() {
cred := (*mesos.Credential)(nil)
if *mesosAuthPrincipal != "" {
fwinfo.Principal = proto.String(*mesosAuthPrincipal)
cred = &mesos.Credential{
Principal: proto.String(*mesosAuthPrincipal),
}
if *mesosAuthSecretFile != "" {
secret, err := ioutil.ReadFile(*mesosAuthSecretFile)
_, err := os.Stat(*mesosAuthSecretFile)
if err != nil {
log.Fatal(err)
log.Fatal("missing secret file: ", err.Error())
}
cred = &mesos.Credential{
Principal: proto.String(*mesosAuthPrincipal),
Secret: secret,
secret, err := ioutil.ReadFile(*mesosAuthSecretFile)
if err != nil {
log.Fatal("failed to read secret file: ", err.Error())
}
cred.Secret = proto.String(string(secret))
}
}
bindingAddress := parseIP(*address)
Expand Down
19 changes: 13 additions & 6 deletions examples/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
"strings"

Expand Down Expand Up @@ -190,7 +191,7 @@ func (sched *ExampleScheduler) ExecutorLost(_ sched.SchedulerDriver, eid *mesos.
log.Errorf("executor %q lost on slave %q code %d", eid, sid, code)
}
func (sched *ExampleScheduler) Error(_ sched.SchedulerDriver, err string) {
log.Errorf("Scheduler received error:", err)
log.Errorf("Scheduler received error: %v", err)
}

// ----------------------- func init() ------------------------- //
Expand Down Expand Up @@ -282,13 +283,19 @@ func main() {
cred := (*mesos.Credential)(nil)
if *mesosAuthPrincipal != "" {
fwinfo.Principal = proto.String(*mesosAuthPrincipal)
secret, err := ioutil.ReadFile(*mesosAuthSecretFile)
if err != nil {
log.Fatal(err)
}
cred = &mesos.Credential{
Principal: proto.String(*mesosAuthPrincipal),
Secret: secret,
}
if *mesosAuthSecretFile != "" {
_, err := os.Stat(*mesosAuthSecretFile)
if err != nil {
log.Fatal("missing secret file: ", err.Error())
}
secret, err := ioutil.ReadFile(*mesosAuthSecretFile)
if err != nil {
log.Fatal("failed to read secret file: ", err.Error())
}
cred.Secret = proto.String(string(secret))
}
}
bindingAddress := parseIP(*address)
Expand Down
34 changes: 8 additions & 26 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"
"testing"

"github.com/mesos/mesos-go/healthchecker"
"github.com/mesos/mesos-go/mesosproto"
util "github.com/mesos/mesos-go/mesosutil"
"github.com/mesos/mesos-go/messenger"
Expand Down Expand Up @@ -81,8 +80,7 @@ func (e *testExecutorDriver) setConnected(b bool) {

func createTestExecutorDriver(t *testing.T) (
*testExecutorDriver,
*messenger.MockedMessenger,
*healthchecker.MockedHealthChecker) {
*messenger.MockedMessenger) {

exec := NewMockedExecutor()
exec.On("Error").Return(nil)
Expand All @@ -96,12 +94,8 @@ func createTestExecutorDriver(t *testing.T) (
messenger.On("Send").Return(nil)
messenger.On("Stop").Return(nil)

checker := healthchecker.NewMockedHealthChecker()
checker.On("Start").Return()
checker.On("Stop").Return()

driver.messenger = messenger
return &testExecutorDriver{driver}, messenger, checker
return &testExecutorDriver{driver}, messenger
}

func TestExecutorDriverStartFailedToParseEnvironment(t *testing.T) {
Expand Down Expand Up @@ -176,10 +170,6 @@ func TestExecutorDriverStartSucceed(t *testing.T) {
messenger.On("Send").Return(nil)
messenger.On("Stop").Return(nil)

checker := healthchecker.NewMockedHealthChecker()
checker.On("Start").Return()
checker.On("Stop").Return()

assert.False(t, driver.Running())
status, err := driver.Start()
assert.True(t, driver.Running())
Expand Down Expand Up @@ -208,10 +198,6 @@ func TestExecutorDriverRun(t *testing.T) {
driver.messenger = messenger
assert.False(t, driver.Running())

checker := healthchecker.NewMockedHealthChecker()
checker.On("Start").Return()
checker.On("Stop").Return()

ch := make(chan struct{})
go func() {
defer close(ch)
Expand Down Expand Up @@ -242,10 +228,6 @@ func TestExecutorDriverJoin(t *testing.T) {
driver.messenger = messenger
assert.False(t, driver.Running())

checker := healthchecker.NewMockedHealthChecker()
checker.On("Start").Return()
checker.On("Stop").Return()

stat, err := driver.Start()
assert.NoError(t, err)
assert.True(t, driver.Running())
Expand All @@ -264,7 +246,7 @@ func TestExecutorDriverJoin(t *testing.T) {

func TestExecutorDriverAbort(t *testing.T) {
statusChan := make(chan mesosproto.Status)
driver, messenger, _ := createTestExecutorDriver(t)
driver, messenger := createTestExecutorDriver(t)

assert.False(t, driver.Running())
stat, err := driver.Start()
Expand Down Expand Up @@ -305,7 +287,7 @@ func TestExecutorDriverAbort(t *testing.T) {

func TestExecutorDriverStop(t *testing.T) {
statusChan := make(chan mesosproto.Status)
driver, messenger, _ := createTestExecutorDriver(t)
driver, messenger := createTestExecutorDriver(t)

assert.False(t, driver.Running())
stat, err := driver.Start()
Expand Down Expand Up @@ -345,7 +327,7 @@ func TestExecutorDriverStop(t *testing.T) {

func TestExecutorDriverSendStatusUpdate(t *testing.T) {

driver, _, _ := createTestExecutorDriver(t)
driver, _ := createTestExecutorDriver(t)

stat, err := driver.Start()
assert.NoError(t, err)
Expand All @@ -364,7 +346,7 @@ func TestExecutorDriverSendStatusUpdate(t *testing.T) {

func TestExecutorDriverSendStatusUpdateStaging(t *testing.T) {

driver, _, _ := createTestExecutorDriver(t)
driver, _ := createTestExecutorDriver(t)
stat, err := driver.Start()
assert.NoError(t, err)
assert.Equal(t, mesosproto.Status_DRIVER_RUNNING, stat)
Expand All @@ -382,7 +364,7 @@ func TestExecutorDriverSendStatusUpdateStaging(t *testing.T) {

func TestExecutorDriverSendFrameworkMessage(t *testing.T) {

driver, _, _ := createTestExecutorDriver(t)
driver, _ := createTestExecutorDriver(t)

stat, err := driver.SendFrameworkMessage("failed")
assert.Error(t, err)
Expand All @@ -398,7 +380,7 @@ func TestExecutorDriverSendFrameworkMessage(t *testing.T) {
}

func TestStatusUpdateAckRace_Issue103(t *testing.T) {
driver, _, _ := createTestExecutorDriver(t)
driver, _ := createTestExecutorDriver(t)
_, err := driver.Start()
assert.NoError(t, err)

Expand Down
14 changes: 9 additions & 5 deletions healthchecker/slave_health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type SlaveHealthChecker struct {
slaveUPID *upid.UPID
tr *http.Transport
client *http.Client
threshold int32
checkDuration time.Duration
continuousUnhealthyCount int32
threshold int32 // marked unhealthy once continuousUnhealthCount is greater than this
checkDuration time.Duration // perform the check at this interval
continuousUnhealthyCount int32 // marked unhealthy when this exceeds threshold
stop chan struct{}
ch chan time.Time
paused bool
Expand Down Expand Up @@ -131,6 +131,10 @@ func (s *SlaveHealthChecker) Stop() {
close(s.stop)
}

type errHttp struct{ StatusCode int }

func (e *errHttp) Error() string { return fmt.Sprintf("http error code: %d", e.StatusCode) }

func (s *SlaveHealthChecker) doCheck(pid upid.UPID) {
unhealthy := false
path := fmt.Sprintf("http://%s:%s/%s/health", pid.Host, pid.Port, pid.ID)
Expand All @@ -142,7 +146,7 @@ func (s *SlaveHealthChecker) doCheck(pid upid.UPID) {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http status error: %v\n", resp.StatusCode)
return &errHttp{resp.StatusCode}
}
return nil
})
Expand All @@ -152,7 +156,7 @@ func (s *SlaveHealthChecker) doCheck(pid upid.UPID) {
default:
}
if err != nil {
log.Errorf("Failed to request the health path: %v\n", err)
log.Errorf("Failed to request the health path: %v", err)
unhealthy = true
}
if unhealthy {
Expand Down
19 changes: 12 additions & 7 deletions healthchecker/slave_health_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ func (t *thresholdMonitor) incAndTest() bool {
// blockedServer replies only threshold times, after that
// it will block.
type blockedServer struct {
th *thresholdMonitor
ch chan struct{}
th *thresholdMonitor
ch chan struct{}
stopOnce sync.Once
}

func newBlockedServer(threshold int) *blockedServer {
Expand All @@ -70,7 +71,7 @@ func (s *blockedServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (s *blockedServer) stop() {
close(s.ch)
s.stopOnce.Do(func() { close(s.ch) })
}

// eofServer will close the connection after it replies for threshold times.
Expand All @@ -95,7 +96,7 @@ func (s *eofServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
conn.Close()
}

// errorStatusCodeServer will reply error status code (e.g. 503) after the
// errorStatusCodeServer will reply error status code (e.g. 503) after
// it replies for threhold time.
type errorStatusCodeServer struct {
th *thresholdMonitor
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestSlaveHealthCheckerFailedOnBlockedSlave(t *testing.T) {
ts := httptest.NewUnstartedServer(s)
ts.Start()
defer ts.Close()
defer s.stop()

upid, err := upid.Parse(fmt.Sprintf("slave@%s", ts.Listener.Addr().String()))
assert.NoError(t, err)
Expand All @@ -169,15 +171,14 @@ func TestSlaveHealthCheckerFailedOnBlockedSlave(t *testing.T) {

select {
case <-time.After(time.Second):
s.stop()
t.Fatal("timeout")
t.Error("timeout")
case <-ch:
s.stop()
assert.True(t, atomic.LoadInt32(&s.th.cnt) > 10)
}

// TODO(jdef) hack: this sucks, but there's a data race in httptest's handler when Close()
// and ServeHTTP() are invoked (WaitGroup DATA RACE). Sleeping here to attempt to avoid that.
// I think this is supposed to be fixed in go1.6
time.Sleep(5 * time.Second)
}

Expand Down Expand Up @@ -224,6 +225,8 @@ func TestSlaveHealthCheckerFailedOnErrorStatusSlave(t *testing.T) {
}

func TestSlaveHealthCheckerSucceed(t *testing.T) {
t.Skip("skipping known flaky test (fails on busy CI servers, should use a fake clock)")

s := new(goodServer)
ts := httptest.NewUnstartedServer(s)
ts.Start()
Expand All @@ -245,6 +248,8 @@ func TestSlaveHealthCheckerSucceed(t *testing.T) {
}

func TestSlaveHealthCheckerPartitonedSlave(t *testing.T) {
t.Skip("skipping known flaky test (fails on busy CI servers, should use a fake clock)")

s := newPartitionedServer(5, 9)
ts := httptest.NewUnstartedServer(s)
ts.Start()
Expand Down
11 changes: 9 additions & 2 deletions mesosproto/Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
all: *.proto
protoc --proto_path=${GOPATH}/src:${GOPATH}/src/github.com/gogo/protobuf/protobuf:. --gogo_out=. *.proto
PROTO_PATH := ${GOPATH}/src:.
PROTO_PATH := ${PROTO_PATH}:${GOPATH}/src/github.com/gogo/protobuf/protobuf
PROTO_PATH := ${PROTO_PATH}:${GOPATH}/src/github.com/gogo/protobuf/gogoproto

.PHONY: all

all:
protoc --proto_path=${PROTO_PATH} --gogo_out=. *.proto
protoc --proto_path=${PROTO_PATH} --gogo_out=. ./scheduler/*.proto
Loading

0 comments on commit 4a7554a

Please sign in to comment.