Processors are used at various stages of a pipeline. Generally, a processor pre-processes data before it is exported (e.g. modify attributes or sample) or helps ensure that data makes it through a pipeline successfully (e.g. batch/retry).
IMPORTANT: All processors only work with trace receivers/exporters today.
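For illustration, here is a minimal sketch of where processors sit in a pipeline between receivers and exporters (component names are illustrative and the referenced components are assumed to be defined elsewhere in the configuration):

```yaml
service:
  pipelines:
    traces:
      receivers: [opencensus]
      # Processors run in the order listed: attributes pre-processes the data,
      # while batch and queued_retry help ensure the data makes it through the pipeline.
      processors: [attributes, batch, queued_retry]
      exporters: [opencensus]
```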
Supported processors (sorted alphabetically):
- Attributes Processor
- Batch Processor
- Memory Limiter Processor
- Queued Retry Processor
- Resource Processor
- Sampling Processor
- Span Processor

The contributors repository has more processors that can be added to custom builds of the Collector.

Some important aspects of pipelines and processors to be aware of are data ownership, the order in which processors run, and the recommended set of processors; each is described below.
The ownership of the `TraceData` and `MetricsData` in a pipeline is passed as the data travels through the pipeline. The data is created by the receiver and then the ownership is passed to the first processor when the `ConsumeTraceData`/`ConsumeMetricsData` function is called.
Note: the receiver may be attached to multiple pipelines, in which case the same data will be passed to all attached pipelines via a data fan-out connector.
From a data ownership perspective, pipelines can work in two modes:
- Exclusive data ownership
- Shared data ownership
The mode is defined during startup based on the data modification intent reported by the processors. Each processor reports its intent via the `MutatesConsumedData` field of the struct returned by the `GetCapabilities` function. If any processor in the pipeline declares an intent to modify the data, that pipeline works in exclusive ownership mode. In addition, any other pipeline that receives data from a receiver attached to a pipeline in exclusive ownership mode also operates in exclusive ownership mode.
In exclusive ownership mode, the data is owned exclusively by a particular processor at any given moment, and the processor is free to modify the data it owns.
Exclusive ownership mode is only applicable for pipelines that receive data from the same receiver. If a pipeline is marked to be in exclusive ownership mode then any data received from a shared receiver will be cloned at the fan-out connector before passing further to each pipeline. This ensures that each pipeline has its own exclusive copy of data and the data can be safely modified in the pipeline.
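As a sketch of the shared-receiver case (component names are illustrative and the referenced components are assumed to be defined elsewhere in the configuration), the receiver below feeds two pipelines through the fan-out connector; because one pipeline contains a processor that mutates data, each pipeline receives its own clone:

```yaml
service:
  pipelines:
    traces/modify:
      receivers: [opencensus]
      # A processor that reports MutatesConsumedData=true puts this pipeline,
      # and any pipeline sharing its receiver, into exclusive ownership mode.
      processors: [attributes]
      exporters: [opencensus]
    traces/readonly:
      receivers: [opencensus]  # same receiver: data is cloned at the fan-out connector
      processors: [batch]
      exporters: [zipkin]
```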
The exclusive ownership of data allows processors to freely modify the data while they own it (e.g. see `attributesprocessor`). The duration of ownership of the data by a processor is from the beginning of the `ConsumeTraceData`/`ConsumeMetricsData` call until the processor calls the next processor's `ConsumeTraceData`/`ConsumeMetricsData` function, which passes the ownership to the next processor. After that the processor must no longer read or write the data since it may be concurrently modified by the new owner.
Exclusive ownership mode makes it easy to implement processors that need to modify the data: they simply declare that intent.
In shared ownership mode no particular processor owns the data and no processor is allowed to modify the shared data.
In this mode no cloning is performed at the fan-out connector of receivers that are attached to multiple pipelines. In this case all such pipelines see the same single shared copy of the data. Processors in pipelines operating in shared ownership mode are prohibited from modifying the original data that they receive via the `ConsumeTraceData`/`ConsumeMetricsData` call. Processors may only read the data but must not modify it.
If the processor needs to modify the data while performing the processing but does not want to incur the cost of data cloning that Exclusive mode brings, then the processor can declare that it does not modify the data and use some other technique that ensures the original data is not modified. For example, the processor can implement a copy-on-write approach for individual sub-parts of the `TraceData`/`MetricsData` argument. Any approach that does not mutate the original `TraceData`/`MetricsData` argument (including referenced data, such as `Node`, `Resource`, `Spans`, etc.) is allowed.
If the processor uses such a technique, it should declare that it does not intend to modify the original data by setting `MutatesConsumedData=false` in its capabilities, to avoid marking the pipeline for exclusive ownership and to avoid the cost of data cloning described in the Exclusive Ownership section.
The order in which processors are specified in a pipeline is important, as this is the order in which each processor is applied to traces and metrics.
The attributes processor and the span processor expose the option to provide a set of properties of a span to match against, to determine if the span should be included in or excluded from the processor. By default, all spans are processed by the processor.
To configure this option, under `include` and/or `exclude`:
- at least one of `services`, `span_names` or `attributes` is required.
Note: If both `include` and `exclude` are specified, the `include` properties are checked before the `exclude` properties.
```yaml
{span, attributes}:
  # include and/or exclude can be specified. However, the include properties
  # are always checked before the exclude properties.
  {include, exclude}:
    # At least one of services, span_names or attributes must be specified.
    # It is supported to have more than one specified, but all of the specified
    # conditions must evaluate to true for a match to occur.

    # match_type controls how items in "services" and "span_names" arrays are
    # interpreted. Possible values are "regexp" or "strict".
    # This is a required field.
    match_type: {strict, regexp}

    # services specify an array of items to match the service name against.
    # A match occurs if the span service name matches at least one of the items.
    # This is an optional field.
    services: [<item1>, ..., <itemN>]

    # The span name must match at least one of the items.
    # This is an optional field.
    span_names: [<item1>, ..., <itemN>]

    # Attributes specifies the list of attributes to match against.
    # All of these attributes must match exactly for a match to occur.
    # Only match_type=strict is allowed if "attributes" are specified.
    # This is an optional field.
    attributes:
      # Key specifies the attribute to match against.
      - key: <key>
        # Value specifies the exact value to match against.
        # If not specified, a match occurs if the key is present in the attributes.
        value: {value}
```
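For example, a hypothetical attributes processor configuration that only processes spans from two services might look like the following (the processor name, service names and action are illustrative):

```yaml
processors:
  attributes/selective:
    include:
      match_type: strict
      services: ["svc_1", "svc_2"]
    actions:
      # Only spans from svc_1 or svc_2 receive this attribute.
      - key: environment
        value: production
        action: insert
```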
No processors are enabled by default; however, enabling multiple processors is recommended. These are:
- memory_limiter
- any sampling processors
- batch
- any other processors
- queued_retry
In addition, it is important to note that the order of processors matters. The order above is the best practice. Refer to the individual processor documentation below for more information.
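As a sketch, a pipeline following the recommended order might look like the following (the receiver, exporter, and choice of sampling processor are illustrative and assumed to be configured elsewhere):

```yaml
service:
  pipelines:
    traces:
      receivers: [opencensus]
      # memory_limiter first, then any sampling, then batch, with queued_retry last.
      processors: [memory_limiter, probabilistic_sampler, batch, queued_retry]
      exporters: [opencensus]
```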
The attributes processor modifies attributes of a span. Please refer to config.go for the config spec.
It optionally supports the ability to include/exclude spans.
It takes a list of actions which are performed in the order specified in the config. The supported actions are:
- `insert`: Inserts a new attribute in spans where the key does not already exist.
- `update`: Updates an attribute in spans where the key does exist.
- `upsert`: Performs insert or update. Inserts a new attribute in spans where the key does not already exist and updates an attribute in spans where the key does exist.
- `delete`: Deletes an attribute from a span.
- `hash`: Hashes (SHA1) an existing attribute value.
For the actions `insert`, `update` and `upsert`:
- `key` is required
- one of `value` or `from_attribute` is required
- `action` is required
```yaml
# Key specifies the attribute to act upon.
- key: <key>
  action: {insert, update, upsert}
  # Value specifies the value to populate for the key.
  # The type is inferred from the configuration.
  value: <value>
```
```yaml
# Key specifies the attribute to act upon.
- key: <key>
  action: {insert, update, upsert}
  # FromAttribute specifies the attribute from the span to use to populate
  # the value. If the attribute doesn't exist, no action is performed.
  from_attribute: <other key>
```
For the `delete` action:
- `key` is required
- `action: delete` is required
```yaml
# Key specifies the attribute to act upon.
- key: <key>
  action: delete
```
For the `hash` action:
- `key` is required
- `action: hash` is required
```yaml
# Key specifies the attribute to act upon.
- key: <key>
  action: hash
```
The list of actions can be composed to create rich scenarios, such as back filling attributes, copying values to a new key, and redacting sensitive information. The following is a sample configuration.
```yaml
processors:
  attributes/example:
    actions:
      - key: db.table
        action: delete
      - key: redacted_span
        value: true
        action: upsert
      - key: copy_key
        from_attribute: key_original
        action: update
      - key: account_id
        value: 2245
        action: insert
      - key: account_password
        action: delete
      - key: account_email
        action: hash
```
Refer to config.yaml for detailed examples on using the processor.
The batch processor accepts spans and places them into batches grouped by node and resource. Batching helps better compress the data and reduce the number of outgoing connections required to transmit the data. This processor supports both size and time based batching.
It is highly recommended to configure the batch processor on every collector. The batch processor should be defined in the pipeline after the memory_limiter as well as any sampling processors. This is because batching should happen after any data drops such as sampling.
Please refer to config.go for the config spec.
The following configuration options can be modified:
- `num_tickers` (default = 4): Number of tickers that loop over batch buckets
- `remove_after_ticks` (default = 10): Number of ticks that must pass without a span arriving for a node before that node's batcher is deleted
- `send_batch_size` (default = 8192): Number of spans after which a batch will be sent regardless of time
- `tick_time` (default = 1s): Interval at which the tickers tick
- `timeout` (default = 1s): Time duration after which a batch will be sent regardless of size
Examples:
```yaml
processors:
  batch:
  batch/2:
    num_tickers: 10
    remove_after_ticks: 20
    send_batch_size: 1000
    tick_time: 5s
    timeout: 10s
```
Refer to config.yaml for detailed examples on using the processor.
The memory_limiter processor is used to prevent out of memory situations on the collector. Given that the amount and type of data a collector processes is environment specific, and that the resource utilization of the collector also depends on the configured processors, it is important to put checks in place regarding memory usage. The memory_limiter processor offers the following safeguards:
- Ability to define an interval at which memory usage is checked; if usage exceeds a defined limit, GC is triggered to reduce memory consumption.
- Ability to define an interval at which memory usage is compared against the previous interval's value; if the delta exceeds a defined limit, GC is triggered to reduce memory consumption.
In addition, there is a command line option (`mem-ballast-size-mib`) which can be used to define a ballast, which allocates memory and provides stability to the heap. If defined, the ballast increases the base size of the heap so that GC triggers are delayed and the number of GC cycles over time is reduced. While the ballast is configured via the command line, today the same value configured on the command line must also be defined in the memory_limiter processor.
Note that while these configuration options can help mitigate out of memory situations, they are not a replacement for properly sizing and configuring the collector. For example, if the limit or spike thresholds are crossed, the collector will return errors to all receive operations until enough memory is freed. This may result in dropped data.
It is highly recommended to configure the ballast command line option as well as the memory_limiter processor on every collector. The ballast should be configured to be 1/3 to 1/2 of the memory allocated to the collector. The memory_limiter processor should be the first processor defined in the pipeline (immediately after the receivers). This is to ensure that backpressure can be sent to applicable receivers and minimize the likelihood of dropped data when the memory_limiter gets triggered.
Please refer to config.go for the config spec.
The following configuration options must be changed:
- `check_interval` (default = 0s): Time between measurements of memory usage. Values below 1 second are not recommended since they can result in unnecessary CPU consumption.
- `limit_mib` (default = 0): Maximum amount of memory, in MiB, targeted to be allocated by the process heap. Note that typically the total memory usage of the process will be about 50 MiB higher than this value.
- `spike_limit_mib` (default = 0): Maximum spike expected between the measurements of memory usage. The value must be less than `limit_mib`.
The following configuration options can also be modified:
- `ballast_size_mib` (default = 0): Must match the `mem-ballast-size-mib` command line option.
Examples:
```yaml
processors:
  memory_limiter:
    ballast_size_mib: 2000
    check_interval: 5s
    limit_mib: 4000
    spike_limit_mib: 500
```
Refer to config.yaml for detailed examples on using the processor.
The queued_retry processor uses a bounded queue to relay batches from the receiver or previous processor to the next processor. Received data is enqueued immediately if the queue is not full. At the same time, the processor has one or more workers which consume the data in the queue by sending them to the next processor or exporter. If relaying the data to the next processor or exporter in the pipeline fails, the processor retries after some backoff delay depending on the configuration.
Some important things to keep in mind with the queued_retry processor:
- Given that if the queue is full the data will be dropped, it is important to size the queue appropriately.
- Since the queue is based on batches and batch sizes are environment specific, it may not be easy to understand how much memory a queue will consume.
- Finally, the queue size is dependent on the deployment model of the collector. The agent deployment model typically has a smaller queue than the collector deployment model (see the sketch below).
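For instance, separate named instances could reflect the two deployment models; this is a sketch only, with purely illustrative sizes rather than recommendations:

```yaml
processors:
  # Agent deployment model: typically a smaller queue.
  queued_retry/agent:
    queue_size: 1000
  # Collector deployment model: typically a larger queue.
  queued_retry/collector:
    queue_size: 10000
```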
It is highly recommended to configure the queued_retry processor on every collector as it minimizes the likelihood of data being dropped due to delays in processing or issues exporting the data. This processor should be the last processor defined in the pipeline because issues that require a retry are typically due to exporting.
Please refer to config.go for the config spec.
The following configuration options can be modified:
- `backoff_delay` (default = 5s): Time interval to wait before retrying
- `num_workers` (default = 10): Number of workers that dequeue batches
- `queue_size` (default = 5000): Maximum number of batches kept in memory before data is dropped
- `retry_on_failure` (default = true): Whether to retry on failure or give up and drop
Examples:
```yaml
processors:
  queued_retry/example:
    backoff_delay: 5s
    num_workers: 2
    queue_size: 10
    retry_on_failure: true
```
Refer to config.yaml for detailed examples on using the processor.
The resource processor can be used to add attributes to a given resource. Please refer to config.go for the config spec.
The following configuration options are required:
- `type`: Type of resource to which labels should be applied. Only applicable to a `type` of `host` today.
- `labels`: Map of key/value pairs that should be added to the defined `type`.
Examples:
```yaml
processors:
  resource:
    type: "host"
    labels: {
      "cloud.zone": "zone-1",
      "k8s.cluster.name": "k8s-cluster",
      "host.name": "k8s-node",
    }
```
Refer to config.yaml for detailed examples on using the processor.
The sampling processor supports sampling spans. A couple of sampling processors are provided today and it is straightforward to add others.
The tail sampling processor samples traces based on a set of defined policies. Today, this processor only works with a single instance of the collector. Technically, trace ID aware load balancing could be used to support multiple collector instances, but this configuration has not been tested. Please refer to config.go for the config spec.
The following configuration options are required:
- `policies` (no default): Policies used to make a sampling decision
Multiple policies exist today and it is straightforward to add more. These include:
- `always_sample`: Sample all traces
- `numeric_attribute`: Sample based on number attributes
- `string_attribute`: Sample based on string attributes
- `rate_limiting`: Sample based on rate
The following configuration options can also be modified:
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision
- `num_traces` (default = 50000): Number of traces kept in memory
- `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures)
Examples:
```yaml
processors:
  tail_sampling:
    decision_wait: 10s
    num_traces: 100
    expected_new_traces_per_sec: 10
    policies:
      [
        {
          name: test-policy-1,
          type: always_sample
        },
        {
          name: test-policy-2,
          type: numeric_attribute,
          numeric_attribute: {key: key1, min_value: 50, max_value: 100}
        },
        {
          name: test-policy-3,
          type: string_attribute,
          string_attribute: {key: key2, values: [value1, value2]}
        },
        {
          name: test-policy-4,
          type: rate_limiting,
          rate_limiting: {spans_per_second: 35}
        }
      ]
```
Refer to config.yaml for detailed examples on using the processor.
The probabilistic sampler supports two types of sampling:
- `sampling.priority` semantic convention as defined by OpenTracing
- Trace ID hashing
The `sampling.priority` semantic convention takes priority over trace ID hashing. As the name implies, trace ID hashing samples based on hash values determined by trace IDs. In order for trace ID hashing to work, all collectors for a given tier (e.g. behind the same load balancer) must have the same `hash_seed`. It is also possible to leverage a different `hash_seed` at different collector tiers to support additional sampling requirements. Please refer to config.go for the config spec.
The following configuration options can be modified:
- `hash_seed` (no default): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same `hash_seed`.
- `sampling_percentage` (default = 0): Percentage at which traces are sampled; >= 100 samples all traces
Examples:
```yaml
processors:
  probabilistic_sampler:
    hash_seed: 22
    sampling_percentage: 15.3
```
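For instance, different collector tiers could use different seeds; the following is a sketch assuming an agent tier in front of a collector tier, with illustrative values:

```yaml
# Agent tier configuration: every collector behind this tier's load balancer uses the same seed.
processors:
  probabilistic_sampler:
    hash_seed: 22
    sampling_percentage: 50
```

```yaml
# Collector tier configuration: a different seed supports additional sampling requirements.
processors:
  probabilistic_sampler:
    hash_seed: 23
    sampling_percentage: 10
```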
Refer to config.yaml for detailed examples on using the processor.
The span processor modifies either the span name or attributes of a span based on the span name. Please refer to config.go for the config spec.
It optionally supports the ability to include/exclude spans.
The following actions are supported:
- `name`: Modify the name of attributes within a span
The following settings are required:
- `from_attributes`: The attribute values for the keys are used to create a new name in the order specified in the configuration.
The following settings can be optionally configured:
- `separator`: A string which, if specified, will be used to separate the attribute values in the new name
Note: If renaming is dependent on attributes being modified by the `attributes` processor, ensure the `span` processor is specified after the `attributes` processor in the pipeline specification.
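A sketch of such a pipeline specification (the other components are illustrative and assumed to be defined elsewhere in the configuration):

```yaml
service:
  pipelines:
    traces:
      receivers: [opencensus]
      # The attributes processor runs before the span processor so that renaming
      # can use attributes that were just modified.
      processors: [attributes, span]
      exporters: [opencensus]
```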
```yaml
span:
  name:
    # from_attributes represents the attribute keys to pull the values from to
    # generate the new span name.
    from_attributes: [<key1>, <key2>, ...]
    # Separator is the string used to concatenate various parts of the span name.
    separator: <value>
```
Example:
```yaml
span:
  name:
    from_attributes: ["db.svc", "operation"]
    separator: "::"
```
Refer to config.yaml for detailed examples on using the processor.
Takes a list of regular expressions to match the span name against and extracts attributes from it based on subexpressions. Must be specified under the `to_attributes` section.
The following settings are required:
- `rules`: A list of rules to extract attribute values from the span name. The values in the span name are replaced by extracted attribute names. Each rule in the list is a regex pattern string. The span name is checked against the regex and, if the regex matches, all named subexpressions of the regex are extracted as attributes and added to the span. Each subexpression name becomes an attribute name and the matched portion of the subexpression becomes the attribute value. The matched portion in the span name is replaced by the extracted attribute name. If the attributes already exist in the span they will be overwritten. The process is repeated for all rules in the order they are specified. Each subsequent rule works on the span name that is the output of the previous rule.
- `break_after_match` (default = false): Specifies whether processing of rules should stop after the first match. If it is false, rule processing will continue to be performed over the modified span name.
```yaml
span/to_attributes:
  name:
    to_attributes:
      rules:
        - regexp-rule1
        - regexp-rule2
        - regexp-rule3
        ...
      break_after_match: <true|false>
```
Example:
# Let's assume input span name is /api/v1/document/12345678/update
# Applying the following results in output span name /api/v1/document/{documentId}/update
# and will add a new attribute "documentId"="12345678" to the span.
span/to_attributes:
name:
to_attributes:
rules:
- ^\/api\/v1\/document\/(?P<documentId>.*)\/update$
Refer to config.yaml for detailed examples on using the processor.