Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 分块上传优化 #2813 #2816

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open

Conversation

zzdjx
Copy link
Collaborator

@zzdjx zzdjx commented Dec 3, 2024

No description provided.

@zzdjx zzdjx requested review from owenlxu and removed request for cnlkl, felixncheng and yaoxuwan December 3, 2024 08:02
@@ -68,10 +68,13 @@ import com.tencent.bkrepo.generic.constant.HEADER_UPLOAD_ID
import com.tencent.bkrepo.generic.pojo.BatchDownloadPaths
import com.tencent.bkrepo.generic.pojo.BlockInfo
import com.tencent.bkrepo.generic.pojo.CompressedFileInfo
import com.tencent.bkrepo.generic.pojo.NewBlockInfo
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里新启用一个controller吧, 不要和原来代码放在一起了

@@ -135,6 +135,15 @@ abstract class AbstractMongoDao<E> : MongoDao<E> {
return determineMongoTemplate().updateFirst(query, update, determineCollectionName(query))
}

fun updateBlock(query: Query, update: Update, clazz: Class<E>): UpdateResult
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么这个方法放在AbstractMongoDao

@@ -133,6 +137,19 @@ abstract class AbstractArtifactRepository : ArtifactRepository {
}
}


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里新加的方法如果其他地方没有复用,建议不在AbstractArtifactRepository类中加方法

override fun updateBlock(blockNode: TBlockNode) {
with(blockNode) {
val criteria = Criteria.where(TBlockNode::id.name).`is`(id)
.and(TBlockNode::createdBy.name).`is`(createdBy)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个update没必要写这个多条件吧

@@ -167,6 +219,51 @@ class UploadService(
logger.info("User[${SecurityUtils.getPrincipal()}] complete upload [$artifactInfo] success.")
}

fun completeNewBlockUpload(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是不是可以在一行

@@ -0,0 +1,22 @@
package com.tencent.bkrepo.generic.model

data class NodeAttribute(
Copy link
Collaborator

@yaoxuwan yaoxuwan Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不要复制重复代码,可以放到common-metadata

val blockCriteria = buildCriteria(projectId, repoName, fullPath, deleteTime)
val node = nodeDao.findOne(Query(blockCriteria))
if (node?.sha256 == FAKE_SHA256) {
nodeBaseService.blockNodeService.deleteBlocks(projectId, repoName, fullPath)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocknode删除现在是异步的,通过消息队列发送事件到fs-server,fs-server删除。
可以整改下,在服务内部异步删除

class BlockNodeDao : HashShardingMongoDao<TBlockNode>()
class BlockNodeDao : HashShardingMongoDao<TBlockNode>(){

fun updateBlock(query: Query, update: Update): UpdateResult {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么要自己写一个update方法,不能直接使用父类的updateFirst(query,update)吗

*
* @param context 构件上传上下文
*/
fun newUpload(context: ArtifactUploadContext) {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

只有generic支持的分块上传不建议方这,这里主要定义的是所有依赖源共用的接口

@@ -59,7 +59,7 @@ import java.time.LocalDateTime
/**
* 节点删除接口实现
*/
open class NodeDeleteSupport(
open class NodeDeleteSuspport(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么改类名

): Criteria {
val normalizedFullPath = PathUtils.normalizeFullPath(fullPath)
val normalizedPath = PathUtils.toPath(normalizedFullPath)
val escapedPath = PathUtils.escapeRegex(normalizedPath)
val criteria = where(TNode::projectId).isEqualTo(projectId)
.and(TNode::repoName).isEqualTo(repoName)
.and(TNode::deleted).isEqualTo(null)
.and(TNode::deleted).isEqualTo(deletedDate)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里原来是没限制deletedDate字段值的,现在没传deletedDate参数时会只查询出该字段为null的值

@RequestAttribute userId: String,
@ArtifactPathVariable artifactInfo: GenericArtifactInfo,
): Response<NodeDetail> {
uploadService.newStartBlockUpload(userId, artifactInfo)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generic中上传和fs-server的使用场景不太一样,generic偏向完整文件的上传下载,这里如果一开始就创建node会导致文件未完成上传就对用户可见

md5 = FAKE_MD5,
operator = userId,
size = getLongHeader(HEADER_FILE_SIZE).takeIf { it > 0L }
?: throw ErrorCodeException(GenericMessageCode.BLOCK_HEAD_NOT_FOUND) ,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里写在一行不好阅读

getLongHeader(HEADER_FILE_SIZE).takeIf { it > 0L } ?: throw ErrorCodeException(GenericMessageCode.BLOCK_HEAD_NOT_FOUND)

return ResponseBuilder.success()
}

@Permission(ResourceType.REPO, PermissionAction.READ)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么校验repo权限而不是node权限

@RequestAttribute userId: String,
@ArtifactPathVariable artifactInfo: GenericArtifactInfo,
): Response<Void> {
uploadService.abortNewBlockUpload(userId, artifactInfo)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里如果是覆盖上传的情况,中断后会把原文件也删掉

operator = userId,
size = getLongHeader(HEADER_FILE_SIZE).takeIf { it > 0L }
?: throw ErrorCodeException(GenericMessageCode.BLOCK_HEAD_NOT_FOUND) ,
overwrite = getBooleanHeader(HEADER_OVERWRITE),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

覆盖上传时,会导致一次下载可能同时下载到新文件和旧文件的数据

var offset = 0L // 用于记录当前偏移量

// 遍历块信息列表,创建对应的块节点
blockInfoList.forEach { blockInfo ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

上传分块的时候有办法知道blockSize然后计算出endPos吗,这样就不用最后再批量更新一次了

## 初始化分块上传

- API: POST /generic/separate/block/{project}/{repo}/{path}
- API 名称: start_block_upload
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个文档和apidoc-user下的文档有啥区别

instance = AuditInstanceRecord(
resourceType = NODE_RESOURCE,
instanceIds = "#artifactInfo?.getArtifactFullPath()",
instanceNames = "#artifactInfo?.getArtifactFullPath()"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

只有最终上传成功生成节点才需要记录审计日志

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants