1
1
package org .hypertrace .core .graphql .attributes ;
2
2
3
- import com .google .common .cache .CacheBuilder ;
4
- import com .google .common .cache .CacheLoader ;
5
- import com .google .common .cache .LoadingCache ;
6
- import com .google .common .collect .ImmutableTable ;
7
- import com .google .common .collect .Table ;
8
- import io .reactivex .rxjava3 .core .Observable ;
9
3
import io .reactivex .rxjava3 .core .Single ;
10
- import java .util . Collection ;
4
+ import java .time . Duration ;
11
5
import java .util .List ;
12
- import java .util .Map ;
13
6
import java .util .NoSuchElementException ;
14
- import java .util .Optional ;
15
- import java .util .concurrent .TimeUnit ;
16
- import java .util .function .Function ;
17
- import java .util .stream .Collectors ;
18
7
import javax .inject .Inject ;
19
8
import javax .inject .Singleton ;
20
- import org .hypertrace .core .graphql . context . ContextualCachingKey ;
9
+ import org .hypertrace .core .attribute . service . cachingclient . CachingAttributeClient ;
21
10
import org .hypertrace .core .graphql .context .GraphQlRequestContext ;
11
+ import org .hypertrace .core .graphql .spi .config .GraphQlServiceConfig ;
12
+ import org .hypertrace .core .graphql .utils .grpc .GrpcChannelRegistry ;
13
+ import org .hypertrace .core .graphql .utils .grpc .GrpcContextBuilder ;
14
+ import org .hypertrace .core .grpcutils .client .rx .GrpcRxExecutionContext ;
22
15
23
16
@ Singleton
24
17
class CachingAttributeStore implements AttributeStore {
25
18
26
- private final AttributeClient attributeClient ;
19
+ private final CachingAttributeClient cachingAttributeClient ;
27
20
private final IdLookup idLookup ;
21
+ private final GrpcContextBuilder grpcContextBuilder ;
22
+ private final AttributeModelTranslator translator ;
28
23
29
24
@ Inject
30
- CachingAttributeStore (AttributeClient attributeClient , IdLookup idLookup ) {
31
- this .attributeClient = attributeClient ;
32
- this .idLookup = idLookup ;
25
+ CachingAttributeStore (
26
+ IdLookup idLookup ,
27
+ GrpcContextBuilder grpcContextBuilder ,
28
+ AttributeModelTranslator translator ,
29
+ GrpcChannelRegistry channelRegistry ,
30
+ GraphQlServiceConfig serviceConfig ) {
31
+ this (
32
+ idLookup ,
33
+ grpcContextBuilder ,
34
+ translator ,
35
+ CachingAttributeClient .builder (
36
+ channelRegistry .forAddress (
37
+ serviceConfig .getAttributeServiceHost (),
38
+ serviceConfig .getAttributeServicePort ()))
39
+ .withCacheExpiration (Duration .ofMinutes (5 ))
40
+ .withMaximumCacheContexts (1000 )
41
+ .build ());
33
42
}
34
43
35
- private final LoadingCache <ContextualCachingKey , Single <Table <String , String , AttributeModel >>>
36
- cache =
37
- CacheBuilder .newBuilder ()
38
- .maximumSize (1000 )
39
- .expireAfterWrite (15 , TimeUnit .MINUTES )
40
- .build (CacheLoader .from (this ::loadTable ));
41
-
42
- @ Override
43
- public Single <List <AttributeModel >> getAll (GraphQlRequestContext context ) {
44
- return this .getOrInvalidate (context ).map (table -> List .copyOf (table .values ()));
44
+ CachingAttributeStore (
45
+ IdLookup idLookup ,
46
+ GrpcContextBuilder grpcContextBuilder ,
47
+ AttributeModelTranslator translator ,
48
+ CachingAttributeClient cachingAttributeClient ) {
49
+ this .idLookup = idLookup ;
50
+ this .grpcContextBuilder = grpcContextBuilder ;
51
+ this .translator = translator ;
52
+ this .cachingAttributeClient = cachingAttributeClient ;
45
53
}
46
54
47
55
@ Override
48
- public Single <AttributeModel > get (GraphQlRequestContext context , String scope , String key ) {
49
- return this .getOrInvalidate (context )
50
- .mapOptional (table -> Optional .ofNullable (table .get (scope , key )))
51
- .switchIfEmpty (Single .error (this .buildErrorForMissingAttribute (scope , key )));
56
+ public Single <List <AttributeModel >> getAll (GraphQlRequestContext requestContext ) {
57
+ return GrpcRxExecutionContext .forContext (this .grpcContextBuilder .build (requestContext ))
58
+ .wrapSingle (this .cachingAttributeClient ::getAll )
59
+ .flattenAsObservable (list -> list )
60
+ .mapOptional (this .translator ::translate )
61
+ .toList ();
52
62
}
53
63
54
64
@ Override
55
- public Single <Map <String , AttributeModel >> get (
56
- GraphQlRequestContext context , String scope , Collection <String > keys ) {
57
- return this .getOrInvalidate (context )
58
- .flatMap (table -> this .getValuesOrError (scope , table .row (scope ), keys ));
65
+ public Single <AttributeModel > get (
66
+ GraphQlRequestContext requestContext , String scope , String key ) {
67
+ return GrpcRxExecutionContext .forContext (this .grpcContextBuilder .build (requestContext ))
68
+ .wrapSingle (() -> this .cachingAttributeClient .get (scope , key ))
69
+ .toMaybe ()
70
+ .mapOptional (this .translator ::translate )
71
+ .switchIfEmpty (Single .error (this .buildErrorForMissingAttribute (scope , key )));
59
72
}
60
73
61
74
@ Override
@@ -70,28 +83,6 @@ public Single<AttributeModel> getForeignIdAttribute(
70
83
.flatMap (key -> this .get (context , scope , key ));
71
84
}
72
85
73
- private Single <Table <String , String , AttributeModel >> loadTable (ContextualCachingKey cachingKey ) {
74
- return this .attributeClient
75
- .queryAll (cachingKey .getContext ())
76
- .toList ()
77
- .map (this ::buildTable )
78
- .cache ();
79
- }
80
-
81
- private Table <String , String , AttributeModel > buildTable (List <AttributeModel > attributes ) {
82
- return attributes .stream ()
83
- .collect (
84
- ImmutableTable .toImmutableTable (
85
- AttributeModel ::scope , AttributeModel ::key , Function .identity ()));
86
- }
87
-
88
- private Single <Table <String , String , AttributeModel >> getOrInvalidate (
89
- GraphQlRequestContext context ) {
90
- return this .cache
91
- .getUnchecked (context .getCachingKey ())
92
- .doOnError (x -> this .cache .invalidate (context .getCachingKey ()));
93
- }
94
-
95
86
private Single <String > getForeignIdKey (
96
87
GraphQlRequestContext context , String scope , String foreignScope ) {
97
88
return this .idLookup
@@ -106,19 +97,6 @@ private Single<String> getIdKey(GraphQlRequestContext context, String scope) {
106
97
.switchIfEmpty (Single .error (this .buildErrorForMissingIdMapping (scope )));
107
98
}
108
99
109
- private Single <Map <String , AttributeModel >> getValuesOrError (
110
- String scope ,
111
- Map <String , AttributeModel > definedAttributes ,
112
- Collection <String > requestedAttributeKeys ) {
113
- return Observable .fromIterable (requestedAttributeKeys )
114
- .flatMap (
115
- key ->
116
- definedAttributes .containsKey (key )
117
- ? Observable .just (definedAttributes .get (key ))
118
- : Observable .error (this .buildErrorForMissingAttribute (scope , key )))
119
- .collect (Collectors .toUnmodifiableMap (AttributeModel ::key , Function .identity ()));
120
- }
121
-
122
100
private NoSuchElementException buildErrorForMissingAttribute (String scope , String key ) {
123
101
return new NoSuchElementException (
124
102
String .format ("No attribute available for scope '%s' and key '%s'" , scope , key ));
0 commit comments