Skip to content

Conversation

@Naoki427
Copy link
Contributor

@Naoki427 Naoki427 commented Nov 13, 2025

Task

https://redmine.weseek.co.jp/issues/172033

変更点

  • エクスポート処理(exportAuditLogsToFsAsync.ts)、アップロード処理(compress-and-upload.ts)の追加
  • それぞれの処理に必要なAuditLogBulkExportJobCronServiceクラス内の変数や関数、またエラークラス(error.ts)の追加
    • tmpOutputRootDir ファイルをエクスポートするパス
    • pageBatchSize DB から一度に何件フェッチするかのcursor batch size
    • maxLogsPerFile 1ファイルに含めるログの数
    • compressFormat ダウンロードするファイルの形式
    • compressLevel zipの圧縮レベル
    • streamInExecutionMemo 現在走っているストリームの管理
  • jobとattachmentを結びつけられるようauditLogBulkExportJobSchemaにattachmentを追加
  • restart時の処理の追加
  • 条件を満たすアクティビティが見つからなかったことを知らせるアクション(ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS)の追加
  • AuditLogBulkExportJobCronServiceの統合テストの追加

テストの内容

1. Basic Operations(正常系)

1-1. フィルターなしの一連の処理(export → zip → upload)

1-2. フィルターあり(action / date / users)でのエクスポート

1-3. フィルターにヒットしない場合の挙動(No results 通知)

2. Resumability(途中再開)

2-1. lastExportedId が設定されている場合の再開動作

2-2. totalExportedCount / lastExportedId の更新が正しく行われるか

3. Upload / Compression

3-1. 生成される ZIP ファイルの妥当性(root に JSON が入っているか)

3-2. アップロード失敗時に正しくエラーハンドリング・クリーンアップされるか

4. Error Handling

4-1. 存在しないユーザー ID がフィルターに含まれていた場合の処理

4-2. 無効なファイルパス(tmpOutputRootDir)指定時の FS エラー処理
※本番ではパスは動的変更しないため coverage 用のテスト

4-3. ExpiredError / RestartedError 発生時の状態遷移とクリーンアップ

5. State / Stream Control

5-1. exporting → uploading → completed のステータス遷移確認

5-2. 各処理フェーズにおける streamInExecution の登録・破棄の確認

5-3. restartFlag が有効な場合の proceedBulkExportJob の動き

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements the core export and upload functionality for the audit log bulk export feature. The implementation includes streaming export of audit logs to temporary JSON files, compression into ZIP archives, and upload to cloud storage, with support for filtering, resumability, and comprehensive error handling.

Key Changes:

  • Implemented streaming export of audit logs to JSON files with batching and resumability support
  • Added compression and upload pipeline using archiver to create ZIP files
  • Extended the cron service with stream lifecycle management, restart handling, and resource cleanup
  • Added comprehensive integration tests covering happy path, error scenarios, and edge cases

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 13 comments.

Show a summary per file
File Description
apps/app/src/server/interfaces/attachment.ts Added AUDIT_LOG_BULK_EXPORT attachment type and storage prefix
apps/app/src/interfaces/activity.ts Added ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS action for no-results notification
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/exportAuditLogsToFsAsync.ts Implemented streaming export with query filtering, file batching, and resumability
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/compress-and-upload.ts Implemented ZIP compression and upload with error handling
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/index.ts Enhanced cron service with stream management, restart logic, and cleanup functionality
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/errors.ts Added custom error classes for expired and restarted jobs
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/audit-log-bulk-export-job-cron-service.integ.ts Added comprehensive integration tests for export, upload, error handling, and state management
apps/app/src/features/audit-log-bulk-export/server/models/audit-log-bulk-export-job.ts Added attachment reference field to link jobs with uploaded files
apps/app/src/features/audit-log-bulk-export/interfaces/audit-log-bulk-export.ts Added attachment field to job interface

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +231 to +234
err: Error | null,
auditLogBulkExportJob: AuditLogBulkExportJobDocument,
) {
if (err == null) return;
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The handleError method signature includes err: Error | null but only checks for null (line 234). However, the pipeline callback in exportAuditLogsToFsAsync.ts (line 142) will never pass null - it will either pass an Error or not call the handler. Consider removing the null check or documenting why null handling is needed.

Suggested change
err: Error | null,
auditLogBulkExportJob: AuditLogBulkExportJobDocument,
) {
if (err == null) return;
err: Error,
auditLogBulkExportJob: AuditLogBulkExportJobDocument,
) {

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipelineは正常終了でもerr = nullを渡すのでhandleErrorはnullを受け取れる必要があります

Comment on lines 246 to 247
await cronService.proceedBulkExportJob(job);
await new Promise((resolve) => setTimeout(resolve, 100));
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The test relies on setTimeout with fixed delays (100ms, 200ms) to wait for async operations to complete. This approach is brittle and can lead to flaky tests, especially in CI environments or under load. Consider using a polling mechanism or event-based waiting to ensure operations have actually completed before making assertions.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

対応

const hasAny = await Activity.exists(query);
if (!hasAny) {
job.totalExportedCount = 0;
job.status = AuditLogBulkExportJobStatus.completed;
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] In the no-results case, the job status is set to completed and totalExportedCount is set to 0, but lastExportedId is not cleared. For consistency with the restart logic (line 121 in index.ts) and to avoid potential issues on subsequent operations, consider explicitly setting job.lastExportedId = undefined here.

Suggested change
job.status = AuditLogBulkExportJobStatus.completed;
job.status = AuditLogBulkExportJobStatus.completed;
job.lastExportedId = undefined;

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

対応

Comment on lines 29 to 30
if (err.code === 'ENOENT') logger.error(err);
else throw err;
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The archiver warning handler distinguishes between ENOENT errors and other errors, but both paths use different error handling strategies (logger.error vs throw). This inconsistency could make debugging difficult. Additionally, throwing an error from an event handler can lead to uncaught exceptions. Consider using consistent error handling or emitting an 'error' event instead.

Suggested change
if (err.code === 'ENOENT') logger.error(err);
else throw err;
if (err.code === 'ENOENT') {
logger.error(err);
} else {
auditLogArchiver.emit('error', err);
}

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

対応

Comment on lines 108 to 110
auditLogBulkExportJobInProgress.forEach((auditLogBulkExportJob) => {
this.proceedBulkExportJob(auditLogBulkExportJob);
});
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proceedBulkExportJob method is called with forEach but not awaited (line 109). This means the async function executions are not tracked, and any errors thrown won't be properly handled. The previous implementation used Promise.all which was more appropriate. Consider reverting to Promise.all or adding proper error handling for each async call.

Suggested change
auditLogBulkExportJobInProgress.forEach((auditLogBulkExportJob) => {
this.proceedBulkExportJob(auditLogBulkExportJob);
});
await Promise.all(
auditLogBulkExportJobInProgress.map((auditLogBulkExportJob) => this.proceedBulkExportJob(auditLogBulkExportJob)),
);

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

対応

JSON.stringify(buffer, null, 2),
);

