zipkin
为分布式链路调用监控系统,聚合各业务系统调用延迟数据,达到链路调用监控跟踪。
如图,在复杂的调用链路中假设存在一条调用链路响应缓慢,如何定位其中延迟高的服务呢?
- 日志: 通过分析调用链路上的每个服务日志得到结果
- zipkin:使用
zipkin
的web UI
可以一眼看出延迟高的服务
如图所示,各业务系统在彼此调用时,将特定的跟踪消息传递至zipkin
,zipkin在收集到跟踪信息后将其聚合处理、存储、展示等,用户可通过web UI
方便
获得网络延迟、调用链路、系统依赖等等。
zipkin
主要涉及四个组件 collector
storage
search
web UI
Collector
接收各service传输的数据Cassandra
作为Storage
的一种,也可以是mysql等,默认存储在内存中,配置cassandra
可以参考这里Query
负责查询Storage
中存储的数据,提供简单的JSON API
获取数据,主要提供给web UI
使用Web
提供简单的web界面
执行如下命令下载jar包
wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'
其为一个spring boot
功能,直接运行jar
nohup java -jar zipkin.jar &
使用zipkin
涉及几个概念
-
Span
:基本工作单元,一次链路调用(可以是RPC,DB等没有特定的限制)创建一个span
,通过一个64位ID标识它,span
通过还有其他的数据,例如描述信息,时间戳,key-value对的(Annotation)tag信息,parent-id
等,其中parent-id
可以表示span
调用链路来源,通俗的理解span
就是一次请求信息 -
Trace
:类似于树结构的Span
集合,表示一条调用链路,存在唯一标识 -
Annotation
: 注解,用来记录请求特定事件相关信息(例如时间),通常包含四个注解信息cs - Client Start,表示客户端发起请求
sr - Server Receive,表示服务端收到请求
ss - Server Send,表示服务端完成处理,并将结果发送给客户端
cr - Client Received,表示客户端获取到服务端返回信息
-
BinaryAnnotation
:提供一些额外信息,一般已key-value对出现
上图表示一请求链路,一条链路通过Trace Id
唯一标识,Span
标识发起的请求信息,各span
通过parent id
关联起来,如图
完成链路调用的记录后,如何来计算调用的延迟呢,这就需要利用Annotation
信息
sr-cs 得到请求发出延迟
ss-sr 得到服务端处理延迟
cr-cs 得到真个链路完成延迟
作为各调用链路,只需要负责将指定格式的数据发送给zipkin
即可,利用brave可快捷完成操作。
首先导入jar包pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.6.RELEASE</version>
</parent>
<!-- https://mvnrepository.com/artifact/io.zipkin.brave/brave-core -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-core</artifactId>
<version>3.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.zipkin.brave/brave-http -->
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-http</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-spancollector-http</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-web-servlet-filter</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-okhttp</artifactId>
<version>3.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.1</version>
</dependency>
</dependencies>
利用spring boot
创建工程
Application.java
package com.lkl.zipkin;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*
* Created by liaokailin on 16/7/27.
*/
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(Application.class);
app.run(args);
}
}
建立controller
对外提供服务
HomeController.java
RestController
@RequestMapping("/")
public class HomeController {
@Autowired
private OkHttpClient client;
private Random random = new Random();
@RequestMapping("start")
public String start() throws InterruptedException, IOException {
int sleep= random.nextInt(100);
TimeUnit.MILLISECONDS.sleep(sleep);
Request request = new Request.Builder().url("http://localhost:9090/foo").get().build();
Response response = client.newCall(request).execute();
return " [service1 sleep " + sleep+" ms]" + response.body().toString();
}
HomeController
中利用OkHttpClient
调用发起http请求。在每次发起请求时则需要通过brave
记录Span
信息,并异步传递给zipkin
作为被调用方(服务端)也同样需要完成以上操作.
ZipkinConfig.java
package com.lkl.zipkin.config;
import com.github.kristofa.brave.Brave;
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.SpanCollector;
import com.github.kristofa.brave.http.DefaultSpanNameProvider;
import com.github.kristofa.brave.http.HttpSpanCollector;
import com.github.kristofa.brave.okhttp.BraveOkHttpRequestResponseInterceptor;
import com.github.kristofa.brave.servlet.BraveServletFilter;
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by liaokailin on 16/7/27.
*/
@Configuration
public class ZipkinConfig {
@Autowired
private ZipkinProperties properties;
@Bean
public SpanCollector spanCollector() {
HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().connectTimeout(properties.getConnectTimeout()).readTimeout(properties.getReadTimeout())
.compressionEnabled(properties.isCompressionEnabled()).flushInterval(properties.getFlushInterval()).build();
return HttpSpanCollector.create(properties.getUrl(), config, new EmptySpanCollectorMetricsHandler());
}
@Bean
public Brave brave(SpanCollector spanCollector){
Brave.Builder builder = new Brave.Builder(properties.getServiceName()); //指定state
builder.spanCollector(spanCollector);
builder.traceSampler(Sampler.ALWAYS_SAMPLE);
Brave brave = builder.build();
return brave;
}
@Bean
public BraveServletFilter braveServletFilter(Brave brave){
BraveServletFilter filter = new BraveServletFilter(brave.serverRequestInterceptor(),brave.serverResponseInterceptor(),new DefaultSpanNameProvider());
return filter;
}
@Bean
public OkHttpClient okHttpClient(Brave brave){
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(new BraveOkHttpRequestResponseInterceptor(brave.clientRequestInterceptor(), brave.clientResponseInterceptor(), new DefaultSpanNameProvider()))
.build();
return client;
}
}
-
SpanCollector
配置收集器 -
Brave
各工具类的封装,其中builder.traceSampler(Sampler.ALWAYS_SAMPLE)
设置采样比率,0-1之间的百分比 -
BraveServletFilter
作为拦截器,需要serverRequestInterceptor
,serverResponseInterceptor
分别完成sr
和ss
操作 -
OkHttpClient
添加拦截器,需要clientRequestInterceptor
,clientResponseInterceptor
分别完成cs
和cr
操作,该功能由 brave中的brave-okhttp
模块提供,同样的道理如果需要记录数据库的延迟只要在数据库操作前后完成cs
和cr
即可,当然brave提供其封装。
以上还缺少一个配置信息ZipkinProperties.java
package com.lkl.zipkin.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* Created by liaokailin on 16/7/28.
*/
@Configuration
@ConfigurationProperties(prefix = "com.zipkin")
public class ZipkinProperties {
private String serviceName;
private String url;
private int connectTimeout;
private int readTimeout;
private int flushInterval;
private boolean compressionEnabled;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public int getConnectTimeout() {
return connectTimeout;
}
public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}
public int getReadTimeout() {
return readTimeout;
}
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}
public int getFlushInterval() {
return flushInterval;
}
public void setFlushInterval(int flushInterval) {
this.flushInterval = flushInterval;
}
public boolean isCompressionEnabled() {
return compressionEnabled;
}
public void setCompressionEnabled(boolean compressionEnabled) {
this.compressionEnabled = compressionEnabled;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
}
则可以在配置文件application.properties
中配置相关信息
com.zipkin.serviceName=service1
com.zipkin.url=http://110.173.14.57:9411
com.zipkin.connectTimeout=6000
com.zipkin.readTimeout=6000
com.zipkin.flushInterval=1
com.zipkin.compressionEnabled=true
server.port=8080
那么其中的service1
即完成,同样的道理,修改配置文件(调整com.zipkin.serviceName
,以及server.port
)以及controller
对应的方法构造若干服务
service1
中访问http://localhost:8080/start
需要访问http://localhost:9090/foo
,则构造server2
提供该方法
server2
配置
com.zipkin.serviceName=service2
com.zipkin.url=http://110.173.14.57:9411
com.zipkin.connectTimeout=6000
com.zipkin.readTimeout=6000
com.zipkin.flushInterval=1
com.zipkin.compressionEnabled=true
server.port=9090
controller
方法
@RequestMapping("foo")
public String foo() throws InterruptedException, IOException {
Random random = new Random();
int sleep= random.nextInt(100);
TimeUnit.MILLISECONDS.sleep(sleep);
Request request = new Request.Builder().url("http://localhost:9091/bar").get().build(); //service3
Response response = client.newCall(request).execute();
String result = response.body().string();
request = new Request.Builder().url("http://localhost:9092/tar").get().build(); //service4
response = client.newCall(request).execute();
result += response.body().string();
return " [service2 sleep " + sleep+" ms]" + result;
}
在server2
中调用server3
和server4
中的方法
方法分别为
@RequestMapping("bar")
public String bar() throws InterruptedException, IOException { //service3 method
Random random = new Random();
int sleep= random.nextInt(100);
TimeUnit.MILLISECONDS.sleep(sleep);
return " [service3 sleep " + sleep+" ms]";
}
@RequestMapping("tar")
public String tar() throws InterruptedException, IOException { //service4 method
Random random = new Random();
int sleep= random.nextInt(1000);
TimeUnit.MILLISECONDS.sleep(sleep);
return " [service4 sleep " + sleep+" ms]";
}
将工程修改后编译成jar
形式
执行
nohup java -jar server4.jar &
nohup java -jar server3.jar &
nohup java -jar server2.jar &
nohup java -jar server1.jar &
访问http://localhost:8080/start
后查看zipkin
的web UI
点击条目可以查看具体的延迟信息
以上完成了基本的操作,下面将从源码角度来看下brave
的实现
首先从SpanCollector
来入手
@Bean
public SpanCollector spanCollector() {
HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().connectTimeout(properties.getConnectTimeout()).readTimeout(properties.getReadTimeout())
.compressionEnabled(properties.isCompressionEnabled()).flushInterval(properties.getFlushInterval()).build();
return HttpSpanCollector.create(properties.getUrl(), config, new EmptySpanCollectorMetricsHandler());
}
从名称上看HttpSpanCollector
是基于http
的span
收集器,因此超时配置是必须的,默认给出的超时时间较长,flushInterval
表示span
的传递
间隔,实际为定时任务执行的间隔时间.在HttpSpanCollector
中覆写了父类方法sendSpans
@Override
protected void sendSpans(byte[] json) throws IOException {
// intentionally not closing the connection, so as to use keep-alives
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
connection.setConnectTimeout(config.connectTimeout());
connection.setReadTimeout(config.readTimeout());
connection.setRequestMethod("POST");
connection.addRequestProperty("Content-Type", "application/json");
if (config.compressionEnabled()) {
connection.addRequestProperty("Content-Encoding", "gzip");
ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
try (GZIPOutputStream compressor = new GZIPOutputStream(gzipped)) {
compressor.write(json);
}
json = gzipped.toByteArray();
}
connection.setDoOutput(true);
connection.setFixedLengthStreamingMode(json.length);
connection.getOutputStream().write(json);
try (InputStream in = connection.getInputStream()) {
while (in.read() != -1) ; // skip
} catch (IOException e) {
try (InputStream err = connection.getErrorStream()) {
if (err != null) { // possible, if the connection was dropped
while (err.read() != -1) ; // skip
}
}
throw e;
}
}
}
可以看出最终span
信息是通过HttpURLConnection
实现的,同样道理就可以推理brave
对brave-spring-resttemplate-interceptors
模块的实现,
只是换了一种http
封装。
Brave
@Bean
public Brave brave(SpanCollector spanCollector){
Brave.Builder builder = new Brave.Builder(properties.getServiceName()); //指定state
builder.spanCollector(spanCollector);
builder.traceSampler(Sampler.ALWAYS_SAMPLE);
Brave brave = builder.build();
return brave;
}
Brave
类包装了各种工具类
public Brave build() {
return new Brave(this);
}
创建一个Brave
private Brave(Builder builder) {
serverTracer = ServerTracer.builder()
.randomGenerator(builder.random)
.spanCollector(builder.spanCollector)
.state(builder.state)
.traceSampler(builder.sampler).build();
clientTracer = ClientTracer.builder()
.randomGenerator(builder.random)
.spanCollector(builder.spanCollector)
.state(builder.state)
.traceSampler(builder.sampler).build();
localTracer = LocalTracer.builder()
.randomGenerator(builder.random)
.spanCollector(builder.spanCollector)
.spanAndEndpoint(SpanAndEndpoint.LocalSpanAndEndpoint.create(builder.state))
.traceSampler(builder.sampler).build();
serverRequestInterceptor = new ServerRequestInterceptor(serverTracer);
serverResponseInterceptor = new ServerResponseInterceptor(serverTracer);
clientRequestInterceptor = new ClientRequestInterceptor(clientTracer);
clientResponseInterceptor = new ClientResponseInterceptor(clientTracer);
serverSpanAnnotationSubmitter = AnnotationSubmitter.create(SpanAndEndpoint.ServerSpanAndEndpoint.create(builder.state));
serverSpanThreadBinder = new ServerSpanThreadBinder(builder.state);
clientSpanThreadBinder = new ClientSpanThreadBinder(builder.state);
}
封装了*Tracer
,*Interceptor
,*Binder
等
其中 serverTracer
当服务作为服务端
时处理span
信息,clientTracer
当服务作为客户端
时处理span
信息
Filter
BraveServletFilter
是http
模块提供的拦截器功能,传递serverRequestInterceptor
,serverResponseInterceptor
,spanNameProvider
等参数
其中spanNameProvider
表示如何处理span
的名称,默认使用method
名称,spring boot
中申明的filter bean
默认拦截所有请求
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException {
String alreadyFilteredAttributeName = getAlreadyFilteredAttributeName();
boolean hasAlreadyFilteredAttribute = request.getAttribute(alreadyFilteredAttributeName) != null;
if (hasAlreadyFilteredAttribute) {
// Proceed without invoking this filter...
filterChain.doFilter(request, response);
} else {
final StatusExposingServletResponse statusExposingServletResponse = new StatusExposingServletResponse((HttpServletResponse) response);
requestInterceptor.handle(new HttpServerRequestAdapter(new ServletHttpServerRequest((HttpServletRequest) request), spanNameProvider));
try {
filterChain.doFilter(request, statusExposingServletResponse);
} finally {
responseInterceptor.handle(new HttpServerResponseAdapter(new HttpResponse() {
@Override
public int getHttpStatusCode() {
return statusExposingServletResponse.getStatus();
}
}));
}
}
}
首先来看requestInterceptor.handle
方法,
public void handle(ServerRequestAdapter adapter) {
serverTracer.clearCurrentSpan();
final TraceData traceData = adapter.getTraceData();
Boolean sample = traceData.getSample();
if (sample != null && Boolean.FALSE.equals(sample)) {
serverTracer.setStateNoTracing();
LOGGER.fine("Received indication that we should NOT trace.");
} else {
if (traceData.getSpanId() != null) {
LOGGER.fine("Received span information as part of request.");
SpanId spanId = traceData.getSpanId();
serverTracer.setStateCurrentTrace(spanId.traceId, spanId.spanId,
spanId.nullableParentId(), adapter.getSpanName());
} else {
LOGGER.fine("Received no span state.");
serverTracer.setStateUnknown(adapter.getSpanName());
}
serverTracer.setServerReceived();
for(KeyValueAnnotation annotation : adapter.requestAnnotations())
{
serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
}
}
}
其中serverTracer.clearCurrentSpan()
清除当前线程上的span
信息,调用ThreadLocalServerClientAndLocalSpanState
中的
@Override
public void setCurrentServerSpan(final ServerSpan span) {
if (span == null) {
currentServerSpan.remove();
} else {
currentServerSpan.set(span);
}
}
currentServerSpan
为ThreadLocal
对象
private final static ThreadLocal<ServerSpan> currentServerSpan = new ThreadLocal<ServerSpan>() {
回到ServerRequestInterceptor#handle()
方法中final TraceData traceData = adapter.getTraceData()
@Override
public TraceData getTraceData() {
final String sampled = serverRequest.getHttpHeaderValue(BraveHttpHeaders.Sampled.getName());
if (sampled != null) {
if (sampled.equals("0") || sampled.toLowerCase().equals("false")) {
return TraceData.builder().sample(false).build();
} else {
final String parentSpanId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.ParentSpanId.getName());
final String traceId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.TraceId.getName());
final String spanId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.SpanId.getName());
if (traceId != null && spanId != null) {
SpanId span = getSpanId(traceId, spanId, parentSpanId);
return TraceData.builder().sample(true).spanId(span).build();
}
}
}
return TraceData.builder().build();
}
其中 SpanId span = getSpanId(traceId, spanId, parentSpanId)
将构造一个SpanId
对象
private SpanId getSpanId(String traceId, String spanId, String parentSpanId) {
return SpanId.builder()
.traceId(convertToLong(traceId))
.spanId(convertToLong(spanId))
.parentId(parentSpanId == null ? null : convertToLong(parentSpanId)).build();
}
将traceId
,spanId
,parentId
关联起来,其中设置parentId
方法为
public Builder parentId(@Nullable Long parentId) {
if (parentId == null) {
this.flags |= FLAG_IS_ROOT;
} else {
this.flags &= ~FLAG_IS_ROOT;
}
this.parentId = parentId;
return this;
}
如果parentId
为空为根节点,则执行this.flags |= FLAG_IS_ROOT
,因此后续在判断节点是否为根节点时,只需要执行(flags & FLAG_IS_ROOT) == FLAG_IS_ROOT
即可.
构造完SpanId
后看
serverTracer.setStateCurrentTrace(spanId.traceId, spanId.spanId,
spanId.nullableParentId(), adapter.getSpanName());
设置当前Span
public void setStateCurrentTrace(long traceId, long spanId, @Nullable Long parentSpanId, @Nullable String name) {
checkNotBlank(name, "Null or blank span name");
spanAndEndpoint().state().setCurrentServerSpan(
ServerSpan.create(traceId, spanId, parentSpanId, name));
}
ServerSpan.create
创建Span
信息
static ServerSpan create(long traceId, long spanId, @Nullable Long parentSpanId, String name) {
Span span = new Span();
span.setTrace_id(traceId);
span.setId(spanId);
if (parentSpanId != null) {
span.setParent_id(parentSpanId);
}
span.setName(name);
return create(span, true);
}
构造了一个包含Span
信息的AutoValue_ServerSpan
对象
通过setCurrentServerSpan
设置到当前线程上
继续看serverTracer.setServerReceived()
方法
public void setServerReceived() {
submitStartAnnotation(zipkinCoreConstants.SERVER_RECV);
}
为当前请求设置了server received event
void submitStartAnnotation(String annotationName) {
Span span = spanAndEndpoint().span();
if (span != null) {
Annotation annotation = Annotation.create(
currentTimeMicroseconds(),
annotationName,
spanAndEndpoint().endpoint()
);
synchronized (span) {
span.setTimestamp(annotation.timestamp);
span.addToAnnotations(annotation);
}
}
}
在这里为Span
信息设置了Annotation
信息,后续的
for(KeyValueAnnotation annotation : adapter.requestAnnotations())
{
serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
}
设置了BinaryAnnotation
信息,adapter.requestAnnotations()
在构造HttpServerRequestAdapter
时已完成
@Override
public Collection<KeyValueAnnotation> requestAnnotations() {
KeyValueAnnotation uriAnnotation = KeyValueAnnotation.create(
TraceKeys.HTTP_URL, serverRequest.getUri().toString());
return Collections.singleton(uriAnnotation);
}
以上将Span
信息(包括sr)存储在当前线程中,接下来继续看BraveServletFilter#doFilter
方法的finally
部分
responseInterceptor.handle(new HttpServerResponseAdapter(new HttpResponse() {
@Override //获取http状态码
public int getHttpStatusCode() {
return statusExposingServletResponse.getStatus();
}
}));
handle
方法
public void handle(ServerResponseAdapter adapter) {
// We can submit this in any case. When server state is not set or
// we should not trace this request nothing will happen.
LOGGER.fine("Sending server send.");
try {
for(KeyValueAnnotation annotation : adapter.responseAnnotations())
{
serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
}
serverTracer.setServerSend();
} finally {
serverTracer.clearCurrentSpan();
}
}
首先配置BinaryAnnotation
信息,然后执行serverTracer.setServerSend
,在finally
中清除当前线程中的Span
信息(不管前面是否清楚成功,最终都将执行该不走),ThreadLocal
中的数据要做到有始有终
看serverTracer.setServerSend()
public void setServerSend() {
if (submitEndAnnotation(zipkinCoreConstants.SERVER_SEND, spanCollector())) {
spanAndEndpoint().state().setCurrentServerSpan(null);
}
}
终于看到spanCollector
收集器了,说明下面将看是收集Span
信息,这里为ss
注解
boolean submitEndAnnotation(String annotationName, SpanCollector spanCollector) {
Span span = spanAndEndpoint().span();
if (span == null) {
return false;
}
Annotation annotation = Annotation.create(
currentTimeMicroseconds(),
annotationName,
spanAndEndpoint().endpoint()
);
span.addToAnnotations(annotation);
if (span.getTimestamp() != null) {
span.setDuration(annotation.timestamp - span.getTimestamp());
}
spanCollector.collect(span);
return true;
}
首先获取当前线程中的Span
信息,然后处理注解信息,通过annotation.timestamp - span.getTimestamp()
计算延迟,
调用spanCollector.collect(span)
进行收集Span
信息,那么Span
信息是同步收集的吗?肯定不是的,接着看
调用spanCollector.collect(span)
则执行FlushingSpanCollector
中的collect
方法
@Override
public void collect(Span span) {
metrics.incrementAcceptedSpans(1);
if (!pending.offer(span)) {
metrics.incrementDroppedSpans(1);
}
}
首先进行的是metrics
统计信息,可以自定义该SpanCollectorMetricsHandler
信息收集各指标信息,利用如grafana
等展示信息
pending.offer(span)
将span
信息存储在BlockingQueue
中,然后通过定时任务去取出阻塞队列中的值,偷偷摸摸的上传span
信息
定时任务利用了Flusher
类来执行,在构造FlushingSpanCollector
时构造了Flusher
类
static final class Flusher implements Runnable {
final Flushable flushable;
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Flusher(Flushable flushable, int flushInterval) {
this.flushable = flushable;
this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
}
@Override
public void run() {
try {
flushable.flush();
} catch (IOException ignored) {
}
}
}
创建了一个核心线程数为1的线程池,每间隔flushInterval
秒执行一次Span
信息上传,执行flush
方法
@Override
public void flush() {
if (pending.isEmpty()) return;
List<Span> drained = new ArrayList<Span>(pending.size());
pending.drainTo(drained);
if (drained.isEmpty()) return;
int spanCount = drained.size();
try {
reportSpans(drained);
} catch (IOException e) {
metrics.incrementDroppedSpans(spanCount);
} catch (RuntimeException e) {
metrics.incrementDroppedSpans(spanCount);
}
}
首先将阻塞队列中的值全部取出存如集合中,最后调用reportSpans(List<Span> drained)
抽象方法,该方法在AbstractSpanCollector
得到覆写
@Override
protected void reportSpans(List<Span> drained) throws IOException {
byte[] encoded = codec.writeSpans(drained);
sendSpans(encoded);
}
转换成字节流后调用sendSpans
抽象方法发送Span
信息,此时就回到一开始说的HttpSpanCollector
通过HttpURLConnection
实现的sendSpans
方法。
more about is here