diff --git a/.classpath b/.classpath
new file mode 100644
index 0000000..aff554c
--- /dev/null
+++ b/.classpath
@@ -0,0 +1,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+/target/
diff --git a/.project b/.project
new file mode 100644
index 0000000..a7c35d6
--- /dev/null
+++ b/.project
@@ -0,0 +1,23 @@
+
+
+ Phancisth's Fork
+
+
+
+
+
+ org.eclipse.jdt.core.javabuilder
+
+
+
+
+ org.eclipse.m2e.core.maven2Builder
+
+
+
+
+
+ org.eclipse.jdt.core.javanature
+ org.eclipse.m2e.core.maven2Nature
+
+
diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs
new file mode 100644
index 0000000..7a53139
--- /dev/null
+++ b/.settings/org.eclipse.core.resources.prefs
@@ -0,0 +1,3 @@
+eclipse.preferences.version=1
+encoding/=UTF-8
+encoding/src=UTF-8
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..a3b98fd
--- /dev/null
+++ b/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,8 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
+org.eclipse.jdt.core.compiler.compliance=1.7
+org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
+org.eclipse.jdt.core.compiler.release=disabled
+org.eclipse.jdt.core.compiler.source=1.7
diff --git a/src/lia/util/net/copy/FDTWriterSession.java b/src/lia/util/net/copy/FDTWriterSession.java
index 3b4a652..73d6576 100644
--- a/src/lia/util/net/copy/FDTWriterSession.java
+++ b/src/lia/util/net/copy/FDTWriterSession.java
@@ -171,8 +171,7 @@ private void finalCleanup() {
sb.append("\n Started: ").append(new Date(startTimeMillis));
sb.append("\n Ended: ").append(endDate);
long period = System.nanoTime() - startTimeNanos;
- sb.append("\n Transfer period: ")
- .append(Utils.getETA(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTimeNanos)));
+ sb.append("\n Transfer period: ").append(Utils.getETA(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTimeNanos)));
sb.append("\n TotalBytes: ").append(getTotalBytes());
long utilBytes = 0;
if (transportProvider != null) {
@@ -181,15 +180,12 @@ private void finalCleanup() {
try {
if (!Utils.updateTotalReadCounter(transportProvider.getUtilBytes())) {
if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST,
- " [ FDTWriterSession ] Unable to update the contor in the update file.");
+ logger.log(Level.FINEST," [ FDTWriterSession ] Unable to update the contor in the update file.");
}
}
} catch (Throwable tu) {
if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST,
- " [ FDTWriterSession ] Unable to update the contor in the update file. Cause: ",
- tu);
+ logger.log(Level.FINEST," [ FDTWriterSession ] Unable to update the contor in the update file. Cause: ", tu);
}
} finally {
transportProvider.close(downMessage(), downCause());
@@ -206,31 +202,25 @@ private void finalCleanup() {
}
if (config.getMonitor().equals(Config.OPENTSDB)) {
MonitoringUtils monUtils = new MonitoringUtils(config, this);
- monUtils.monitorEndStats(((downCause() == null) && (downMessage() == null)), getTotalBytes(), utilBytes,
- startTimeMillis, endDate.getTime(), period, "Writers");
+ monUtils.monitorEndStats(((downCause() == null) && (downMessage() == null)), getTotalBytes(), utilBytes, startTimeMillis, endDate.getTime(), period, "Writers");
}
} catch (Throwable t) {
- logger.log(Level.WARNING,
- "[ FDTWriterSession ] [ finalCleanup ] [ HANDLED ] Exception getting final statistics. Smth went wrong! Cause: ",
- t);
+ logger.log(Level.WARNING, "[ FDTWriterSession ] [ finalCleanup ] [ HANDLED ] Exception getting final statistics. Smth went wrong! Cause: ", t);
}
try {
if (dwm.removeSession(this, downMessage(), downCause())) {
if (isFine) {
- logger.log(Level.FINE,
- "[ FDTWriterSession ] [ finalCleanup ] Successfully removing session from DiskWriterManager");
+ logger.log(Level.FINE, "[ FDTWriterSession ] [ finalCleanup ] Successfully removing session from DiskWriterManager");
}
} else {
if (isFine) {
- logger.log(Level.FINE,
- "[ FDTWriterSession ] [ finalCleanup ] Removing session from DiskWriterManager returned FALSE should have been true!!");
+ logger.log(Level.FINE, "[ FDTWriterSession ] [ finalCleanup ] Removing session from DiskWriterManager returned FALSE should have been true!!");
}
}
} catch (Throwable fine) {
if (isFine) {
- logger.log(Level.FINE, "[ FDTWriterSession ] [ finalCleanup ] exception removing session. Cause:",
- fine);
+ logger.log(Level.FINE, "[ FDTWriterSession ] [ finalCleanup ] exception removing session. Cause:", fine);
}
}
@@ -240,16 +230,14 @@ private void finalCleanup() {
fileSession.close(downMessage(), downCause());
} catch (Throwable ign) {
if (isFinest) {
- logger.log(Level.FINEST, "[ FDTWriterSession ] [ finalCleanup ] closing file session: "
- + fileSession + " got exception. Cause: ", ign);
+ logger.log(Level.FINEST, "[ FDTWriterSession ] [ finalCleanup ] closing file session: " + fileSession + " got exception. Cause: ", ign);
}
}
}
} catch (Throwable ignOutter) {
if (isFinest) {
logger.log(Level.FINEST,
- "[ FDTWriterSession ] [ finalCleanup ] closing file sessions got exception. Cause: ",
- ignOutter);
+ "[ FDTWriterSession ] [ finalCleanup ] closing file sessions got exception. Cause: ", ignOutter);
}
}
@@ -266,8 +254,7 @@ private void finalCleanup() {
} catch (Throwable ignTransport) {
if (isFinest) {
logger.log(Level.FINEST,
- "[ FDTWriterSession ] [ finalCleanup ] closing transport got exception. Cause: ",
- ignTransport);
+ "[ FDTWriterSession ] [ finalCleanup ] closing transport got exception. Cause: ", ignTransport);
}
}
@@ -278,8 +265,7 @@ private void finalCleanup() {
} catch (Throwable ignCtrl) {
if (isFinest) {
logger.log(Level.FINEST,
- "[ FDTWriterSession ] [ finalCleanup ] closing control channel got exception. Cause: ",
- ignCtrl);
+ "[ FDTWriterSession ] [ finalCleanup ] closing control channel got exception. Cause: ", ignCtrl);
}
}
try {
@@ -287,8 +273,7 @@ private void finalCleanup() {
} catch (Throwable ignore) {
if (isFinest) {
logger.log(Level.FINEST,
- "[ FDTWriterSession ] [ finalCleanup ] finishing session in session manager got exception. Cause: ",
- ignore);
+ "[ FDTWriterSession ] [ finalCleanup ] finishing session in session manager got exception. Cause: ", ignore);
}
}
}
@@ -298,8 +283,7 @@ protected void internalClose() throws Exception {
if (logger.isLoggable(Level.FINEST)) {
Thread.dumpStack();
- logger.log(Level.FINEST, " [ FDTWriterSession ] enters internalClose downMsg: " + downMessage()
- + " , downCause: " + downCause());
+ logger.log(Level.FINEST, " [ FDTWriterSession ] enters internalClose downMsg: " + downMessage() + " , downCause: " + downCause());
Thread.dumpStack();
}
@@ -316,9 +300,7 @@ protected void internalClose() throws Exception {
checkFinished(null);
} else {
final String downLogMsg = (downMessage == null) ? "N/A" : downMessage;
- logger.log(Level.INFO,
- "\nThe FDTWriterSession ( " + sessionID + " ) finished with error(s). Cause: " + downLogMsg,
- downCause());
+ logger.log(Level.INFO, "\nThe FDTWriterSession ( " + sessionID + " ) finished with error(s). Cause: " + downLogMsg, downCause());
finalCleanup();
}
@@ -346,8 +328,7 @@ private void sendFinishedSessions() throws Exception {
@Override
public void handleInitFDTSessionConf(CtrlMsg ctrlMsg) throws Exception {
// this should be never called ( for the moment ... )
- logger.log(Level.WARNING,
- "[ FDTWriterSession ] handleInitFDTSessionConf must not be called on the writer side. Msg: " + ctrlMsg);
+ logger.log(Level.WARNING, "[ FDTWriterSession ] handleInitFDTSessionConf must not be called on the writer side. Msg: " + ctrlMsg);
FDTProcolException fpe = new FDTProcolException("Illegal message INIT_FDT_CONF in WriterSesssion");
fpe.fillInStackTrace();
throw fpe;
@@ -368,8 +349,7 @@ public void handleFinalFDTSessionConf(CtrlMsg ctrlMsg) throws Exception {
logger.log(Level.SEVERE, "Unable to transfer to storage: " + "Storage configuration is not found.");
} else {
this.destinationDir = config.storageParams().localFileDir();
- logger.log(Level.WARNING, "Destination directory has been " + "changed from " + sccm.destinationDir
- + " to " + this.destinationDir);
+ logger.log(Level.WARNING, "Destination directory has been " + "changed from " + sccm.destinationDir + " to " + this.destinationDir);
}
}
@@ -393,13 +373,11 @@ public void handleFinalFDTSessionConf(CtrlMsg ctrlMsg) throws Exception {
noTmp = true;
}
boolean noLock = false;
- if (config.isNoLockFlagSet() || controlChannel.remoteConf.get("-nolock") != null
- || controlChannel.remoteConf.get("-nolocks") != null) {
+ if (config.isNoLockFlagSet() || controlChannel.remoteConf.get("-nolock") != null || controlChannel.remoteConf.get("-nolocks") != null) {
noLock = true;
}
- final FileChannelProvider fcp = Config.getInstance().getFileChannelProviderFactory()
- .newWriterFileChannelProvider(this);
+ final FileChannelProvider fcp = Config.getInstance().getFileChannelProviderFactory().newWriterFileChannelProvider(this);
final String preProcessFiltersProp = config.getPreFilters();
boolean hasPreProc = false;
@@ -447,24 +425,21 @@ public void handleFinalFDTSessionConf(CtrlMsg ctrlMsg) throws Exception {
}
if (bPreProccessing) {
- logger.log(Level.INFO,
- "Preprocessing took: " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sTime)) + " ms.");
+ logger.log(Level.INFO, "Preprocessing took: " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sTime)) + " ms.");
}
}
for (final FileSession fws : fileSessions.values()) {
if (resumeManager.isFinished(fws)) {
if (isFiner) {
- logger.log(Level.FINER,
- "\n\n\n ====> [ FDTWriterSession ] the file session " + fws.sessionID + " is Finished!");
+ logger.log(Level.FINER, "\n\n\n ====> [ FDTWriterSession ] the file session " + fws.sessionID + " is Finished!");
}
addAndGetUtilBytes(fws.sessionSize());
addAndGetTotalBytes(fws.sessionSize());
super.finishFileSession(fws.sessionID, null);
} else {
if (isFiner) {
- logger.log(Level.FINER, "\n\n\n ====> [ FDTWriterSession ] <====== the file session "
- + fws.sessionID + " is NOT Finished! <============ ");
+ logger.log(Level.FINER, "\n\n\n ====> [ FDTWriterSession ] <====== the file session " + fws.sessionID + " is NOT Finished! <============ ");
}
}
}
@@ -481,8 +456,7 @@ public void handleFinalFDTSessionConf(CtrlMsg ctrlMsg) throws Exception {
throw new FDTProcolException(" Non null transport provider !");
}
} else if (role == CLIENT) {
- transportProvider = new TCPSessionReader(this, this, InetAddress.getByName(config.getHostName()),
- transferPort, config.getSockNum());
+ transportProvider = new TCPSessionReader(this, this, InetAddress.getByName(config.getHostName()), transferPort, config.getSockNum());
}
// Notify the reader that he can start to send the data
@@ -540,8 +514,10 @@ public void handleEndFDTSession(CtrlMsg ctrlMsg) throws Exception {
StringBuilder sb = new StringBuilder();
sb.append("\n\n===\tRemote MD5 Sums\t===");
for (Map.Entry entry : md5Sums.entrySet()) {
- sb.append("\n").append(Utils.md5ToString(entry.getValue())).append(" ")
- .append(fileSessions.get(entry.getKey()).fileName());
+ sb.append("\n");
+ sb.append(Utils.md5ToString(entry.getValue()));
+ sb.append(" ");
+ sb.append(fileSessions.get(entry.getKey()).fileName());
}
sb.append("\n===\tEND Remote MD5 Sums\t=== \n");
logger.log(Level.INFO, sb.toString());
@@ -550,12 +526,10 @@ public void handleEndFDTSession(CtrlMsg ctrlMsg) throws Exception {
} else if (ctrlMsg.message instanceof String) {
remoteDownMsg = (String) ctrlMsg.message;
close(remoteDownMsg, null);
- logger.log(Level.WARNING, "\n\n [ FDTWriterSession ] Remote FDTReaderSession for session [ " + sessionID
- + " ] finished with errors:\n" + remoteDownMsg + "\n");
+ logger.log(Level.WARNING, "\n\n [ FDTWriterSession ] Remote FDTReaderSession for session [ " + sessionID + " ] finished with errors:\n" + remoteDownMsg + "\n");
}
} else {
- logger.log(Level.INFO, "[ FDTWriterSession ] Remote FDTReaderSession for session [ " + sessionID
- + " ] finished ok. Waiting for our side to finish.");
+ logger.log(Level.INFO, "[ FDTWriterSession ] Remote FDTReaderSession for session [ " + sessionID + " ] finished ok. Waiting for our side to finish.");
}
}
@@ -607,8 +581,7 @@ private boolean doPreprocess(String[] preProcessFilters, Map [ FDTWriterSession ] [preProcess] the file session "
- + fws.sessionID + "/" + fws.fileName() + " is finished!");
+ logger.log(Level.FINE, "\n\n\n ====> [ FDTWriterSession ] [preProcess] the file session " + fws.sessionID + "/" + fws.fileName() + " is finished!");
}
addAndGetUtilBytes(fws.sessionSize());
addAndGetTotalBytes(fws.sessionSize());
@@ -692,8 +662,10 @@ private boolean doPostProcessing() throws Exception {
} finally {
StringBuffer sb = new StringBuffer();
if (filtersCount > 0) {
- sb.append("[ FDTWriterSession ] Postprocessing: ").append(filtersCount).append(" filters in ")
- .append(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sTime)).append(" ms");
+ sb.append("[ FDTWriterSession ] Postprocessing: ");
+ sb.append(filtersCount).append(" filters in ");
+ sb.append(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sTime));
+ sb.append(" ms");
} else {
sb.append("[ FDTWriterSession ] No post processing filters defined/processed.");
}
@@ -735,8 +707,7 @@ private void postPprocess(ProcessorInfo processorInfo, String filterName) throws
private void checkFinished(Throwable finishCause) {
if (logger.isLoggable(Level.FINER)) {
- logger.log(Level.FINER, "\n\n\n\n\n\n ---------------- [ FDTWriterSession ] finishedSessions.size(). "
- + finishedSessions.size() + " fileSessions.size() " + fileSessions.size());
+ logger.log(Level.FINER, "\n\n\n\n\n\n ---------------- [ FDTWriterSession ] finishedSessions.size(). " + finishedSessions.size() + " fileSessions.size() " + fileSessions.size());
}
if (finishedSessions.size() == fileSessions.size()) {
diff --git a/src/lia/util/net/copy/monitoring/ConsoleReportingTask.java b/src/lia/util/net/copy/monitoring/ConsoleReportingTask.java
index d54f323..8585c81 100644
--- a/src/lia/util/net/copy/monitoring/ConsoleReportingTask.java
+++ b/src/lia/util/net/copy/monitoring/ConsoleReportingTask.java
@@ -10,6 +10,7 @@
import lia.util.net.copy.monitoring.base.AbstractAccountableMonitoringTask;
import lia.util.net.copy.transport.TCPTransportProvider;
+import java.net.SocketException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -20,6 +21,11 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+
+/// Libraries for writing rate monitor to a file
+import java.io.*;
+import lia.util.net.copy.*;
+
/**
* This class is the only class which should report to the stdout
*
@@ -33,12 +39,16 @@ public class ConsoleReportingTask extends AbstractAccountableMonitoringTask {
private static final DiskReaderManager diskReaderManager = DiskReaderManager.getInstance();
private static final ConsoleReportingTask thisInstace = new ConsoleReportingTask();
- // private final DateFormat dateFormat = new SimpleDateFormat("dd/MM/yy HH:mm:ss");
- private final DateFormat dateFormat = new SimpleDateFormat("dd/MM HH:mm:ss");
+ private final DateFormat dateFormat = new SimpleDateFormat("dd/MM/yy\tHH:mm:ss");
+ // private final DateFormat dateFormat = new SimpleDateFormat("dd/MM HH:mm:ss");
private final Set oldReaderSessions = new TreeSet();
private final Set oldWriterSessions = new TreeSet();
private final boolean customLog;
-
+
+ //StringBufferf for transfer rate bufferedWriter
+ //private StringBuffer sbf = new StringBuffer();
+ private int writeRateToFileCount = 0;
+
private ConsoleReportingTask() {
super(null);
if (logger.isLoggable(Level.FINER)) {
@@ -51,9 +61,9 @@ public static final ConsoleReportingTask getInstance() {
return thisInstace;
}
- private final boolean reportStatus(final Set currentSessionSet, final Set oldSessionSet,
- final String tag, final StringBuilder sb) {
- boolean shouldReport = false;
+ private final boolean reportStatus(final Set currentSessionSet, final Set oldSessionSet, final String tag, final StringBuilder sb, final String buffTag) {
+
+ boolean shouldReport = false;
if (oldSessionSet.size() > 0) {
double totalReadRate = 0;
@@ -72,14 +82,11 @@ private final boolean reportStatus(final Set currentSessionSet, fina
if (tcpTransportProvider == null) {
// this is real big .... BUG?????
logger.log(Level.WARNING,
- " [ ConsoleReportingTask ] The session: " + fdtSession
- .sessionID() + " is no longer "
- + "available, but canot remove trasport provider from monitoring queue. It's probably a BUG in FDT");
+ " [ ConsoleReportingTask ] The session: " + fdtSession.sessionID() + " is no longer available, but canot remove trasport provider from monitoring queue. It's probably a BUG in FDT");
continue;
}
if (logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, " [ ConsoleReportingTask ] Removing tcpTransportProvider "
- + tcpTransportProvider + " for session: " + fdtSession.sessionID());
+ logger.log(Level.FINE, " [ ConsoleReportingTask ] Removing tcpTransportProvider " + tcpTransportProvider + " for session: " + fdtSession.sessionID());
}
remove(tcpTransportProvider);
it.remove();
@@ -97,10 +104,13 @@ private final boolean reportStatus(final Set currentSessionSet, fina
totalReadRate += totalRate;
if (reportMultipleSessions) {
- sb.append("\n").append(fdtSession.sessionID());
+ sb.append("\n");
+ sb.append(fdtSession.sessionID());
}
- sb.append(tag).append(Utils.formatWithBitFactor(8 * totalRate, 0, "/s")).append("\tAvg: ")
- .append(Utils.formatWithBitFactor(8 * avgTotalRate, 0, "/s"));
+ sb.append(tag);
+ sb.append(Utils.formatWithBitFactor(8 * totalRate, 0, "/s"));
+ sb.append("\tAvg: ");
+ sb.append(Utils.formatWithBitFactor(8 * avgTotalRate, 0, "/s"));
final long dtMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fdtSession.startTimeNanos);
if (fdtSession.getSize() > 0 && dtMillis > 20 * 1000) {
@@ -108,9 +118,26 @@ private final boolean reportStatus(final Set currentSessionSet, fina
final double cSize = (tcpSize <= 0L) ? 0D : tcpSize;
if (cSize > 0) {
final double tSize = (fdtSession.getSize() <= 0L) ? 0D : fdtSession.getSize();
- sb.append(" ").append(Utils.percentDecimalFormat((cSize * 100) / tSize)).append("%");
+ sb.append("\t");
+ sb.append(Utils.percentDecimalFormat((cSize * 100) / tSize));
+ sb.append("%");
+ sb.append("\t");
+
final double remainingSeconds = (fdtSession.getSize() - cSize) / avgTotalRate;
- sb.append(" ( ").append(Utils.getETA((long) remainingSeconds)).append(" )");
+ sb.append(" ( ");
+ sb.append(Utils.getETA((long) remainingSeconds));
+ sb.append(" )");
+
+ sb.append("\t");
+ sb.append(buffTag);
+ try {
+ if (buffTag=="SO_SNDBUF: ") {
+ sb.append(tcpTransportProvider.getSNDBUFSize());
+ } else sb.append("null"); //Currently don't support SO_RCVBUF yet
+ } catch (SocketException e) {
+ e.printStackTrace();
+ }
+
}
}
}
@@ -130,13 +157,11 @@ private final boolean reportStatus(final Set currentSessionSet, fina
if (tcpTransportProvider != null) {
if (addIfAbsent(tcpTransportProvider, logger.isLoggable(Level.FINER) ? true : false)) {
if (logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, " [ ConsoleReportingTask ] Adding tcpTransportProvider "
- + tcpTransportProvider + " for session: " + fdtSession.sessionID());
+ logger.log(Level.FINE, " [ ConsoleReportingTask ] Adding tcpTransportProvider " + tcpTransportProvider + " for session: " + fdtSession.sessionID());
}
} else {
if (logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, " [ ConsoleReportingTask ] Unable to add tcpTransportProvider "
- + tcpTransportProvider + " for session: " + fdtSession.sessionID());
+ logger.log(Level.FINE, " [ ConsoleReportingTask ] Unable to add tcpTransportProvider " + tcpTransportProvider + " for session: " + fdtSession.sessionID());
}
}
@@ -148,18 +173,20 @@ private final boolean reportStatus(final Set currentSessionSet, fina
return shouldReport;
}
- private void reportStatus() {
+ private void reportStatus() throws IOException {
StringBuilder sb = new StringBuilder(8192);
-
- boolean shouldReport = (reportStatus(diskWriterManager.getSessions(), oldWriterSessions, "Net In: ", sb)
- || reportStatus(diskReaderManager.getSessions(), oldReaderSessions, "Net Out: ", sb));
+
+ boolean shouldReport = (reportStatus(diskWriterManager.getSessions(), oldWriterSessions, "Net In: ", sb, "SO_RCVBUF: ") || reportStatus(diskReaderManager.getSessions(), oldReaderSessions, "Net Out: ", sb, "SO_SNDBUF: "));
if (shouldReport) {
logger.info(sb.toString());
}
+
+ //Parse StringBuilder sb as argument for writeRateToFile Method
+ writeRateToFile(sb.toString());
}
-
+
@Override
public void rateComputed() {
try {
@@ -168,5 +195,80 @@ public void rateComputed() {
logger.log(Level.WARNING, " [ ConsoleReportingTask ] Got exception while reporting", t1);
}
}
+
+ //Take the Net_IN/Net_OUT log and write to a separate text file
+ private void writeRateToFile(String terminalRateLog) throws IOException{
+
+ String newRateLog = terminalRateLog;
+ String buffTag, netTag;
+ if(newRateLog.contains("Net In")) {
+ buffTag = "SO_RCVBUF:";
+ netTag = "NET_IN";
+ } else {
+ buffTag = "SO_SNDBUF:";
+ netTag = "NET_OUT";
+ }
+ /**
+ * Original format:
+ *
+ * Net {In/Out}: X.XXX {Gb/s|Mb/s}\tAvg: Y.YYY {Gb/s|Mb/s}\tZZ.ZZ% ( RemainingTime )\t{SO_SNDBUF|SO_RCVBUF} Size: AAAAAAA
+ *
+ * Target format:
+ * X.XXX\tY.YYY\tZZ.ZZ\tRemainingTime\tAAAAAA
+ *
+ * **/
+ if(newRateLog.contains("Net Out: ")) {
+ newRateLog = newRateLog.replaceAll("Net Out: ", "");
+ newRateLog = newRateLog.replaceAll("SO_SNDBUF: ", "");
+ } else {
+ newRateLog = newRateLog.replaceAll("Net In: ", "");
+ newRateLog = newRateLog.replaceAll("SO_RCVBUF: ", "");
+ }
+
+ if(newRateLog.contains(" Gb/s")) {
+ newRateLog = newRateLog.replaceAll(" Gb/s", "");
+ } else if (newRateLog.contains(" Mb/s")) {
+ newRateLog = newRateLog.replaceAll(" Mb/s", "");
+ } else newRateLog = newRateLog.replaceAll(" Kb/s", "");
+
+ newRateLog = newRateLog.replaceAll("Avg: ", "");
+ newRateLog = newRateLog.replaceAll("%", "");
+ newRateLog = newRateLog.replaceAll("\\( ","");
+ newRateLog = newRateLog.replaceAll(" \\)", "");
+
+
+ if (writeRateToFileCount>0) {
+ //After the file was already rreated, just append the new log the to the old file
+
+ BufferedWriter bwr = new BufferedWriter(new FileWriter("/tmp/transfer_rate.txt", true)); //add true argument for append mode
+ //BufferedWriter bwr = new BufferedWriter(new FileWriter("/tmp/transfer_rate.txt", true)); //add true argument for append mode
+
+ bwr.newLine();
+ bwr.write(writeRateToFileCount + "\t");
+ bwr.write(dateFormat.format(new Date()) + "\t");
+ bwr.write(newRateLog);
+ bwr.close();
+ writeRateToFileCount++;
+ } else {
+ //BufferedWriter for writing the rate to .txt file
+ //Create new file first when the this method is called for the first time
+
+ BufferedWriter bwr = new BufferedWriter(new FileWriter(new File("/tmp/transfer_rate.txt")));
+ //BufferedWriter bwr = new BufferedWriter(new FileWriter("/tmp/transfer_rate.txt", true)); //add true argument for append mode
+
+ /**
+ * Column Header for the .txt file separated with tab \t
+ *
+ * [NO]\t[DATE]\t[TIME]\t{NET_IN|NET_OUT}\t[AVG]\t[PERCENT COMPLETED]\t[TIME REMAINING]\t[SO_SNDBUF]
+ * **/
+ bwr.write("NO\tDATE\tTIME\t" + netTag +"\tAVG\tPERCENT COMPLETED\tTIME REMAINING\t" + buffTag);
+
+ bwr.write(writeRateToFileCount + "\t");
+ bwr.write(dateFormat.format(new Date()) + "\t");
+ bwr.write(newRateLog);
+ bwr.close();
+ writeRateToFileCount++;
+ }
+ }
}
diff --git a/src/lia/util/net/copy/transport/TCPTransportProvider.java b/src/lia/util/net/copy/transport/TCPTransportProvider.java
index 4314739..08d9fa6 100644
--- a/src/lia/util/net/copy/transport/TCPTransportProvider.java
+++ b/src/lia/util/net/copy/transport/TCPTransportProvider.java
@@ -17,6 +17,8 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketException;
+import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -56,7 +58,9 @@ public abstract class TCPTransportProvider extends AbstractFDTIOEntity implement
ScheduledFuture> monitoringTaskFuture;
ScheduledFuture> limiterTask;
-
+
+ private static SocketChannel sc;
+
public TCPTransportProvider(FDTSession fdtSession) throws Exception {
this(fdtSession, new LinkedBlockingQueue());
}
@@ -132,6 +136,7 @@ private static final List tryToConnect(InetSocketAddress addr, in
if (windowSize > 0) {
s.setSendBufferSize(windowSize);
+ sc.setOption(StandardSocketOptions.SO_SNDBUF, windowSize);
}
final String sdpConfFlag = System.getProperty("com.sun.sdp.conf");
final boolean bSDP = (sdpConfFlag != null && !sdpConfFlag.isEmpty());
@@ -187,7 +192,7 @@ private static final List tryToConnect(InetSocketAddress addr, in
for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
SelectionKey ssk = it.next();
it.remove();
- SocketChannel sc = (SocketChannel) ssk.channel();
+ sc = (SocketChannel) ssk.channel();
if (ssk.isConnectable()) {
ssk.interestOps(ssk.interestOps() & ~SelectionKey.OP_CONNECT);
while (!sc.finishConnect()) {
@@ -284,6 +289,14 @@ public int getNumberOfStreams() {
return channels.size();
}
}
+
+ public int getSNDBUFSize() throws SocketException {
+ return sc.socket().getSendBufferSize();
+ }
+
+ public int getRCVBUFSize() throws SocketException {
+ return sc.socket().getReceiveBufferSize();
+ }
public InetAddress getRemoteEndPointAddress() {
return null;