diff --git a/packages/fastify-app/src/test/entitlements.test.ts b/packages/fastify-app/src/test/entitlements.test.ts index 176d3ab8..d8fa336d 100644 --- a/packages/fastify-app/src/test/entitlements.test.ts +++ b/packages/fastify-app/src/test/entitlements.test.ts @@ -59,7 +59,7 @@ describe('entitlements', () => { }, ] - await stripeSync.deleteRemovedActiveEntitlements( + await stripeSync.postgresClient.deleteRemovedActiveEntitlements( customerId, activeEntitlements.map((entitlement) => entitlement.id) ) @@ -106,7 +106,7 @@ describe('entitlements', () => { }, ] - await stripeSync.deleteRemovedActiveEntitlements( + await stripeSync.postgresClient.deleteRemovedActiveEntitlements( customerId, newActiveEntitlements.map((entitlement) => entitlement.id) ) diff --git a/packages/fastify-app/src/test/incremental-sync.test.ts b/packages/fastify-app/src/test/incremental-sync.test.ts deleted file mode 100644 index 7b5e8d75..00000000 --- a/packages/fastify-app/src/test/incremental-sync.test.ts +++ /dev/null @@ -1,808 +0,0 @@ -import type Stripe from 'stripe' -import { StripeSync, runMigrations, hashApiKey } from 'stripe-experiment-sync' -import { vitest, beforeAll, afterAll, describe, test, expect, beforeEach } from 'vitest' -import { getConfig } from '../utils/config' -import { logger } from '../logger' - -let stripeSync: StripeSync | undefined -const testAccountId = 'acct_test_account' - -// Helper to get cursor from most recent sync run (for test verification - any status) -async function getCursor(resourceName: string): Promise { - const result = await stripeSync!.postgresClient.pool.query( - `SELECT o.cursor FROM stripe._sync_obj_runs o - JOIN stripe._sync_runs r ON o."_account_id" = r."_account_id" AND o.run_started_at = r.started_at - WHERE o."_account_id" = $1 AND o.object = $2 - ORDER BY r.started_at DESC - LIMIT 1`, - [testAccountId, resourceName] - ) - return result.rows[0]?.cursor ? parseInt(result.rows[0].cursor) : null -} - -beforeAll(async () => { - const config = getConfig() - await runMigrations({ - databaseUrl: config.databaseUrl, - - logger, - }) - - stripeSync = new StripeSync({ - ...config, - poolConfig: { - connectionString: config.databaseUrl, - }, - stripeAccountId: testAccountId, - }) - - // Mock Stripe account retrieval to avoid API calls - vitest.spyOn(stripeSync.stripe.accounts, 'retrieve').mockResolvedValue({ - id: testAccountId, - object: 'account', - } as Stripe.Account) - - // Ensure test account exists in database with API key hash - const apiKeyHash = hashApiKey(config.stripeSecretKey) - await stripeSync.postgresClient.upsertAccount( - { - id: testAccountId, - raw_data: { id: testAccountId, object: 'account' }, - }, - apiKeyHash - ) -}) - -afterAll(async () => { - if (stripeSync) { - await stripeSync.postgresClient.pool.end() - } -}) - -describe('Incremental Sync', () => { - beforeEach(async () => { - // Clean up test data before each test - await stripeSync.postgresClient.pool.query('DELETE FROM stripe.products WHERE id LIKE $1', [ - 'test_prod_%', - ]) - await stripeSync.postgresClient.deleteSyncRuns(testAccountId) - // Clear cached account so it re-fetches using the mock - stripeSync.cachedAccount = null - }) - - test('should only fetch new products on second sync', async () => { - const allProducts: Stripe.Product[] = [ - { - id: 'test_prod_1', - object: 'product', - created: 1704902400, - name: 'Product 1', - } as Stripe.Product, - { - id: 'test_prod_2', - object: 'product', - created: 1704988800, - name: 'Product 2', - } as Stripe.Product, - { - id: 'test_prod_3', - object: 'product', - created: 1705075200, - name: 'Product 3', - } as Stripe.Product, - ] - - const newProducts: Stripe.Product[] = [ - { - id: 'test_prod_4', - object: 'product', - created: 1705161600, - name: 'Product 4', - } as Stripe.Product, - ] - - // Mock Stripe API for first sync - returns all products - const listSpy = vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: allProducts, - has_more: false, - url: '/v1/products', - }) - - // First sync - no cursor, should fetch all - await stripeSync.syncProducts() - - expect(listSpy).toHaveBeenCalledWith({ limit: 100 }) - - // Verify products were inserted - const firstResult = await stripeSync.postgresClient.pool.query( - 'SELECT COUNT(*) FROM stripe.products WHERE id LIKE $1', - ['test_prod_%'] - ) - expect(parseInt(firstResult.rows[0].count)).toBe(3) - - // Verify cursor was saved in new observability tables - const cursor = await getCursor('products') - expect(cursor).toBe(1705075200) // Max created from first sync - - // Mock Stripe API for second sync - only returns new products - const listSpyIncremental = vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: newProducts, - has_more: false, - url: '/v1/products', - }) - - // Second sync - should use cursor - await stripeSync.syncProducts() - - expect(listSpyIncremental).toHaveBeenCalledWith({ - limit: 100, - created: { gte: 1705075200 }, - }) - - // Verify new product was inserted - const secondResult = await stripeSync.postgresClient.pool.query( - 'SELECT COUNT(*) FROM stripe.products WHERE id LIKE $1', - ['test_prod_%'] - ) - expect(parseInt(secondResult.rows[0].count)).toBe(4) - - // Verify cursor was updated in new observability tables - const newCursor = await getCursor('products') - expect(newCursor).toBe(1705161600) - }) - - test('should checkpoint cursor every 100 items', async () => { - const products: Stripe.Product[] = Array.from({ length: 250 }, (_, i) => ({ - id: `test_prod_batch_${i}`, - object: 'product' as const, - created: 1704902400 + i, - name: `Product ${i}`, - })) as Stripe.Product[] - - vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: products, - has_more: false, - url: '/v1/products', - }) - - // Spy on updateObjectCursor (new observability method) - const updateSpy = vitest.spyOn(stripeSync.postgresClient, 'updateObjectCursor') - - await stripeSync.syncProducts() - - // Should update cursor 3 times: after 100, after 200, after 250 - expect(updateSpy).toHaveBeenCalledTimes(3) - // Note: updateObjectCursor takes (accountId, runStartedAt, object, cursor) - // We check that the cursor values are correct - expect(updateSpy).toHaveBeenNthCalledWith( - 1, - testAccountId, - expect.any(Date), - 'products', - String(1704902400 + 99) - ) - expect(updateSpy).toHaveBeenNthCalledWith( - 2, - testAccountId, - expect.any(Date), - 'products', - String(1704902400 + 199) - ) - expect(updateSpy).toHaveBeenNthCalledWith( - 3, - testAccountId, - expect.any(Date), - 'products', - String(1704902400 + 249) - ) - - updateSpy.mockRestore() - }) - - test('should not update cursor from webhooks', async () => { - // Initial backfill - const products: Stripe.Product[] = [ - { - id: 'test_prod_webhook_1', - object: 'product', - created: 1704902400, - name: 'Product 1', - } as Stripe.Product, - ] - - vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: products, - has_more: false, - url: '/v1/products', - }) - - await stripeSync.syncProducts() - - const initialCursor = await getCursor('products') - expect(initialCursor).toBe(1704902400) - - // Process webhook with newer product - const webhookEvent: Stripe.Event = { - id: 'evt_test', - object: 'event', - type: 'product.updated', - data: { - object: { - id: 'test_prod_webhook_2', - object: 'product', - created: 1705161600, // Much newer - name: 'Webhook Product', - } as Stripe.Product, - }, - created: 1705248000, - } as Stripe.Event - - await stripeSync.processEvent(webhookEvent) - - // Cursor should be unchanged (webhooks don't update sync cursor) - const afterCursor = await getCursor('products') - expect(afterCursor).toBe(initialCursor) - }) - - test('should use explicit filter instead of cursor when provided', async () => { - // First do a sync to set up a cursor - const setupProducts: Stripe.Product[] = [ - { - id: 'test_prod_setup_1', - object: 'product', - created: 1704902400, - name: 'Setup Product', - } as Stripe.Product, - ] - - vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: setupProducts, - has_more: false, - url: '/v1/products', - }) - await stripeSync.syncProducts() - - // Verify cursor was set - const cursor = await getCursor('products') - expect(cursor).toBe(1704902400) - - // Clean up the run so we can start a new one - await stripeSync.postgresClient.deleteSyncRuns(testAccountId) - - const products: Stripe.Product[] = [ - { - id: 'test_prod_explicit_1', - object: 'product', - created: 1672531200, - name: 'Old Product', - } as Stripe.Product, - ] - - const listSpy = vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: products, - has_more: false, - url: '/v1/products', - }) - - // Call with explicit filter (earlier than cursor) - await stripeSync.syncProducts({ - created: { gte: 1672531200 }, - }) - - // Should use explicit filter, not cursor - expect(listSpy).toHaveBeenCalledWith({ - limit: 100, - created: { gte: 1672531200 }, - }) - }) - - test('should handle sync error and preserve cursor', async () => { - const products: Stripe.Product[] = [ - { - id: 'test_prod_error_1', - object: 'product', - created: 1704902400, - name: 'Product 1', - } as Stripe.Product, - ] - - let callCount = 0 - let hasThrown = false - vitest.spyOn(stripeSync.stripe.products, 'list').mockImplementation(async () => { - callCount++ - if (callCount === 1 && !hasThrown) { - // First page succeeds with products - return { - object: 'list', - data: products, - has_more: true, // Indicate there's more (but second fetch will fail) - url: '/v1/products', - } - } else if (callCount === 2 && !hasThrown) { - // Second fetch fails (only on first sync) - hasThrown = true - throw new Error('Simulated sync error') - } else { - // After first sync, just return products without more pages - return { - object: 'list', - data: callCount === 1 ? products : [], - has_more: false, - url: '/v1/products', - } - } - }) - - // First attempt should fail but save checkpoint - await expect(stripeSync.syncProducts()).rejects.toThrow('Simulated sync error') - - // Cursor should be saved up to checkpoint (first product was processed successfully) - const cursor = await getCursor('products') - expect(cursor).toBe(1704902400) - - // Status should be error in the new observability tables - // Check run status from sync_runs view - const runStatus = await stripeSync.postgresClient.pool.query( - `SELECT status, error_message FROM stripe.sync_runs - WHERE account_id = $1 - ORDER BY started_at DESC - LIMIT 1`, - [testAccountId] - ) - expect(runStatus.rows[0].status).toBe('error') - expect(runStatus.rows[0].error_message).toContain('Simulated sync error') - - // Check object status from _sync_obj_runs table - const objStatus = await stripeSync.postgresClient.pool.query( - `SELECT status, error_message FROM stripe._sync_obj_runs - WHERE "_account_id" = $1 AND object = $2 - ORDER BY run_started_at DESC - LIMIT 1`, - [testAccountId, 'products'] - ) - expect(objStatus.rows[0].status).toBe('error') - expect(objStatus.rows[0].error_message).toContain('Simulated sync error') - - // Second attempt should succeed - reset callCount - callCount = 0 - await stripeSync.syncProducts() - - // Status should be complete - const finalStatus = await stripeSync.postgresClient.pool.query( - `SELECT status FROM stripe.sync_runs - WHERE account_id = $1 - ORDER BY started_at DESC - LIMIT 1`, - [testAccountId] - ) - expect(finalStatus.rows[0].status).toBe('complete') - }) - - test('should work with processUntilDone using cursor automatically', async () => { - const products: Stripe.Product[] = [ - { - id: 'test_prod_backfill_1', - object: 'product', - created: 1704902400, - name: 'Product 1', - } as Stripe.Product, - { - id: 'test_prod_backfill_2', - object: 'product', - created: 1704988800, - name: 'Product 2', - } as Stripe.Product, - ] - - // processUntilDone now uses processNext internally which expects { data, has_more } format - const listSpy = vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: products, - has_more: false, - url: '/v1/products', - } as Stripe.ApiList) - - // First sync - await stripeSync.processUntilDone({ object: 'product' }) - - expect(listSpy).toHaveBeenCalledWith({ limit: 100 }) - - // Second sync should be incremental - const newProducts: Stripe.Product[] = [ - { - id: 'test_prod_backfill_3', - object: 'product', - created: 1705075200, - name: 'Product 3', - } as Stripe.Product, - ] - - const listSpyNew = vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: newProducts, - has_more: false, - url: '/v1/products', - } as Stripe.ApiList) - - await stripeSync.processUntilDone({ object: 'product' }) - - expect(listSpyNew).toHaveBeenCalledWith({ - limit: 100, - created: { gte: 1704988800 }, - }) - }) -}) - -describe('processNext', () => { - beforeEach(async () => { - // Clean up test data before each test - await stripeSync.postgresClient.pool.query('DELETE FROM stripe.products WHERE id LIKE $1', [ - 'test_prod_%', - ]) - await stripeSync.postgresClient.deleteSyncRuns(testAccountId) - // Clear cached account so it re-fetches using the mock - stripeSync.cachedAccount = null - }) - - test('should return hasMore: true when more pages exist', async () => { - const products: Stripe.Product[] = [ - { - id: 'test_prod_page_1', - object: 'product', - created: 1704902400, - name: 'Product 1', - } as Stripe.Product, - ] - - // Mock list to return a response with has_more: true - vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: products, - has_more: true, - url: '/v1/products', - } as Stripe.ApiList) - - const result = await stripeSync.processNext('product') - - expect(result.processed).toBe(1) - expect(result.hasMore).toBe(true) - expect(result.runStartedAt).toBeInstanceOf(Date) - }) - - test('should return hasMore: false when no more pages', async () => { - const products: Stripe.Product[] = [ - { - id: 'test_prod_page_2', - object: 'product', - created: 1704902400, - name: 'Product 2', - } as Stripe.Product, - ] - - vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: products, - has_more: false, - url: '/v1/products', - } as Stripe.ApiList) - - const result = await stripeSync.processNext('product') - - expect(result.processed).toBe(1) - expect(result.hasMore).toBe(false) - expect(result.runStartedAt).toBeInstanceOf(Date) - }) - - test('processNext should continue historical paging on second call (starting_after), not switch into incremental mode early', async () => { - // First page - const firstPageProducts: Stripe.Product[] = [ - { - id: 'test_prod_inc_1', - object: 'product', - created: 1704902400, - name: 'Product 1', - } as Stripe.Product, - ] - - // Second page - const secondPageProducts: Stripe.Product[] = [ - { - id: 'test_prod_inc_2', - object: 'product', - created: 1704988800, - name: 'Product 2', - } as Stripe.Product, - ] - - // Create spy once and chain mock responses - the Proxy wrapper makes - // each access return a new function, so we need to keep the spy reference - const listSpy = vitest.spyOn(stripeSync.stripe.products, 'list') - listSpy - .mockResolvedValueOnce({ - object: 'list', - data: firstPageProducts, - has_more: true, - url: '/v1/products', - } as Stripe.ApiList) - .mockResolvedValueOnce({ - object: 'list', - data: secondPageProducts, - has_more: false, - url: '/v1/products', - } as Stripe.ApiList) - - await stripeSync.processNext('product') - await stripeSync.processNext('product') - - // Second call should continue paging within the same run using starting_after (page cursor). - // We should NOT switch into created.gte incremental mode mid-run (that causes "newest page only" issues). - expect(listSpy).toHaveBeenLastCalledWith({ - limit: 100, - starting_after: 'test_prod_inc_1', - }) - }) - - test('should handle empty response', async () => { - vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: [], - has_more: false, - url: '/v1/products', - } as Stripe.ApiList) - - const result = await stripeSync.processNext('product') - - expect(result.processed).toBe(0) - expect(result.hasMore).toBe(false) - expect(result.runStartedAt).toBeInstanceOf(Date) - }) - - test('should throw on API error', async () => { - vitest.spyOn(stripeSync.stripe.products, 'list').mockRejectedValue(new Error('API Error')) - - await expect(stripeSync.processNext('product')).rejects.toThrow('API Error') - }) -}) - -describe('processUntilDone', () => { - beforeEach(async () => { - // Clean up test data before each test - await stripeSync.postgresClient.pool.query('DELETE FROM stripe.products WHERE id LIKE $1', [ - 'test_prod_%', - ]) - await stripeSync.postgresClient.deleteSyncRuns(testAccountId) - // Clear cached account so it re-fetches using the mock - stripeSync.cachedAccount = null - }) - - test('should sync products using processUntilDone', async () => { - const products: Stripe.Product[] = [ - { - id: 'test_prod_alias_1', - object: 'product', - created: 1704902400, - name: 'Product 1', - } as Stripe.Product, - ] - - // processUntilDone now uses processNext internally which expects { data, has_more } format - vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: products, - has_more: false, - url: '/v1/products', - } as Stripe.ApiList) - - const result = await stripeSync.processUntilDone({ object: 'product' }) - - expect(result.products?.synced).toBe(1) - }) -}) - -describe('Bug regression tests', () => { - beforeEach(async () => { - // Clean up test data before each test - await stripeSync.postgresClient.pool.query( - 'DELETE FROM stripe.payment_intents WHERE id LIKE $1', - ['test_pi_%'] - ) - await stripeSync.postgresClient.pool.query('DELETE FROM stripe.plans WHERE id LIKE $1', [ - 'test_plan_%', - ]) - await stripeSync.postgresClient.pool.query('DELETE FROM stripe.products WHERE id LIKE $1', [ - 'test_prod_%', - ]) - await stripeSync.postgresClient.pool.query('DELETE FROM stripe.prices WHERE id LIKE $1', [ - 'test_price_%', - ]) - await stripeSync.postgresClient.deleteSyncRuns(testAccountId) - // Clear cached account so it re-fetches using the mock - stripeSync.cachedAccount = null - }) - - test('Bug 1: processUntilDone with payment_intent should NOT sync plans (switch fallthrough)', async () => { - // This test verifies the fix for the missing break statement bug. - // Previously case 'payment_intent' fell through to case 'plan'. - - const paymentIntents: Stripe.PaymentIntent[] = [ - { - id: 'test_pi_1', - object: 'payment_intent', - created: 1704902400, - amount: 1000, - currency: 'usd', - status: 'succeeded', - } as Stripe.PaymentIntent, - ] - - const plans: Stripe.Plan[] = [ - { - id: 'test_plan_1', - object: 'plan', - created: 1704902400, - amount: 500, - currency: 'usd', - interval: 'month', - } as Stripe.Plan, - ] - - // Mock payment_intents.list with { data, has_more } format - vitest.spyOn(stripeSync.stripe.paymentIntents, 'list').mockResolvedValue({ - object: 'list', - data: paymentIntents, - has_more: false, - url: '/v1/payment_intents', - } as Stripe.ApiList) - - // Mock plans.list with { data, has_more } format - vitest.spyOn(stripeSync.stripe.plans, 'list').mockResolvedValue({ - object: 'list', - data: plans, - has_more: false, - url: '/v1/plans', - } as Stripe.ApiList) - - // Request to sync ONLY payment_intent - await stripeSync.processUntilDone({ object: 'payment_intent' }) - - // Verify payment_intent was synced - const piResult = await stripeSync.postgresClient.pool.query( - 'SELECT COUNT(*) FROM stripe.payment_intents WHERE id = $1', - ['test_pi_1'] - ) - expect(parseInt(piResult.rows[0].count)).toBe(1) - - // Verify plan was NOT synced (bug fix verified) - const planResult = await stripeSync.postgresClient.pool.query( - 'SELECT COUNT(*) FROM stripe.plans WHERE id = $1', - ['test_plan_1'] - ) - expect(parseInt(planResult.rows[0].count)).toBe(0) - - // Also verify via observability: only payment_intents object run should exist - const objRuns = await stripeSync.postgresClient.pool.query( - `SELECT object FROM stripe._sync_obj_runs WHERE "_account_id" = $1`, - [testAccountId] - ) - const objects = objRuns.rows.map((r: { object: string }) => r.object) - expect(objects).not.toContain('plans') - }) - - test('Bug 2: separate processUntilDone calls create separate runs (expected behavior)', async () => { - // Each processUntilDone call creates and completes its own run. - // This is expected - separate calls = separate runs. - - const products: Stripe.Product[] = [ - { - id: 'test_prod_run_1', - object: 'product', - created: 1704902400, - name: 'P1', - } as Stripe.Product, - ] - const prices: Stripe.Price[] = [ - { - id: 'test_price_run_1', - object: 'price', - created: 1704902400, - unit_amount: 100, - currency: 'usd', - } as Stripe.Price, - ] - - // Mock with { data, has_more } format - vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: products, - has_more: false, - url: '/v1/products', - } as Stripe.ApiList) - - vitest.spyOn(stripeSync.stripe.prices, 'list').mockResolvedValue({ - object: 'list', - data: prices, - has_more: false, - url: '/v1/prices', - } as Stripe.ApiList) - - // Run sync for 2 objects in separate calls - await stripeSync.processUntilDone({ object: 'product' }) - await stripeSync.processUntilDone({ object: 'price' }) - - // Count how many sync runs were created - const runResult = await stripeSync.postgresClient.pool.query( - `SELECT COUNT(*) FROM stripe._sync_runs WHERE "_account_id" = $1`, - [testAccountId] - ) - - // Each processUntilDone call creates and completes its own run = 2 runs - // This is expected behavior for separate calls. - expect(parseInt(runResult.rows[0].count)).toBe(2) - }) - - test('Bug 2 (detailed): processUntilDone should use ONE run for multiple objects', async () => { - // This tests that processUntilDone creates a single sync run - // and uses processNext internally for each object type. - - const products: Stripe.Product[] = [ - { - id: 'test_prod_detailed_1', - object: 'product', - created: 1704902400, - name: 'P1', - } as Stripe.Product, - ] - const prices: Stripe.Price[] = [ - { - id: 'test_price_detailed_1', - object: 'price', - created: 1704902400, - unit_amount: 100, - currency: 'usd', - } as Stripe.Price, - ] - - // Mock products to return data, then empty (simulating has_more: false) - vitest.spyOn(stripeSync.stripe.products, 'list').mockResolvedValue({ - object: 'list', - data: products, - has_more: false, - url: '/v1/products', - } as Stripe.ApiList) - - vitest.spyOn(stripeSync.stripe.prices, 'list').mockResolvedValue({ - object: 'list', - data: prices, - has_more: false, - url: '/v1/prices', - } as Stripe.ApiList) - - // Call processUntilDone for product, then price - should share ONE run - await stripeSync.processUntilDone({ object: 'product' }) - - // At this point the run is complete. Starting a new processUntilDone - // will create a NEW run (expected behavior - run was completed). - // The fix is that within a SINGLE processUntilDone('all') call, - // all objects share one run. - - // Count runs after first processUntilDone - const runResultAfterFirst = await stripeSync.postgresClient.pool.query( - `SELECT COUNT(*) FROM stripe._sync_runs WHERE "_account_id" = $1`, - [testAccountId] - ) - expect(parseInt(runResultAfterFirst.rows[0].count)).toBe(1) - - // Count object runs - should have 'products' object run - const objRunResult = await stripeSync.postgresClient.pool.query( - `SELECT object FROM stripe._sync_obj_runs WHERE "_account_id" = $1`, - [testAccountId] - ) - expect(objRunResult.rows.map((r: { object: string }) => r.object)).toContain('products') - }) -}) diff --git a/packages/sync-engine/package.json b/packages/sync-engine/package.json index b5912d6a..08641065 100644 --- a/packages/sync-engine/package.json +++ b/packages/sync-engine/package.json @@ -31,6 +31,7 @@ "test": "vitest", "test:integration": "TEST_POSTGRES_DB_URL=${TEST_POSTGRES_DB_URL:-postgresql://postgres:postgres@localhost:55432/postgres} vitest run src/stripeSync*integration.test.ts", "test:e2e": "vitest run --config vitest.e2e.config.ts", + "start": "pnpm run build && node dist/cli/index.js backfill", "generate:sigma-schema": "tsx src/sigma/schema/fetch-schema.ts" }, "files": [ diff --git a/packages/sync-engine/src/database/postgres.ts b/packages/sync-engine/src/database/postgres.ts index 0f0a3f65..75676a4b 100644 --- a/packages/sync-engine/src/database/postgres.ts +++ b/packages/sync-engine/src/database/postgres.ts @@ -354,6 +354,34 @@ export class PostgresClient { return counts } + async deletePlan(id: string): Promise { + return this.delete('plans', id) + } + + async deleteProduct(id: string): Promise { + return this.delete('products', id) + } + + async deleteTaxId(id: string): Promise { + return this.delete('tax_ids', id) + } + + async deletePrice(id: string): Promise { + return this.delete('prices', id) + } + + async deleteRemovedActiveEntitlements( + customerId: string, + currentActiveEntitlementIds: string[] + ): Promise<{ rowCount: number }> { + const prepared = sql(` + delete from "stripe"."active_entitlements" + where customer = :customerId and id <> ALL(:currentActiveEntitlementIds::text[]); + `)({ customerId, currentActiveEntitlementIds }) + const { rowCount } = await this.query(prepared.text, prepared.values) + return { rowCount: rowCount || 0 } + } + async deleteAccountWithCascade( accountId: string, useTransaction: boolean diff --git a/packages/sync-engine/src/stripeSync.ts b/packages/sync-engine/src/stripeSync.ts index a7df3c32..60297c74 100644 --- a/packages/sync-engine/src/stripeSync.ts +++ b/packages/sync-engine/src/stripeSync.ts @@ -7,8 +7,6 @@ import { Sync, SyncBackfill, SyncParams, - SyncEntitlementsParams, - SyncFeaturesParams, ProcessNextResult, ProcessNextParams, SyncObject, @@ -17,7 +15,6 @@ import { } from './types' import { managedWebhookSchema } from './schemas/managed_webhook' import { type PoolConfig } from 'pg' -import { createRetryableStripeClient } from './utils/stripeClientWrapper' import { hashApiKey } from './utils/hashApiKey' import { parseCsvObjects, runSigmaQueryAndDownloadCsv } from './sigma/sigmaApi' import { SIGMA_INGESTION_CONFIGS } from './sigma/sigmaIngestionConfigs' @@ -46,37 +43,24 @@ function getUniqueIds(entries: T[], key: string): string[] { return Array.from(set) } -export interface StripeSyncOptions { - databaseUrl: string - stripeApiKey: string - baseUrl: () => string - webhookPath?: string - stripeApiVersion?: string - autoExpandLists?: boolean - backfillRelatedEntities?: boolean - keepWebhooksOnShutdown?: boolean -} - -export interface StripSyncInfo { - baseUrl: string - webhookUrl: string -} - export class StripeSync { stripe: Stripe postgresClient: PostgresClient - private readonly resourceRegistry: Record + config: StripeSyncConfig + readonly resourceRegistry: Record - private get sigmaSchemaName(): string { + get sigmaSchemaName(): string { return this.config.sigmaSchemaName ?? 'sigma' } - constructor(private config: StripeSyncConfig) { + constructor(config: StripeSyncConfig) { + this.config = config // Create base Stripe client - const baseStripe = new Stripe(config.stripeSecretKey, { + this.stripe = new Stripe(config.stripeSecretKey, { // https://github.com/stripe/stripe-node#configuration // @ts-ignore apiVersion: config.stripeApiVersion, + maxNetworkRetries: 5, appInfo: { name: 'Stripe Sync Engine', version: pkg.version, @@ -85,13 +69,6 @@ export class StripeSync { }, }) - // Wrap with automatic retry logic for all API calls - // This ensures ALL Stripe operations are protected against: - // - Rate limits (429) - // - Server errors (500, 502, 503, 504, 424) - // - Connection errors (network failures) - this.stripe = createRetryableStripeClient(baseStripe, {}, config.logger) - this.config.logger = config.logger ?? console this.config.logger?.info( { autoExpandLists: config.autoExpandLists, stripeApiVersion: config.stripeApiVersion }, @@ -128,7 +105,7 @@ export class StripeSync { // Complements eventHandlers which maps event types → handlers for webhooks // Both registries share the same underlying upsert methods // Order field determines backfill sequence - parents before children for FK dependencies - private buildResourceRegistry(): Record { + buildResourceRegistry(): Record { const core: Record = { product: { order: 1, // No dependencies @@ -272,11 +249,11 @@ export class StripeSync { return { ...sigmaEntries, ...core } } - private isSigmaResource(object: string): boolean { + isSigmaResource(object: string): boolean { return Boolean(this.resourceRegistry[object]?.sigma) } - private sigmaResultKey(tableName: string): string { + sigmaResultKey(tableName: string): string { return tableName.replace(/_([a-z0-9])/g, (_, ch: string) => ch.toUpperCase()) } @@ -296,7 +273,7 @@ export class StripeSync { * @param account - Stripe account object * @param apiKeyHash - SHA-256 hash of API key to store for fast lookups */ - private async upsertAccount(account: Stripe.Account, apiKeyHash: string): Promise { + async upsertAccount(account: Stripe.Account, apiKeyHash: string): Promise { try { await this.postgresClient.upsertAccount( { @@ -479,7 +456,7 @@ export class StripeSync { // Note: Uses 'any' for event parameter to allow handlers with specific Stripe event types // (e.g., CustomerDeletedEvent, ProductDeletedEvent) which TypeScript won't accept // as contravariant parameters when using the base Stripe.Event type - private readonly eventHandlers: Record< + readonly eventHandlers: Record< string, (event: any, accountId: string) => Promise // eslint-disable-line @typescript-eslint/no-explicit-any > = { @@ -652,7 +629,7 @@ export class StripeSync { } // Event handler methods - private async handleChargeEvent(event: Stripe.Event, accountId: string): Promise { + async handleChargeEvent(event: Stripe.Event, accountId: string): Promise { const { entity: charge, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Charge, (id) => this.stripe.charges.retrieve(id), @@ -662,7 +639,7 @@ export class StripeSync { await this.upsertCharges([charge], accountId, false, this.getSyncTimestamp(event, refetched)) } - private async handleCustomerDeletedEvent( + async handleCustomerDeletedEvent( event: Stripe.CustomerDeletedEvent, accountId: string ): Promise { @@ -675,7 +652,7 @@ export class StripeSync { await this.upsertCustomers([customer], accountId, this.getSyncTimestamp(event, false)) } - private async handleCustomerEvent(event: Stripe.Event, accountId: string): Promise { + async handleCustomerEvent(event: Stripe.Event, accountId: string): Promise { const { entity: customer, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Customer | Stripe.DeletedCustomer, (id) => this.stripe.customers.retrieve(id), @@ -685,7 +662,7 @@ export class StripeSync { await this.upsertCustomers([customer], accountId, this.getSyncTimestamp(event, refetched)) } - private async handleCheckoutSessionEvent(event: Stripe.Event, accountId: string): Promise { + async handleCheckoutSessionEvent(event: Stripe.Event, accountId: string): Promise { const { entity: checkoutSession, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Checkout.Session, (id) => this.stripe.checkout.sessions.retrieve(id) @@ -699,7 +676,7 @@ export class StripeSync { ) } - private async handleSubscriptionEvent(event: Stripe.Event, accountId: string): Promise { + async handleSubscriptionEvent(event: Stripe.Event, accountId: string): Promise { const { entity: subscription, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Subscription, (id) => this.stripe.subscriptions.retrieve(id), @@ -715,7 +692,7 @@ export class StripeSync { ) } - private async handleTaxIdEvent(event: Stripe.Event, accountId: string): Promise { + async handleTaxIdEvent(event: Stripe.Event, accountId: string): Promise { const { entity: taxId, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.TaxId, (id) => this.stripe.taxIds.retrieve(id) @@ -724,13 +701,13 @@ export class StripeSync { await this.upsertTaxIds([taxId], accountId, false, this.getSyncTimestamp(event, refetched)) } - private async handleTaxIdDeletedEvent(event: Stripe.Event, _accountId: string): Promise { + async handleTaxIdDeletedEvent(event: Stripe.Event, _accountId: string): Promise { const taxId = event.data.object as Stripe.TaxId - await this.deleteTaxId(taxId.id) + await this.postgresClient.deleteTaxId(taxId.id) } - private async handleInvoiceEvent(event: Stripe.Event, accountId: string): Promise { + async handleInvoiceEvent(event: Stripe.Event, accountId: string): Promise { const { entity: invoice, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Invoice, (id) => this.stripe.invoices.retrieve(id), @@ -740,7 +717,7 @@ export class StripeSync { await this.upsertInvoices([invoice], accountId, false, this.getSyncTimestamp(event, refetched)) } - private async handleProductEvent(event: Stripe.Event, accountId: string): Promise { + async handleProductEvent(event: Stripe.Event, accountId: string): Promise { try { const { entity: product, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Product, @@ -751,23 +728,23 @@ export class StripeSync { } catch (err) { if (err instanceof Stripe.errors.StripeAPIError && err.code === 'resource_missing') { const product = event.data.object as Stripe.Product - await this.deleteProduct(product.id) + await this.postgresClient.deleteProduct(product.id) } else { throw err } } } - private async handleProductDeletedEvent( + async handleProductDeletedEvent( event: Stripe.ProductDeletedEvent, _accountId: string ): Promise { const product = event.data.object - await this.deleteProduct(product.id) + await this.postgresClient.deleteProduct(product.id) } - private async handlePriceEvent(event: Stripe.Event, accountId: string): Promise { + async handlePriceEvent(event: Stripe.Event, accountId: string): Promise { try { const { entity: price, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Price, @@ -778,23 +755,23 @@ export class StripeSync { } catch (err) { if (err instanceof Stripe.errors.StripeAPIError && err.code === 'resource_missing') { const price = event.data.object as Stripe.Price - await this.deletePrice(price.id) + await this.postgresClient.deletePrice(price.id) } else { throw err } } } - private async handlePriceDeletedEvent( + async handlePriceDeletedEvent( event: Stripe.PriceDeletedEvent, _accountId: string ): Promise { const price = event.data.object - await this.deletePrice(price.id) + await this.postgresClient.deletePrice(price.id) } - private async handlePlanEvent(event: Stripe.Event, accountId: string): Promise { + async handlePlanEvent(event: Stripe.Event, accountId: string): Promise { try { const { entity: plan, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Plan, @@ -805,23 +782,20 @@ export class StripeSync { } catch (err) { if (err instanceof Stripe.errors.StripeAPIError && err.code === 'resource_missing') { const plan = event.data.object as Stripe.Plan - await this.deletePlan(plan.id) + await this.postgresClient.deletePlan(plan.id) } else { throw err } } } - private async handlePlanDeletedEvent( - event: Stripe.PlanDeletedEvent, - _accountId: string - ): Promise { + async handlePlanDeletedEvent(event: Stripe.PlanDeletedEvent, _accountId: string): Promise { const plan = event.data.object - await this.deletePlan(plan.id) + await this.postgresClient.deletePlan(plan.id) } - private async handleSetupIntentEvent(event: Stripe.Event, accountId: string): Promise { + async handleSetupIntentEvent(event: Stripe.Event, accountId: string): Promise { const { entity: setupIntent, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.SetupIntent, (id) => this.stripe.setupIntents.retrieve(id), @@ -836,10 +810,7 @@ export class StripeSync { ) } - private async handleSubscriptionScheduleEvent( - event: Stripe.Event, - accountId: string - ): Promise { + async handleSubscriptionScheduleEvent(event: Stripe.Event, accountId: string): Promise { const { entity: subscriptionSchedule, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.SubscriptionSchedule, (id) => this.stripe.subscriptionSchedules.retrieve(id), @@ -854,7 +825,7 @@ export class StripeSync { ) } - private async handlePaymentMethodEvent(event: Stripe.Event, accountId: string): Promise { + async handlePaymentMethodEvent(event: Stripe.Event, accountId: string): Promise { const { entity: paymentMethod, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.PaymentMethod, (id) => this.stripe.paymentMethods.retrieve(id) @@ -868,7 +839,7 @@ export class StripeSync { ) } - private async handleDisputeEvent(event: Stripe.Event, accountId: string): Promise { + async handleDisputeEvent(event: Stripe.Event, accountId: string): Promise { const { entity: dispute, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Dispute, (id) => this.stripe.disputes.retrieve(id), @@ -878,7 +849,7 @@ export class StripeSync { await this.upsertDisputes([dispute], accountId, false, this.getSyncTimestamp(event, refetched)) } - private async handlePaymentIntentEvent(event: Stripe.Event, accountId: string): Promise { + async handlePaymentIntentEvent(event: Stripe.Event, accountId: string): Promise { const { entity: paymentIntent, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.PaymentIntent, (id) => this.stripe.paymentIntents.retrieve(id), @@ -894,7 +865,7 @@ export class StripeSync { ) } - private async handleCreditNoteEvent(event: Stripe.Event, accountId: string): Promise { + async handleCreditNoteEvent(event: Stripe.Event, accountId: string): Promise { const { entity: creditNote, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.CreditNote, (id) => this.stripe.creditNotes.retrieve(id), @@ -909,10 +880,7 @@ export class StripeSync { ) } - private async handleEarlyFraudWarningEvent( - event: Stripe.Event, - accountId: string - ): Promise { + async handleEarlyFraudWarningEvent(event: Stripe.Event, accountId: string): Promise { const { entity: earlyFraudWarning, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Radar.EarlyFraudWarning, (id) => this.stripe.radar.earlyFraudWarnings.retrieve(id) @@ -926,7 +894,7 @@ export class StripeSync { ) } - private async handleRefundEvent(event: Stripe.Event, accountId: string): Promise { + async handleRefundEvent(event: Stripe.Event, accountId: string): Promise { const { entity: refund, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Refund, (id) => this.stripe.refunds.retrieve(id) @@ -935,7 +903,7 @@ export class StripeSync { await this.upsertRefunds([refund], accountId, false, this.getSyncTimestamp(event, refetched)) } - private async handleReviewEvent(event: Stripe.Event, accountId: string): Promise { + async handleReviewEvent(event: Stripe.Event, accountId: string): Promise { const { entity: review, refetched } = await this.fetchOrUseWebhookData( event.data.object as Stripe.Review, (id) => this.stripe.reviews.retrieve(id) @@ -944,10 +912,7 @@ export class StripeSync { await this.upsertReviews([review], accountId, false, this.getSyncTimestamp(event, refetched)) } - private async handleEntitlementSummaryEvent( - event: Stripe.Event, - accountId: string - ): Promise { + async handleEntitlementSummaryEvent(event: Stripe.Event, accountId: string): Promise { const activeEntitlementSummary = event.data .object as Stripe.Entitlements.ActiveEntitlementSummary let entitlements = activeEntitlementSummary.entitlements @@ -961,7 +926,7 @@ export class StripeSync { refetched = true } - await this.deleteRemovedActiveEntitlements( + await this.postgresClient.deleteRemovedActiveEntitlements( activeEntitlementSummary.customer, entitlements.data.map((entitlement) => entitlement.id) ) @@ -974,15 +939,15 @@ export class StripeSync { ) } - private getSyncTimestamp(event: Stripe.Event, refetched: boolean) { + getSyncTimestamp(event: Stripe.Event, refetched: boolean) { return refetched ? new Date().toISOString() : new Date(event.created * 1000).toISOString() } - private shouldRefetchEntity(entity: { object: string }) { + shouldRefetchEntity(entity: { object: string }) { return this.config.revalidateObjectsViaStripeApi?.includes(entity.object as RevalidateEntity) } - private async fetchOrUseWebhookData( + async fetchOrUseWebhookData( entity: T, fetchFn: (id: string) => Promise, entityInFinalState?: (entity: T) => boolean @@ -1187,7 +1152,7 @@ export class StripeSync { } } - private appendMigrationHint(error: unknown): Error { + appendMigrationHint(error: unknown): Error { const hint = 'Error occurred. Make sure you are up to date with DB migrations which can sometimes help with this. Details:' const withHint = (message: string) => (message.includes(hint) ? message : `${hint}\n${message}`) @@ -1205,7 +1170,7 @@ export class StripeSync { /** * Get the database resource name for a SyncObject type */ - private getResourceName(object: SyncObject): string { + getResourceName(object: SyncObject): string { const mapping: Record = { customer: 'customers', invoice: 'invoices', @@ -1233,7 +1198,7 @@ export class StripeSync { * Uses resourceRegistry for DRY list/upsert operations. * Uses the observable sync system for tracking progress. */ - private async fetchOnePage( + async fetchOnePage( object: Exclude, accountId: string, resourceName: string, @@ -1361,7 +1326,7 @@ export class StripeSync { } } - private async getSigmaFallbackCursorFromDestination( + async getSigmaFallbackCursorFromDestination( accountId: string, sigmaConfig: SigmaIngestionConfig ): Promise { @@ -1408,7 +1373,7 @@ export class StripeSync { return sigmaCursorFromEntry(sigmaConfig, entryForCursor) } - private async fetchOneSigmaPage( + async fetchOneSigmaPage( accountId: string, resourceName: string, runStartedAt: Date, @@ -1483,12 +1448,6 @@ export class StripeSync { return { processed: entries.length, hasMore, runStartedAt } } - /** - * Process all pages for all (or specified) object types until complete. - * - * @param params - Optional parameters for filtering and specifying object types - * @returns SyncBackfill with counts for each synced resource type - */ /** * Process all pages for a single object type until complete. * Loops processNext() internally until hasMore is false. @@ -1498,7 +1457,7 @@ export class StripeSync { * @param params - Optional sync parameters * @returns Sync result with count of synced items */ - private async processObjectUntilDone( + async processObjectUntilDone( object: Exclude, runStartedAt: Date, params?: SyncParams @@ -1582,7 +1541,7 @@ export class StripeSync { } } - private applySyncBackfillResult( + applySyncBackfillResult( results: SyncBackfill, object: Exclude, result: Sync @@ -1754,7 +1713,7 @@ export class StripeSync { /** * Internal implementation of processUntilDone with an existing run. */ - private async processUntilDoneWithRun( + async processUntilDoneWithRun( runStartedAt: Date, object: SyncObject | undefined, params?: SyncParams @@ -1789,840 +1748,7 @@ export class StripeSync { } } - /** - * Sync payment methods with an existing run (special case - iterates customers) - */ - private async syncPaymentMethodsWithRun( - runStartedAt: Date, - syncParams?: SyncParams - ): Promise { - const accountId = await this.getAccountId() - const resourceName = 'payment_methods' - - // Create object run - await this.postgresClient.createObjectRuns(accountId, runStartedAt, [resourceName]) - await this.postgresClient.tryStartObjectSync(accountId, runStartedAt, resourceName) - - try { - // Query customers from database - const prepared = sql( - `select id from "stripe"."customers" WHERE COALESCE(deleted, false) <> true;` - )([]) - - const customerIds = await this.postgresClient - .query(prepared.text, prepared.values) - .then(({ rows }) => rows.map((it) => it.id)) - - this.config.logger?.info(`Getting payment methods for ${customerIds.length} customers`) - - let synced = 0 - - // Process customers in parallel chunks (configurable concurrency) - const chunkSize = this.config.maxConcurrentCustomers ?? 10 - for (const customerIdChunk of chunkArray(customerIds, chunkSize)) { - await Promise.all( - customerIdChunk.map(async (customerId) => { - const CHECKPOINT_SIZE = 100 - let currentBatch: Stripe.PaymentMethod[] = [] - - // Manual pagination - each fetch() gets automatic retry protection - let hasMore = true - let startingAfter: string | undefined = undefined - - while (hasMore) { - const response: Stripe.ApiList = - await this.stripe.paymentMethods.list({ - limit: 100, - customer: customerId, - ...(startingAfter ? { starting_after: startingAfter } : {}), - }) - - for (const item of response.data) { - currentBatch.push(item) - if (currentBatch.length >= CHECKPOINT_SIZE) { - await this.upsertPaymentMethods( - currentBatch, - accountId, - syncParams?.backfillRelatedEntities - ) - synced += currentBatch.length - await this.postgresClient.incrementObjectProgress( - accountId, - runStartedAt, - resourceName, - currentBatch.length - ) - currentBatch = [] - } - } - - hasMore = response.has_more - if (response.data.length > 0) { - startingAfter = response.data[response.data.length - 1].id - } - } - - // Process remaining items - if (currentBatch.length > 0) { - await this.upsertPaymentMethods( - currentBatch, - accountId, - syncParams?.backfillRelatedEntities - ) - synced += currentBatch.length - await this.postgresClient.incrementObjectProgress( - accountId, - runStartedAt, - resourceName, - currentBatch.length - ) - } - }) - ) - } - - // Complete object run - await this.postgresClient.completeObjectSync(accountId, runStartedAt, resourceName) - - return { synced } - } catch (error) { - await this.postgresClient.failObjectSync( - accountId, - runStartedAt, - resourceName, - error instanceof Error ? error.message : 'Unknown error' - ) - throw error - } - } - - async syncProducts(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing products') - - return this.withSyncRun('products', 'syncProducts', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.ProductListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.products.list({ ...params, ...pagination }), - (products) => this.upsertProducts(products, accountId), - accountId, - 'products', - runStartedAt - ) - }) - } - - async syncPrices(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing prices') - - return this.withSyncRun('prices', 'syncPrices', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.PriceListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.prices.list({ ...params, ...pagination }), - (prices) => this.upsertPrices(prices, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'prices', - runStartedAt - ) - }) - } - - async syncPlans(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing plans') - - return this.withSyncRun('plans', 'syncPlans', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.PlanListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.plans.list({ ...params, ...pagination }), - (plans) => this.upsertPlans(plans, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'plans', - runStartedAt - ) - }) - } - - async syncCustomers(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing customers') - - return this.withSyncRun('customers', 'syncCustomers', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.CustomerListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.customers.list({ ...params, ...pagination }), - // @ts-expect-error - (items) => this.upsertCustomers(items, accountId), - accountId, - 'customers', - runStartedAt - ) - }) - } - - async syncSubscriptions(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing subscriptions') - - return this.withSyncRun('subscriptions', 'syncSubscriptions', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.SubscriptionListParams = { status: 'all', limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.subscriptions.list({ ...params, ...pagination }), - (items) => this.upsertSubscriptions(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'subscriptions', - runStartedAt - ) - }) - } - - async syncSubscriptionSchedules(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing subscription schedules') - - return this.withSyncRun( - 'subscription_schedules', - 'syncSubscriptionSchedules', - async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.SubscriptionScheduleListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.subscriptionSchedules.list({ ...params, ...pagination }), - (items) => - this.upsertSubscriptionSchedules(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'subscription_schedules', - runStartedAt - ) - } - ) - } - - async syncInvoices(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing invoices') - - return this.withSyncRun('invoices', 'syncInvoices', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.InvoiceListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.invoices.list({ ...params, ...pagination }), - (items) => this.upsertInvoices(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'invoices', - runStartedAt - ) - }) - } - - async syncCharges(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing charges') - - return this.withSyncRun('charges', 'syncCharges', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.ChargeListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.charges.list({ ...params, ...pagination }), - (items) => this.upsertCharges(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'charges', - runStartedAt - ) - }) - } - - async syncSetupIntents(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing setup_intents') - - return this.withSyncRun('setup_intents', 'syncSetupIntents', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.SetupIntentListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.setupIntents.list({ ...params, ...pagination }), - (items) => this.upsertSetupIntents(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'setup_intents', - runStartedAt - ) - }) - } - - async syncPaymentIntents(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing payment_intents') - - return this.withSyncRun( - 'payment_intents', - 'syncPaymentIntents', - async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.PaymentIntentListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.paymentIntents.list({ ...params, ...pagination }), - (items) => - this.upsertPaymentIntents(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'payment_intents', - runStartedAt - ) - } - ) - } - - async syncTaxIds(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing tax_ids') - - return this.withSyncRun('tax_ids', 'syncTaxIds', async (_cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.TaxIdListParams = { limit: 100 } - - return this.fetchAndUpsert( - (pagination) => this.stripe.taxIds.list({ ...params, ...pagination }), - (items) => this.upsertTaxIds(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'tax_ids', - runStartedAt - ) - }) - } - - async syncPaymentMethods(syncParams?: SyncParams): Promise { - // We can't filter by date here, it is also not possible to get payment methods without specifying a customer (you need Stripe Sigma for that -.-) - // Thus, we need to loop through all customers - this.config.logger?.info('Syncing payment method') - - return this.withSyncRun( - 'payment_methods', - 'syncPaymentMethods', - async (_cursor, runStartedAt) => { - const accountId = await this.getAccountId() - - // deleted is a generated column that may be NULL for non-deleted customers - // Use COALESCE to treat NULL as false, or use IS NOT TRUE to include NULL and false - const prepared = sql( - `select id from "stripe"."customers" WHERE COALESCE(deleted, false) <> true;` - )([]) - - const customerIds = await this.postgresClient - .query(prepared.text, prepared.values) - .then(({ rows }) => rows.map((it) => it.id)) - - this.config.logger?.info(`Getting payment methods for ${customerIds.length} customers`) - - let synced = 0 - - // Process customers in parallel chunks (configurable concurrency) - const chunkSize = this.config.maxConcurrentCustomers ?? 3 - for (const customerIdChunk of chunkArray(customerIds, chunkSize)) { - await Promise.all( - customerIdChunk.map(async (customerId) => { - const CHECKPOINT_SIZE = 100 - let currentBatch: Stripe.PaymentMethod[] = [] - - // Manual pagination - each fetch() gets automatic retry protection - let hasMore = true - let startingAfter: string | undefined = undefined - - while (hasMore) { - const response: Stripe.ApiList = - await this.stripe.paymentMethods.list({ - limit: 100, - customer: customerId, - ...(startingAfter ? { starting_after: startingAfter } : {}), - }) - - for (const item of response.data) { - currentBatch.push(item) - if (currentBatch.length >= CHECKPOINT_SIZE) { - await this.upsertPaymentMethods( - currentBatch, - accountId, - syncParams?.backfillRelatedEntities - ) - synced += currentBatch.length - await this.postgresClient.incrementObjectProgress( - accountId, - runStartedAt, - 'payment_methods', - currentBatch.length - ) - currentBatch = [] - } - } - - hasMore = response.has_more - if (response.data.length > 0) { - startingAfter = response.data[response.data.length - 1].id - } - } - - // Process remaining items - if (currentBatch.length > 0) { - await this.upsertPaymentMethods( - currentBatch, - accountId, - syncParams?.backfillRelatedEntities - ) - synced += currentBatch.length - await this.postgresClient.incrementObjectProgress( - accountId, - runStartedAt, - 'payment_methods', - currentBatch.length - ) - } - }) - ) - } - - // Mark object sync as complete (run completion handled by withSyncRun) - await this.postgresClient.completeObjectSync(accountId, runStartedAt, 'payment_methods') - - return { synced } - } - ) - } - - async syncDisputes(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing disputes') - - return this.withSyncRun('disputes', 'syncDisputes', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.DisputeListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.disputes.list({ ...params, ...pagination }), - (items) => this.upsertDisputes(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'disputes', - runStartedAt - ) - }) - } - - async syncEarlyFraudWarnings(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing early fraud warnings') - - return this.withSyncRun( - 'early_fraud_warnings', - 'syncEarlyFraudWarnings', - async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.Radar.EarlyFraudWarningListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.radar.earlyFraudWarnings.list({ ...params, ...pagination }), - (items) => - this.upsertEarlyFraudWarning(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'early_fraud_warnings', - runStartedAt - ) - } - ) - } - - async syncRefunds(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing refunds') - - return this.withSyncRun('refunds', 'syncRefunds', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.RefundListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.refunds.list({ ...params, ...pagination }), - (items) => this.upsertRefunds(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'refunds', - runStartedAt - ) - }) - } - - async syncCreditNotes(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing credit notes') - - return this.withSyncRun('credit_notes', 'syncCreditNotes', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.CreditNoteListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.creditNotes.list({ ...params, ...pagination }), - (creditNotes) => this.upsertCreditNotes(creditNotes, accountId), - accountId, - 'credit_notes', - runStartedAt - ) - }) - } - - async syncFeatures(syncParams?: SyncFeaturesParams): Promise { - this.config.logger?.info('Syncing features') - - return this.withSyncRun('features', 'syncFeatures', async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.Entitlements.FeatureListParams = { - limit: 100, - ...syncParams?.pagination, - } - return this.fetchAndUpsert( - () => this.stripe.entitlements.features.list(params), - (features) => this.upsertFeatures(features, accountId), - accountId, - 'features', - runStartedAt - ) - }) - } - - async syncEntitlements(customerId: string, syncParams?: SyncEntitlementsParams): Promise { - this.config.logger?.info('Syncing entitlements') - - return this.withSyncRun( - 'active_entitlements', - 'syncEntitlements', - async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.Entitlements.ActiveEntitlementListParams = { - customer: customerId, - limit: 100, - ...syncParams?.pagination, - } - return this.fetchAndUpsert( - () => this.stripe.entitlements.activeEntitlements.list(params), - (entitlements) => this.upsertActiveEntitlements(customerId, entitlements, accountId), - accountId, - 'active_entitlements', - runStartedAt - ) - } - ) - } - - async syncCheckoutSessions(syncParams?: SyncParams): Promise { - this.config.logger?.info('Syncing checkout sessions') - - return this.withSyncRun( - 'checkout_sessions', - 'syncCheckoutSessions', - async (cursor, runStartedAt) => { - const accountId = await this.getAccountId() - const params: Stripe.Checkout.SessionListParams = { limit: 100 } - - if (syncParams?.created) { - params.created = syncParams.created - } else if (cursor) { - params.created = { gte: cursor } - this.config.logger?.info(`Incremental sync from cursor: ${cursor}`) - } - - return this.fetchAndUpsert( - (pagination) => this.stripe.checkout.sessions.list({ ...params, ...pagination }), - (items) => - this.upsertCheckoutSessions(items, accountId, syncParams?.backfillRelatedEntities), - accountId, - 'checkout_sessions', - runStartedAt - ) - } - ) - } - - /** - * Helper to wrap a sync operation in the observable sync system. - * Creates/gets a sync run, sets up the object run, gets cursor, and handles completion. - * - * @param resourceName - The resource being synced (e.g., 'products', 'customers') - * @param triggeredBy - What triggered this sync (for observability) - * @param fn - The sync function to execute, receives cursor and runStartedAt - * @returns The result of the sync function - */ - private async withSyncRun( - resourceName: string, - triggeredBy: string, - fn: (cursor: number | null, runStartedAt: Date) => Promise - ): Promise { - const accountId = await this.getAccountId() - - // Get cursor from LAST COMPLETED sync (for incremental sync) - const lastCursor = await this.postgresClient.getLastCompletedCursor(accountId, resourceName) - const cursor = lastCursor ? parseInt(lastCursor) : null - - // Get or create sync run - const runKey = await this.postgresClient.getOrCreateSyncRun(accountId, triggeredBy) - if (!runKey) { - // Race condition - get active run - const activeRun = await this.postgresClient.getActiveSyncRun(accountId) - if (!activeRun) { - throw new Error('Failed to get or create sync run') - } - throw new Error('Another sync is already running for this account') - } - - const { runStartedAt } = runKey - - // Create and start object run - await this.postgresClient.createObjectRuns(accountId, runStartedAt, [resourceName]) - await this.postgresClient.tryStartObjectSync(accountId, runStartedAt, resourceName) - - try { - const result = await fn(cursor, runStartedAt) - - // Complete the sync run - await this.postgresClient.completeObjectSync(accountId, runStartedAt, resourceName) - - return result - } catch (error) { - // Fail the sync run - await this.postgresClient.failObjectSync( - accountId, - runStartedAt, - resourceName, - error instanceof Error ? error.message : 'Unknown error' - ) - throw error - } - } - - private async fetchAndUpsert( - fetch: (params?: { starting_after?: string }) => Promise>, - upsert: (items: T[], accountId: string) => Promise, - accountId: string, - resourceName: string, - runStartedAt: Date - ): Promise { - const CHECKPOINT_SIZE = 100 // Match Stripe page size - let totalSynced = 0 - let currentBatch: T[] = [] - - try { - this.config.logger?.info('Fetching items to sync from Stripe') - - try { - let hasMore = true - let startingAfter: string | undefined = undefined - - // Manual pagination loop - each fetch() call gets automatic retry protection - while (hasMore) { - const response = await fetch( - startingAfter ? { starting_after: startingAfter } : undefined - ) - - for (const item of response.data) { - currentBatch.push(item) - - // Checkpoint every 100 items (1 Stripe page) - if (currentBatch.length >= CHECKPOINT_SIZE) { - this.config.logger?.info(`Upserting batch of ${currentBatch.length} items`) - await upsert(currentBatch, accountId) - totalSynced += currentBatch.length - - // Update progress and cursor with max created from this batch - await this.postgresClient.incrementObjectProgress( - accountId, - runStartedAt, - resourceName, - currentBatch.length - ) - const maxCreated = Math.max( - ...currentBatch.map((i) => (i as { created?: number }).created || 0) - ) - if (maxCreated > 0) { - await this.postgresClient.updateObjectCursor( - accountId, - runStartedAt, - resourceName, - String(maxCreated) - ) - this.config.logger?.info(`Checkpoint: cursor updated to ${maxCreated}`) - } - - currentBatch = [] - } - } - - hasMore = response.has_more - if (response.data.length > 0) { - startingAfter = response.data[response.data.length - 1].id - } - } - - // Process remaining items - if (currentBatch.length > 0) { - this.config.logger?.info(`Upserting final batch of ${currentBatch.length} items`) - await upsert(currentBatch, accountId) - totalSynced += currentBatch.length - - await this.postgresClient.incrementObjectProgress( - accountId, - runStartedAt, - resourceName, - currentBatch.length - ) - const maxCreated = Math.max( - ...currentBatch.map((i) => (i as { created?: number }).created || 0) - ) - if (maxCreated > 0) { - await this.postgresClient.updateObjectCursor( - accountId, - runStartedAt, - resourceName, - String(maxCreated) - ) - } - } - } catch (error) { - // Save partial progress before re-throwing - if (currentBatch.length > 0) { - this.config.logger?.info( - `Error occurred, saving partial progress: ${currentBatch.length} items` - ) - await upsert(currentBatch, accountId) - totalSynced += currentBatch.length - - await this.postgresClient.incrementObjectProgress( - accountId, - runStartedAt, - resourceName, - currentBatch.length - ) - const maxCreated = Math.max( - ...currentBatch.map((i) => (i as { created?: number }).created || 0) - ) - if (maxCreated > 0) { - await this.postgresClient.updateObjectCursor( - accountId, - runStartedAt, - resourceName, - String(maxCreated) - ) - } - } - throw error - } - - await this.postgresClient.completeObjectSync(accountId, runStartedAt, resourceName) - - this.config.logger?.info(`Sync complete: ${totalSynced} items synced`) - return { synced: totalSynced } - } catch (error) { - await this.postgresClient.failObjectSync( - accountId, - runStartedAt, - resourceName, - error instanceof Error ? error.message : 'Unknown error' - ) - throw error - } - } - - private async upsertCharges( + async upsertCharges( charges: Stripe.Charge[], accountId: string, backfillRelatedEntities?: boolean, @@ -2647,26 +1773,29 @@ export class StripeSync { ) } - private async backfillCharges(chargeIds: string[], accountId: string) { + async backfillCharges(chargeIds: string[], accountId: string) { const missingChargeIds = await this.postgresClient.findMissingEntries('charges', chargeIds) - await this.fetchMissingEntities(missingChargeIds, (id) => + this.postgresClient.deleteRemovedActiveEntitlements(accountId, missingChargeIds) + const charges = await this.fetchMissingEntities(missingChargeIds, (id) => this.stripe.charges.retrieve(id) - ).then((charges) => this.upsertCharges(charges, accountId)) + ) + return this.upsertCharges(charges, accountId) } - private async backfillPaymentIntents(paymentIntentIds: string[], accountId: string) { + async backfillPaymentIntents(paymentIntentIds: string[], accountId: string) { const missingIds = await this.postgresClient.findMissingEntries( 'payment_intents', paymentIntentIds ) - await this.fetchMissingEntities(missingIds, (id) => + const paymentIntents = await this.fetchMissingEntities(missingIds, (id) => this.stripe.paymentIntents.retrieve(id) - ).then((paymentIntents) => this.upsertPaymentIntents(paymentIntents, accountId)) + ) + await this.upsertPaymentIntents(paymentIntents, accountId) } - private async upsertCreditNotes( + async upsertCreditNotes( creditNotes: Stripe.CreditNote[], accountId: string, backfillRelatedEntities?: boolean, @@ -2813,12 +1942,15 @@ export class StripeSync { async backfillCustomers(customerIds: string[], accountId: string) { const missingIds = await this.postgresClient.findMissingEntries('customers', customerIds) - await this.fetchMissingEntities(missingIds, (id) => this.stripe.customers.retrieve(id)) - .then((entries) => this.upsertCustomers(entries, accountId)) - .catch((err) => { - this.config.logger?.error(err, 'Failed to backfill') - throw err - }) + try { + const customers = await this.fetchMissingEntities(missingIds, (id) => + this.stripe.customers.retrieve(id) + ) + await this.upsertCustomers(customers, accountId) + } catch (err) { + this.config.logger?.error(err, 'Failed to backfill') + throw err + } } async upsertDisputes( @@ -2864,18 +1996,20 @@ export class StripeSync { ) } - backfillInvoices = async (invoiceIds: string[], accountId: string) => { + async backfillInvoices(invoiceIds: string[], accountId: string) { const missingIds = await this.postgresClient.findMissingEntries('invoices', invoiceIds) - await this.fetchMissingEntities(missingIds, (id) => this.stripe.invoices.retrieve(id)).then( - (entries) => this.upsertInvoices(entries, accountId) + const invoices = await this.fetchMissingEntities(missingIds, (id) => + this.stripe.invoices.retrieve(id) ) + await this.upsertInvoices(invoices, accountId) } - backfillPrices = async (priceIds: string[], accountId: string) => { + async backfillPrices(priceIds: string[], accountId: string) { const missingIds = await this.postgresClient.findMissingEntries('prices', priceIds) - await this.fetchMissingEntities(missingIds, (id) => this.stripe.prices.retrieve(id)).then( - (entries) => this.upsertPrices(entries, accountId) + const prices = await this.fetchMissingEntities(missingIds, (id) => + this.stripe.prices.retrieve(id) ) + await this.upsertPrices(prices, accountId) } async upsertPlans( @@ -2896,10 +2030,6 @@ export class StripeSync { ) } - async deletePlan(id: string): Promise { - return this.postgresClient.delete('plans', id) - } - async upsertPrices( prices: Stripe.Price[], accountId: string, @@ -2918,10 +2048,6 @@ export class StripeSync { ) } - async deletePrice(id: string): Promise { - return this.postgresClient.delete('prices', id) - } - async upsertProducts( products: Stripe.Product[], accountId: string, @@ -2935,16 +2061,13 @@ export class StripeSync { ) } - async deleteProduct(id: string): Promise { - return this.postgresClient.delete('products', id) - } - async backfillProducts(productIds: string[], accountId: string) { const missingProductIds = await this.postgresClient.findMissingEntries('products', productIds) - await this.fetchMissingEntities(missingProductIds, (id) => + const products = await this.fetchMissingEntities(missingProductIds, (id) => this.stripe.products.retrieve(id) - ).then((products) => this.upsertProducts(products, accountId)) + ) + await this.upsertProducts(products, accountId) } async upsertPaymentIntents( @@ -3022,10 +2145,6 @@ export class StripeSync { ) } - async deleteTaxId(id: string): Promise { - return this.postgresClient.delete('tax_ids', id) - } - async upsertSubscriptionItems( subscriptionItems: Stripe.SubscriptionItem[], accountId: string, @@ -3223,18 +2342,6 @@ export class StripeSync { return rows } - async deleteRemovedActiveEntitlements( - customerId: string, - currentActiveEntitlementIds: string[] - ): Promise<{ rowCount: number }> { - const prepared = sql(` - delete from "stripe"."active_entitlements" - where customer = :customerId and id <> ALL(:currentActiveEntitlementIds::text[]); - `)({ customerId, currentActiveEntitlementIds }) - const { rowCount } = await this.postgresClient.query(prepared.text, prepared.values) - return { rowCount: rowCount || 0 } - } - async upsertFeatures( features: Stripe.Entitlements.Feature[], accountId: string, @@ -3250,14 +2357,15 @@ export class StripeSync { async backfillFeatures(featureIds: string[], accountId: string) { const missingFeatureIds = await this.postgresClient.findMissingEntries('features', featureIds) - await this.fetchMissingEntities(missingFeatureIds, (id) => - this.stripe.entitlements.features.retrieve(id) - ) - .then((features) => this.upsertFeatures(features, accountId)) - .catch((err) => { - this.config.logger?.error(err, 'Failed to backfill features') - throw err - }) + try { + const features = await this.fetchMissingEntities(missingFeatureIds, (id) => + this.stripe.entitlements.features.retrieve(id) + ) + await this.upsertFeatures(features, accountId) + } catch (err) { + this.config.logger?.error(err, 'Failed to backfill features') + throw err + } } async upsertActiveEntitlements( @@ -3508,29 +2616,29 @@ export class StripeSync { subscriptionIds ) - await this.fetchMissingEntities(missingSubscriptionIds, (id) => + const subscriptions = await this.fetchMissingEntities(missingSubscriptionIds, (id) => this.stripe.subscriptions.retrieve(id) - ).then((subscriptions) => this.upsertSubscriptions(subscriptions, accountId)) + ) + await this.upsertSubscriptions(subscriptions, accountId) } - backfillSubscriptionSchedules = async (subscriptionIds: string[], accountId: string) => { + async backfillSubscriptionSchedules(subscriptionIds: string[], accountId: string) { const missingSubscriptionIds = await this.postgresClient.findMissingEntries( 'subscription_schedules', subscriptionIds ) - await this.fetchMissingEntities(missingSubscriptionIds, (id) => + const subscriptionSchedules = await this.fetchMissingEntities(missingSubscriptionIds, (id) => this.stripe.subscriptionSchedules.retrieve(id) - ).then((subscriptionSchedules) => - this.upsertSubscriptionSchedules(subscriptionSchedules, accountId) ) + await this.upsertSubscriptionSchedules(subscriptionSchedules, accountId) } /** * Stripe only sends the first 10 entries by default, the option will actively fetch all entries. * Uses manual pagination - each fetch() gets automatic retry protection. */ - private async expandEntity< + async expandEntity< K extends { id?: string }, P extends keyof T, T extends { id?: string } & { [key in P]?: Stripe.ApiList | null }, @@ -3572,7 +2680,7 @@ export class StripeSync { } } - private async fetchMissingEntities( + async fetchMissingEntities( ids: string[], fetch: (id: string) => Promise> ): Promise { @@ -3596,11 +2704,3 @@ export class StripeSync { await this.postgresClient.pool.end() } } - -function chunkArray(array: T[], chunkSize: number): T[][] { - const result: T[][] = [] - for (let i = 0; i < array.length; i += chunkSize) { - result.push(array.slice(i, i + chunkSize)) - } - return result -}