@@ -585,6 +585,57 @@ static void mergeDirPerPartDirs(cDirDesc *parent, cDirDesc *dir, const char *cur
585
585
}
586
586
}
587
587
588
+ constexpr int64_t oneSecondNS = 1000 * 1000 * 1000 ; // 1 second in nanoseconds
589
+ constexpr int64_t oneHourNS = 60 * 60 * oneSecondNS; // 1 hour in nanoseconds
590
+ class XRefPeriodicTimer : public PeriodicTimer
591
+ {
592
+ public:
593
+ XRefPeriodicTimer () = default ;
594
+ XRefPeriodicTimer (unsigned seconds, bool suppressFirst, const char *_clustname)
595
+ : clustname(_clustname) { reset (seconds, suppressFirst, clustname); }
596
+
597
+ unsigned calcElapsedMinutes () const
598
+ {
599
+ int64_t elapsedNS = cycle_to_nanosec (lastElapsedCycles - startCycles);
600
+ return elapsedNS / oneSecondNS / 60 ;
601
+ }
602
+
603
+ bool hasElapsed ()
604
+ {
605
+ // MORE: Could make PeriodicTimer::hasElapsed thread safe and remove CriticalBlock
606
+ CriticalBlock block (timerSect);
607
+ return PeriodicTimer::hasElapsed ();
608
+ }
609
+
610
+ void reset (unsigned seconds, bool suppressFirst, const char *_clustname)
611
+ {
612
+ clustname = _clustname;
613
+ PeriodicTimer::reset (seconds*1000 , suppressFirst);
614
+ startCycles = lastElapsedCycles;
615
+ }
616
+
617
+ // Double the time period until it reaches 1 hour
618
+ void updatePeriod ()
619
+ {
620
+ int64_t timePeriodNS = cycle_to_nanosec (timePeriodCycles) + 1 ; // 1 second is lost converting to nanoseconds
621
+ if (timePeriodNS < oneHourNS)
622
+ {
623
+ int64_t newTimePeriodNS = timePeriodNS >= oneHourNS / 2 ? oneHourNS : timePeriodNS * 2 ;
624
+ timePeriodCycles = nanosec_to_cycle (newTimePeriodNS);
625
+
626
+ unsigned intervalMinutes = newTimePeriodNS / oneSecondNS / 60 ;
627
+ if (clustname)
628
+ DBGLOG (LOGPFX " [%s] Heartbeat interval increased to %u minutes" , clustname, intervalMinutes);
629
+ else
630
+ DBGLOG (LOGPFX " Heartbeat interval increased to %u minutes" , intervalMinutes);
631
+ }
632
+ }
633
+
634
+ private:
635
+ CriticalSection timerSect;
636
+ const char *clustname = nullptr ; // Cluster name for logging
637
+ cycle_t startCycles = 0 ;
638
+ };
588
639
589
640
class CNewXRefManagerBase
590
641
{
@@ -599,40 +650,30 @@ class CNewXRefManagerBase
599
650
unsigned lastlog;
600
651
unsigned sfnum;
601
652
unsigned fnum;
653
+ XRefPeriodicTimer heartbeatTimer;
654
+
602
655
Owned<IPropertyTree> foundbranch;
603
656
Owned<IPropertyTree> lostbranch;
604
657
Owned<IPropertyTree> orphansbranch;
605
658
Owned<IPropertyTree> dirbranch;
606
659
607
- void log (const char * format, ...) __attribute__((format(printf, 2 , 3 )))
660
+ void log (bool forceStatusUpdate, const char * format, ...) __attribute__((format(printf, 3 , 4 )))
608
661
{
609
- CriticalBlock block (logsect) ;
662
+ StringBuffer line ;
610
663
va_list args;
611
664
va_start (args, format);
612
- StringBuffer line;
613
665
line.valist_appendf (format, args);
614
666
va_end (args);
667
+
615
668
if (clustname.get ())
616
669
PROGLOG (LOGPFX " [%s] %s" ,clustname.get (),line.str ());
617
670
else
618
671
PROGLOG (LOGPFX " %s" ,line.str ());
619
- if (logconn) {
620
- logcache.set (line.str ());
621
- updateStatus (false );
622
- }
623
- }
624
672
625
- void statlog (const char * format, ...) __attribute__((format(printf, 2 , 3 )))
626
- {
627
- CriticalBlock block (logsect);
628
- va_list args;
629
- va_start (args, format);
630
- StringBuffer line;
631
- line.valist_appendf (format, args);
632
- va_end (args);
633
673
if (logconn) {
674
+ CriticalBlock block (logsect);
634
675
logcache.set (line.str ());
635
- updateStatus (false );
676
+ updateStatus (forceStatusUpdate );
636
677
}
637
678
}
638
679
@@ -684,6 +725,34 @@ class CNewXRefManagerBase
684
725
685
726
}
686
727
728
+ void startHeartbeat (const char * op)
729
+ {
730
+ heartbeatTimer.reset (60 , true , clustname.get ()); // 1 minute interval
731
+ log (true , " %s heartbeat started (interval: 1 minute)" , op);
732
+ }
733
+
734
+ void checkHeartbeat (const char * op)
735
+ {
736
+ time_t now = time (NULL );
737
+ if (!heartbeatTimer.hasElapsed ())
738
+ return ;
739
+
740
+ unsigned elapsedMinutes = heartbeatTimer.calcElapsedMinutes ();
741
+ unsigned elapsedHours = elapsedMinutes / 60 ;
742
+ unsigned remainingMinutes = elapsedMinutes % 60 ;
743
+
744
+ struct tm *utc_tm = gmtime (&now);
745
+ char timestamp[32 ];
746
+ strftime (timestamp, sizeof (timestamp), " %Y-%m-%d %H:%M:%S" , utc_tm);
747
+
748
+ if (elapsedHours > 0 )
749
+ log (true , " %s - elapsed: %uh %um (%s UTC)" , op, elapsedHours, remainingMinutes, timestamp);
750
+ else
751
+ log (true , " %s - elapsed: %um (%s UTC)" , op, elapsedMinutes, timestamp);
752
+
753
+ heartbeatTimer.updatePeriod ();
754
+ }
755
+
687
756
void addBranch (IPropertyTree *root,const char *name,IPropertyTree *branch)
688
757
{
689
758
if (!branch)
@@ -716,13 +785,13 @@ class CNewXRefManagerBase
716
785
xpath.insert (0 ," /DFU/XREF/" );
717
786
logconn.setown (querySDS ().connect (xpath.str (),myProcessSession (),0 ,INFINITE));
718
787
}
719
- log (" Starting" );
788
+ log (false , " Starting" );
720
789
}
721
790
722
791
void finish (bool aborted)
723
792
{
724
793
if (aborted)
725
- log (" Aborted" );
794
+ log (false , " Aborted" );
726
795
logconn.clear (); // final message done by save to eclwatch
727
796
}
728
797
@@ -751,7 +820,7 @@ class CNewXRefManagerBase
751
820
{
752
821
if (abort)
753
822
return ;
754
- log (" Saving information" );
823
+ log (false , " Saving information" );
755
824
Owned<IPropertyTree> croot = createPTree (" Cluster" );
756
825
croot->setProp (" @name" ,clustname);
757
826
if (!rootdir.isEmpty ())
@@ -825,7 +894,7 @@ class CNewXRefManager: public CNewXRefManagerBase
825
894
lostbranch.setown (createPTree (" Lost" ));
826
895
orphansbranch.setown (createPTree (" Orphans" ));
827
896
dirbranch.setown (createPTree (" Directories" ));
828
- log (" Max memory = %d MB" , maxMb);
897
+ log (false , " Max memory = %d MB" , maxMb);
829
898
830
899
StringBuffer userName;
831
900
serverConfig->getProp (" @sashaUser" , userName);
@@ -1037,6 +1106,7 @@ class CNewXRefManager: public CNewXRefManagerBase
1037
1106
1038
1107
bool scanDirectory (unsigned node,const SocketEndpoint &ep,StringBuffer &path, unsigned drv, cDirDesc *pdir, IFile *cachefile, unsigned level)
1039
1108
{
1109
+ checkHeartbeat (" Directory scan" );
1040
1110
size32_t dsz = path.length ();
1041
1111
if (pdir==NULL )
1042
1112
pdir = root;
@@ -1168,7 +1238,7 @@ class CNewXRefManager: public CNewXRefManagerBase
1168
1238
SocketEndpoint localEP;
1169
1239
localEP.setLocalHost (0 );
1170
1240
addPathSepChar (path).append (' d' ).append (i+1 );
1171
- parent.log (" Scanning %s directory %s" ,parent.storagePlane ->queryProp (" @name" ),path.str ());
1241
+ parent.log (false , " Scanning %s directory %s" ,parent.storagePlane ->queryProp (" @name" ),path.str ());
1172
1242
if (!parent.scanDirectory (0 ,localEP,path,0 ,parent.root ,NULL ,1 ))
1173
1243
{
1174
1244
ok = false ;
@@ -1178,7 +1248,7 @@ class CNewXRefManager: public CNewXRefManagerBase
1178
1248
else
1179
1249
{
1180
1250
SocketEndpoint ep = parent.rawgrp ->queryNode (i).endpoint ();
1181
- parent.log (" Scanning %s directory %s" ,ep.getEndpointHostText (tmp).str (),path.str ());
1251
+ parent.log (false , " Scanning %s directory %s" ,ep.getEndpointHostText (tmp).str (),path.str ());
1182
1252
if (!parent.scanDirectory (i,ep,path,0 ,NULL ,NULL ,0 )) {
1183
1253
ok = false ;
1184
1254
return ;
@@ -1187,7 +1257,7 @@ class CNewXRefManager: public CNewXRefManagerBase
1187
1257
i = (i+r)%n;
1188
1258
setReplicateFilename (path,1 );
1189
1259
ep = parent.rawgrp ->queryNode (i).endpoint ();
1190
- parent.log (" Scanning %s directory %s" ,ep.getEndpointHostText (tmp.clear ()).str (),path.str ());
1260
+ parent.log (false , " Scanning %s directory %s" ,ep.getEndpointHostText (tmp.clear ()).str (),path.str ());
1191
1261
if (!parent.scanDirectory (i,ep,path,1 ,NULL ,NULL ,0 )) {
1192
1262
ok = false ;
1193
1263
}
@@ -1203,11 +1273,12 @@ class CNewXRefManager: public CNewXRefManagerBase
1203
1273
numMaxThreads = numuniqnodes;
1204
1274
if (numThreads > numMaxThreads)
1205
1275
numThreads = numMaxThreads;
1276
+ startHeartbeat (" Directory scan" ); // Initialize heartbeat mechanism
1206
1277
afor.For (numMaxThreads,numThreads,true ,numThreads>1 );
1207
1278
if (afor.ok )
1208
- log (" Directory scan complete" );
1279
+ log (true , " Directory scan complete" );
1209
1280
else
1210
- log (" Errors occurred during scan" );
1281
+ log (true , " Errors occurred during scan" );
1211
1282
return afor.ok ;
1212
1283
}
1213
1284
@@ -1253,7 +1324,7 @@ class CNewXRefManager: public CNewXRefManagerBase
1253
1324
{
1254
1325
if (abort)
1255
1326
return ;
1256
- parent.log (" Process file %s" ,name.str ());
1327
+ parent.log (false , " Process file %s" ,name.str ());
1257
1328
parent.fnum ++;
1258
1329
1259
1330
Owned<IFileDescriptor> fdesc;
@@ -1354,7 +1425,7 @@ class CNewXRefManager: public CNewXRefManagerBase
1354
1425
} filescan (*this ,abort);
1355
1426
1356
1427
filescan.scan ();
1357
- log (" File scan complete" );
1428
+ log (true , " File scan complete" );
1358
1429
1359
1430
}
1360
1431
@@ -1688,6 +1759,7 @@ class CNewXRefManager: public CNewXRefManagerBase
1688
1759
1689
1760
void listOrphans (cDirDesc *d,StringBuffer &basedir,StringBuffer &scope,bool &abort,unsigned int recentCutoffDays)
1690
1761
{
1762
+ checkHeartbeat (" Orphan scan" );
1691
1763
if (abort)
1692
1764
return ;
1693
1765
if (!d) {
@@ -1755,23 +1827,24 @@ class CNewXRefManager: public CNewXRefManagerBase
1755
1827
void listOrphans (bool &abort,unsigned int recentCutoffDays)
1756
1828
{
1757
1829
// also does directories
1758
- log (" Scanning for orphans" );
1830
+ log (true ," Scanning for orphans" );
1831
+ startHeartbeat (" Orphan scan" );
1759
1832
StringBuffer basedir;
1760
1833
StringBuffer scope;
1761
1834
listOrphans (NULL ,basedir,scope,abort,recentCutoffDays);
1762
1835
if (abort)
1763
1836
return ;
1764
- log (" Orphan scan complete" );
1837
+ log (true , " Orphan scan complete" );
1765
1838
sorteddirs.sort (compareDirs); // NB sort reverse
1766
1839
while (!abort&&sorteddirs.ordinality ())
1767
1840
dirbranch->addPropTree (" Directory" ,&sorteddirs.popGet ());
1768
- log (" Directories sorted" );
1841
+ log (true , " Directories sorted" );
1769
1842
}
1770
1843
1771
1844
1772
1845
void listLost (bool &abort,bool ignorelazylost,unsigned int recentCutoffDays)
1773
1846
{
1774
- log (" Scanning for lost files" );
1847
+ log (true , " Scanning for lost files" );
1775
1848
StringBuffer tmp;
1776
1849
ForEachItemIn (i0,lostfiles) {
1777
1850
if (abort)
@@ -1911,7 +1984,7 @@ class CNewXRefManager: public CNewXRefManagerBase
1911
1984
lostbranch->addPropTree (" File" ,ft.getClear ());
1912
1985
}
1913
1986
}
1914
- log (" Lost scan complete" );
1987
+ log (true , " Lost scan complete" );
1915
1988
}
1916
1989
1917
1990
@@ -2048,7 +2121,7 @@ class CSuperfileCheckManager: public CNewXRefManagerBase
2048
2121
void processSuperFile (IPropertyTree &file,StringBuffer &name)
2049
2122
{
2050
2123
parent.sfnum ++;
2051
- parent.log (" Scanning SuperFile %s" ,name.str ());
2124
+ parent.log (false , " Scanning SuperFile %s" ,name.str ());
2052
2125
unsigned numsub = file.getPropInt (" @numsubfiles" );
2053
2126
unsigned n = 0 ;
2054
2127
Owned<IPropertyTreeIterator> iter = file.getElements (" SubFile" );
@@ -2104,7 +2177,7 @@ class CSuperfileCheckManager: public CNewXRefManagerBase
2104
2177
2105
2178
bool fix = false ;
2106
2179
2107
- log (" Crossreferencing %d SuperFiles" ,superowner.ordinality ());
2180
+ log (false , " Crossreferencing %d SuperFiles" ,superowner.ordinality ());
2108
2181
ForEachItemIn (i1,superowner) {
2109
2182
const char *owner = superowner.item (i1);
2110
2183
const char *owned = superowned.item (i1);
@@ -2147,8 +2220,8 @@ class CSuperfileCheckManager: public CNewXRefManagerBase
2147
2220
}
2148
2221
}
2149
2222
}
2150
- }
2151
- log (" Crossreferencing %d Files" ,fileowned.ordinality ());
2223
+ }
2224
+ log (false , " Crossreferencing %d Files" ,fileowned.ordinality ());
2152
2225
ForEachItemIn (i3,fileowned) {
2153
2226
const char *fowner = fileowner.item (i3);
2154
2227
const char *fowned = fileowned.item (i3);
0 commit comments