Skip to content

Commit a2ccd91

Browse files
workflows: add SEND balance + hodler activities (not wired)
Why: - Introduce compile-safe primitives without changing behavior. - readBalanceActivity (SEND-only), persistBalanceActivity (upsert token_balances), upsertSendTokenHodlerVerificationActivity (string|bigint weight; retry semantics). Test plan: - yarn lint & tsc → pass; activities compile. - Dry-run activities (where applicable) log expected paths; no wiring yet.
1 parent 6bd27f4 commit a2ccd91

File tree

1 file changed

+216
-0
lines changed

1 file changed

+216
-0
lines changed

packages/workflows/src/transfer-workflow/activities.ts

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ import {
1212
type TemporalTransferUpdate,
1313
} from './supabase'
1414
import { isAddressInTopic, isReceiveTopic, isTransferTopic } from './wagmi'
15+
import { config, readSendTokenBalanceOf, sendTokenAddress, baseMainnetClient } from '@my/wagmi'
16+
import { createSupabaseAdminClient } from 'app/utils/supabase/admin'
17+
import { hexToBytea } from 'app/utils/hexToBytea'
18+
import { getUserIdFromAddress } from '../deposit-workflow/supabase'
19+
import type { Database } from '@my/supabase/database.types'
1520

1621
type TransferActivities = {
1722
upsertTemporalSendAccountTransferActivity: (
@@ -43,6 +48,27 @@ type TransferActivities = {
4348
eventName: string
4449
eventId: string
4550
}>
51+
readBalanceActivity: (params: {
52+
token: Address
53+
account: Address
54+
}) => Promise<{
55+
userId: string
56+
token: Address
57+
balance: string
58+
address: Address
59+
chainId: number
60+
} | null>
61+
persistBalanceActivity: (params: {
62+
userId: string
63+
token: Address | null
64+
balance: string | bigint
65+
address: Address
66+
chainId: number
67+
}) => Promise<void>
68+
upsertSendTokenHodlerVerificationActivity: (params: {
69+
userId: string
70+
balance: string | bigint
71+
}) => Promise<void>
4672
}
4773

4874
export const createTransferActivities = (
@@ -209,5 +235,195 @@ export const createTransferActivities = (
209235
eventId,
210236
}
211237
},
238+
239+
async readBalanceActivity({ token, account }) {
240+
try {
241+
// SEND-only gate
242+
const chainId = baseMainnetClient.chain.id
243+
const sendAddr = sendTokenAddress[chainId]
244+
if (!sendAddr || token.toLowerCase() !== sendAddr.toLowerCase()) {
245+
return null
246+
}
247+
248+
// Resolve user_id via existing helper (send_accounts.address is CITEXT)
249+
const userId = await getUserIdFromAddress(account)
250+
if (!userId) return null
251+
252+
// Read balanceOf
253+
const balance = await readSendTokenBalanceOf(config, {
254+
args: [account],
255+
chainId,
256+
})
257+
258+
// Only return data; do not persist (simplicity)
259+
return {
260+
userId,
261+
token: sendAddr,
262+
balance: balance.toString(),
263+
address: account,
264+
chainId,
265+
}
266+
} catch (error) {
267+
if (error instanceof ApplicationFailure) throw error
268+
log.error('readBalanceActivity failed', { error })
269+
throw ApplicationFailure.nonRetryable('readBalanceActivity failed', 'READ_BALANCE_FAILED', {
270+
error,
271+
})
272+
}
273+
},
274+
275+
async persistBalanceActivity({ userId, token, balance, address, chainId }) {
276+
try {
277+
const supabaseAdmin = createSupabaseAdminClient()
278+
const payload: Database['public']['Tables']['token_balances']['Insert'] = {
279+
user_id: userId,
280+
address: address,
281+
chain_id: chainId,
282+
token: token ? hexToBytea(token) : null,
283+
balance: balance,
284+
updated_at: new Date().toISOString(),
285+
}
286+
const { error } = await supabaseAdmin
287+
.from('token_balances')
288+
.upsert([payload], { onConflict: 'user_id,token_key' })
289+
290+
if (error) {
291+
if (isRetryableDBError(error)) {
292+
throw ApplicationFailure.retryable(
293+
'Database connection error, retrying...',
294+
error.code,
295+
{
296+
error,
297+
userId,
298+
}
299+
)
300+
}
301+
throw ApplicationFailure.nonRetryable('Database error occurred', error.code, {
302+
error,
303+
userId,
304+
})
305+
}
306+
} catch (error) {
307+
if (error instanceof ApplicationFailure) throw error
308+
log.error('persistBalanceActivity failed', { error })
309+
throw ApplicationFailure.nonRetryable(
310+
error?.message ?? 'persistBalanceActivity failed',
311+
error?.code ?? 'PERSIST_BALANCE_FAILED',
312+
error
313+
)
314+
}
315+
},
316+
317+
async upsertSendTokenHodlerVerificationActivity({ userId, balance }) {
318+
try {
319+
const supabaseAdmin = createSupabaseAdminClient()
320+
const nowIso = new Date().toISOString()
321+
322+
// Resolve current distribution (qualification window contains now)
323+
const { data: distribution, error: distError } = await supabaseAdmin
324+
.from('distributions')
325+
.select('id, qualification_start, qualification_end')
326+
.lte('qualification_start', nowIso)
327+
.gte('qualification_end', nowIso)
328+
.order('qualification_start', { ascending: false })
329+
.limit(1)
330+
.maybeSingle()
331+
332+
if (distError) {
333+
if (isRetryableDBError(distError)) {
334+
throw ApplicationFailure.retryable(
335+
'Database connection error, retrying...',
336+
distError.code,
337+
{
338+
error: distError,
339+
}
340+
)
341+
}
342+
throw ApplicationFailure.nonRetryable(
343+
'Error fetching current distribution',
344+
distError.code,
345+
distError
346+
)
347+
}
348+
349+
if (!distribution) {
350+
log.info('No active distribution window; skipping hodler verification upsert')
351+
return
352+
}
353+
354+
// Insert or update without ON CONFLICT (index is not unique by design)
355+
const { data: existing, error: selectError } = await supabaseAdmin
356+
.from('distribution_verifications')
357+
.select('id')
358+
.eq('distribution_id', distribution.id)
359+
.eq('user_id', userId)
360+
.eq('type', 'send_token_hodler')
361+
.maybeSingle()
362+
363+
if (selectError) {
364+
if (isRetryableDBError(selectError)) {
365+
throw ApplicationFailure.retryable(
366+
'Database connection error, retrying...',
367+
selectError.code,
368+
{
369+
error: selectError,
370+
}
371+
)
372+
}
373+
throw ApplicationFailure.nonRetryable(
374+
'Error selecting distribution_verifications',
375+
selectError.code,
376+
selectError
377+
)
378+
}
379+
380+
let dvError: import('@supabase/supabase-js').PostgrestError | null = null
381+
if (existing && 'id' in existing && existing.id) {
382+
const { error: updateErr } = await supabaseAdmin
383+
.from('distribution_verifications')
384+
.update({ weight: balance, metadata: null })
385+
.eq('id', existing.id)
386+
dvError = updateErr
387+
} else {
388+
const { error: insertErr } = await supabaseAdmin
389+
.from('distribution_verifications')
390+
.insert([
391+
{
392+
distribution_id: distribution.id,
393+
user_id: userId,
394+
type: 'send_token_hodler',
395+
weight: balance,
396+
metadata: null,
397+
},
398+
])
399+
dvError = insertErr
400+
}
401+
402+
if (dvError) {
403+
if (isRetryableDBError(dvError)) {
404+
throw ApplicationFailure.retryable(
405+
'Database connection error, retrying...',
406+
dvError.code,
407+
{
408+
error: dvError,
409+
userId,
410+
}
411+
)
412+
}
413+
throw ApplicationFailure.nonRetryable('Database error occurred', dvError.code, {
414+
error: dvError,
415+
userId,
416+
})
417+
}
418+
} catch (error) {
419+
if (error instanceof ApplicationFailure) throw error
420+
log.error('upsertSendTokenHodlerVerificationActivity failed', { error })
421+
throw ApplicationFailure.nonRetryable(
422+
error?.message ?? 'upsertSendTokenHodlerVerificationActivity failed',
423+
error?.code ?? 'UPSERT_HODLER_VERIFICATION_FAILED',
424+
error
425+
)
426+
}
427+
},
212428
}
213429
}

0 commit comments

Comments
 (0)