Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: 作业执行结果回调日志级别优化 #3097 #3210

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ public class CommonMetricNames {
* 统计调用 GSE API整个过程,含反序列化
*/
public static final String ESB_GSE_API = "job.client.gse.api";

/**
* 统计任务执行结束回调HTTP的请求状态
*/
public static final String TASK_CALLBACK_HTTP_STATUS = "job.client.task.callback.http.status";
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public class CommonMetricTags {
* 蓝鲸应用 ID
*/
public static final String KEY_APP_CODE = "app_code";

/**
* HTTP请求状态
*/
public static final String KEY_HTTP_STATUS = "http_status";
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class HttpConPoolUtil {
* @param contentType 默认传null则为"application/x-www-form-urlencoded"
* @return 响应
*/
public static String post(String url, String content, String contentType) {
public static HttpResponse post(String url, String content, String contentType) {
return post(url, CHARSET, content, contentType);
}

Expand All @@ -105,15 +105,11 @@ public static String post(String url, String content, String contentType) {
* @param contentType 默认传null则为"application/x-www-form-urlencoded"
* @return 响应
*/
public static String post(String url, String charset, String content, String contentType) {
public static HttpResponse post(String url, String charset, String content, String contentType) {
try {
byte[] resp = post(url, content.getBytes(charset), contentType);
if (null == resp) {
return null;
}
return new String(resp, charset);
return post(url, content.getBytes(charset), contentType);
} catch (IOException e) {
log.error("Post request fail", e);
log.warn("Post request fail", e);
throw new InternalException(e, ErrorCode.API_ERROR);
}
}
Expand All @@ -125,7 +121,7 @@ public static String post(String url, String charset, String content, String con
* @param content 提交的内容字符串
* @return 响应
*/
public static String post(String url, String content) {
public static HttpResponse post(String url, String content) {
return post(url, CHARSET, content, "application/x-www-form-urlencoded");
}

Expand All @@ -137,7 +133,7 @@ public static String post(String url, String content) {
* @param headers 自定义请求头
* @return 响应
*/
public static String post(String url, String content, Header... headers) {
public static HttpResponse post(String url, String content, Header... headers) {
return post(url, CHARSET, content, headers);
}

Expand All @@ -150,15 +146,11 @@ public static String post(String url, String content, Header... headers) {
* @param headers 自定义请求头
* @return 响应
*/
public static String post(String url, String charset, String content, Header... headers) {
public static HttpResponse post(String url, String charset, String content, Header... headers) {
try {
byte[] resp = post(url, new ByteArrayEntity(content.getBytes(charset)), headers);
if (null == resp) {
return null;
}
return new String(resp, charset);
return post(url, new ByteArrayEntity(content.getBytes(charset)), headers);
} catch (IOException e) {
log.error("Post request fail", e);
log.warn("Post request fail", e);
throw new InternalException(e, ErrorCode.API_ERROR);
}
}
Expand All @@ -171,7 +163,7 @@ public static String post(String url, String charset, String content, Header...
* @param contentType 默认传null则为"application/x-www-form-urlencoded"
* @return 返回字节数组
*/
public static byte[] post(String url, byte[] content, String contentType) {
public static HttpResponse post(String url, byte[] content, String contentType) {
return post(url, new ByteArrayEntity(content), contentType);
}

Expand All @@ -183,30 +175,30 @@ public static byte[] post(String url, byte[] content, String contentType) {
* @param contentType 默认传null则为"application/x-www-form-urlencoded"
* @return 返回字节数组
*/
public static byte[] post(String url, HttpEntity requestEntity, String contentType) {
public static HttpResponse post(String url, HttpEntity requestEntity, String contentType) {
return post(url, requestEntity,
new BasicHeader("Content-Type", contentType == null ? "application/x-www-form-urlencoded" : contentType));
new BasicHeader("Content-Type",
contentType == null ? "application/x-www-form-urlencoded" : contentType));
}

public static byte[] post(String url, HttpEntity requestEntity, Header... headers) {
public static HttpResponse post(String url, HttpEntity requestEntity, Header... headers) {
HttpPost post = new HttpPost(url);
// 设置为长连接,服务端判断有此参数就不关闭连接。
post.setHeader("Connection", "Keep-Alive");
post.setHeaders(headers);
post.setEntity(requestEntity);
try (CloseableHttpResponse httpResponse = HTTP_CLIENT.execute(post)) {
int statusCode = httpResponse.getStatusLine().getStatusCode();
try (CloseableHttpResponse response = HTTP_CLIENT.execute(post)) {
int statusCode = response.getStatusLine().getStatusCode();
log.info("Post url: {}, statusCode: {}", url, statusCode);
if (statusCode != HttpStatus.SC_OK) {
String errorMsg = buildErrorMessage("Post", url, statusCode,
httpResponse.getStatusLine().getReasonPhrase());
log.error(errorMsg);
throw new InternalException(errorMsg, ErrorCode.API_ERROR);
response.getStatusLine().getReasonPhrase());
log.warn(errorMsg);
}
HttpEntity entity = httpResponse.getEntity();
return EntityUtils.toByteArray(entity);
String entity = response.getEntity() != null ? EntityUtils.toString(response.getEntity()) : null;
return new HttpResponse(statusCode, entity, response.getAllHeaders());
} catch (IOException e) {
log.error("Post request fail", e);
log.warn("Post request fail", e);
throw new InternalException(e, ErrorCode.API_ERROR);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@

package com.tencent.bk.job.execute.engine.listener;

import com.tencent.bk.job.common.metrics.CommonMetricNames;
import com.tencent.bk.job.common.metrics.CommonMetricTags;
import com.tencent.bk.job.common.util.http.HttpConPoolUtil;
import com.tencent.bk.job.common.util.http.HttpResponse;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.execute.engine.model.JobCallbackDTO;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.springframework.stereotype.Component;

import java.net.MalformedURLException;
Expand All @@ -39,40 +44,80 @@
@Component
@Slf4j
public class CallbackListener {
private final MeterRegistry meterRegistry;

public CallbackListener(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

/**
* 处理回调请求
*/
public void handleMessage(JobCallbackDTO callbackDTO) {
long taskInstanceId = callbackDTO.getId();
String callbackUrl = callbackDTO.getCallbackUrl();

try {
log.info("Handle callback, taskInstanceId: {}, msg: {}", taskInstanceId, callbackDTO);
String callbackUrl = callbackDTO.getCallbackUrl();
try {
new URL(callbackUrl);
} catch (MalformedURLException var5) {
log.warn("Callback fail, bad url: {}", callbackUrl);
return;
validateUrl(callbackUrl);

HttpResponse response = callbackRequest(callbackUrl, callbackDTO);

// 回调状态码不是200,重试一次
if (response.getStatusCode() != HttpStatus.SC_OK) {
log.warn("Callback failed, retrying. taskInstanceId: {}, statusCode: {}",
taskInstanceId, response.getStatusCode());
response = callbackRequest(callbackUrl, callbackDTO);
}
log.info("Final callback {}, taskInstanceId: {}, statusCode: {}, result: {}",
response.getStatusCode() == HttpStatus.SC_OK ? "success" : "fail",
taskInstanceId,
response.getStatusCode(),
response.getEntity());
} catch (MalformedURLException e) {
log.warn("Invalid callback URL: "+callbackUrl, e);
recordCallbackMetrics("unknown");
}
}

/**
* 校验URL是否合法
*/
private void validateUrl(String callbackUrl) throws MalformedURLException {
new URL(callbackUrl);
}

/**
* 执行回调请求
*/
private HttpResponse callbackRequest(String callbackUrl, JobCallbackDTO callbackDTO) {
try {
callbackDTO.setCallbackUrl(null);
try {
// TODO 需要优化,返回application/json
try {
String rst = HttpConPoolUtil.post(callbackUrl, JsonUtils.toJson(callbackDTO));
log.info("Callback success, taskInstanceId: {}, result: {}", taskInstanceId, rst);
} catch (Throwable e) { //出错重试一次
String errorMsg = "Callback fail, taskInstanceId: " + taskInstanceId;
log.warn(errorMsg, e);
String rst = HttpConPoolUtil.post(callbackUrl, JsonUtils.toJson(callbackDTO));
log.info("Retry callback success, taskInstanceId: {}, result: {}", taskInstanceId, rst);
}
} catch (Throwable e) {
String errorMsg = "Callback fail, taskInstanceId: " + taskInstanceId;
log.warn(errorMsg, e);
}
HttpResponse response = HttpConPoolUtil.post(callbackUrl, JsonUtils.toJson(callbackDTO));
recordCallbackMetrics(response.getStatusCode());
return response;
} catch (Throwable e) {
String errorMsg = "Callback fail, taskInstanceId: " + taskInstanceId;
String errorMsg = String.format("Callback request failed, taskInstanceId: %s, url: %s",
callbackDTO.getId(),
callbackUrl);
log.warn(errorMsg, e);
recordCallbackMetrics("unknown");
return new HttpResponse(500, null, null);
}
}

/**
* 记录回调请求的监控指标
*/
private void recordCallbackMetrics(int statusCode) {
recordCallbackMetrics(String.valueOf(statusCode));
}

private void recordCallbackMetrics(String statusCode) {
meterRegistry.counter(
CommonMetricNames.TASK_CALLBACK_HTTP_STATUS,
CommonMetricTags.KEY_HTTP_STATUS,
statusCode
).increment();
}
}
Loading