Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

package com.sun.javatest.regtest.agent;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
* Handles stdout/stderr process output from the agent.
*/
public class AgentProcessLogger {

/**
* Constructs a thread pool to handle agent process output
* and creates stdout and stderr readers
*
* @param p agent process
*/
public AgentProcessLogger(Process p) {
executorService = Executors.newFixedThreadPool(2, runnable -> {
Thread th = new Thread(runnable);
th.setDaemon(true);
return th;
});
stdOut = new BufferedReader(new InputStreamReader(p.getInputStream()));
stdErr = new BufferedReader(new InputStreamReader(p.getErrorStream()));
}

/**
* Starts logging output and error streams to the specified consumer
*
* @param logConsumer log consumer, has two parameters - stream name and
* the log line
*/
public void startLogging(BiConsumer<String, String> logConsumer,
Map<String, PrintWriter> processStreamWriters,
Function<String, PrintWriter> mappingFunction) {
if (inputDone != null || errorDone != null) {
throw new RuntimeException("call stopLogging first");
}
if (processStreamWriters != null) {
processStreamWriters.computeIfAbsent("stdout", mappingFunction);
processStreamWriters.computeIfAbsent("stderr", mappingFunction);
}
inputDone = executorService.submit(() -> captureLog("stdout", stdOut, logConsumer));
errorDone = executorService.submit(() -> captureLog("stderr", stdErr, logConsumer));
}


/**
* Waits for the logging tasks to finish
*
* @param timeout shutdown timeout
* @param timeUnit shutdown time unit
*
* @throws ExecutionException the logger threw an unexpected exception
* @throws InterruptedException the logger was interrupted
* @throws TimeoutException logging task failed to stop within 60 seconds
*/
public void stopLogging(int timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
inputDone.get(timeout, timeUnit);
errorDone.get(timeout, timeUnit);
inputDone = null;
errorDone = null;
}

/**
* Wait for logging tasks to finish and shutdown the thread pool
*
* @param timeout shutdown timeout
* @param timeUnit shutdown time unit
*/
public void shutdown(int timeout, TimeUnit timeUnit) {
try {
stopLogging(timeout, timeUnit);
} catch (ExecutionException | InterruptedException | TimeoutException ex) {
// ignore exception, the process is terminating
}
executorService.shutdown();
}

/**
* Forward log lines to the consumer, stop forwarding on the separator
* line
*
* @param streamName name of the stream
* @param reader process's stream reader
*/
private Void captureLog(String streamName, BufferedReader reader, BiConsumer<String, String> consumer) {
try {
String line;
while ((line = reader.readLine()) != null) {
int endMarker = line.indexOf(AgentServer.PROCESS_OUTPUT_SEPARATOR);
if (endMarker < 0) {
consumer.accept(streamName, line);
continue;
}
if (endMarker > 0) {
line = line.substring(0, endMarker);
consumer.accept(streamName, line);
}
break;
}
} catch (IOException ex) {
// ignore the exception, the reader might be closed
}
return null;
}

private final ExecutorService executorService;
private final BufferedReader stdOut;
private final BufferedReader stdErr;
private Future<Void> inputDone;
private Future<Void> errorDone;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.Writer;
Expand Down Expand Up @@ -93,6 +95,9 @@ public static void main(String... args) {
public static final byte KEEPALIVE = 5;
public static final byte CLOSE = 6;

public static final String PROCESS_OUTPUT_SEPARATOR =
"------ This line is the stdout/stderr output separator ------";

/**
* Send KEEPALIVE bytes periodically to a stream.
* The bytes are written every {@code WRITE_TIMEOUT} milliseconds.
Expand Down Expand Up @@ -236,6 +241,7 @@ public void run() throws IOException {
try {
int op;
while ((op = in.read()) != -1) {
writeProcessOutputSeparator();
switch (op) {
case DO_COMPILE:
doCompile();
Expand All @@ -252,6 +258,8 @@ public void run() throws IOException {
throw new Error("Agent.Server: unexpected op: " + op);
}
out.flush();
// signal end of section output for the log writer
writeProcessOutputSeparator();
}
} finally {
keepAlive.finished();
Expand All @@ -260,6 +268,20 @@ public void run() throws IOException {
}
}

private void writeProcessOutputSeparator() {
try {
processStdOut.write(PROCESS_OUTPUT_SEPARATOR);
processStdOut.write(System.lineSeparator());
processStdOut.flush();
processStdErr.write(PROCESS_OUTPUT_SEPARATOR);
processStdErr.write(System.lineSeparator());
processStdErr.flush();
}
catch (IOException e ){
// ignore exception as the agent process may be killed
}
}

private void doCompile() throws IOException {
if (traceServer) {
traceOut.println("Agent.Server.doCompile");
Expand Down Expand Up @@ -388,7 +410,8 @@ void log(String message) {
private final PrintWriter logWriter;
private final int id;
private final Map<OutputKind, Writer> writers = new EnumMap<>(OutputKind.class);

private final OutputStreamWriter processStdOut = new FileWriter(FileDescriptor.out);
private final OutputStreamWriter processStdErr = new FileWriter(FileDescriptor.err);
/**
* Create an output stream for output to be sent back to the client via the server connection.
* @param kind the kind of stream
Expand Down
83 changes: 35 additions & 48 deletions src/share/classes/com/sun/javatest/regtest/exec/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,12 @@
package com.sun.javatest.regtest.exec;


import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.InetAddress;
Expand All @@ -60,13 +57,16 @@
import java.util.TreeSet;
import java.util.WeakHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.sun.javatest.Status;
import com.sun.javatest.TestResult;
import com.sun.javatest.WorkDirectory;
import com.sun.javatest.regtest.TimeoutHandler;
import com.sun.javatest.regtest.agent.ActionHelper;
import com.sun.javatest.regtest.agent.AgentProcessLogger;
import com.sun.javatest.regtest.agent.AgentServer;
import com.sun.javatest.regtest.agent.Alarm;
import com.sun.javatest.regtest.agent.Flags;
Expand Down Expand Up @@ -186,8 +186,9 @@ private Agent(File dir, JDK jdk, List<String> vmOpts, Map<String, String> envVar
env.clear();
env.putAll(envVars);
agentServerProcess = process = pb.start();
copyAgentProcessStream("stdout", process.getInputStream());
copyAgentProcessStream("stderr", process.getErrorStream());

processLogger = new AgentProcessLogger(process);
startAgentLog();

try {
final int ACCEPT_TIMEOUT = (int) (60 * 1000 * timeoutFactor);
Expand Down Expand Up @@ -219,29 +220,11 @@ private Agent(File dir, JDK jdk, List<String> vmOpts, Map<String, String> envVar
}

/**
* Reads the output written by an agent process, and copies it either to
* the current TestResult object (when one is available) or to the agent's
* log file, if output is found while there is no test using the agent.
*
* @param name the name of the stream
* @param in the stream
* Writes process input and error stream to the agent log.
*/
void copyAgentProcessStream(final String name, final InputStream in) {
Thread t = new Thread() {
@Override
public void run() {
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = inReader.readLine()) != null) {
handleProcessStreamLine(name, line);
}
} catch (IOException e) {
// ignore
}
}
};
t.setDaemon(true);
t.start();
private void startAgentLog() {
processLogger.startLogging( (String stream, String logLine) -> log(stream + ": " + logLine),
null, null);
}

