1
1
import * as Sentry from "@sentry/node" ;
2
- import { PrismaClient , Repo , RepoPermissionSyncStatus } from "@sourcebot/db" ;
2
+ import { PrismaClient , Repo , RepoPermissionSyncJobStatus } from "@sourcebot/db" ;
3
3
import { createLogger } from "@sourcebot/logger" ;
4
4
import { BitbucketConnectionConfig } from "@sourcebot/schemas/v3/bitbucket.type" ;
5
5
import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type" ;
@@ -10,16 +10,16 @@ import { Redis } from 'ioredis';
10
10
import { env } from "./env.js" ;
11
11
import { createOctokitFromConfig , getUserIdsWithReadAccessToRepo } from "./github.js" ;
12
12
import { RepoWithConnections } from "./types.js" ;
13
+ import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "./constants.js" ;
13
14
14
15
type RepoPermissionSyncJob = {
15
- repoId : number ;
16
+ jobId : string ;
16
17
}
17
18
18
19
const QUEUE_NAME = 'repoPermissionSyncQueue' ;
19
20
20
- const logger = createLogger ( 'permission-syncer' ) ;
21
+ const logger = createLogger ( 'repo- permission-syncer' ) ;
21
22
22
- const SUPPORTED_CODE_HOST_TYPES = [ 'github' ] ;
23
23
24
24
export class RepoPermissionSyncer {
25
25
private queue : Queue < RepoPermissionSyncJob > ;
@@ -46,47 +46,55 @@ export class RepoPermissionSyncer {
46
46
return setInterval ( async ( ) => {
47
47
// @todo : make this configurable
48
48
const thresholdDate = new Date ( Date . now ( ) - 1000 * 60 * 60 * 24 ) ;
49
+
49
50
const repos = await this . db . repo . findMany ( {
50
51
// Repos need their permissions to be synced against the code host when...
51
52
where : {
52
53
// They belong to a code host that supports permissions syncing
53
54
AND : [
54
55
{
55
56
external_codeHostType : {
56
- in : SUPPORTED_CODE_HOST_TYPES ,
57
+ in : PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES ,
57
58
}
58
59
} ,
59
- // and, they either require a sync (SYNC_NEEDED) or have been in a completed state (SYNCED or FAILED)
60
- // for > some duration (default 24 hours)
61
60
{
62
61
OR : [
63
- {
64
- permissionSyncStatus : RepoPermissionSyncStatus . SYNC_NEEDED
65
- } ,
66
- {
67
- AND : [
68
- {
69
- OR : [
70
- { permissionSyncStatus : RepoPermissionSyncStatus . SYNCED } ,
71
- { permissionSyncStatus : RepoPermissionSyncStatus . FAILED } ,
72
- ]
73
- } ,
74
- {
75
- OR : [
76
- { permissionSyncJobLastCompletedAt : null } ,
77
- { permissionSyncJobLastCompletedAt : { lt : thresholdDate } }
78
- ]
79
- }
80
- ]
62
+ { permissionSyncedAt : null } ,
63
+ { permissionSyncedAt : { lt : thresholdDate } } ,
64
+ ] ,
65
+ } ,
66
+ {
67
+ NOT : {
68
+ permissionSyncJobs : {
69
+ some : {
70
+ OR : [
71
+ // Don't schedule if there are active jobs
72
+ {
73
+ status : {
74
+ in : [
75
+ RepoPermissionSyncJobStatus . PENDING ,
76
+ RepoPermissionSyncJobStatus . IN_PROGRESS ,
77
+ ] ,
78
+ }
79
+ } ,
80
+ // Don't schedule if there are recent failed jobs (within the threshold date). Note `gt` is used here since this is a inverse condition.
81
+ {
82
+ AND : [
83
+ { status : RepoPermissionSyncJobStatus . FAILED } ,
84
+ { completedAt : { gt : thresholdDate } } ,
85
+ ]
86
+ }
87
+ ]
88
+ }
81
89
}
82
- ]
90
+ }
83
91
} ,
84
92
]
85
93
}
86
94
} ) ;
87
95
88
96
await this . schedulePermissionSync ( repos ) ;
89
- } , 1000 * 30 ) ;
97
+ } , 1000 * 5 ) ;
90
98
}
91
99
92
100
public dispose ( ) {
@@ -96,15 +104,16 @@ export class RepoPermissionSyncer {
96
104
97
105
private async schedulePermissionSync ( repos : Repo [ ] ) {
98
106
await this . db . $transaction ( async ( tx ) => {
99
- await tx . repo . updateMany ( {
100
- where : { id : { in : repos . map ( repo => repo . id ) } } ,
101
- data : { permissionSyncStatus : RepoPermissionSyncStatus . IN_SYNC_QUEUE } ,
107
+ const jobs = await tx . repoPermissionSyncJob . createManyAndReturn ( {
108
+ data : repos . map ( repo => ( {
109
+ repoId : repo . id ,
110
+ } ) ) ,
102
111
} ) ;
103
112
104
- await this . queue . addBulk ( repos . map ( repo => ( {
113
+ await this . queue . addBulk ( jobs . map ( ( job ) => ( {
105
114
name : 'repoPermissionSyncJob' ,
106
115
data : {
107
- repoId : repo . id ,
116
+ jobId : job . id ,
108
117
} ,
109
118
opts : {
110
119
removeOnComplete : env . REDIS_REMOVE_ON_COMPLETE ,
@@ -115,21 +124,25 @@ export class RepoPermissionSyncer {
115
124
}
116
125
117
126
private async runJob ( job : Job < RepoPermissionSyncJob > ) {
118
- const id = job . data . repoId ;
119
- const repo = await this . db . repo . update ( {
127
+ const id = job . data . jobId ;
128
+ const { repo } = await this . db . repoPermissionSyncJob . update ( {
120
129
where : {
121
- id
130
+ id,
122
131
} ,
123
132
data : {
124
- permissionSyncStatus : RepoPermissionSyncStatus . SYNCING ,
133
+ status : RepoPermissionSyncJobStatus . IN_PROGRESS ,
125
134
} ,
126
- include : {
127
- connections : {
135
+ select : {
136
+ repo : {
128
137
include : {
129
- connection : true ,
130
- } ,
131
- } ,
132
- } ,
138
+ connections : {
139
+ include : {
140
+ connection : true ,
141
+ }
142
+ }
143
+ }
144
+ }
145
+ }
133
146
} ) ;
134
147
135
148
if ( ! repo ) {
@@ -171,34 +184,43 @@ export class RepoPermissionSyncer {
171
184
return [ ] ;
172
185
} ) ( ) ;
173
186
174
- await this . db . repo . update ( {
175
- where : {
176
- id : repo . id ,
177
- } ,
178
- data : {
179
- permittedUsers : {
180
- deleteMany : { } ,
187
+ await this . db . $transaction ( [
188
+ this . db . repo . update ( {
189
+ where : {
190
+ id : repo . id ,
191
+ } ,
192
+ data : {
193
+ permittedUsers : {
194
+ deleteMany : { } ,
195
+ }
181
196
}
182
- }
183
- } ) ;
184
-
185
- await this . db . userToRepoPermission . createMany ( {
186
- data : userIds . map ( userId => ( {
187
- userId,
188
- repoId : repo . id ,
189
- } ) ) ,
190
- } ) ;
197
+ } ) ,
198
+ this . db . userToRepoPermission . createMany ( {
199
+ data : userIds . map ( userId => ( {
200
+ userId,
201
+ repoId : repo . id ,
202
+ } ) ) ,
203
+ } )
204
+ ] ) ;
191
205
}
192
206
193
207
private async onJobCompleted ( job : Job < RepoPermissionSyncJob > ) {
194
- const repo = await this . db . repo . update ( {
208
+ const { repo } = await this . db . repoPermissionSyncJob . update ( {
195
209
where : {
196
- id : job . data . repoId ,
210
+ id : job . data . jobId ,
197
211
} ,
198
212
data : {
199
- permissionSyncStatus : RepoPermissionSyncStatus . SYNCED ,
200
- permissionSyncJobLastCompletedAt : new Date ( ) ,
213
+ status : RepoPermissionSyncJobStatus . COMPLETED ,
214
+ repo : {
215
+ update : {
216
+ permissionSyncedAt : new Date ( ) ,
217
+ }
218
+ } ,
219
+ completedAt : new Date ( ) ,
201
220
} ,
221
+ select : {
222
+ repo : true
223
+ }
202
224
} ) ;
203
225
204
226
logger . info ( `Permissions synced for repo ${ repo . displayName ?? repo . name } ` ) ;
@@ -207,21 +229,25 @@ export class RepoPermissionSyncer {
207
229
private async onJobFailed ( job : Job < RepoPermissionSyncJob > | undefined , err : Error ) {
208
230
Sentry . captureException ( err , {
209
231
tags : {
210
- repoId : job ?. data . repoId ,
232
+ jobId : job ?. data . jobId ,
211
233
queue : QUEUE_NAME ,
212
234
}
213
235
} ) ;
214
236
215
- const errorMessage = ( repoName : string ) => `Repo permission sync job failed for repo ${ repoName } : ${ err } ` ;
237
+ const errorMessage = ( repoName : string ) => `Repo permission sync job failed for repo ${ repoName } : ${ err . message } ` ;
216
238
217
239
if ( job ) {
218
- const repo = await this . db . repo . update ( {
240
+ const { repo } = await this . db . repoPermissionSyncJob . update ( {
219
241
where : {
220
- id : job ? .data . repoId ,
242
+ id : job . data . jobId ,
221
243
} ,
222
244
data : {
223
- permissionSyncStatus : RepoPermissionSyncStatus . FAILED ,
224
- permissionSyncJobLastCompletedAt : new Date ( ) ,
245
+ status : RepoPermissionSyncJobStatus . FAILED ,
246
+ completedAt : new Date ( ) ,
247
+ errorMessage : err . message ,
248
+ } ,
249
+ select : {
250
+ repo : true
225
251
} ,
226
252
} ) ;
227
253
logger . error ( errorMessage ( repo . displayName ?? repo . name ) ) ;
0 commit comments