From 55d704d91b7357b7e19f8910f4e1e68b19c4549f Mon Sep 17 00:00:00 2001 From: zzdjx <116262724+zzdjx@users.noreply.github.com> Date: Mon, 13 Jan 2025 16:19:07 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=86=E5=9D=97=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E4=BC=98=E5=8C=96=20#2813?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 分块上传优化 #2813 * fix: 分块上传优化-问题修复 #2813 * fix: 分块上传优化-代码调整+单测 #2813 * fix: 分块上传优化-代码调整 #2813 * fix: NodeAttribute抽离公共放到metadata #2813 * fix: 新增Generic下Block事件监听 #2813 * fix: 代码规范 #2813 * fix: 错误修复 #2813 * feat: 逻辑重构 #2813 * fix: 代码规范 #2813 * fix: 代码格式及部分逻辑优化 #2813 * fix: 修复isSeparateUpload uploadId null异常 #2813 * feat: 分块上传并发场景下优化 #2813 * fix: 代码优化 #2813 * fix: test方法 #2813 * fix: 修复问题 #2813 * fix: 修复问题 #2813 * fix: 文档更新 #2813 * fix: 增加部分log #2813 * fix: 调试日志 #2813 * fix: 调试日志 #2813 * fix: 问题修复 #2813 * fix: 问题修复 #2813 * feat: blocks 新增过期逻辑 #2813 * feat: 过期时间改为配置项 #2813 * fix: 优化一下细节问题 #2813 * fix: 优化一下细节问题 #2813 * fix: 优化一下细节问题 #2813 --- docs/apidoc-user/generic/SeparateBlock.md | 302 ++++++++++++++++++ docs/apidoc/generic/SeparateBlock.md | 302 ++++++++++++++++++ .../common/metadata/constant/Constant.kt | 1 + .../common/metadata}/model/NodeAttribute.kt | 2 +- .../common/metadata/model/TBlockNode.kt | 4 +- .../service/blocknode/BlockNodeService.kt | 24 +- .../blocknode/impl/BlockNodeServiceImpl.kt | 33 +- .../service/node/NodeDeleteOperation.kt | 12 + .../service/node/impl/NodeBaseService.kt | 68 +++- .../service/node/impl/NodeDeleteSupport.kt | 36 ++- .../service/node/impl/NodeServiceImpl.kt | 10 + .../impl/center/CenterNodeDeleteSupport.kt | 49 ++- .../node/impl/center/CenterNodeServiceImpl.kt | 53 +-- .../node/impl/edge/EdgeNodeServiceImpl.kt | 10 + .../metadata/util/BlockNodeQueryHelper.kt | 16 + .../storage/config/ReceiveProperties.kt | 7 + .../bkrepo/fs/server/constant/Constants.kt | 2 + .../server/handler/NodeOperationsHandler.kt | 6 +- .../fs/server/service/FileOperationService.kt | 2 +- .../generic/artifact/GenericArtifactInfo.kt | 1 + .../bkrepo/generic/constant/Constants.kt | 7 + .../generic/constant/GenericMessageCode.kt | 5 +- .../bkrepo/generic/pojo/SeparateBlockInfo.kt | 17 + .../generic/biz-generic/build.gradle.kts | 3 + .../artifact/GenericLocalRepository.kt | 109 +++++-- .../generic/controller/GenericController.kt | 1 + .../controller/SeparateBlockController.kt | 123 +++++++ .../listener/BuildDeletedEventListener.kt | 44 +++ .../bkrepo/generic/service/UploadService.kt | 175 +++++++++- .../resources/i18n/messages_en.properties | 5 +- .../resources/i18n/messages_zh_CN.properties | 7 +- .../resources/i18n/messages_zh_TW.properties | 7 +- .../generic/SeparateTestConfiguration.kt | 8 + .../bkrepo/generic/SeparateTestConstants.kt | 8 + .../generic/service/BlockNodeServiceTest.kt | 261 +++++++++++++++ .../task/other/ExpiredBlockNodeMarkupJob.kt | 86 +++++ .../ExpiredBlockNodeMarkupJobProperties.kt | 8 + .../pojo/node/service/NodeCreateRequest.kt | 4 +- 38 files changed, 1731 insertions(+), 87 deletions(-) create mode 100644 docs/apidoc-user/generic/SeparateBlock.md create mode 100644 docs/apidoc/generic/SeparateBlock.md rename src/backend/{fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server => common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata}/model/NodeAttribute.kt (90%) create mode 100644 src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/pojo/SeparateBlockInfo.kt create mode 100644 src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/controller/SeparateBlockController.kt create mode 100644 src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/listener/BuildDeletedEventListener.kt create mode 100644 src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/SeparateTestConfiguration.kt create mode 100644 src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/SeparateTestConstants.kt create mode 100644 src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/service/BlockNodeServiceTest.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/other/ExpiredBlockNodeMarkupJob.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/ExpiredBlockNodeMarkupJobProperties.kt diff --git a/docs/apidoc-user/generic/SeparateBlock.md b/docs/apidoc-user/generic/SeparateBlock.md new file mode 100644 index 0000000000..428ed3ce58 --- /dev/null +++ b/docs/apidoc-user/generic/SeparateBlock.md @@ -0,0 +1,302 @@ +# 通用制品仓库分块文件操作指南 + +[TOC] + +## 一、初始化分块上传 + +- **接口地址**: `POST /generic/separate/{project}/{repo}/{path}` +- **接口名称**: `start_block_upload` +- **功能说明**: + - 中文:初始化分块上传 + - English: Start block upload + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------- | ---- |-------| ------------------------- | --------------------------------- | + | X-BKREPO-OVERWRITE | boolean | 否 | false | 是否覆盖已存在文件 | Overwrite existing file | + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": { + "uploadId": "8be31384f82a45b0aafb6c6add29e94f/xxxxxxxx", + "expireSeconds": 43200 + }, + "traceId": null + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | --------------- | ------- | ------------------------------- | ------------------------------------ | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | object | 返回数据 | Response data | + | ├── uploadId | string | 分块上传 ID | Block upload ID | + | ├── expireSeconds | long | 上传 ID 过期时间,单位:秒 | Upload ID expiration in seconds | + | traceId | string | 请求跟踪 ID | Trace ID | + +--- + +## 二、上传分块文件 + +- **接口地址**: `PUT /generic/{project}/{repo}/{path}` +- **接口名称**: `block_upload` +- **功能说明**: + - 中文:分块上传通用制品文件 + - English: Upload generic artifact file block + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------- | ---- |-----| --------------- | ------------------------------- | + | X-BKREPO-UPLOAD-ID | string | 是 | 无 | 分块上传 ID | Block upload ID | + | X-BKREPO-OFFSET | long | 是 | 无 | 分块偏移量,起始值为 0 | Block offset (starting from 0) | + | X-BKREPO-SHA256 | string | 否 | 无 | 分块文件的 SHA256 校验值 | SHA256 checksum of the block | + | X-BKREPO-MD5 | string | 否 | 无 | 分块文件的 MD5 校验值 | MD5 checksum of the block | + | UPLOAD-TYPE | string | 是 | 无 | 上传类型,值为 `SEPARATE-UPLOAD` | Upload type (`SEPARATE-UPLOAD`) | + +- **请求体** + + - 文件流(二进制数据) + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": null, + "traceId": null + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | ------- | ------ | ------------------------------- | ---------------------------- | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | null | 返回数据(为空) | Response data (null) | + | traceId | string | 请求跟踪 ID | Trace ID | + +--- + +## 三、完成分块上传 + +- **接口地址**: `PUT /generic/separate/{project}/{repo}/{path}` +- **接口名称**: `complete_block_upload` +- **功能说明**: + - 中文:完成分块上传 + - English: Complete block upload + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------- | ---- | ------ | ------------------------------- | ---------------------- | + | X-BKREPO-UPLOAD-ID | string | 是 | 无 | 分块上传 ID | Block upload ID | + | X-BKREPO-SIZE | long | 是 | 0 | 文件总大小 | Total size of the file | + | X-BKREPO-OVERWRITE | boolean | 否 | false | 是否覆盖已存在文件 | Overwrite existing file| + +- **请求体** + + - 此接口请求体为空。 + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": null, + "traceId": "" + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | ------- | ------ | ------------------------------- | ---------------------------- | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | null | 返回数据(为空) | Response data (null) | + | traceId | string | 请求跟踪 ID | Trace ID | + +--- + +## 四、终止(取消)分块上传 + +- **接口地址**: `DELETE /generic/separate/{project}/{repo}/{path}` +- **接口名称**: `abort_block_upload` +- **功能说明**: + - 中文:终止(取消)分块上传 + - English: Abort block upload + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------ | ---- | ------ | --------------------- | ---------------- | + | X-BKREPO-UPLOAD-ID | string | 是 | 无 | 分块上传 ID | Block upload ID | + +- **请求体** + + - 此接口请求体为空。 + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": null, + "traceId": null + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | ------- | ------ | ------------------------------- | ---------------------------- | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | null | 返回数据(为空) | Response data (null) | + | traceId | string | 请求跟踪 ID | Trace ID | + +--- + +## 五、查询已上传的分块列表 + +- **接口地址**: `GET /generic/separate/{project}/{repo}/{path}` +- **接口名称**: `list_uploaded_blocks` +- **功能说明**: + - 中文:查询已上传的分块列表 + - English: List uploaded blocks + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------ | ---- | ------ | --------------------- | ---------------- | + | X-BKREPO-UPLOAD-ID | string | 是 | 无 | 分块上传 ID | Block upload ID | + +- **请求体** + + - 此接口请求体为空。 + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": [ + { + "size": 10240, + "sha256": "abc123def456...", + "startPos": 0, + "uploadId": "1.0" + }, + { + "size": 10240, + "sha256": "def456ghi789...", + "startPos": 10240, + "uploadId": "1.0" + } + ], + "traceId": null + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | ------- | ------ | ------------------------------- | ---------------------------- | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | array | 分块信息列表 | List of block information | + | traceId | string | 请求跟踪 ID | Trace ID | + +- **分块信息字段说明** + + | 字段 | 类型 | 描述 | Description | + | -------- | ------ | ------------------ | ------------------------------ | + | size | long | 分块大小(字节) | Block size (in bytes) | + | sha256 | string | 分块的 SHA256 值 | SHA256 checksum of the block | + | startPos | long | 分块起始位置 | Block start position | + | uploadId | string | 分块上传 ID | Block upload ID | + +--- + +以上是优化后的 Markdown 格式的通用制品仓库分块文件操作指南。主要改进了以下方面: + +- **标题和章节编号**:增加了明确的章节编号,改善了文档结构,方便阅读和引用。 +- **表格格式**:修正了表格的对齐和格式,使其在 Markdown 渲染时显示正确。 +- **字段说明**:对响应参数中的嵌套字段使用了更清晰的表示方式,便于理解。 +- **一致性**:统一了字段描述、命名和表格格式,保持全篇文档风格一致。 +- **示例数据**:在示例的 JSON 响应中,提供了更贴近实际的示例数据,帮助用户更直观地理解接口返回内容。 + +希望以上优化能够帮助您更好地使用和理解该操作指南。如有任何疑问,欢迎随时提问! \ No newline at end of file diff --git a/docs/apidoc/generic/SeparateBlock.md b/docs/apidoc/generic/SeparateBlock.md new file mode 100644 index 0000000000..428ed3ce58 --- /dev/null +++ b/docs/apidoc/generic/SeparateBlock.md @@ -0,0 +1,302 @@ +# 通用制品仓库分块文件操作指南 + +[TOC] + +## 一、初始化分块上传 + +- **接口地址**: `POST /generic/separate/{project}/{repo}/{path}` +- **接口名称**: `start_block_upload` +- **功能说明**: + - 中文:初始化分块上传 + - English: Start block upload + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------- | ---- |-------| ------------------------- | --------------------------------- | + | X-BKREPO-OVERWRITE | boolean | 否 | false | 是否覆盖已存在文件 | Overwrite existing file | + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": { + "uploadId": "8be31384f82a45b0aafb6c6add29e94f/xxxxxxxx", + "expireSeconds": 43200 + }, + "traceId": null + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | --------------- | ------- | ------------------------------- | ------------------------------------ | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | object | 返回数据 | Response data | + | ├── uploadId | string | 分块上传 ID | Block upload ID | + | ├── expireSeconds | long | 上传 ID 过期时间,单位:秒 | Upload ID expiration in seconds | + | traceId | string | 请求跟踪 ID | Trace ID | + +--- + +## 二、上传分块文件 + +- **接口地址**: `PUT /generic/{project}/{repo}/{path}` +- **接口名称**: `block_upload` +- **功能说明**: + - 中文:分块上传通用制品文件 + - English: Upload generic artifact file block + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------- | ---- |-----| --------------- | ------------------------------- | + | X-BKREPO-UPLOAD-ID | string | 是 | 无 | 分块上传 ID | Block upload ID | + | X-BKREPO-OFFSET | long | 是 | 无 | 分块偏移量,起始值为 0 | Block offset (starting from 0) | + | X-BKREPO-SHA256 | string | 否 | 无 | 分块文件的 SHA256 校验值 | SHA256 checksum of the block | + | X-BKREPO-MD5 | string | 否 | 无 | 分块文件的 MD5 校验值 | MD5 checksum of the block | + | UPLOAD-TYPE | string | 是 | 无 | 上传类型,值为 `SEPARATE-UPLOAD` | Upload type (`SEPARATE-UPLOAD`) | + +- **请求体** + + - 文件流(二进制数据) + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": null, + "traceId": null + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | ------- | ------ | ------------------------------- | ---------------------------- | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | null | 返回数据(为空) | Response data (null) | + | traceId | string | 请求跟踪 ID | Trace ID | + +--- + +## 三、完成分块上传 + +- **接口地址**: `PUT /generic/separate/{project}/{repo}/{path}` +- **接口名称**: `complete_block_upload` +- **功能说明**: + - 中文:完成分块上传 + - English: Complete block upload + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------- | ---- | ------ | ------------------------------- | ---------------------- | + | X-BKREPO-UPLOAD-ID | string | 是 | 无 | 分块上传 ID | Block upload ID | + | X-BKREPO-SIZE | long | 是 | 0 | 文件总大小 | Total size of the file | + | X-BKREPO-OVERWRITE | boolean | 否 | false | 是否覆盖已存在文件 | Overwrite existing file| + +- **请求体** + + - 此接口请求体为空。 + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": null, + "traceId": "" + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | ------- | ------ | ------------------------------- | ---------------------------- | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | null | 返回数据(为空) | Response data (null) | + | traceId | string | 请求跟踪 ID | Trace ID | + +--- + +## 四、终止(取消)分块上传 + +- **接口地址**: `DELETE /generic/separate/{project}/{repo}/{path}` +- **接口名称**: `abort_block_upload` +- **功能说明**: + - 中文:终止(取消)分块上传 + - English: Abort block upload + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------ | ---- | ------ | --------------------- | ---------------- | + | X-BKREPO-UPLOAD-ID | string | 是 | 无 | 分块上传 ID | Block upload ID | + +- **请求体** + + - 此接口请求体为空。 + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": null, + "traceId": null + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | ------- | ------ | ------------------------------- | ---------------------------- | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | null | 返回数据(为空) | Response data (null) | + | traceId | string | 请求跟踪 ID | Trace ID | + +--- + +## 五、查询已上传的分块列表 + +- **接口地址**: `GET /generic/separate/{project}/{repo}/{path}` +- **接口名称**: `list_uploaded_blocks` +- **功能说明**: + - 中文:查询已上传的分块列表 + - English: List uploaded blocks + +### 请求参数 + +- **路径参数** + + | 字段 | 类型 | 必须 | 描述 | Description | + | ------- | ------ | ---- | ----------- | ------------ | + | project | string | 是 | 项目名称 | Project name | + | repo | string | 是 | 仓库名称 | Repo name | + | path | string | 是 | 完整路径 | Full path | + +- **请求头** + + | 字段 | 类型 | 必须 | 默认值 | 描述 | Description | + | ------------------ | ------ | ---- | ------ | --------------------- | ---------------- | + | X-BKREPO-UPLOAD-ID | string | 是 | 无 | 分块上传 ID | Block upload ID | + +- **请求体** + + - 此接口请求体为空。 + +### 响应参数 + +- **响应体** + + ```json + { + "code": 0, + "message": null, + "data": [ + { + "size": 10240, + "sha256": "abc123def456...", + "startPos": 0, + "uploadId": "1.0" + }, + { + "size": 10240, + "sha256": "def456ghi789...", + "startPos": 10240, + "uploadId": "1.0" + } + ], + "traceId": null + } + ``` + +- **字段说明** + + | 字段 | 类型 | 描述 | Description | + | ------- | ------ | ------------------------------- | ---------------------------- | + | code | int | 错误编码,0 表示成功 | 0: success, others: failure | + | message | string | 错误消息 | The failure message | + | data | array | 分块信息列表 | List of block information | + | traceId | string | 请求跟踪 ID | Trace ID | + +- **分块信息字段说明** + + | 字段 | 类型 | 描述 | Description | + | -------- | ------ | ------------------ | ------------------------------ | + | size | long | 分块大小(字节) | Block size (in bytes) | + | sha256 | string | 分块的 SHA256 值 | SHA256 checksum of the block | + | startPos | long | 分块起始位置 | Block start position | + | uploadId | string | 分块上传 ID | Block upload ID | + +--- + +以上是优化后的 Markdown 格式的通用制品仓库分块文件操作指南。主要改进了以下方面: + +- **标题和章节编号**:增加了明确的章节编号,改善了文档结构,方便阅读和引用。 +- **表格格式**:修正了表格的对齐和格式,使其在 Markdown 渲染时显示正确。 +- **字段说明**:对响应参数中的嵌套字段使用了更清晰的表示方式,便于理解。 +- **一致性**:统一了字段描述、命名和表格格式,保持全篇文档风格一致。 +- **示例数据**:在示例的 JSON 响应中,提供了更贴近实际的示例数据,帮助用户更直观地理解接口返回内容。 + +希望以上优化能够帮助您更好地使用和理解该操作指南。如有任何疑问,欢迎随时提问! \ No newline at end of file diff --git a/src/backend/common/common-metadata/metadata-api/src/main/kotlin/com/tencent/bkrepo/common/metadata/constant/Constant.kt b/src/backend/common/common-metadata/metadata-api/src/main/kotlin/com/tencent/bkrepo/common/metadata/constant/Constant.kt index 3f63c95f1d..4f9aa8a032 100644 --- a/src/backend/common/common-metadata/metadata-api/src/main/kotlin/com/tencent/bkrepo/common/metadata/constant/Constant.kt +++ b/src/backend/common/common-metadata/metadata-api/src/main/kotlin/com/tencent/bkrepo/common/metadata/constant/Constant.kt @@ -32,3 +32,4 @@ const val ID = "_id" const val FAKE_SHA256 = "0000000000000000000000000000000000000000000000000000000000000000" const val FAKE_MD5 = "00000000000000000000000000000000" +const val FAKE_SEPARATE = "00000000FIRSTUPLOAD" diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/model/NodeAttribute.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/model/NodeAttribute.kt similarity index 90% rename from src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/model/NodeAttribute.kt rename to src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/model/NodeAttribute.kt index ea6d2001d3..2277288def 100644 --- a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/model/NodeAttribute.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/model/NodeAttribute.kt @@ -1,4 +1,4 @@ -package com.tencent.bkrepo.fs.server.model +package com.tencent.bkrepo.common.metadata.model data class NodeAttribute( // 用户id diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/model/TBlockNode.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/model/TBlockNode.kt index 4ec14a7c41..902d37bcf4 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/model/TBlockNode.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/model/TBlockNode.kt @@ -55,7 +55,9 @@ data class TBlockNode( val repoName: String, val size: Long, val endPos: Long = startPos + size - 1, - var deleted: LocalDateTime? = null + var deleted: LocalDateTime? = null, + val uploadId: String? = null, + var expireDate: LocalDateTime? = null, ) { companion object { const val BLOCK_IDX = "start_pos_idx" diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/blocknode/BlockNodeService.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/blocknode/BlockNodeService.kt index 3557450932..13cf0bc50f 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/blocknode/BlockNodeService.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/blocknode/BlockNodeService.kt @@ -47,6 +47,16 @@ interface BlockNodeService { createdDate: String ): List + /** + * 查询出当前版本内的分块 + */ + fun listBlocksInUploadId( + projectId: String, + repoName: String, + fullPath: String, + uploadId: String + ): List + /** * 创建分块 * */ @@ -55,9 +65,20 @@ interface BlockNodeService { storageCredentials: StorageCredentials? ): TBlockNode + /** + * 更新分块 + * */ + fun updateBlockUploadId( + projectId: String, + repoName: String, + fullPath: String, + uploadId: String + ) + /** * 删除旧分块,即删除非指定的nodeCurrentSha256的分块。 * 如果未指定nodeCurrentSha256,则删除节点所有分块 + * 如果指定uploadId,则删除该uploadId对应的分块,未指定则删除uploadId为null的所有分块 * @param projectId 项目id * @param repoName 仓库名 * @param fullPath 文件路径 @@ -65,7 +86,8 @@ interface BlockNodeService { fun deleteBlocks( projectId: String, repoName: String, - fullPath: String + fullPath: String, + uploadId: String? = null ) /** diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/blocknode/impl/BlockNodeServiceImpl.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/blocknode/impl/BlockNodeServiceImpl.kt index eea4787b5f..3145f036b4 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/blocknode/impl/BlockNodeServiceImpl.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/blocknode/impl/BlockNodeServiceImpl.kt @@ -48,6 +48,7 @@ import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.Update import org.springframework.data.mongodb.core.query.and import org.springframework.data.mongodb.core.query.isEqualTo + import org.springframework.stereotype.Service import java.time.LocalDateTime @@ -68,6 +69,21 @@ class BlockNodeServiceImpl( } } + override fun updateBlockUploadId( + projectId: String, + repoName: String, + fullPath: String, + uploadId: String + ) { + val criteria = BlockNodeQueryHelper.fullPathCriteria(projectId, repoName, fullPath, false) + criteria.and(TBlockNode::uploadId).isEqualTo(uploadId) + val update = Update().set(TBlockNode::uploadId.name, null) + .set(TBlockNode::createdDate.name, LocalDateTime.now()) + .set(TBlockNode::expireDate.name, null) + blockNodeDao.updateMulti(Query(criteria), update) + logger.info("Update block node[$projectId/$repoName/$fullPath--/uploadId: $uploadId] success.") + } + override fun listBlocks( range: Range, projectId: String, @@ -79,15 +95,28 @@ class BlockNodeServiceImpl( return blockNodeDao.find(query) } + override fun listBlocksInUploadId( + projectId: String, + repoName: String, + fullPath: String, + uploadId: String + ): List { + val query = + BlockNodeQueryHelper.listQueryInUploadId(projectId, repoName, fullPath, uploadId) + return blockNodeDao.find(query) + } + override fun deleteBlocks( projectId: String, repoName: String, - fullPath: String + fullPath: String, + uploadId: String? ) { val criteria = BlockNodeQueryHelper.fullPathCriteria(projectId, repoName, fullPath, false) + .apply { and(TBlockNode::uploadId.name).isEqualTo(uploadId) } val update = BlockNodeQueryHelper.deleteUpdate() blockNodeDao.updateMulti(Query(criteria), update) - logger.info("Delete node blocks[$projectId/$repoName$fullPath] success.") + logger.info("Delete node blocks[$projectId/$repoName$fullPath] success. UPLOADID: $uploadId") } override fun moveBlocks(projectId: String, repoName: String, fullPath: String, dstFullPath: String) { diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/NodeDeleteOperation.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/NodeDeleteOperation.kt index a33f46b09d..c85c4bc0f4 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/NodeDeleteOperation.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/NodeDeleteOperation.kt @@ -84,4 +84,16 @@ interface NodeDeleteOperation { path: String, decreaseVolume: Boolean = true ): NodeDeleteResult + + /** + * 删除旧node + * 参数暂时保留,后续只保留nodeId,operator + */ + fun deleteNodeById( + projectId: String, + repoName: String, + fullPath: String, + operator: String, + nodeId: String + ): NodeDeleteResult } diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeBaseService.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeBaseService.kt index eee31154b0..a94c6daec4 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeBaseService.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeBaseService.kt @@ -43,6 +43,7 @@ import com.tencent.bkrepo.common.artifact.pojo.RepositoryType import com.tencent.bkrepo.common.artifact.properties.RouterControllerProperties import com.tencent.bkrepo.common.metadata.config.RepositoryProperties import com.tencent.bkrepo.common.metadata.constant.FAKE_MD5 +import com.tencent.bkrepo.common.metadata.constant.FAKE_SEPARATE import com.tencent.bkrepo.common.metadata.constant.FAKE_SHA256 import com.tencent.bkrepo.common.metadata.dao.node.NodeDao import com.tencent.bkrepo.common.metadata.dao.repo.RepositoryDao @@ -70,6 +71,7 @@ import com.tencent.bkrepo.common.service.util.HeaderUtils import com.tencent.bkrepo.common.service.util.SpringContextUtils.Companion.publishEvent import com.tencent.bkrepo.common.stream.constant.BinderType import com.tencent.bkrepo.common.stream.event.supplier.MessageSupplier +import com.tencent.bkrepo.fs.server.constant.UPLOADID_KEY import com.tencent.bkrepo.repository.constant.SYSTEM_USER import com.tencent.bkrepo.repository.pojo.metadata.MetadataModel import com.tencent.bkrepo.repository.pojo.node.NodeDetail @@ -195,7 +197,7 @@ abstract class NodeBaseService( mkdirs(projectId, repoName, PathUtils.resolveParent(fullPath), operator) // 创建节点 val node = buildTNode(this) - doCreate(node) + doCreate(node, separate = separate) afterCreate(repo, node) logger.info("Create node[/$projectId/$repoName$fullPath], sha256[$sha256] success.") return convertToDetail(node)!! @@ -355,7 +357,7 @@ abstract class NodeBaseService( } } - open fun doCreate(node: TNode, repository: TRepository? = null): TNode { + open fun doCreate(node: TNode, repository: TRepository? = null, separate: Boolean = false): TNode { try { nodeDao.insert(node) if (!node.folder) { @@ -366,6 +368,10 @@ abstract class NodeBaseService( quotaService.increaseUsedVolume(node.projectId, node.repoName, node.size) } } catch (exception: DuplicateKeyException) { + if (separate){ + logger.warn("Insert block base node[$node] error: [${exception.message}]") + throw ErrorCodeException(ArtifactMessageCode.NODE_CONFLICT, node.fullPath) + } logger.warn("Insert node[$node] error: [${exception.message}]") } @@ -412,23 +418,59 @@ abstract class NodeBaseService( open fun checkConflictAndQuota(createRequest: NodeCreateRequest, fullPath: String) { with(createRequest) { val existNode = nodeDao.findNode(projectId, repoName, fullPath) - if (existNode != null) { - if (!overwrite) { - throw ErrorCodeException(ArtifactMessageCode.NODE_EXISTED, fullPath) - } else if (existNode.folder || this.folder) { - throw ErrorCodeException(ArtifactMessageCode.NODE_CONFLICT, fullPath) - } else { - val changeSize = this.size?.minus(existNode.size) ?: -existNode.size - quotaService.checkRepoQuota(projectId, repoName, changeSize) - deleteByFullPathWithoutDecreaseVolume(projectId, repoName, fullPath, operator) - quotaService.decreaseUsedVolume(projectId, repoName, existNode.size) + + // 如果节点不存在,进行配额检查后直接返回 + if (existNode == null) { + quotaService.checkRepoQuota(projectId, repoName, this.size ?: 0) + return + } + + // 如果不允许覆盖,抛出节点已存在异常 + if (!overwrite) { + throw ErrorCodeException(ArtifactMessageCode.NODE_EXISTED, fullPath) + } + + // 如果存在文件夹冲突,抛出节点冲突异常 + if (existNode.folder || this.folder) { + throw ErrorCodeException(ArtifactMessageCode.NODE_CONFLICT, fullPath) + } + + // 子类的附加检查方法 + additionalCheck(existNode) + + // 计算变更大小,并检查仓库配额 + val changeSize = this.size?.minus(existNode.size) ?: -existNode.size + quotaService.checkRepoQuota(projectId, repoName, changeSize) + + if (separate) { + // 删除旧节点,并检查旧节点是否被删除,防止并发删除 + val currentVersion = metadata!![UPLOADID_KEY].toString() + val oldNodeId = currentVersion.substringAfter("/") + + if (oldNodeId == FAKE_SEPARATE) { + return + } + + val deleteRes = deleteNodeById(projectId, repoName, fullPath, operator, oldNodeId) + if (deleteRes.deletedNumber == 0L) { + logger.warn("Delete block base node[$fullPath] by [$operator] error: node was deleted") + throw ErrorCodeException(ArtifactMessageCode.NODE_NOT_FOUND, fullPath) } + logger.info("Delete block base node[$fullPath] by [$operator] success: $oldNodeId.") + } else { - quotaService.checkRepoQuota(projectId, repoName, this.size ?: 0) + deleteByFullPathWithoutDecreaseVolume(projectId, repoName, fullPath, operator) } + + // 更新配额使用量 + quotaService.decreaseUsedVolume(projectId, repoName, existNode.size) } } + open fun additionalCheck(existNode: TNode) { + // 默认不做任何操作 + } + private fun incrementFileReference(node: TNode, repository: TRepository?): Boolean { if (!validateParameter(node)) return false return try { diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeDeleteSupport.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeDeleteSupport.kt index 74bae3b2b8..0f3ec9f385 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeDeleteSupport.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeDeleteSupport.kt @@ -45,6 +45,7 @@ import com.tencent.bkrepo.common.metadata.util.NodeDeleteHelper.buildCriteria import com.tencent.bkrepo.common.metadata.util.NodeEventFactory.buildDeletedEvent import com.tencent.bkrepo.common.metadata.util.NodeEventFactory.buildNodeCleanEvent import com.tencent.bkrepo.common.metadata.util.NodeQueryHelper +import com.tencent.bkrepo.common.mongo.constant.ID import com.tencent.bkrepo.router.api.RouterControllerClient import org.slf4j.LoggerFactory import org.springframework.dao.DuplicateKeyException @@ -185,6 +186,35 @@ open class NodeDeleteSupport( return nodeDeleteResult } + override fun deleteNodeById( + projectId: String, + repoName: String, + fullPath: String, + operator: String, + nodeId: String + ): NodeDeleteResult { + require(!PathUtils.isRoot(fullPath)) { "Cannot delete root node." } + val deleteTime = LocalDateTime.now() + + val criteria = buildCriteria(projectId, repoName, fullPath).apply { + and(ID).isEqualTo(nodeId) + } + val query = Query(criteria) + + // 删除旧节点 + val updateDefinition = NodeQueryHelper.nodeDeleteUpdate(operator, deleteTime) + val updateResult = nodeDao.updateFirst(query, updateDefinition) + val deletedNum = updateResult.modifiedCount + + if (deletedNum == 0L) return NodeDeleteResult(0L, 0L, deleteTime) + + logger.info( + "Delete old block base node: $fullPath, operator: $operator, delete num : $deletedNum, " + + "delete time: $deleteTime success" + ) + return NodeDeleteResult(deletedNum, 0, deleteTime) + } + private fun delete( query: Query, operator: String, @@ -220,11 +250,11 @@ open class NodeDeleteSupport( deletedSize = nodeBaseService.aggregateComputeSize(deletedCriteria) quotaService.decreaseUsedVolume(projectId, repoName, deletedSize) } - fullPaths?.forEach { + fullPaths?.forEach { fullPath -> if (routerControllerProperties.enabled) { - routerControllerClient.removeNodes(projectId, repoName, it) + routerControllerClient.removeNodes(projectId, repoName, fullPath) } - publishEvent(buildDeletedEvent(projectId, repoName, it, operator)) + publishEvent(buildDeletedEvent(projectId, repoName, fullPath, operator)) } } catch (exception: DuplicateKeyException) { logger.warn("Delete node[$resourceKey] by [$operator] error: [${exception.message}]") diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeServiceImpl.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeServiceImpl.kt index 082c2f3f7d..5e7dd450a0 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeServiceImpl.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/NodeServiceImpl.kt @@ -166,6 +166,16 @@ class NodeServiceImpl( return NodeDeleteSupport(this).deleteBeforeDate(projectId, repoName, date, operator, path, decreaseVolume) } + override fun deleteNodeById( + projectId: String, + repoName: String, + fullPath: String, + operator: String, + nodeId: String + ): NodeDeleteResult { + return NodeDeleteSupport(this).deleteNodeById(projectId, repoName, fullPath, operator, nodeId) + } + @Transactional(rollbackFor = [Throwable::class]) override fun moveNode(moveRequest: NodeMoveCopyRequest): NodeDetail { return NodeMoveCopySupport(this).moveNode(moveRequest) diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/center/CenterNodeDeleteSupport.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/center/CenterNodeDeleteSupport.kt index 46d83081a5..e257cc4ac6 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/center/CenterNodeDeleteSupport.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/center/CenterNodeDeleteSupport.kt @@ -40,6 +40,7 @@ import com.tencent.bkrepo.repository.pojo.node.NodeDeleteResult import com.tencent.bkrepo.repository.pojo.node.NodeListOption import com.tencent.bkrepo.common.metadata.service.node.impl.NodeBaseService import com.tencent.bkrepo.common.metadata.service.node.impl.NodeDeleteSupport +import com.tencent.bkrepo.common.metadata.util.NodeDeleteHelper.buildCriteria import com.tencent.bkrepo.common.metadata.util.NodeQueryHelper import org.bson.types.ObjectId import org.jboss.logging.Logger @@ -146,6 +147,35 @@ class CenterNodeDeleteSupport( return NodeDeleteResult(deletedNum, deletedSize, LocalDateTime.now()) } + override fun deleteNodeById( + projectId: String, + repoName: String, + fullPath: String, + operator: String, + nodeId: String + ): NodeDeleteResult { + val clusterName = SecurityUtils.getClusterName() + if (clusterName.isNullOrEmpty()) { + return super.deleteNodeById(projectId, repoName, fullPath, operator, nodeId) + } + + val criteria = buildCriteria(projectId, repoName, fullPath).apply { + and(ID).isEqualTo(nodeId) + } + val node = nodeDao.findOne(Query(criteria)) + ?: return NodeDeleteResult(0,0, LocalDateTime.now()) + + if (node.folder) { + return delete(node, operator) + } + + return if (deleteFileNode(node, operator, nodeId)) { + NodeDeleteResult(1, node.size, LocalDateTime.now()) + } else { + NodeDeleteResult(0, 0, LocalDateTime.now()) + } + } + private fun delete(folder: TNode, operator: String): NodeDeleteResult { var deletedNumber = 0L var deletedSize = 0L @@ -180,21 +210,30 @@ class CenterNodeDeleteSupport( private fun deleteFileNode( node: TNode, - operator: String + operator: String, + nodeId: String? = null ): Boolean { if (!ClusterUtils.containsSrcCluster(node.clusterNames)) { return false } val srcCluster = SecurityUtils.getClusterName() ?: clusterProperties.self.name.toString() node.clusterNames = node.clusterNames.orEmpty().minus(srcCluster) - if (node.clusterNames.orEmpty().isEmpty()) { - super.deleteByFullPathWithoutDecreaseVolume(node.projectId, node.repoName, node.fullPath, operator) - quotaService.decreaseUsedVolume(node.projectId, node.repoName, node.size) - } else { + + if (node.clusterNames.orEmpty().isNotEmpty()) { + // 更新数据库中节点的 clusterNames val query = NodeQueryHelper.nodeQuery(node.projectId, node.repoName, node.fullPath) val update = Update().pull(TNode::clusterNames.name, srcCluster) nodeDao.updateFirst(query, update) + return true + } + + // 当 clusterNames 为空时,删除节点 + if (nodeId != null) { + super.deleteNodeById(node.projectId, node.repoName, node.fullPath, operator, nodeId) + } else { + super.deleteByFullPathWithoutDecreaseVolume(node.projectId, node.repoName, node.fullPath, operator) } + quotaService.decreaseUsedVolume(node.projectId, node.repoName, node.size) return true } diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/center/CenterNodeServiceImpl.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/center/CenterNodeServiceImpl.kt index d1b7beeba1..5db2fbbc14 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/center/CenterNodeServiceImpl.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/center/CenterNodeServiceImpl.kt @@ -36,6 +36,7 @@ import com.tencent.bkrepo.common.artifact.path.PathUtils import com.tencent.bkrepo.common.artifact.properties.RouterControllerProperties import com.tencent.bkrepo.common.metadata.condition.SyncCondition import com.tencent.bkrepo.common.metadata.config.RepositoryProperties +import com.tencent.bkrepo.common.metadata.constant.FAKE_SHA256 import com.tencent.bkrepo.common.metadata.dao.node.NodeDao import com.tencent.bkrepo.common.metadata.dao.repo.RepositoryDao import com.tencent.bkrepo.common.metadata.model.TMetadata @@ -116,27 +117,6 @@ class CenterNodeServiceImpl( return repo } - override fun checkConflictAndQuota(createRequest: NodeCreateRequest, fullPath: String) { - with(createRequest) { - val existNode = nodeDao.findNode(projectId, repoName, fullPath) - if (existNode != null) { - if (!overwrite) { - throw ErrorCodeException(ArtifactMessageCode.NODE_EXISTED, fullPath) - } else if (existNode.folder || this.folder) { - throw ErrorCodeException(ArtifactMessageCode.NODE_CONFLICT, fullPath) - } else { - ClusterUtils.checkIsSrcCluster(existNode.clusterNames) - val changeSize = this.size?.minus(existNode.size) ?: -existNode.size - quotaService.checkRepoQuota(projectId, repoName, changeSize) - deleteByFullPathWithoutDecreaseVolume(projectId, repoName, fullPath, operator) - quotaService.decreaseUsedVolume(projectId, repoName, existNode.size) - } - } else { - quotaService.checkRepoQuota(projectId, repoName, this.size ?: 0) - } - } - } - override fun deleteByFullPathWithoutDecreaseVolume( projectId: String, repoName: String, fullPath: String, operator: String ) { @@ -156,9 +136,9 @@ class CenterNodeServiceImpl( return node } - override fun doCreate(node: TNode, repository: TRepository?): TNode { + override fun doCreate(node: TNode, repository: TRepository?, separate: Boolean): TNode { if (SecurityUtils.getClusterName().isNullOrBlank()) { - return super.doCreate(node, repository) + return super.doCreate(node, repository, separate) } try { nodeDao.insert(node) @@ -166,6 +146,10 @@ class CenterNodeServiceImpl( quotaService.increaseUsedVolume(node.projectId, node.repoName, node.size) } } catch (exception: DuplicateKeyException) { + if (separate){ + logger.warn("Insert block base node[$node] error: [${exception.message}]") + throw ErrorCodeException(ArtifactMessageCode.NODE_CONFLICT, node.fullPath) + } logger.warn("Insert node[$node] error: [${exception.message}]") } @@ -178,7 +162,7 @@ class CenterNodeServiceImpl( val normalizeFullPath = PathUtils.normalizeFullPath(fullPath) val existNode = nodeDao.findNode(projectId, repoName, normalizeFullPath) ?: return super.createNode(createRequest) - if (sha256 == existNode.sha256) { + if (sha256 == existNode.sha256 && sha256 != FAKE_SHA256) { val clusterNames = existNode.clusterNames.orEmpty().toMutableSet() clusterNames.add(srcCluster) val query = NodeQueryHelper.nodeQuery(projectId, repoName, normalizeFullPath) @@ -253,6 +237,22 @@ class CenterNodeServiceImpl( ) } + override fun deleteNodeById( + projectId: String, + repoName: String, + fullPath: String, + operator: String, + nodeId: String + ): NodeDeleteResult { + return CenterNodeDeleteSupport(this, clusterProperties).deleteNodeById( + projectId, + repoName, + fullPath, + operator, + nodeId + ) + } + override fun restoreNode(restoreContext: RestoreContext): NodeRestoreResult { return CenterNodeRestoreSupport(this).restoreNode(restoreContext) } @@ -269,6 +269,11 @@ class CenterNodeServiceImpl( return CenterNodeRenameSupport(this, clusterProperties).renameNode(renameRequest) } + override fun additionalCheck(existNode: TNode) { + // center 检查请求来源cluster是否是资源的唯一拥有者 + ClusterUtils.checkIsSrcCluster(existNode.clusterNames) + } + companion object { private val logger = LoggerFactory.getLogger(CenterNodeServiceImpl::class.java) diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/edge/EdgeNodeServiceImpl.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/edge/EdgeNodeServiceImpl.kt index 83fa9ea862..0b0669ecac 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/edge/EdgeNodeServiceImpl.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/service/node/impl/edge/EdgeNodeServiceImpl.kt @@ -201,6 +201,16 @@ class EdgeNodeServiceImpl( return NodeDeleteSupport(this).deleteBeforeDate(projectId, repoName, date, operator, path, decreaseVolume) } + override fun deleteNodeById( + projectId: String, + repoName: String, + fullPath: String, + operator: String, + nodeId: String + ): NodeDeleteResult { + return NodeDeleteSupport(this).deleteNodeById(projectId, repoName, fullPath, operator, nodeId) + } + @Transactional(rollbackFor = [Throwable::class]) override fun moveNode(moveRequest: NodeMoveCopyRequest): NodeDetail { ignoreException( diff --git a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/util/BlockNodeQueryHelper.kt b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/util/BlockNodeQueryHelper.kt index 396e6b14e5..e90169af3d 100644 --- a/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/util/BlockNodeQueryHelper.kt +++ b/src/backend/common/common-metadata/metadata-service/src/main/kotlin/com/tencent/bkrepo/common/metadata/util/BlockNodeQueryHelper.kt @@ -59,10 +59,26 @@ object BlockNodeQueryHelper { TBlockNode::startPos.gt(range.end), TBlockNode::endPos.lt(range.start) ) + .and(TBlockNode::uploadId).isEqualTo(null) val query = Query(criteria).with(Sort.by(TBlockNode::createdDate.name)) return query } + fun listQueryInUploadId( + projectId: String, + repoName: String, + fullPath: String, + uploadId: String, + ):Query { + val criteria = where(TBlockNode::nodeFullPath).isEqualTo(fullPath) + .and(TBlockNode::projectId).isEqualTo(projectId) + .and(TBlockNode::repoName).isEqualTo(repoName) + .and(TBlockNode::deleted).isEqualTo(null) + .and(TBlockNode::uploadId).isEqualTo(uploadId) + .and(TBlockNode::expireDate).gt(LocalDateTime.now()) + return Query(criteria) + } + fun fullPathCriteria(projectId: String, repoName: String, fullPath: String, deep: Boolean): Criteria { val criteria = if (deep) { where(TBlockNode::nodeFullPath).regex("^${EscapeUtils.escapeRegex(fullPath)}/") diff --git a/src/backend/common/common-storage/storage-api/src/main/kotlin/com/tencent/bkrepo/common/storage/config/ReceiveProperties.kt b/src/backend/common/common-storage/storage-api/src/main/kotlin/com/tencent/bkrepo/common/storage/config/ReceiveProperties.kt index 67248ff1c3..cfc3d96aa9 100644 --- a/src/backend/common/common-storage/storage-api/src/main/kotlin/com/tencent/bkrepo/common/storage/config/ReceiveProperties.kt +++ b/src/backend/common/common-storage/storage-api/src/main/kotlin/com/tencent/bkrepo/common/storage/config/ReceiveProperties.kt @@ -32,6 +32,7 @@ package com.tencent.bkrepo.common.storage.config import org.springframework.util.unit.DataSize +import java.time.Duration /** * 文件接收配置 @@ -73,8 +74,14 @@ data class ReceiveProperties( * 每秒接收数据量 */ var rateLimit: DataSize = DataSize.ofBytes(-1), + /** * 限速熔断阈值,当仓库配置的rateLimit小于等于限速熔断阈值时则直接将请求断开 */ var circuitBreakerThreshold: DataSize = DataSize.ofKilobytes(1), + + /** + * 接受分块过期时间, 默认12小时 + */ + var blockExpireTime: Duration = Duration.ofHours(12), ) diff --git a/src/backend/fs/api-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/constant/Constants.kt b/src/backend/fs/api-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/constant/Constants.kt index d2f9c3b457..f58c73f399 100644 --- a/src/backend/fs/api-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/constant/Constants.kt +++ b/src/backend/fs/api-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/constant/Constants.kt @@ -33,3 +33,5 @@ const val JWT_CLAIMS_REPOSITORY = "repository" const val JWT_CLAIMS_PERMIT = "permit" const val FS_ATTR_KEY = "fs:attr" +const val UPLOADID_KEY = "UPLOADID" + diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/handler/NodeOperationsHandler.kt b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/handler/NodeOperationsHandler.kt index c278ad681c..04412756a4 100644 --- a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/handler/NodeOperationsHandler.kt +++ b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/handler/NodeOperationsHandler.kt @@ -32,15 +32,15 @@ import com.tencent.bkrepo.common.api.message.CommonMessageCode import com.tencent.bkrepo.common.artifact.api.ArtifactInfo import com.tencent.bkrepo.common.metadata.constant.FAKE_MD5 import com.tencent.bkrepo.common.metadata.constant.FAKE_SHA256 +import com.tencent.bkrepo.common.metadata.model.NodeAttribute +import com.tencent.bkrepo.common.metadata.model.NodeAttribute.Companion.DEFAULT_MODE +import com.tencent.bkrepo.common.metadata.model.NodeAttribute.Companion.NOBODY import com.tencent.bkrepo.common.metadata.service.fs.FsService import com.tencent.bkrepo.common.metadata.service.metadata.RMetadataService import com.tencent.bkrepo.common.metadata.service.repo.RRepositoryService import com.tencent.bkrepo.common.storage.core.overlay.OverlayRangeUtils import com.tencent.bkrepo.fs.server.constant.FS_ATTR_KEY import com.tencent.bkrepo.fs.server.context.ReactiveArtifactContextHolder -import com.tencent.bkrepo.fs.server.model.NodeAttribute -import com.tencent.bkrepo.fs.server.model.NodeAttribute.Companion.DEFAULT_MODE -import com.tencent.bkrepo.fs.server.model.NodeAttribute.Companion.NOBODY import com.tencent.bkrepo.fs.server.request.ChangeAttributeRequest import com.tencent.bkrepo.fs.server.request.LinkRequest import com.tencent.bkrepo.fs.server.request.MoveRequest diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/service/FileOperationService.kt b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/service/FileOperationService.kt index b590d7fd3f..9e715692bf 100644 --- a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/service/FileOperationService.kt +++ b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/service/FileOperationService.kt @@ -31,13 +31,13 @@ import com.tencent.bkrepo.common.artifact.stream.ArtifactInputStream import com.tencent.bkrepo.common.artifact.stream.Range import com.tencent.bkrepo.common.metadata.constant.FAKE_MD5 import com.tencent.bkrepo.common.metadata.constant.FAKE_SHA256 +import com.tencent.bkrepo.common.metadata.model.NodeAttribute import com.tencent.bkrepo.common.metadata.model.TBlockNode import com.tencent.bkrepo.common.metadata.service.fs.FsService import com.tencent.bkrepo.common.metadata.service.metadata.RMetadataService import com.tencent.bkrepo.fs.server.config.properties.StreamProperties import com.tencent.bkrepo.fs.server.constant.FS_ATTR_KEY import com.tencent.bkrepo.fs.server.context.ReactiveArtifactContextHolder -import com.tencent.bkrepo.fs.server.model.NodeAttribute import com.tencent.bkrepo.fs.server.request.BlockRequest import com.tencent.bkrepo.fs.server.request.FlushRequest import com.tencent.bkrepo.fs.server.request.StreamRequest diff --git a/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/artifact/GenericArtifactInfo.kt b/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/artifact/GenericArtifactInfo.kt index 570c6482f1..57c8237f37 100644 --- a/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/artifact/GenericArtifactInfo.kt +++ b/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/artifact/GenericArtifactInfo.kt @@ -43,6 +43,7 @@ class GenericArtifactInfo( const val BLOCK_MAPPING_URI = "/block/{projectId}/{repoName}/**" const val DELTA_MAPPING_URI = "/delta/{projectId}/{repoName}/**" const val BATCH_MAPPING_URI = "/batch/{projectId}/{repoName}" + const val SEPARATE_MAPPING_URI = "/separate/{projectId}/{repoName}/**" const val CHUNKED_UPLOAD_MAPPING_URI = "/{projectId}/{repoName}/**/chunked/uploads/**" } } diff --git a/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/constant/Constants.kt b/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/constant/Constants.kt index a57b395d8b..2d56f7aa29 100644 --- a/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/constant/Constants.kt +++ b/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/constant/Constants.kt @@ -40,6 +40,8 @@ const val HEADER_EXPIRES = BKREPO_PREFIX + "EXPIRES" const val HEADER_SIZE = BKREPO_PREFIX + "SIZE" const val HEADER_UPLOAD_ID = BKREPO_PREFIX + "UPLOAD-ID" const val HEADER_SEQUENCE = BKREPO_PREFIX + "SEQUENCE" +const val HEADER_OFFSET = BKREPO_PREFIX + "OFFSET" +const val HEADER_FILE_SIZE = BKREPO_PREFIX + "SIZE" const val HEADER_OLD_FILE_PATH = BKREPO_PREFIX + "OLD-FILE-PATH" const val BKREPO_META_PREFIX = "X-BKREPO-META-" @@ -60,3 +62,8 @@ const val CHUNKED_UPLOAD_CLIENT = "CHUNKED-UPLOAD-CLIENT" // block上传时直接写入文件指定位置 const val HEADER_BLOCK_APPEND = BKREPO_PREFIX + "BLOCK-APPEND" + +/** + * 分块上传版本后缀 + */ +const val SEPARATE_UPLOAD = "SEPARATE-UPLOAD" \ No newline at end of file diff --git a/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/constant/GenericMessageCode.kt b/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/constant/GenericMessageCode.kt index 931071fed2..073b7735fc 100644 --- a/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/constant/GenericMessageCode.kt +++ b/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/constant/GenericMessageCode.kt @@ -40,7 +40,7 @@ enum class GenericMessageCode(private val businessCode: Int, private val key: St UPLOAD_ID_NOT_FOUND(1, "generic.uploadId.notfound"), LIST_DIR_NOT_ALLOWED(2, "generic.dir.not-allowed"), SIGN_FILE_NOT_FOUND(3, "generic.delta.sign-file.notfound"), - NODE_DATA_HAS_CHANGED(4, "generic.node.data.has.changed"), + NODE_DATA_ERROR(4, "generic.node.data.error"), DOWNLOAD_DIR_NOT_ALLOWED(5, "generic.download.dir.not-allowed"), ARTIFACT_SEARCH_FAILED(6, "generic.artifact.query.failed"), PIPELINE_ARTIFACT_OVERWRITE_NOT_ALLOWED(7, "generic.pipeline-artifact.overwrite.not-allowed"), @@ -49,6 +49,9 @@ enum class GenericMessageCode(private val businessCode: Int, private val key: St PIPELINE_REPO_MANUAL_UPLOAD_NOT_ALLOWED(10, "generic.pipeline-repo.manual-upload.not-allowed"), PIPELINE_ARTIFACT_PATH_ILLEGAL(11, "generic.pipeline.artifact.path.illegal"), CHUNKED_ARTIFACT_BROKEN(12, "generic.chunked.artifact.broken"), + BLOCK_FILE_NODE_CREATE_FAIL(13, "generic.block.file.node.create.fail"), + BLOCK_HEAD_NOT_FOUND(14, "generic.block.node.head.not-found"), + BLOCK_UPDATE_LIST_IS_NULL(15, "generic.block.update.list.is.null") ; override fun getBusinessCode() = businessCode diff --git a/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/pojo/SeparateBlockInfo.kt b/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/pojo/SeparateBlockInfo.kt new file mode 100644 index 0000000000..020a75aaf9 --- /dev/null +++ b/src/backend/generic/api-generic/src/main/kotlin/com/tencent/bkrepo/generic/pojo/SeparateBlockInfo.kt @@ -0,0 +1,17 @@ +package com.tencent.bkrepo.generic.pojo + +import io.swagger.annotations.ApiModel +import io.swagger.annotations.ApiModelProperty + + +@ApiModel("新分块信息") +data class SeparateBlockInfo( + @ApiModelProperty("分块大小") + val size: Long, + @ApiModelProperty("分块sha256") + val sha256: String, + @ApiModelProperty("分块起始位置") + val startPos: Long, + @ApiModelProperty("分块uploadID") + val uploadId: String? +) \ No newline at end of file diff --git a/src/backend/generic/biz-generic/build.gradle.kts b/src/backend/generic/biz-generic/build.gradle.kts index 88361d8c4c..2be14408e3 100644 --- a/src/backend/generic/biz-generic/build.gradle.kts +++ b/src/backend/generic/biz-generic/build.gradle.kts @@ -36,4 +36,7 @@ dependencies { api(project(":common:common-generic")) api(project(":common:common-artifact:artifact-service")) implementation(project(":common:common-artifact:artifact-cache")) + testImplementation("org.mockito.kotlin:mockito-kotlin") + testImplementation("de.flapdoodle.embed:de.flapdoodle.embed.mongo") + testImplementation("io.mockk:mockk") } diff --git a/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/artifact/GenericLocalRepository.kt b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/artifact/GenericLocalRepository.kt index 5c49f8be96..276c6650fd 100644 --- a/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/artifact/GenericLocalRepository.kt +++ b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/artifact/GenericLocalRepository.kt @@ -66,6 +66,8 @@ import com.tencent.bkrepo.common.artifact.stream.EmptyInputStream import com.tencent.bkrepo.common.artifact.stream.Range import com.tencent.bkrepo.common.artifact.util.chunked.ChunkedUploadUtils import com.tencent.bkrepo.common.artifact.util.http.HttpRangeUtils +import com.tencent.bkrepo.common.metadata.model.TBlockNode +import com.tencent.bkrepo.common.metadata.service.blocknode.BlockNodeService import com.tencent.bkrepo.common.metadata.service.node.PipelineNodeService import com.tencent.bkrepo.common.query.model.Rule import com.tencent.bkrepo.common.security.manager.ci.CIPermissionManager @@ -84,6 +86,7 @@ import com.tencent.bkrepo.common.security.util.SecurityUtils import com.tencent.bkrepo.common.service.util.HeaderUtils import com.tencent.bkrepo.common.service.util.HttpContextHolder import com.tencent.bkrepo.common.service.util.ResponseBuilder +import com.tencent.bkrepo.common.storage.config.StorageProperties import com.tencent.bkrepo.common.storage.message.StorageErrorException import com.tencent.bkrepo.common.storage.pojo.FileInfo import com.tencent.bkrepo.generic.artifact.context.GenericArtifactSearchContext @@ -91,16 +94,19 @@ import com.tencent.bkrepo.generic.constant.BKREPO_META import com.tencent.bkrepo.generic.constant.BKREPO_META_PREFIX import com.tencent.bkrepo.generic.constant.CHUNKED_UPLOAD import com.tencent.bkrepo.generic.constant.GenericMessageCode -import com.tencent.bkrepo.generic.constant.HEADER_BLOCK_APPEND -import com.tencent.bkrepo.generic.constant.HEADER_EXPIRES import com.tencent.bkrepo.generic.constant.HEADER_MD5 -import com.tencent.bkrepo.generic.constant.HEADER_OVERWRITE import com.tencent.bkrepo.generic.constant.HEADER_SEQUENCE import com.tencent.bkrepo.generic.constant.HEADER_SHA256 -import com.tencent.bkrepo.generic.constant.HEADER_SIZE import com.tencent.bkrepo.generic.constant.HEADER_UPLOAD_ID import com.tencent.bkrepo.generic.constant.HEADER_UPLOAD_TYPE +import com.tencent.bkrepo.generic.constant.HEADER_OFFSET +import com.tencent.bkrepo.generic.constant.HEADER_SIZE +import com.tencent.bkrepo.generic.constant.HEADER_OVERWRITE +import com.tencent.bkrepo.generic.constant.HEADER_BLOCK_APPEND +import com.tencent.bkrepo.generic.constant.HEADER_EXPIRES +import com.tencent.bkrepo.generic.constant.SEPARATE_UPLOAD import com.tencent.bkrepo.generic.pojo.ChunkedResponseProperty +import com.tencent.bkrepo.generic.pojo.SeparateBlockInfo import com.tencent.bkrepo.generic.util.ChunkedRequestUtil.uploadResponse import com.tencent.bkrepo.replication.api.ClusterNodeClient import com.tencent.bkrepo.replication.api.ReplicaTaskClient @@ -125,6 +131,8 @@ import org.springframework.http.HttpMethod import org.springframework.stereotype.Component import org.springframework.util.unit.DataSize import java.net.URLDecoder +import java.time.Duration +import java.time.LocalDateTime import java.util.Base64 import java.util.Locale import java.util.UUID @@ -137,7 +145,9 @@ class GenericLocalRepository( private val replicaTaskClient: ReplicaTaskClient, private val clusterNodeClient: ClusterNodeClient, private val pipelineNodeService: PipelineNodeService, - private val ciPermissionManager: CIPermissionManager + private val ciPermissionManager: CIPermissionManager, + private val blockNodeService: BlockNodeService, + private val storageProperties: StorageProperties, ) : LocalRepository() { private val edgeClusterNodeCache = CacheBuilder.newBuilder() @@ -173,25 +183,78 @@ class GenericLocalRepository( val uploadId = context.request.getHeader(HEADER_UPLOAD_ID) val sequence = context.request.getHeader(HEADER_SEQUENCE)?.toInt() val uploadType = HeaderUtils.getHeader(HEADER_UPLOAD_TYPE) - if (isBlockUpload(uploadId, sequence)) { - this.blockUpload(uploadId, sequence!!, context) - context.response.contentType = MediaTypes.APPLICATION_JSON - context.response.writer.println(ResponseBuilder.success().toJsonString()) - } else if (isChunkedUpload(uploadType)) { - chunkedUpload(context) - } else { - val nodeDetail = storageManager.storeArtifactFile( - buildNodeCreateRequest(context), - context.getArtifactFile(), - context.storageCredentials + + when { + isSeparateUpload(uploadType) -> { + onSeparateUpload(context, uploadId) + } + isBlockUpload(uploadId, sequence) -> { + this.blockUpload(uploadId, sequence!!, context) + context.response.contentType = MediaTypes.APPLICATION_JSON + context.response.writer.println(ResponseBuilder.success().toJsonString()) + } + isChunkedUpload(uploadType) -> { + chunkedUpload(context) + } + else -> { + val nodeDetail = storageManager.storeArtifactFile( + buildNodeCreateRequest(context), + context.getArtifactFile(), + context.storageCredentials + ) + context.response.contentType = MediaTypes.APPLICATION_JSON + context.response.addHeader(X_CHECKSUM_MD5, context.getArtifactMd5()) + context.response.addHeader(X_CHECKSUM_SHA256, context.getArtifactSha256()) + context.response.writer.println(ResponseBuilder.success(nodeDetail).toJsonString()) + } + } + } + + private fun onSeparateUpload(context: ArtifactUploadContext, uploadId: String) { + with(context) { + + val blockArtifactFile = getArtifactFile() + val sha256 = getArtifactSha256() + + val offset = context.request.getHeader(HEADER_OFFSET)?.toLongOrNull() + val expires = storageProperties.receive.blockExpireTime + + val blockNode = TBlockNode( + createdBy = userId, + createdDate = LocalDateTime.now(), + nodeFullPath = artifactInfo.getArtifactFullPath(), + startPos = offset ?: throw ErrorCodeException(GenericMessageCode.BLOCK_HEAD_NOT_FOUND), + sha256 = sha256, + projectId = projectId, + repoName = repoName, + size = blockArtifactFile.getSize(), + uploadId = uploadId, + expireDate = calculateExpiryDateTime(expires) ) + + storageService.store(sha256, blockArtifactFile, storageCredentials) + + val blockNodeInfo = blockNodeService.createBlock(blockNode, storageCredentials) + + // Set response content type and write success response context.response.contentType = MediaTypes.APPLICATION_JSON - context.response.addHeader(X_CHECKSUM_MD5, context.getArtifactMd5()) - context.response.addHeader(X_CHECKSUM_SHA256, context.getArtifactSha256()) - context.response.writer.println(ResponseBuilder.success(nodeDetail).toJsonString()) + context.response.writer.println( + ResponseBuilder.success( + SeparateBlockInfo( + blockNodeInfo.size, + blockNodeInfo.sha256, + blockNodeInfo.startPos, + blockNodeInfo.uploadId + ) + ).toJsonString() + ) } } + private fun isSeparateUpload(uploadType: String?): Boolean { + return !uploadType.isNullOrEmpty() && uploadType == SEPARATE_UPLOAD + } + override fun onUploadSuccess(context: ArtifactUploadContext) { super.onUploadSuccess(context) if (HttpContextHolder.getRequestOrNull()?.getParameter(PARAM_REPLICATE).toBoolean()) { @@ -253,7 +316,8 @@ class GenericLocalRepository( val uploadId = HeaderUtils.getHeader(HEADER_UPLOAD_ID) val sequence = HeaderUtils.getHeader(HEADER_SEQUENCE)?.toInt() val uploadType = HeaderUtils.getHeader(HEADER_UPLOAD_TYPE) - if (!overwrite && !isBlockUpload(uploadId, sequence) && !isChunkedUpload(uploadType)) { + if (!overwrite && !isBlockUpload(uploadId, sequence) + && !isChunkedUpload(uploadType) && !isSeparateUpload(uploadType)) { with(context.artifactInfo) { nodeService.getNodeDetail(this)?.let { throw ErrorCodeException(ArtifactMessageCode.NODE_EXISTED, getArtifactName()) @@ -881,6 +945,11 @@ class GenericLocalRepository( } } + private fun calculateExpiryDateTime(expireDuration: Duration): LocalDateTime { + val hoursToAdd = expireDuration.toHours().takeIf { it > 0 } ?: 12 // 如果 expireDuration <= 0,则使用 12 小时 + return LocalDateTime.now().plusHours(hoursToAdd) + } + companion object { private val logger = LoggerFactory.getLogger(GenericLocalRepository::class.java) diff --git a/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/controller/GenericController.kt b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/controller/GenericController.kt index fa9a336e7e..6fce444949 100644 --- a/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/controller/GenericController.kt +++ b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/controller/GenericController.kt @@ -205,6 +205,7 @@ class GenericController( scopeId = "#artifactInfo?.projectId", content = ActionAuditContent.NODE_UPLOAD_CONTENT ) + @Permission(ResourceType.NODE, PermissionAction.WRITE) @PutMapping(BLOCK_MAPPING_URI) fun completeBlockUpload( diff --git a/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/controller/SeparateBlockController.kt b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/controller/SeparateBlockController.kt new file mode 100644 index 0000000000..134333aca4 --- /dev/null +++ b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/controller/SeparateBlockController.kt @@ -0,0 +1,123 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.generic.controller + +import com.tencent.bk.audit.annotations.ActionAuditRecord +import com.tencent.bk.audit.annotations.AuditAttribute +import com.tencent.bk.audit.annotations.AuditEntry +import com.tencent.bk.audit.annotations.AuditInstanceRecord +import com.tencent.bkrepo.auth.pojo.enums.PermissionAction +import com.tencent.bkrepo.auth.pojo.enums.ResourceType +import com.tencent.bkrepo.common.api.pojo.Response +import com.tencent.bkrepo.common.artifact.api.ArtifactPathVariable +import com.tencent.bkrepo.common.artifact.audit.ActionAuditContent +import com.tencent.bkrepo.common.artifact.audit.NODE_RESOURCE +import com.tencent.bkrepo.common.artifact.audit.NODE_CREATE_ACTION +import com.tencent.bkrepo.common.security.permission.Permission +import com.tencent.bkrepo.common.service.util.ResponseBuilder +import com.tencent.bkrepo.generic.artifact.GenericArtifactInfo +import com.tencent.bkrepo.generic.artifact.GenericArtifactInfo.Companion.SEPARATE_MAPPING_URI +import com.tencent.bkrepo.generic.constant.HEADER_UPLOAD_ID +import com.tencent.bkrepo.generic.pojo.SeparateBlockInfo +import com.tencent.bkrepo.generic.pojo.UploadTransactionInfo +import com.tencent.bkrepo.generic.service.UploadService +import org.springframework.web.bind.annotation.DeleteMapping +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.PutMapping +import org.springframework.web.bind.annotation.RequestAttribute +import org.springframework.web.bind.annotation.RequestHeader +import org.springframework.web.bind.annotation.RestController + +@RestController +class SeparateBlockController( + private val uploadService: UploadService, +) { + + @Permission(ResourceType.NODE, PermissionAction.WRITE) + @PostMapping(SEPARATE_MAPPING_URI) + fun startSeparateBlockUpload( + @RequestAttribute userId: String, + @ArtifactPathVariable artifactInfo: GenericArtifactInfo, + ): Response { + return ResponseBuilder.success(uploadService.startSeparateBlockUpload(userId, artifactInfo)) + } + + @AuditEntry( + actionId = NODE_CREATE_ACTION + ) + @ActionAuditRecord( + actionId = NODE_CREATE_ACTION, + instance = AuditInstanceRecord( + resourceType = NODE_RESOURCE, + instanceIds = "#artifactInfo?.getArtifactFullPath()", + instanceNames = "#artifactInfo?.getArtifactFullPath()" + ), + attributes = [ + AuditAttribute(name = ActionAuditContent.PROJECT_CODE_TEMPLATE, value = "#artifactInfo?.projectId"), + AuditAttribute(name = ActionAuditContent.REPO_NAME_TEMPLATE, value = "#artifactInfo?.repoName") + ], + scopeId = "#artifactInfo?.projectId", + content = ActionAuditContent.NODE_UPLOAD_CONTENT + ) + @Permission(ResourceType.NODE, PermissionAction.WRITE) + @PutMapping(SEPARATE_MAPPING_URI) + fun completeSeparateBlockUpload( + @RequestAttribute userId: String, + @RequestHeader(HEADER_UPLOAD_ID) uploadId: String, + @ArtifactPathVariable artifactInfo: GenericArtifactInfo, + ): Response { + uploadService.completeSeparateBlockUpload(userId, uploadId, artifactInfo) + return ResponseBuilder.success() + } + + @Permission(ResourceType.NODE, PermissionAction.WRITE) + @DeleteMapping(SEPARATE_MAPPING_URI) + fun abortSeparateBlockUpload( + @RequestAttribute userId: String, + @RequestHeader(HEADER_UPLOAD_ID) uploadId: String, + @ArtifactPathVariable artifactInfo: GenericArtifactInfo, + ): Response { + uploadService.abortSeparateBlockUpload(userId, uploadId, artifactInfo) + return ResponseBuilder.success() + } + + @Permission(ResourceType.REPO, PermissionAction.READ) + @GetMapping(SEPARATE_MAPPING_URI) + fun listSeparateBlock( + @RequestAttribute userId: String, + @RequestHeader(HEADER_UPLOAD_ID) uploadId: String, + @ArtifactPathVariable artifactInfo: GenericArtifactInfo, + ): Response> { + return ResponseBuilder.success(uploadService.separateListBlock(userId, uploadId, artifactInfo)) + } +} diff --git a/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/listener/BuildDeletedEventListener.kt b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/listener/BuildDeletedEventListener.kt new file mode 100644 index 0000000000..55954f303c --- /dev/null +++ b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/listener/BuildDeletedEventListener.kt @@ -0,0 +1,44 @@ +package com.tencent.bkrepo.generic.listener + +import com.tencent.bkrepo.common.artifact.api.ArtifactInfo +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.common.artifact.event.base.EventType +import com.tencent.bkrepo.common.metadata.condition.SyncCondition +import com.tencent.bkrepo.common.metadata.constant.FAKE_SHA256 +import com.tencent.bkrepo.common.metadata.service.blocknode.BlockNodeService +import com.tencent.bkrepo.common.metadata.service.node.NodeService +import org.slf4j.LoggerFactory +import org.springframework.context.annotation.Conditional +import org.springframework.context.event.EventListener +import org.springframework.scheduling.annotation.Async +import org.springframework.stereotype.Component + +@Component +@Conditional(SyncCondition::class) +class BuildDeletedEventListener( + private val nodeService: NodeService, + private val blockNodeService: BlockNodeService +) { + + @Async + @EventListener(ArtifactEvent::class) + fun handle(event: ArtifactEvent) { + if (event.type == EventType.NODE_DELETED) { + logger.info("accept artifact delete event: $event") + consumer(event) + } + } + + private fun consumer(event: ArtifactEvent) { + with(event) { + val node = nodeService.getDeletedNodeDetail(ArtifactInfo(projectId, repoName, resourceKey)).firstOrNull() + if (node?.sha256 == FAKE_SHA256 && !node.folder) { + blockNodeService.deleteBlocks(projectId, repoName, resourceKey) + } + } + } + + companion object { + private val logger = LoggerFactory.getLogger(BuildDeletedEventListener::class.java) + } +} \ No newline at end of file diff --git a/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/service/UploadService.kt b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/service/UploadService.kt index 198562119e..e01c575b24 100644 --- a/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/service/UploadService.kt +++ b/src/backend/generic/biz-generic/src/main/kotlin/com/tencent/bkrepo/generic/service/UploadService.kt @@ -32,10 +32,10 @@ package com.tencent.bkrepo.generic.service import com.tencent.bk.audit.context.ActionAuditContext +import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.common.api.exception.BadRequestException import com.tencent.bkrepo.common.api.exception.ErrorCodeException import com.tencent.bkrepo.common.api.message.CommonMessageCode -import com.tencent.bkrepo.common.api.util.Preconditions import com.tencent.bkrepo.common.artifact.api.ArtifactFile import com.tencent.bkrepo.common.artifact.message.ArtifactMessageCode import com.tencent.bkrepo.common.artifact.pojo.RepositoryCategory @@ -43,6 +43,11 @@ import com.tencent.bkrepo.common.artifact.repository.context.ArtifactContextHold import com.tencent.bkrepo.common.artifact.repository.context.ArtifactRemoveContext import com.tencent.bkrepo.common.artifact.repository.context.ArtifactUploadContext import com.tencent.bkrepo.common.artifact.repository.core.ArtifactService +import com.tencent.bkrepo.common.metadata.constant.FAKE_MD5 +import com.tencent.bkrepo.common.metadata.constant.FAKE_SEPARATE +import com.tencent.bkrepo.common.metadata.constant.FAKE_SHA256 +import com.tencent.bkrepo.common.metadata.model.NodeAttribute +import com.tencent.bkrepo.common.metadata.service.blocknode.BlockNodeService import com.tencent.bkrepo.common.metadata.service.node.NodeService import com.tencent.bkrepo.common.metadata.service.repo.RepositoryService import com.tencent.bkrepo.common.security.util.SecurityUtils @@ -53,13 +58,18 @@ import com.tencent.bkrepo.common.storage.core.StorageService import com.tencent.bkrepo.common.storage.credentials.StorageCredentials import com.tencent.bkrepo.common.storage.message.StorageErrorException import com.tencent.bkrepo.common.storage.pojo.FileInfo +import com.tencent.bkrepo.fs.server.constant.FS_ATTR_KEY +import com.tencent.bkrepo.fs.server.constant.UPLOADID_KEY import com.tencent.bkrepo.generic.artifact.GenericArtifactInfo import com.tencent.bkrepo.generic.artifact.GenericLocalRepository import com.tencent.bkrepo.generic.constant.GenericMessageCode import com.tencent.bkrepo.generic.constant.HEADER_EXPIRES +import com.tencent.bkrepo.generic.constant.HEADER_FILE_SIZE import com.tencent.bkrepo.generic.constant.HEADER_OVERWRITE import com.tencent.bkrepo.generic.pojo.BlockInfo +import com.tencent.bkrepo.generic.pojo.SeparateBlockInfo import com.tencent.bkrepo.generic.pojo.UploadTransactionInfo +import com.tencent.bkrepo.repository.pojo.metadata.MetadataModel import com.tencent.bkrepo.repository.pojo.node.service.NodeCreateRequest import org.slf4j.LoggerFactory import org.springframework.stereotype.Service @@ -72,6 +82,7 @@ class UploadService( private val nodeService: NodeService, private val storageService: StorageService, private val repositoryService: RepositoryService, + private val blockNodeService: BlockNodeService, ) : ArtifactService() { fun upload(artifactInfo: GenericArtifactInfo, file: ArtifactFile) { @@ -87,14 +98,14 @@ class UploadService( fun startBlockUpload(userId: String, artifactInfo: GenericArtifactInfo): UploadTransactionInfo { with(artifactInfo) { - val expires = getLongHeader(HEADER_EXPIRES) + val overwrite = getBooleanHeader(HEADER_OVERWRITE) - Preconditions.checkArgument(expires >= 0, "expires") + // 判断文件是否存在 if (!overwrite && nodeService.checkExist(this)) { logger.warn( "User[${SecurityUtils.getPrincipal()}] start block upload [$artifactInfo] failed: " + - "artifact already exists." + "artifact already exists." ) throw ErrorCodeException(ArtifactMessageCode.NODE_EXISTED, getArtifactName()) } @@ -114,6 +125,73 @@ class UploadService( } } + fun startSeparateBlockUpload(userId: String, artifactInfo: GenericArtifactInfo): UploadTransactionInfo { + with(artifactInfo) { + // 获取请求头中是否允许覆盖的参数 + val overwrite = getBooleanHeader(HEADER_OVERWRITE) + + val node = nodeService.getNodeDetail(this) + + val expires = getLongHeader(HEADER_EXPIRES).takeIf { it > 0 } ?: TRANSACTION_EXPIRES + + val oldNodeId = node?.nodeInfo?.id ?: FAKE_SEPARATE + // 如果不允许覆盖且节点已经存在,抛出异常 + if (node != null && !overwrite) { + throw ErrorCodeException(ArtifactMessageCode.NODE_EXISTED, getArtifactName()) + } + + // 生成唯一的 uploadId,作为上传会话的标识 + val uploadId = "${StringPool.uniqueId()}/$oldNodeId" + + // 创建上传事务信息,设置过期时间 + val uploadTransaction = UploadTransactionInfo( + uploadId = uploadId, + expireSeconds = expires + ) + // 记录上传启动的日志 + logger.info( + "User[${SecurityUtils.getPrincipal()}] start block upload [$artifactInfo] success, " + + "version: $uploadId." + ) + return uploadTransaction + } + } + + fun blockBaseNodeCreate(userId: String, artifactInfo: GenericArtifactInfo, uploadId: String) { + val attributes = NodeAttribute( + uid = NodeAttribute.NOBODY, + gid = NodeAttribute.NOBODY, + mode = NodeAttribute.DEFAULT_MODE + ) + val fsAttr = MetadataModel( + key = FS_ATTR_KEY, + value = attributes, + ) + val versionMetadata = MetadataModel( + key = UPLOADID_KEY, + value = uploadId + ) + val fileSize = getLongHeader(HEADER_FILE_SIZE).takeIf { it > 0L } + ?: throw ErrorCodeException(GenericMessageCode.BLOCK_HEAD_NOT_FOUND) + val request = NodeCreateRequest( + projectId = artifactInfo.projectId, + repoName = artifactInfo.repoName, + folder = false, + fullPath = artifactInfo.getArtifactFullPath(), + sha256 = FAKE_SHA256, + md5 = FAKE_MD5, + operator = userId, + size = fileSize, + overwrite = getBooleanHeader(HEADER_OVERWRITE), + expires = getLongHeader(HEADER_EXPIRES), + nodeMetadata = listOf(fsAttr, versionMetadata), + separate = true, + metadata = mapOf(UPLOADID_KEY to uploadId) + ) + ActionAuditContext.current().setInstance(request) + nodeService.createNode(request) + } + fun abortBlockUpload(userId: String, uploadId: String, artifactInfo: GenericArtifactInfo) { val storageCredentials = getStorageCredentials(artifactInfo) checkUploadId(uploadId, storageCredentials) @@ -122,6 +200,15 @@ class UploadService( logger.info("User[${SecurityUtils.getPrincipal()}] abort upload block [$artifactInfo] success.") } + fun abortSeparateBlockUpload(userId: String, uploadId: String, artifactInfo: GenericArtifactInfo) { + blockNodeService.deleteBlocks( + artifactInfo.projectId, + artifactInfo.repoName, + artifactInfo.getArtifactFullPath(), + uploadId + ) + } + fun completeBlockUpload( userId: String, uploadId: String, @@ -136,7 +223,7 @@ class UploadService( val fileInfo = if (!sha256.isNullOrEmpty() && !md5.isNullOrEmpty() && size != null) { logger.info( "sha256 $sha256, md5 $md5, size $size for " + - "fullPath ${artifactInfo.getArtifactFullPath()} with uploadId $uploadId" + "fullPath ${artifactInfo.getArtifactFullPath()} with uploadId $uploadId" ) FileInfo(sha256, md5, size) } else { @@ -160,13 +247,71 @@ class UploadService( overwrite = getBooleanHeader(HEADER_OVERWRITE), operator = userId, expires = getLongHeader(HEADER_EXPIRES), - nodeMetadata = repository.resolveMetadata(HttpContextHolder.getRequest()) + nodeMetadata = repository.resolveMetadata(HttpContextHolder.getRequest()), ) ActionAuditContext.current().setInstance(request) nodeService.createNode(request) logger.info("User[${SecurityUtils.getPrincipal()}] complete upload [$artifactInfo] success.") } + fun completeSeparateBlockUpload(userId: String, uploadId: String, artifactInfo: GenericArtifactInfo) { + + // 获取并按起始位置排序块信息列表 + val blockInfoList = blockNodeService.listBlocksInUploadId( + artifactInfo.projectId, + artifactInfo.repoName, + artifactInfo.getArtifactFullPath(), + uploadId = uploadId + ) + + blockInfoList.ifEmpty { + logger.warn("No block information found for uploadId: $uploadId") + throw ErrorCodeException(GenericMessageCode.BLOCK_UPDATE_LIST_IS_NULL, artifactInfo) + } + + // 计算所有块的总大小 + val totalSize = blockInfoList.sumOf { it.size } + + // 验证节点大小是否与块总大小一致 + if (getLongHeader(HEADER_FILE_SIZE) != totalSize) { + throw ErrorCodeException(GenericMessageCode.NODE_DATA_ERROR, artifactInfo) + } + + // 创建新的基础节点(Base Node) + try { + blockBaseNodeCreate(userId, artifactInfo, uploadId) + } catch (e: Exception) { + logger.error( + "Create block base node failed, file path [${artifactInfo.getArtifactFullPath()}], " + + "version : $uploadId" + ) + abortSeparateBlockUpload(userId, uploadId, artifactInfo) + throw e + } + + // 删除旧Block + blockNodeService.deleteBlocks( + artifactInfo.projectId, + artifactInfo.repoName, + artifactInfo.getArtifactFullPath() + ) + + // 更新节点版本信息为null + blockNodeService.updateBlockUploadId( + artifactInfo.projectId, + artifactInfo.repoName, + artifactInfo.getArtifactFullPath(), + uploadId + ) + + // 上传完成,记录日志 + logger.info( + "User [$userId] successfully completed block upload [uploadId: $uploadId], " + + "file path [${artifactInfo.getArtifactFullPath()}]." + ) + + } + fun listBlock(userId: String, uploadId: String, artifactInfo: GenericArtifactInfo): List { val storageCredentials = getStorageCredentials(artifactInfo) checkUploadId(uploadId, storageCredentials) @@ -177,6 +322,24 @@ class UploadService( } } + fun separateListBlock( + userId: String, + uploadId: String, + artifactInfo: GenericArtifactInfo + ): List { + + val blockInfoList = blockNodeService.listBlocksInUploadId( + artifactInfo.projectId, + artifactInfo.repoName, + artifactInfo.getArtifactFullPath(), + uploadId = uploadId + ) + + return blockInfoList.map { blockInfo -> + SeparateBlockInfo(blockInfo.size, blockInfo.sha256, blockInfo.startPos, blockInfo.uploadId) + } + } + private fun checkUploadId(uploadId: String, storageCredentials: StorageCredentials?) { if (!storageService.checkBlockId(uploadId, storageCredentials)) { throw ErrorCodeException(GenericMessageCode.UPLOAD_ID_NOT_FOUND, uploadId) diff --git a/src/backend/generic/biz-generic/src/main/resources/i18n/messages_en.properties b/src/backend/generic/biz-generic/src/main/resources/i18n/messages_en.properties index 475697be26..d36ff09dc5 100644 --- a/src/backend/generic/biz-generic/src/main/resources/i18n/messages_en.properties +++ b/src/backend/generic/biz-generic/src/main/resources/i18n/messages_en.properties @@ -32,7 +32,7 @@ generic.uploadId.notfound=Upload id[{0}] not found generic.dir.not-allowed=Repository[{0}] does not allow listing directories[{1}] generic.delta.sign-file.notfound=Sign file [{0}] not found -generic.node.data.has.changed=Node[{0}] data has changed +generic.node.data.error=Node[{0}] data has changed or not found generic.download.dir.not-allowed=Download dir not allowed generic.artifact.query.failed=Artifact query failed: {0} generic.pipeline-artifact.overwrite.not-allowed=Note: There is a risk of tampering! The specified uploaded file belongs to the artifact of a specific pipeline build[{0}] and can only be overwritten in the current build @@ -41,3 +41,6 @@ generic.pipeline-repo.manual-upload.not-allowed=Manual upload to pipeline reposi generic.pipeline.metadata.incomplete=Pipeline metadata is incomplete. {0} is missing generic.pipeline.artifact.path.illegal=Note: The path parameter is incorrect. The specified file[{0}] belongs to the artifact of the pipeline repository[{1}] generic.chunked.artifact.broken=Chunked artifact broken +generic.block.file.node.create.fail=Block file information node [{0}] is not created or the version is inconsistent +generic.block.node.head.not-found=Both HEADER_OFFSET and HEADER_FILE_SIZE are null. +generic.block.update.list.is.null=The update list blocks is the empty diff --git a/src/backend/generic/biz-generic/src/main/resources/i18n/messages_zh_CN.properties b/src/backend/generic/biz-generic/src/main/resources/i18n/messages_zh_CN.properties index 6584fac835..e2de78a202 100644 --- a/src/backend/generic/biz-generic/src/main/resources/i18n/messages_zh_CN.properties +++ b/src/backend/generic/biz-generic/src/main/resources/i18n/messages_zh_CN.properties @@ -32,7 +32,7 @@ generic.uploadId.notfound=Upload id[{0}]不存在 generic.dir.not-allowed=仓库[{0}]不允许列出目录[{1}] generic.delta.sign-file.notfound=签名文件[{0}]未找到 -generic.node.data.has.changed=节点[{0}]数据已改变 +generic.node.data.error=节点[{0}]数据已改变或未找到 generic.download.dir.not-allowed=不允许下载目录 generic.artifact.query.failed=制品查询失败:{0} generic.pipeline-artifact.overwrite.not-allowed=注意:存在窜改风险!指定上传的文件属于特定流水线构建[{0}]的产物,只能在当次构建中被覆盖 @@ -40,4 +40,7 @@ generic.custom-artifact.overwrite.not-allowed=注意:存在窜改风险!指 generic.pipeline-repo.manual-upload.not-allowed=禁止手动上传到流水线仓库 generic.pipeline.metadata.incomplete=流水线元数据不完整. 缺少{0} generic.pipeline.artifact.path.illegal=注意:path参数出错,指定文件[{0}]属于流水线仓库[{1}]的制品 -generic.chunked.artifact.broken=分块上传制品文件已损坏 \ No newline at end of file +generic.chunked.artifact.broken=分块上传制品文件已损坏 +generic.block.file.node.create.fail=分块文件信息节点[{0}]未创建或版本不一致 +generic.block.node.head.not-found=请求头HEADER_OFFSET和HEADER_FILE_SIZE不可为空 +generic.block.update.list.is.null=更新分块列表为空 \ No newline at end of file diff --git a/src/backend/generic/biz-generic/src/main/resources/i18n/messages_zh_TW.properties b/src/backend/generic/biz-generic/src/main/resources/i18n/messages_zh_TW.properties index fae3bbd2bb..f718e757ac 100644 --- a/src/backend/generic/biz-generic/src/main/resources/i18n/messages_zh_TW.properties +++ b/src/backend/generic/biz-generic/src/main/resources/i18n/messages_zh_TW.properties @@ -32,7 +32,7 @@ generic.uploadId.notfound=Upload id[{0}]不存在 generic.dir.not-allowed=倉庫[{0}]不允許列出目錄[{1}] generic.delta.sign-file.notfound=籤名文件[{0}]未找到 -generic.node.data.has.changed=節點[{0}]數據已改變 +generic.node.data.error=節點[{0}]數據已改變或未找到 generic.download.dir.not-allowed=不允許下載目錄 generic.artifact.query.failed=製品查詢失敗:{0} generic.pipeline-artifact.overwrite.not-allowed=註意:存在竄改風險!指定上傳的文件屬於特定流水線構建[{0}]的產物,只能在當次構建中被覆蓋 @@ -40,4 +40,7 @@ generic.custom-artifact.overwrite.not-allowed=已存在流水線歸檔產品[{0} generic.pipeline-repo.manual-upload.not-allowed=禁止手動上傳到流水線倉庫 generic.pipeline.metadata.incomplete=流水線元數據不完整,缺少{0} generic.pipeline.artifact.path.illegal=注意:path參數出錯,指定文件[{0}]屬於流水線倉庫[{1}]的製品 -generic.chunked.artifact.broken=製品已損壞 \ No newline at end of file +generic.chunked.artifact.broken=製品已損壞 +generic.block.file.node.create.fail=分塊文件信息節點[{0}]未創建或版本不一致 +generic.block.node.head.not-found=請求頭 HEADER_OFFSET 和 HEADER_FILE_SIZE 不可為空 +generic.block.update.list.is.null=更新分塊列表為空 \ No newline at end of file diff --git a/src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/SeparateTestConfiguration.kt b/src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/SeparateTestConfiguration.kt new file mode 100644 index 0000000000..a706646a1e --- /dev/null +++ b/src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/SeparateTestConfiguration.kt @@ -0,0 +1,8 @@ +package com.tencent.com.bkrepo.generic + +import org.springframework.boot.SpringBootConfiguration +import org.springframework.boot.autoconfigure.EnableAutoConfiguration + +@SpringBootConfiguration +@EnableAutoConfiguration +class SeparateTestConfiguration \ No newline at end of file diff --git a/src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/SeparateTestConstants.kt b/src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/SeparateTestConstants.kt new file mode 100644 index 0000000000..fe46cd1673 --- /dev/null +++ b/src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/SeparateTestConstants.kt @@ -0,0 +1,8 @@ +package com.tencent.com.bkrepo.generic + +const val UT_PROJECT_ID = "ut-project" +const val UT_REPO_NAME = "ut-repo" +const val UT_USER = "ut-user" +const val UT_SHA256 = "ut1000000000000000000000000" +const val BLOCK_SIZE = 10 * 1024 * 1024L // 10MB +const val UT_VERSION = "ut-version/version" diff --git a/src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/service/BlockNodeServiceTest.kt b/src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/service/BlockNodeServiceTest.kt new file mode 100644 index 0000000000..1e59fd75d0 --- /dev/null +++ b/src/backend/generic/biz-generic/src/test/kotlin/com/tencent/com/bkrepo/generic/service/BlockNodeServiceTest.kt @@ -0,0 +1,261 @@ +package com.tencent.com.bkrepo.generic.service + +import com.tencent.bkrepo.common.api.constant.StringPool +import com.tencent.bkrepo.common.artifact.api.ArtifactFile +import com.tencent.bkrepo.common.artifact.api.FileSystemArtifactFile +import com.tencent.bkrepo.common.artifact.stream.Range +import com.tencent.bkrepo.common.metadata.dao.blocknode.BlockNodeDao +import com.tencent.bkrepo.common.metadata.dao.node.NodeDao +import com.tencent.bkrepo.common.metadata.model.TBlockNode +import com.tencent.bkrepo.common.metadata.model.TNode +import com.tencent.bkrepo.common.metadata.service.blocknode.BlockNodeService +import com.tencent.bkrepo.common.storage.StorageAutoConfiguration +import com.tencent.bkrepo.common.storage.core.StorageService +import com.tencent.bkrepo.generic.artifact.GenericArtifactInfo +import com.tencent.com.bkrepo.generic.BLOCK_SIZE +import com.tencent.com.bkrepo.generic.UT_PROJECT_ID +import com.tencent.com.bkrepo.generic.UT_REPO_NAME +import com.tencent.com.bkrepo.generic.UT_SHA256 +import com.tencent.com.bkrepo.generic.UT_USER +import com.tencent.com.bkrepo.generic.UT_VERSION +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.mockito.Mockito +import org.mockito.kotlin.any +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.boot.autoconfigure.ImportAutoConfiguration +import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration +import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest +import org.springframework.boot.test.mock.mockito.MockBean +import org.springframework.context.annotation.ComponentScan +import org.springframework.data.domain.Sort +import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.mongodb.core.query.and +import org.springframework.data.mongodb.core.query.isEqualTo +import org.springframework.data.mongodb.core.query.where +import org.springframework.test.context.TestPropertySource +import java.time.LocalDateTime +import kotlin.random.Random + + +@DataMongoTest +@EnableAutoConfiguration +@ComponentScan(basePackages = + ["com.tencent.bkrepo.generic.service", + "com.tencent.bkrepo.common.storage", + "com.tencent.bkrepo.common.metadata"]) +@ImportAutoConfiguration(StorageAutoConfiguration::class, TaskExecutionAutoConfiguration::class) +@TestPropertySource(locations = ["classpath:bootstrap-ut.properties"]) +class BlockNodeServiceTest { + + @MockBean + lateinit var nodeDao: NodeDao + + @Autowired + lateinit var blockNodeService: BlockNodeService + + @Autowired + lateinit var blockNodeDao: BlockNodeDao + + @Autowired + lateinit var storageService: StorageService + + private val storageCredentials = null + private val range = Range.full(Long.MAX_VALUE) + private lateinit var artifact: GenericArtifactInfo + private lateinit var createdDate: LocalDateTime + private lateinit var node: TNode + + @BeforeEach + fun beforeEach() { + val criteria = where(TBlockNode::repoName).isEqualTo(UT_REPO_NAME) + blockNodeDao.remove(Query(criteria)) + + artifact = GenericArtifactInfo(UT_PROJECT_ID, UT_REPO_NAME, "/newFile") + createdDate = LocalDateTime.now().minusSeconds(1) + node = TNode( + createdBy = UT_USER, + createdDate = createdDate, + lastModifiedBy = UT_USER, + lastModifiedDate = createdDate, + folder = false, + path = StringPool.ROOT, + name = "file", + fullPath = "/file", + size = BLOCK_SIZE, + projectId = UT_PROJECT_ID, + repoName = UT_REPO_NAME + ) + Mockito.`when`(nodeDao.findNode(any(), any(), any())) + .thenReturn(node) + } + + @DisplayName("测试BlockUpload") + @Test + fun testBlockUpload() { + setupBlocks() + val blocks = listBlocks("/file") + assertBlocks(blocks, expectedSize = 2, blockSize = BLOCK_SIZE, UT_VERSION) + } + + @DisplayName("测试BlockCompletion") + @Test + fun testBlockCompletion() { + setupBlocks() + val blocks = listBlocks("/file") + assertBlocks(blocks, expectedSize = 2, blockSize = BLOCK_SIZE, UT_VERSION) + + // 完成上传 + completeUpload(blocks) + + val completeBlocks = blockNodeService.listBlocks( + range, + UT_PROJECT_ID, + UT_REPO_NAME, + "/file", + createdDate.toString() + ) + assertBlocks(completeBlocks, expectedSize = 2, blockSize = BLOCK_SIZE, null) + Assertions.assertEquals(0, completeBlocks[0].startPos) + Assertions.assertEquals(BLOCK_SIZE, completeBlocks[1].startPos) + } + + @DisplayName("测试BlockAbort") + @Test + fun testBlockAbort() { + setupBlocks() + val blocks = listBlocks("/file") + assertBlocks(blocks, expectedSize = 2, blockSize = BLOCK_SIZE, UT_VERSION) + + // 中止上传 + blockNodeService.deleteBlocks( + projectId = UT_PROJECT_ID, + repoName = UT_REPO_NAME, + fullPath = "/file", + uploadId = UT_VERSION + ) + + val deleteBlocksQuery = deleteBlocksQuery("/file", UT_PROJECT_ID, UT_REPO_NAME, createdDate) + val afterBlocks = blockNodeDao.find(deleteBlocksQuery) + Assertions.assertEquals(2, afterBlocks.size) + afterBlocks.forEach { afterBlock -> + Assertions.assertNotNull(afterBlock.deleted) + } + } + + @DisplayName("测试获取范围内的分块") + @Test + fun testListRangeBlockNodes() { + val createdDate = LocalDateTime.now().minusSeconds(1).toString() + createBlockNode(10) + createBlockNode(20) + createBlockNode(30) + val range = Range(startPosition = 20, endPosition = 40, total = 100) + val blocks = blockNodeService.listBlocks( + range = range, + projectId = UT_PROJECT_ID, + repoName = UT_REPO_NAME, + fullPath = "/file", + createdDate = createdDate + ) + Assertions.assertEquals(2, blocks.size) + Assertions.assertEquals(20, blocks.first().startPos) + Assertions.assertEquals(30, blocks[1].startPos) + } + + private fun createBlockNode( + startPos: Long = 0, + fullPath: String = "/file", + sha256: String = "" + ): TBlockNode { + val blockNode = TBlockNode( + createdBy = UT_USER, + createdDate = LocalDateTime.now(), + nodeFullPath = fullPath, + startPos = startPos, + sha256 = sha256, + projectId = UT_PROJECT_ID, + repoName = UT_REPO_NAME, + size = 1 + ) + return blockNodeService.createBlock(blockNode, storageCredentials) + } + + private fun createTempArtifactFile(): ArtifactFile { + val data = Random.nextBytes(BLOCK_SIZE.toInt()) // 10MB + val tempFile = createTempFile() + tempFile.writeBytes(data) + return FileSystemArtifactFile(tempFile) + } + + private fun createAndStoreBlock(i: Int,fullPath: String = "/file") { + val blockNode = TBlockNode( + createdBy = UT_USER, + createdDate = LocalDateTime.now(), + nodeFullPath = fullPath, + startPos = i * BLOCK_SIZE, + sha256 = "$UT_SHA256$i", + projectId = UT_PROJECT_ID, + repoName = UT_REPO_NAME, + size = BLOCK_SIZE, + uploadId = UT_VERSION, + expireDate = LocalDateTime.now().plusDays(1) + ) + val artifactFile = createTempArtifactFile() + storageService.store(blockNode.sha256, artifactFile, storageCredentials) + blockNodeService.createBlock(blockNode, storageCredentials) + } + private fun listBlocks(fullPath: String = "/file"): List { + return blockNodeService.listBlocksInUploadId( + projectId = UT_PROJECT_ID, + repoName = UT_REPO_NAME, + fullPath = fullPath, + uploadId = UT_VERSION, + ) + } + + private fun assertBlocks(blocks: List, + expectedSize: Int, + blockSize: Long, + version: String?,) { + Assertions.assertEquals(expectedSize, blocks.size) + blocks.forEach { block -> + Assertions.assertEquals(blockSize, block.size) + Assertions.assertTrue(storageService.exist(block.sha256, storageCredentials)) + Assertions.assertEquals(block.uploadId, version) + } + } + + private fun completeUpload(blocks: List) { + val block = blocks.first() + block.uploadId?.let { + blockNodeService.updateBlockUploadId( + block.projectId, + block.repoName, + block.nodeFullPath, + it + ) + } + } + + private fun deleteBlocksQuery(fullPath: String, + projectId: String, + repoName: String, + createdDate: LocalDateTime): Query { + val criteria = where(TBlockNode::nodeFullPath).isEqualTo(fullPath) + .and(TBlockNode::projectId.name).isEqualTo(projectId) + .and(TBlockNode::repoName.name).isEqualTo(repoName) + .and(TBlockNode::createdDate).gt(createdDate).lt(LocalDateTime.now()) + val query = Query(criteria).with(Sort.by(TBlockNode::createdDate.name)) + return query + } + + private fun setupBlocks() { + for (i in 0..1) { + createAndStoreBlock(i) + } + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/other/ExpiredBlockNodeMarkupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/other/ExpiredBlockNodeMarkupJob.kt new file mode 100644 index 0000000000..1c3e65e07e --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/other/ExpiredBlockNodeMarkupJob.kt @@ -0,0 +1,86 @@ + +package com.tencent.bkrepo.job.batch.task.other + +import com.tencent.bkrepo.common.metadata.service.blocknode.BlockNodeService +import com.tencent.bkrepo.job.SHARDING_COUNT +import com.tencent.bkrepo.job.batch.base.DefaultContextMongoDbJob +import com.tencent.bkrepo.job.batch.base.JobContext +import com.tencent.bkrepo.job.batch.utils.TimeUtils +import com.tencent.bkrepo.job.config.properties.ExpiredBlockNodeMarkupJobProperties +import org.slf4j.LoggerFactory +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.mongodb.core.query.and +import org.springframework.data.mongodb.core.query.isEqualTo +import org.springframework.data.mongodb.core.query.where +import org.springframework.stereotype.Component +import java.time.Duration +import java.time.LocalDateTime +import kotlin.reflect.KClass + +/** + * 标记已过期的节点为已删除 + */ +@Component +@EnableConfigurationProperties(ExpiredBlockNodeMarkupJobProperties::class) +class ExpiredBlockNodeMarkupJob( + properties: ExpiredBlockNodeMarkupJobProperties, + private val blockNodeService: BlockNodeService +) : DefaultContextMongoDbJob(properties) { + + data class BlockNode( + val projectId: String, + val repoName: String, + val nodeFullPath: String, + val expireDate: LocalDateTime, + val deleted: LocalDateTime?, + val uploadId: String? + ) + + override fun getLockAtMostFor(): Duration = Duration.ofDays(1) + + override fun collectionNames(): List { + val collectionNames = mutableListOf() + for (i in 0 until SHARDING_COUNT) { + collectionNames.add("$COLLECTION_NAME_PREFIX$i") + } + return collectionNames + } + + override fun buildQuery(): Query { + return Query.query( + where(BlockNode::expireDate) + .lt(LocalDateTime.now()) + .and(BlockNode::deleted).isEqualTo(null) + .and(BlockNode::uploadId).ne(null) + ) + } + + override fun mapToEntity(row: Map): BlockNode { + return BlockNode( + row[BlockNode::projectId.name].toString(), + row[BlockNode::repoName.name].toString(), + row[BlockNode::nodeFullPath.name].toString(), + TimeUtils.parseMongoDateTimeStr(row[BlockNode::expireDate.name].toString())!!, + TimeUtils.parseMongoDateTimeStr(row[BlockNode::deleted.name].toString()), + row[BlockNode::uploadId.name].toString(), + ) + } + + override fun entityClass(): KClass { + return BlockNode::class + } + + override fun run(row: BlockNode, collectionName: String, context: JobContext) { + try { + blockNodeService.deleteBlocks(row.projectId, row.repoName, row.nodeFullPath, row.uploadId) + } catch (e: Exception) { + logger.warn("delete expired block node[$row] failed: $e") + } + } + + companion object { + private val logger = LoggerFactory.getLogger(ExpiredBlockNodeMarkupJob::class.java) + private const val COLLECTION_NAME_PREFIX = "block_node_" + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/ExpiredBlockNodeMarkupJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/ExpiredBlockNodeMarkupJobProperties.kt new file mode 100644 index 0000000000..0a4df03765 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/ExpiredBlockNodeMarkupJobProperties.kt @@ -0,0 +1,8 @@ +package com.tencent.bkrepo.job.config.properties + +import org.springframework.boot.context.properties.ConfigurationProperties + +@ConfigurationProperties(value = "job.expired-block-node-markup") +class ExpiredBlockNodeMarkupJobProperties( + override var cron: String = "0 0 0/6 * * ?" +) : MongodbJobProperties() diff --git a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/service/NodeCreateRequest.kt b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/service/NodeCreateRequest.kt index f9f23a53c8..5cb327e0a1 100644 --- a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/service/NodeCreateRequest.kt +++ b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/service/NodeCreateRequest.kt @@ -73,5 +73,7 @@ data class NodeCreateRequest( override val createdBy: String? = null, override var createdDate: LocalDateTime? = null, override val lastModifiedBy: String? = null, - override var lastModifiedDate: LocalDateTime? = null + override var lastModifiedDate: LocalDateTime? = null, + @ApiModelProperty("是否SEPARATE_UPLOAD") + val separate: Boolean = false ) : NodeRequest, ServiceRequest, AuditableRequest