await job.save();
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job.save() operation is called within the stream write callback on every iteration (line 43). This could lead to significant database overhead and performance issues, especially for large datasets. Consider batching the saves (e.g., only save after writing a file) or using a different approach to persist progress periodically rather than on every single log entry.

Suggested change
await job.save();
// Removed job.save() here to avoid excessive DB writes; will save in final callback.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

job.save() はログ 1 件ごとではなく JSON ファイル単位でのみ実行しているので、大規模データでも DB 負荷を抑えられている認識です

Comment on lines +48 to +51
} catch (err) {
callback(err as Error);
return;
}
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The error handling in the writable stream callbacks (lines 48-51 and 69-71) calls callback(err as Error) but does not prevent further execution. While return is called after the callback, this pattern could still allow the stream to continue processing after an error. Consider ensuring the stream is properly destroyed on error.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipeline は自動でストリームを destroy() する仕様なので問題ない認識です

await fileUploadService.uploadAttachment(auditLogArchiver, attachment);
} catch (e) {
logger.error(e);
await this.handleError(e as Error, job);
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The error handling logs the error and calls handleError, but then returns without updating the job status. If handleError fails or doesn't properly update the job status, the job could be left in an inconsistent state (status=uploading but actually failed). Consider ensuring the job status is reliably updated to failed before returning.

Suggested change
await this.handleError(e as Error, job);
try {
await this.handleError(e as Error, job);
} catch (handleErrorErr) {
logger.error('Error in handleError:', handleErrorErr);
}
job.status = AuditLogBulkExportJobStatus.failed;
await job.save();

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

対応

auditLogBulkExportJob.status = AuditLogBulkExportJobStatus.exporting;
auditLogBulkExportJob.lastExportedId = undefined;
auditLogBulkExportJob.totalExportedCount = 0;
await auditLogBulkExportJob.save();
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] When restartFlag is true, the function returns early after cleanup (line 124) without re-invoking proceedBulkExportJob. This means the restarted job won't actually start processing until the next cron tick. Consider either documenting this behavior or immediately re-processing the job after restart cleanup.

Suggested change
await auditLogBulkExportJob.save();
await auditLogBulkExportJob.save();
// Immediately proceed with the job after restart cleanup
await this.proceedBulkExportJob(auditLogBulkExportJob);

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restartFlag が立っていたら

  • まずクリーンアップして
  • state をリセットして exporting に戻して
  • そこで一旦終わる

という挙動の方が、proceedBulkExportJob の「与えられた状態を1ステップだけ進める」という責務が明確になること、またテストもしやすいこともあってあえてこうしています


compressLevel = 6;

private streamInExecutionMemo: { [key: string]: Readable } = {};
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The streamInExecutionMemo is a plain object used as a dictionary, but the keys are converted to strings (line 161, 168, 175). Consider using a Map<string, Readable> instead for better type safety and more idiomatic code.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

今回の streamInExecutionMemo は単純なメモ用途であり、既存のpage-bulk-exportの実装を踏襲しているため、オブジェクトでも問題ない認識です

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants