1
- import { PrismaClient } from "@sourcebot/db" ;
1
+ import * as Sentry from "@sentry/node" ;
2
+ import { PrismaClient , Repo , RepoPermissionSyncStatus } from "@sourcebot/db" ;
2
3
import { createLogger } from "@sourcebot/logger" ;
3
4
import { BitbucketConnectionConfig } from "@sourcebot/schemas/v3/bitbucket.type" ;
4
5
import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type" ;
5
6
import { GithubConnectionConfig } from "@sourcebot/schemas/v3/github.type" ;
6
7
import { GitlabConnectionConfig } from "@sourcebot/schemas/v3/gitlab.type" ;
7
8
import { Job , Queue , Worker } from 'bullmq' ;
8
9
import { Redis } from 'ioredis' ;
10
+ import { env } from "./env.js" ;
9
11
import { createOctokitFromConfig , getUserIdsWithReadAccessToRepo } from "./github.js" ;
10
12
import { RepoWithConnections } from "./types.js" ;
11
13
@@ -17,6 +19,8 @@ const QUEUE_NAME = 'repoPermissionSyncQueue';
17
19
18
20
const logger = createLogger ( 'permission-syncer' ) ;
19
21
22
+ const SUPPORTED_CODE_HOST_TYPES = [ 'github' ] ;
23
+
20
24
export class RepoPermissionSyncer {
21
25
private queue : Queue < RepoPermissionSyncJob > ;
22
26
private worker : Worker < RepoPermissionSyncJob > ;
@@ -30,48 +34,94 @@ export class RepoPermissionSyncer {
30
34
} ) ;
31
35
this . worker = new Worker < RepoPermissionSyncJob > ( QUEUE_NAME , this . runJob . bind ( this ) , {
32
36
connection : redis ,
37
+ concurrency : 1 ,
33
38
} ) ;
34
39
this . worker . on ( 'completed' , this . onJobCompleted . bind ( this ) ) ;
35
40
this . worker . on ( 'failed' , this . onJobFailed . bind ( this ) ) ;
36
41
}
37
42
38
- public async scheduleJob ( repoId : number ) {
39
- await this . queue . add ( QUEUE_NAME , {
40
- repoId,
41
- } ) ;
42
- }
43
-
44
43
public startScheduler ( ) {
45
44
logger . debug ( 'Starting scheduler' ) ;
46
45
47
- // @todo : we should only sync permissions for a repository if it has been at least ~24 hours since the last sync.
48
46
return setInterval ( async ( ) => {
47
+ // @todo : make this configurable
48
+ const thresholdDate = new Date ( Date . now ( ) - 1000 * 60 * 60 * 24 ) ;
49
49
const repos = await this . db . repo . findMany ( {
50
+ // Repos need their permissions to be synced against the code host when...
50
51
where : {
51
- external_codeHostType : {
52
- in : [ 'github' ] ,
53
- }
52
+ // They belong to a code host that supports permissions syncing
53
+ AND : [
54
+ {
55
+ external_codeHostType : {
56
+ in : SUPPORTED_CODE_HOST_TYPES ,
57
+ }
58
+ } ,
59
+ // and, they either require a sync (SYNC_NEEDED) or have been in a completed state (SYNCED or FAILED)
60
+ // for > some duration (default 24 hours)
61
+ {
62
+ OR : [
63
+ {
64
+ permissionSyncStatus : RepoPermissionSyncStatus . SYNC_NEEDED
65
+ } ,
66
+ {
67
+ AND : [
68
+ {
69
+ OR : [
70
+ { permissionSyncStatus : RepoPermissionSyncStatus . SYNCED } ,
71
+ { permissionSyncStatus : RepoPermissionSyncStatus . FAILED } ,
72
+ ]
73
+ } ,
74
+ {
75
+ OR : [
76
+ { permissionSyncJobLastCompletedAt : null } ,
77
+ { permissionSyncJobLastCompletedAt : { lt : thresholdDate } }
78
+ ]
79
+ }
80
+ ]
81
+ }
82
+ ]
83
+ } ,
84
+ ]
54
85
}
55
86
} ) ;
56
87
57
- for ( const repo of repos ) {
58
- await this . scheduleJob ( repo . id ) ;
59
- }
60
-
61
- // @todo : make this configurable
62
- } , 1000 * 60 ) ;
88
+ await this . schedulePermissionSync ( repos ) ;
89
+ } , 1000 * 30 ) ;
63
90
}
64
91
65
92
public dispose ( ) {
66
93
this . worker . close ( ) ;
67
94
this . queue . close ( ) ;
68
95
}
69
96
97
+ private async schedulePermissionSync ( repos : Repo [ ] ) {
98
+ await this . db . $transaction ( async ( tx ) => {
99
+ await tx . repo . updateMany ( {
100
+ where : { id : { in : repos . map ( repo => repo . id ) } } ,
101
+ data : { permissionSyncStatus : RepoPermissionSyncStatus . IN_SYNC_QUEUE } ,
102
+ } ) ;
103
+
104
+ await this . queue . addBulk ( repos . map ( repo => ( {
105
+ name : 'repoPermissionSyncJob' ,
106
+ data : {
107
+ repoId : repo . id ,
108
+ } ,
109
+ opts : {
110
+ removeOnComplete : env . REDIS_REMOVE_ON_COMPLETE ,
111
+ removeOnFail : env . REDIS_REMOVE_ON_FAIL ,
112
+ }
113
+ } ) ) )
114
+ } ) ;
115
+ }
116
+
70
117
private async runJob ( job : Job < RepoPermissionSyncJob > ) {
71
118
const id = job . data . repoId ;
72
- const repo = await this . db . repo . findUnique ( {
119
+ const repo = await this . db . repo . update ( {
73
120
where : {
74
- id,
121
+ id
122
+ } ,
123
+ data : {
124
+ permissionSyncStatus : RepoPermissionSyncStatus . SYNCING ,
75
125
} ,
76
126
include : {
77
127
connections : {
@@ -86,6 +136,8 @@ export class RepoPermissionSyncer {
86
136
throw new Error ( `Repo ${ id } not found` ) ;
87
137
}
88
138
139
+ logger . info ( `Syncing permissions for repo ${ repo . displayName } ...` ) ;
140
+
89
141
const connection = getFirstConnectionWithToken ( repo ) ;
90
142
if ( ! connection ) {
91
143
throw new Error ( `No connection with token found for repo ${ id } ` ) ;
@@ -119,8 +171,6 @@ export class RepoPermissionSyncer {
119
171
return [ ] ;
120
172
} ) ( ) ;
121
173
122
- logger . info ( `User IDs with read access to repo ${ id } : ${ userIds } ` ) ;
123
-
124
174
await this . db . repo . update ( {
125
175
where : {
126
176
id : repo . id ,
@@ -141,11 +191,43 @@ export class RepoPermissionSyncer {
141
191
}
142
192
143
193
private async onJobCompleted ( job : Job < RepoPermissionSyncJob > ) {
144
- logger . info ( `Repo permission sync job completed for repo ${ job . data . repoId } ` ) ;
194
+ const repo = await this . db . repo . update ( {
195
+ where : {
196
+ id : job . data . repoId ,
197
+ } ,
198
+ data : {
199
+ permissionSyncStatus : RepoPermissionSyncStatus . SYNCED ,
200
+ permissionSyncJobLastCompletedAt : new Date ( ) ,
201
+ } ,
202
+ } ) ;
203
+
204
+ logger . info ( `Permissions synced for repo ${ repo . displayName ?? repo . name } ` ) ;
145
205
}
146
206
147
207
private async onJobFailed ( job : Job < RepoPermissionSyncJob > | undefined , err : Error ) {
148
- logger . error ( `Repo permission sync job failed for repo ${ job ?. data . repoId } : ${ err } ` ) ;
208
+ Sentry . captureException ( err , {
209
+ tags : {
210
+ repoId : job ?. data . repoId ,
211
+ queue : QUEUE_NAME ,
212
+ }
213
+ } ) ;
214
+
215
+ const errorMessage = ( repoName : string ) => `Repo permission sync job failed for repo ${ repoName } : ${ err } ` ;
216
+
217
+ if ( job ) {
218
+ const repo = await this . db . repo . update ( {
219
+ where : {
220
+ id : job ?. data . repoId ,
221
+ } ,
222
+ data : {
223
+ permissionSyncStatus : RepoPermissionSyncStatus . FAILED ,
224
+ permissionSyncJobLastCompletedAt : new Date ( ) ,
225
+ } ,
226
+ } ) ;
227
+ logger . error ( errorMessage ( repo . displayName ?? repo . name ) ) ;
228
+ } else {
229
+ logger . error ( errorMessage ( 'unknown repo (id not found)' ) ) ;
230
+ }
149
231
}
150
232
}
151
233
0 commit comments