diff --git a/lib/spandex_datadog/api_server.ex b/lib/spandex_datadog/api_server.ex index 47694c4..49e45ad 100644 --- a/lib/spandex_datadog/api_server.ex +++ b/lib/spandex_datadog/api_server.ex @@ -3,72 +3,116 @@ defmodule SpandexDatadog.ApiServer do Implements worker for sending spans to datadog as GenServer in order to send traces async. """ - use GenServer + @behaviour :gen_statem + require Logger - alias Spandex.{ - Span, - Trace - } + alias Spandex.{Span, Trace} + alias SpandexDatadog.Formatter defmodule State do @moduledoc false - @type t :: %State{} + @type t :: %__MODULE__{ + send_interval: non_neg_integer(), + export_timeout: non_neg_integer(), + runner_pid: pid(), + handed_off_table: :ets.tab(), + max_queue_size: non_neg_integer(), + size_check_interval: non_neg_integer(), + http: atom(), + url: String.t(), + host: String.t(), + port: non_neg_integer(), + verbose?: boolean() + } defstruct [ - :asynchronous_send?, + :send_interval, + :export_timeout, + :runner_pid, + :handed_off_table, + :max_queue_size, + :size_check_interval, :http, :url, :host, :port, - :verbose?, - :waiting_traces, - :batch_size, - :sync_threshold, - :agent_pid + :verbose? ] end # Same as HTTPoison.headers - @type headers :: [{atom, binary}] | [{binary, binary}] | %{binary => binary} | any + @type headers :: [{atom(), binary()}] | [{binary(), binary()}] | %{binary() => binary()} | any() @headers [{"Content-Type", "application/msgpack"}] + @current_table_key {__MODULE__, :current_table} + @table_1 :"#{__MODULE__}-1" + @table_2 :"#{__MODULE__}-2" + + @enabled_key {__MODULE__, :enabled} + + # https://github.com/DataDog/datadog-agent/blob/fa227f0015a5da85617e8af30778fcfc9521e568/pkg/trace/api/api.go#L46 + @dd_limit 10 * 1024 * 1024 + + @ets_batch 100 + @start_link_opts Optimal.schema( opts: [ host: :string, port: [:integer, :string], verbose?: :boolean, http: :atom, - batch_size: :integer, + api_adapter: :atom, + max_queue_size: :integer, + send_interval: :integer, + export_timeout: :integer, + size_check_interval: :integer, sync_threshold: :integer, - api_adapter: :atom + batch_size: :integer ], defaults: [ host: "localhost", port: 8126, verbose?: false, - batch_size: 10, - sync_threshold: 20, - api_adapter: SpandexDatadog.ApiServer + api_adapter: SpandexDatadog.ApiServer, + max_queue_size: 20 * 1024 * 1024, + send_interval: :timer.seconds(2), + export_timeout: :timer.minutes(1), + size_check_interval: 500 ], required: [:http], describe: [ verbose?: "Only to be used for debugging: All finished traces will be logged", host: "The host the agent can be reached at", port: "The port to use when sending traces to the agent", - batch_size: "The number of traces that should be sent in a single batch", - sync_threshold: - "The maximum number of processes that may be sending traces at any one time. This adds backpressure", http: "The HTTP module to use for sending spans to the agent. Currently only HTTPoison has been tested", - api_adapter: "Which api adapter to use. Currently only used for testing" + api_adapter: "Which api adapter to use. Currently only used for testing", + send_interval: "Interval for sending a batch", + export_timeout: "Timeout to allow each export operation to run", + size_check_interval: "Interval to check the size of the buffer", + sync_threshold: "depreciated", + batch_size: "depreciated" ] ) + # Public Functions + + defdelegate format(trace_or_span), to: Formatter + defdelegate format(span, priority, baggage), to: Formatter + + def child_spec(opts) do + %{ + id: __MODULE__, + type: :worker, + start: {__MODULE__, :start_link, [opts]} + } + end + @doc """ - Starts genserver with given options. + Starts server with given options. #{Optimal.Doc.document(@start_link_opts)} """ @@ -76,39 +120,26 @@ defmodule SpandexDatadog.ApiServer do def start_link(opts) do opts = Optimal.validate!(opts, @start_link_opts) - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end + case Keyword.get(opts, :name) do + nil -> + :gen_statem.start_link({:local, __MODULE__}, __MODULE__, opts, []) - @doc """ - Builds server state. - """ - @spec init(opts :: Keyword.t()) :: {:ok, State.t()} - def init(opts) do - {:ok, agent_pid} = Agent.start_link(fn -> 0 end) - - state = %State{ - asynchronous_send?: true, - host: opts[:host], - port: opts[:port], - verbose?: opts[:verbose?], - http: opts[:http], - waiting_traces: [], - batch_size: opts[:batch_size], - sync_threshold: opts[:sync_threshold], - agent_pid: agent_pid - } + name -> + :gen_statem.start_link({:local, name}, __MODULE__, opts, []) + end + end - {:ok, state} + def callback_mode do + [:state_functions, :state_enter] end @doc """ Send spans asynchronously to DataDog. """ @spec send_trace(Trace.t(), Keyword.t()) :: :ok - def send_trace(%Trace{} = trace, opts \\ []) do - :telemetry.span([:spandex_datadog, :send_trace], %{trace: trace}, fn -> - timeout = Keyword.get(opts, :timeout, 30_000) - result = GenServer.call(__MODULE__, {:send_trace, trace}, timeout) + def send_trace(%Trace{} = trace, _opts \\ []) do + :telemetry.span([:spandex_datadog, :insert_trace], %{trace: trace}, fn -> + result = do_insert(trace) {result, %{trace: trace}} end) end @@ -116,277 +147,303 @@ defmodule SpandexDatadog.ApiServer do @deprecated "Please use send_trace/2 instead" @doc false @spec send_spans([Span.t()], Keyword.t()) :: :ok - def send_spans(spans, opts \\ []) when is_list(spans) do - timeout = Keyword.get(opts, :timeout, 30_000) + def send_spans(spans, _opts \\ []) when is_list(spans) do trace = %Trace{spans: spans} - GenServer.call(__MODULE__, {:send_trace, trace}, timeout) + do_insert(trace) end - @doc false - def handle_call({:send_trace, trace}, _from, state) do - state = - state - |> enqueue_trace(trace) - |> maybe_flush_traces() - - {:reply, :ok, state} + def enable do + Logger.info("%{__MODULE__} is enabled") + :persistent_term.put(@enabled_key, true) end - @spec send_and_log([Trace.t()], State.t()) :: :ok - def send_and_log(traces, %{verbose?: verbose?} = state) do - headers = @headers ++ [{"X-Datadog-Trace-Count", length(traces)}] + def disable do + Logger.info("%{__MODULE__} is disabled") + :persistent_term.put(@enabled_key, false) + end - response = - traces - |> Enum.map(&format/1) - |> encode() - |> push(headers, state) + def is_enabled do + :persistent_term.get(@enabled_key, true) + end - if verbose? do - Logger.debug(fn -> "Trace response: #{inspect(response)}" end) - end + # Callbacks - :ok + def init(opts) do + Process.flag(:trap_exit, true) + + _tid1 = new_export_table(@table_1) + _tid2 = new_export_table(@table_2) + :persistent_term.put(@current_table_key, @table_1) + + enable() + + {:ok, :idle, + %State{ + host: opts[:host], + port: opts[:port], + verbose?: opts[:verbose?], + http: opts[:http], + handed_off_table: :undefined, + max_queue_size: opts[:max_queue_size], + export_timeout: opts[:export_timeout], + size_check_interval: opts[:size_check_interval], + send_interval: opts[:send_interval] + }} end - @deprecated "Please use format/3 instead" - @spec format(Trace.t()) :: map() - def format(%Trace{spans: spans, priority: priority, baggage: baggage}) do - Enum.map(spans, fn span -> format(span, priority, baggage) end) - end + def idle( + :enter, + _old_state, + %State{send_interval: send_interval, size_check_interval: size_check_interval} = state + ) do + if state.verbose?, do: Logger.debug("idle(:enter, _, _)") - @deprecated "Please use format/3 instead" - @spec format(Span.t()) :: map() - def format(%Span{} = span), do: format(span, 1, []) + actions = [ + {{:timeout, :export_spans}, send_interval, :export_spans}, + {{:timeout, :check_table_size}, size_check_interval, :check_table_size} + ] - @spec format(Span.t(), integer(), Keyword.t()) :: map() - def format(%Span{} = span, priority, _baggage) do - %{ - trace_id: span.trace_id, - span_id: span.id, - name: span.name, - start: span.start, - duration: (span.completion_time || SpandexDatadog.Adapter.now()) - span.start, - parent_id: span.parent_id, - error: error(span.error), - resource: span.resource || span.name, - service: span.service, - type: span.type, - meta: meta(span), - metrics: - metrics(span, %{ - _sampling_priority_v1: priority - }) - } + {:keep_state_and_data, actions} end - # Private Helpers - - defp enqueue_trace(state, trace) do - if state.verbose? do - Logger.info(fn -> "Adding trace to stack with #{Enum.count(trace.spans)} spans" end) - end + def idle(_, :export_spans, state) do + if state.verbose?, do: Logger.debug("idle(_, :export_spans, _)") + {:next_state, :exporting, state} + end - %State{state | waiting_traces: [trace | state.waiting_traces]} + def idle(event_type, event, state) do + if state.verbose?, do: Logger.debug("idle/3") + handle_event_(:idle, event_type, event, state) end - defp maybe_flush_traces(%{waiting_traces: traces, batch_size: size} = state) when length(traces) < size do - state + def exporting({:timeout, :export_spans}, :export_spans, state) do + if state.verbose?, do: Logger.debug("exporting({:timeout, :export_spans}, :export_spans, _)") + {:keep_state_and_data, [:postpone]} end - defp maybe_flush_traces(state) do - %{ - asynchronous_send?: async?, - verbose?: verbose?, - waiting_traces: traces - } = state - - if verbose? do - span_count = Enum.reduce(traces, 0, fn trace, acc -> acc + length(trace.spans) end) - Logger.info(fn -> "Sending #{length(traces)} traces, #{span_count} spans." end) - Logger.debug(fn -> "Trace: #{inspect(traces)}" end) - end + def exporting( + :enter, + _old_state, + %State{ + export_timeout: exporting_timeout, + send_interval: send_interval, + size_check_interval: size_check_interval + } = state + ) do + if state.verbose?, do: Logger.debug("exporting(:enter, _, _)") + {old_table_name, runner_pid} = export_spans(state) + + actions = [ + {:state_timeout, exporting_timeout, :exporting_timeout}, + {{:timeout, :export_spans}, send_interval, :export_spans}, + {{:timeout, :check_table_size}, size_check_interval, :check_table_size} + ] - if async? do - if below_sync_threshold?(state) do - Task.start(fn -> - try do - send_and_log(traces, state) - after - Agent.update(state.agent_pid, fn count -> count - 1 end) - end - end) - else - # We get benefits from running in a separate process (like better GC) - # So we async/await here to mimic the behavour above but still apply backpressure - task = Task.async(fn -> send_and_log(traces, state) end) - Task.await(task) - end - else - send_and_log(traces, state) - end + {:keep_state, %State{state | runner_pid: runner_pid, handed_off_table: old_table_name}, actions} + end - %State{state | waiting_traces: []} + def exporting(:state_timeout, :exporting_timeout, %State{handed_off_table: exporting_table} = state) do + if state.verbose?, do: Logger.debug("exporting(:state_timeout, _, _)") + # kill current exporting process because it is taking too long + # which deletes the exporting table, so create a new one and + # repeat the state to force another span exporting immediately + state = kill_runner(state) + new_export_table(exporting_table) + {:repeat_state, state} end - defp below_sync_threshold?(state) do - Agent.get_and_update(state.agent_pid, fn count -> - if count < state.sync_threshold do - {true, count + 1} - else - {false, count} - end - end) + # important to verify runner_pid and from_pid are the same in case it was sent + # after kill_runner was called but before it had done the unlink + def exporting(:info, {:EXIT, from_pid, _}, %State{runner_pid: from_pid} = state) do + if state.verbose?, do: Logger.debug("exporting(:info, {:EXIT, _, _}, _)") + complete_exporting(state) end - @spec meta(Span.t()) :: map - defp meta(span) do - %{} - |> add_datadog_meta(span) - |> add_error_data(span) - |> add_http_data(span) - |> add_sql_data(span) - |> add_tags(span) - |> Enum.reject(fn {_k, v} -> is_nil(v) end) - |> Enum.into(%{}) + # important to verify runner_pid and from_pid are the same in case it was sent + # after kill_runner was called but before it had done the unlink + def exporting(:info, {:completed, from_pid}, %State{runner_pid: from_pid} = state) do + if state.verbose?, do: Logger.debug("exporting(:info, {:completed, _}, _)") + complete_exporting(state) end - @spec add_datadog_meta(map, Span.t()) :: map - defp add_datadog_meta(meta, %Span{env: nil}), do: meta + def exporting(event_type, event, state) do + if state.verbose?, do: Logger.debug("exporting/3") + handle_event_(:exporting, event_type, event, state) + end - defp add_datadog_meta(meta, %Span{env: env}) do - Map.put(meta, :env, env) + def terminate(_, _, state) do + if state.verbose?, do: Logger.debug("terminate/3") + # TODO: flush buffers to export + :ok end - @spec add_error_data(map, Span.t()) :: map - defp add_error_data(meta, %{error: nil}), do: meta + # Private Functions - defp add_error_data(meta, %{error: error}) do - meta - |> add_error_type(error[:exception]) - |> add_error_message(error[:exception]) - |> add_error_stacktrace(error[:stacktrace]) + defp handle_event_(_state, {:timeout, :check_table_size}, :check_table_size, %State{max_queue_size: :infinity}) do + :keep_state_and_data end - @spec add_error_type(map, Exception.t() | nil) :: map - defp add_error_type(meta, nil), do: meta - defp add_error_type(meta, exception), do: Map.put(meta, "error.type", exception.__struct__) + defp handle_event_(_state, {:timeout, :check_table_size}, :check_table_size, %State{ + max_queue_size: max_queue_size + }) do + tab_memory = :ets.info(current_table(), :memory) + tab_size = :ets.info(current_table(), :size) + bytes = tab_memory * :erlang.system_info(:wordsize) + :telemetry.execute([:spandex_datadog, :buffer, :info], %{bytes: bytes, items: tab_size}) - @spec add_error_message(map, Exception.t() | nil) :: map - defp add_error_message(meta, nil), do: meta + case bytes do + m when m >= max_queue_size -> + disable() + :keep_state_and_data - defp add_error_message(meta, exception), - do: Map.put(meta, "error.msg", Exception.message(exception)) - - @spec add_error_stacktrace(map, list | nil) :: map - defp add_error_stacktrace(meta, nil), do: meta + _ -> + enable() + :keep_state_and_data + end + end - defp add_error_stacktrace(meta, stacktrace), - do: Map.put(meta, "error.stack", Exception.format_stacktrace(stacktrace)) + defp handle_event_(_, _, _, _) do + :keep_state_and_data + end - @spec add_http_data(map, Span.t()) :: map - defp add_http_data(meta, %{http: nil}), do: meta + defp do_insert(trace) do + try do + case is_enabled() do + true -> + :telemetry.execute([:spandex_datadog, :insert_trace, :inserted], %{}) + :ets.insert(current_table(), {System.monotonic_time(), trace}) - defp add_http_data(meta, %{http: http}) do - status_code = - if http[:status_code] do - to_string(http[:status_code]) + _ -> + :telemetry.execute([:spandex_datadog, :insert_trace, :dropped], %{}) + :dropped end + catch + :error, :badarg -> + {:error, :no_batch_span_processor} - meta - |> Map.put("http.url", http[:url]) - |> Map.put("http.status_code", status_code) - |> Map.put("http.method", http[:method]) + _, _ -> + {:error, :other} + end end - @spec add_sql_data(map, Span.t()) :: map - defp add_sql_data(meta, %{sql_query: nil}), do: meta + defp complete_exporting(%State{handed_off_table: exporting_table} = state) when exporting_table != :undefined do + new_export_table(exporting_table) + {:next_state, :idle, %State{state | runner_pid: :undefined, handed_off_table: :undefined}} + end + + defp kill_runner(%State{runner_pid: runner_pid} = state) do + :erlang.unlink(runner_pid) + :erlang.exit(runner_pid, :kill) + %State{state | runner_pid: :undefined, handed_off_table: :undefined} + end + + defp new_export_table(name) do + :ets.new(name, [:public, :named_table, {:write_concurrency, true}, :duplicate_bag]) + end - defp add_sql_data(meta, %{sql_query: sql}) do - meta - |> Map.put("sql.query", sql[:query]) - |> Map.put("sql.rows", sql[:rows]) - |> Map.put("sql.db", sql[:db]) + defp current_table do + :persistent_term.get(@current_table_key) end - @spec add_tags(map, Span.t()) :: map - defp add_tags(meta, %{tags: nil}), do: meta + defp export_spans(%State{} = state) do + current_table = current_table() - defp add_tags(meta, %{tags: tags}) do - tags = tags |> Keyword.delete(:analytics_event) + new_current_table = + case current_table do + @table_1 -> @table_2 + @table_2 -> @table_1 + end + + # an atom is a single word so this does not trigger a global GC + :persistent_term.put(@current_table_key, new_current_table) + # set the table to accept inserts + enable() - Map.merge( - meta, - tags - |> Enum.map(fn {k, v} -> {k, term_to_string(v)} end) - |> Enum.into(%{}) - ) + self = self() + runner_pid = :erlang.spawn_link(fn -> do_send_spans(self, state) end) + :ets.give_away(current_table, runner_pid, :export) + {current_table, runner_pid} end - @spec metrics(Span.t(), map) :: map - defp metrics(span, initial_value = %{}) do - initial_value - |> add_metrics(span) - |> Enum.reject(fn {_k, v} -> is_nil(v) end) - |> Enum.into(%{}) + # Additional benefit of using a separate process is calls to `register` won't + # timeout if the actual exporting takes longer than the call timeout + defp do_send_spans(from_pid, state) do + receive do + {:"ETS-TRANSFER", table, ^from_pid, :export} -> + table_name = :ets.rename(table, :current_send_table) + export(table_name, state) + :ets.delete(table_name) + completed(from_pid) + end end - @spec add_metrics(map, Span.t()) :: map - defp add_metrics(metrics, %{tags: nil}), do: metrics + defp completed(from_pid) do + send(from_pid, {:completed, self()}) + end - defp add_metrics(metrics, %{tags: tags}) do - with analytics_event <- tags |> Keyword.get(:analytics_event), - true <- analytics_event != nil do - Map.merge( - metrics, - %{"_dd1.sr.eausr" => 1} - ) - else - _ -> - metrics + defp export(trace_table, state) do + # don't let a export exception crash us + # and return true if export failed + try do + Stream.resource(fn -> stream_start(trace_table) end, &stream_next/1, &stream_after/1) + |> Stream.map(&format_trace/1) + |> Stream.map(&Msgpax.pack_fragment!/1) + # One byte overhead for msgpack and one byte for the first entry + |> Stream.chunk_while({[], 0, 2}, &stream_chunk/2, &stream_chunk_after/1) + |> Stream.map(&{length(&1), Msgpax.pack!(&1)}) + |> Stream.map(fn {count, payload} -> + :telemetry.span([:spandex_datadog, :send_traces], %{events: count}, fn -> + headers = @headers ++ [{"X-Datadog-Trace-Count", count}] + res = push(payload, headers, state) + + if state.verbose?, do: Logger.debug("push_result: #{inspect(res)}") + + {true, %{events: count}} + end) + + :ok + end) + |> Stream.run() + + true + catch + e -> + Logger.error("export threw exception: #{inspect(e)}") + true end end - @spec error(nil | Keyword.t()) :: integer - defp error(nil), do: 0 + defp stream_start(trace_table) do + :ets.select(trace_table, [{{:"$1", :"$2"}, [], [:"$2"]}], @ets_batch) + end + + defp stream_next(:"$end_of_table"), do: {:halt, :ok} + defp stream_next({:continue, continuation}), do: {[], :ets.select(continuation)} + defp stream_next({traces, continuation}), do: {traces, {:continue, continuation}} - defp error(keyword) do - if Enum.any?(keyword, fn {_, v} -> not is_nil(v) end) do - 1 - else - 0 - end + defp stream_after(_), do: :ok + + defp format_trace(%Trace{spans: spans, priority: priority, baggage: baggage}) do + Enum.map(spans, &Formatter.format(&1, priority, baggage)) end - @spec encode(data :: term) :: iodata | no_return - defp encode(data), - do: data |> deep_remove_nils() |> Msgpax.pack!(data) + defp stream_chunk(fragment, {acc, cur_len, overhead}) do + case IO.iodata_length(fragment.data) do + length when length + overhead > @dd_limit -> + {:cont, {acc, cur_len}, overhead} - @spec push(body :: iodata(), headers, State.t()) :: any() - defp push(body, headers, %State{http: http, host: host, port: port}), - do: http.put("#{host}:#{port}/v0.3/traces", body, headers) + length when length + overhead + cur_len > @dd_limit -> + {:cont, acc, {[fragment], length, 2}} - @spec deep_remove_nils(term) :: term - defp deep_remove_nils(term) when is_map(term) do - term - |> Enum.reject(fn {_k, v} -> is_nil(v) end) - |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end) - |> Enum.into(%{}) - end - - defp deep_remove_nils(term) when is_list(term) do - if Keyword.keyword?(term) do - term - |> Enum.reject(fn {_k, v} -> is_nil(v) end) - |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end) - else - Enum.map(term, &deep_remove_nils/1) + length -> + {:cont, {[fragment | acc], cur_len + length, overhead + 1}} end end - defp deep_remove_nils(term), do: term + defp stream_chunk_after({[], _, _}), do: {:cont, {[], 0, 2}} + defp stream_chunk_after({acc, _, _}), do: {:cont, Enum.reverse(acc), {[], 0, 2}} - defp term_to_string(term) when is_binary(term), do: term - defp term_to_string(term) when is_atom(term), do: term - defp term_to_string(term), do: inspect(term) + @spec push(body :: iodata(), headers, State.t()) :: any() + defp push(body, headers, %State{http: http, host: host, port: port}), + do: http.put("#{host}:#{port}/v0.3/traces", body, headers) end diff --git a/lib/spandex_datadog/formatter.ex b/lib/spandex_datadog/formatter.ex new file mode 100644 index 0000000..6406dbd --- /dev/null +++ b/lib/spandex_datadog/formatter.ex @@ -0,0 +1,182 @@ +defmodule SpandexDatadog.Formatter do + @moduledoc """ + Formats Traces for sending to Datadog + """ + + alias Spandex.{Span, Trace} + + @deprecated "Please use format/3 instead" + @spec format(Trace.t()) :: map() + def format(%Trace{spans: spans, priority: priority, baggage: baggage}) do + Enum.map(spans, fn span -> format(span, priority, baggage) end) + end + + @deprecated "Please use format/3 instead" + @spec format(Span.t()) :: map() + def format(%Span{} = span), do: format(span, 1, []) + + @spec format(Span.t(), integer(), Keyword.t()) :: map() + def format(%Span{} = span, priority, _baggage) do + %{ + trace_id: span.trace_id, + span_id: span.id, + name: span.name, + start: span.start, + duration: (span.completion_time || SpandexDatadog.Adapter.now()) - span.start, + parent_id: span.parent_id, + error: error(span.error), + resource: span.resource || span.name, + service: span.service, + type: span.type, + meta: meta(span), + metrics: + metrics(span, %{ + _sampling_priority_v1: priority + }) + } + |> deep_remove_nils() + end + + @spec error(nil | Keyword.t()) :: integer + defp error(nil), do: 0 + + defp error(keyword) do + if Enum.any?(keyword, fn {_, v} -> not is_nil(v) end) do + 1 + else + 0 + end + end + + @spec meta(Span.t()) :: map + defp meta(span) do + %{} + |> add_datadog_meta(span) + |> add_error_data(span) + |> add_http_data(span) + |> add_sql_data(span) + |> add_tags(span) + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + |> Enum.into(%{}) + end + + @spec add_datadog_meta(map, Span.t()) :: map + defp add_datadog_meta(meta, %Span{env: nil}), do: meta + + defp add_datadog_meta(meta, %Span{env: env}) do + Map.put(meta, :env, env) + end + + @spec add_error_data(map, Span.t()) :: map + defp add_error_data(meta, %{error: nil}), do: meta + + defp add_error_data(meta, %{error: error}) do + meta + |> add_error_type(error[:exception]) + |> add_error_message(error[:exception]) + |> add_error_stacktrace(error[:stacktrace]) + end + + @spec add_error_type(map, Exception.t() | nil) :: map + defp add_error_type(meta, nil), do: meta + defp add_error_type(meta, exception), do: Map.put(meta, "error.type", exception.__struct__) + + @spec add_error_message(map, Exception.t() | nil) :: map + defp add_error_message(meta, nil), do: meta + + defp add_error_message(meta, exception), + do: Map.put(meta, "error.msg", Exception.message(exception)) + + @spec add_error_stacktrace(map, list | nil) :: map + defp add_error_stacktrace(meta, nil), do: meta + + defp add_error_stacktrace(meta, stacktrace), + do: Map.put(meta, "error.stack", Exception.format_stacktrace(stacktrace)) + + @spec add_http_data(map, Span.t()) :: map + defp add_http_data(meta, %{http: nil}), do: meta + + defp add_http_data(meta, %{http: http}) do + status_code = + if http[:status_code] do + to_string(http[:status_code]) + end + + meta + |> Map.put("http.url", http[:url]) + |> Map.put("http.status_code", status_code) + |> Map.put("http.method", http[:method]) + end + + @spec add_sql_data(map, Span.t()) :: map + defp add_sql_data(meta, %{sql_query: nil}), do: meta + + defp add_sql_data(meta, %{sql_query: sql}) do + meta + |> Map.put("sql.query", sql[:query]) + |> Map.put("sql.rows", sql[:rows]) + |> Map.put("sql.db", sql[:db]) + end + + @spec add_tags(map, Span.t()) :: map + defp add_tags(meta, %{tags: nil}), do: meta + + defp add_tags(meta, %{tags: tags}) do + tags = tags |> Keyword.delete(:analytics_event) + + Map.merge( + meta, + tags + |> Enum.map(fn {k, v} -> {k, term_to_string(v)} end) + |> Enum.into(%{}) + ) + end + + @spec metrics(Span.t(), map) :: map + defp metrics(span, initial_value = %{}) do + initial_value + |> add_metrics(span) + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + |> Enum.into(%{}) + end + + @spec add_metrics(map, Span.t()) :: map + defp add_metrics(metrics, %{tags: nil}), do: metrics + + defp add_metrics(metrics, %{tags: tags}) do + with analytics_event <- tags |> Keyword.get(:analytics_event), + true <- analytics_event != nil do + Map.merge( + metrics, + %{"_dd1.sr.eausr" => 1} + ) + else + _ -> + metrics + end + end + + @spec deep_remove_nils(term) :: term + defp deep_remove_nils(term) when is_map(term) do + term + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end) + |> Enum.into(%{}) + end + + defp deep_remove_nils(term) when is_list(term) do + if Keyword.keyword?(term) do + term + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end) + else + Enum.map(term, &deep_remove_nils/1) + end + end + + defp deep_remove_nils(term), do: term + + defp term_to_string(term) when is_binary(term), do: term + defp term_to_string(term) when is_atom(term), do: term + defp term_to_string(term), do: inspect(term) +end diff --git a/mix.exs b/mix.exs index e1fef22..fa922bc 100644 --- a/mix.exs +++ b/mix.exs @@ -58,7 +58,7 @@ defmodule SpandexDatadog.MixProject do {:spandex, "~> 3.0"}, {:telemetry, "~> 0.4"}, {:httpoison, "~> 0.13 or ~> 1.0", only: :test}, - {:msgpax, "~> 2.2.1"} + {:msgpax, "~> 2.3"} ] end end diff --git a/mix.lock b/mix.lock index ee7edda..bc2aedc 100644 --- a/mix.lock +++ b/mix.lock @@ -13,16 +13,16 @@ "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, "makeup_elixir": {:hex, :makeup_elixir, "0.15.0", "98312c9f0d3730fde4049985a1105da5155bfe5c11e47bdc7406d88e01e4219b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "75ffa34ab1056b7e24844c90bfc62aaf6f3a37a15faa76b07bc5eba27e4a8b4a"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, - "mime": {:hex, :mime, "1.5.0", "203ef35ef3389aae6d361918bf3f952fa17a09e8e43b5aa592b93eba05d0fb8d", [:mix], [], "hexpm", "55a94c0f552249fc1a3dd9cd2d3ab9de9d3c89b559c2bd01121f824834f24746"}, + "mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, - "msgpax": {:hex, :msgpax, "2.2.4", "7b3790ef684089076b63c0f08c2f4b079c6311daeb006b69e4ed2bf67518291e", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "b351b6d992d79624a8430a99d21a41b36b1b90edf84326a294e9f4a2de11f089"}, + "msgpax": {:hex, :msgpax, "2.3.0", "14f52ad249a3f77b5e2d59f6143e6c18a6e74f34666989e22bac0a465f9835cc", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "65c36846a62ed5615baf7d7d47babb6541313a6c0b6d2ff19354bd518f52df7e"}, "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, "optimal": {:hex, :optimal, "0.3.6", "46bbf52fbbbd238cda81e02560caa84f93a53c75620f1fe19e81e4ae7b07d1dd", [:mix], [], "hexpm", "1a06ea6a653120226b35b283a1cd10039550f2c566edcdec22b29316d73640fd"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, - "plug": {:hex, :plug, "1.11.0", "f17217525597628298998bc3baed9f8ea1fa3f1160aa9871aee6df47a6e4d38e", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2d9c633f0499f9dc5c2fd069161af4e2e7756890b81adcbb2ceaa074e8308876"}, - "plug_crypto": {:hex, :plug_crypto, "1.2.0", "1cb20793aa63a6c619dd18bb33d7a3aa94818e5fd39ad357051a67f26dfa2df6", [:mix], [], "hexpm", "a48b538ae8bf381ffac344520755f3007cc10bd8e90b240af98ea29b69683fc2"}, + "plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"}, + "plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"}, "spandex": {:hex, :spandex, "3.0.3", "91aa318f3de696bb4d931adf65f7ebdbe5df25cccce1fe8fd376a44c46bcf69b", [:mix], [{:decorator, "~> 1.2", [hex: :decorator, repo: "hexpm", optional: true]}, {:optimal, "~> 0.3.3", [hex: :optimal, repo: "hexpm", optional: false]}, {:plug, ">= 1.0.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "e3e6c319d0ab478ddc9a39102a727a410c962b4d51c0932c72279b86d3b17044"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, - "telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"}, + "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } diff --git a/test/api_server_test.exs b/test/api_server_test.exs index c2dd6cb..970063c 100644 --- a/test/api_server_test.exs +++ b/test/api_server_test.exs @@ -1,8 +1,6 @@ defmodule SpandexDatadog.ApiServerTest do use ExUnit.Case, async: false - import ExUnit.CaptureLog - alias Spandex.{ Span, Trace @@ -12,14 +10,28 @@ defmodule SpandexDatadog.ApiServerTest do defmodule TestOkApiServer do def put(url, body, headers) do - send(self(), {:put_datadog_spans, body |> Msgpax.unpack!() |> hd(), url, headers}) + pid = :persistent_term.get(:test_pid) + + data = + body + |> Msgpax.unpack!() + |> hd() + + send(pid, {:put_datadog_spans, data, url, headers}) {:ok, %HTTPoison.Response{status_code: 200}} end end defmodule TestErrorApiServer do def put(url, body, headers) do - send(self(), {:put_datadog_spans, body |> Msgpax.unpack!() |> hd(), url, headers}) + pid = :persistent_term.get(:test_pid) + + data = + body + |> Msgpax.unpack!() + |> hd() + + send(pid, {:put_datadog_spans, data, url, headers}) {:error, %HTTPoison.Error{id: :foo, reason: :bar}} end end @@ -31,14 +43,7 @@ defmodule SpandexDatadog.ApiServerTest do end end - defmodule TelemetryRecorderPDict do - def handle_event(event, measurements, metadata, _cfg) do - Process.put(event, {measurements, metadata}) - end - end - setup_all do - {:ok, agent_pid} = Agent.start_link(fn -> 0 end) trace_id = 4_743_028_846_331_200_905 {:ok, span_1} = @@ -84,86 +89,73 @@ defmodule SpandexDatadog.ApiServerTest do trace: trace, url: "localhost:8126/v0.3/traces", state: %ApiServer.State{ - asynchronous_send?: false, host: "localhost", port: "8126", http: TestOkApiServer, - verbose?: false, - waiting_traces: [], - batch_size: 1, - agent_pid: agent_pid + verbose?: false } ] } end describe "ApiServer.send_trace/2" do - test "executes telemetry on success", %{trace: trace} do + test "executes telemetry on success", %{test: test, trace: trace} do + self = self() + + :persistent_term.put(:test_pid, self) + :telemetry.attach_many( - "log-response-handler", + "#{test}", [ - [:spandex_datadog, :send_trace, :start], - [:spandex_datadog, :send_trace, :stop], - [:spandex_datadog, :send_trace, :exception] + [:spandex_datadog, :insert_trace, :start], + [:spandex_datadog, :insert_trace, :stop], + [:spandex_datadog, :insert_trace, :exception] ], - &TelemetryRecorderPDict.handle_event/4, + fn name, measurements, metadata, _ -> + send(self, {:telemetry_event, name, measurements, metadata}) + end, nil ) - ApiServer.start_link(http: TestOkApiServer) + start_supervised!({ApiServer, http: TestOkApiServer}) ApiServer.send_trace(trace) - {start_measurements, start_metadata} = Process.get([:spandex_datadog, :send_trace, :start]) + assert_receive {:telemetry_event, [:spandex_datadog, :insert_trace, :start], start_measurements, start_metadata}, + 5000 + assert start_measurements[:system_time] assert trace == start_metadata[:trace] - {stop_measurements, stop_metadata} = Process.get([:spandex_datadog, :send_trace, :stop]) + assert_receive {:telemetry_event, [:spandex_datadog, :insert_trace, :stop], stop_measurements, stop_metadata}, + 5000 + assert stop_measurements[:duration] assert trace == stop_metadata[:trace] - refute Process.get([:spandex_datadog, :send_trace, :exception]) + refute_receive {:telemetry_event, [:spandex_datadog, :insert_trace, :exception], _} end - test "executes telemetry on exception", %{trace: trace} do + test "sends the trace", %{test: test, trace: trace, url: url} do + self = self() + :persistent_term.put(:test_pid, self()) + :telemetry.attach_many( - "log-response-handler", + "#{test}", [ - [:spandex_datadog, :send_trace, :start], - [:spandex_datadog, :send_trace, :stop], - [:spandex_datadog, :send_trace, :exception] + [:spandex_datadog, :send_traces, :start], + [:spandex_datadog, :send_traces, :stop], + [:spandex_datadog, :send_traces, :exception] ], - &TelemetryRecorderPDict.handle_event/4, + fn name, measurements, metadata, _ -> + send(self, {:telemetry_event, name, measurements, metadata}) + end, nil ) - ApiServer.start_link(http: TestSlowApiServer, batch_size: 0, sync_threshold: 0) - - catch_exit(ApiServer.send_trace(trace, timeout: 1)) - - {start_measurements, start_metadata} = Process.get([:spandex_datadog, :send_trace, :start]) - assert start_measurements[:system_time] - assert trace == start_metadata[:trace] - - refute Process.get([:spandex_datadog, :send_trace, :stop]) - - {exception_measurements, exception_metadata} = Process.get([:spandex_datadog, :send_trace, :exception]) - assert exception_measurements[:duration] - assert trace == start_metadata[:trace] - assert :exit == exception_metadata[:kind] - assert nil == exception_metadata[:error] - assert is_list(exception_metadata[:stacktrace]) - end - end + start_supervised!({ApiServer, http: TestOkApiServer, verbose?: true, send_interval: 50}) - describe "ApiServer.handle_call/3 - :send_trace" do - test "doesn't log anything when verbose?: false", %{trace: trace, state: state, url: url} do - log = - capture_log(fn -> - ApiServer.handle_call({:send_trace, trace}, self(), state) - end) - - assert log == "" + ApiServer.send_trace(trace) formatted = [ %{ @@ -227,27 +219,22 @@ defmodule SpandexDatadog.ApiServerTest do {"X-Datadog-Trace-Count", 1} ] - assert_received {:put_datadog_spans, ^formatted, ^url, ^headers} - end + assert_receive {:put_datadog_spans, ^formatted, ^url, ^headers} - test "doesn't care about the response result", %{trace: trace, state: state, url: url} do - state = - state - |> Map.put(:verbose?, true) - |> Map.put(:http, TestErrorApiServer) + assert_receive {:telemetry_event, [:spandex_datadog, :send_traces, :start], start_measurements, _metadata} + assert start_measurements[:system_time] - [enqueue, processing, received_spans, response] = - capture_log(fn -> - {:reply, :ok, _} = ApiServer.handle_call({:send_trace, trace}, self(), state) - end) - |> String.split("\n") - |> Enum.reject(fn s -> s == "" end) + assert_receive {:telemetry_event, [:spandex_datadog, :send_traces, :stop], stop_measurements, _metadata} + assert stop_measurements[:duration] - assert enqueue =~ ~r/Adding trace to stack with 3 spans/ + refute_receive {:telemetry_event, [:spandex_datadog, :send_traces, :exception], _} + end - assert processing =~ ~r/Sending 1 traces, 3 spans/ + test "doesn't care about the response result", %{trace: trace, url: url} do + :persistent_term.put(:test_pid, self()) + start_supervised!({ApiServer, http: TestErrorApiServer, verbose?: true, send_interval: 50}) - assert received_spans =~ ~r/Trace: \[%Spandex.Trace{/ + ApiServer.send_trace(trace) formatted = [ %{ @@ -306,8 +293,12 @@ defmodule SpandexDatadog.ApiServerTest do } ] - assert response =~ ~r/Trace response: {:error, %HTTPoison.Error{id: :foo, reason: :bar}}/ - assert_received {:put_datadog_spans, ^formatted, ^url, _} + headers = [ + {"Content-Type", "application/msgpack"}, + {"X-Datadog-Trace-Count", 1} + ] + + assert_receive {:put_datadog_spans, ^formatted, ^url, ^headers} end end end