Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,6 @@ export const PRICES_CONNECTION_BATCH_SIZE = 1_000
export const PRICES_CONNECTION_TIMEOUT = 30_000

export const CLIENT_PAYMENT_EXPIRATION_TIME = (7) * (24 * 60 * 60 * 1000) // (number of days) * (24 * 60 * 60 * 1000)

// Interval for verifying unconfirmed transactions (30 minutes)
export const UNCONFIRMED_TX_CHECK_INTERVAL = 30 * 60 * 1000
21 changes: 19 additions & 2 deletions jobs/initJobs.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { CLIENT_PAYMENT_EXPIRATION_TIME, CURRENT_PRICE_REPEAT_DELAY } from 'constants/index'
import { CLIENT_PAYMENT_EXPIRATION_TIME, CURRENT_PRICE_REPEAT_DELAY, UNCONFIRMED_TX_CHECK_INTERVAL } from 'constants/index'
import { Queue } from 'bullmq'
import { redisBullMQ } from 'redis/clientInstance'
import EventEmitter from 'events'
import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker, cleanupClientPaymentsWorker } from './workers'
import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker, cleanupClientPaymentsWorker, verifyUnconfirmedTransactionsWorker } from './workers'

EventEmitter.defaultMaxListeners = 20

Expand All @@ -11,10 +11,12 @@ const main = async (): Promise<void> => {
const pricesQueue = new Queue('pricesSync', { connection: redisBullMQ })
const blockchainQueue = new Queue('blockchainSync', { connection: redisBullMQ })
const cleanupQueue = new Queue('clientPaymentCleanup', { connection: redisBullMQ })
const unconfirmedTxQueue = new Queue('unconfirmedTxVerification', { connection: redisBullMQ })

await pricesQueue.obliterate({ force: true })
await blockchainQueue.obliterate({ force: true })
await cleanupQueue.obliterate({ force: true })
await unconfirmedTxQueue.obliterate({ force: true })

await pricesQueue.add('syncCurrentPrices',
{},
Expand Down Expand Up @@ -53,6 +55,21 @@ const main = async (): Promise<void> => {
)

await cleanupClientPaymentsWorker(cleanupQueue.name)

await unconfirmedTxQueue.add(
'verifyUnconfirmedTransactions',
{},
{
jobId: 'verifyUnconfirmedTransactions',
removeOnComplete: true,
removeOnFail: true,
repeat: {
every: UNCONFIRMED_TX_CHECK_INTERVAL
}
}
)

await verifyUnconfirmedTransactionsWorker(unconfirmedTxQueue.name)
}

void main()
77 changes: 75 additions & 2 deletions jobs/workers.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Worker } from 'bullmq'
import { redisBullMQ } from 'redis/clientInstance'
import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index'
import { DEFAULT_WORKER_LOCK_DURATION, NETWORK_SLUGS_FROM_IDS } from 'constants/index'
import { multiBlockchainClient } from 'services/chronikService'
import { connectAllTransactionsToPrices } from 'services/transactionService'
import { connectAllTransactionsToPrices, fetchUnconfirmedNonOrphanedTransactions, markTransactionConfirmed, markTransactionsOrphaned } from 'services/transactionService'
import { cleanupExpiredClientPayments } from 'services/clientPaymentService'

import * as priceService from 'services/priceService'
Expand Down Expand Up @@ -88,3 +88,76 @@ export const cleanupClientPaymentsWorker = async (queueName: string): Promise<vo
console.error(`[CLIENT_PAYMENT CLEANUP] job ${job?.id as string}: FAILED — ${err.message}`)
})
}

