Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry including="**/*.java" kind="src" output="target/classes" path="src">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target/
23 changes: 23 additions & 0 deletions .project
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>Phancisth&apos;s Fork</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
3 changes: 3 additions & 0 deletions .settings/org.eclipse.core.resources.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
eclipse.preferences.version=1
encoding/<project>=UTF-8
encoding/src=UTF-8
8 changes: 8 additions & 0 deletions .settings/org.eclipse.jdt.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.compliance=1.7
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.7
103 changes: 37 additions & 66 deletions src/lia/util/net/copy/FDTWriterSession.java

Large diffs are not rendered by default.

152 changes: 127 additions & 25 deletions src/lia/util/net/copy/monitoring/ConsoleReportingTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import lia.util.net.copy.monitoring.base.AbstractAccountableMonitoringTask;
import lia.util.net.copy.transport.TCPTransportProvider;

import java.net.SocketException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
Expand All @@ -20,6 +21,11 @@
import java.util.logging.Level;
import java.util.logging.Logger;


/// Libraries for writing rate monitor to a file
import java.io.*;
import lia.util.net.copy.*;

/**
* This class is the only class which should report to the stdout
*
Expand All @@ -33,12 +39,16 @@ public class ConsoleReportingTask extends AbstractAccountableMonitoringTask {

private static final DiskReaderManager diskReaderManager = DiskReaderManager.getInstance();
private static final ConsoleReportingTask thisInstace = new ConsoleReportingTask();
// private final DateFormat dateFormat = new SimpleDateFormat("dd/MM/yy HH:mm:ss");
private final DateFormat dateFormat = new SimpleDateFormat("dd/MM HH:mm:ss");
private final DateFormat dateFormat = new SimpleDateFormat("dd/MM/yy\tHH:mm:ss");
// private final DateFormat dateFormat = new SimpleDateFormat("dd/MM HH:mm:ss");
private final Set<FDTSession> oldReaderSessions = new TreeSet<FDTSession>();
private final Set<FDTSession> oldWriterSessions = new TreeSet<FDTSession>();
private final boolean customLog;


//StringBufferf for transfer rate bufferedWriter
//private StringBuffer sbf = new StringBuffer();
private int writeRateToFileCount = 0;

private ConsoleReportingTask() {
super(null);
if (logger.isLoggable(Level.FINER)) {
Expand All @@ -51,9 +61,9 @@ public static final ConsoleReportingTask getInstance() {
return thisInstace;
}

private final boolean reportStatus(final Set<FDTSession> currentSessionSet, final Set<FDTSession> oldSessionSet,
final String tag, final StringBuilder sb) {
boolean shouldReport = false;
private final boolean reportStatus(final Set<FDTSession> currentSessionSet, final Set<FDTSession> oldSessionSet, final String tag, final StringBuilder sb, final String buffTag) {

boolean shouldReport = false;

if (oldSessionSet.size() > 0) {
double totalReadRate = 0;
Expand All @@ -72,14 +82,11 @@ private final boolean reportStatus(final Set<FDTSession> currentSessionSet, fina
if (tcpTransportProvider == null) {
// this is real big .... BUG?????
logger.log(Level.WARNING,
" [ ConsoleReportingTask ] The session: " + fdtSession
.sessionID() + " is no longer "
+ "available, but canot remove trasport provider from monitoring queue. It's probably a BUG in FDT");
" [ ConsoleReportingTask ] The session: " + fdtSession.sessionID() + " is no longer available, but canot remove trasport provider from monitoring queue. It's probably a BUG in FDT");
continue;
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, " [ ConsoleReportingTask ] Removing tcpTransportProvider "
+ tcpTransportProvider + " for session: " + fdtSession.sessionID());
logger.log(Level.FINE, " [ ConsoleReportingTask ] Removing tcpTransportProvider " + tcpTransportProvider + " for session: " + fdtSession.sessionID());
}
remove(tcpTransportProvider);
it.remove();
Expand All @@ -97,20 +104,40 @@ private final boolean reportStatus(final Set<FDTSession> currentSessionSet, fina
totalReadRate += totalRate;

if (reportMultipleSessions) {
sb.append("\n").append(fdtSession.sessionID());
sb.append("\n");
sb.append(fdtSession.sessionID());
}
sb.append(tag).append(Utils.formatWithBitFactor(8 * totalRate, 0, "/s")).append("\tAvg: ")
.append(Utils.formatWithBitFactor(8 * avgTotalRate, 0, "/s"));
sb.append(tag);
sb.append(Utils.formatWithBitFactor(8 * totalRate, 0, "/s"));
sb.append("\tAvg: ");
sb.append(Utils.formatWithBitFactor(8 * avgTotalRate, 0, "/s"));

final long dtMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fdtSession.startTimeNanos);
if (fdtSession.getSize() > 0 && dtMillis > 20 * 1000) {
final long tcpSize = tcpTransportProvider.getUtilBytes();
final double cSize = (tcpSize <= 0L) ? 0D : tcpSize;
if (cSize > 0) {
final double tSize = (fdtSession.getSize() <= 0L) ? 0D : fdtSession.getSize();
sb.append(" ").append(Utils.percentDecimalFormat((cSize * 100) / tSize)).append("%");
sb.append("\t");
sb.append(Utils.percentDecimalFormat((cSize * 100) / tSize));
sb.append("%");
sb.append("\t");

final double remainingSeconds = (fdtSession.getSize() - cSize) / avgTotalRate;
sb.append(" ( ").append(Utils.getETA((long) remainingSeconds)).append(" )");
sb.append(" ( ");
sb.append(Utils.getETA((long) remainingSeconds));
sb.append(" )");

sb.append("\t");
sb.append(buffTag);
try {
if (buffTag=="SO_SNDBUF: ") {
sb.append(tcpTransportProvider.getSNDBUFSize());
} else sb.append("null"); //Currently don't support SO_RCVBUF yet
} catch (SocketException e) {
e.printStackTrace();
}

}
}
}
Expand All @@ -130,13 +157,11 @@ private final boolean reportStatus(final Set<FDTSession> currentSessionSet, fina
if (tcpTransportProvider != null) {
if (addIfAbsent(tcpTransportProvider, logger.isLoggable(Level.FINER) ? true : false)) {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, " [ ConsoleReportingTask ] Adding tcpTransportProvider "
+ tcpTransportProvider + " for session: " + fdtSession.sessionID());
logger.log(Level.FINE, " [ ConsoleReportingTask ] Adding tcpTransportProvider " + tcpTransportProvider + " for session: " + fdtSession.sessionID());
}
} else {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, " [ ConsoleReportingTask ] Unable to add tcpTransportProvider "
+ tcpTransportProvider + " for session: " + fdtSession.sessionID());
logger.log(Level.FINE, " [ ConsoleReportingTask ] Unable to add tcpTransportProvider " + tcpTransportProvider + " for session: " + fdtSession.sessionID());
}
}

Expand All @@ -148,18 +173,20 @@ private final boolean reportStatus(final Set<FDTSession> currentSessionSet, fina
return shouldReport;
}

private void reportStatus() {
private void reportStatus() throws IOException {
StringBuilder sb = new StringBuilder(8192);

boolean shouldReport = (reportStatus(diskWriterManager.getSessions(), oldWriterSessions, "Net In: ", sb)
|| reportStatus(diskReaderManager.getSessions(), oldReaderSessions, "Net Out: ", sb));

boolean shouldReport = (reportStatus(diskWriterManager.getSessions(), oldWriterSessions, "Net In: ", sb, "SO_RCVBUF: ") || reportStatus(diskReaderManager.getSessions(), oldReaderSessions, "Net Out: ", sb, "SO_SNDBUF: "));

if (shouldReport) {
logger.info(sb.toString());
}

//Parse StringBuilder sb as argument for writeRateToFile Method
writeRateToFile(sb.toString());

}

@Override
public void rateComputed() {
try {
Expand All @@ -168,5 +195,80 @@ public void rateComputed() {
logger.log(Level.WARNING, " [ ConsoleReportingTask ] Got exception while reporting", t1);
}
}

//Take the Net_IN/Net_OUT log and write to a separate text file
private void writeRateToFile(String terminalRateLog) throws IOException{

String newRateLog = terminalRateLog;
String buffTag, netTag;
if(newRateLog.contains("Net In")) {
buffTag = "SO_RCVBUF:";
netTag = "NET_IN";
} else {
buffTag = "SO_SNDBUF:";
netTag = "NET_OUT";
}
/**
* Original format:
*
* Net {In/Out}: X.XXX {Gb/s|Mb/s}\tAvg: Y.YYY {Gb/s|Mb/s}\tZZ.ZZ% ( RemainingTime )\t{SO_SNDBUF|SO_RCVBUF} Size: AAAAAAA
*
* Target format:
* X.XXX\tY.YYY\tZZ.ZZ\tRemainingTime\tAAAAAA
*
* **/
if(newRateLog.contains("Net Out: ")) {
newRateLog = newRateLog.replaceAll("Net Out: ", "");
newRateLog = newRateLog.replaceAll("SO_SNDBUF: ", "");
} else {
newRateLog = newRateLog.replaceAll("Net In: ", "");
newRateLog = newRateLog.replaceAll("SO_RCVBUF: ", "");
}

