Skip to content

Commit 34fe890

Browse files
author
rezra3
committed
codeine-237 add SERVER_HOST and SERVER_PORT env vars to commands - fixed build
1 parent 399466b commit 34fe890

File tree

1 file changed

+157
-136
lines changed

1 file changed

+157
-136
lines changed
Lines changed: 157 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package codeine.nodes;
22

3+
import codeine.jsons.global.GlobalConfigurationJsonStore;
34
import java.net.InetAddress;
45
import java.util.List;
56
import java.util.Map;
@@ -33,141 +34,161 @@
3334
import com.google.common.collect.Maps;
3435
import com.google.common.collect.Sets;
3536

36-
public class NodesRunner implements Task{
37-
38-
private static final Logger log = Logger.getLogger(NodesRunner.class);
39-
40-
private static final long NODE_MONITOR_INTERVAL = TimeUnit.SECONDS.toMillis(29);
41-
public static final long NODE_RUNNER_INTERVAL = TimeUnit.HOURS.toMillis(1);
42-
43-
@Inject private IConfigurationManager configurationManager;
44-
@Inject private PathHelper pathHelper;
45-
@Inject private PeerStatus peerStatus;
46-
@Inject private MailSender mailSender;
47-
@Inject private NotificationDeliverToDatabase notificationDeliverToMongo;
48-
@Inject private NodesManager nodesManager;
49-
@Inject private SnoozeKeeper snoozeKeeper;
50-
private Map<String, Map<NodeInfo, PeriodicExecuter>> executers = Maps.newHashMap();
51-
@Inject private PeerStatusChangedUpdater mongoPeerStatusUpdater;
52-
@Inject private CollectorsRunnerFactory collectorsRunnerFactory;
53-
54-
@Override
55-
public synchronized void run() {
56-
InetAddress localHost = InetUtils.getLocalHost();
57-
log.info("NodeRunner is starting on host " + localHost.getHostName() + " " + localHost.getCanonicalHostName());
58-
log.info("NodeRunner is starting " + this + " with executers " + executers);
59-
Set<String> removedProjects = Sets.newHashSet(executers.keySet());
60-
for (ProjectJson project : getProjects()) {
61-
removedProjects.remove(project.name());
62-
try {
63-
boolean hasNodes = startStopExecutorsForProject(project);
64-
if (!hasNodes) {
65-
cleanupProject(project.name());
66-
}
67-
} catch (Exception e) {
68-
log.error("failed startStopExecutorsForProject for project " + project.name(), e);
69-
}
70-
}
71-
for (String project : removedProjects) {
72-
try {
73-
stopNodes(project, executers.get(project));
74-
cleanupProject(project);
75-
log.info("removed project " + project);
76-
} catch (Exception e) {
77-
log.error("failed to stop nodes for project " + project, e);
78-
}
79-
}
80-
}
81-
82-
/**
83-
* assuming nodes already stopped
84-
*/
85-
private void cleanupProject(String project) {
86-
log.info("cleanupProject " + project);
87-
executers.remove(project);
88-
peerStatus.removeProject(project);
89-
}
90-
91-
private void stop(PeriodicExecuter e) {
92-
log.info("stopping 1executor " + e.name());
93-
e.stopWhenPossible();
94-
}
95-
96-
private boolean startStopExecutorsForProject(ProjectJson project) {
97-
Map<NodeInfo, PeriodicExecuter> currentNodes = getCurrentNodes(project);
98-
log.info("project: " + project.name() + " currentProjectExecutors: " + currentNodes.keySet());
99-
SelectedNodes selectedNodes;
100-
try {
101-
selectedNodes = new NodesSelector(currentNodes, getNodes(project)).selectStartStop();
102-
} catch (Exception e) {
103-
log.error("failed to select nodes for project " + project.name() + " will leave old nodes " + currentNodes, e);
104-
return !currentNodes.isEmpty();
105-
}
106-
log.info("selectedNodes: " + selectedNodes);
107-
stopNodes(project.name(), selectedNodes.nodesToStop());
108-
Map<NodeInfo, PeriodicExecuter> newProjectExecutors = selectedNodes.existingProjectExecutors();
109-
for (NodeInfo nodeJson : selectedNodes.nodesToStart()) {
110-
log.info("start exec1 monitoring node " + nodeJson + " in project " + project.name());
111-
try {
112-
PeriodicExecuter e = startExecuter(project, nodeJson);
113-
newProjectExecutors.put(nodeJson, e);
114-
} catch (Exception e1) {
115-
log.error("failed to start executor for node " + nodeJson + " in project " + project.name(), e1);
116-
}
117-
}
118-
executers.put(project.name(), newProjectExecutors);
119-
log.info("project: " + project.name() + " newProjectExecutors: " + newProjectExecutors.keySet());
120-
return !executers.get(project.name()).isEmpty();
121-
}
122-
123-
private void stopNodes(String project, Map<NodeInfo, PeriodicExecuter> map) {
124-
for (Entry<NodeInfo, PeriodicExecuter> e : map.entrySet()) {
125-
log.info("stop exec1 monitoring node " + e.getKey() + " in project " + project);
126-
peerStatus.removeNode(project, e.getKey().name());
127-
stop(e.getValue());
128-
}
129-
}
130-
131-
private Map<NodeInfo, PeriodicExecuter> getCurrentNodes(ProjectJson project) {
132-
Map<NodeInfo, PeriodicExecuter> currentNodes = executers.get(project.name());
133-
if (null == currentNodes) {
134-
currentNodes = Maps.newHashMap();
135-
executers.put(project.name(), currentNodes);
136-
}
137-
return currentNodes;
138-
}
139-
140-
private PeriodicExecuter startExecuter(ProjectJson project, NodeInfo nodeJson) {
141-
log.info("Starting monitor thread for project " + project.name() + " node " + nodeJson);
142-
Task task;
143-
RunMonitors monitorsTask = new RunMonitors(configurationManager, project.name(), peerStatus, mailSender, pathHelper,
144-
nodeJson, notificationDeliverToMongo, mongoPeerStatusUpdater, snoozeKeeper);
145-
if (Constants.RUNNING_COLLECTORS_IN_PEER) {
146-
CollectorsRunner collectorsTask = collectorsRunnerFactory.create(project.name(), nodeJson);
147-
collectorsTask.init();
148-
task = collectorsTask;
149-
}
150-
else {
151-
task = monitorsTask;
152-
}
153-
PeriodicExecuter periodicExecuter = new PeriodicExecuter(NODE_MONITOR_INTERVAL,
154-
task, "RunMonitors_" + project.name() + "_" + nodeJson.name());
155-
log.info("starting 1executor " + periodicExecuter.name());
156-
periodicExecuter.runInThread();
157-
return periodicExecuter;
158-
}
159-
160-
private List<ProjectJson> getProjects() {
161-
return configurationManager.getConfiguredProjects();
162-
}
163-
164-
private List<NodeInfo> getNodes(ProjectJson project) {
165-
try {
166-
return nodesManager.nodesOf(project).nodes();
167-
} catch (Exception e) {
168-
log.warn("failed to get nodes for project " + project.name(), e);
169-
}
170-
return Lists.newArrayList();
171-
}
37+
public class NodesRunner implements Task {
38+
39+
private static final Logger log = Logger.getLogger(NodesRunner.class);
40+
41+
private static final long NODE_MONITOR_INTERVAL = TimeUnit.SECONDS.toMillis(29);
42+
public static final long NODE_RUNNER_INTERVAL = TimeUnit.HOURS.toMillis(1);
43+
44+
@Inject
45+
private IConfigurationManager configurationManager;
46+
@Inject
47+
private PathHelper pathHelper;
48+
@Inject
49+
private PeerStatus peerStatus;
50+
@Inject
51+
private MailSender mailSender;
52+
@Inject
53+
private NotificationDeliverToDatabase notificationDeliverToMongo;
54+
@Inject
55+
private NodesManager nodesManager;
56+
@Inject
57+
private SnoozeKeeper snoozeKeeper;
58+
private Map<String, Map<NodeInfo, PeriodicExecuter>> executers = Maps.newHashMap();
59+
@Inject
60+
private PeerStatusChangedUpdater mongoPeerStatusUpdater;
61+
@Inject
62+
private CollectorsRunnerFactory collectorsRunnerFactory;
63+
@Inject
64+
private GlobalConfigurationJsonStore globalConfigurationJsonStore;
65+
66+
@Override
67+
public synchronized void run() {
68+
InetAddress localHost = InetUtils.getLocalHost();
69+
log.info("NodeRunner is starting on host " + localHost.getHostName() + " " + localHost
70+
.getCanonicalHostName());
71+
log.info("NodeRunner is starting " + this + " with executers " + executers);
72+
Set<String> removedProjects = Sets.newHashSet(executers.keySet());
73+
for (ProjectJson project : getProjects()) {
74+
removedProjects.remove(project.name());
75+
try {
76+
boolean hasNodes = startStopExecutorsForProject(project);
77+
if (!hasNodes) {
78+
cleanupProject(project.name());
79+
}
80+
} catch (Exception e) {
81+
log.error("failed startStopExecutorsForProject for project " + project.name(), e);
82+
}
83+
}
84+
for (String project : removedProjects) {
85+
try {
86+
stopNodes(project, executers.get(project));
87+
cleanupProject(project);
88+
log.info("removed project " + project);
89+
} catch (Exception e) {
90+
log.error("failed to stop nodes for project " + project, e);
91+
}
92+
}
93+
}
94+
95+
/**
96+
* assuming nodes already stopped
97+
*/
98+
private void cleanupProject(String project) {
99+
log.info("cleanupProject " + project);
100+
executers.remove(project);
101+
peerStatus.removeProject(project);
102+
}
103+
104+
private void stop(PeriodicExecuter e) {
105+
log.info("stopping 1executor " + e.name());
106+
e.stopWhenPossible();
107+
}
108+
109+
private boolean startStopExecutorsForProject(ProjectJson project) {
110+
Map<NodeInfo, PeriodicExecuter> currentNodes = getCurrentNodes(project);
111+
log.info(
112+
"project: " + project.name() + " currentProjectExecutors: " + currentNodes.keySet());
113+
SelectedNodes selectedNodes;
114+
try {
115+
selectedNodes = new NodesSelector(currentNodes, getNodes(project)).selectStartStop();
116+
} catch (Exception e) {
117+
log.error(
118+
"failed to select nodes for project " + project.name() + " will leave old nodes "
119+
+ currentNodes, e);
120+
return !currentNodes.isEmpty();
121+
}
122+
log.info("selectedNodes: " + selectedNodes);
123+
stopNodes(project.name(), selectedNodes.nodesToStop());
124+
Map<NodeInfo, PeriodicExecuter> newProjectExecutors = selectedNodes
125+
.existingProjectExecutors();
126+
for (NodeInfo nodeJson : selectedNodes.nodesToStart()) {
127+
log.info("start exec1 monitoring node " + nodeJson + " in project " + project.name());
128+
try {
129+
PeriodicExecuter e = startExecuter(project, nodeJson);
130+
newProjectExecutors.put(nodeJson, e);
131+
} catch (Exception e1) {
132+
log.error("failed to start executor for node " + nodeJson + " in project " + project
133+
.name(), e1);
134+
}
135+
}
136+
executers.put(project.name(), newProjectExecutors);
137+
log.info(
138+
"project: " + project.name() + " newProjectExecutors: " + newProjectExecutors.keySet());
139+
return !executers.get(project.name()).isEmpty();
140+
}
141+
142+
private void stopNodes(String project, Map<NodeInfo, PeriodicExecuter> map) {
143+
for (Entry<NodeInfo, PeriodicExecuter> e : map.entrySet()) {
144+
log.info("stop exec1 monitoring node " + e.getKey() + " in project " + project);
145+
peerStatus.removeNode(project, e.getKey().name());
146+
stop(e.getValue());
147+
}
148+
}
149+
150+
private Map<NodeInfo, PeriodicExecuter> getCurrentNodes(ProjectJson project) {
151+
Map<NodeInfo, PeriodicExecuter> currentNodes = executers.get(project.name());
152+
if (null == currentNodes) {
153+
currentNodes = Maps.newHashMap();
154+
executers.put(project.name(), currentNodes);
155+
}
156+
return currentNodes;
157+
}
158+
159+
private PeriodicExecuter startExecuter(ProjectJson project, NodeInfo nodeJson) {
160+
log.info("Starting monitor thread for project " + project.name() + " node " + nodeJson);
161+
Task task;
162+
RunMonitors monitorsTask = new RunMonitors(configurationManager, project.name(), peerStatus,
163+
mailSender, pathHelper,
164+
nodeJson, notificationDeliverToMongo, mongoPeerStatusUpdater, snoozeKeeper,
165+
globalConfigurationJsonStore);
166+
if (Constants.RUNNING_COLLECTORS_IN_PEER) {
167+
CollectorsRunner collectorsTask = collectorsRunnerFactory
168+
.create(project.name(), nodeJson);
169+
collectorsTask.init();
170+
task = collectorsTask;
171+
} else {
172+
task = monitorsTask;
173+
}
174+
PeriodicExecuter periodicExecuter = new PeriodicExecuter(NODE_MONITOR_INTERVAL,
175+
task, "RunMonitors_" + project.name() + "_" + nodeJson.name());
176+
log.info("starting 1executor " + periodicExecuter.name());
177+
periodicExecuter.runInThread();
178+
return periodicExecuter;
179+
}
180+
181+
private List<ProjectJson> getProjects() {
182+
return configurationManager.getConfiguredProjects();
183+
}
184+
185+
private List<NodeInfo> getNodes(ProjectJson project) {
186+
try {
187+
return nodesManager.nodesOf(project).nodes();
188+
} catch (Exception e) {
189+
log.warn("failed to get nodes for project " + project.name(), e);
190+
}
191+
return Lists.newArrayList();
192+
}
172193

173194
}

0 commit comments

Comments
 (0)