Skip to content

Commit

Permalink
Remove ICache (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Jun 5, 2023
1 parent ccd7f75 commit d3aea5b
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Removed

- Remove explicit `net461` handling; minimum target now `net6.0` / `FSharp.Core` v `6.0.0` [#310](https://github.com/jet/equinox/pull/310) [#323](https://github.com/jet/equinox/pull/323) [#354](https://github.com/jet/equinox/pull/354)
- Remove `Equinox.Core.ICache` (there is/was only one impl, and the interface has changed as part of [#386](https://github.com/jet/equinox/pull/386)) [#389](https://github.com/jet/equinox/pull/389)

### Fixed

Expand Down
83 changes: 27 additions & 56 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
@@ -1,38 +1,10 @@
namespace Equinox.Core

open System
open System.Runtime.Caching
open System.Threading.Tasks

type [<NoEquality; NoComparison; Struct>] CacheItemOptions =
| AbsoluteExpiration of ae: DateTimeOffset
| RelativeExpiration of re: TimeSpan
module internal CacheItemOptions =
let toPolicy = function
| AbsoluteExpiration absolute -> CacheItemPolicy(AbsoluteExpiration = absolute)
| RelativeExpiration relative -> CacheItemPolicy(SlidingExpiration = relative)

type ICache =
abstract member Load: key: string
* maxAge: TimeSpan
* isStale: Func<StreamToken, StreamToken, bool>
* options: CacheItemOptions
* loadOrReload: (struct (StreamToken * 'state) voption -> Task<struct (StreamToken * 'state)>)
-> Task<struct (StreamToken * 'state)>
abstract member Save: key: string
* isStale: Func<StreamToken, StreamToken, bool>
* options: CacheItemOptions
* timestamp: int64
* token: StreamToken * state: 'state
-> unit

namespace Equinox

open Equinox.Core
open Equinox.Core.Tracing
open System

type internal CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, initialTimestamp: int64) =
type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, initialTimestamp: int64) =
let mutable currentToken = initialToken
let mutable currentState = initialState
let mutable verifiedTimestamp = initialTimestamp
Expand Down Expand Up @@ -79,13 +51,13 @@ type Cache private (inner: System.Runtime.Caching.MemoryCache) =
| null -> ValueNone
| :? CacheEntry<'state> as existingEntry -> existingEntry.TryGetValue()
| x -> failwith $"tryLoad Incompatible cache entry %A{x}"
let addOrGet key options entry =
match inner.AddOrGetExisting(key, entry, CacheItemOptions.toPolicy options) with
let addOrGet key policy entry =
match inner.AddOrGetExisting(key, entry, policy = policy) with
| null -> Ok entry
| :? CacheEntry<'state> as existingEntry -> Error existingEntry
| x -> failwith $"addOrGet Incompatible cache entry %A{x}"
let getElseAddEmptyEntry key options =
match addOrGet key options (CacheEntry<'state>.CreateEmpty()) with
let getElseAddEmptyEntry key policy =
match addOrGet key policy (CacheEntry<'state>.CreateEmpty()) with
| Ok fresh -> fresh
| Error existingEntry -> existingEntry
let addOrMergeCacheEntry isStale key options timestamp struct (token, state) =
Expand All @@ -97,37 +69,36 @@ type Cache private (inner: System.Runtime.Caching.MemoryCache) =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
Cache(new System.Runtime.Caching.MemoryCache(name, config))
interface ICache with
// if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws)
member _.Load(key, maxAge, isStale, options, loadOrReload) = task {
let loadOrReload maybeBaseState () = task {
let act = System.Diagnostics.Activity.Current
if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore
let ts = System.Diagnostics.Stopwatch.GetTimestamp()
let! res = loadOrReload maybeBaseState
return struct (ts, res) }
if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it
let maybeBaseState = tryLoad key
let! timestamp, res = loadOrReload maybeBaseState ()
addOrMergeCacheEntry isStale key options timestamp res
return res
else // ensure we have an entry in the cache for this key; coordinate retrieval through that
let cacheSlot = getElseAddEmptyEntry key options
return! cacheSlot.ReadThrough(maxAge, isStale, loadOrReload) }
// Newer values get saved; equal values update the last retrieval timestamp
member _.Save(key, isStale, options, timestamp, token, state) =
addOrMergeCacheEntry isStale key options timestamp (token, state)
// if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws)
member _.Load(key, maxAge, isStale, policy, loadOrReload) = task {
let loadOrReload maybeBaseState () = task {
let act = System.Diagnostics.Activity.Current
if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore
let ts = System.Diagnostics.Stopwatch.GetTimestamp()
let! res = loadOrReload maybeBaseState
return struct (ts, res) }
if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it
let maybeBaseState = tryLoad key
let! timestamp, res = loadOrReload maybeBaseState ()
addOrMergeCacheEntry isStale key policy timestamp res
return res
else // ensure we have an entry in the cache for this key; coordinate retrieval through that
let cacheSlot = getElseAddEmptyEntry key policy
return! cacheSlot.ReadThrough(maxAge, isStale, loadOrReload) }
// Newer values get saved; equal values update the last retrieval timestamp
member _.Save(key, isStale, policy, timestamp, token, state) =
addOrMergeCacheEntry isStale key policy timestamp (token, state)

type [<NoComparison; NoEquality; RequireQualifiedAccess>] CachingStrategy =
/// Retain a single 'state per streamName.
/// Each cache hit for a stream renews the retention period for the defined <c>window</c>.
/// Upon expiration of the defined <c>window</c> from the point at which the cache was entry was last used, a full reload is triggered.
/// Unless a <c>LoadOption</c> is used, cache hits still incur a roundtrip to load any subsequently-added events.
| SlidingWindow of ICache * window: TimeSpan
| SlidingWindow of Cache * window: TimeSpan
/// Retain a single 'state per streamName.
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Unless a <c>LoadOption</c> is used, cache hits still incur a roundtrip to load any subsequently-added events.
| FixedTimeSpan of ICache * period: TimeSpan
| FixedTimeSpan of Cache * period: TimeSpan
/// Prefix is used to segregate multiple folded states per stream when they are stored in the cache.
/// Semantics are otherwise identical to <c>SlidingWindow</c>.
| SlidingWindowPrefixed of ICache * window: TimeSpan * prefix: string
| SlidingWindowPrefixed of Cache * window: TimeSpan * prefix: string
17 changes: 8 additions & 9 deletions src/Equinox.Core/Caching.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let private tee f (inner: CancellationToken -> Task<struct (StreamToken * 'state
return tokenAndState }

type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'event, 'state, 'context> and 'cat :> IReloadable<'state> >
(category: 'cat, cache: ICache, isStale, createKey, createOptions) =
(category: 'cat, cache: Equinox.Cache, isStale, createKey, createOptions) =
interface ICategory<'event, 'state, 'context> with
member _.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) = task {
let loadOrReload = function
Expand All @@ -35,16 +35,15 @@ type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'ev
let private mkKey prefix streamName =
prefix + streamName

let private optionsSlidingExpiration (slidingExpiration: TimeSpan) () =
CacheItemOptions.RelativeExpiration slidingExpiration
let private optionsFixedTimeSpan (period: TimeSpan) () =
let private policySlidingExpiration (slidingExpiration: TimeSpan) () =
System.Runtime.Caching.CacheItemPolicy(SlidingExpiration = slidingExpiration)
let private policyFixedTimeSpan (period: TimeSpan) () =
let expirationPoint = let creationDate = DateTimeOffset.UtcNow in creationDate.Add period
CacheItemOptions.AbsoluteExpiration expirationPoint

System.Runtime.Caching.CacheItemPolicy(AbsoluteExpiration = expirationPoint)
let private mapStrategy = function
| Equinox.CachingStrategy.FixedTimeSpan (cache, period) -> struct ( cache, mkKey null, optionsFixedTimeSpan period)
| Equinox.CachingStrategy.SlidingWindow (cache, window) -> cache, mkKey null, optionsSlidingExpiration window
| Equinox.CachingStrategy.SlidingWindowPrefixed (cache, window, prefix) -> cache, mkKey prefix, optionsSlidingExpiration window
| Equinox.CachingStrategy.FixedTimeSpan (cache, period) -> struct ( cache, mkKey null, policyFixedTimeSpan period)
| Equinox.CachingStrategy.SlidingWindow (cache, window) -> cache, mkKey null, policySlidingExpiration window
| Equinox.CachingStrategy.SlidingWindowPrefixed (cache, window, prefix) -> cache, mkKey prefix, policySlidingExpiration window

let apply isStale x (cat: 'cat when 'cat :> ICategory<'event, 'state, 'context> and 'cat :> IReloadable<'state>): ICategory<_, _, _> =
match x with
Expand Down
4 changes: 2 additions & 2 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1331,12 +1331,12 @@ type CachingStrategy =
/// Unless <c>LoadOption.AnyCachedValue</c> or <c>AllowStale</c> are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip
| SlidingWindow of ICache * window: TimeSpan
| SlidingWindow of Equinox.Cache * window: TimeSpan
/// Retain a single 'state per streamName, together with the associated etag.
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Typically combined with an `Equinox.LoadOption` to minimize loads.
/// Unless <c>LoadOption.AnyCachedValue</c> or <c>AllowStale</c> are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
| FixedTimeSpan of ICache * period: TimeSpan
| FixedTimeSpan of Equinox.Cache * period: TimeSpan

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type AccessStrategy<'event, 'state> =
Expand Down
4 changes: 2 additions & 2 deletions src/Equinox.DynamoStore/DynamoStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,12 +1287,12 @@ type CachingStrategy =
/// Unless a <c>LoadOption</c> is used, each cache hit still involves a read roundtrip (RU charges incurred, transport latency) though deserialization is skipped due to etag match
// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the unfolds in Tip
| SlidingWindow of ICache * window: TimeSpan
| SlidingWindow of Equinox.Cache * window: TimeSpan
/// Retain a single 'state per streamName, together with the associated etag.
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Typically combined with an `Equinox.LoadOption` to minimize loads.
/// Unless a <c>LoadOption</c> is used, each cache hit still involves a read roundtrip (RU charges incurred, transport latency) though deserialization is skipped due to etag match
| FixedTimeSpan of ICache * period: TimeSpan
| FixedTimeSpan of Equinox.Cache * period: TimeSpan

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type AccessStrategy<'event, 'state> =
Expand Down

0 comments on commit d3aea5b

Please sign in to comment.