if(newRateLog.contains(" Gb/s")) {
newRateLog = newRateLog.replaceAll(" Gb/s", "");
} else if (newRateLog.contains(" Mb/s")) {
newRateLog = newRateLog.replaceAll(" Mb/s", "");
} else newRateLog = newRateLog.replaceAll(" Kb/s", "");

newRateLog = newRateLog.replaceAll("Avg: ", "");
newRateLog = newRateLog.replaceAll("%", "");
newRateLog = newRateLog.replaceAll("\\( ","");
newRateLog = newRateLog.replaceAll(" \\)", "");


if (writeRateToFileCount>0) {
//After the file was already rreated, just append the new log the to the old file

BufferedWriter bwr = new BufferedWriter(new FileWriter("/tmp/transfer_rate.txt", true)); //add true argument for append mode
//BufferedWriter bwr = new BufferedWriter(new FileWriter("/tmp/transfer_rate.txt", true)); //add true argument for append mode

bwr.newLine();
bwr.write(writeRateToFileCount + "\t");
bwr.write(dateFormat.format(new Date()) + "\t");
bwr.write(newRateLog);
bwr.close();
writeRateToFileCount++;
} else {
//BufferedWriter for writing the rate to .txt file
//Create new file first when the this method is called for the first time

BufferedWriter bwr = new BufferedWriter(new FileWriter(new File("/tmp/transfer_rate.txt")));
//BufferedWriter bwr = new BufferedWriter(new FileWriter("/tmp/transfer_rate.txt", true)); //add true argument for append mode

/**
* Column Header for the .txt file separated with tab \t
*
* [NO]\t[DATE]\t[TIME]\t{NET_IN|NET_OUT}\t[AVG]\t[PERCENT COMPLETED]\t[TIME REMAINING]\t[SO_SNDBUF]
* **/
bwr.write("NO\tDATE\tTIME\t" + netTag +"\tAVG\tPERCENT COMPLETED\tTIME REMAINING\t" + buffTag);

bwr.write(writeRateToFileCount + "\t");
bwr.write(dateFormat.format(new Date()) + "\t");
bwr.write(newRateLog);
bwr.close();
writeRateToFileCount++;
}
}

}
17 changes: 15 additions & 2 deletions src/lia/util/net/copy/transport/TCPTransportProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
Expand Down Expand Up @@ -56,7 +58,9 @@ public abstract class TCPTransportProvider extends AbstractFDTIOEntity implement
ScheduledFuture<?> monitoringTaskFuture;

