@@ -147,7 +147,8 @@ cdef class Portfolio(PortfolioFacade):
147
147
148
148
self._unrealized_pnls: dict[InstrumentId , Money] = {}
149
149
self._realized_pnls: dict[InstrumentId , Money] = {}
150
- self._snapshot_realized_pnls: dict[InstrumentId , Money] = {}
150
+ self._snapshot_sum_per_position: dict[PositionId , Money] = {}
151
+ self._snapshot_last_per_position: dict[PositionId , Money] = {}
151
152
self._snapshot_processed_counts: dict[PositionId , int] = {}
152
153
self._net_positions: dict[InstrumentId , Decimal] = {}
153
154
self._bet_positions: dict[InstrumentId , object] = {}
@@ -580,12 +581,9 @@ cdef class Portfolio(PortfolioFacade):
580
581
positions_open = positions_open,
581
582
)
582
583
583
- self ._realized_pnls[event.instrument_id] = self ._calculate_realized_pnl(
584
- instrument_id = event.instrument_id,
585
- )
586
- self ._unrealized_pnls[event.instrument_id] = self ._calculate_unrealized_pnl(
587
- instrument_id = event.instrument_id,
588
- )
584
+ # Invalidate cached PnLs - they will be recalculated on next request
585
+ self ._realized_pnls.pop(event.instrument_id, None )
586
+ self ._unrealized_pnls.pop(event.instrument_id, None )
589
587
590
588
cdef Account account = self ._cache.account(event.account_id)
591
589
@@ -684,7 +682,8 @@ cdef class Portfolio(PortfolioFacade):
684
682
self._realized_pnls.clear()
685
683
self._unrealized_pnls.clear()
686
684
self._pending_calcs.clear()
687
- self._snapshot_realized_pnls.clear()
685
+ self._snapshot_sum_per_position.clear()
686
+ self._snapshot_last_per_position.clear()
688
687
self._snapshot_processed_counts.clear()
689
688
self.analyzer.reset()
690
689
@@ -713,24 +712,15 @@ cdef class Portfolio(PortfolioFacade):
713
712
self._log.info("DISPOSED")
714
713
715
714
cdef void _ensure_snapshot_pnls_cached_for(self , InstrumentId instrument_id ):
716
- # Get all positions for this instrument (both open and closed)
717
- cdef set [PositionId] instrument_position_ids = self ._cache.position_ids(venue = None , instrument_id = instrument_id)
715
+ # Performance: This method maintains an incremental cache of snapshot PnLs
716
+ # It only unpickles new snapshots that haven't been processed yet
717
+ # Tracks sum and last PnL per position for efficient NETTING OMS support
718
718
719
- # Get snapshot position IDs for this instrument from index
719
+ # Get all position IDs that have snapshots for this instrument
720
720
cdef set [PositionId] snapshot_position_ids = self ._cache.position_snapshot_ids(instrument_id)
721
721
722
- # Combine all position IDs (active and snapshot)
723
- cdef set [PositionId] position_ids = set ()
724
-
725
- if instrument_position_ids:
726
- position_ids.update(instrument_position_ids)
727
-
728
- position_ids.update(snapshot_position_ids)
729
-
730
- if not position_ids:
731
- # Clear stale cached total when no positions or snapshots exist
732
- self ._snapshot_realized_pnls.pop(instrument_id, None )
733
- return # No positions or snapshots for this instrument
722
+ if not snapshot_position_ids:
723
+ return # Nothing to process
734
724
735
725
cdef bint rebuild = False
736
726
@@ -740,8 +730,8 @@ cdef class Portfolio(PortfolioFacade):
740
730
int prev_count
741
731
int curr_count
742
732
743
- # Detect purge/reset (count regression) to trigger full rebuild for this instrument
744
- for position_id in position_ids :
733
+ # Detect purge/reset (count regression) to trigger full rebuild
734
+ for position_id in snapshot_position_ids :
745
735
position_id_snapshots = self ._cache.position_snapshot_bytes(position_id)
746
736
curr_count = len (position_id_snapshots)
747
737
prev_count = self ._snapshot_processed_counts.get(position_id, 0 )
@@ -750,82 +740,90 @@ cdef class Portfolio(PortfolioFacade):
750
740
break
751
741
752
742
cdef:
753
- Money existing_pnl
754
743
Position snapshot
744
+ Money sum_pnl = None
745
+ Money last_pnl = None
755
746
756
747
if rebuild:
757
- # Rebuild from scratch for this instrument
758
- self ._snapshot_realized_pnls.pop(instrument_id, None )
759
-
760
- for position_id in position_ids:
748
+ # Full rebuild: process all snapshots from scratch
749
+ for position_id in snapshot_position_ids:
761
750
position_id_snapshots = self ._cache.position_snapshot_bytes(position_id)
762
751
curr_count = len (position_id_snapshots)
752
+
763
753
if curr_count:
764
754
for s in position_id_snapshots:
765
755
snapshot = pickle.loads(s)
766
- if snapshot.realized_pnl is None :
767
- continue
768
-
769
- # Aggregate if same currency; otherwise log and keep existing
770
- existing_pnl = self ._snapshot_realized_pnls.get(instrument_id)
771
- if existing_pnl is not None :
772
- if existing_pnl.currency == snapshot.realized_pnl.currency:
773
- self ._snapshot_realized_pnls[instrument_id] = Money(
774
- existing_pnl.as_double() + snapshot.realized_pnl.as_double(),
775
- existing_pnl.currency,
756
+ if snapshot.realized_pnl is not None :
757
+ if sum_pnl is None :
758
+ sum_pnl = snapshot.realized_pnl
759
+ elif sum_pnl.currency == snapshot.realized_pnl.currency:
760
+ # Accumulate all snapshot PnLs
761
+ sum_pnl = Money(
762
+ sum_pnl.as_double() + snapshot.realized_pnl.as_double(),
763
+ sum_pnl.currency
776
764
)
777
- else :
778
- self ._log.warning(
779
- f" Cannot aggregate snapshot PnLs with different currencies for {instrument_id}: "
780
- f" {existing_pnl.currency} vs {snapshot.realized_pnl.currency}" ,
781
- )
782
- else :
783
- self ._snapshot_realized_pnls[instrument_id] = snapshot.realized_pnl
784
- self ._snapshot_processed_counts[position_id] = curr_count
785
- return
765
+ # Always update last to the most recent snapshot
766
+ last_pnl = snapshot.realized_pnl
786
767
787
- # Incremental path: only unpickle new entries
788
- for position_id in position_ids:
789
- position_id_snapshots = self ._cache.position_snapshot_bytes(position_id)
790
- curr_count = len (position_id_snapshots)
791
- if curr_count == 0 :
792
- continue
793
- prev_count = self ._snapshot_processed_counts.get(position_id, 0 )
794
- if prev_count >= curr_count:
795
- continue
796
- for idx in range (prev_count, curr_count):
797
- snapshot = pickle.loads(position_id_snapshots[idx])
798
- if snapshot.realized_pnl is None :
768
+ # Update tracking structures
769
+ if sum_pnl is not None :
770
+ self ._snapshot_sum_per_position[position_id] = sum_pnl
771
+ self ._snapshot_last_per_position[position_id] = last_pnl
772
+ else :
773
+ self ._snapshot_sum_per_position.pop(position_id, None )
774
+ self ._snapshot_last_per_position.pop(position_id, None )
775
+ self ._snapshot_processed_counts[position_id] = curr_count
776
+ else :
777
+ # Incremental path: only process new snapshots
778
+ for position_id in snapshot_position_ids:
779
+ position_id_snapshots = self ._cache.position_snapshot_bytes(position_id)
780
+ curr_count = len (position_id_snapshots)
781
+ if curr_count == 0 :
799
782
continue
800
- existing_pnl = self ._snapshot_realized_pnls.get(instrument_id)
801
- if existing_pnl is not None :
802
- if existing_pnl.currency == snapshot.realized_pnl.currency:
803
- self ._snapshot_realized_pnls[instrument_id] = Money(
804
- existing_pnl.as_double() + snapshot.realized_pnl.as_double(),
805
- existing_pnl.currency,
806
- )
807
- else :
808
- self ._log.warning(
809
- f" Cannot aggregate snapshot PnLs with different currencies for {instrument_id}: "
810
- f" {existing_pnl.currency} vs {snapshot.realized_pnl.currency}" ,
811
- )
783
+ prev_count = self ._snapshot_processed_counts.get(position_id, 0 )
784
+ if prev_count >= curr_count:
785
+ continue
786
+
787
+ sum_pnl = self ._snapshot_sum_per_position.get(position_id)
788
+ last_pnl = self ._snapshot_last_per_position.get(position_id)
789
+
790
+ # Process only new snapshots
791
+ for idx in range (prev_count, curr_count):
792
+ snapshot = pickle.loads(position_id_snapshots[idx])
793
+
794
+ if snapshot.realized_pnl is not None :
795
+ if sum_pnl is None :
796
+ sum_pnl = snapshot.realized_pnl
797
+ elif sum_pnl.currency == snapshot.realized_pnl.currency:
798
+ # Add to running sum
799
+ sum_pnl = Money(
800
+ sum_pnl.as_double() + snapshot.realized_pnl.as_double(),
801
+ sum_pnl.currency
802
+ )
803
+ # Update last to most recent
804
+ last_pnl = snapshot.realized_pnl
805
+
806
+ # Update tracking structures
807
+ if sum_pnl is not None :
808
+ self ._snapshot_sum_per_position[position_id] = sum_pnl
809
+ self ._snapshot_last_per_position[position_id] = last_pnl
812
810
else :
813
- self ._snapshot_realized_pnls[instrument_id] = snapshot.realized_pnl
814
- self ._snapshot_processed_counts[position_id] = curr_count
811
+ self ._snapshot_sum_per_position.pop(position_id, None )
812
+ self ._snapshot_last_per_position.pop(position_id, None )
813
+ self ._snapshot_processed_counts[position_id] = curr_count
815
814
816
- # Prune stale entries from processed counts
817
- cdef PositionId stale_position_id
815
+ # Prune stale entries (positions that no longer have snapshots)
818
816
cdef list [PositionId] stale_ids = []
819
817
818
+ cdef PositionId stale_position_id
820
819
for stale_position_id in self ._snapshot_processed_counts:
821
- if stale_position_id not in position_ids :
820
+ if stale_position_id not in snapshot_position_ids :
822
821
stale_ids.append(stale_position_id)
823
822
824
823
for stale_position_id in stale_ids:
825
824
self ._snapshot_processed_counts.pop(stale_position_id, None )
826
-
827
- if self ._debug and self ._snapshot_realized_pnls.get(instrument_id) is not None :
828
- self ._log.debug(f" Cached snapshot realized PnL for {instrument_id}" )
825
+ self ._snapshot_sum_per_position.pop(stale_position_id, None )
826
+ self ._snapshot_last_per_position.pop(stale_position_id, None )
829
827
830
828
# -- QUERIES --------------------------------------------------------------------------------------
831
829
@@ -1612,16 +1610,12 @@ cdef class Portfolio(PortfolioFacade):
1612
1610
else :
1613
1611
currency = instrument.get_cost_currency()
1614
1612
1615
- # Ensure snapshot PnLs for this instrument are cached (incremental)
1616
1613
self ._ensure_snapshot_pnls_cached_for(instrument_id)
1617
1614
1618
1615
cdef:
1619
1616
list [Position] positions
1620
- Money cached_snapshot_pnl
1621
- Money cached_snapshot_pnl_for_instrument
1622
1617
double total_pnl = 0.0
1623
1618
double xrate
1624
- double snapshot_xrate
1625
1619
Position position
1626
1620
double pnl
1627
1621
@@ -1633,58 +1627,97 @@ cdef class Portfolio(PortfolioFacade):
1633
1627
if self ._debug:
1634
1628
self ._log.debug(f" Found {len(positions)} positions for {instrument_id}" )
1635
1629
1636
- if not positions:
1637
- # Check if we have cached snapshot PnL for this instrument
1638
- cached_snapshot_pnl = self ._snapshot_realized_pnls.get(instrument_id)
1639
-
1640
- if cached_snapshot_pnl is None :
1641
- return Money(0 , currency)
1642
-
1643
- if cached_snapshot_pnl.currency == currency:
1644
- return cached_snapshot_pnl
1645
-
1646
- xrate = self ._cache.get_xrate(
1647
- venue = account.id.get_issuer(),
1648
- from_currency = cached_snapshot_pnl.currency,
1649
- to_currency = currency,
1650
- price_type = PriceType.MID,
1651
- )
1652
- if xrate == 0 :
1653
- return None # Cannot convert currency
1654
-
1655
- return Money(cached_snapshot_pnl.as_double() * xrate, currency)
1656
-
1657
- # Add cached snapshot PnL if available
1658
- cached_snapshot_pnl_for_instrument = self ._snapshot_realized_pnls.get(instrument_id)
1630
+ # Build set of active position IDs for quick lookup
1631
+ cdef set [PositionId] active_position_ids = {p.id for p in positions}
1632
+ cdef set [PositionId] snapshot_ids = self ._cache.position_snapshot_ids(instrument_id)
1633
+ cdef set [PositionId] processed_ids = set ()
1659
1634
1660
- if cached_snapshot_pnl_for_instrument is None :
1661
- pass # No snapshot PnL to add
1662
- elif cached_snapshot_pnl_for_instrument.currency == currency:
1663
- if self ._debug:
1664
- self ._log.debug(f" Adding cached snapshot PnL: {cached_snapshot_pnl_for_instrument}" )
1665
- total_pnl += cached_snapshot_pnl_for_instrument.as_double()
1666
- else :
1667
- snapshot_xrate = self ._cache.get_xrate(
1668
- venue = account.id.get_issuer(),
1669
- from_currency = cached_snapshot_pnl_for_instrument.currency,
1670
- to_currency = currency,
1671
- price_type = PriceType.MID,
1672
- )
1673
- if snapshot_xrate == 0 :
1674
- return None # Cannot convert currency
1635
+ cdef:
1636
+ PositionId position_id
1637
+ Money sum_pnl
1638
+ Money last_pnl
1639
+ double contribution
1640
+
1641
+ # Apply the 3-case combination rule for positions with snapshots
1642
+ for position_id in snapshot_ids:
1643
+ sum_pnl = self ._snapshot_sum_per_position.get(position_id)
1644
+ if sum_pnl is None :
1645
+ continue # No PnL for this position
1646
+
1647
+ contribution = 0.0
1648
+
1649
+ if position_id not in active_position_ids:
1650
+ # Case 1: Position NOT in cache - add sum of all snapshots
1651
+ contribution = sum_pnl.as_double()
1652
+ # Mark as fully processed since position doesn't exist
1653
+ processed_ids.add(position_id)
1654
+ else :
1655
+ # Position is in cache - find it
1656
+ position = None
1657
+
1658
+ for p in positions:
1659
+ if p.id == position_id:
1660
+ position = p
1661
+ break
1662
+
1663
+ if position is None :
1664
+ continue # Should not happen
1665
+
1666
+ if position.is_open_c():
1667
+ # Case 2: Position OPEN - add sum (prior cycles) + position's realized PnL
1668
+ contribution = sum_pnl.as_double()
1669
+ # Position's PnL will be added in the positions loop below
1670
+ # Do NOT mark as processed - we still need to add current PnL
1671
+ else :
1672
+ # Case 3: Position CLOSED
1673
+ # If last snapshot equals current position realized PnL, subtract it here;
1674
+ # when we add the position realized below, net effect is `sum`.
1675
+ # If not equal (new closed cycle not snapshotted), include full `sum` here
1676
+ # and add the position realized below (net `sum + realized`).
1677
+ last_pnl = self ._snapshot_last_per_position.get(position_id)
1678
+
1679
+ if (
1680
+ last_pnl is not None
1681
+ and position.realized_pnl is not None
1682
+ and last_pnl.currency == position.realized_pnl.currency
1683
+ and last_pnl == position.realized_pnl
1684
+ ):
1685
+ contribution = sum_pnl.as_double() - last_pnl.as_double()
1686
+ else :
1687
+ contribution = sum_pnl.as_double()
1688
+ # Position's PnL will be added in the positions loop below
1689
+ # Do NOT mark as processed - we still need to add current PnL
1675
1690
1676
- total_pnl += cached_snapshot_pnl_for_instrument.as_double() * snapshot_xrate
1691
+ # Add contribution with currency conversion if needed
1692
+ if sum_pnl.currency == currency:
1693
+ total_pnl += contribution
1694
+ else :
1695
+ xrate = self ._cache.get_xrate(
1696
+ venue = account.id.get_issuer(),
1697
+ from_currency = sum_pnl.currency,
1698
+ to_currency = currency,
1699
+ price_type = PriceType.MID,
1700
+ )
1701
+ if xrate == 0 :
1702
+ return None # Cannot convert currency
1703
+ total_pnl += contribution * xrate
1677
1704
1705
+ # Second: Add realized PnL from active positions
1678
1706
for position in positions:
1679
1707
if position.instrument_id != instrument_id:
1680
1708
continue # Nothing to calculate
1681
1709
1710
+ # Skip positions that were already processed via snapshots
1711
+ if position.id in processed_ids:
1712
+ continue # Already handled in snapshot logic
1713
+
1682
1714
if position.realized_pnl is None :
1683
- continue # Nothing to calculate
1715
+ continue # No PnL to add
1684
1716
1685
1717
if self ._debug:
1686
- self ._log.debug(f" Calculating realized PnL for {position}" )
1718
+ self ._log.debug(f" Adding realized PnL for {position}" )
1687
1719
1720
+ # Add position's realized PnL
1688
1721
if isinstance (instrument, BettingInstrument):
1689
1722
bet_position = self ._bet_positions.get(position.id)
1690
1723
0 commit comments