Skip to content

Commit 43ed33c

Browse files
committed
Enh 38305584 - [38274361->25.09] Topics should have a metric for remaining messages
(merge main -> ce/main 118365) [git-p4: depot-paths = "//dev/coherence-ce/main/": change = 118366]
1 parent afbb4df commit 43ed33c

29 files changed

+1956
-110
lines changed

prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/daemon/queueProcessor/service/grid/partitionedService/partitionedCache/PagedTopic.java

Lines changed: 416 additions & 12 deletions
Large diffs are not rendered by default.

prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeService/safeCacheService/safeDistributedCacheService/SafePagedTopicService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,4 +317,16 @@ public void removeSubscriptionListener(PagedTopicSubscription.Listener listener)
317317
public void setTopicBackingMapManager(TopicBackingMapManager manager)
318318
{
319319
}
320+
321+
@Override
322+
public int getCurrentClusterTopicsApiVersion()
323+
{
324+
return getRunningTopicService().getCurrentClusterTopicsApiVersion();
325+
}
326+
327+
@Override
328+
public int getRemainingMessages(String sTopic, SubscriberGroupId subscriberGroupId, int... anChannel)
329+
{
330+
return getRunningTopicService().getRemainingMessages(sTopic, subscriberGroupId, anChannel);
331+
}
320332
}

prj/coherence-core/src/main/java/com/tangosol/coherence/config/scheme/PagedTopicStorageScheme.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,26 @@
11
/*
2-
* Copyright (c) 2000, 2020, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
5-
* http://oss.oracle.com/licenses/upl.
5+
* https://oss.oracle.com/licenses/upl.
66
*/
77
package com.tangosol.coherence.config.scheme;
88

99
import com.tangosol.config.expression.ParameterResolver;
1010

11+
import com.tangosol.internal.net.topic.impl.paged.PagedTopicBackingMap;
12+
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
13+
import com.tangosol.internal.net.topic.impl.paged.PagedTopicContentBackingMap;
14+
import com.tangosol.internal.net.topic.impl.paged.PagedTopicSubscriptionsBackingMap;
15+
16+
import com.tangosol.net.BackingMapManagerContext;
17+
1118
import com.tangosol.net.NamedCache;
19+
import com.tangosol.net.PagedTopicService;
20+
import com.tangosol.net.TopicService;
21+
22+
import com.tangosol.util.ObservableMap;
23+
import com.tangosol.util.WrapperObservableMap;
1224

1325
import java.util.Map;
1426

