88import com .datadog .debugger .agent .JsonSnapshotSerializer ;
99import com .datadog .debugger .agent .ProbeStatus ;
1010import com .datadog .debugger .probe .LogProbe ;
11+ import com .datadog .debugger .probe .MetricProbe ;
1112import com .datadog .debugger .util .MoshiHelper ;
1213import com .datadog .debugger .util .MoshiSnapshotTestHelper ;
1314import com .squareup .moshi .JsonAdapter ;
1415import com .squareup .moshi .Types ;
1516import datadog .trace .bootstrap .debugger .CapturedContext ;
1617import java .io .IOException ;
18+ import java .net .DatagramPacket ;
19+ import java .net .DatagramSocket ;
20+ import java .net .SocketException ;
1721import java .nio .file .Files ;
1822import java .nio .file .Path ;
1923import java .nio .file .Paths ;
2327import java .util .Collections ;
2428import java .util .List ;
2529import java .util .UUID ;
30+ import java .util .concurrent .ArrayBlockingQueue ;
31+ import java .util .concurrent .BlockingQueue ;
2632import java .util .concurrent .TimeUnit ;
2733import java .util .function .Function ;
2834import okhttp3 .HttpUrl ;
@@ -44,7 +50,9 @@ public abstract class BaseIntegrationTest {
4450 protected static final int REQUEST_WAIT_TIMEOUT = 10 ;
4551 private static final Path LOG_FILE_BASE =
4652 Paths .get (
47- buildDirectory (), "reports" , "testProcess." + DebuggerIntegrationTest .class .getName ());
53+ buildDirectory (),
54+ "reports" ,
55+ "testProcess." + SimpleAppDebuggerIntegrationTest .class .getName ());
4856 private static final String INFO_CONTENT =
4957 "{\" endpoints\" : [\" v0.4/traces\" , \" debugger/v1/input\" , \" v0.7/config\" ]}" ;
5058 private static final MockResponse AGENT_INFO_RESPONSE =
@@ -54,7 +62,7 @@ public abstract class BaseIntegrationTest {
5462
5563 protected MockWebServer datadogAgentServer ;
5664 private MockDispatcher probeMockDispatcher ;
57-
65+ private StatsDServer statsDServer ;
5866 private HttpUrl probeUrl ;
5967 private HttpUrl snapshotUrl ;
6068 protected Path logFilePath ;
@@ -77,6 +85,9 @@ void setup(TestInfo testInfo) throws Exception {
7785 probeUrl = datadogAgentServer .url (PROBE_URL_PATH );
7886 LOG .info ("DatadogAgentServer on {}" , datadogAgentServer .getPort ());
7987 snapshotUrl = datadogAgentServer .url (SNAPSHOT_URL_PATH );
88+ statsDServer = new StatsDServer ();
89+ statsDServer .start ();
90+ LOG .info ("statsDServer on {}" , statsDServer .getPort ());
8091 logFilePath = LOG_FILE_BASE .resolve (testInfo .getDisplayName () + ".log" );
8192 }
8293
@@ -86,6 +97,7 @@ void teardown() throws Exception {
8697 targetProcess .destroyForcibly ();
8798 }
8899 datadogAgentServer .shutdown ();
100+ statsDServer .close ();
89101 }
90102
91103 protected ProcessBuilder createProcessBuilder (Path logFilePath , String ... params ) {
@@ -109,16 +121,10 @@ protected List<String> getDebuggerCommandParams() {
109121 "-Ddd.jmxfetch.start-delay=0" ,
110122 "-Ddd.jmxfetch.enabled=false" ,
111123 "-Ddd.dynamic.instrumentation.enabled=true" ,
112- // "-Ddd.remote_config.enabled=true", // default
113124 "-Ddd.remote_config.poll_interval.seconds=1" ,
114- /*"-Ddd.remote_config.integrity_check.enabled=false",
115- "-Ddd.dynamic.instrumentation.probe.url=http://localhost:"
116- + probeServer.getPort()
117- + PROBE_URL_PATH,
118- "-Ddd.dynamic.instrumentation.snapshot.url=http://localhost:"
119- + snapshotServer.getPort()
120- + SNAPSHOT_URL_PATH,*/
121125 "-Ddd.trace.agent.url=http://localhost:" + datadogAgentServer .getPort (),
126+ "-Ddd.jmxfetch.statsd.port=" + statsDServer .getPort (),
127+ "-Ddd.dynamic.instrumentation.classfile.dump.enabled=true" ,
122128 // to verify each snapshot upload one by one
123129 "-Ddd.dynamic.instrumentation.upload.batch.size=1" ,
124130 // flush uploads every 100ms to have quick tests
@@ -156,6 +162,10 @@ protected ProbeStatus retrieveProbeStatusRequest() throws Exception {
156162 return probeStatuses .get (0 );
157163 }
158164
165+ protected String retrieveStatsdMessage (String str ) {
166+ return statsDServer .waitForMessage (str );
167+ }
168+
159169 private MockResponse datadogAgentDispatch (RecordedRequest request ) {
160170 LOG .info ("datadogAgentDispatch request path: {}" , request .getPath ());
161171 if (request .getPath ().equals ("/info" )) {
@@ -221,6 +231,13 @@ protected Configuration createConfig(LogProbe logProbe) {
221231 return createConfig (Arrays .asList (logProbe ));
222232 }
223233
234+ protected Configuration createMetricConfig (MetricProbe metricProbe ) {
235+ return Configuration .builder ()
236+ .setService (getAppId ())
237+ .addMetricProbes (Collections .singletonList (metricProbe ))
238+ .build ();
239+ }
240+
224241 protected Configuration createConfig (Collection <LogProbe > logProbes ) {
225242 return new Configuration (getAppId (), logProbes );
226243 }
@@ -301,4 +318,60 @@ public void setDispatcher(Function<RecordedRequest, MockResponse> dispatcher) {
301318 this .dispatcher = dispatcher ;
302319 }
303320 }
321+
322+ private static class StatsDServer extends Thread {
323+ private final DatagramSocket socket ;
324+ private final BlockingQueue <String > msgQueue = new ArrayBlockingQueue <>(32 );
325+ private volatile String lastMessage ;
326+ private volatile boolean running = true ;
327+
328+ StatsDServer () throws SocketException {
329+ socket = new DatagramSocket ();
330+ }
331+
332+ @ Override
333+ public void run () {
334+ byte [] buf = new byte [1024 ];
335+ DatagramPacket packet = new DatagramPacket (buf , buf .length );
336+ while (running ) {
337+ try {
338+ socket .receive (packet );
339+ } catch (IOException e ) {
340+ throw new RuntimeException (e );
341+ }
342+ lastMessage = new String (packet .getData (), 0 , packet .getLength ());
343+ System .out .println ("received statsd: " + lastMessage );
344+ try {
345+ msgQueue .offer (lastMessage , 30 , TimeUnit .SECONDS );
346+ } catch (InterruptedException e ) {
347+ throw new RuntimeException (e );
348+ }
349+ }
350+ socket .close ();
351+ }
352+
353+ String lastMessage () {
354+ return lastMessage ;
355+ }
356+
357+ String waitForMessage (String str ) {
358+ String msg ;
359+ do {
360+ try {
361+ msg = msgQueue .poll (30 , TimeUnit .SECONDS );
362+ } catch (InterruptedException e ) {
363+ throw new RuntimeException (e );
364+ }
365+ } while (msg != null && !msg .contains (str ));
366+ return msg ;
367+ }
368+
369+ void close () {
370+ running = false ;
371+ }
372+
373+ int getPort () {
374+ return socket .getLocalPort ();
375+ }
376+ }
304377}
0 commit comments