11import { type Client , connect } from "./client" ;
22import { Patterns } from "./patterns" ;
3- import {
4- updateInterest ,
5- updateSticky ,
6- type InterestUpdate ,
7- type StickyUpdate ,
8- } from "@cocalc/conat/core/server" ;
3+ import { updateInterest , type InterestUpdate } from "@cocalc/conat/core/server" ;
94import type { DStream } from "@cocalc/conat/sync/dstream" ;
105import { once } from "@cocalc/util/async-utils" ;
116import { server as createPersistServer } from "@cocalc/conat/persist/server" ;
@@ -47,14 +42,12 @@ export async function clusterLink(
4742 return link ;
4843}
4944
50- export type Sticky = { [ pattern : string ] : { [ subject : string ] : string } } ;
5145export type Interest = Patterns < { [ queue : string ] : Set < string > } > ;
5246
5347export { type ClusterLink } ;
5448
5549class ClusterLink {
5650 public interest : Interest = new Patterns ( ) ;
57- public sticky : Sticky = { } ;
5851 private streams : ClusterStreams ;
5952 private state : "init" | "ready" | "closed" = "init" ;
6053 private clientStateChanged = Date . now ( ) ; // when client status last changed
@@ -85,10 +78,7 @@ class ClusterLink {
8578 clusterName : this . clusterName ,
8679 } ) ;
8780 for ( const update of this . streams . interest . getAll ( ) ) {
88- updateInterest ( update , this . interest , this . sticky ) ;
89- }
90- for ( const update of this . streams . sticky . getAll ( ) ) {
91- updateSticky ( update , this . sticky ) ;
81+ updateInterest ( update , this . interest ) ;
9282 }
9383 // I have a slight concern about this because updates might not
9484 // arrive in order during automatic failover. That said, maybe
@@ -97,7 +87,6 @@ class ClusterLink {
9787 // it is about, and when that server goes down none of this state
9888 // matters anymore.
9989 this . streams . interest . on ( "change" , this . handleInterestUpdate ) ;
100- this . streams . sticky . on ( "change" , this . handleStickyUpdate ) ;
10190 this . state = "ready" ;
10291 } ;
10392
@@ -106,11 +95,7 @@ class ClusterLink {
10695 } ;
10796
10897 handleInterestUpdate = ( update : InterestUpdate ) => {
109- updateInterest ( update , this . interest , this . sticky ) ;
110- } ;
111-
112- handleStickyUpdate = ( update : StickyUpdate ) => {
113- updateSticky ( update , this . sticky ) ;
98+ updateInterest ( update , this . interest ) ;
11499 } ;
115100
116101 private handleClientStateChanged = ( ) => {
@@ -134,7 +119,6 @@ class ClusterLink {
134119 if ( this . streams != null ) {
135120 this . streams . interest . removeListener ( "change" , this . handleInterestUpdate ) ;
136121 this . streams . interest . close ( ) ;
137- this . streams . sticky . close ( ) ;
138122 // @ts -ignore
139123 delete this . streams ;
140124 }
@@ -178,10 +162,9 @@ class ClusterLink {
178162 return false ;
179163 } ;
180164
181- hash = ( ) : { interest : number ; sticky : number } => {
165+ hash = ( ) : { interest : number } => {
182166 return {
183167 interest : hashInterest ( this . interest ) ,
184- sticky : hashSticky ( this . sticky ) ,
185168 } ;
186169 } ;
187170}
@@ -195,7 +178,6 @@ function clusterStreamNames({
195178} ) {
196179 return {
197180 interest : `cluster/${ clusterName } /${ id } /interest` ,
198- sticky : `cluster/${ clusterName } /${ id } /sticky` ,
199181 } ;
200182}
201183
@@ -225,7 +207,6 @@ export async function createClusterPersistServer({
225207
226208export interface ClusterStreams {
227209 interest : DStream < InterestUpdate > ;
228- sticky : DStream < StickyUpdate > ;
229210}
230211
231212export async function clusterStreams ( {
@@ -252,27 +233,21 @@ export async function clusterStreams({
252233 name : names . interest ,
253234 ...opts ,
254235 } ) ;
255- const sticky = await client . sync . dstream < StickyUpdate > ( {
256- noInventory : true ,
257- name : names . sticky ,
258- ...opts ,
259- } ) ;
260236 logger . debug ( "clusterStreams: got them" , { clusterName } ) ;
261- return { interest, sticky } ;
237+ return { interest } ;
262238}
263239
264240// Periodically delete not-necessary updates from the interest stream
265241export async function trimClusterStreams (
266242 streams : ClusterStreams ,
267243 data : {
268244 interest : Patterns < { [ queue : string ] : Set < string > } > ;
269- sticky : { [ pattern : string ] : { [ subject : string ] : string } } ;
270245 links : { interest : Patterns < { [ queue : string ] : Set < string > } > } [ ] ;
271246 } ,
272247 // don't delete anything that isn't at lest minAge ms old.
273248 minAge : number ,
274- ) : Promise < { seqsInterest : number [ ] ; seqsSticky : number [ ] } > {
275- const { interest, sticky } = streams ;
249+ ) : Promise < { seqsInterest : number [ ] } > {
250+ const { interest } = streams ;
276251 // First deal with interst
277252 // we iterate over the interest stream checking for subjects
278253 // with no current interest at all; in such cases it is safe
@@ -300,45 +275,7 @@ export async function trimClusterStreams(
300275 logger . debug ( "trimClusterStream: successfully trimmed interest" , { seqs } ) ;
301276 }
302277
303- // Next deal with sticky -- trim ones where the pattern is no longer of interest.
304- // There could be other reasons to trim but it gets much trickier. This one is more
305- // obvious, except we have to check for any interest in the whole cluster, not
306- // just this node.
307- const seqs2 : number [ ] = [ ] ;
308- function noInterest ( pattern : string ) {
309- if ( data . interest . hasPattern ( pattern ) ) {
310- return false ;
311- }
312- for ( const link of data . links ) {
313- if ( link . interest . hasPattern ( pattern ) ) {
314- return false ;
315- }
316- }
317- // nobody cares
318- return true ;
319- }
320- for ( let n = 0 ; n < sticky . length ; n ++ ) {
321- const time = sticky . time ( n ) ;
322- if ( time == null ) continue ;
323- if ( now - time . valueOf ( ) <= minAge ) {
324- break ;
325- }
326- const update = sticky . get ( n ) as StickyUpdate ;
327- if ( noInterest ( update . pattern ) ) {
328- const seq = sticky . seq ( n ) ;
329- if ( seq != null ) {
330- seqs2 . push ( seq ) ;
331- }
332- }
333- }
334- if ( seqs2 . length > 0 ) {
335- // [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
336- logger . debug ( "trimClusterStream: trimming sticky" , { seqs2 } ) ;
337- await sticky . delete ( { seqs : seqs2 } ) ;
338- logger . debug ( "trimClusterStream: successfully trimmed sticky" , { seqs2 } ) ;
339- }
340-
341- return { seqsInterest : seqs , seqsSticky : seqs2 } ;
278+ return { seqsInterest : seqs } ;
342279}
343280
344281function hashSet ( X : Set < string > ) : number {
@@ -362,15 +299,3 @@ export function hashInterest(
362299) : number {
363300 return interest . hash ( hashInterestValue ) ;
364301}
365-
366- export function hashSticky ( sticky : Sticky ) : number {
367- let h = 0 ;
368- for ( const pattern in sticky ) {
369- h += hash_string ( pattern ) ;
370- const x = sticky [ pattern ] ;
371- for ( const subject in x ) {
372- h += hash_string ( x [ subject ] ) ;
373- }
374- }
375- return h ;
376- }
0 commit comments