Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Add authentication for Marathon scheduler (#1871)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrcrawfo authored and objmagic committed May 30, 2017
1 parent e242f53 commit db9e08a
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
public final class MarathonContext extends Context {
public static final String HERON_MARATHON_SCHEDULER_URI = "heron.marathon.scheduler.uri";
public static final String HERON_EXECUTOR_DOCKER_IMAGE = "heron.executor.docker.image";
public static final String HERON_MARATHON_SCHEDULER_AUTH_TOKEN =
"heron.marathon.scheduler.auth.token";

private MarathonContext() {
}
Expand All @@ -32,4 +34,8 @@ public static String getSchedulerURI(Config config) {
public static String getExecutorDockerImage(Config config) {
return config.getStringValue(HERON_EXECUTOR_DOCKER_IMAGE);
}

public static String getSchedulerAuthToken(Config config) {
return config.getStringValue(HERON_MARATHON_SCHEDULER_AUTH_TOKEN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,35 @@ public class MarathonController {
private static final Logger LOG = Logger.getLogger(MarathonController.class.getName());

private final String marathonURI;
private final String marathonAuthToken;
private final String topologyName;
private final boolean isVerbose;

public MarathonController(
String marathonURI,
String marathonAuthToken,
String topologyName,
boolean isVerbose
) {
this.marathonURI = marathonURI;
this.marathonAuthToken = marathonAuthToken;
this.topologyName = topologyName;
this.isVerbose = isVerbose;
}

/**
* Kills a marathon app (topology)
*/
public boolean killTopology() {
// Setup Connection
String topologyURI = String.format("%s/v2/groups/%s", this.marathonURI, this.topologyName);
HttpURLConnection conn = NetworkUtils.getHttpConnection(topologyURI);

// Attach a token if there is one specified
if (this.marathonAuthToken != null) {
conn.setRequestProperty("Authorization", String.format("token=%s", this.marathonAuthToken));
}

if (conn == null) {
LOG.log(Level.SEVERE, "Failed to find marathon scheduler");
return false;
Expand All @@ -59,6 +71,9 @@ public boolean killTopology() {
if (success) {
LOG.log(Level.INFO, "Successfully killed topology");
return true;
} else if (NetworkUtils.checkHttpResponseCode(conn, HttpURLConnection.HTTP_UNAUTHORIZED)) {
LOG.log(Level.SEVERE, "Marathon requires authentication");
return false;
} else {
LOG.log(Level.SEVERE, "Failed to kill topology");
return false;
Expand All @@ -69,6 +84,10 @@ public boolean killTopology() {
}
}

/**
* Restarts a given marathon app (topology)
* @param appId ID of marathon app
*/
public boolean restartApp(int appId) {
if (appId == -1) {
// TODO (nlu): implement restart all
Expand All @@ -82,6 +101,11 @@ public boolean restartApp(int appId) {
String restartRequest = String.format("%s/v2/apps/%s/%d/restart",
this.marathonURI, this.topologyName, appId);
HttpURLConnection conn = NetworkUtils.getHttpConnection(restartRequest);

if (this.marathonAuthToken != null) {
conn.setRequestProperty("Authorization", String.format("token=%s", this.marathonAuthToken));
}

if (conn == null) {
LOG.log(Level.SEVERE, "Failed to find marathon scheduler");
return false;
Expand All @@ -101,6 +125,9 @@ public boolean restartApp(int appId) {
if (success) {
LOG.log(Level.INFO, "Successfully restarted container {0}", appId);
return true;
} else if (NetworkUtils.checkHttpResponseCode(conn, HttpURLConnection.HTTP_UNAUTHORIZED)) {
LOG.log(Level.SEVERE, "Marathon requires authentication");
return false;
} else {
LOG.log(Level.SEVERE, "Failed to restart container {0}", appId);
return false;
Expand All @@ -111,6 +138,10 @@ public boolean restartApp(int appId) {
}
}

/**
* Restarts a given marathon app (topology)
* @param appConf App configuration
*/
// submit a topology as a group, containers as apps in the group
public boolean submitTopology(String appConf) {
if (this.isVerbose) {
Expand All @@ -126,6 +157,12 @@ public boolean submitTopology(String appConf) {
// Setup Connection
String schedulerURI = String.format("%s/v2/groups", this.marathonURI);
HttpURLConnection conn = NetworkUtils.getHttpConnection(schedulerURI);

// Attach a token if there is one specified
if (this.marathonAuthToken != null) {
conn.setRequestProperty("Authorization", String.format("token=%s", this.marathonAuthToken));
}

if (conn == null) {
LOG.log(Level.SEVERE, "Failed to find marathon scheduler");
return false;
Expand All @@ -144,6 +181,9 @@ public boolean submitTopology(String appConf) {
if (success) {
LOG.log(Level.INFO, "Topology submitted successfully");
return true;
} else if (NetworkUtils.checkHttpResponseCode(conn, HttpURLConnection.HTTP_UNAUTHORIZED)) {
LOG.log(Level.SEVERE, "Marathon requires authentication");
return false;
} else {
LOG.log(Level.SEVERE, "Failed to submit topology");
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void initialize(Config aConfig, Config aRuntime) {
protected MarathonController getController() {
return new MarathonController(
MarathonContext.getSchedulerURI(config),
MarathonContext.getSchedulerAuthToken(config),
Runtime.topologyName(runtime),
Context.verbose(config));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,21 @@

import com.twitter.heron.spi.utils.NetworkUtils;


@RunWith(PowerMockRunner.class)
@PrepareForTest(NetworkUtils.class)
public class MarathonControllerTest {
private static final String MARATHON_URI = "http://marathon.uri:8080";
private static final String MARATHON_AUTH_TOKEN = "a_token";
private static final String TOPOLOGY_NAME = "topology_name";
private static final boolean IS_VERBOSE = true;

private static MarathonController controller;

@Before
public void setUp() throws Exception {
controller = Mockito.spy(new MarathonController(MARATHON_URI, TOPOLOGY_NAME, IS_VERBOSE));
controller = Mockito.spy(new MarathonController(MARATHON_URI, MARATHON_AUTH_TOKEN,
TOPOLOGY_NAME, IS_VERBOSE));
}

@After
Expand All @@ -66,7 +69,8 @@ public void testKillTopology() throws Exception {

// Failed to get connection
PowerMockito.spy(NetworkUtils.class);
PowerMockito.doReturn(null).when(NetworkUtils.class, "getHttpConnection", Mockito.anyString());
PowerMockito.doReturn(httpURLConnection)
.when(NetworkUtils.class, "getHttpConnection", Mockito.anyString());
Assert.assertFalse(controller.killTopology());
PowerMockito.verifyStatic();
NetworkUtils.getHttpConnection(Mockito.anyString());
Expand Down Expand Up @@ -97,6 +101,31 @@ public void testKillTopology() throws Exception {
NetworkUtils.sendHttpDeleteRequest(Mockito.any(HttpURLConnection.class));
NetworkUtils.checkHttpResponseCode(Mockito.any(HttpURLConnection.class), Mockito.anyInt());

// Failed authentication
PowerMockito.spy(NetworkUtils.class);
PowerMockito.doReturn(httpURLConnection)
.when(NetworkUtils.class, "getHttpConnection", Mockito.anyString());
PowerMockito.doReturn(true)
.when(NetworkUtils.class, "sendHttpPostRequest",
Mockito.any(HttpURLConnection.class),
Mockito.anyString(),
Mockito.any(byte[].class));
PowerMockito.doReturn(false)
.when(NetworkUtils.class, "checkHttpResponseCode",
Mockito.any(HttpURLConnection.class), Mockito.anyInt());
PowerMockito.doReturn(true)
.when(NetworkUtils.class, "checkHttpResponseCode",
httpURLConnection, HttpURLConnection.HTTP_UNAUTHORIZED);
Assert.assertFalse(controller.killTopology());
PowerMockito.verifyStatic();
NetworkUtils.getHttpConnection(Mockito.anyString());
NetworkUtils.sendHttpPostRequest(
Mockito.any(HttpURLConnection.class),
Mockito.anyString(),
Mockito.any(byte[].class));
NetworkUtils.checkHttpResponseCode(Mockito.any(HttpURLConnection.class), Mockito.anyInt());
NetworkUtils.checkHttpResponseCode(httpURLConnection, HttpURLConnection.HTTP_UNAUTHORIZED);

// Success
PowerMockito.spy(NetworkUtils.class);
PowerMockito.doReturn(httpURLConnection)
Expand Down Expand Up @@ -124,7 +153,8 @@ public void testRestartApp() throws Exception {

// Failed to get connection
PowerMockito.spy(NetworkUtils.class);
PowerMockito.doReturn(null).when(NetworkUtils.class, "getHttpConnection", Mockito.anyString());
PowerMockito.doReturn(httpURLConnection)
.when(NetworkUtils.class, "getHttpConnection", Mockito.anyString());
Assert.assertFalse(controller.restartApp(appId));
PowerMockito.verifyStatic();
NetworkUtils.getHttpConnection(Mockito.anyString());
Expand Down Expand Up @@ -167,6 +197,31 @@ public void testRestartApp() throws Exception {
Mockito.any(byte[].class));
NetworkUtils.checkHttpResponseCode(Mockito.any(HttpURLConnection.class), Mockito.anyInt());

// Failed authentication
PowerMockito.spy(NetworkUtils.class);
PowerMockito.doReturn(httpURLConnection)
.when(NetworkUtils.class, "getHttpConnection", Mockito.anyString());
PowerMockito.doReturn(true)
.when(NetworkUtils.class, "sendHttpPostRequest",
Mockito.any(HttpURLConnection.class),
Mockito.anyString(),
Mockito.any(byte[].class));
PowerMockito.doReturn(false)
.when(NetworkUtils.class, "checkHttpResponseCode",
Mockito.any(HttpURLConnection.class), Mockito.anyInt());
PowerMockito.doReturn(true)
.when(NetworkUtils.class, "checkHttpResponseCode",
httpURLConnection, HttpURLConnection.HTTP_UNAUTHORIZED);
Assert.assertFalse(controller.restartApp(appId));
PowerMockito.verifyStatic();
NetworkUtils.getHttpConnection(Mockito.anyString());
NetworkUtils.sendHttpPostRequest(
Mockito.any(HttpURLConnection.class),
Mockito.anyString(),
Mockito.any(byte[].class));
NetworkUtils.checkHttpResponseCode(Mockito.any(HttpURLConnection.class), Mockito.anyInt());
NetworkUtils.checkHttpResponseCode(httpURLConnection, HttpURLConnection.HTTP_UNAUTHORIZED);

// Success
PowerMockito.spy(NetworkUtils.class);
PowerMockito.doReturn(httpURLConnection)
Expand Down Expand Up @@ -200,7 +255,8 @@ public void testSubmitTopology() throws Exception {

// Failed to get connection
PowerMockito.spy(NetworkUtils.class);
PowerMockito.doReturn(null).when(NetworkUtils.class, "getHttpConnection", Mockito.anyString());
PowerMockito.doReturn(httpURLConnection)
.when(NetworkUtils.class, "getHttpConnection", Mockito.anyString());
Assert.assertFalse(controller.submitTopology(Mockito.anyString()));
PowerMockito.verifyStatic();
NetworkUtils.getHttpConnection(Mockito.anyString());
Expand Down Expand Up @@ -243,6 +299,32 @@ public void testSubmitTopology() throws Exception {
Mockito.any(byte[].class));
NetworkUtils.checkHttpResponseCode(Mockito.any(HttpURLConnection.class), Mockito.anyInt());

// Failed authentication
PowerMockito.spy(NetworkUtils.class);
PowerMockito.doReturn(httpURLConnection)
.when(NetworkUtils.class, "getHttpConnection", Mockito.anyString());
PowerMockito.doReturn(true)
.when(NetworkUtils.class, "sendHttpPostRequest",
Mockito.any(HttpURLConnection.class),
Mockito.anyString(),
Mockito.any(byte[].class));
PowerMockito.doReturn(false)
.when(NetworkUtils.class, "checkHttpResponseCode",
Mockito.any(HttpURLConnection.class), Mockito.anyInt());
PowerMockito.doReturn(true)
.when(NetworkUtils.class, "checkHttpResponseCode",
httpURLConnection, HttpURLConnection.HTTP_UNAUTHORIZED);
Assert.assertFalse(controller.submitTopology(appConf));
PowerMockito.verifyStatic();
NetworkUtils.getHttpConnection(Mockito.anyString());
NetworkUtils.sendHttpPostRequest(
Mockito.any(HttpURLConnection.class),
Mockito.anyString(),
Mockito.any(byte[].class));
NetworkUtils.checkHttpResponseCode(Mockito.any(HttpURLConnection.class), Mockito.anyInt());
NetworkUtils.checkHttpResponseCode(httpURLConnection, HttpURLConnection.HTTP_UNAUTHORIZED);


// Success
PowerMockito.spy(NetworkUtils.class);
PowerMockito.doReturn(httpURLConnection)
Expand Down
10 changes: 9 additions & 1 deletion website/content/docs/operators/deployment/schedulers/dcos.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ environments on AWS, this will be available at `<core.dcos_url>/service/marathon
`core.dcos_url` by executing `dcos config show` from your terminal if you have the DC/OS CLI
installed.

* `heron.marathon.scheduler.auth.token` --- Provides an auth token to use when submitting a topology
to a marathon instance that requires authentication. One can retrieve an auth token by logging in
to the cluster via the DC/OS CLI (`dcos auth login`) and then copying the token (`dcos_acs_token`)
that is stored in `~/.dcos/dcos.toml`

* `heron.scheduler.is.service` --- This config indicates whether the scheduler
is a service. In the case of Marathon, it should be set to `False`.

Expand Down Expand Up @@ -95,11 +100,14 @@ heron.directory.conf: "./heron-conf/"
# The URI of marathon scheduler
heron.marathon.scheduler.uri: "<core.dcos_url>/service/marathon"

# The token of the marathon scheduler
heron.marathon.scheduler.auth.token: "<auth_token>"

# Invoke the IScheduler as a library directly
heron.scheduler.is.service: False

# location of the core package
heron.package.core.uri: https://github.com/twitter/heron/releases/download/0.14.7/heron-core-0.14.7-ubuntu.tar.gz
heron.package.core.uri: https://github.com/twitter/heron/releases/download/0.14.7/heron-core-0.14.7-ubuntu.tar.gz

# docker repo for executor
heron.executor.docker.image: 'ndustrialio/heron-executor:jre8'
Expand Down

0 comments on commit db9e08a

Please sign in to comment.