Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
ISpoutDelegate and IBoltDelegate to implement IUpdatable (#1749)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bill Graham authored and objmagic committed Mar 8, 2017
1 parent 15e0dba commit 5da5be6
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package com.twitter.heron.api.bolt;

import java.util.Map;
import java.util.logging.Logger;

import com.twitter.heron.api.exception.FailedException;
import com.twitter.heron.api.exception.ReportedFailedException;
import com.twitter.heron.api.topology.IUpdatable;
import com.twitter.heron.api.topology.OutputFieldsDeclarer;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Tuple;

public class BasicBoltExecutor implements IRichBolt {
public class BasicBoltExecutor implements IRichBolt, IUpdatable {
private static final Logger LOG = Logger.getLogger(BasicBoltExecutor.class.getName());

private static final long serialVersionUID = 7021447981762957626L;

private IBasicBolt bolt;
Expand Down Expand Up @@ -70,4 +74,14 @@ public void cleanup() {
public Map<String, Object> getComponentConfiguration() {
return bolt.getComponentConfiguration();
}

@Override
public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
if (bolt instanceof IUpdatable) {
((IUpdatable) bolt).update(topologyContext);
} else {
LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ "because it does not implement %s: %s", IUpdatable.class.getName(), bolt));
}
}
}
1 change: 1 addition & 0 deletions heron/examples/src/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ java_binary(
name='examples-unshaded',
srcs = glob(["**/*.java"]),
deps = [
"//heron/api/src/java:api-java",
"//heron/common/src/java:basics-java",
"//heron/storm/src/java:storm-compatibility-java",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

package com.twitter.heron.examples;

import java.util.List;
import java.util.Map;

import com.twitter.heron.api.topology.IUpdatable;
import com.twitter.heron.common.basics.ByteAmount;

import backtype.storm.Config;
Expand Down Expand Up @@ -68,18 +70,15 @@ public static void main(String[] args) throws Exception {
}
}

public static class ExclamationBolt extends BaseRichBolt {
public static class ExclamationBolt extends BaseRichBolt implements IUpdatable {

private static final long serialVersionUID = 1184860508880121352L;
private long nItems;
private long startTime;

@Override
@SuppressWarnings("rawtypes")
public void prepare(
Map conf,
TopologyContext context,
OutputCollector collector) {
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
nItems = 0;
startTime = System.currentTimeMillis();
}
Expand All @@ -98,5 +97,28 @@ public void execute(Tuple tuple) {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declarer.declare(new Fields("word"));
}

/**
* Implementing this method is optional and only necessary if BOTH of the following are true:
*
* a.) you plan to dynamically scale your bolt/spout at runtime using 'heron update'.
* b.) you need to take action based on a runtime change to the component parallelism.
*
* Most bolts and spouts should be written to be unaffected by changes in their parallelism,
* but some must be aware of it. An example would be a spout that consumes a subset of queue
* partitions, which must be algorithmically divided amongst the total number of spouts.
* <P>
* Note that this method is from the IUpdatable Heron interface which does not exist in Storm.
* It is fine to implement IUpdatable along with other Storm interfaces, but implementing it
* will bind an otherwise generic Storm implementation to Heron.
*
* @param heronTopologyContext Heron topology context.
*/
@Override
public void update(com.twitter.heron.api.topology.TopologyContext heronTopologyContext) {
List<Integer> newTaskIds =
heronTopologyContext.getComponentTasks(heronTopologyContext.getThisComponentId());
System.out.println("Bolt updated with new topologyContext. New taskIds: " + newTaskIds);
}
}
}
2 changes: 1 addition & 1 deletion heron/executor/src/python/heron_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def _kill_processes(self, commands):
Log.info("Killing %s process with pid %d: %s" %
(process_info.name, process_info.pid, ' '.join(command)))
try:
process_info.process.kill()
process_info.process.terminate() # sends SIGTERM to process
except OSError, e:
if e.errno == 3: # No such process
Log.warn("Expected process %s with pid %d was not running, ignoring." %
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
package backtype.storm.topology;

import java.util.Map;
import java.util.logging.Logger;

import com.twitter.heron.api.topology.IUpdatable;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;

public class BasicBoltExecutor implements IRichBolt {
public class BasicBoltExecutor implements IRichBolt, IUpdatable {
private static final Logger LOG = Logger.getLogger(BasicBoltExecutor.class.getName());

private static final long serialVersionUID = 4359767045622072660L;
private IBasicBolt delegate;
private transient BasicOutputCollector collector;
Expand Down Expand Up @@ -70,4 +75,14 @@ public void cleanup() {
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}

@Override
public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
if (delegate instanceof IUpdatable) {
((IUpdatable) delegate).update(topologyContext);
} else {
LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package backtype.storm.topology;

import java.util.Map;
import java.util.logging.Logger;

import com.twitter.heron.api.topology.IUpdatable;

import backtype.storm.task.OutputCollectorImpl;
import backtype.storm.task.TopologyContext;
Expand All @@ -28,7 +31,9 @@
* When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
* to use to implement components of the topology.
*/
public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt {
public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt, IUpdatable {
private static final Logger LOG = Logger.getLogger(IRichBoltDelegate.class.getName());

private static final long serialVersionUID = -3717575342431064148L;
private IRichBolt delegate;
private TopologyContext topologyContextImpl;
Expand Down Expand Up @@ -70,4 +75,14 @@ public void declareOutputFields(com.twitter.heron.api.topology.OutputFieldsDecla
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}

@Override
public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
if (delegate instanceof IUpdatable) {
((IUpdatable) delegate).update(topologyContext);
} else {
LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package backtype.storm.topology;

import java.util.Map;
import java.util.logging.Logger;

import com.twitter.heron.api.spout.SpoutOutputCollector;
import com.twitter.heron.api.topology.IUpdatable;

import backtype.storm.spout.SpoutOutputCollectorImpl;
import backtype.storm.task.TopologyContext;
Expand All @@ -29,7 +31,9 @@
* When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
* to use to implement components of the topology.
*/
public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout {
public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout, IUpdatable {
private static final Logger LOG = Logger.getLogger(IRichSpoutDelegate.class.getName());

private static final long serialVersionUID = -4310232227720592316L;
private IRichSpout delegate;
private TopologyContext topologyContextImpl;
Expand Down Expand Up @@ -88,4 +92,14 @@ public void declareOutputFields(com.twitter.heron.api.topology.OutputFieldsDecla
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}

@Override
public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
if (delegate instanceof IUpdatable) {
((IUpdatable) delegate).update(topologyContext);
} else {
LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
package org.apache.storm.topology;

import java.util.Map;
import java.util.logging.Logger;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;

public class BasicBoltExecutor implements IRichBolt {
import com.twitter.heron.api.topology.IUpdatable;

public class BasicBoltExecutor implements IRichBolt, IUpdatable {
private static final Logger LOG = Logger.getLogger(BasicBoltExecutor.class.getName());

private static final long serialVersionUID = 235217339000923019L;
private IBasicBolt delegate;
private transient BasicOutputCollector collector;
Expand Down Expand Up @@ -70,4 +75,14 @@ public void cleanup() {
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}

@Override
public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
if (delegate instanceof IUpdatable) {
((IUpdatable) delegate).update(topologyContext);
} else {
LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@
package org.apache.storm.topology;

import java.util.Map;
import java.util.logging.Logger;

import org.apache.storm.task.OutputCollectorImpl;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.TupleImpl;

import com.twitter.heron.api.topology.IUpdatable;

/**
* When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
* to use to implement components of the topology.
*/
public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt {
public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt, IUpdatable {
private static final Logger LOG = Logger.getLogger(IRichBoltDelegate.class.getName());

private static final long serialVersionUID = 8350418148268852902L;
private IRichBolt delegate;
private TopologyContext topologyContextImpl;
Expand Down Expand Up @@ -70,4 +75,14 @@ public void declareOutputFields(com.twitter.heron.api.topology.OutputFieldsDecla
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}

@Override
public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
if (delegate instanceof IUpdatable) {
((IUpdatable) delegate).update(topologyContext);
} else {
LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@
package org.apache.storm.topology;

import java.util.Map;
import java.util.logging.Logger;

import org.apache.storm.spout.SpoutOutputCollectorImpl;
import org.apache.storm.task.TopologyContext;

import com.twitter.heron.api.spout.SpoutOutputCollector;
import com.twitter.heron.api.topology.IUpdatable;

/**
* When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
* to use to implement components of the topology.
*/
public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout {
public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout, IUpdatable {
private static final Logger LOG = Logger.getLogger(IRichSpoutDelegate.class.getName());

private static final long serialVersionUID = -1543996045558101339L;
private IRichSpout delegate;
private TopologyContext topologyContextImpl;
Expand Down Expand Up @@ -88,4 +92,14 @@ public void declareOutputFields(com.twitter.heron.api.topology.OutputFieldsDecla
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}

@Override
public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
if (delegate instanceof IUpdatable) {
((IUpdatable) delegate).update(topologyContext);
} else {
LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
}
}
}

0 comments on commit 5da5be6

Please sign in to comment.