export const verifyUnconfirmedTransactionsWorker = async (queueName: string): Promise<void> => {
const worker = new Worker(
queueName,
async (job) => {
console.log(`[UNCONFIRMED TX VERIFICATION] job ${job.id as string}: checking unconfirmed transactions...`)

const unconfirmedTxs = await fetchUnconfirmedNonOrphanedTransactions()
console.log(`[UNCONFIRMED TX VERIFICATION] Found ${unconfirmedTxs.length} unconfirmed transactions to verify`)

let confirmedCount = 0
let orphanedCount = 0
const oneDayAgo = Math.floor(Date.now() / 1000) - (24 * 60 * 60)

// Group transactions by hash to avoid duplicate chronik calls
const txsByHash = new Map<string, typeof unconfirmedTxs>()
for (const tx of unconfirmedTxs) {
const existing = txsByHash.get(tx.hash) ?? []
existing.push(tx)
txsByHash.set(tx.hash, existing)
}

for (const [hash, txs] of txsByHash.entries()) {
const networkId = txs[0].address.networkId
const networkSlug = NETWORK_SLUGS_FROM_IDS[networkId]

try {
const txDetails = await multiBlockchainClient.getTransactionDetails(hash, networkSlug)

// If tx has a block, it's confirmed
if (txDetails.block?.height !== undefined && txDetails.block.height !== null) {
const blockTimestamp = Number(txDetails.block.timestamp)
await markTransactionConfirmed(hash, blockTimestamp)
confirmedCount += txs.length
console.log(`[UNCONFIRMED TX VERIFICATION] Marked tx ${hash} as confirmed`)
} else {
// Still unconfirmed - txs over a day old should always be either confirmed or orphaned
const oldestTx = txs.reduce((oldest, tx) => tx.timestamp < oldest.timestamp ? tx : oldest, txs[0])
if (oldestTx.timestamp < oneDayAgo) {
console.warn(`[UNCONFIRMED TX VERIFICATION] WARNING: tx ${hash} is over 1 day old and still unconfirmed (address: ${oldestTx.address.address})`)
}
}
} catch (err: any) {
const errMsg = String(err?.message ?? err)
const is404 = /not found in the index|404/.test(errMsg)

if (is404) {
// Transaction no longer exists on the network
await markTransactionsOrphaned(hash)
orphanedCount += txs.length
console.log(`[UNCONFIRMED TX VERIFICATION] Marked tx ${hash} as orphaned (not found)`)
} else {
console.error(`[UNCONFIRMED TX VERIFICATION] Error checking tx ${hash}: ${errMsg}`)
}
}
}

console.log(`[UNCONFIRMED TX VERIFICATION] Finished: ${confirmedCount} confirmed, ${orphanedCount} orphaned`)
},
{
connection: redisBullMQ,
lockDuration: DEFAULT_WORKER_LOCK_DURATION
}
)

worker.on('completed', job => {
console.log(`[UNCONFIRMED TX VERIFICATION] job ${job.id as string}: completed successfully`)
})

worker.on('failed', (job, err) => {
console.error(`[UNCONFIRMED TX VERIFICATION] job ${job?.id as string}: FAILED — ${err.message}`)
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- CreateIndex
CREATE INDEX `Transaction_confirmed_idx` ON `Transaction`(`confirmed`);
1 change: 1 addition & 0 deletions prisma-local/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ model Transaction {

@@unique([hash, addressId], name: "Transaction_hash_addressId_unique_constraint")
@@index([addressId, timestamp], map: "Transaction_addressId_timestamp_idx")
@@index([confirmed], map: "Transaction_confirmed_idx")
}

model Wallet {
Expand Down
41 changes: 41 additions & 0 deletions services/transactionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,47 @@ export async function markTransactionsOrphaned (hash: string): Promise<void> {
})
}

export async function markTransactionConfirmed (hash: string, timestamp: number): Promise<void> {
await prisma.transaction.updateMany({
where: {
hash
},
data: {
confirmed: true,
timestamp
}
})
}

const includeAddressNetwork = {
address: {
select: {
networkId: true,
address: true
}
}
}

const transactionWithAddressNetwork = Prisma.validator<Prisma.TransactionDefaultArgs>()(
{ include: includeAddressNetwork }
)

export type TransactionWithAddressNetwork = Prisma.TransactionGetPayload<typeof transactionWithAddressNetwork>

export async function fetchUnconfirmedNonOrphanedTransactions (): Promise<TransactionWithAddressNetwork[]> {
const oneWeekAgo = Math.floor(Date.now() / 1000) - (7 * 24 * 60 * 60)
return await prisma.transaction.findMany({
where: {
confirmed: false,
orphaned: false,
timestamp: {
gte: oneWeekAgo
}
},
include: includeAddressNetwork
})
}

async function fetchAllTransactionsWithNoPrices (): Promise<TransactionWithNetwork[]> {
const x = await prisma.transaction.findMany({
where: {
Expand Down