Skip to content

Commit 2b345fa

Browse files
committed
[core] fix applyGroupSequences
1 parent 3cefede commit 2b345fa

File tree

3 files changed

+46
-128
lines changed

3 files changed

+46
-128
lines changed

srtcore/core.cpp

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3236,7 +3236,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
32363236

32373237
// Check if there exists a group that this one is a peer of.
32383238
CUDTGroup* gp = uglobal().findPeerGroup_LOCKED(peergroup);
3239-
bool was_empty = true;
32403239
if (gp)
32413240
{
32423241
if (gp->type() != gtp)
@@ -3248,9 +3247,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
32483247
}
32493248

32503249
HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group for peer=$" << peergroup << " found: $" << gp->id());
3251-
3252-
if (!gp->groupEmpty())
3253-
was_empty = false;
32543250
}
32553251
else
32563252
{
@@ -3273,6 +3269,7 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
32733269

32743270
gp->set_peerid(peergroup);
32753271
gp->deriveSettings(this);
3272+
gp->syncWithSocket(s->core(), HSD_RESPONDER);
32763273

32773274
// This can only happen on a listener (it's only called on a site that is
32783275
// HSD_RESPONDER), so it was a response for a groupwise connection.
@@ -3284,19 +3281,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
32843281
<< gp->id());
32853282
}
32863283

3287-
{
3288-
ScopedLock glock (*gp->exp_groupLock());
3289-
if (gp->closing())
3290-
{
3291-
HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group $" << gp->id() << " is being closed, can't process");
3292-
}
3293-
3294-
if (was_empty)
3295-
{
3296-
gp->syncWithSocket(s->core(), HSD_RESPONDER);
3297-
}
3298-
}
3299-
33003284
// Setting non-blocking reading for group socket.
33013285
s->core().m_config.bSynRecving = false;
33023286
s->core().m_config.bSynSending = false;
@@ -3396,21 +3380,15 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp)
33963380

33973381
// These are the values that are normally set initially by setters.
33983382
int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck;
3399-
if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn)))
3400-
{
3401-
HLOGC(gmlog.Debug,
3402-
log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn
3403-
<< " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) << ") SND=%" << m_iSndLastAck
3404-
<< " -> %" << snd_isn << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")");
3383+
gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn));
3384+
HLOGC(gmlog.Debug,
3385+
log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn
3386+
<< " (shift by " << CSeqNo::seqoff(m_iRcvLastAck, rcv_isn) << ") SND=%" << m_iSndLastAck << " -> %"
3387+
<< snd_isn << " (shift by " << CSeqNo::seqoff(m_iSndLastAck, snd_isn) << ")");
3388+
if (rcv_isn != m_iRcvLastAck)
34053389
setInitialRcvSeq(rcv_isn);
3390+
if (snd_isn != m_iSndLastAck)
34063391
setInitialSndSeq(snd_isn);
3407-
}
3408-
else
3409-
{
3410-
HLOGC(gmlog.Debug,
3411-
log << CONID() << "synchronizeWithGroup: DEFINED ISN: RCV=%" << m_iRcvLastAck << " SND=%"
3412-
<< m_iSndLastAck);
3413-
}
34143392
}
34153393
#endif
34163394

@@ -7686,7 +7664,10 @@ void srt::CUDT::dropToGroupRecvBase()
76867664
// Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock,
76877665
// but this is an intended order.
76887666
if (m_parent->m_GroupOf)
7667+
{
76897668
group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo();
7669+
m_parent->m_GroupOf->updateRcvCurrSeqNo(m_iRcvCurrSeqNo);
7670+
}
76907671
}
76917672
if (group_recv_base == SRT_SEQNO_NONE)
76927673
return;

srtcore/group.cpp

Lines changed: 31 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -63,81 +63,19 @@ bool CUDTGroup::getBufferTimeBase(CUDT* forthesakeof,
6363
}
6464

