33
33
#include " ../jrd/met_proto.h"
34
34
#include " ../jrd/pag_proto.h"
35
35
#include " ../jrd/tra_proto.h"
36
+ #include " ../common/classes/Spinlock.h"
36
37
37
38
#include < atomic>
39
+ #include < mutex>
38
40
39
41
#ifdef WIN_NT
40
42
#include < process.h>
@@ -100,11 +102,13 @@ namespace
100
102
event_t clientEvent;
101
103
USHORT bufferSize;
102
104
std::atomic<Tag> tag;
105
+ std::atomic_uint seq;
106
+ SpinLock bufferMutex;
103
107
char userName[USERNAME_LENGTH + 1 ]; // \0 if has PROFILE_ANY_ATTACHMENT
104
108
alignas (FB_ALIGNMENT) UCHAR buffer[4096 ];
105
109
};
106
110
107
- static const USHORT VERSION = 2 ;
111
+ static const USHORT VERSION = 3 ;
108
112
109
113
public:
110
114
ProfilerIpc (thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId, bool server = false );
@@ -179,7 +183,7 @@ class Jrd::ProfilerListener final
179
183
listener->watcherThread ();
180
184
}
181
185
182
- void processCommand (thread_db* tdbb);
186
+ void processCommand (thread_db* tdbb, ProfilerIpc::Tag tag, UCharBuffer& buffer );
183
187
184
188
private:
185
189
Attachment* const attachment;
@@ -736,6 +740,8 @@ ProfilerIpc::ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmen
736
740
{
737
741
Guard guard (this );
738
742
743
+ header->seq = 0 ;
744
+
739
745
if (sharedMemory->eventInit (&header->serverEvent ) != FB_SUCCESS)
740
746
(Arg::Gds (isc_random) << " ProfilerIpc eventInit(serverEvent) failed" ).raise ();
741
747
}
@@ -817,18 +823,17 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag,
817
823
}
818
824
});
819
825
820
- const SLONG value = sharedMemory->eventClear (&header->clientEvent );
826
+ const SLONG clientEventCounter = sharedMemory->eventClear (&header->clientEvent );
827
+
828
+ std::unique_lock bufferMutexLock (header->bufferMutex );
821
829
822
- const Tag oldTag = header->tag .exchange (tag);
823
- switch (oldTag)
830
+ switch (header->tag )
824
831
{
825
832
case Tag::NOP:
826
- header->tag = oldTag;
827
833
(Arg::Gds (isc_random) << " Remote attachment failed to start listener thread" ).raise ();
828
834
break ;
829
835
830
836
case Tag::SERVER_EXITED:
831
- header->tag = oldTag;
832
837
(Arg::Gds (isc_random) << " Cannot start remote profile session - attachment exited" ).raise ();
833
838
break ;
834
839
@@ -846,41 +851,49 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag,
846
851
fb_assert (inSize <= sizeof (header->buffer ));
847
852
memcpy (header->buffer , in, inSize);
848
853
854
+ header->tag = tag;
855
+ const auto seq = ++header->seq ;
856
+
857
+ bufferMutexLock.unlock ();
858
+
849
859
if (sharedMemory->eventPost (&header->serverEvent ) != FB_SUCCESS)
850
860
(Arg::Gds (isc_random) << " Cannot start remote profile session - attachment exited" ).raise ();
851
861
862
+ const SLONG TIMEOUT = 500 * 1000 ; // 0.5 sec
863
+ const int serverPID = header->serverEvent .event_pid ;
864
+
865
+ while (true )
852
866
{
853
- const SLONG TIMEOUT = 500 * 1000 ; // 0.5 sec
867
+ { // scope
868
+ EngineCheckout cout (tdbb, FB_FUNCTION);
854
869
855
- const int serverPID = header->serverEvent .event_pid ;
856
- while (true )
857
- {
870
+ if (sharedMemory->eventWait (&header->clientEvent , clientEventCounter, TIMEOUT) == FB_SUCCESS)
871
+ break ;
872
+
873
+ if (serverPID != getpid () && !ISC_check_process_existence (serverPID))
858
874
{
859
- EngineCheckout cout (tdbb, FB_FUNCTION);
860
- if (sharedMemory->eventWait (&header->clientEvent , value, TIMEOUT) == FB_SUCCESS)
861
- break ;
875
+ // Server process was died or exited
876
+ fb_assert ((header->tag == tag) || header->tag == Tag::SERVER_EXITED);
862
877
863
- if (serverPID != getpid () && ! ISC_check_process_existence (serverPID) )
878
+ if (header-> tag == tag )
864
879
{
865
- // Server process was died or exited
866
- fb_assert ((header->tag == tag) || header->tag == Tag::SERVER_EXITED);
867
-
868
- if (header->tag == tag)
880
+ header->tag = Tag::SERVER_EXITED;
881
+ if (header->serverEvent .event_pid )
869
882
{
870
- header->tag = Tag::SERVER_EXITED;
871
- if (header->serverEvent .event_pid )
872
- {
873
- sharedMemory->eventFini (&header->serverEvent );
874
- header->serverEvent .event_pid = 0 ;
875
- }
883
+ sharedMemory->eventFini (&header->serverEvent );
884
+ header->serverEvent .event_pid = 0 ;
876
885
}
877
- break ;
878
886
}
887
+
888
+ break ;
879
889
}
880
- JRD_reschedule (tdbb, true );
881
890
}
891
+
892
+ JRD_reschedule (tdbb, true );
882
893
}
883
894
895
+ bufferMutexLock.lock ();
896
+
884
897
switch (header->tag )
885
898
{
886
899
case Tag::SERVER_EXITED:
@@ -977,7 +990,7 @@ void ProfilerListener::watcherThread()
977
990
{
978
991
while (!exiting)
979
992
{
980
- const SLONG value = sharedMemory->eventClear (&header->serverEvent );
993
+ const SLONG serverEventCounter = sharedMemory->eventClear (&header->serverEvent );
981
994
982
995
if (startup)
983
996
{
@@ -986,18 +999,38 @@ void ProfilerListener::watcherThread()
986
999
}
987
1000
else
988
1001
{
1002
+ ProfilerIpc::Tag tag;
1003
+ unsigned seq;
1004
+ UCharBuffer buffer;
1005
+
989
1006
fb_assert (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP);
990
1007
991
1008
try
992
1009
{
993
1010
FbLocalStatus statusVector;
994
1011
EngineContextHolder tdbb (&statusVector, attachment->getInterface (), FB_FUNCTION);
995
1012
996
- processCommand (tdbb);
997
- header->tag = ProfilerIpc::Tag::RESPONSE;
1013
+ { // scope
1014
+ std::unique_lock bufferMutexLock (header->bufferMutex );
1015
+
1016
+ if (header->userName [0 ] && attachment->getUserName () != header->userName )
1017
+ status_exception::raise (Arg::Gds (isc_miss_prvlg) << " PROFILE_ANY_ATTACHMENT" );
1018
+
1019
+ fb_assert (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP);
1020
+
1021
+ tag = header->tag ;
1022
+ seq = header->seq ;
1023
+ memcpy (buffer.getBuffer (header->bufferSize , false ), header->buffer , header->bufferSize );
1024
+ }
1025
+
1026
+ processCommand (tdbb, tag, buffer);
1027
+
1028
+ tag = ProfilerIpc::Tag::RESPONSE;
998
1029
}
999
1030
catch (const status_exception& e)
1000
1031
{
1032
+ tag = ProfilerIpc::Tag::EXCEPTION;
1033
+
1001
1034
// // TODO: Serialize status vector instead of formated message.
1002
1035
1003
1036
const ISC_STATUS* status = e.value ();
@@ -1012,32 +1045,51 @@ void ProfilerListener::watcherThread()
1012
1045
errorMsg += temp;
1013
1046
}
1014
1047
1015
- header->bufferSize = MIN (errorMsg.length (), sizeof (header->buffer ) - 1 );
1016
- strncpy ((char *) header->buffer , errorMsg.c_str (), sizeof (header->buffer ));
1017
- header->buffer [header->bufferSize ] = ' \0 ' ;
1018
-
1019
- header->tag = ProfilerIpc::Tag::EXCEPTION;
1048
+ header->bufferSize = MIN (errorMsg.length (), sizeof (header->buffer ));
1049
+ memcpy (header->buffer , errorMsg.c_str (), header->bufferSize );
1020
1050
}
1021
1051
1022
- sharedMemory->eventPost (&header->clientEvent );
1052
+ fb_assert (buffer.getCount () <= sizeof (header->buffer ));
1053
+
1054
+ { // scope
1055
+ std::unique_lock bufferMutexLock (header->bufferMutex , std::try_to_lock);
1056
+
1057
+ // Otherwise a client lost interest in the response.
1058
+ if (bufferMutexLock.owns_lock () && header->seq == seq)
1059
+ {
1060
+ if (header->seq == seq)
1061
+ {
1062
+ header->tag = tag;
1063
+ header->bufferSize = buffer.getCount ();
1064
+ memcpy (header->buffer , buffer.begin (), buffer.getCount ());
1065
+
1066
+ sharedMemory->eventPost (&header->clientEvent );
1067
+ }
1068
+ }
1069
+ }
1023
1070
}
1024
1071
1025
1072
if (exiting)
1026
1073
break ;
1027
1074
1028
- sharedMemory->eventWait (&header->serverEvent , value , 0 );
1075
+ sharedMemory->eventWait (&header->serverEvent , serverEventCounter , 0 );
1029
1076
}
1030
1077
}
1031
1078
catch (const Exception& ex)
1032
1079
{
1033
1080
iscLogException (" Error in profiler watcher thread\n " , ex);
1034
1081
}
1035
1082
1036
- const ProfilerIpc::Tag oldTag = header->tag .exchange (ProfilerIpc::Tag::SERVER_EXITED);
1037
- if (oldTag >= ProfilerIpc::Tag::FIRST_CLIENT_OP)
1038
- {
1039
- fb_assert (header->clientEvent .event_pid );
1040
- sharedMemory->eventPost (&header->clientEvent );
1083
+ { // scope
1084
+ std::unique_lock bufferMutexLock (header->bufferMutex );
1085
+
1086
+ if (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP)
1087
+ {
1088
+ fb_assert (header->clientEvent .event_pid );
1089
+ sharedMemory->eventPost (&header->clientEvent );
1090
+ }
1091
+
1092
+ header->tag = ProfilerIpc::Tag::SERVER_EXITED;
1041
1093
}
1042
1094
1043
1095
try
@@ -1051,70 +1103,75 @@ void ProfilerListener::watcherThread()
1051
1103
}
1052
1104
}
1053
1105
1054
- void ProfilerListener::processCommand (thread_db* tdbb)
1106
+ void ProfilerListener::processCommand (thread_db* tdbb, ProfilerIpc::Tag tag, UCharBuffer& buffer )
1055
1107
{
1056
- const auto header = ipc->sharedMemory ->getHeader ();
1057
1108
const auto profilerManager = attachment->getProfilerManager (tdbb);
1058
1109
1059
- if (header->userName [0 ] && attachment->getUserName () != header->userName )
1060
- status_exception::raise (Arg::Gds (isc_miss_prvlg) << " PROFILE_ANY_ATTACHMENT" );
1061
-
1062
1110
using Tag = ProfilerIpc::Tag;
1063
1111
1064
- switch (header-> tag )
1112
+ switch (tag)
1065
1113
{
1066
1114
case Tag::CANCEL_SESSION:
1115
+ fb_assert (buffer.isEmpty ());
1067
1116
profilerManager->cancelSession ();
1068
- header-> bufferSize = 0 ;
1117
+ buffer. resize ( 0 ) ;
1069
1118
break ;
1070
1119
1071
1120
case Tag::DISCARD:
1121
+ fb_assert (buffer.isEmpty ());
1072
1122
profilerManager->discard ();
1073
- header-> bufferSize = 0 ;
1123
+ buffer. resize ( 0 ) ;
1074
1124
break ;
1075
1125
1076
1126
case Tag::FINISH_SESSION:
1077
1127
{
1078
- const auto in = reinterpret_cast <const ProfilerPackage::FinishSessionInput::Type*>(header->buffer );
1079
- fb_assert (sizeof (*in) == header->bufferSize );
1128
+ const auto in = reinterpret_cast <const ProfilerPackage::FinishSessionInput::Type*>(buffer.begin ());
1129
+ fb_assert (sizeof (*in) == buffer.getCount ());
1130
+
1080
1131
profilerManager->finishSession (tdbb, in->flush );
1081
- header->bufferSize = 0 ;
1132
+
1133
+ buffer.resize (0 );
1082
1134
break ;
1083
1135
}
1084
1136
1085
1137
case Tag::FLUSH:
1138
+ fb_assert (buffer.isEmpty ());
1086
1139
profilerManager->flush ();
1087
- header-> bufferSize = 0 ;
1140
+ buffer. resize ( 0 ) ;
1088
1141
break ;
1089
1142
1090
1143
case Tag::PAUSE_SESSION:
1091
1144
{
1092
- const auto in = reinterpret_cast <const ProfilerPackage::PauseSessionInput::Type*>(header->buffer );
1093
- fb_assert (sizeof (*in) == header->bufferSize );
1145
+ const auto in = reinterpret_cast <const ProfilerPackage::PauseSessionInput::Type*>(buffer.begin ());
1146
+ fb_assert (sizeof (*in) == buffer.getCount ());
1147
+
1094
1148
profilerManager->pauseSession (in->flush );
1095
- header->bufferSize = 0 ;
1149
+
1150
+ buffer.resize (0 );
1096
1151
break ;
1097
1152
}
1098
1153
1099
1154
case Tag::RESUME_SESSION:
1155
+ fb_assert (buffer.isEmpty ());
1100
1156
profilerManager->resumeSession ();
1101
- header-> bufferSize = 0 ;
1157
+ buffer. resize ( 0 ) ;
1102
1158
break ;
1103
1159
1104
1160
case Tag::SET_FLUSH_INTERVAL:
1105
1161
{
1106
- const auto in = reinterpret_cast <const ProfilerPackage::SetFlushIntervalInput::Type*>(header-> buffer );
1107
- fb_assert (sizeof (*in) == header-> bufferSize );
1162
+ const auto in = reinterpret_cast <const ProfilerPackage::SetFlushIntervalInput::Type*>(buffer. begin () );
1163
+ fb_assert (sizeof (*in) == buffer. getCount () );
1108
1164
1109
1165
profilerManager->setFlushInterval (in->flushInterval );
1110
- header->bufferSize = 0 ;
1166
+
1167
+ buffer.resize (0 );
1111
1168
break ;
1112
1169
}
1113
1170
1114
1171
case Tag::START_SESSION:
1115
1172
{
1116
- const auto in = reinterpret_cast <const ProfilerPackage::StartSessionInput::Type*>(header-> buffer );
1117
- fb_assert (sizeof (*in) == header-> bufferSize );
1173
+ const auto in = reinterpret_cast <const ProfilerPackage::StartSessionInput::Type*>(buffer. begin () );
1174
+ fb_assert (sizeof (*in) == buffer. getCount () );
1118
1175
1119
1176
const string description (in->description .str ,
1120
1177
in->descriptionNull ? 0 : in->description .length );
@@ -1125,14 +1182,12 @@ void ProfilerListener::processCommand(thread_db* tdbb)
1125
1182
const string pluginOptions (in->pluginOptions .str ,
1126
1183
in->pluginOptionsNull ? 0 : in->pluginOptions .length );
1127
1184
1128
- const auto out = reinterpret_cast <ProfilerPackage::StartSessionOutput::Type*>(header->buffer );
1129
- static_assert (sizeof (*out) <= sizeof (header->buffer ), " Buffer size too small" );
1130
- header->bufferSize = sizeof (*out);
1185
+ const auto out = reinterpret_cast <ProfilerPackage::StartSessionOutput::Type*>(buffer.begin ());
1186
+ buffer.resize (sizeof (*out));
1131
1187
1132
1188
out->sessionIdNull = FB_FALSE;
1133
1189
out->sessionId = profilerManager->startSession (tdbb, flushInterval,
1134
1190
pluginName, description, pluginOptions);
1135
-
1136
1191
break ;
1137
1192
}
1138
1193
0 commit comments