Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: recover from configuration failures #151

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.aws.greengrass.clientdevices.auth.connectivity.CISShadowMonitor;
import com.aws.greengrass.clientdevices.auth.exception.CertificateChainLoadingException;
import com.aws.greengrass.clientdevices.auth.exception.CertificateGenerationException;
import com.aws.greengrass.clientdevices.auth.exception.InvalidConfigurationException;
import com.aws.greengrass.clientdevices.auth.helpers.CertificateTestHelpers;
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.dependency.State;
Expand Down Expand Up @@ -52,16 +53,19 @@
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.security.KeyPair;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand All @@ -74,10 +78,12 @@
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -128,21 +134,29 @@ void cleanup() {
kernel.shutdown();
}

// TODO: Consolidate this test helpers with ClientDevicesAuthServiceTest
private void givenNucleusRunningWithConfig(String configFileName) throws InterruptedException {
private void givenNucleusRunningWithConfig(String configFileName, Consumer<State> consumer) throws InterruptedException {
CountDownLatch authServiceRunning = new CountDownLatch(1);
kernel.parseArgs("-r", rootDir.toAbsolutePath().toString(), "-i",
getClass().getResource(configFileName).toString());
Objects.requireNonNull(getClass().getResource(configFileName)).toString());
kernel.getContext().addGlobalStateChangeListener((service, was, newState) -> {
if (ClientDevicesAuthService.CLIENT_DEVICES_AUTH_SERVICE_NAME.equals(service.getName()) && service.getState()
.equals(State.RUNNING)) {
authServiceRunning.countDown();
if (ClientDevicesAuthService.CLIENT_DEVICES_AUTH_SERVICE_NAME.equals(service.getName())) {
State serviceState = service.getState();
consumer.accept(serviceState);

if (serviceState.equals(State.RUNNING)) {
authServiceRunning.countDown();
}
}
});
kernel.launch();

assertThat(authServiceRunning.await(30L, TimeUnit.SECONDS), is(true));
}

private void givenNucleusRunningWithConfig(String configFileName) throws InterruptedException {
givenNucleusRunningWithConfig(configFileName, (State s) -> {});
}

private static Pair<X509Certificate[], KeyPair[]> givenRootAndIntermediateCA() throws NoSuchAlgorithmException,
CertificateException,
OperatorCreationException, CertIOException {
Expand Down Expand Up @@ -317,4 +331,47 @@ void GIVEN_managedCAConfiguration_WHEN_updatedToCustomCAConfiguration_THEN_serve
CertificateUpdateEvent event = eventRef.get();
assertTrue(CertificateTestHelpers.wasCertificateIssuedBy(intermediateCA, event.getCertificate()));
}

@Test
void GIVEN_invalidConfigServiceBroken_WHEN_whenCorrected_THEN_serviceCanRecover(ExtensionContext context)
throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, CertIOException,
URISyntaxException, InterruptedException, KeyLoadingException, ServiceUnavailableException,
CertificateChainLoadingException, ServiceLoadException, KeyStoreException {
ignoreExceptionOfType(context, InvalidConfigurationException.class);
ignoreExceptionOfType(context, URISyntaxException.class);
Pair<X509Certificate[], KeyPair[]> credentials = givenRootAndIntermediateCA();
X509Certificate[] chain = credentials.getLeft();
KeyPair[] certificateKeys = credentials.getRight();
KeyPair intermediateKeyPair = certificateKeys[0];

CountDownLatch authServiceBroken = new CountDownLatch(1);
CountDownLatch recoveredFromBroken = new CountDownLatch(1);
AtomicBoolean wasBroken = new AtomicBoolean(false);
Consumer<State> serviceStateChangeListener = (State s) -> {
if (s.equals(State.BROKEN)) {
wasBroken.getAndSet(true);
authServiceBroken.countDown();
}

if (wasBroken.get() && s.equals(State.RUNNING)) {
recoveredFromBroken.countDown();
}
};


givenNucleusRunningWithConfig("config.yaml", serviceStateChangeListener);
verify(certificateStoreSpy, times(1)).setCaKeyAndCertificateChain(any(), any(), any());

// Do enough bad operations until the service goes belly up
givenCDAWithCustomCertificateAuthority(new URI("file:///private.key"), new URI(""));
assertThat(authServiceBroken.await(10L, TimeUnit.SECONDS), is(true));

// Do the right thing
URI privateKeyUri = new URI("file:///private.key");
URI certificateUri = new URI("file:///certificate.pem");
when(securityServiceMock.getKeyPair(privateKeyUri, certificateUri)).thenReturn(intermediateKeyPair);
doReturn(chain).when(certificateStoreSpy).loadCaCertificateChain(privateKeyUri, certificateUri);
givenCDAWithCustomCertificateAuthority(privateKeyUri, certificateUri);
assertThat(recoveredFromBroken.await(10L, TimeUnit.SECONDS), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.aws.greengrass.clientdevices.auth.configuration.GroupConfiguration;
import com.aws.greengrass.clientdevices.auth.configuration.GroupManager;
import com.aws.greengrass.clientdevices.auth.connectivity.CISShadowMonitor;
import com.aws.greengrass.clientdevices.auth.exception.InvalidConfigurationException;
import com.aws.greengrass.clientdevices.auth.infra.NetworkState;
import com.aws.greengrass.clientdevices.auth.session.MqttSessionFactory;
import com.aws.greengrass.clientdevices.auth.session.SessionConfig;
Expand All @@ -26,6 +27,7 @@
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.config.WhatHappened;
import com.aws.greengrass.dependency.ImplementsService;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.ipc.AuthorizeClientDeviceActionOperationHandler;
import com.aws.greengrass.ipc.GetClientDeviceAuthTokenOperationHandler;
import com.aws.greengrass.ipc.SubscribeToCertificateUpdatesOperationHandler;
Expand Down Expand Up @@ -70,7 +72,10 @@ public class ClientDevicesAuthService extends PluginService {
// Create a threadpool for calling the cloud. Single thread will be used by default.
private ThreadPoolExecutor cloudCallThreadPool;
private int cloudCallQueueSize;

private final Object configLock = new Object();
private CDAConfiguration cdaConfiguration;
private volatile boolean configurationErrored = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is necessary at all, but future reference, you could just synchronize on configurationErrored instead and then you also don't need volatile



/**
Expand Down Expand Up @@ -136,11 +141,38 @@ private void subscribeToConfigChanges() {
}

private void onConfigurationChanged() {
try {
cdaConfiguration = CDAConfiguration.from(cdaConfiguration, getConfig());
} catch (URISyntaxException e) {
serviceErrored(e);
// Note: The nucleus emits multiple configuration changed events, one per key that changed. It will also
// keep emitting them regardless of the state it is current in. If the configuration was incorrect, we want the
// service to error, but we don't want to check again until the nucleus has run the remediation steps (when the
// service errors, the nucleus will try to call shutdown -> install -> startup).
Comment on lines +146 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help me understand this? My mental model is that we shouldn't need to care about what Nucleus is doing, and that we can rely entirely on configuration updates. If we are broken, and a configuration change fixes us, then request reinstall? Are there scenarios where that will not work?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it should be possible to make this as simple as, on config change, if state is broken, then request reinstall.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jbutler and I chatted about this offline - we are going to dig deeper and see if it is required

if (configurationErrored && !inState(State.BROKEN)) {
return;
}

// Note: Need to synchronize this block given multiple threads can be reading the value of configurationErrored
// and changing it.
synchronized (configLock) {
try {
CDAConfiguration configuration = CDAConfiguration.from(cdaConfiguration, getConfig());

if (configuration.isEqual(cdaConfiguration)) {
return;
}

cdaConfiguration = configuration;

// Good configuration and was previously broken
if (inState(State.BROKEN)) {
logger.info("Service is {} and configuration changed. Attempting to reinstall.", State.BROKEN);
configurationErrored = false;
requestReinstall();
}
} catch (URISyntaxException | InvalidConfigurationException e) {
configurationErrored = true;
serviceErrored(e);
}
}

}

private void configChangeHandler(WhatHappened whatHappened, Node node) {
Expand Down Expand Up @@ -183,6 +215,13 @@ private void configChangeHandler(WhatHappened whatHappened, Node node) {
protected void startup() throws InterruptedException {
context.get(CertificateManager.class).startMonitors();
super.startup();

synchronized (configLock) {
if (configurationErrored) {
configurationErrored = false;
onConfigurationChanged();
}
}
Comment on lines +219 to +224
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the issue was that if we are broken, then Nucleus doesn't call startup?

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.aws.greengrass.clientdevices.auth.configuration;

import com.aws.greengrass.clientdevices.auth.certificate.CertificateStore;
import com.aws.greengrass.clientdevices.auth.exception.InvalidConfigurationException;
import com.aws.greengrass.config.Topic;
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.logging.api.Logger;
Expand All @@ -16,8 +17,10 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
Expand Down Expand Up @@ -60,15 +63,31 @@ private CAConfiguration(List<String> caTypes, CertificateStore.CAType caType,
* @param configurationTopics the configuration key of the service configuration
*
* @throws URISyntaxException if invalid certificateUri or privateKeyUri provided.
* @throws InvalidConfigurationException if provided privateKeyUri but not certificateUri
*/
public static CAConfiguration from(Topics configurationTopics) throws URISyntaxException {
public static CAConfiguration from(Topics configurationTopics) throws URISyntaxException,
InvalidConfigurationException {
Topics certAuthorityTopic = configurationTopics.lookupTopics(CERTIFICATE_AUTHORITY_TOPIC);

Optional<URI> privateKeyUri = getCaPrivateKeyUriFromConfiguration(certAuthorityTopic);
Optional<URI> certificateUri = getCaCertificateUriFromConfiguration(certAuthorityTopic);

if (privateKeyUri.isPresent() != certificateUri.isPresent()) {
throw new InvalidConfigurationException(
MessageFormat.format(
"{0} and {1} must have a value. Provided {0}:{2} and {1}:{3}",
CA_PRIVATE_KEY_URI, CA_CERTIFICATE_URI,
privateKeyUri.orElse(new URI("")),
certificateUri.orElse(new URI(""))
)
);
}

return new CAConfiguration(
getCaTypeListFromConfiguration(configurationTopics),
getCaTypeFromConfiguration(configurationTopics),
getCaPrivateKeyUriFromConfiguration(certAuthorityTopic),
getCaCertificateUriFromConfiguration(certAuthorityTopic)
privateKeyUri,
certificateUri
);
}

Expand Down Expand Up @@ -145,4 +164,20 @@ private static Optional<URI> getCaCertificateUriFromConfiguration(Topics certAut

return Optional.of(getUri(certificateUri));
}

/**
* Checks if the CAConfiguration is equal to another.
*
* @param config a CAConfiguration
*/
public boolean isEqual(CAConfiguration config) {
if (config == null) {
return false;
}

return Objects.equals(config.getCertificateUri(), certificateUri)
&& Objects.equals(config.getPrivateKeyUri(), privateKeyUri)
&& Objects.equals(config.getCaType(), caType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import com.aws.greengrass.clientdevices.auth.api.DomainEvents;
import com.aws.greengrass.clientdevices.auth.certificate.CertificateStore;
import com.aws.greengrass.clientdevices.auth.certificate.events.CAConfigurationChanged;
import com.aws.greengrass.clientdevices.auth.exception.InvalidConfigurationException;
import com.aws.greengrass.config.Topics;
import lombok.Getter;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
Expand Down Expand Up @@ -47,24 +48,27 @@
public final class CDAConfiguration {

private final RuntimeConfiguration runtime;
private final CAConfiguration ca;
@Getter
private final CAConfiguration caConfig;
private final DomainEvents domainEvents;

private CDAConfiguration(DomainEvents domainEvents, RuntimeConfiguration runtime, CAConfiguration ca) {
this.domainEvents = domainEvents;
this.runtime = runtime;
this.ca = ca;
this.caConfig = ca;
}

/**
* Creates the CDA (Client Device Auth) Service configuration. And allows it to be available in the context
* with the updated values
* Creates the CDA (Client Device Auth) Service configuration.
*
* @param existingConfig an existing version of the CDAConfiguration
* @param topics configuration topics from GG
*
* @throws URISyntaxException if invalid URI inside the configuration
* @throws InvalidConfigurationException if a part of the configuration is invalid
*/
public static CDAConfiguration from(CDAConfiguration existingConfig, Topics topics) throws URISyntaxException {
public static CDAConfiguration from(CDAConfiguration existingConfig, Topics topics) throws URISyntaxException,
InvalidConfigurationException {
Topics runtimeTopics = topics.lookupTopics(RUNTIME_STORE_NAMESPACE_TOPIC);
Topics serviceConfiguration = topics.lookupTopics(CONFIGURATION_CONFIG_KEY);

Expand All @@ -85,20 +89,22 @@ public static CDAConfiguration from(CDAConfiguration existingConfig, Topics topi
* Creates the CDA (Client Device Auth) Service configuration.
*
* @param topics configuration topics from GG
*
* @throws URISyntaxException if invalid URI inside the configuration
* @throws InvalidConfigurationException if a part of the configuration is invalid
*/
public static CDAConfiguration from(Topics topics) throws URISyntaxException {
public static CDAConfiguration from(Topics topics) throws URISyntaxException, InvalidConfigurationException {
return from(null, topics);
}

private void triggerChanges(CDAConfiguration current, CDAConfiguration prev) {
if (hasCAConfigurationChanged(prev)) {
if (prev == null || !caConfig.isEqual(prev.getCaConfig())) {
domainEvents.emit(new CAConfigurationChanged(current));
}
}

public boolean isUsingCustomCA() {
return ca.isUsingCustomCA();
return caConfig.isUsingCustomCA();
}

public String getCaPassphrase() {
Expand All @@ -114,30 +120,28 @@ public void updateCACertificates(List<String> caCertificates) {
}

public CertificateStore.CAType getCaType() {
return ca.getCaType();
return caConfig.getCaType();
}

public Optional<URI> getPrivateKeyUri() {
return ca.getPrivateKeyUri();
return caConfig.getPrivateKeyUri();
}

public Optional<URI> getCertificateUri() {
return ca.getCertificateUri();
return caConfig.getCertificateUri();
}

/**
* Verifies if the configuration for the certificateAuthority has changed, given a previous
* configuration.
* Verifies if the configuration for the certificateAuthority is equal to another CDA configuration.
*
* @param config CDAConfiguration
* @param configuration CDAConfiguration
*/
private boolean hasCAConfigurationChanged(CDAConfiguration config) {
if (config == null) {
return true;
public boolean isEqual(CDAConfiguration configuration) {
if (configuration == null) {
return false;
}

return !Objects.equals(config.getCertificateUri(), getCertificateUri())
|| !Objects.equals(config.getPrivateKeyUri(), getPrivateKeyUri())
|| !Objects.equals(config.getCaType(), getCaType());
// TODO: As we add more configurations here we should change the equality comparison.
return caConfig.isEqual(configuration.getCaConfig());
}
}
Loading