From 256bf2638945c52619398792b3fd514ab6f19df3 Mon Sep 17 00:00:00 2001 From: Trevor Date: Sat, 20 Aug 2022 09:50:06 -0700 Subject: [PATCH] Restart testing and general pubber stability (#427) --- .gencode_hash.txt | 14 +- .github/workflows/testing.yml | 13 +- .gitignore | 1 + bin/pubber | 4 +- bin/reset_config | 2 +- bin/test_redirect | 24 +- dashboard/functions/index.js | 21 +- gencode/docs/config.html | 47 ++- gencode/docs/state.html | 47 ++- .../java/udmi/schema/BlobBlobsetConfig.java | 3 +- gencode/java/udmi/schema/SystemConfig.java | 21 +- gencode/java/udmi/schema/SystemState.java | 15 +- gencode/python/udmi/schema/config_system.py | 4 + gencode/python/udmi/schema/state_system.py | 4 + pubber/bin/build | 9 +- .../main/java/daq/pubber/MqttPublisher.java | 12 +- pubber/src/main/java/daq/pubber/Pubber.java | 364 ++++++++++++------ schema/common.json | 8 +- schema/config_system.json | 5 + schema/state_system.json | 5 + tests/state.tests/blobset_received.json | 4 +- tests/state.tests/blobset_updating.json | 2 +- validator/bin/build | 14 +- validator/bin/reflector | 1 + 24 files changed, 483 insertions(+), 161 deletions(-) diff --git a/.gencode_hash.txt b/.gencode_hash.txt index 77299527a7..35000db3c1 100644 --- a/.gencode_hash.txt +++ b/.gencode_hash.txt @@ -1,4 +1,4 @@ -026b146c355ab6d2a6767e25d8a5c382f49c22f05289abd5eb6b6ca941e1ac47 gencode/docs/config.html +a9cf25fa459ab0122be8df276ab4d95c6b1f2ed2256e1cf202f18c178e6906c0 gencode/docs/config.html 90679d3d866579501e7aa00b515af05d42fc9fe399eafacaacf297d1e4a22884 gencode/docs/envelope.html 80bbcb648a4ebe8f1b5dbb363e1654dd6db0cec2d67ac2c642f2bddc69d5a6ff gencode/docs/event_discovery.html 8133e380e40f27c56accbffc665b2eeb56ec84a4da3b52ba7aa5e439c9c40572 gencode/docs/event_pointset.html @@ -9,10 +9,10 @@ f48026471ae3cd7867bce416dc21c2fb728f48d8476a8d6e95f6acaf1d8b6cf3 gencode/docs/i 741b880216be3743f6747800a042f2dbd89f3b0344c6b0a965f4bc010f03a930 gencode/docs/schema_doc.css 878ea88206c974f40643c3cc430875f9c4e8c5e3fd6bcd6358bd3eb6d48699a9 gencode/docs/schema_doc.min.js 7ed934930aee763e0beebc349725ba3909115e8d346bb762f28bcbe745bb163a gencode/docs/schema_extras.js -46a606b867b60c1c7249612283be534ea31c95c11b04532f880bf600668124c6 gencode/docs/state.html +9b1f7c10c78d58acf650d8f500e8d972af17fce409189652cb62a1774760cd27 gencode/docs/state.html d39d7fe37a41c74a40080af7b0a429d201ab1fdff7444428c4b98eb7b38c332b gencode/java/udmi/schema/Asset.java 0825a5cec83003bb0a6488c4ed7010a04ae0d3848ef36fe01bb4e6718ba7b96d gencode/java/udmi/schema/Aux.java -902704240cee3e70c0549aed1608ba8f3c2626f719483ac06631c3811d1e0330 gencode/java/udmi/schema/BlobBlobsetConfig.java +1f2757c67215cf657297a009a01288108a04daeea919713871a2d34ea903b5f9 gencode/java/udmi/schema/BlobBlobsetConfig.java a7c57d119adcd0cf6363cc5301ce562004222522242e8ffd1d0cd7010f235ae1 gencode/java/udmi/schema/BlobBlobsetState.java b9f903444ab08907e41eb123286434ff3207b1edd01397af3ddefb8475bbdadc gencode/java/udmi/schema/BlobsetConfig.java fcbed49f1af8b791d8c52bcbe18f65521a79d9ac3eb33ec3afd9b342ab2bfc56 gencode/java/udmi/schema/BlobsetState.java @@ -64,12 +64,12 @@ abe99dd74122c186403baa6982300a9d5968f8bbb7a67b1689104111b98f32fb gencode/java/u 5a44075bc03f2b9b2cc090f007fd1692832871f0981dcb02579d8dda96a96206 gencode/java/udmi/schema/SetupReflectorConfig.java 649c0291ad81421fc51da0c2f7da3286628127157a5a6eef77610e8c37c14941 gencode/java/udmi/schema/SetupReflectorState.java 580df660dad1b97399002271716d597f72aa1a6110a49de9e162104c231752f4 gencode/java/udmi/schema/State.java -a21ef284dcc82b0addbb10fdc429d1ef543f2353d2374a4b23c0d01e6ee2c2f9 gencode/java/udmi/schema/SystemConfig.java +b2274107ce0e4e663b2a35e2ca284e492ec683bd687f6b8d8b014d5f8be67233 gencode/java/udmi/schema/SystemConfig.java 7de481ba531ea915a57cb6c5b23278db4fac6588354683cf63613b20c522af24 gencode/java/udmi/schema/SystemDiscoveryEvent.java 247652dd11714452adc27ffa542e3d5915f8e9b7b255d181723d6fe1e897b565 gencode/java/udmi/schema/SystemEvent.java 2cf23174ef4e2876511fb471d3f9fcb5cefe2fde324db844c2d0d505fd2c8844 gencode/java/udmi/schema/SystemHardware.java 1c79bdb3da4c9127c75aa6660be0e62bfab1d1ee47c289637662fee2e37f1491 gencode/java/udmi/schema/SystemModel.java -127cd1f0f137cd79c8f42e289bd80e8509b7cc269f69d9ac10874706a7a1a0b6 gencode/java/udmi/schema/SystemState.java +64e5b9a2c5699a3ca3b13dc6cb0243ff5b4c90908a14e3f11c5298ae89c8b261 gencode/java/udmi/schema/SystemState.java 7d6dd13e368e7f073738fee69c15e18652a9b7d7ac63bde0a200f747e3aa1b1d gencode/java/udmi/schema/TargetTestingModel.java d3968b92497e83a63f18cc0e74484a9807f1bb92db0c92d556ec2caaa143d645 gencode/java/udmi/schema/TestingModel.java 8a7dd590b84944de25392b1437184608edd2f8305a183eced8b1b40eae27c6d2 gencode/java/udmi/schema/TestingSystemConfig.java @@ -90,7 +90,7 @@ b461bdc24310ef972faf579b5be577b5af67fb0977d6afb4c42955211b26e3d5 gencode/python 8b2bf3e753c09e375f0fd59dcf1f0be61205ed247160b7a8718cb3b9ebef2c30 gencode/python/udmi/schema/config_localnet.py 9eab64849e04b25203d5da47856c3f8dda2b96903e4dc43ab932ee35014700bd gencode/python/udmi/schema/config_pointset.py 607c5047df878a1333df3ce88dcce34668959b0b315f6954bf1a4963dcf7839e gencode/python/udmi/schema/config_pointset_point.py -2a016b8d45868db8a146a6b84badc13c0668b94d55ab2647630645e71c4e4419 gencode/python/udmi/schema/config_system.py +ba37d2d54df565aba42ef055a0ec961175d180c2e8092a914e62029bc5388857 gencode/python/udmi/schema/config_system.py 97c2e5fadc6da0d84660f3296de885ab59f4b04154179b6717f77ec366f1544b gencode/python/udmi/schema/config_system_testing.py 5781474ae451777f65dbb13eb0a4707845551da9cfde8bf214f192e7849cd2ed gencode/python/udmi/schema/configuration_endpoint.py 3814c88403934dbd3fce77d92a8ad45c68dc7e07c319ba9b4e8f1b1ac7518c07 gencode/python/udmi/schema/configuration_pubber.py @@ -129,5 +129,5 @@ a58f8c98e837a5b56126ca0f410e02f1e9cfcd80a8cb429e0ef522defab1f690 gencode/python 05e82aa15c64842e206ae8ce3d5810d115bb890d009ea5d657822fad0e0d2165 gencode/python/udmi/schema/state_gateway.py 3520ad936af70b414d9e7f90e606a011768bc4ee3bf1248714acc517ee9b393d gencode/python/udmi/schema/state_pointset.py 837ecc89c477abe3a1faf837733ca05475774891b55353d84ca231d90a1fbf31 gencode/python/udmi/schema/state_pointset_point.py -c3d8f40e6651ca02740312066d417e59721da0ecb1860d922d53e68b650c5d64 gencode/python/udmi/schema/state_system.py +8184e783f3e2da5a6d3fb7e7fac105aaeb28106a7c146e82b610049554abf184 gencode/python/udmi/schema/state_system.py 791006619518fd7b38adb532879bcacd6f3f8795026cd75e10d3434a177757ac gencode/python/udmi/schema/state_system_hardware.py diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 40f5b0acfc..c9893c760c 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -68,9 +68,9 @@ jobs: echo ::::::: echo Simple checks that a redirect happened and failed fgrep registries/ZZ-TRI-FECTA/devices pubber.out.1 - fgrep 'system.config.parse success' pubber.out.1 fgrep registries/missing/devices pubber.out.1 - fgrep 'Not authorized to connect' pubber.out.1 + fgrep 'While waiting for connection start: Connection error' pubber.out.1 + fgrep 'Endpoint connection restored to last working endpoint' pubber.out.1 - name: pubber config checks env: GCP_TARGET_PROJECT: ${{ secrets.GCP_TARGET_PROJECT }} @@ -81,9 +81,12 @@ jobs: echo ::::::: echo Check the redirect-by-config setup fgrep registries/ZZ-TRI-FECTA/devices pubber.out.2 - fgrep 'system.config.parse success' pubber.out.2 fgrep registries/reconfigure/devices pubber.out.2 - fgrep 'Not authorized to connect' pubber.out.2 + fgrep 'While waiting for connection start: Connection error' pubber.out.2 + fgrep 'Endpoint connection restored to last working endpoint' pubber.out.2 + fgrep 'Stopping system with extreme prejudice, restart true' pubber.out.2 # restart config + fgrep 'Stopping system with extreme prejudice, restart false' pubber.out.2 # auto-kill + fgrep 'Done with pubber run, exit code 193' pubber.out.2 # last_start auto-kill check udmi: name: Sequence tests @@ -129,4 +132,4 @@ jobs: if: ${{ always() }} run: | cat pubber.out || true - more pubber/out/*.json + more pubber/out/*/*.json diff --git a/.gitignore b/.gitignore index ebcc41fcf1..908317cffe 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ credentials.json /venv/ /local/ .pubber.pid +pubber.out.* __pycache__/ /tests/downgrade.site/devices/*/out/ diff --git a/bin/pubber b/bin/pubber index 6c47df16bf..af202c6b71 100755 --- a/bin/pubber +++ b/bin/pubber @@ -18,8 +18,8 @@ if [ ! -f $site_path/cloud_iot_config.json ]; then false fi -echo Cleaning output direction $ROOT_DIR/pubber/out -rm -rf $ROOT_DIR/pubber/out +echo Cleaning output directory $ROOT_DIR/pubber/out/$serial_no +rm -rf $ROOT_DIR/pubber/out/$serial_no echo Building pubber... $ROOT_DIR/pubber/bin/build diff --git a/bin/reset_config b/bin/reset_config index f40201868f..d3637a1b40 100755 --- a/bin/reset_config +++ b/bin/reset_config @@ -25,7 +25,7 @@ src_config=${site_dir}/devices/${device_id}/out/$config_file now_date=$(python3 -c 'import datetime; print(datetime.datetime.utcnow().isoformat() + "Z")') echo Setting config timestamp ${now_date} jq < ${src_config} .timestamp=\"${now_date}\" |\ - jq .system.testing.sqeuence_name=\"${config_file%.json}\" > ${dst_config} + jq .system.testing.sequence_name=\"${config_file%.json}\" > ${dst_config} echo Resetting device ${device_id} config... validator/bin/reflector ${site_dir} ${project_id} ${device_id} update/config:${dst_config} diff --git a/bin/test_redirect b/bin/test_redirect index 40c9f24492..fa93419196 100755 --- a/bin/test_redirect +++ b/bin/test_redirect @@ -30,6 +30,7 @@ cloud_region=$(jq -r .cloud_region $site_config) registry_id=$(jq -r .registry_id $site_config) PUBBER_OUT=pubber.out +rm -f $PUBBER_OUT.* echo Killing running pubber instances... ps ax | fgrep pubber | fgrep java | awk '{print $1}' | xargs kill || true @@ -57,6 +58,9 @@ base64=$(base64 -w 0 out/endpoint.json) cat < out/blobs.json { + "system": { + "mode": "active" + }, "blobset": { "blobs": { "_iot_endpoint_config": { @@ -71,6 +75,7 @@ EOF out_base=$site_path/devices/$device_id/out # Merge JSON files together into new redirect config jq -s '.[0] * .[1]' $out_base/generated_config.json out/blobs.json > $out_base/redirect_config.json +jq '.system.mode = "restart"' $out_base/redirect_config.json > $out_base/restart_config.json echo New redirection config: cat /tmp/${device_id}_config.json @@ -81,8 +86,21 @@ bin/reset_config $site_path $project_id $device_id redirect_config.json echo Let pubber react to the new configuration... sleep 20 -# Ideally use lock files, not grep-and-kill... -echo Killing running pubber instances... -ps ax | fgrep pubber | fgrep java | awk '{print $1}' | xargs kill || true +echo Restart the system... +echo bin/reset_config $site_path $project_id $device_id restart_config.json +bin/reset_config $site_path $project_id $device_id restart_config.json + +echo And let it settle for restart... +sleep 20 + +# Now test that pubber responds to an later last_start config (to trigger automatic shutdown) +now_date=$(python3 -c 'import datetime; print(datetime.datetime.utcnow().isoformat() + "Z")') +echo Setting last_start time to $now_date for pubber shutdown... +jq ".system.last_start = \"$now_date\"" $out_base/redirect_config.json > $out_base/shutdown_config.json +echo bin/reset_config $site_path $project_id $device_id shutdown_config.json +bin/reset_config $site_path $project_id $device_id shutdown_config.json + +echo And let it settle for last start... +sleep 20 echo Done with redirect test. diff --git a/dashboard/functions/index.js b/dashboard/functions/index.js index f961000eee..cdb857ed60 100644 --- a/dashboard/functions/index.js +++ b/dashboard/functions/index.js @@ -266,6 +266,10 @@ function process_state_update(attributes, msgObject) { attributes.subType = STATE_TYPE; promises.push(publishPubsubMessage('udmi_target', attributes, msgObject)); + const stateStart = msgObject.system && msgObject.system.last_start; + stateStart && promises.push(modify_device_config(registryId, deviceId, 'last_start', + stateStart, currentTimestamp())); + for (var block in msgObject) { let subMsg = msgObject[block]; if (typeof subMsg === 'object') { @@ -325,11 +329,24 @@ function parse_old_config(oldConfig, resetConfig) { } } +function update_last_start(config, stateStart) { + const configStart = config.system && config.system.last_start; + const shouldUpdate = stateStart && (!configStart || (stateStart > configStart)); + console.log('State update last state/config', stateStart, configStart, shouldUpdate); + config.system.last_start = stateStart; + return shouldUpdate; +} + async function modify_device_config(registryId, deviceId, subFolder, subContents, startTime) { const [oldConfig, version] = await get_device_config(registryId, deviceId); var newConfig; - if (subFolder == 'update') { + if (subFolder == 'last_start') { + newConfig = parse_old_config(oldConfig, false); + if (!newConfig || !update_last_start(newConfig, subContents)) { + return; + } + } else if (subFolder == 'update') { console.log('Config replace version', version, startTime); newConfig = subContents; } else { @@ -440,7 +457,7 @@ function consolidate_config(registryId, deviceId, subFolder) { if (subFolder == UPDATE_FOLDER) { return; } - + console.log('consolidating config for', registryId, deviceId); const new_config = { diff --git a/gencode/docs/config.html b/gencode/docs/config.html index 7ae41f1b01..fa48a01193 100644 --- a/gencode/docs/config.html +++ b/gencode/docs/config.html @@ -244,12 +244,12 @@

/> modeType: enum (of string)
-

Operating mode for the device. Defaults is 'active'.

+

Operating mode for the device. Default is 'active'.

Must be one of:

-
  • "initial"
  • "active"
  • "restart"
+
  • "initial"
  • "active"
  • "restart"
  • "shutdown"
@@ -257,6 +257,47 @@

Must be one of:

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Last time a device with this id said it restarted: being later than status-supplied last_start indicates resource conflict.

+
+ + + + + +
@@ -1579,7 +1620,7 @@

Must be one of:

-
  • "initial"
  • "updating"
  • "final"
+
  • "apply"
  • "final"
diff --git a/gencode/docs/state.html b/gencode/docs/state.html index eca4268e99..0b5a601452 100644 --- a/gencode/docs/state.html +++ b/gencode/docs/state.html @@ -334,12 +334,12 @@

/> mode

Type: enum (of string)
-

Operating mode for the device. Defaults is 'active'.

+

Operating mode for the device. Default is 'active'.

Must be one of:

-
  • "initial"
  • "active"
  • "restart"
+
  • "initial"
  • "active"
  • "restart"
  • "shutdown"
@@ -347,6 +347,47 @@

Must be one of:

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Last time the system started up.

+
+ + + + + +
@@ -8275,7 +8316,7 @@

Must be one of:

-
  • "initial"
  • "updating"
  • "final"
+
  • "apply"
  • "final"
diff --git a/gencode/java/udmi/schema/BlobBlobsetConfig.java b/gencode/java/udmi/schema/BlobBlobsetConfig.java index 27214dd9ff..f525566e49 100644 --- a/gencode/java/udmi/schema/BlobBlobsetConfig.java +++ b/gencode/java/udmi/schema/BlobBlobsetConfig.java @@ -86,8 +86,7 @@ public boolean equals(Object other) { @Generated("jsonschema2pojo") public enum BlobPhase { - INITIAL("initial"), - UPDATING("updating"), + APPLY("apply"), FINAL("final"); private final String value; private final static Map CONSTANTS = new HashMap(); diff --git a/gencode/java/udmi/schema/SystemConfig.java b/gencode/java/udmi/schema/SystemConfig.java index 6f7840febe..f80b48d1b5 100644 --- a/gencode/java/udmi/schema/SystemConfig.java +++ b/gencode/java/udmi/schema/SystemConfig.java @@ -1,6 +1,7 @@ package udmi.schema; +import java.util.Date; import java.util.HashMap; import java.util.Map; import javax.annotation.processing.Generated; @@ -23,6 +24,7 @@ "min_loglevel", "metrics_rate_sec", "mode", + "last_start", "testing" }) @Generated("jsonschema2pojo") @@ -45,12 +47,19 @@ public class SystemConfig { /** * System Mode *

- * Operating mode for the device. Defaults is 'active'. + * Operating mode for the device. Default is 'active'. * */ @JsonProperty("mode") - @JsonPropertyDescription("Operating mode for the device. Defaults is 'active'.") + @JsonPropertyDescription("Operating mode for the device. Default is 'active'.") public SystemConfig.SystemMode mode; + /** + * Last time a device with this id said it restarted: being later than status-supplied last_start indicates resource conflict. + * + */ + @JsonProperty("last_start") + @JsonPropertyDescription("Last time a device with this id said it restarted: being later than status-supplied last_start indicates resource conflict.") + public Date last_start; /** * Testing System Config *

@@ -66,6 +75,7 @@ public int hashCode() { int result = 1; result = ((result* 31)+((this.metrics_rate_sec == null)? 0 :this.metrics_rate_sec.hashCode())); result = ((result* 31)+((this.mode == null)? 0 :this.mode.hashCode())); + result = ((result* 31)+((this.last_start == null)? 0 :this.last_start.hashCode())); result = ((result* 31)+((this.min_loglevel == null)? 0 :this.min_loglevel.hashCode())); result = ((result* 31)+((this.testing == null)? 0 :this.testing.hashCode())); return result; @@ -80,14 +90,14 @@ public boolean equals(Object other) { return false; } SystemConfig rhs = ((SystemConfig) other); - return (((((this.metrics_rate_sec == rhs.metrics_rate_sec)||((this.metrics_rate_sec!= null)&&this.metrics_rate_sec.equals(rhs.metrics_rate_sec)))&&((this.mode == rhs.mode)||((this.mode!= null)&&this.mode.equals(rhs.mode))))&&((this.min_loglevel == rhs.min_loglevel)||((this.min_loglevel!= null)&&this.min_loglevel.equals(rhs.min_loglevel))))&&((this.testing == rhs.testing)||((this.testing!= null)&&this.testing.equals(rhs.testing)))); + return ((((((this.metrics_rate_sec == rhs.metrics_rate_sec)||((this.metrics_rate_sec!= null)&&this.metrics_rate_sec.equals(rhs.metrics_rate_sec)))&&((this.mode == rhs.mode)||((this.mode!= null)&&this.mode.equals(rhs.mode))))&&((this.last_start == rhs.last_start)||((this.last_start!= null)&&this.last_start.equals(rhs.last_start))))&&((this.min_loglevel == rhs.min_loglevel)||((this.min_loglevel!= null)&&this.min_loglevel.equals(rhs.min_loglevel))))&&((this.testing == rhs.testing)||((this.testing!= null)&&this.testing.equals(rhs.testing)))); } /** * System Mode *

- * Operating mode for the device. Defaults is 'active'. + * Operating mode for the device. Default is 'active'. * */ @Generated("jsonschema2pojo") @@ -95,7 +105,8 @@ public enum SystemMode { INITIAL("initial"), ACTIVE("active"), - RESTART("restart"); + RESTART("restart"), + SHUTDOWN("shutdown"); private final String value; private final static Map CONSTANTS = new HashMap(); diff --git a/gencode/java/udmi/schema/SystemState.java b/gencode/java/udmi/schema/SystemState.java index 020802458f..411f76dc02 100644 --- a/gencode/java/udmi/schema/SystemState.java +++ b/gencode/java/udmi/schema/SystemState.java @@ -23,6 +23,7 @@ "last_config", "operational", "mode", + "last_start", "serial_no", "hardware", "software", @@ -65,12 +66,19 @@ public class SystemState { /** * System Mode *

- * Operating mode for the device. Defaults is 'active'. + * Operating mode for the device. Default is 'active'. * */ @JsonProperty("mode") - @JsonPropertyDescription("Operating mode for the device. Defaults is 'active'.") + @JsonPropertyDescription("Operating mode for the device. Default is 'active'.") public udmi.schema.SystemConfig.SystemMode mode; + /** + * Last time the system started up. + * + */ + @JsonProperty("last_start") + @JsonPropertyDescription("Last time the system started up.") + public Date last_start; /** * The serial number of the physical device * (Required) @@ -112,6 +120,7 @@ public class SystemState { public int hashCode() { int result = 1; result = ((result* 31)+((this.mode == null)? 0 :this.mode.hashCode())); + result = ((result* 31)+((this.last_start == null)? 0 :this.last_start.hashCode())); result = ((result* 31)+((this.software == null)? 0 :this.software.hashCode())); result = ((result* 31)+((this.operational == null)? 0 :this.operational.hashCode())); result = ((result* 31)+((this.params == null)? 0 :this.params.hashCode())); @@ -133,7 +142,7 @@ public boolean equals(Object other) { return false; } SystemState rhs = ((SystemState) other); - return (((((((((((this.mode == rhs.mode)||((this.mode!= null)&&this.mode.equals(rhs.mode)))&&((this.software == rhs.software)||((this.software!= null)&&this.software.equals(rhs.software))))&&((this.operational == rhs.operational)||((this.operational!= null)&&this.operational.equals(rhs.operational))))&&((this.params == rhs.params)||((this.params!= null)&&this.params.equals(rhs.params))))&&((this.version == rhs.version)||((this.version!= null)&&this.version.equals(rhs.version))))&&((this.serial_no == rhs.serial_no)||((this.serial_no!= null)&&this.serial_no.equals(rhs.serial_no))))&&((this.timestamp == rhs.timestamp)||((this.timestamp!= null)&&this.timestamp.equals(rhs.timestamp))))&&((this.last_config == rhs.last_config)||((this.last_config!= null)&&this.last_config.equals(rhs.last_config))))&&((this.hardware == rhs.hardware)||((this.hardware!= null)&&this.hardware.equals(rhs.hardware))))&&((this.status == rhs.status)||((this.status!= null)&&this.status.equals(rhs.status)))); + return ((((((((((((this.mode == rhs.mode)||((this.mode!= null)&&this.mode.equals(rhs.mode)))&&((this.last_start == rhs.last_start)||((this.last_start!= null)&&this.last_start.equals(rhs.last_start))))&&((this.software == rhs.software)||((this.software!= null)&&this.software.equals(rhs.software))))&&((this.operational == rhs.operational)||((this.operational!= null)&&this.operational.equals(rhs.operational))))&&((this.params == rhs.params)||((this.params!= null)&&this.params.equals(rhs.params))))&&((this.version == rhs.version)||((this.version!= null)&&this.version.equals(rhs.version))))&&((this.serial_no == rhs.serial_no)||((this.serial_no!= null)&&this.serial_no.equals(rhs.serial_no))))&&((this.timestamp == rhs.timestamp)||((this.timestamp!= null)&&this.timestamp.equals(rhs.timestamp))))&&((this.last_config == rhs.last_config)||((this.last_config!= null)&&this.last_config.equals(rhs.last_config))))&&((this.hardware == rhs.hardware)||((this.hardware!= null)&&this.hardware.equals(rhs.hardware))))&&((this.status == rhs.status)||((this.status!= null)&&this.status.equals(rhs.status)))); } } diff --git a/gencode/python/udmi/schema/config_system.py b/gencode/python/udmi/schema/config_system.py index 5ff3a74d0c..4b87c9a812 100644 --- a/gencode/python/udmi/schema/config_system.py +++ b/gencode/python/udmi/schema/config_system.py @@ -9,6 +9,7 @@ def __init__(self): self.min_loglevel = None self.metrics_rate_sec = None self.mode = None + self.last_start = None self.testing = None @staticmethod @@ -19,6 +20,7 @@ def from_dict(source): result.min_loglevel = source.get('min_loglevel') result.metrics_rate_sec = source.get('metrics_rate_sec') result.mode = source.get('mode') + result.last_start = source.get('last_start') result.testing = TestingSystemConfig.from_dict(source.get('testing')) return result @@ -46,6 +48,8 @@ def to_dict(self): result['metrics_rate_sec'] = self.metrics_rate_sec # 5 if self.mode: result['mode'] = self.mode # 5 + if self.last_start: + result['last_start'] = self.last_start # 5 if self.testing: result['testing'] = self.testing.to_dict() # 4 return result diff --git a/gencode/python/udmi/schema/state_system.py b/gencode/python/udmi/schema/state_system.py index 421d2779c9..a56f5ebc6b 100644 --- a/gencode/python/udmi/schema/state_system.py +++ b/gencode/python/udmi/schema/state_system.py @@ -12,6 +12,7 @@ def __init__(self): self.last_config = None self.operational = None self.mode = None + self.last_start = None self.serial_no = None self.hardware = None self.software = None @@ -28,6 +29,7 @@ def from_dict(source): result.last_config = source.get('last_config') result.operational = source.get('operational') result.mode = source.get('mode') + result.last_start = source.get('last_start') result.serial_no = source.get('serial_no') result.hardware = SystemHardware.from_dict(source.get('hardware')) result.software = source.get('software') @@ -63,6 +65,8 @@ def to_dict(self): result['operational'] = self.operational # 5 if self.mode: result['mode'] = self.mode # 5 + if self.last_start: + result['last_start'] = self.last_start # 5 if self.serial_no: result['serial_no'] = self.serial_no # 5 if self.hardware: diff --git a/pubber/bin/build b/pubber/bin/build index d0c74de2df..1a43c94ed2 100755 --- a/pubber/bin/build +++ b/pubber/bin/build @@ -8,11 +8,18 @@ fi rundir=$(dirname $0) cd $rundir/.. +newest=$(ls -rt `find src/ ../gencode/java -type f` | tail -n 1) +jarfile=build/libs/pubber-1.0-SNAPSHOT-all.jar +if [[ -f $jarfile && $jarfile -nt $newest ]]; then + echo $jarfile up to date, skipping build. + exit 0 +fi + echo Building pubber in $PWD rm -rf build ./gradlew shadow $check -ls -l build/libs/pubber-1.0-SNAPSHOT-all.jar +ls -l $jarfile echo Done with pubber build diff --git a/pubber/src/main/java/daq/pubber/MqttPublisher.java b/pubber/src/main/java/daq/pubber/MqttPublisher.java index c3c751150f..d27cc02b16 100644 --- a/pubber/src/main/java/daq/pubber/MqttPublisher.java +++ b/pubber/src/main/java/daq/pubber/MqttPublisher.java @@ -151,10 +151,14 @@ private String getClientId(String deviceId) { return getClientId(projectId, cloudRegion, registryId, deviceId); } + boolean isActive() { + return !publisherExecutor.isShutdown(); + } + void publish(String deviceId, String topic, Object data, Runnable callback) { Preconditions.checkNotNull(deviceId, "publish deviceId"); - if (publisherExecutor.isShutdown()) { - throw new RuntimeException("Publisher shutdown."); + if (!isActive()) { + throw new RuntimeException("Publisher already shutdown."); } debug("Publishing in background " + topic); Object marked = topic.startsWith(EVENT_MARK_PREFIX) ? decorateMessage(topic, data) : data; @@ -186,7 +190,6 @@ private void publishCore(String deviceId, String topic, Object data, } } catch (Exception e) { errorCounter.incrementAndGet(); - e.printStackTrace(); warn(String.format("Publish failed for %s: %s", deviceId, e)); if (configuration.gatewayId == null) { closeMqttClient(deviceId); @@ -523,7 +526,8 @@ private class MqttCallbackHandler implements MqttCallback { @Override public void connectionLost(Throwable cause) { - warn("MQTT Connection Lost: " + cause); + boolean connected = mqttClients.remove(deviceId).isConnected(); + warn("MQTT Connection Lost: " + connected + cause); onError.accept(new ConnectionClosedException()); } diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java index 8f85adfd70..4bc3dba2c0 100644 --- a/pubber/src/main/java/daq/pubber/Pubber.java +++ b/pubber/src/main/java/daq/pubber/Pubber.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.http.ConnectionClosedException; @@ -78,6 +79,7 @@ import udmi.schema.PubberConfiguration; import udmi.schema.PubberOptions; import udmi.schema.State; +import udmi.schema.SystemConfig; import udmi.schema.SystemConfig.SystemMode; import udmi.schema.SystemEvent; import udmi.schema.SystemHardware; @@ -89,7 +91,7 @@ public class Pubber { public static final int SCAN_DURATION_SEC = 10; - public static final String DISCOVERY_ID = "RANDOM_ID"; + public static final String PUBBER_OUT = "pubber/out"; private static final String UDMI_VERSION = "1.3.14"; private static final Logger LOG = LoggerFactory.getLogger(Pubber.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() @@ -97,16 +99,12 @@ public class Pubber { .setDateFormat(new ISO8601DateFormat()) .setSerializationInclusion(JsonInclude.Include.NON_NULL); private static final String HOSTNAME = System.getenv("HOSTNAME"); - private static final String POINTSET_TOPIC = "events/pointset"; - private static final String SYSTEM_TOPIC = "events/system"; - private static final String STATE_TOPIC = "state"; private static final String CONFIG_TOPIC = "config"; private static final String ERROR_TOPIC = "errors"; private static final int MIN_REPORT_MS = 200; private static final int DEFAULT_REPORT_SEC = 10; - private static final int CONFIG_WAIT_TIME_MS = 10000; + private static final int CONFIG_WAIT_TIME_SEC = 10; private static final int STATE_THROTTLE_MS = 2000; - private static final String OUT_DIR = "pubber/out"; private static final String PUBSUB_SITE = "PubSub"; private static final Set BOOLEAN_UNITS = ImmutableSet.of("No-units"); private static final double DEFAULT_BASELINE_VALUE = 50; @@ -133,9 +131,14 @@ public class Pubber { "faulty_finding", makePointPointsetModel(true, 40, 0, "deg"), "superimposition_reading", makePointPointsetModel(false) ); - private static final Date DEVICE_START_TIME = new Date(); - private static final int RESTART_EXIT_CODE = 192; + private static final Date DEVICE_START_TIME = getRoundedStartTime(); + private static final int RESTART_EXIT_CODE = 192; // After exit, wrapper script should restart. + private static final int SHUTDOWN_EXIT_CODE = 193; // After exit, do not restart. private static final Map MESSAGE_COUNTS = new HashMap<>(); + private static final int CONNECT_RETRIES = 10; + private static final AtomicInteger retriesRemaining = new AtomicInteger(CONNECT_RETRIES); + private static final long RESTART_DELAY_MS = 1000; + private final File outDir; private final ScheduledExecutorService executor = new CatchingScheduledThreadPoolExecutor(1); private final PubberConfiguration configuration; private final AtomicInteger messageDelayMs = new AtomicInteger(DEFAULT_REPORT_SEC * 1000); @@ -146,15 +149,18 @@ public class Pubber { private final AtomicBoolean stateDirty = new AtomicBoolean(); private final String projectId; private final String deviceId; + private final Object reconnectLock = new Object(); private Config deviceConfig = new Config(); private int deviceMessageCount = -1; private MqttPublisher mqttPublisher; - private ScheduledFuture scheduledFuture; + private ScheduledFuture periodicSender; private long lastStateTimeMs; private PubSubClient pubSubClient; - private Consumer connectionDone; + private Function connectionDone; private boolean publishingLog; private String appliedEndpoint; + private String workingEndpoint; + private String attemptedEndpoint; private EndpointConfiguration extractedEndpoint; private SiteModel siteModel; private PrintStream logPrintWriter; @@ -173,9 +179,11 @@ public Pubber(String configPath) { ClientInfo clientInfo = MqttPublisher.parseClientId(configuration.endpoint.client_id); projectId = clientInfo.projectId; deviceId = clientInfo.deviceId; + outDir = new File(PUBBER_OUT); } catch (Exception e) { - executor.shutdown(); - throw new RuntimeException("While reading config " + configFile.getAbsolutePath(), e); + terminate(); + throw new RuntimeException("While configuring instance from " + configFile.getAbsolutePath(), + e); } } @@ -190,6 +198,7 @@ public Pubber(String configPath) { public Pubber(String projectId, String sitePath, String deviceId, String serialNo) { this.projectId = projectId; this.deviceId = deviceId; + outDir = new File(PUBBER_OUT + "/" + serialNo); configuration = sanitizeConfiguration(new PubberConfiguration()); configuration.deviceId = deviceId; configuration.serialNo = serialNo; @@ -200,6 +209,12 @@ public Pubber(String projectId, String sitePath, String deviceId, String serialN } } + private static Date getRoundedStartTime() { + long timestamp = new Date().getTime(); + // Remove ms so that rounded conversions preserve equality. + return new Date(timestamp - (timestamp % 1000)); + } + private static PointPointsetModel makePointPointsetModel(boolean writable, int value, double tolerance, String units) { PointPointsetModel pointMetadata = new PointPointsetModel(); @@ -243,8 +258,10 @@ private static void singularPubber(String[] args) throws InterruptedException { } pubber.initialize(); pubber.startConnection(deviceId -> { - LOG.info("Connection closed/finished " + deviceId); - pubber.terminate(); + int retries = retriesRemaining.decrementAndGet(); + LOG.info(String.format("Connection closed/finished for %s, %d retries remaining", deviceId, + retries)); + return retries > 0 && pubber.maybeRestart(); }); } @@ -272,9 +289,9 @@ private static void startFeedListener(String projectId, String siteName, String Pubber pubber = new Pubber(projectId, siteName, feedName, serialNo); pubber.initialize(); pubber.startConnection(deviceId -> { - pubber.terminate(); LOG.error("Connection terminated, restarting listener"); startFeedListener(projectId, siteName, feedName, serialNo); + return false; }); } catch (Exception e) { LOG.error("Exception starting instance " + serialNo, e); @@ -289,6 +306,19 @@ private static PubberConfiguration sanitizeConfiguration(PubberConfiguration con return configuration; } + private boolean maybeRestart() { + if (deviceConfig.system != null && (deviceConfig.system.last_start == null + || !deviceConfig.system.last_start.equals(DEVICE_START_TIME))) { + error("Local start_time is older than config start_time, terminating process."); + deviceState.system.status = exceptionStatus(new InterruptedException("Start time conflict"), + Category.SYSTEM_BASE_SHUTDOWN); + return false; + } + safeSleep(RESTART_DELAY_MS); // Prevent thrashing of reconnects. + startConnection(connectionDone); + return true; + } + private AbstractPoint makePoint(String name, PointPointsetModel point) { boolean writable = point.writable != null && point.writable; if (BOOLEAN_UNITS.contains(point.units)) { @@ -317,8 +347,9 @@ private double convertValue(Object baselineValue, double defaultBaselineValue) { private void initializeDevice() { deviceState.system = new SystemState(); - deviceState.pointset = new PointsetState(); deviceState.system.hardware = new SystemHardware(); + deviceState.system.last_start = DEVICE_START_TIME; + deviceState.pointset = new PointsetState(); deviceState.pointset.points = new HashMap<>(); devicePoints.points = new HashMap<>(); @@ -391,14 +422,14 @@ private void pullDeviceMessage() { return; } catch (Exception e) { error("Error pulling swarm message", e); - safeSleep(10000); + safeSleep(CONFIG_WAIT_TIME_SEC); } } } - private void safeSleep(long duration) { + private void safeSleep(long durationMs) { try { - Thread.sleep(duration); + Thread.sleep(durationMs); } catch (InterruptedException e) { throw new RuntimeException("Error sleeping", e); } @@ -444,7 +475,7 @@ private void processDeviceMetadata(Metadata metadata) { } private synchronized void maybeRestartExecutor(int intervalMs) { - if (scheduledFuture == null || intervalMs != messageDelayMs.get()) { + if (periodicSender == null || intervalMs != messageDelayMs.get()) { cancelPeriodicSend(); messageDelayMs.set(intervalMs); startPeriodicSend(); @@ -452,21 +483,21 @@ private synchronized void maybeRestartExecutor(int intervalMs) { } private synchronized void startPeriodicSend() { - Preconditions.checkState(scheduledFuture == null); + Preconditions.checkState(periodicSender == null); int delay = messageDelayMs.get(); info("Starting executor with send message delay " + delay); - scheduledFuture = executor - .scheduleAtFixedRate(this::sendMessages, delay, delay, TimeUnit.MILLISECONDS); + periodicSender = executor.scheduleAtFixedRate(this::sendMessages, delay, delay, + TimeUnit.MILLISECONDS); } private synchronized void cancelPeriodicSend() { - if (scheduledFuture != null) { + if (periodicSender != null) { try { - scheduledFuture.cancel(false); + periodicSender.cancel(false); } catch (Exception e) { throw new RuntimeException("While cancelling executor", e); } finally { - scheduledFuture = null; + periodicSender = null; } } } @@ -479,7 +510,6 @@ private void sendMessages() { flushDirtyState(); } catch (Exception e) { error("Fatal error during execution", e); - terminate(); } } @@ -489,29 +519,34 @@ private void deferredConfigActions() { } private void maybeRestartSystem() { - if (deviceConfig.system == null) { + SystemConfig systemConfig = deviceConfig.system; + if (systemConfig == null) { return; } if (SystemMode.ACTIVE.equals(deviceState.system.mode) - && SystemMode.RESTART.equals(deviceConfig.system.mode)) { - restartSystem(); + && SystemMode.RESTART.equals(systemConfig.mode)) { + restartSystem(true); } - if (SystemMode.ACTIVE.equals(deviceConfig.system.mode)) { + if (SystemMode.ACTIVE.equals(systemConfig.mode)) { deviceState.system.mode = SystemMode.ACTIVE; } + if (systemConfig.last_start != null && DEVICE_START_TIME.before(systemConfig.last_start)) { + System.err.printf("Device start time %s before last config start %s, terminating.", + isoConvert(DEVICE_START_TIME), isoConvert(systemConfig.last_start)); + restartSystem(false); + } } - private void restartSystem() { - deviceState.system.mode = SystemMode.RESTART; - publishStateMessage(); - System.err.println("Restarting system with extreme prejudice."); - System.err.flush(); - System.exit(RESTART_EXIT_CODE); + private void restartSystem(boolean restart) { + deviceState.system.mode = restart ? SystemMode.RESTART : SystemMode.SHUTDOWN; + publishSynchronousState(); + error("Stopping system with extreme prejudice, restart " + restart); + System.exit(restart ? RESTART_EXIT_CODE : SHUTDOWN_EXIT_CODE); } private void flushDirtyState() { if (stateDirty.get()) { - publishStateMessage(); + publishAsynchronousState(); } } @@ -529,27 +564,57 @@ private void updateState(AbstractPoint point) { } } + private void captureExceptions(String action, Runnable runnable) { + try { + runnable.run(); + } catch (Exception e) { + error(action, e); + } + } + private void terminate() { + warn("Terminating"); + deviceState.system.mode = SystemMode.SHUTDOWN; + captureExceptions("publishing shutdown state", this::publishSynchronousState); + stop(); + captureExceptions("executor flush", this::stopExecutor); + } + + private void stopExecutor() { try { - info("Terminating"); - if (mqttPublisher != null) { - mqttPublisher.close(); - mqttPublisher = null; - } cancelPeriodicSend(); executor.shutdown(); + if (!executor.awaitTermination(CONFIG_WAIT_TIME_SEC, TimeUnit.SECONDS)) { + throw new RuntimeException("Failed to shutdown scheduled tasks"); + } } catch (Exception e) { - info("Error terminating: " + e.getMessage()); + throw new RuntimeException("While stopping executor", e); } } - private void startConnection(Consumer onDone) throws InterruptedException { - this.connectionDone = onDone; - connect(); - boolean result = configLatch.await(CONFIG_WAIT_TIME_MS, TimeUnit.MILLISECONDS); - info("synchronized start config result " + result); - if (!result && mqttPublisher != null) { - mqttPublisher.close(); + private void startConnection(Function connectionDone) { + try { + while (retriesRemaining.getAndDecrement() > 0) { + try { + this.connectionDone = connectionDone; + if (mqttPublisher == null) { + throw new RuntimeException("Mqtt publisher not initialized"); + } + connect(); + if (configLatch.await(CONFIG_WAIT_TIME_SEC, TimeUnit.SECONDS)) { + return; + } + error("Configuration sync failed after " + CONFIG_WAIT_TIME_SEC); + } catch (Exception e) { + error("While waiting for connection start", e); + } + error("Attempt failed, retries remaining: " + retriesRemaining.get()); + safeSleep(RESTART_DELAY_MS); + } + throw new RuntimeException("Failed connection attempt after retries"); + } catch (Exception e) { + stop(); + throw new RuntimeException("While attempting to start connection", e); } } @@ -566,27 +631,40 @@ private void initialize() { try { initializeDevice(); - File outDir = new File(OUT_DIR); try { - outDir.mkdir(); - File logOut = new File(OUT_DIR, traceTimestamp("pubber") + ".log"); + outDir.mkdirs(); + File logOut = new File(outDir, traceTimestamp("pubber") + ".log"); logPrintWriter = new PrintStream(logOut); logPrintWriter.println("Pubber log started at " + getTimestamp()); } catch (Exception e) { - throw new RuntimeException("While initializing out dir " + outDir.getPath(), e); + throw new RuntimeException("While initializing out dir " + outDir.getAbsolutePath(), e); } initializeMqtt(); } catch (Exception e) { - executor.shutdown(); + terminate(); throw new RuntimeException("While initializing main pubber class", e); } } + private void stop() { + captureExceptions("disconnecting mqtt", this::disconnectMqtt); + captureExceptions("closing log", this::closeLogWriter); + captureExceptions("stopping periodic send", this::cancelPeriodicSend); + } + + private void closeLogWriter() { + if (logPrintWriter != null) { + logPrintWriter.close(); + logPrintWriter = null; + } + } + private void disconnectMqtt() { - Preconditions.checkState(mqttPublisher != null, "mqttPublisher not defined"); - mqttPublisher.close(); - mqttPublisher = null; + if (mqttPublisher != null) { + captureExceptions("closing mqtt publisher", mqttPublisher::close); + mqttPublisher = null; + } } private void initializeMqtt() { @@ -608,9 +686,12 @@ private void initializeMqtt() { this::configHandler, Config.class); } - private String toJson(Object configuration) { + private String toJson(Object target) { try { - return OBJECT_MAPPER.writeValueAsString(configuration); + if (target == null) { + return null; + } + return OBJECT_MAPPER.writeValueAsString(target); } catch (Exception e) { throw new RuntimeException("While converting object to string", e); } @@ -630,8 +711,10 @@ private void connect() { try { mqttPublisher.connect(configuration.deviceId); info("Connection complete."); + workingEndpoint = toJson(configuration.endpoint); + retriesRemaining.set(CONNECT_RETRIES); } catch (Exception e) { - error("Connection error", e); + throw new RuntimeException("Connection error", e); } } @@ -644,8 +727,12 @@ private void publisherException(Exception toReport) { publisherHandler(((PublisherException) toReport).type, ((PublisherException) toReport).phase, toReport.getCause()); } else if (toReport instanceof ConnectionClosedException) { - if (connectionDone != null) { - connectionDone.accept(configuration.deviceId); + synchronized (reconnectLock) { + if (connectionDone != null) { + while (connectionDone.apply(configuration.deviceId)) { + warn("Retrying reconnect handler..."); + } + } } } else { error("Unknown exception type " + toReport.getClass(), toReport); @@ -662,7 +749,7 @@ private void publisherHandler(String type, String phase, Throwable cause) { publishLogMessage(report); // TODO: Replace this with a heap so only the highest-priority status is reported. deviceState.system.status = shouldLogLevel(report.level) ? report : null; - publishStateMessage(); + publishAsynchronousState(); if (cause != null && configLatch.getCount() > 0) { configLatch.countDown(); warn("Released startup latch because reported error"); @@ -703,11 +790,10 @@ private void gatewayHandler(Config config) { private void configHandler(Config config) { try { info("Config handler"); - File configOut = new File(OUT_DIR, traceTimestamp("config") + ".json"); + File configOut = new File(outDir, traceTimestamp("config") + ".json"); try { OBJECT_MAPPER.writeValue(configOut, config); - debug(String.format("Config update%s", getTestingTag(config)), - OBJECT_MAPPER.writeValueAsString(config)); + debug(String.format("Config update%s", getTestingTag(config)), toJson(config)); } catch (Exception e) { throw new RuntimeException("While writing config " + configOut.getPath(), e); } @@ -716,9 +802,8 @@ private void configHandler(Config config) { publisherConfigLog("apply", null); } catch (Exception e) { publisherConfigLog("apply", e); - trace(stackTraceString(e)); } - publishStateMessage(); + publishAsynchronousState(); } private void processConfigUpdate(Config config) { @@ -764,33 +849,64 @@ private void removeBlobsetBlobState(SystemBlobsets blobId) { private void maybeRedirectEndpoint() { String redirectRegistry = configuration.options.redirectRegistry; - String currentId = configuration.endpoint.client_id; - String redirectId = getClientId(redirectRegistry); + String currentSignature = toJson(configuration.endpoint); + String extractedSignature = + redirectRegistry == null ? toJson(extractedEndpoint) : redirectedEndpoint(redirectRegistry); - if (extractedEndpoint != null && !toJson(extractedEndpoint).equals(appliedEndpoint)) { + if (extractedSignature != null && !extractedSignature.equals( + currentSignature) && !extractedSignature.equals(attemptedEndpoint)) { info("New config blob endpoint detected"); - configuration.endpoint = extractedEndpoint; - } else if (redirectRegistry != null && configLatch.getCount() <= 0 && !redirectId.equals( - currentId)) { - info("Mismatched redirectRegistry detected, redirecting to " + redirectRegistry); - configuration.endpoint.client_id = redirectId; } else { return; // No need to redirect anything! } BlobBlobsetState endpointState = ensureBlobsetState(IOT_ENDPOINT_CONFIG); try { - endpointState.phase = BlobPhase.UPDATING; + endpointState.phase = BlobPhase.APPLY; endpointState.status = null; - publishStateMessage(true); - disconnectMqtt(); - initializeMqtt(); - startConnection(connectionDone); + publishSynchronousState(); + attemptedEndpoint = extractedSignature; + resetConnection(extractedSignature); endpointState.phase = BlobPhase.FINAL; + appliedEndpoint = null; + } catch (Exception e) { + try { + error("Reconfigure failed, attempting connection to last working endpoint", e); + endpointState.status = exceptionStatus(e, Category.BLOBSET_BLOB_APPLY); + resetConnection(workingEndpoint); + publishAsynchronousState(); + notice("Endpoint connection restored to last working endpoint"); + } catch (Exception e2) { + throw new RuntimeException("While restoring working endpoint", e2); + } + error("While redirecting connection endpoint", e); + } + } + + private String redirectedEndpoint(String redirectRegistry) { + try { + EndpointConfiguration endpoint = OBJECT_MAPPER.readValue(toJson(configuration.endpoint), + EndpointConfiguration.class); + endpoint.client_id = getClientId(redirectRegistry); + return toJson(endpoint); + } catch (Exception e) { + throw new RuntimeException("While getting redirected endpoint"); + } + } + + private void resetConnection(String targetEndpoint) { + try { + configuration.endpoint = OBJECT_MAPPER.readValue(targetEndpoint, + EndpointConfiguration.class); + synchronized (reconnectLock) { + disconnectMqtt(); + initializeMqtt(); + retriesRemaining.set(CONNECT_RETRIES); + startConnection(connectionDone); + } } catch (Exception e) { - endpointState.status = exceptionStatus(e, Category.BLOBSET_BLOB_APPLY); - publishStateMessage(); - throw new RuntimeException("While redirecting connection endpoint", e); + stop(); + throw new RuntimeException("While resetting connection", e); } } @@ -940,6 +1056,9 @@ private FamilyDiscoveryState getFamilyDiscoveryState(String family) { } private long scheduleFuture(Date futureTime, Runnable futureTask) { + if (executor.isShutdown() || executor.isTerminated()) { + throw new RuntimeException("Executor shutdown/terminated, not scheduling"); + } long delay = futureTime.getTime() - new Date().getTime(); debug(String.format("Scheduling future in %dms", delay)); executor.schedule(futureTask, delay, TimeUnit.MILLISECONDS); @@ -965,7 +1084,7 @@ private void scheduleDiscoveryScan(String family, Date scanGeneration) { scheduleFuture(stopTime, () -> discoveryScanComplete(family, scanGeneration)); familyDiscoveryState.generation = scanGeneration; familyDiscoveryState.active = true; - publishStateMessage(); + publishAsynchronousState(); Date sendTime = Date.from(Instant.now().plusSeconds(SCAN_DURATION_SEC / 2)); scheduleFuture(sendTime, () -> sendDiscoveryEvent(family, scanGeneration)); } @@ -1031,7 +1150,7 @@ private void discoveryScanComplete(String family, Date scanGeneration) { } else { info("Discovery scan stopping " + family + " from " + isoConvert(scanGeneration)); familyDiscoveryState.active = false; - publishStateMessage(); + publishAsynchronousState(); } } } catch (Exception e) { @@ -1073,7 +1192,7 @@ private String isoConvert(Date timestamp) { if (timestamp == null) { return "null"; } - String dateString = OBJECT_MAPPER.writeValueAsString(timestamp); + String dateString = toJson(timestamp); // Strip off the leading and trailing quotes from the JSON string-as-string representation. return dateString.substring(1, dateString.length() - 1); } catch (Exception e) { @@ -1142,28 +1261,42 @@ private void publishLogMessage(Entry report) { } } - private void publishStateMessage() { - publishStateMessage(false); + private void publishAsynchronousState() { + long delay = lastStateTimeMs + STATE_THROTTLE_MS - System.currentTimeMillis(); + if (delay > 0) { + warn(String.format("State update defer %dms", delay)); + markStateDirty(delay); + } else { + publishStateMessage(null); + } } - private void publishStateMessage(boolean synchronous) { + private void publishSynchronousState() { long delay = lastStateTimeMs + STATE_THROTTLE_MS - System.currentTimeMillis(); if (delay > 0) { - if (synchronous) { - warn(String.format("State update delay %dms", delay)); - safeSleep(delay); - } else { - warn(String.format("State update defer %dms", delay)); - markStateDirty(delay); - return; + warn(String.format("State update delay %dms", delay)); + safeSleep(delay); + } + + CountDownLatch latch = new CountDownLatch(1); + publishStateMessage(latch); + + try { + info("Waiting for synchronous state send..."); + if (!latch.await(CONFIG_WAIT_TIME_SEC, TimeUnit.SECONDS)) { + error("Timeout waiting for synchronous state send"); } + } catch (Exception e) { + error("Exception while waiting for synchronous state send", e); } + } + + private void publishStateMessage(CountDownLatch latch) { deviceState.timestamp = new Date(); info(String.format("update state %s last_config %s", isoConvert(deviceState.timestamp), isoConvert(deviceState.system.last_config))); try { - debug(String.format("State update%s", getTestingTag(deviceConfig)), - OBJECT_MAPPER.writeValueAsString(deviceState)); + debug(String.format("State update%s", getTestingTag(deviceConfig)), toJson(deviceState)); } catch (Exception e) { throw new RuntimeException("While converting new device state", e); } @@ -1172,6 +1305,9 @@ private void publishStateMessage(boolean synchronous) { lastStateTimeMs = System.currentTimeMillis() + STATE_THROTTLE_MS; publishDeviceMessage(deviceState, () -> { lastStateTimeMs = System.currentTimeMillis(); + if (latch != null) { + latch.countDown(); + } }); } @@ -1180,21 +1316,22 @@ private void publishDeviceMessage(Object message) { } private void publishDeviceMessage(Object message, Runnable callback) { - if (mqttPublisher == null) { - warn("Ignoring publish message b/c connection is shutdown"); - return; - } String topic = MESSAGE_TOPIC_MAP.get(message.getClass()); if (topic == null) { error("Unknown message class " + message.getClass()); return; } - augmentDeviceMessage(message); - mqttPublisher.publish(configuration.deviceId, topic, message, callback); + augmentDeviceMessage(message); + synchronized (reconnectLock) { + if (!publisherActive()) { + throw new RuntimeException("Connection already shutdown, stopping."); + } + mqttPublisher.publish(configuration.deviceId, topic, message, callback); + } String messageBase = topic.replace("/", "_"); String fileName = traceTimestamp(messageBase) + ".json"; - File messageOut = new File(OUT_DIR, fileName); + File messageOut = new File(outDir, fileName); try { OBJECT_MAPPER.writeValue(messageOut, message); } catch (Exception e) { @@ -1222,6 +1359,10 @@ private void augmentDeviceMessage(Object message) { } } + private boolean publisherActive() { + return mqttPublisher != null && mqttPublisher.isActive(); + } + private void cloudLog(String message, Level level) { cloudLog(message, level, null); } @@ -1230,7 +1371,7 @@ private void cloudLog(String message, Level level, String detail) { String timestamp = getTimestamp(); localLog(message, level, timestamp, detail); - if (publishingLog || mqttPublisher == null) { + if (publishingLog || !publisherActive()) { return; } @@ -1286,6 +1427,10 @@ private void info(String message) { cloudLog(message, Level.INFO); } + private void notice(String message) { + cloudLog(message, Level.NOTICE); + } + private void warn(String message) { cloudLog(message, Level.WARNING); } @@ -1297,7 +1442,6 @@ private void error(String message) { private void error(String message, Throwable e) { String longMessage = message + ": " + e.getMessage(); cloudLog(longMessage, Level.ERROR); - trace(stackTraceString(e)); } static class ExtraPointsetEvent extends PointsetEvent { diff --git a/schema/common.json b/schema/common.json index b3c70a9e82..dfc7ad99c0 100644 --- a/schema/common.json +++ b/schema/common.json @@ -4,19 +4,19 @@ "definitions": { "mode": { "title": "System Mode", - "description": "Operating mode for the device. Defaults is 'active'.", + "description": "Operating mode for the device. Default is 'active'.", "enum": [ "initial", "active", - "restart" + "restart", + "shutdown" ] }, "phase": { "title": "BlobPhase", "description": "Phase for the management of a configuration blob.", "enum": [ - "initial", - "updating", + "apply", "final" ] }, diff --git a/schema/config_system.json b/schema/config_system.json index e1a711ead6..5304c879ed 100644 --- a/schema/config_system.json +++ b/schema/config_system.json @@ -22,6 +22,11 @@ "mode": { "$ref": "file:common.json#/definitions/mode" }, + "last_start": { + "description": "Last time a device with this id said it restarted: being later than status-supplied last_start indicates resource conflict.", + "type": "string", + "format": "date-time" + }, "testing": { "$ref": "file:config_system_testing.json#" } diff --git a/schema/state_system.json b/schema/state_system.json index 8754c7e6ec..e5b2dbc6de 100644 --- a/schema/state_system.json +++ b/schema/state_system.json @@ -28,6 +28,11 @@ "mode": { "$ref": "file:common.json#/definitions/mode" }, + "last_start": { + "description": "Last time the system started up.", + "type": "string", + "format": "date-time" + }, "serial_no": { "description": "The serial number of the physical device", "type": "string", diff --git a/tests/state.tests/blobset_received.json b/tests/state.tests/blobset_received.json index 7371bec27c..a3519e9e6a 100644 --- a/tests/state.tests/blobset_received.json +++ b/tests/state.tests/blobset_received.json @@ -25,7 +25,7 @@ } }, "_firmware_update": { - "phase": "updating", + "phase": "apply", "status": { "message": "Processing firmware update", "category": "blobset.blob.apply", @@ -34,7 +34,7 @@ } }, "_iot_endpoint_config": { - "phase": "initial", + "phase": "apply", "status": { "message": "Received endpoint config", "category": "blobset.blob.receive", diff --git a/tests/state.tests/blobset_updating.json b/tests/state.tests/blobset_updating.json index e5472b0f01..6ba2c6f4af 100644 --- a/tests/state.tests/blobset_updating.json +++ b/tests/state.tests/blobset_updating.json @@ -16,7 +16,7 @@ "blobset" : { "blobs" : { "_iot_endpoint_config" : { - "phase" : "updating" + "phase" : "apply" } } } diff --git a/validator/bin/build b/validator/bin/build index 3576b0d868..7c1395fed9 100755 --- a/validator/bin/build +++ b/validator/bin/build @@ -6,16 +6,24 @@ if [[ $1 == check ]]; then fi ROOT=$(dirname $0)/.. +BASE=.. +cd $ROOT -export JAVA_HOME=$JAVA_HOME_11_X64 +jarfile=build/libs/validator-1.0-SNAPSHOT-all.jar -cd $ROOT +newest=$(ls -rt `find src/ $BASE/gencode/java -type f` | tail -n 1) +if [[ -f $jarfile && $jarfile -nt $newest ]]; then + echo $jarfile up-to-date, skipping build. + exit 0 +fi + +export JAVA_HOME=$JAVA_HOME_11_X64 echo Building validataor in $PWD rm -rf build ./gradlew shadow $check -ls -l build/libs/validator-1.0-SNAPSHOT-all.jar +ls -l $jarfile echo Done with validator build. diff --git a/validator/bin/reflector b/validator/bin/reflector index eda35acf84..00617f9d9a 100755 --- a/validator/bin/reflector +++ b/validator/bin/reflector @@ -14,6 +14,7 @@ device_id=$3 shift 3 cd $ROOT +echo Building validator... validator/bin/build jarfile=validator/build/libs/validator-1.0-SNAPSHOT-all.jar