-
Notifications
You must be signed in to change notification settings - Fork 737
Speedy, auditable, deadlock-resistant storage tracking #2026
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+1,521
−6
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| class Storage::MaterializeJob < ApplicationJob | ||
| queue_as :backend | ||
| limits_concurrency to: 1, key: ->(owner) { owner } | ||
|
|
||
| def perform(owner) | ||
| owner.materialize_storage | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| class Storage::ReconcileJob < ApplicationJob | ||
| queue_as :backend | ||
|
|
||
| def perform(owner) | ||
| owner.reconcile_storage | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| module Account::Storage | ||
| extend ActiveSupport::Concern | ||
| include Storage::Totaled | ||
|
|
||
| private | ||
| def calculate_real_storage_bytes | ||
| boards.sum { |board| board.send(:calculate_real_storage_bytes) } | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| module Board::Storage | ||
| extend ActiveSupport::Concern | ||
| include Storage::Totaled | ||
|
|
||
| # Board's own embeds (public_description) count toward itself | ||
| def board_for_storage_tracking | ||
| self | ||
| end | ||
|
|
||
| private | ||
| BATCH_SIZE = 1000 | ||
|
|
||
| # Calculate actual storage by summing blob sizes. | ||
| # | ||
| # Uses batched pluck queries to avoid loading huge ID arrays, and avoids | ||
| # ActiveRecord model queries on ActiveStorage tables to sidestep cross-pool | ||
| # issues when ActiveStorage uses separate connection pools (e.g., with replicas). | ||
| def calculate_real_storage_bytes | ||
| card_image_bytes + card_embed_bytes + comment_embed_bytes + board_embed_bytes | ||
| end | ||
|
|
||
| def card_image_bytes | ||
| sum_blob_bytes_in_batches \ | ||
| ActiveStorage::Attachment.where(record_type: "Card", name: "image"), | ||
| cards.pluck(:id) | ||
| end | ||
|
|
||
| def card_embed_bytes | ||
| sum_embed_bytes_for "Card", cards.pluck(:id) | ||
| end | ||
|
|
||
| def comment_embed_bytes | ||
| sum_embed_bytes_for "Comment", Comment.where(card_id: cards.pluck(:id)).pluck(:id) | ||
| end | ||
|
|
||
| def board_embed_bytes | ||
| sum_embed_bytes_for "Board", [ id ] | ||
| end | ||
|
|
||
| def sum_embed_bytes_for(record_type, record_ids) | ||
| rich_text_ids = ActionText::RichText \ | ||
| .where(record_type: record_type, record_id: record_ids) | ||
| .pluck(:id) | ||
|
|
||
| sum_blob_bytes_in_batches \ | ||
| ActiveStorage::Attachment.where(record_type: "ActionText::RichText", name: "embeds"), | ||
| rich_text_ids | ||
| end | ||
|
|
||
| def sum_blob_bytes_in_batches(base_scope, record_ids) | ||
| record_ids.each_slice(BATCH_SIZE).sum do |batch_ids| | ||
| blob_ids = base_scope.where(record_id: batch_ids).pluck(:blob_id) | ||
| ActiveStorage::Blob.where(id: blob_ids).sum(:byte_size) | ||
| end | ||
| end | ||
| end | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| module Storage::Totaled | ||
| extend ActiveSupport::Concern | ||
|
|
||
| included do | ||
| has_one :storage_total, as: :owner, class_name: "Storage::Total", dependent: :destroy | ||
| has_many :storage_entries, class_name: "Storage::Entry", foreign_key: foreign_key_for_storage | ||
| end | ||
|
|
||
| class_methods do | ||
| def foreign_key_for_storage | ||
| "#{model_name.singular}_id" | ||
| end | ||
| end | ||
|
|
||
| # Fast: materialized snapshot (may be slightly stale) | ||
| def bytes_used | ||
| storage_total&.bytes_stored || 0 | ||
| end | ||
|
|
||
| # Exact: snapshot + pending entries | ||
| def bytes_used_exact | ||
| (storage_total || create_storage_total!).current_usage | ||
| end | ||
|
|
||
| def materialize_storage_later | ||
| Storage::MaterializeJob.perform_later(self) | ||
| end | ||
|
|
||
| # Materialize all pending entries into snapshot | ||
| def materialize_storage | ||
| total = storage_total || create_storage_total! | ||
|
|
||
| total.with_lock do | ||
| latest_entry_id = storage_entries.maximum(:id) | ||
|
|
||
| if latest_entry_id && total.last_entry_id != latest_entry_id | ||
| scope = storage_entries.where(id: ..latest_entry_id) | ||
| scope = scope.where.not(id: ..total.last_entry_id) if total.last_entry_id | ||
| delta_sum = scope.sum(:delta) | ||
|
|
||
| total.update! bytes_stored: total.bytes_stored + delta_sum, last_entry_id: latest_entry_id | ||
| end | ||
| end | ||
| end | ||
|
|
||
| # Reconcile ledger against actual attachment storage. | ||
| # Uses cursor to ensure consistency: captures max entry ID first, then calculates | ||
| # real bytes, then sums only entries up to that cursor. Concurrent uploads during | ||
| # calculation will have entries with IDs beyond the cursor, avoiding double-count. | ||
| def reconcile_storage | ||
| max_entry_id = storage_entries.maximum(:id) | ||
| real_bytes = calculate_real_storage_bytes | ||
| ledger_bytes = max_entry_id ? storage_entries.where(id: ..max_entry_id).sum(:delta) : 0 | ||
| diff = real_bytes - ledger_bytes | ||
|
|
||
| if diff.nonzero? | ||
| Storage::Entry.record \ | ||
| account: is_a?(Account) ? self : account, | ||
| board: is_a?(Board) ? self : nil, | ||
| recordable: nil, | ||
| delta: diff, | ||
| operation: "reconcile" | ||
| end | ||
| end | ||
|
|
||
| private | ||
| def calculate_real_storage_bytes | ||
| raise NotImplementedError, "Subclass must implement calculate_real_storage_bytes" | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| module Storage::Tracked | ||
| extend ActiveSupport::Concern | ||
|
|
||
| included do | ||
| before_update :track_board_transfer, if: :board_transfer? | ||
| end | ||
|
|
||
| # Return self as the trackable record for storage entries | ||
| def storage_tracked_record | ||
| self | ||
| end | ||
|
|
||
| # Override in models where board is determined differently (e.g., Board itself) | ||
| def board_for_storage_tracking | ||
| board | ||
| end | ||
|
|
||
| # Total bytes for all attachments on this record | ||
| def storage_bytes | ||
| attachments_for_storage.sum { |a| a.blob.byte_size } | ||
| end | ||
|
|
||
| private | ||
| def board_transfer? | ||
| respond_to?(:board_id_changed?) && board_id_changed? | ||
| end | ||
|
|
||
| def track_board_transfer | ||
| old_board_id = board_id_was | ||
| current_bytes = storage_bytes | ||
|
|
||
| if current_bytes.positive? | ||
| # Debit old board | ||
| if old_board_id | ||
| Storage::Entry.record \ | ||
| account: account, | ||
| board_id: old_board_id, | ||
| recordable: self, | ||
| delta: -current_bytes, | ||
| operation: "transfer_out" | ||
| end | ||
|
|
||
| # Credit new board | ||
| Storage::Entry.record \ | ||
| account: account, | ||
| board: board, | ||
| recordable: self, | ||
| delta: current_bytes, | ||
| operation: "transfer_in" | ||
| end | ||
| end | ||
|
|
||
| # Override if needed. Default = all direct attachments | ||
| def attachments_for_storage | ||
| ActiveStorage::Attachment.where(record: self) | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| module Storage | ||
| def self.table_name_prefix | ||
| "storage_" | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| module Storage::AttachmentTracking | ||
| extend ActiveSupport::Concern | ||
|
|
||
| included do | ||
| # Snapshot IDs in before_destroy since parent record may be deleted | ||
| # by the time after_destroy_commit runs | ||
| before_destroy :snapshot_storage_context | ||
| after_create_commit :record_storage_attach | ||
| after_destroy_commit :record_storage_detach | ||
| end | ||
|
|
||
| private | ||
| def record_storage_attach | ||
| return unless storage_tracked_record | ||
|
|
||
| Storage::Entry.record \ | ||
| account: storage_tracked_record.account, | ||
| board: storage_tracked_record.board_for_storage_tracking, | ||
| recordable: storage_tracked_record, | ||
| blob: blob, | ||
| delta: blob.byte_size, | ||
| operation: "attach" | ||
| end | ||
|
|
||
| def record_storage_detach | ||
| return unless @storage_snapshot | ||
|
|
||
| Storage::Entry.record \ | ||
| account_id: @storage_snapshot[:account_id], | ||
| board_id: @storage_snapshot[:board_id], | ||
| recordable_type: @storage_snapshot[:recordable_type], | ||
| recordable_id: @storage_snapshot[:recordable_id], | ||
| blob_id: @storage_snapshot[:blob_id], | ||
| delta: -blob.byte_size, | ||
| operation: "detach" | ||
| end | ||
|
|
||
| def snapshot_storage_context | ||
| return unless storage_tracked_record | ||
|
|
||
| @storage_snapshot = { | ||
| account_id: storage_tracked_record.account.id, | ||
| board_id: storage_tracked_record.board_for_storage_tracking&.id, | ||
| recordable_type: storage_tracked_record.class.name, | ||
| recordable_id: storage_tracked_record.id, | ||
| blob_id: blob.id | ||
| } | ||
| end | ||
|
|
||
| def storage_tracked_record | ||
| record.try(:storage_tracked_record) | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| class Storage::Entry < ApplicationRecord | ||
| belongs_to :account | ||
| belongs_to :board, optional: true | ||
| belongs_to :recordable, polymorphic: true, optional: true | ||
|
|
||
| scope :pending, ->(last_entry_id) { where.not(id: ..last_entry_id) if last_entry_id } | ||
|
|
||
| # Accepts either objects or _id params (for after_destroy_commit snapshots) | ||
| def self.record(delta:, operation:, account: nil, account_id: nil, board: nil, board_id: nil, | ||
| recordable: nil, recordable_type: nil, recordable_id: nil, blob: nil, blob_id: nil) | ||
| return if delta.zero? | ||
|
|
||
| account_id ||= account&.id | ||
| board_id ||= board&.id | ||
| blob_id ||= blob&.id | ||
|
|
||
| entry = create! \ | ||
| account_id: account_id, | ||
| board_id: board_id, | ||
| recordable_type: recordable_type || recordable&.class&.name, | ||
| recordable_id: recordable_id || recordable&.id, | ||
| blob_id: blob_id, | ||
| delta: delta, | ||
| operation: operation, | ||
| user_id: Current.user&.id, | ||
| request_id: Current.request_id | ||
|
|
||
| # Enqueue materialization - use find_by to handle cascading deletes | ||
| # (Account/Board may be destroyed while attachments are still being cleaned up) | ||
| Account.find_by(id: account_id)&.materialize_storage_later | ||
| Board.find_by(id: board_id)&.materialize_storage_later if board_id | ||
|
|
||
| entry | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| class Storage::Total < ApplicationRecord | ||
| belongs_to :owner, polymorphic: true | ||
|
|
||
| def pending_entries | ||
| owner.storage_entries.pending(last_entry_id) | ||
| end | ||
|
|
||
| # Exact current usage (snapshot + pending) | ||
| def current_usage | ||
| bytes_stored + pending_entries.sum(:delta) | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| class CreateStorageTables < ActiveRecord::Migration[8.0] | ||
| def change | ||
| # Storage ledger: debit/credit event stream | ||
| create_table :storage_entries, id: :uuid do |t| | ||
| t.references :account, type: :uuid, null: false | ||
| t.references :board, type: :uuid, null: true | ||
|
|
||
| t.references :recordable, type: :uuid, polymorphic: true, null: true | ||
|
|
||
| t.bigint :delta, null: false | ||
| t.string :operation, null: false | ||
|
|
||
| t.datetime :created_at, null: false | ||
| end | ||
|
|
||
| # Storage totals: cached snapshots | ||
| create_table :storage_totals, id: :uuid do |t| | ||
| t.references :owner, type: :uuid, polymorphic: true, null: false, index: false | ||
|
|
||
| t.bigint :bytes_stored, null: false, default: 0 | ||
| t.uuid :last_entry_id # Cursor: includes all entries <= this ID | ||
|
|
||
| t.timestamps | ||
| t.index %i[ owner_type owner_id ], unique: true | ||
| end | ||
| end | ||
| end |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.