-
Notifications
You must be signed in to change notification settings - Fork 27
Description
@rosenbaumalex @katyakats HELLO!
We modified the code following your suggestions. Unfortunately, we got a kref_get warning when the eqh (EventQueueHandler) was reused, so we now reuse only the MsgPool for sending messages. This works okay.
Occasionally, ConnectionRdma receives SESSION_CLOSED as its first event instead of SESSION_ESTABLISHED. We have made sure that every eqh we use is newly created rather than a reused one.
14/10/17 07:33:06 WARN TaskSetManager: Lost TID 1195 (task 0.0:145)
14/10/17 07:33:06 WARN TaskSetManager: Loss was due to java.net.ConnectException
java.net.ConnectException: org.apache.spark.network.ConnectionRdma@6f03feb6 could not connect to 12.12.12.7 on port 53089, got SESSION_CLOSED
at org.apache.spark.network.ConnectionRdma.connect(ConnectionRdma.scala:34)
at org.apache.spark.network.ConnectionManagerRdma.sendMessage(ConnectionManagerRdma.scala:301)
at org.apache.spark.network.ConnectionManagerRdma.sendMessageReliably(ConnectionManagerRdma.scala:381)
at ....
A simplified version of the problematic class, ConnectionRdma, is shown below.
Please share any ideas you have.
Also, after this connection attempt fails, subsequent connections still run fine.
We estimate that this problem occurs only with low probability.
/**
 * Client-side RDMA connection to a single remote endpoint.
 *
 * Each call to [[connect]] creates a new EventQueueHandler and ClientSession and obtains a
 * MsgPool from MyResourcesManager.
 *
 * @param uri  remote endpoint; its host and port are also used in log/error messages
 * @param size passed to MyResourcesManager.getMsgPool
 */
private[spark] class ConnectionRdma(val uri: URI, val size: Int) extends Logging {
  var eqh: EventQueueHandler = null
  var msgPool: MsgPool = null
  var cs: ClientSession = null
  var thread: Thread = null
  /** Set to true by ClientCallbacks.onSessionEstablished. */
  var established = false
  /** The CLOSED/ERROR/REJECT session event that was reported, or null if none was seen. */
  var connectErrorType: EventName = null
  /**
   * Set when the session reports CLOSED/ERROR/REJECT, or when the JVM shuts down.
   * Volatile because the shutdown hook writes it from its own thread.
   */
  @volatile var close = false

  /**
   * Opens the client session and runs the event loop once, expecting the session to be
   * established by the first processed event.
   *
   * @throws java.net.ConnectException if the session was not established; the message
   *         includes the session event that was received (or null if none)
   */
  def connect(): Unit = {
    eqh = new EventQueueHandler(null)
    msgPool = MyResourcesManager.getMsgPool(size)
    val startTime = System.nanoTime
    cs = new ClientSession(eqh, uri, new ClientCallbacks)
    // session established event
    // NOTE(review): presumably (maxEvents = 1, timeout = -1 meaning "wait indefinitely");
    // confirm against the JXIO runEventLoop documentation.
    eqh.runEventLoop(1, -1)
    if (!established) {
      // NOTE(review): eqh, cs and msgPool allocated above are not released on this path;
      // confirm whether the caller is responsible for cleaning them up.
      throw new ConnectException(this.toString + " could not connect to " + uri.getHost + " on port "
        + uri.getPort + ", got " + connectErrorType)
    }
    // A single JVM-wide hook marks live connections closed at shutdown. Registering one
    // hook per connection (as before) kept every connection strongly reachable until exit.
    ConnectionRdma.registerForShutdown(this)
    val endTime = System.nanoTime
    logDebug(cs.toString + " session established with host " + uri.getHost + ", time taken to open: "
      + (endTime - startTime))
  }

  /** Session callbacks; they update the state fields of the enclosing connection. */
  class ClientCallbacks extends ClientSession.Callbacks {
    /** Logs the error unless its reason is MSG_FLUSHED, then returns msg to the pool. */
    def onMsgError(msg: Msg, reason: EventReason): Unit = {
      if (reason != EventReason.MSG_FLUSHED) {
        logError(this.toString + " onMsgErrorCallback, " + reason)
      }
      msgPool.releaseMsg(msg)
    }

    /** Records that the session was established. */
    def onSessionEstablished(): Unit = {
      established = true
    }

    /**
     * Logs every session event. On CLOSED/ERROR/REJECT, remembers the event, marks the
     * connection closed and breaks out of the event loop.
     */
    def onSessionEvent(event: EventName, reason: EventReason): Unit = {
      logInfo(this.toString + " onSessionEvent " + event)
      if (event == EventName.SESSION_CLOSED || event == EventName.SESSION_ERROR
        || event == EventName.SESSION_REJECT) {
        // normal exit
        connectErrorType = event
        close = true
        eqh.breakEventLoop
      }
    }

    /** Prints a timing line for non-empty responses, then returns m to the pool. */
    def onResponse(m: Msg): Unit = {
      if (m.getIn.limit != 0) {
        println("Time: R " + m.getIn.getInt + " " + System.currentTimeMillis)
      }
      msgPool.releaseMsg(m)
    }
  }
}

/** Tracks established connections so that one JVM shutdown hook can mark them all closed. */
private[spark] object ConnectionRdma {
  // Weak keys: a connection no longer referenced elsewhere can still be garbage-collected.
  private val live = new java.util.WeakHashMap[ConnectionRdma, java.lang.Boolean]()

  Runtime.getRuntime.addShutdownHook(new Thread() {
    override def run(): Unit = live.synchronized {
      val it = live.keySet.iterator
      while (it.hasNext) {
        it.next().close = true
      }
    }
  })

  /** Registers an established connection so the shutdown hook sets its `close` flag. */
  private def registerForShutdown(conn: ConnectionRdma): Unit = live.synchronized {
    live.put(conn, java.lang.Boolean.TRUE)
  }
}