Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e527acb
[Subtask]: Use a new configuration item to control whether master & s…
Oct 29, 2025
064f4ea
[Subtask]: Use a new configuration item to control whether master & s…
Oct 30, 2025
879e398
[Subtask]: Add a registration function for table allocation in master…
Nov 10, 2025
bb1593b
[Subtask]: Add a registration function for table allocation in master…
Nov 10, 2025
2c60a4d
[Subtask]: Add a registration function for table allocation in master…
Nov 10, 2025
a3acd97
[Subtask]: Replace zk with mocking. #3919
Nov 10, 2025
b635c0d
[Subtask]: Replace zk with mocking. #3919
Nov 10, 2025
3335790
[Subtask]: add AmsAssignService to implement balanced bucket allocati…
Nov 10, 2025
79a1d85
Merge branch 'amoro#3919' into amoro#3921
Nov 10, 2025
80ba8f2
[Subtask]: add AmsAssignService to implement balanced bucket allocati…
Nov 10, 2025
0c91452
[Subtask]: add AmsAssignService to implement balanced bucket allocati…
Nov 11, 2025
f2ecc06
[Subtask]: add AmsAssignService to implement balanced bucket allocati…
Nov 12, 2025
b1da165
[Subtask]: Modify DefaultTableService to be compatible with master-sl…
Nov 12, 2025
9fdda5c
[Subtask]: Modify DefaultTableService to be compatible with master-sl…
Nov 12, 2025
d4d9073
[Subtask]: Modify DefaultTableService to be compatible with master-sl…
Nov 12, 2025
d0cb6b2
[Subtask]: Fix unit test failure issue #3923
Nov 14, 2025
cf7b3ad
[Subtask]: In master-slave mode, each AMS should automatically senses…
Nov 14, 2025
4ec6be4
[Subtask]: Modify the optimizer to support obtaining tasks from each …
Nov 17, 2025
62d2af1
[Subtask]: Modify the optimizer to support obtaining tasks from each …
Nov 17, 2025
8506dd5
[Subtask]: Optimize the logic for retrieving the AMS list from ZooKee…
Nov 19, 2025
06cbb06
[Subtask]: Supports forwarding OpenAPI requests to the master node in…
Nov 25, 2025
7c99b2a
Merge remote-tracking branch 'origin/master' into amoro#3963
Nov 25, 2025
8aad41c
[Subtask]: Fixing conflicts and unit test case failures. #3963
Nov 25, 2025
b19685e
[Subtask]: Fix unit test case failures. #3963
Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@ public class AmoroManagementConf {
.defaultValue("admin")
.withDescription("The administrator password");

/**
 * Enables master-slave mode, which supports horizontal scaling of AMS.
 *
 * <p>Boolean option with key {@code use-master-slave-mode}; defaults to {@code false}.
 */
public static final ConfigOption<Boolean> USE_MASTER_SLAVE_MODE =
ConfigOptions.key("use-master-slave-mode")
.booleanType()
.defaultValue(false)
.withDescription("Enable master & slave mode, which supports horizontal scaling of AMS.");

/**
 * Total number of bucket IDs available for assignment.
 *
 * <p>Integer option with key {@code bucket-id.total-count}; defaults to {@code 100}. Per the
 * option description, bucket IDs range from 1 to this value.
 */
public static final ConfigOption<Integer> BUCKET_ID_TOTAL_COUNT =
ConfigOptions.key("bucket-id.total-count")
.intType()
.defaultValue(100)
.withDescription(
"Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value.");

/**
 * Timeout used to decide that a node is offline.
 *
 * <p>Duration option with key {@code node-offline.timeout}; defaults to 5 minutes. Per the
 * option description, the node's bucket IDs are reassigned once this duration has elapsed.
 */
public static final ConfigOption<Duration> NODE_OFFLINE_TIMEOUT =
ConfigOptions.key("node-offline.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(5))
.withDescription(
"Timeout duration to determine if a node is offline. After this duration, the node's bucket IDs will be reassigned.");

/**
 * Interval at which the bucket assignment service detects node changes and redistributes bucket
 * IDs.
 *
 * <p>Duration option with key {@code bucket-assign.interval}; defaults to 60 seconds.
 *
 * <p>NOTE(review): the constant name omits the "bucket" prefix that the key and the sibling
 * {@code BUCKET_*} options carry — consider aligning the naming.
 */
public static final ConfigOption<Duration> ASSIGN_INTERVAL =
ConfigOptions.key("bucket-assign.interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"Interval for bucket assignment service to detect node changes and redistribute bucket IDs.");

/**
 * Interval for syncing the tables that belong to a node's assigned bucket IDs in master-slave
 * mode.
 *
 * <p>Duration option with key {@code bucket-table-sync.interval}; defaults to 60 seconds. Per
 * the option description, each node periodically loads tables from the database based on its
 * assigned bucket IDs.
 */
public static final ConfigOption<Duration> BUCKET_TABLE_SYNC_INTERVAL =
ConfigOptions.key("bucket-table-sync.interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"Interval for syncing tables assigned to bucket IDs in master-slave mode. Each node periodically loads tables from database based on its assigned bucket IDs.");

public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
ConfigOptions.key("catalog-meta-cache.expiration-interval")
.durationType()
Expand Down Expand Up @@ -525,4 +560,48 @@ public class AmoroManagementConf {
/**
 * Name literal {@code "metric-reports"}; presumably the configuration entry that lists metric
 * reporters — confirm against its usage.
 *
 * <p>NOTE(review): the literal says "reports" while the constant says "reporters"; likely kept
 * for compatibility — verify before changing either.
 */
public static final String METRIC_REPORTERS = "metric-reports";

/**
 * Name literal {@code "event-listeners"}; presumably the configuration entry that lists event
 * listeners — confirm against its usage.
 */
public static final String EVENT_LISTENERS = "event-listeners";

/**
 * Timeout for forwarding a request to the leader node.
 *
 * <p>Duration option with key {@code request-forwarder.timeout}; defaults to 30 seconds.
 *
 * <p>NOTE(review): the consumer visible in this change narrows {@code toMillis()} to {@code int},
 * so values above {@code Integer.MAX_VALUE} milliseconds would overflow — confirm whether
 * validation is needed.
 */
public static final ConfigOption<Duration> REQUEST_FORWARDER_TIMEOUT =
ConfigOptions.key("request-forwarder.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription("Timeout duration for request forwarding to leader node.");

/**
 * Maximum number of retry attempts when forwarding a request.
 *
 * <p>Integer option with key {@code request-forwarder.max-retries}; defaults to {@code 3}.
 */
public static final ConfigOption<Integer> REQUEST_FORWARDER_MAX_RETRIES =
ConfigOptions.key("request-forwarder.max-retries")
.intType()
.defaultValue(3)
.withDescription("Maximum number of retry attempts for request forwarding.");

/**
 * Backoff between retry attempts when forwarding a request.
 *
 * <p>Duration option with key {@code request-forwarder.retry-backoff}; defaults to 1 second.
 */
public static final ConfigOption<Duration> REQUEST_FORWARDER_RETRY_BACKOFF =
ConfigOptions.key("request-forwarder.retry-backoff")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("Backoff duration between retry attempts for request forwarding.");

/**
 * Number of consecutive forwarding failures before the circuit breaker opens.
 *
 * <p>Integer option with key {@code request-forwarder.circuit-breaker.threshold}; defaults to
 * {@code 5}.
 */
public static final ConfigOption<Integer> REQUEST_FORWARDER_CIRCUIT_BREAKER_THRESHOLD =
ConfigOptions.key("request-forwarder.circuit-breaker.threshold")
.intType()
.defaultValue(5)
.withDescription("Number of consecutive failures before opening the circuit breaker.");

/**
 * How long the circuit breaker stays open before it attempts to close again.
 *
 * <p>Duration option with key {@code request-forwarder.circuit-breaker.timeout}; defaults to 1
 * minute.
 */
public static final ConfigOption<Duration> REQUEST_FORWARDER_CIRCUIT_BREAKER_TIMEOUT =
ConfigOptions.key("request-forwarder.circuit-breaker.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription(
"Timeout duration for circuit breaker to remain open before attempting to close.");

/**
 * Maximum number of connections for the request-forwarding HTTP client.
 *
 * <p>Integer option with key {@code request-forwarder.max-connections}; defaults to {@code 100}.
 *
 * <p>NOTE(review): in {@code AmoroServiceContainer#initHttpService} as visible in this change,
 * the value is read and logged but is not among the arguments passed to the
 * {@code RequestForwarder} constructor — confirm that it is actually applied to the client.
 */
public static final ConfigOption<Integer> REQUEST_FORWARDER_MAX_CONNECTIONS =
ConfigOptions.key("request-forwarder.max-connections")
.intType()
.defaultValue(100)
.withDescription("Maximum number of connections for request forwarding HTTP client.");

/**
 * Maximum number of connections per route for the request-forwarding HTTP client.
 *
 * <p>Integer option with key {@code request-forwarder.max-connections-per-route}; defaults to
 * {@code 20}.
 *
 * <p>NOTE(review): in {@code AmoroServiceContainer#initHttpService} as visible in this change,
 * the value is read and logged but is not among the arguments passed to the
 * {@code RequestForwarder} constructor — confirm that it is actually applied to the client.
 */
public static final ConfigOption<Integer> REQUEST_FORWARDER_MAX_CONNECTIONS_PER_ROUTE =
ConfigOptions.key("request-forwarder.max-connections-per-route")
.intType()
.defaultValue(20)
.withDescription(
"Maximum number of connections per route for request forwarding HTTP client.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server;

import static org.apache.amoro.server.AmoroManagementConf.USE_MASTER_SLAVE_MODE;

import io.javalin.Javalin;
import io.javalin.http.HttpCode;
import io.javalin.http.staticfiles.Location;
Expand All @@ -33,6 +35,7 @@
import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.dashboard.DashboardServer;
import org.apache.amoro.server.dashboard.JavalinJsonMapper;
import org.apache.amoro.server.dashboard.RequestForwardedException;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
Expand Down Expand Up @@ -96,6 +99,7 @@ public class AmoroServiceContainer {
public static final Logger LOG = LoggerFactory.getLogger(AmoroServiceContainer.class);

public static final String SERVER_CONFIG_FILENAME = "config.yaml";
// Cached value of the USE_MASTER_SLAVE_MODE option; assigned in initConfig() and read by main()
// and startOptimizingService().
// NOTE(review): this is a mutable static written from an instance method but named like a
// constant — consider an instance field or lowerCamelCase naming.
private static boolean IS_MASTER_SLAVE_MODE = false;

private final HighAvailabilityContainer haContainer;
private DataSource dataSource;
Expand All @@ -110,6 +114,7 @@ public class AmoroServiceContainer {
private TServer optimizingServiceServer;
private Javalin httpServer;
private AmsServiceMetrics amsServiceMetrics;
// Bucket assignment service. Created and started in startOptimizingService() only when
// master-slave mode is on and the HA container exposes a ZK client; stopped and reset to null in
// disposeOptimizingService(). Remains null otherwise.
private AmsAssignService amsAssignService;

public AmoroServiceContainer() throws Exception {
initConfig();
Expand All @@ -128,15 +133,22 @@ public static void main(String[] args) {
LOG.info("AMS service has been shut down");
}));
service.startRestServices();
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
if (IS_MASTER_SLAVE_MODE) {
// Even if this node does not become the master, the election must not block the subsequent logic.
service.registAndElect();
// Regardless of whether it becomes the master, the service needs to be activated.
service.startOptimizingService();
} else {
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
}
}
}
} catch (Throwable t) {
Expand All @@ -145,6 +157,10 @@ public static void main(String[] args) {
}
}

/**
 * Delegates to {@code haContainer.registAndElect()}.
 *
 * <p>{@code main} calls this only when master-slave mode is enabled, immediately before starting
 * the optimizing service. Presumably it registers this node and takes part in the election
 * without waiting to win it — confirm against {@code HighAvailabilityContainer}.
 *
 * @throws Exception if the HA container call fails
 */
public void registAndElect() throws Exception {
haContainer.registAndElect();
}

/**
 * Delegates to {@code haContainer.waitLeaderShip()}.
 *
 * <p>{@code main} calls this in the non-master-slave loop before starting the optimizing
 * service. Presumably it blocks until this node becomes the leader — confirm against
 * {@code HighAvailabilityContainer}.
 *
 * @throws Exception if the HA container call fails
 */
public void waitLeaderShip() throws Exception {
haContainer.waitLeaderShip();
}
Expand All @@ -171,11 +187,29 @@ public void startOptimizingService() throws Exception {
TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager();
tableRuntimeFactoryManager.initialize();

// In master-slave mode, create BucketAssignStore and AmsAssignService
BucketAssignStore bucketAssignStore = null;
if (IS_MASTER_SLAVE_MODE && haContainer != null && haContainer.getZkClient() != null) {
String clusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
bucketAssignStore = new ZkBucketAssignStore(haContainer.getZkClient(), clusterName);
// Create and start AmsAssignService for bucket assignment
amsAssignService =
new AmsAssignService(haContainer, serviceConfig, haContainer.getZkClient());
amsAssignService.start();
LOG.info("AmsAssignService started for master-slave mode");
}

tableService =
new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
new DefaultTableService(
serviceConfig,
catalogManager,
tableRuntimeFactoryManager,
haContainer,
bucketAssignStore);

optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
new DefaultOptimizingService(
serviceConfig, catalogManager, optimizerManager, tableService, haContainer);

LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand Down Expand Up @@ -214,6 +248,11 @@ public void disposeOptimizingService() {
LOG.info("Stopping optimizing server[serving:{}] ...", optimizingServiceServer.isServing());
optimizingServiceServer.stop();
}
if (amsAssignService != null) {
LOG.info("Stopping AmsAssignService...");
amsAssignService.stop();
amsAssignService = null;
}
if (tableService != null) {
LOG.info("Stopping table service...");
tableService.dispose();
Expand Down Expand Up @@ -256,6 +295,7 @@ public void dispose() {
/**
 * Initializes the configuration and caches the master-slave mode flag.
 *
 * <p>Runs {@code ConfigurationHelper#init()} and then copies the {@code USE_MASTER_SLAVE_MODE}
 * option from {@code serviceConfig} into the static {@code IS_MASTER_SLAVE_MODE} flag.
 *
 * @throws Exception if configuration initialization fails
 */
private void initConfig() throws Exception {
LOG.info("initializing configurations...");
new ConfigurationHelper().init();
// The flag is assigned only after init() returns, so code that runs as part of init()
// (presumably including initContainerConfig()) must read serviceConfig directly.
IS_MASTER_SLAVE_MODE = serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE);
}

private void startThriftService() {
Expand All @@ -271,10 +311,58 @@ private void startThriftServer(TServer server, String threadName) {
}

private void initHttpService() {
// Create request forwarder for master-slave mode
org.apache.amoro.server.dashboard.RequestForwarder requestForwarder = null;
if (haContainer != null && haContainer.isMasterSlaveMode()) {
// Get configuration values for request forwarder
int timeoutMs =
(int) serviceConfig.get(AmoroManagementConf.REQUEST_FORWARDER_TIMEOUT).toMillis();
int maxRetries = serviceConfig.getInteger(AmoroManagementConf.REQUEST_FORWARDER_MAX_RETRIES);
int retryBackoffMs =
(int) serviceConfig.get(AmoroManagementConf.REQUEST_FORWARDER_RETRY_BACKOFF).toMillis();
int circuitBreakerThreshold =
serviceConfig.getInteger(AmoroManagementConf.REQUEST_FORWARDER_CIRCUIT_BREAKER_THRESHOLD);
long circuitBreakerTimeoutMs =
serviceConfig
.get(AmoroManagementConf.REQUEST_FORWARDER_CIRCUIT_BREAKER_TIMEOUT)
.toMillis();
int maxConnections =
serviceConfig.getInteger(AmoroManagementConf.REQUEST_FORWARDER_MAX_CONNECTIONS);
int maxConnectionsPerRoute =
serviceConfig.getInteger(AmoroManagementConf.REQUEST_FORWARDER_MAX_CONNECTIONS_PER_ROUTE);

requestForwarder =
new org.apache.amoro.server.dashboard.RequestForwarder(
haContainer,
timeoutMs,
maxRetries,
retryBackoffMs,
circuitBreakerThreshold,
circuitBreakerTimeoutMs);

LOG.info(
"Request forwarder initialized with configuration: timeout={}ms, maxRetries={}, "
+ "retryBackoff={}ms, circuitBreakerThreshold={}, circuitBreakerTimeout={}ms, "
+ "maxConnections={}, maxConnectionsPerRoute={}",
timeoutMs,
maxRetries,
retryBackoffMs,
circuitBreakerThreshold,
circuitBreakerTimeoutMs,
maxConnections,
maxConnectionsPerRoute);
}

DashboardServer dashboardServer =
new DashboardServer(
serviceConfig, catalogManager, tableManager, optimizerManager, terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager);
serviceConfig,
catalogManager,
tableManager,
optimizerManager,
terminalManager,
requestForwarder);
RestCatalogService restCatalogService =
new RestCatalogService(catalogManager, tableManager, requestForwarder);

httpServer =
Javalin.create(
Expand Down Expand Up @@ -304,6 +392,79 @@ private void initHttpService() {
dashboardServer.preHandleRequest(ctx);
}
});

// Handle RequestForwardedException - request was successfully forwarded to leader
// This must be registered before the generic Exception handler
httpServer.exception(
RequestForwardedException.class,
(e, ctx) -> {
// Request was forwarded, response data is stored in the exception
// Re-apply response data to ensure it's not lost during exception handling
if (e.hasResponseData()) {
// Set status code
ctx.status(e.getStatusCode());

// Set response body
byte[] responseBody = e.getResponseBody();
if (responseBody != null) {
// For 204/304, don't set body
if (e.getStatusCode() != 204 && e.getStatusCode() != 304) {
ctx.result(responseBody);
}
} else if (e.getStatusCode() != 204
&& e.getStatusCode() != 304
&& ctx.path().startsWith("/api/")) {
// Fallback: set empty JSON if body is null for API endpoints
ctx.result("{}".getBytes(java.nio.charset.StandardCharsets.UTF_8));
}

// Set content type
String contentType = e.getContentType();
if (contentType != null && !contentType.isEmpty()) {
ctx.contentType(contentType);
} else if (e.getStatusCode() != 204
&& e.getStatusCode() != 304
&& ctx.path().startsWith("/api/")) {
ctx.contentType("application/json");
}

// Set response headers
if (e.getResponseHeaders() != null) {
for (java.util.Map.Entry<String, String> entry : e.getResponseHeaders().entrySet()) {
ctx.header(entry.getKey(), entry.getValue());
}
}

LOG.debug(
"RequestForwardedException handled: restored response status {}, body-size: {}, content-type: {} for path: {}",
e.getStatusCode(),
responseBody != null ? responseBody.length : 0,
contentType,
ctx.path());
} else {
// Response data not available in exception, check if it was set in context
int currentStatus = ctx.status();
if (currentStatus > 0) {
LOG.debug(
"RequestForwardedException caught: response already set in context, status: {} for path: {}",
currentStatus,
ctx.path());
} else {
LOG.error(
"RequestForwardedException caught but no response data available! Path: {}",
ctx.path());
// Set a proper error response
ctx.status(io.javalin.http.HttpCode.INTERNAL_SERVER_ERROR);
ctx.contentType("application/json");
ctx.json(
new org.apache.amoro.server.dashboard.response.ErrorResponse(
io.javalin.http.HttpCode.INTERNAL_SERVER_ERROR,
"Request forwarding completed but response was not properly set",
""));
}
}
});

httpServer.exception(
Exception.class,
(e, ctx) -> {
Expand Down Expand Up @@ -534,6 +695,12 @@ private void initContainerConfig() {
containerProperties.putIfAbsent(
OptimizerProperties.AMS_OPTIMIZER_URI,
AmsUtil.getAMSThriftAddress(serviceConfig, Constants.THRIFT_OPTIMIZING_SERVICE_NAME));
// Add master-slave mode flag to container properties
// Read from serviceConfig directly since IS_MASTER_SLAVE_MODE is set after
// initContainerConfig()
if (serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE)) {
containerProperties.put(OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE, "true");
}
// put addition system properties
container.setProperties(containerProperties);
containerList.add(container);
Expand Down
Loading