6565
// [[using locked(this->m_GroupLock)]];
66-
bool CUDTGroup::applyGroupSequences(SRTSOCKET target, int32_t& w_snd_isn, int32_t& w_rcv_isn)
66+
void CUDTGroup::applyGroupSequences(SRTSOCKET /* not sure if needed */, int32_t& w_snd_isn, int32_t& w_rcv_isn)
6767
{
68-
if (m_bConnected) // You are the first one, no need to change.
69-
{
70-
IF_HEAVY_LOGGING(string update_reason = "what?");
71-
// Find a socket that is declared connected and is not
72-
// the socket that caused the call.
73-
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
74-
{
75-
if (gi->id == target)
76-
continue;
77-
78-
CUDT& se = gi->ps->core();
79-
if (!se.m_bConnected)
80-
continue;
81-
82-
// Found it. Get the following sequences:
83-
// For sending, the sequence that is about to be sent next.
84-
// For receiving, the sequence of the latest received packet.
85-
86-
// SndCurrSeqNo is initially set to ISN-1, this next one is
87-
// the sequence that is about to be stamped on the next sent packet
88-
// over that socket. Using this field is safer because it is atomic
89-
// and its affinity is to the same thread as the sending function.
90-
91-
// NOTE: the groupwise scheduling sequence might have been set
92-
// already. If so, it means that it was set by either:
93-
// - the call of this function on the very first conencted socket (see below)
94-
// - the call to `sendBroadcast` or `sendBackup`
95-
// In both cases, we want THIS EXACTLY value to be reported
96-
if (m_iLastSchedSeqNo != -1)
97-
{
98-
w_snd_isn = m_iLastSchedSeqNo;
99-
IF_HEAVY_LOGGING(update_reason = "GROUPWISE snd-seq");
100-
}
101-
else
102-
{
103-
w_snd_isn = se.m_iSndNextSeqNo;
104-
105-
// Write it back to the groupwise scheduling sequence so that
106-
// any next connected socket will take this value as well.
107-
m_iLastSchedSeqNo = w_snd_isn;
108-
IF_HEAVY_LOGGING(update_reason = "existing socket not yet sending");
109-
}
110-
111-
// RcvCurrSeqNo is increased by one because it happens that at the
112-
// synchronization moment it's already past reading and delivery.
113-
// This is redundancy, so the redundant socket is connected at the moment
114-
// when the other one is already transmitting, so skipping one packet
115-
// even if later transmitted is less troublesome than requesting a
116-
// "mistakenly seen as lost" packet.
117-
w_rcv_isn = CSeqNo::incseq(se.m_iRcvCurrSeqNo);
118-
119-
HLOGC(gmlog.Debug,
120-
log << "applyGroupSequences: @" << target << " gets seq from @" << gi->id << " rcv %" << (w_rcv_isn)
121-
<< " snd %" << (w_snd_isn) << " as " << update_reason);
122-
return false;
123-
}
124-
}
125-
126-
// If the GROUP (!) is not connected, or no running/pending socket has been found.
127-
// // That is, given socket is the first one.
128-
// The group data should be set up with its own data. They should already be passed here
129-
// in the variables.
130-
//
131-
// Override the schedule sequence of the group in this case because whatever is set now,
132-
// it's not valid.
133-
134-
HLOGC(gmlog.Debug,
135-
log << "applyGroupSequences: no socket found connected and transmitting, @" << target
136-
<< " not changing sequences, storing snd-seq %" << (w_snd_isn));
137-
138-
set_currentSchedSequence(w_snd_isn);
139-
140-
return true;
68+
// LastSchedSeqNo is initially set to the ISN of first connected socket,
69+
// its' also updated in group send functions to the next scheduling seq.
70+
w_snd_isn = m_iLastSchedSeqNo;
71+
72+
// RcvCurrSeqNo is increased by one because it happens that at the
73+
// synchronization moment it's already past reading and delivery.
74+
// This is redundancy, so the redundant socket is connected at the moment
75+
// when the other one is already transmitting, so skipping one packet
76+
// even if later transmitted is less troublesome than requesting a
77+
// "mistakenly seen as lost" packet.
78+
w_rcv_isn = CSeqNo::incseq(m_iRcvCurrSeqNo);
14179
}
14280

14381
// NOTE: This function is now for DEBUG PURPOSES ONLY.
@@ -263,7 +201,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
263201
, m_bSynSending(true)
264202
, m_bTsbPd(true)
265203
, m_bTLPktDrop(true)
266-
, m_iTsbPdDelay_us(0)
267204
// m_*EID and m_*Epolld fields will be initialized
268205
// in the constructor body.
269206
, m_iSndTimeOut(-1)
@@ -276,6 +213,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
276213
, m_bClosing(false)
277214
, m_iLastSchedSeqNo(SRT_SEQNO_NONE)
278215
, m_iLastSchedMsgNo(SRT_MSGNO_NONE)
216+
, m_iRcvCurrSeqNo(SRT_SEQNO_NONE)
279217
{
280218
setupMutex(m_GroupLock, "Group");
281219
setupMutex(m_RcvDataLock, "RcvData");
@@ -845,6 +783,8 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
845783
set_currentSchedSequence(core.ISN());
846784
}
847785