/**
Expand Down Expand Up @@ -275,30 +258,24 @@ public void run() {
*
* @param section the test result section to be used, or {@code null}
*/
private synchronized void captureProcessStreams(TestResult.Section section) {
private synchronized void captureProcessStreams(TestResult.Section section, int timeout, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException {
processLogger.stopLogging(timeout, timeUnit);
currentTestResultSection = section;
if (currentTestResultSection == null) {
for (PrintWriter pw : processStreamWriters.values()) {
pw.close();
}
processStreamWriters.clear();
}
}

/**
* Saves a line of output that was written by the agent to stdout (fd1) or stderr (fd2).
* If there is a current test result section, the line is saved there;
* otherwise it is written to the agent log file.
*
* @param name the name of the stream from which the line was read
* @param line the line that was read
*/
private synchronized void handleProcessStreamLine(String name, String line) {
if (currentTestResultSection == null) {
log(name + ": " + line);
startAgentLog();
} else {
processStreamWriters.computeIfAbsent(name, currentTestResultSection::createOutput)
.println(line);
processLogger.startLogging((String name, String line) -> {
Objects.requireNonNull(currentTestResultSection);
Objects.requireNonNull(processStreamWriters);
Objects.requireNonNull(name);
Objects.requireNonNull(line);
processStreamWriters.get(name).println(line);
}, processStreamWriters, currentTestResultSection::createOutput);
}
}

Expand Down Expand Up @@ -409,18 +386,25 @@ public void run() {
Status actionStatus = null;
keepAlive.setEnabled(false);
try {
captureProcessStreams(trs);
synchronized (out) {
agentAction.send();
}
// The agent sends process output separator in response
// to receiving a command. Wait for the separator and
// redirect log to the test result section
captureProcessStreams(trs, timeout, TimeUnit.SECONDS);
trace(actionName + ": request sent");
actionStatus = readResults(trs);
// The agent will be disposed on exception.
// Reset the agent log only if the agent can be reused.
// The agent will send process output separator on
// command execution.
captureProcessStreams(null, timeout, TimeUnit.SECONDS);
return actionStatus;
} catch (IOException e) {
} catch (InterruptedException | TimeoutException | ExecutionException | IOException e) {
trace(actionName + ": error " + e);
throw new Fault(e);
} finally {
captureProcessStreams(null);
alarm.cancel();
keepAlive.setEnabled(true);
if (alarm.didFire()) {
Expand Down Expand Up @@ -509,7 +493,9 @@ public void close() {
alarm.cancel();
Thread.interrupted(); // clear any interrupted status
}

// Ensure that thread pool threads are shut down
// and the agent log is fully written
processLogger.shutdown(60, TimeUnit.SECONDS);
log("Closed");
}

Expand Down Expand Up @@ -626,6 +612,7 @@ private void log(String message, PrintStream out) {
final List<String> vmOpts;
final File execDir;
final Process process;
final AgentProcessLogger processLogger;
final DataInputStream in;
final DataOutputStream out;
final KeepAlive keepAlive;
Expand Down