@@ -40,9 +52,20 @@ public PagedTopicStorageScheme(CachingScheme schemeStorage, PagedTopicScheme top
4052
@Override
4153
public Map realizeMap(ParameterResolver resolver, Dependencies dependencies)
4254
{
43-
f_schemeTopic.ensureConfiguredService(resolver, dependencies);
55+
PagedTopicService service = (PagedTopicService) f_schemeTopic.ensureConfiguredService(resolver, dependencies);
56+
ObservableMap map = (ObservableMap) super.realizeMap(resolver, dependencies);
57+
String sName = (String) resolver.resolve("cache-name").getExpression().evaluate(resolver);
4458

45-
return super.realizeMap(resolver, dependencies);
59+
if (PagedTopicCaches.Names.SUBSCRIPTIONS.isA(sName))
60+
{
61+
BackingMapManagerContext context = ((PagedTopicService) service).getBackingMapManager().getContext();
62+
map = new PagedTopicSubscriptionsBackingMap(map, context);
63+
}
64+
else if (PagedTopicCaches.Names.CONTENT.isA(sName))
65+
{
66+
map = new PagedTopicContentBackingMap(map);
67+
}
68+
return map;
4669
}
4770

4871
@Override
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Universal Permissive License v 1.0 as shown at
5+
* https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
package com.tangosol.internal.net.topic.impl.paged;
9+
10+
import com.tangosol.net.cache.ConfigurableCacheMap;
11+
12+
import com.tangosol.util.ObservableMap;
13+
14+
import com.tangosol.util.WrapperObservableMap;
15+
16+
import java.util.Collection;
17+
18+
/**
19+
* A base class for backing map implementation used by
20+
* paged topic caches.
21+
*
22+
* @author Jonathan Knight 2022.08.11
23+
*/
24+
@SuppressWarnings("rawtypes")
25+
public abstract class PagedTopicBackingMap
26+
extends WrapperObservableMap
27+
implements ConfigurableCacheMap
28+
{
29+
/**
30+
* Create a {@link PagedTopicBackingMap}.
31+
*
32+
* @param map the backing map to delegate to
33+
*/
34+
@SuppressWarnings("unchecked")
35+
protected PagedTopicBackingMap(ObservableMap map)
36+
{
37+
super(map);
38+
if (!(map instanceof ConfigurableCacheMap))
39+
{
40+
throw new IllegalArgumentException("map must implement ConfigurableCacheMap");
41+
}
42+
}
43+
44+
/**
45+
* Return the wrapped {@link ConfigurableCacheMap}.
46+
*
47+
* @return the wrapped {@link ConfigurableCacheMap}
48+
*/
49+
protected ConfigurableCacheMap getConfigurableCacheMap()
50+
{
51+
return (ConfigurableCacheMap) getMap();
52+
}
53+
54+
@Override
55+
public int getUnits()
56+
{
57+
return getConfigurableCacheMap().getUnits();
58+
}
59+
60+
@Override
61+
public int getHighUnits()
62+
{
63+
return getConfigurableCacheMap().getHighUnits();
64+
}
65+
66+
@Override
67+
public void setHighUnits(int cMax)
68+
{
69+
getConfigurableCacheMap().setHighUnits(cMax);
70+
}
71+
72+
@Override
73+
public int getLowUnits()
74+
{
75+
return getConfigurableCacheMap().getLowUnits();
76+
}
77+
78+
@Override
79+
public void setLowUnits(int cUnits)
80+
{
81+
getConfigurableCacheMap().setLowUnits(cUnits);
82+
}
83+
84+
@Override
85+
public int getUnitFactor()
86+
{
87+
return getConfigurableCacheMap().getUnitFactor();
88+
}
89+
90+
@Override
91+
public void setUnitFactor(int nFactor)
92+
{
93+
getConfigurableCacheMap().setUnitFactor(nFactor);
94+
}
95+
96+
@Override
97+
public void evict(Object oKey)
98+
{
99+
getConfigurableCacheMap().evict(oKey);
100+
}
101+
102+
@Override
103+
public void evictAll(Collection colKeys)
104+
{
105+
getConfigurableCacheMap().evictAll(colKeys);
106+
}
107+
108+
@Override
109+
public void evict()
110+
{
111+
getConfigurableCacheMap().evict();
112+
}
113+
114+
@Override
115+
public EvictionApprover getEvictionApprover()
116+
{
117+
return getConfigurableCacheMap().getEvictionApprover();
118+
}
119+
120+
@Override
121+
public void setEvictionApprover(EvictionApprover approver)
122+
{
123+
getConfigurableCacheMap().setEvictionApprover(approver);
124+
}
125+
126+
@Override
127+
public int getExpiryDelay()
128+
{
129+
return getConfigurableCacheMap().getExpiryDelay();
130+
}
131+
132+
@Override
133+
public void setExpiryDelay(int cMillis)
134+
{
135+
getConfigurableCacheMap().setExpiryDelay(cMillis);
136+
}
137+
138+
@Override
139+
public long getNextExpiryTime()
140+
{
141+
return getConfigurableCacheMap().getNextExpiryTime();
142+
}
143+
144+
@Override
145+
public ConfigurableCacheMap.Entry getCacheEntry(Object oKey)
146+
{
147+
return getConfigurableCacheMap().getCacheEntry(oKey);
148+
}
149+
150+
@Override
151+
public EvictionPolicy getEvictionPolicy()
152+
{
153+
return getConfigurableCacheMap().getEvictionPolicy();
154+
}
155+
156+
@Override
157+
public void setEvictionPolicy(EvictionPolicy policy)
158+
{
159+
getConfigurableCacheMap().setEvictionPolicy(policy);
160+
}
161+
162+
@Override
163+
public UnitCalculator getUnitCalculator()
164+
{
165+
return getConfigurableCacheMap().getUnitCalculator();
166+
}
167+
168+
@Override
169+
public void setUnitCalculator(UnitCalculator calculator)
170+
{
171+
getConfigurableCacheMap().setUnitCalculator(calculator);
172+
}
173+
174+
@Override
175+
@SuppressWarnings("unchecked")
176+
public Object put(Object key, Object value, long cMillis)
177+
{
178+
return getConfigurableCacheMap().put(key,value,cMillis);
179+
}
180+
}

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicBackingMapManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2022, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -10,6 +10,7 @@
1010

1111
import com.tangosol.config.expression.ParameterResolver;
1212

13+
import com.tangosol.internal.net.topic.impl.paged.statistics.PagedTopicServiceWithStatistics;
1314
import com.tangosol.internal.net.topic.impl.paged.statistics.PagedTopicStatistics;
1415

1516
import com.tangosol.net.ExtensibleConfigurableCacheFactory;
@@ -149,7 +150,8 @@ private PagedTopicDependencies createTopicDependencies(String sName)
149150

150151
private PagedTopicStatistics createStatistics(PagedTopicDependencies dependencies, String sTopicName)
151152
{
152-
return new PagedTopicStatistics(dependencies.getConfiguredChannelCount(), sTopicName);
153+
PagedTopicServiceWithStatistics service = (PagedTopicServiceWithStatistics) getContext().getCacheService();
154+
return new PagedTopicStatistics(service, dependencies.getConfiguredChannelCount(), sTopicName);
153155
}
154156

155157
private PagedTopicStatistics ensureStatistics(String sTopicName)

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicCaches.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,16 @@ public Set<NamedCache> getCaches()
991991
@SuppressWarnings("unchecked")
992992
public int getRemainingMessages(SubscriberGroupId id, int... anChannel)
993993
{
994+
if (f_topicService.getCurrentClusterTopicsApiVersion() >= PagedTopicService.TOPIC_API_v3)
995+
{
996+
return f_topicService.getRemainingMessages(f_sTopicName, id, anChannel);
997+
}
998+
999+
// if we get here the cluster is running a mixed version with members too old
1000+
// to use the more efficient remaining messages metric
1001+
Logger.info("Currently running a mixed version cluster that does not support the more efficient" +
1002+
" method to calculate remaining messages");
1003+
9941004
boolean fHasSubscription = Subscriptions.containsKey(new Subscription.Key(0, 0, id));
9951005
if (fHasSubscription || anChannel.length > 0)
9961006
{
@@ -1029,8 +1039,9 @@ public int getRemainingMessages(SubscriberGroupId id, int... anChannel)
10291039
.getValueFromInternalConverter().convert(bin)).intValue();
10301040
return fHasSubscription ? count : count + anChannel.length;
10311041
}
1042+
10321043
// subscriber group does not exist, return the total number of messages
1033-
return Data.size();
1044+
return 0;
10341045
}
10351046

10361047
// ----- object methods -------------------------------------------------

0 commit comments

Comments
 (0)