Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,41 @@ public class AmoroManagementConf {
.defaultValue("admin")
.withDescription("The administrator password");

/**
 * Switch for master-slave mode, which per its description supports horizontal scaling of AMS.
 *
 * <p>Config key: {@code use-master-slave-mode}; boolean; defaults to {@code false}.
 */
public static final ConfigOption<Boolean> USE_MASTER_SLAVE_MODE =
ConfigOptions.key("use-master-slave-mode")
.booleanType()
.defaultValue(false)
.withDescription("Enable master & slave mode, which supports horizontal scaling of AMS.");

/**
 * Total number of bucket IDs available for assignment; per its description, bucket IDs range
 * from 1 to this value.
 *
 * <p>Config key: {@code bucket-id.total-count}; integer; defaults to {@code 100}.
 */
public static final ConfigOption<Integer> BUCKET_ID_TOTAL_COUNT =
ConfigOptions.key("bucket-id.total-count")
.intType()
.defaultValue(100)
.withDescription(
"Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value.");

/**
 * Timeout used to decide that a node is offline; per its description, the node's bucket IDs
 * are reassigned once this duration has elapsed.
 *
 * <p>Config key: {@code node-offline.timeout}; duration; defaults to 5 minutes.
 */
public static final ConfigOption<Duration> NODE_OFFLINE_TIMEOUT =
ConfigOptions.key("node-offline.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(5))
.withDescription(
"Timeout duration to determine if a node is offline. After this duration, the node's bucket IDs will be reassigned.");

/**
 * Interval at which, per its description, the bucket assignment service detects node changes
 * and redistributes bucket IDs.
 *
 * <p>Config key: {@code bucket-assign.interval}; duration; defaults to 60 seconds.
 *
 * <p>NOTE(review): the constant name omits the {@code BUCKET_} prefix that the key and the
 * neighbouring bucket options use; consider whether a rename is worthwhile before this becomes
 * widely referenced.
 */
public static final ConfigOption<Duration> ASSIGN_INTERVAL =
ConfigOptions.key("bucket-assign.interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"Interval for bucket assignment service to detect node changes and redistribute bucket IDs.");

/**
 * Interval at which, per its description, each node in master-slave mode reloads from the
 * database the tables that belong to its assigned bucket IDs.
 *
 * <p>Config key: {@code bucket-table-sync.interval}; duration; defaults to 60 seconds.
 */
public static final ConfigOption<Duration> BUCKET_TABLE_SYNC_INTERVAL =
ConfigOptions.key("bucket-table-sync.interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"Interval for syncing tables assigned to bucket IDs in master-slave mode. Each node periodically loads tables from database based on its assigned bucket IDs.");

public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
ConfigOptions.key("catalog-meta-cache.expiration-interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server;

import static org.apache.amoro.server.AmoroManagementConf.USE_MASTER_SLAVE_MODE;

import io.javalin.Javalin;
import io.javalin.http.HttpCode;
import io.javalin.http.staticfiles.Location;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class AmoroServiceContainer {
public static final Logger LOG = LoggerFactory.getLogger(AmoroServiceContainer.class);

public static final String SERVER_CONFIG_FILENAME = "config.yaml";
// Cached value of the USE_MASTER_SLAVE_MODE option; assigned in initConfig() and read by main()
// and startOptimizingService().
// NOTE(review): this is a mutable static with a constant-style name that is written from an
// instance method; consider a final instance field exposed through an accessor instead.
private static boolean IS_MASTER_SLAVE_MODE = false;

private final HighAvailabilityContainer haContainer;
private DataSource dataSource;
Expand All @@ -110,6 +113,7 @@ public class AmoroServiceContainer {
private TServer optimizingServiceServer;
private Javalin httpServer;
private AmsServiceMetrics amsServiceMetrics;
private AmsAssignService amsAssignService;

public AmoroServiceContainer() throws Exception {
initConfig();
Expand All @@ -128,15 +132,22 @@ public static void main(String[] args) {
LOG.info("AMS service has been shut down");
}));
service.startRestServices();
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
if (IS_MASTER_SLAVE_MODE) {
// Registration and election must not block the subsequent logic, even if this node does not become the master.
service.registAndElect();
// Regardless of whether this node becomes the master, the service needs to be activated.
service.startOptimizingService();
} else {
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
}
}
}
} catch (Throwable t) {
Expand All @@ -145,6 +156,10 @@ public static void main(String[] args) {
}
}

/**
 * Delegates to {@code haContainer.registAndElect()}.
 *
 * <p>Called by {@code main} in master-slave mode before the optimizing service is started.
 * NOTE(review): the caller expects this call to return even when this node does not win the
 * election — confirm against the HighAvailabilityContainer implementation.
 *
 * @throws Exception whatever the underlying high-availability container throws
 */
public void registAndElect() throws Exception {
haContainer.registAndElect();
}

/**
 * Delegates to {@code haContainer.waitLeaderShip()}.
 *
 * <p>Called by {@code main} in the non-master-slave loop before the optimizing service is
 * started. NOTE(review): presumably blocks until this node acquires leadership — confirm
 * against the HighAvailabilityContainer implementation.
 *
 * @throws Exception whatever the underlying high-availability container throws
 */
public void waitLeaderShip() throws Exception {
haContainer.waitLeaderShip();
}
Expand All @@ -171,8 +186,25 @@ public void startOptimizingService() throws Exception {
TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager();
tableRuntimeFactoryManager.initialize();

// In master-slave mode, create BucketAssignStore and AmsAssignService
BucketAssignStore bucketAssignStore = null;
if (IS_MASTER_SLAVE_MODE && haContainer != null && haContainer.getZkClient() != null) {
String clusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
bucketAssignStore = new ZkBucketAssignStore(haContainer.getZkClient(), clusterName);
// Create and start AmsAssignService for bucket assignment
amsAssignService =
new AmsAssignService(haContainer, serviceConfig, haContainer.getZkClient());
amsAssignService.start();
LOG.info("AmsAssignService started for master-slave mode");
}

tableService =
new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
new DefaultTableService(
serviceConfig,
catalogManager,
tableRuntimeFactoryManager,
haContainer,
bucketAssignStore);

optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
Expand Down Expand Up @@ -214,6 +246,11 @@ public void disposeOptimizingService() {
LOG.info("Stopping optimizing server[serving:{}] ...", optimizingServiceServer.isServing());
optimizingServiceServer.stop();
}
if (amsAssignService != null) {
LOG.info("Stopping AmsAssignService...");
amsAssignService.stop();
amsAssignService = null;
}
if (tableService != null) {
LOG.info("Stopping table service...");
tableService.dispose();
Expand Down Expand Up @@ -256,6 +293,7 @@ public void dispose() {
/**
 * Initializes the configuration and caches the master-slave mode flag.
 *
 * <p>Runs {@code new ConfigurationHelper().init()} first, then reads {@code
 * USE_MASTER_SLAVE_MODE} from {@code serviceConfig} into the static {@code
 * IS_MASTER_SLAVE_MODE} field.
 *
 * @throws Exception if configuration initialization fails
 */
private void initConfig() throws Exception {
LOG.info("initializing configurations...");
// Must run before the read below; presumably this is what populates serviceConfig — confirm.
new ConfigurationHelper().init();
// NOTE(review): an instance method is writing a static field here, so every new container
// instance overwrites the process-wide flag; verify that is intended (e.g. in tests).
IS_MASTER_SLAVE_MODE = serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE);
}

private void startThriftService() {
Expand Down
Loading