ScheduledFuture<?> limiterTask;


private static SocketChannel sc;

public TCPTransportProvider(FDTSession fdtSession) throws Exception {
this(fdtSession, new LinkedBlockingQueue<FDTSelectionKey>());
}
Expand Down Expand Up @@ -132,6 +136,7 @@ private static final List<SocketChannel> tryToConnect(InetSocketAddress addr, in

if (windowSize > 0) {
s.setSendBufferSize(windowSize);
sc.setOption(StandardSocketOptions.SO_SNDBUF, windowSize);
}
final String sdpConfFlag = System.getProperty("com.sun.sdp.conf");
final boolean bSDP = (sdpConfFlag != null && !sdpConfFlag.isEmpty());
Expand Down Expand Up @@ -187,7 +192,7 @@ private static final List<SocketChannel> tryToConnect(InetSocketAddress addr, in
for (Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext(); ) {
SelectionKey ssk = it.next();
it.remove();
SocketChannel sc = (SocketChannel) ssk.channel();
sc = (SocketChannel) ssk.channel();
if (ssk.isConnectable()) {
ssk.interestOps(ssk.interestOps() & ~SelectionKey.OP_CONNECT);
while (!sc.finishConnect()) {
Expand Down Expand Up @@ -284,6 +289,14 @@ public int getNumberOfStreams() {
return channels.size();
}
}

public int getSNDBUFSize() throws SocketException {
return sc.socket().getSendBufferSize();
}

public int getRCVBUFSize() throws SocketException {
return sc.socket().getReceiveBufferSize();
}

public InetAddress getRemoteEndPointAddress() {
return null;
Expand Down