Skip to content

Commit 36abf87

Browse files
db: add token_balances (RLS, trigger) + backfill
Why: - Provide storage and unique identity (user_id,token_key) for native/ERC-20 balances. - RLS ensures users can read only their balances; trigger updates native balances on deposits (exclude internal transfers). Historical backfill aggregates SEND net transfers and ETH deposits. Test plan: - Apply migrations locally: table, indexes, RLS, trigger created. - Insert a receive (external→Send Account) → native balance increments. - Backfill runs without relation/lock errors; balances reflect SEND+ETH merged per address/chain.
1 parent 401915c commit 36abf87

File tree

3 files changed

+316
-0
lines changed

3 files changed

+316
-0
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
create sequence "public"."token_balances_id_seq";
2+
3+
alter type "public"."verification_type" add value if not exists 'send_token_hodler' after 'send_ceiling';
4+
5+
create table "public"."token_balances" (
6+
"id" bigint not null default nextval('token_balances_id_seq'::regclass),
7+
"user_id" uuid not null,
8+
"address" citext not null,
9+
"chain_id" integer not null,
10+
"token" bytea,
11+
"balance" numeric not null default 0,
12+
"updated_at" timestamp with time zone not null default (now() AT TIME ZONE 'utc'::text),
13+
"token_key" text generated always as (
14+
CASE
15+
WHEN (token IS NULL) THEN 'eth'::text
16+
ELSE encode(token, 'hex'::text)
17+
END) stored
18+
);
19+
20+
21+
alter table "public"."token_balances" enable row level security;;
22+
23+
alter sequence "public"."token_balances_id_seq" owned by "public"."token_balances"."id";
24+
25+
CREATE INDEX token_balances_address_idx ON public.token_balances USING btree (address, chain_id);
26+
27+
CREATE UNIQUE INDEX token_balances_pkey ON public.token_balances USING btree (id);
28+
29+
CREATE INDEX token_balances_token_balance_idx ON public.token_balances USING btree (token, balance);
30+
31+
CREATE INDEX token_balances_token_user_idx ON public.token_balances USING btree (token, user_id);
32+
33+
CREATE UNIQUE INDEX token_balances_user_token_key_uniq ON public.token_balances USING btree (user_id, token_key);
34+
35+
alter table "public"."token_balances" add constraint "token_balances_pkey" PRIMARY KEY using index "token_balances_pkey";
36+
37+
alter table "public"."token_balances" add constraint "token_balances_user_id_fkey" FOREIGN KEY (user_id) REFERENCES auth.users(id) ON DELETE CASCADE not valid;
38+
39+
alter table "public"."token_balances" validate constraint "token_balances_user_id_fkey";
40+
41+
set check_function_bodies = off;
42+
43+
CREATE OR REPLACE FUNCTION public.send_account_receives_update_token_balances()
44+
RETURNS trigger
45+
LANGUAGE plpgsql
46+
SECURITY DEFINER
47+
SET search_path TO 'public'
48+
AS $function$
49+
DECLARE
50+
_to_user_id uuid;
51+
_to_address citext;
52+
_from_is_send boolean := false;
53+
BEGIN
54+
-- Recipient address is NEW.log_addr
55+
SELECT sa.user_id
56+
INTO _to_user_id
57+
FROM public.send_accounts sa
58+
WHERE sa.address = concat('0x', encode(NEW.log_addr, 'hex'))::citext
59+
AND sa.chain_id = NEW.chain_id::int
60+
LIMIT 1;
61+
62+
-- Sender is a Send Account? If yes, this is an internal transfer; let workflow handle it.
63+
SELECT EXISTS (
64+
SELECT 1
65+
FROM public.send_accounts sa
66+
WHERE sa.address = concat('0x', encode(NEW.sender, 'hex'))::citext
67+
AND sa.chain_id = NEW.chain_id::int
68+
) INTO _from_is_send;
69+
70+
-- Only update balances for deposits: recipient is a Send Account, sender is NOT a Send Account
71+
IF _to_user_id IS NOT NULL AND NOT _from_is_send THEN
72+
_to_address := concat('0x', encode(NEW.log_addr, 'hex'))::citext;
73+
74+
INSERT INTO public.token_balances (user_id, address, chain_id, token, balance, updated_at)
75+
VALUES (
76+
_to_user_id,
77+
_to_address,
78+
NEW.chain_id::int,
79+
NULL,
80+
NEW.value,
81+
to_timestamp(NEW.block_time) at time zone 'UTC'
82+
)
83+
ON CONFLICT (user_id, token_key)
84+
DO UPDATE SET
85+
balance = public.token_balances.balance + EXCLUDED.balance,
86+
address = EXCLUDED.address,
87+
chain_id = EXCLUDED.chain_id,
88+
updated_at = EXCLUDED.updated_at;
89+
END IF;
90+
91+
RETURN NEW;
92+
END;
93+
$function$;
94+
95+
GRANT ALL ON TABLE "public"."token_balances" TO "anon";
96+
GRANT ALL ON TABLE "public"."token_balances" TO "authenticated";
97+
GRANT ALL ON TABLE "public"."token_balances" TO "service_role";
98+
99+
create policy "Users can see their own token balances"
100+
on "public"."token_balances"
101+
as permissive
102+
for select
103+
to authenticated
104+
using ((select auth.uid() = user_id));
105+
106+
107+
CREATE TRIGGER send_account_receives_trigger_update_token_balances AFTER INSERT ON public.send_account_receives FOR EACH ROW EXECUTE FUNCTION public.send_account_receives_update_token_balances();
108+
109+
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
SELECT insert_verification_value(
2+
distribution_number => 20,
3+
type => 'send_token_hodler'::public.verification_type,
4+
multiplier_min => 0,
5+
multiplier_max => 0,
6+
multiplier_step => 0
7+
);
8+
9+
10+
-- Data backfill: populate token_balances from historical send_token_transfers
11+
-- Goal: sum all transfers (in - out since account creation) for SEND token and upsert into token_balances
12+
-- Safe to re-run: uses ON CONFLICT to overwrite balance with final sum.
13+
14+
DO $$
15+
BEGIN
16+
-- Ensure the token_balances table exists (created in prior migration); if not, abort early
17+
IF NOT EXISTS (
18+
SELECT 1 FROM information_schema.tables
19+
WHERE table_schema = 'public' AND table_name = 'token_balances'
20+
) THEN
21+
RAISE NOTICE 'token_balances does not exist; skipping backfill.';
22+
RETURN;
23+
END IF;
24+
25+
WITH send_token AS (
26+
SELECT decode('Eab49138BA2Ea6dd776220fE26b7b8E446638956','hex') AS token
27+
),
28+
sa AS (
29+
SELECT sa.user_id,
30+
sa.address,
31+
sa.chain_id,
32+
decode(replace(sa.address::text,'0x',''),'hex') AS addr_bytea
33+
FROM public.send_accounts sa
34+
),
35+
di AS (
36+
SELECT sa.user_id,
37+
sa.address,
38+
sa.chain_id,
39+
SUM(stt.v)::numeric AS amt_in,
40+
MAX(stt.block_time) AS max_bt
41+
FROM sa
42+
JOIN public.send_account_created sac ON sac.account = sa.addr_bytea
43+
JOIN public.send_token_transfers stt
44+
ON stt.t = sa.addr_bytea
45+
AND stt.log_addr = (SELECT token FROM send_token)
46+
AND stt.block_num >= sac.block_num
47+
GROUP BY sa.user_id, sa.address, sa.chain_id
48+
),
49+
dout AS (
50+
SELECT sa.user_id,
51+
sa.address,
52+
sa.chain_id,
53+
SUM(stt.v)::numeric AS amt_out,
54+
MAX(stt.block_time) AS max_bt
55+
FROM sa
56+
JOIN public.send_account_created sac ON sac.account = sa.addr_bytea
57+
JOIN public.send_token_transfers stt
58+
ON stt.f = sa.addr_bytea
59+
AND stt.log_addr = (SELECT token FROM send_token)
60+
AND stt.block_num >= sac.block_num
61+
GROUP BY sa.user_id, sa.address, sa.chain_id
62+
),
63+
final AS (
64+
SELECT COALESCE(di.user_id, dout.user_id) AS user_id,
65+
COALESCE(di.address, dout.address) AS address,
66+
COALESCE(di.chain_id, dout.chain_id) AS chain_id,
67+
(COALESCE(di.amt_in, 0) - COALESCE(dout.amt_out, 0))::numeric AS balance,
68+
GREATEST(COALESCE(di.max_bt, 0), COALESCE(dout.max_bt, 0)) AS max_block_time
69+
FROM di
70+
FULL OUTER JOIN dout ON di.user_id = dout.user_id
71+
AND di.address = dout.address
72+
AND di.chain_id = dout.chain_id
73+
),
74+
-- ETH/native deposits: sum receives where recipient is a Send Account and sender is NOT a Send Account
75+
eth_dep AS (
76+
SELECT sa.user_id,
77+
sa.address,
78+
sa.chain_id,
79+
SUM(rec.value)::numeric AS amt_dep,
80+
MAX(rec.block_time) AS max_bt
81+
FROM sa
82+
JOIN public.send_account_created sac ON sac.account = sa.addr_bytea
83+
JOIN public.send_account_receives rec
84+
ON rec.log_addr = sa.addr_bytea
85+
AND rec.chain_id = sa.chain_id
86+
AND rec.block_num >= sac.block_num
87+
LEFT JOIN public.send_accounts sa_sender
88+
ON sa_sender.address = concat('0x', encode(rec.sender, 'hex'))::citext
89+
AND sa_sender.chain_id = sa.chain_id
90+
WHERE sa_sender.id IS NULL -- deposits only (external → Send Account)
91+
GROUP BY sa.user_id, sa.address, sa.chain_id
92+
)
93+
-- Insert SEND token balances
94+
INSERT INTO public.token_balances(user_id, address, chain_id, token, balance, updated_at)
95+
SELECT f.user_id,
96+
f.address,
97+
f.chain_id,
98+
(SELECT token FROM send_token),
99+
f.balance,
100+
CASE WHEN f.max_block_time > 0 THEN to_timestamp(f.max_block_time) AT TIME ZONE 'UTC'
101+
ELSE now() AT TIME ZONE 'UTC' END
102+
FROM final f
103+
ON CONFLICT (user_id, token_key) DO UPDATE
104+
SET balance = EXCLUDED.balance,
105+
address = EXCLUDED.address,
106+
chain_id = EXCLUDED.chain_id,
107+
updated_at = EXCLUDED.updated_at;
108+
109+
-- Insert ETH/native deposit balances (token = NULL)
110+
WITH sa AS (
111+
SELECT sa.user_id,
112+
sa.address,
113+
sa.chain_id,
114+
decode(replace(sa.address::text,'0x',''),'hex') AS addr_bytea
115+
FROM public.send_accounts sa
116+
),
117+
eth_dep AS (
118+
SELECT sa.user_id,
119+
sa.address,
120+
sa.chain_id,
121+
SUM(rec.value)::numeric AS amt_dep,
122+
MAX(rec.block_time) AS max_bt
123+
FROM sa
124+
JOIN public.send_account_created sac ON sac.account = sa.addr_bytea
125+
JOIN public.send_account_receives rec
126+
ON rec.log_addr = sa.addr_bytea
127+
AND rec.chain_id = sa.chain_id
128+
AND rec.block_num >= sac.block_num
129+
LEFT JOIN public.send_accounts sa_sender
130+
ON sa_sender.address = concat('0x', encode(rec.sender, 'hex'))::citext
131+
AND sa_sender.chain_id = sa.chain_id
132+
WHERE sa_sender.id IS NULL -- deposits only (external → Send Account)
133+
GROUP BY sa.user_id, sa.address, sa.chain_id
134+
)
135+
INSERT INTO public.token_balances(user_id, address, chain_id, token, balance, updated_at)
136+
SELECT ed.user_id,
137+
ed.address,
138+
ed.chain_id,
139+
NULL AS token,
140+
ed.amt_dep,
141+
CASE WHEN ed.max_bt > 0 THEN to_timestamp(ed.max_bt) AT TIME ZONE 'UTC'
142+
ELSE now() AT TIME ZONE 'UTC' END
143+
FROM eth_dep ed
144+
ON CONFLICT (user_id, token_key) DO UPDATE
145+
SET balance = public.token_balances.balance + EXCLUDED.balance,
146+
address = EXCLUDED.address,
147+
chain_id = EXCLUDED.chain_id,
148+
updated_at = EXCLUDED.updated_at;
149+
END $$;
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
-- token_balances (Temporal-driven): single source-of-truth rows
2+
-- Keep reads simple; balances are written by Temporal activities after
3+
-- confirmed transfers. No advancer/cursor/functions here.
4+
5+
-- Balances table (global per-user balances per token or native asset)
6+
CREATE TABLE IF NOT EXISTS public.token_balances (
7+
id bigserial PRIMARY KEY,
8+
user_id uuid NOT NULL,
9+
address public.citext NOT NULL,
10+
chain_id integer NOT NULL,
11+
token bytea NULL, -- NULL reserved for native ETH in future
12+
balance numeric NOT NULL DEFAULT 0,
13+
updated_at timestamptz NOT NULL DEFAULT (now() AT TIME ZONE 'utc'),
14+
token_key text GENERATED ALWAYS AS (
15+
CASE WHEN token IS NULL THEN 'eth' ELSE encode(token, 'hex') END
16+
) STORED
17+
);
18+
19+
-- Foreign keys
20+
ALTER TABLE ONLY public.token_balances
21+
ADD CONSTRAINT token_balances_user_id_fkey FOREIGN KEY (user_id)
22+
REFERENCES auth.users(id) ON DELETE CASCADE;
23+
24+
ALTER TABLE ONLY public.token_balances
25+
ADD CONSTRAINT token_balances_address_chain_fkey FOREIGN KEY (address, chain_id)
26+
REFERENCES public.send_accounts(address, chain_id) ON DELETE CASCADE;
27+
28+
-- Helpful indexes and uniqueness
29+
CREATE UNIQUE INDEX IF NOT EXISTS token_balances_user_token_key_uniq
30+
ON public.token_balances (user_id, token_key);
31+
CREATE INDEX IF NOT EXISTS token_balances_token_user_idx
32+
ON public.token_balances (token, user_id);
33+
CREATE INDEX IF NOT EXISTS token_balances_token_balance_idx
34+
ON public.token_balances (token, balance);
35+
CREATE INDEX IF NOT EXISTS token_balances_address_idx
36+
ON public.token_balances (address, chain_id);
37+
38+
-- Enable direct SELECT with RLS: users see their own rows
39+
ALTER TABLE public.token_balances ENABLE ROW LEVEL SECURITY;
40+
DO $$
41+
BEGIN
42+
IF NOT EXISTS (
43+
SELECT 1 FROM pg_policies
44+
WHERE schemaname = 'public'
45+
AND tablename = 'token_balances'
46+
AND policyname = 'Users can see their own token balances'
47+
) THEN
48+
CREATE POLICY "Users can see their own token balances"
49+
ON public.token_balances
50+
FOR SELECT
51+
TO authenticated
52+
USING (auth.uid() = user_id);
53+
END IF;
54+
END $$;
55+
56+
GRANT ALL ON TABLE "public"."token_balances" TO "anon";
57+
GRANT ALL ON TABLE "public"."token_balances" TO "authenticated";
58+
GRANT ALL ON TABLE "public"."token_balances" TO "service_role";

0 commit comments

Comments
 (0)