Skip to content

Commit

Permalink
Merge branch 'master' into master_issue2765
Browse files Browse the repository at this point in the history
  • Loading branch information
liuliaozhong committed Dec 26, 2024
2 parents 8cbf6bc + c16a9df commit 72c1a96
Show file tree
Hide file tree
Showing 38 changed files with 555 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ class ArtifactMetricsConfiguration {

@Bean
fun artifactMetricsExporter(
customMetricsExporter: CustomMetricsExporter? = null
customMetricsExporter: CustomMetricsExporter? = null,
artifactMetricsProperties: ArtifactMetricsProperties,
): ArtifactMetricsExporter {
return ArtifactMetricsExporter(customMetricsExporter)
return ArtifactMetricsExporter(customMetricsExporter, artifactMetricsProperties.allowUnknownProjectExport)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,9 @@ data class ArtifactMetricsProperties(
/**
* 超过该大小的文件未命中缓存时将计入大文件缓存未命中监控数据
*/
var largeFileThreshold: DataSize = DataSize.ofGigabytes(3L)
var largeFileThreshold: DataSize = DataSize.ofGigabytes(3L),
/**
* 允许上报未知项目信息
* */
var allowUnknownProjectExport: Boolean = false,
)
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ import org.slf4j.LoggerFactory
import java.util.Queue

class ArtifactMetricsExporter(
private val customMetricsExporter: CustomMetricsExporter? = null
private val customMetricsExporter: CustomMetricsExporter? = null,
private val allowUnknownProjectExport: Boolean,
) {

fun export(queue: Queue<ArtifactTransferRecord>) {
Expand All @@ -51,7 +52,11 @@ class ArtifactMetricsExporter(
val count: Int = queue.size
for (i in 0 until count) {
val item = queue.poll()
if (item.project == StringPool.UNKNOWN || item.fullPath == StringPool.UNKNOWN) continue
if ((item.project == StringPool.UNKNOWN || item.fullPath == StringPool.UNKNOWN) &&
!allowUnknownProjectExport
) {
continue
}
val labels = convertRecordToMap(item)
val metrics = TypeOfMetricsItem.ARTIFACT_TRANSFER_RATE
val metricItem = MetricsItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ open class DevXAccessInterceptor(private val devXProperties: DevXProperties) : H
.refreshAfterWrite(devXProperties.cacheExpireTime)
.build(object : CacheLoader<String, Set<String>>() {
override fun load(key: String): Set<String> {
return listIpFromProject(key) + listCvmIpFromProject(key) + listIpFromProps(key)
return listIpFromProject(key) +
listCvmIpFromProject(key) +
listIpFromProps(key) +
listIpFromProjects(key)
}

override fun reload(key: String, oldValue: Set<String>): ListenableFuture<Set<String>> {
Expand Down Expand Up @@ -150,6 +153,16 @@ open class DevXAccessInterceptor(private val devXProperties: DevXProperties) : H
return ips
}

private fun listIpFromProjects(projectId: String): Set<String>{
val projectIdList = devXProperties.projectWhiteList[projectId] ?: emptySet()
val ips = HashSet<String>()
projectIdList.forEach {
ips.addAll(listIpFromProject(it))
ips.addAll(listCvmIpFromProject(it))
}
return ips
}

private fun listIpFromProps(projectId: String) = devXProperties.projectCvmWhiteList[projectId] ?: emptySet()

private fun listCvmIpFromProject(projectId: String): Set<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ data class DevXProperties(
* key 为项目ip, value为CVM配置
*/
var projectCvmWhiteList: Map<String, Set<String>> = emptyMap(),
/**
* 配置可以被访问的项目
* key 为项目id, value为可被访问的项目id
*/
var projectWhiteList: Map<String, Set<String>> = emptyMap(),
/**
* 可以从任意来源访问的用户
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import java.io.File
import java.io.FileNotFoundException
import java.io.OutputStream
import java.nio.file.DirectoryIteratorException
import java.nio.file.DirectoryNotEmptyException
import java.nio.file.FileSystemException
import java.nio.file.Files
import java.nio.file.Path
Expand Down Expand Up @@ -94,6 +95,9 @@ fun Path.delete(): Boolean {
} else {
throw e
}
} catch (e: DirectoryNotEmptyException) {
logger.info("delete dir[$this] failed: ${e.message}")
return false
}
// 目录还存在内容
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,18 @@ import org.springframework.stereotype.Repository

@Repository
class BlobRefRepository : SimpleMongoDao<TDdcBlobRef>() {
fun addRefToBlob(projectId: String, repoName: String, bucket: String, refKey: String, blobIds: Set<String>) {

/**
* 添加blob与ref关系,返回插入成功的blobId列表
*/
fun addRefToBlob(
projectId: String,
repoName: String,
bucket: String,
refKey: String,
blobIds: Set<String>
): Set<String> {
val addedBlobIds = HashSet<String>()
if (blobIds.size > DEFAULT_BLOB_SIZE_LIMIT) {
val ref = "$projectId/$repoName/${buildRef(bucket, refKey)}"
logger.error("blobs of ref[$ref] exceed size limit, size[${blobIds.size}]]")
Expand All @@ -27,9 +38,11 @@ class BlobRefRepository : SimpleMongoDao<TDdcBlobRef>() {
ref = buildRef(bucket, refKey)
)
)
addedBlobIds.add(it)
} catch (ignore: DuplicateKeyException) {
}
}
return addedBlobIds
}

fun removeRefFromBlob(projectId: String, repoName: String, bucket: String, refKey: String): List<TDdcBlobRef> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,20 @@ class BlobService(
}

fun addRefToBlobs(ref: Reference, blobIds: Set<String>) {
blobRefRepository.addRefToBlob(ref.projectId, ref.repoName, ref.bucket, ref.key.toString(), blobIds)
blobRepository.incRefCount(ref.projectId, ref.repoName, blobIds)
with(ref) {
val addedBlobIds = blobRefRepository.addRefToBlob(projectId, repoName, bucket, key.toString(), blobIds)
if (addedBlobIds.isNotEmpty()) {
blobRepository.incRefCount(projectId, repoName, addedBlobIds)
}
}
}

fun removeRefFromBlobs(projectId: String, repoName: String, bucket: String, key: String) {
val blobIds = HashSet<String>()
blobRefRepository.removeRefFromBlob(projectId, repoName, bucket, key).mapTo(blobIds) { it.blobId }
blobRepository.incRefCount(projectId, repoName, blobIds, -1L)
if (blobIds.isNotEmpty()) {
blobRepository.incRefCount(projectId, repoName, blobIds, -1L)
}
// 兼容旧逻辑,所有blob的references字段为空后可以移除该逻辑
blobRepository.removeRefFromBlob(projectId, repoName, bucket, key)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ data class ClientCreateRequest(
val version: String,
val os: String,
val arch: String,
val ip: String?,
)
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ClientService(

suspend fun createClient(request: ClientCreateRequest): ClientDetail {
with(request) {
val ip = ReactiveRequestContextHolder.getClientAddress()
val ip = if (this.ip.isNullOrBlank()) ReactiveRequestContextHolder.getClientAddress() else this.ip
val query = Query(
Criteria.where(TClient::projectId.name).isEqualTo(projectId)
.and(TClient::repoName.name).isEqualTo(repoName)
Expand Down Expand Up @@ -277,7 +277,8 @@ class ClientService(
mountPoint = client.mountPoint,
version = client.version,
os = client.os,
arch = client.arch
arch = client.arch,
ip = client.ip
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.springframework.http.client.reactive.ReactorClientHttpConnector
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toMono
import reactor.netty.http.client.HttpClient
Expand All @@ -62,6 +63,7 @@ import reactor.util.retry.RetryBackoffSpec
import java.net.URLDecoder
import java.time.Duration
import java.util.concurrent.Executors
import java.util.stream.Collectors

class DevxWorkspaceUtils(
devXProperties: DevXProperties
Expand Down Expand Up @@ -136,8 +138,12 @@ class DevxWorkspaceUtils(
}

private fun listIp(projectId: String): Mono<Set<String>> {
return Mono.zip(listIpFromProject(projectId), listIpFromProps(projectId), listCvmIpFromProject(projectId))
.map { it.t1 + it.t2 + it.t3 }
return Mono.zip(
listIpFromProject(projectId),
listIpFromProps(projectId),
listCvmIpFromProject(projectId),
listIpFromProjects(projectId))
.map { it.t1 + it.t2 + it.t3 + it.t4}
}

private fun listIpFromProject(projectId: String): Mono<Set<String>> {
Expand Down Expand Up @@ -177,6 +183,19 @@ class DevxWorkspaceUtils(
}
}

private fun listIpFromProjects(projectId: String): Mono<Set<String>> {
val projectIdList = devXProperties.projectWhiteList[projectId] ?: emptySet()
return Flux.fromIterable(projectIdList)
.flatMap { id ->
Flux.merge(
listIpFromProject(id),
listCvmIpFromProject(id)
)
}
.flatMapIterable { it }
.collect(Collectors.toSet())
}

suspend fun validateToken(devxToken: String): Mono<DevxTokenInfo> {
val token = withContext(Dispatchers.IO) {
URLDecoder.decode(devxToken, Charsets.UTF_8.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const val SHARDING_COUNT = 256
*/
const val BATCH_SIZE = 1000

/**
* 最大并发线程限制数
*/
const val CONCURRENT_THREAD_LIMIT = 1024

/**
* 数据库字段
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ abstract class MongoDbBatchJob<Entity : Any, Context : JobContext>(
private val permitsPerSecond: Double
get() = properties.permitsPerSecond

private val concurrentThreadLimit: Int get() = properties.concurrentThreadLimit

@Autowired
private lateinit var lockingTaskExecutor: LockingTaskExecutor

Expand Down Expand Up @@ -195,7 +197,7 @@ abstract class MongoDbBatchJob<Entity : Any, Context : JobContext>(
hasAsyncTask = true
tasks.forEach {
val task = IdentityTask(taskId) { block(it) }
executor.executeWithId(task, produce, permitsPerSecond)
executor.executeWithId(task, concurrentThreadLimit, produce, permitsPerSecond)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.context

import com.google.common.hash.BloomFilter
import com.tencent.bkrepo.job.batch.base.JobContext

@Suppress("UnstableApiUsage")
class DdcBlobRefCountCorrectJobContext(
val bf: BloomFilter<CharSequence>,
) : JobContext()
Loading

0 comments on commit 72c1a96

Please sign in to comment.