From d3cf198ec0b85b88c759063009b53ff9e1dc12e6 Mon Sep 17 00:00:00 2001 From: David Klakurka Date: Sun, 4 Jan 2026 19:04:02 -0800 Subject: [PATCH 1/2] Add periodic check of unconfirmed txs to make sure they are either confirmed or orphaned --- constants/index.ts | 3 + jobs/initJobs.ts | 21 +++++- jobs/workers.ts | 70 ++++++++++++++++++- .../migration.sql | 2 + prisma-local/schema.prisma | 1 + services/transactionService.ts | 41 +++++++++++ 6 files changed, 134 insertions(+), 4 deletions(-) create mode 100644 prisma-local/migrations/20260104100000_transaction_confirmed_index/migration.sql diff --git a/constants/index.ts b/constants/index.ts index de1daea1..63968730 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -291,3 +291,6 @@ export const PRICES_CONNECTION_BATCH_SIZE = 1_000 export const PRICES_CONNECTION_TIMEOUT = 30_000 export const CLIENT_PAYMENT_EXPIRATION_TIME = (7) * (24 * 60 * 60 * 1000) // (number of days) * (24 * 60 * 60 * 1000) + +// Interval for verifying unconfirmed transactions (30 minutes) +export const UNCONFIRMED_TX_CHECK_INTERVAL = 30 * 60 * 1000 diff --git a/jobs/initJobs.ts b/jobs/initJobs.ts index 0a25b755..56273667 100644 --- a/jobs/initJobs.ts +++ b/jobs/initJobs.ts @@ -1,8 +1,8 @@ -import { CLIENT_PAYMENT_EXPIRATION_TIME, CURRENT_PRICE_REPEAT_DELAY } from 'constants/index' +import { CLIENT_PAYMENT_EXPIRATION_TIME, CURRENT_PRICE_REPEAT_DELAY, UNCONFIRMED_TX_CHECK_INTERVAL } from 'constants/index' import { Queue } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' import EventEmitter from 'events' -import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker, cleanupClientPaymentsWorker } from './workers' +import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker, cleanupClientPaymentsWorker, verifyUnconfirmedTransactionsWorker } from './workers' EventEmitter.defaultMaxListeners = 20 @@ -11,10 +11,12 @@ const main = async (): Promise => { const pricesQueue = new Queue('pricesSync', { connection: redisBullMQ }) const blockchainQueue = new Queue('blockchainSync', { connection: redisBullMQ }) const cleanupQueue = new Queue('clientPaymentCleanup', { connection: redisBullMQ }) + const unconfirmedTxQueue = new Queue('unconfirmedTxVerification', { connection: redisBullMQ }) await pricesQueue.obliterate({ force: true }) await blockchainQueue.obliterate({ force: true }) await cleanupQueue.obliterate({ force: true }) + await unconfirmedTxQueue.obliterate({ force: true }) await pricesQueue.add('syncCurrentPrices', {}, @@ -53,6 +55,21 @@ const main = async (): Promise => { ) await cleanupClientPaymentsWorker(cleanupQueue.name) + + await unconfirmedTxQueue.add( + 'verifyUnconfirmedTransactions', + {}, + { + jobId: 'verifyUnconfirmedTransactions', + removeOnComplete: true, + removeOnFail: true, + repeat: { + every: UNCONFIRMED_TX_CHECK_INTERVAL + } + } + ) + + await verifyUnconfirmedTransactionsWorker(unconfirmedTxQueue.name) } void main() diff --git a/jobs/workers.ts b/jobs/workers.ts index d913176f..9b70e98c 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -1,8 +1,8 @@ import { Worker } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' -import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index' +import { DEFAULT_WORKER_LOCK_DURATION, NETWORK_SLUGS_FROM_IDS } from 'constants/index' import { multiBlockchainClient } from 'services/chronikService' -import { connectAllTransactionsToPrices } from 'services/transactionService' +import { connectAllTransactionsToPrices, fetchUnconfirmedNonOrphanedTransactions, markTransactionConfirmed, markTransactionsOrphaned } from 'services/transactionService' import { cleanupExpiredClientPayments } from 'services/clientPaymentService' import * as priceService from 'services/priceService' @@ -88,3 +88,69 @@ export const cleanupClientPaymentsWorker = async (queueName: string): Promise => { + const worker = new Worker( + queueName, + async (job) => { + console.log(`[UNCONFIRMED TX VERIFICATION] job ${job.id as string}: checking unconfirmed transactions...`) + + const unconfirmedTxs = await fetchUnconfirmedNonOrphanedTransactions() + console.log(`[UNCONFIRMED TX VERIFICATION] Found ${unconfirmedTxs.length} unconfirmed transactions to verify`) + + let confirmedCount = 0 + let orphanedCount = 0 + + // Group transactions by hash to avoid duplicate chronik calls + const txsByHash = new Map() + for (const tx of unconfirmedTxs) { + const existing = txsByHash.get(tx.hash) ?? [] + existing.push(tx) + txsByHash.set(tx.hash, existing) + } + + for (const [hash, txs] of txsByHash.entries()) { + const networkId = txs[0].address.networkId + const networkSlug = NETWORK_SLUGS_FROM_IDS[networkId] + + try { + const txDetails = await multiBlockchainClient.getTransactionDetails(hash, networkSlug) + + // If tx has a block, it's confirmed + if (txDetails.block?.height !== undefined && txDetails.block.height !== null) { + const blockTimestamp = Number(txDetails.block.timestamp) + await markTransactionConfirmed(hash, blockTimestamp) + confirmedCount += txs.length + console.log(`[UNCONFIRMED TX VERIFICATION] Marked tx ${hash} as confirmed`) + } + } catch (err: any) { + const errMsg = String(err?.message ?? err) + const is404 = /not found in the index|404/.test(errMsg) + + if (is404) { + // Transaction no longer exists on the network + await markTransactionsOrphaned(hash) + orphanedCount += txs.length + console.log(`[UNCONFIRMED TX VERIFICATION] Marked tx ${hash} as orphaned (not found)`) + } else { + console.error(`[UNCONFIRMED TX VERIFICATION] Error checking tx ${hash}: ${errMsg}`) + } + } + } + + console.log(`[UNCONFIRMED TX VERIFICATION] Finished: ${confirmedCount} confirmed, ${orphanedCount} orphaned`) + }, + { + connection: redisBullMQ, + lockDuration: DEFAULT_WORKER_LOCK_DURATION + } + ) + + worker.on('completed', job => { + console.log(`[UNCONFIRMED TX VERIFICATION] job ${job.id as string}: completed successfully`) + }) + + worker.on('failed', (job, err) => { + console.error(`[UNCONFIRMED TX VERIFICATION] job ${job?.id as string}: FAILED — ${err.message}`) + }) +} diff --git a/prisma-local/migrations/20260104100000_transaction_confirmed_index/migration.sql b/prisma-local/migrations/20260104100000_transaction_confirmed_index/migration.sql new file mode 100644 index 00000000..caab3f61 --- /dev/null +++ b/prisma-local/migrations/20260104100000_transaction_confirmed_index/migration.sql @@ -0,0 +1,2 @@ +-- CreateIndex +CREATE INDEX `Transaction_confirmed_idx` ON `Transaction`(`confirmed`); diff --git a/prisma-local/schema.prisma b/prisma-local/schema.prisma index a1d0da8c..8c395c42 100644 --- a/prisma-local/schema.prisma +++ b/prisma-local/schema.prisma @@ -84,6 +84,7 @@ model Transaction { @@unique([hash, addressId], name: "Transaction_hash_addressId_unique_constraint") @@index([addressId, timestamp], map: "Transaction_addressId_timestamp_idx") + @@index([confirmed], map: "Transaction_confirmed_idx") } model Wallet { diff --git a/services/transactionService.ts b/services/transactionService.ts index 51f6a4ad..b7814721 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -640,6 +640,47 @@ export async function markTransactionsOrphaned (hash: string): Promise { }) } +export async function markTransactionConfirmed (hash: string, timestamp: number): Promise { + await prisma.transaction.updateMany({ + where: { + hash + }, + data: { + confirmed: true, + timestamp + } + }) +} + +const includeAddressNetwork = { + address: { + select: { + networkId: true, + address: true + } + } +} + +const transactionWithAddressNetwork = Prisma.validator()( + { include: includeAddressNetwork } +) + +export type TransactionWithAddressNetwork = Prisma.TransactionGetPayload + +export async function fetchUnconfirmedNonOrphanedTransactions (): Promise { + const oneWeekAgo = Math.floor(Date.now() / 1000) - (7 * 24 * 60 * 60) + return await prisma.transaction.findMany({ + where: { + confirmed: false, + orphaned: false, + timestamp: { + gte: oneWeekAgo + } + }, + include: includeAddressNetwork + }) +} + async function fetchAllTransactionsWithNoPrices (): Promise { const x = await prisma.transaction.findMany({ where: { From 65b3e1c03509e4f02bac122fcd55b6a9ad3ebd61 Mon Sep 17 00:00:00 2001 From: David Klakurka Date: Sun, 4 Jan 2026 19:06:23 -0800 Subject: [PATCH 2/2] Added warning for txs that are old and seemingly both not confirmed or orphaned --- jobs/workers.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/jobs/workers.ts b/jobs/workers.ts index 9b70e98c..2f2d71f3 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -100,6 +100,7 @@ export const verifyUnconfirmedTransactionsWorker = async (queueName: string): Pr let confirmedCount = 0 let orphanedCount = 0 + const oneDayAgo = Math.floor(Date.now() / 1000) - (24 * 60 * 60) // Group transactions by hash to avoid duplicate chronik calls const txsByHash = new Map() @@ -122,6 +123,12 @@ export const verifyUnconfirmedTransactionsWorker = async (queueName: string): Pr await markTransactionConfirmed(hash, blockTimestamp) confirmedCount += txs.length console.log(`[UNCONFIRMED TX VERIFICATION] Marked tx ${hash} as confirmed`) + } else { + // Still unconfirmed - txs over a day old should always be either confirmed or orphaned + const oldestTx = txs.reduce((oldest, tx) => tx.timestamp < oldest.timestamp ? tx : oldest, txs[0]) + if (oldestTx.timestamp < oneDayAgo) { + console.warn(`[UNCONFIRMED TX VERIFICATION] WARNING: tx ${hash} is over 1 day old and still unconfirmed (address: ${oldestTx.address.address})`) + } } } catch (err: any) { const errMsg = String(err?.message ?? err)