786+
set_currentRecvSequence(core.m_iRcvCurrSeqNo);
787+
848788
// XXX
849789
// Might need further investigation as to whether this isn't
850790
// wrong for some cases. By having this -1 here the value will be
@@ -857,11 +797,6 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
857797
//
858798
// Previous implementation used setting to: core.m_iPeerISN
859799
resetInitialRxSequence();
860-
861-
// Get the latency (possibly fixed against the opposite side)
862-
// from the first socket (core.m_iTsbPdDelay_ms),
863-
// and set it on the current socket.
864-
set_latency(core.m_iTsbPdDelay_ms * int64_t(1000));
865800
}
866801

867802
void CUDTGroup::close()
@@ -2089,6 +2024,22 @@ void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequ
20892024
}
20902025
}
20912026

2027+
void CUDTGroup::updateRcvCurrSeqNo(int32_t seq)
2028+
{
2029+
ScopedLock lg(m_GroupLock);
2030+
2031+
if (m_iRcvCurrSeqNo == SRT_SEQNO_NONE)
2032+
{
2033+
LOGC(grlog.Error,
2034+
log << "IPE: CUDTGroup::m_iRcvCurrSeqNo was not initialized by the first member, setting to %" << seq);
2035+
m_iRcvCurrSeqNo = seq;
2036+
}
2037+
else if (CSeqNo::seqcmp(seq, m_iRcvCurrSeqNo) > 0)
2038+
{
2039+
m_iRcvCurrSeqNo = seq;
2040+
}
2041+
}
2042+
20922043
int32_t CUDTGroup::getRcvBaseSeqNo()
20932044
{
20942045
ScopedLock lg(m_GroupLock);

srtcore/group.h

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -162,23 +162,8 @@ class CUDTGroup
162162
{
163163
m_Group.erase(f);
164164

165-
// Reset sequence numbers on a dead group so that they are
166-
// initialized anew with the new alive connection within
167-
// the group.
168-
// XXX The problem is that this should be done after the
169-
// socket is considered DISCONNECTED, not when it's being
170-
// closed. After being disconnected, the sequence numbers
171-
// are no longer valid, and will be reinitialized when the
172-
// socket is connected again. This may stay as is for now
173-
// as in SRT it's not predicted to do anything with the socket
174-
// that was disconnected other than immediately closing it.
175165
if (m_Group.empty())
176166
{
177-
// When the group is empty, there's no danger that this
178-
// number will collide with any ISN provided by a socket.
179-
// Also since now every socket will derive this ISN.
180-
m_iLastSchedSeqNo = generateISN();
181-
resetInitialRxSequence();
182167
empty = true;
183168
}
184169
}
@@ -346,6 +331,7 @@ class CUDTGroup
346331
/// @param sock member socket ID (unused)
347332
/// @param sequence the latest packet sequence number available for reading.
348333
void updateReadState(SRTSOCKET sock, int32_t sequence);
334+
void updateRcvCurrSeqNo(int32_t seq);
349335

350336
void updateWriteState();
351337
void updateFailedLink();
@@ -630,7 +616,6 @@ class CUDTGroup
630616
bool m_bSynSending;
631617
bool m_bTsbPd;
632618
bool m_bTLPktDrop;
633-
int64_t m_iTsbPdDelay_us;
634619
int m_RcvEID;
635620
class CEPollDesc* m_RcvEpolld;
636621
int m_SndEID;
@@ -692,6 +677,7 @@ class CUDTGroup
692677
sync::Mutex m_RcvDataLock;
693678
sync::atomic<int32_t> m_iLastSchedSeqNo; // represetnts the value of CUDT::m_iSndNextSeqNo for each running socket
694679
sync::atomic<int32_t> m_iLastSchedMsgNo;
680+
sync::atomic<int32_t> m_iRcvCurrSeqNo; // Represetnts the max value of CUDT::m_iRcvCurrSeqNo for all sockets in this group.
695681
// Statistics
696682

697683
struct Stats
@@ -803,7 +789,7 @@ class CUDTGroup
803789

804790
// Live state synchronization
805791
bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr);
806-
bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);
792+
void applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);
807793

808794
/// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference.
809795
/// @param srcMember a reference for synchronization.
@@ -816,8 +802,8 @@ class CUDTGroup
816802
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, peerid, m_PeerGroupID);
817803
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type);
818804
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo);
805+
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentRecvSequence, m_iRcvCurrSeqNo);
819806
SRTU_PROPERTY_RRW(std::set<int>&, epollset, m_sPollID);
820-
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us);
821807
SRTU_PROPERTY_RO(bool, closing, m_bClosing);
822808
};
823809

0 commit comments

Comments
 (0)