diff --git a/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx b/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx new file mode 100644 index 000000000..5b72b5fff --- /dev/null +++ b/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx @@ -0,0 +1,46 @@ +import {WaitingPlugin, PluginType} from '@segment/analytics-react-native'; + +import type { + SegmentAPISettings, + SegmentEvent, + UpdateType, +} from '@segment/analytics-react-native'; + +/** + * Example WaitingPlugin that demonstrates how to pause event processing + * until an async operation completes. + * + * Use cases: + * - Waiting for IDFA/advertising ID permissions + * - Initializing native SDKs or modules + * - Loading required configuration from remote sources + * + * The plugin automatically pauses event processing when added to the client. + * Call resume() when your async operation completes to start processing events. + */ +export class ExampleWaitingPlugin extends WaitingPlugin { + type = PluginType.enrichment; + tracked = false; + + /** + * Called when settings are updated from Segment. + * For initial settings, we simulate an async operation and then resume. + */ + update(_settings: SegmentAPISettings, type: UpdateType) { + if (type === UpdateType.initial) { + // Simulate async work (e.g., requesting permissions, loading data) + setTimeout(() => { + // Resume event processing once async work is complete + this.resume(); + }, 3000); + } + } + + /** + * Called for track events + */ + track(event: SegmentEvent) { + this.tracked = true; + return event; + } +} diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 1ded23b6a..eeac0de07 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -72,6 +72,7 @@ import { translateHTTPError, } from './errors'; import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin'; +import { WaitingPlugin } from './plugin'; type OnPluginAddedCallback = (plugin: Plugin) => void; @@ -120,6 +121,10 @@ export class SegmentClient { * Access or subscribe to client enabled */ readonly enabled: Watchable & Settable; + /** + * Access or subscribe to running state (controls event processing) + */ + readonly running: Watchable & Settable; /** * Access or subscribe to client context */ @@ -258,6 +263,12 @@ export class SegmentClient { onChange: this.store.enabled.onChange, }; + this.running = { + get: this.store.running.get, + set: this.store.running.set, + onChange: this.store.running.onChange, + }; + // add segment destination plugin unless // asked not to via configuration. if (this.config.autoAddSegmentDestination === true) { @@ -295,7 +306,6 @@ export class SegmentClient { if ((await this.store.isReady.get(true)) === false) { await this.storageReady(); } - // Get new settings from segment // It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins // which make use of the settings object to initialize @@ -309,7 +319,8 @@ export class SegmentClient { ]); await this.onReady(); this.isReady.value = true; - + // Set running to true to start event processing + await this.store.running.set(true); // Process all pending events await this.processPendingEvents(); // Trigger manual flush @@ -465,7 +476,6 @@ export class SegmentClient { settings ); } - if (!this.isReady.value) { this.pluginsToAdd.push(plugin); } else { @@ -476,6 +486,11 @@ export class SegmentClient { private addPlugin(plugin: Plugin) { plugin.configure(this); this.timeline.add(plugin); + //check for waiting plugin here + if (plugin instanceof WaitingPlugin) { + this.pauseEventProcessingForPlugin(plugin); + } + this.triggerOnPluginLoaded(plugin); } @@ -494,6 +509,11 @@ export class SegmentClient { if (this.enabled.get() === false) { return; } + if (!this.running.get()) { + // If not running, queue the event for later processing + await this.store.pendingEvents.add(event); + return event; + } if (this.isReady.value) { return this.startTimelineProcessing(event); } else { @@ -512,7 +532,7 @@ export class SegmentClient { ): Promise { const event = await this.applyContextData(incomingEvent); this.flushPolicyExecuter.notify(event); - return this.timeline.process(event); + return await this.timeline.process(event); } private async trackDeepLinks() { @@ -1027,4 +1047,82 @@ export class SegmentClient { return totalEventsCount; } + private resumeTimeoutId?: ReturnType; + private waitingPlugins = new Set(); + + /** + * Pause event processing for a specific WaitingPlugin. + * Events will be buffered until all waiting plugins resume. + * + * @param plugin - The WaitingPlugin requesting the pause + * @internal This is called automatically when a WaitingPlugin is added + */ + pauseEventProcessingForPlugin(plugin?: WaitingPlugin) { + if (plugin) { + this.waitingPlugins.add(plugin); + } + this.pauseEventProcessing(); + } + + /** + * Resume event processing for a specific WaitingPlugin. + * If all waiting plugins have resumed, buffered events will be processed. + * + * @param plugin - The WaitingPlugin that has completed its async work + * @internal This is called automatically when a WaitingPlugin calls resume() + */ + async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) { + if (plugin) { + this.waitingPlugins.delete(plugin); + } + if (this.waitingPlugins.size > 0) { + return; // still blocked by other waiting plugins + } + + await this.resumeEventProcessing(); + } + + /** + * Pause event processing globally. + * New events will be buffered in memory until resumeEventProcessing() is called. + * Automatically resumes after the specified timeout to prevent permanent blocking. + * + * @param timeout - Milliseconds to wait before auto-resuming (default: 30000) + */ + pauseEventProcessing(timeout = 30000) { + // IMPORTANT: ignore repeated pauses + const running = this.store.running.get(); + if (!running) { + return; + } + + // Fire-and-forget: state is updated synchronously in-memory, persistence happens async + void this.store.running.set(false); + + // Only set timeout if not already set (prevents multiple waiting plugins from overwriting) + if (!this.resumeTimeoutId) { + this.resumeTimeoutId = setTimeout(async () => { + await this.resumeEventProcessing(); + }, timeout); + } + } + + /** + * Resume event processing and process all buffered events. + * This is called automatically by WaitingPlugins when they complete, + * or after the timeout expires. + */ + async resumeEventProcessing() { + const running = this.store.running.get(); + if (running) { + return; + } + + if (this.resumeTimeoutId) { + clearTimeout(this.resumeTimeoutId); + this.resumeTimeoutId = undefined; + } + await this.store.running.set(true); + await this.processPendingEvents(); + } } diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index d0328d19f..393385209 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -115,12 +115,14 @@ export class DestinationPlugin extends EventPlugin { key = ''; timeline = new Timeline(); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + store: any; private hasSettings() { return this.analytics?.settings.get()?.[this.key] !== undefined; } - private isEnabled(event: SegmentEvent): boolean { + protected isEnabled(event: SegmentEvent): boolean { let customerDisabled = false; if (event.integrations?.[this.key] === false) { customerDisabled = true; @@ -140,6 +142,10 @@ export class DestinationPlugin extends EventPlugin { if (analytics) { plugin.configure(analytics); } + + if (analytics && plugin instanceof WaitingPlugin) { + analytics.pauseEventProcessingForPlugin(plugin); + } this.timeline.add(plugin); return plugin; } @@ -179,7 +185,6 @@ export class DestinationPlugin extends EventPlugin { type: PluginType.before, event, }); - if (beforeResult === undefined) { return; } @@ -210,3 +215,84 @@ export class UtilityPlugin extends EventPlugin {} // For internal platform-specific bits export class PlatformPlugin extends Plugin {} + +export { PluginType }; + +/** + * WaitingPlugin - A base class for plugins that need to pause event processing + * until an asynchronous operation completes. + * + * When a WaitingPlugin is added to the Analytics client, it automatically pauses + * event processing. Events are buffered in memory until the plugin calls resume(). + * If resume() is not called within 30 seconds, event processing automatically resumes. + * + * @example + * ```typescript + * class IDFAPlugin extends WaitingPlugin { + * type = PluginType.enrichment; + * + * configure(analytics: SegmentClient) { + * super.configure(analytics); + * // Request IDFA permission + * requestTrackingPermission().then((status) => { + * if (status === 'authorized') { + * // Add IDFA to context + * } + * this.resume(); // Resume event processing + * }); + * } + * + * track(event: SegmentEvent) { + * // Enrich event with IDFA if available + * return event; + * } + * } + * ``` + * + * Common use cases: + * - Waiting for user permissions (IDFA, location, notifications) + * - Initializing native SDKs that provide enrichment data + * - Loading remote configuration required for event processing + * - Waiting for authentication state before sending events + * + * @remarks + * Multiple WaitingPlugins can be active simultaneously. Event processing + * only resumes when ALL waiting plugins have called resume() or timed out. + * + * WaitingPlugins can be added at any plugin type (before, enrichment, destination). + * They can also be added to DestinationPlugins to pause only that destination's + * event processing. + */ +export class WaitingPlugin extends Plugin { + constructor() { + super(); + } + + /** + * Configure the plugin with the Analytics client. + * Automatically pauses event processing when called. + * Override this method to perform async initialization, then call resume(). + * + * @param analytics - The Analytics client instance + */ + configure(analytics: SegmentClient) { + super.configure(analytics); + } + + /** + * Manually pause event processing. + * Generally not needed as adding a WaitingPlugin automatically pauses processing. + */ + pause() { + this.analytics?.pauseEventProcessingForPlugin(this); + } + + /** + * Resume event processing for this plugin. + * Call this method when your async operation completes. + * If all WaitingPlugins have resumed, buffered events will be processed. + */ + async resume() { + await this.analytics?.resumeEventProcessingForPlugin(this); + } +} diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index 1580ee288..6f46e8b49 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -73,6 +73,13 @@ export class QueueFlushingPlugin extends UtilityPlugin { * Calls the onFlush callback with the events in the queue */ async flush() { + // Check if event processing is running + const running = this.analytics?.running.get(); + if (running === false) { + this.analytics?.logger.info('Event processing is paused, skipping flush'); + return; + } + // Wait for the queue to be restored try { await this.isRestored; diff --git a/packages/core/src/plugins/__tests__/Waiting.test.ts b/packages/core/src/plugins/__tests__/Waiting.test.ts new file mode 100644 index 000000000..aad4a62ca --- /dev/null +++ b/packages/core/src/plugins/__tests__/Waiting.test.ts @@ -0,0 +1,361 @@ +import { SegmentClient } from '../../analytics'; +import { DestinationPlugin } from '../../plugin'; + +//import { SegmentDestination } from '../SegmentDestination'; +import { + ExampleWaitingPlugin, + ExampleWaitingPlugin1, + getMockLogger, + ManualResumeWaitingPlugin, + MockSegmentStore, + StubDestinationPlugin, +} from '../../test-helpers'; + +jest.useFakeTimers(); + +// Type for accessing internal client properties in tests +type ClientWithInternals = SegmentClient & { + isReady: { value: boolean }; +}; + +describe('WaitingPlugin', () => { + const store = new MockSegmentStore(); + const baseConfig = { + writeKey: 'test-key', + flushAt: 1, + flushInterval: 0, + trackAppLifecycleEvents: false, + autoAddSegmentDestination: false, + }; + + beforeEach(() => { + store.reset(); + jest.clearAllMocks(); + jest.clearAllTimers(); + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + test('test resume after timeout', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + await client.running.set(true); + expect(client.running.get()).toBe(true); + + client.pauseEventProcessing(1000); + + expect(client.running.get()).toBe(false); + + jest.advanceTimersByTime(2000); + + // Allow microtasks from setTimeout → resumeEventProcessing + await Promise.resolve(); + + expect(await client.running.get(true)).toBe(true); + }); + test('test manual resume', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + await client.running.set(true); + expect(client.running.get()).toBe(true); + + client.pauseEventProcessing(); + + expect(client.running.get()).toBe(false); + + await client.resumeEventProcessing(); + + expect(await client.running.get(true)).toBe(true); + }); + test('pause does not dispatch timeout if already paused', async () => { + const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); + + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + await client.running.set(true); + + client.pauseEventProcessing(); + client.pauseEventProcessing(); + client.pauseEventProcessing(); + + expect(setTimeoutSpy).toHaveBeenCalledTimes(1); + }); + test('WaitingPlugin makes analytics wait', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Set isReady so plugins can be added immediately + (client as ClientWithInternals).isReady.value = true; + + await client.running.set(true); + expect(client.running.get()).toBe(true); + + const plugin = new ExampleWaitingPlugin(); + + client.add({ plugin }); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const trackSpy = jest.spyOn(client as any, 'startTimelineProcessing'); + + client.track('foo'); + + expect(client.running.get()).toBe(false); + + // Event should NOT be processed while paused + expect(trackSpy).not.toHaveBeenCalled(); + + await Promise.resolve(); + + jest.advanceTimersByTime(1000); + await Promise.resolve(); + + expect(await client.running.get(true)).toBe(true); + + // Event should now be processed + expect(trackSpy).toHaveBeenCalledTimes(1); + }); + + test('timeout force resume', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Set isReady so plugins can be added immediately + (client as ClientWithInternals).isReady.value = true; + + await client.running.set(true); + expect(client.running.get()).toBe(true); + + const waitingPlugin = new ManualResumeWaitingPlugin(); + client.add({ plugin: waitingPlugin }); + + client.track('foo'); + + expect(client.running.get()).toBe(false); + expect(waitingPlugin.tracked).toBe(false); + + await jest.advanceTimersByTimeAsync(30000); + + await Promise.resolve(); + await Promise.resolve(); + + expect(await client.running.get(true)).toBe(true); + expect(waitingPlugin.tracked).toBe(true); + }); + test('multiple WaitingPlugins', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Set isReady so plugins can be added immediately + (client as ClientWithInternals).isReady.value = true; + + // Initially, analytics is running + await client.running.set(true); + expect(client.running.get()).toBe(true); + + // Create two waiting plugins + const plugin1 = new ExampleWaitingPlugin1(); + const plugin2 = new ManualResumeWaitingPlugin(); + + // Add plugins to client + client.add({ plugin: plugin1 }); + client.add({ plugin: plugin2 }); + + // Track an event while waiting plugins are active + client.track('foo'); + + // Client should now be paused + expect(client.running.get()).toBe(false); + + // Plugins should not have tracked the event yet + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + + // Resume the first plugin + await plugin1.resume(); + // Advance timers to simulate any internal delays + jest.advanceTimersByTime(6000); + await Promise.resolve(); + + // Still paused because plugin2 is waiting + expect(client.running.get()).toBe(false); + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + + // Resume the second plugin + await plugin2.resume(); + // Advance timers to flush + jest.advanceTimersByTime(6000); + await Promise.resolve(); + + // Now analytics should be running + expect(await client.running.get(true)).toBe(true); + // Both plugins should have tracked the event + expect(plugin1.tracked).toBe(true); + expect(plugin2.tracked).toBe(true); + }); + test('WaitingPlugin makes analytics to wait on DestinationPlugin', async () => { + jest.useFakeTimers(); + + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Set isReady so plugins can be added immediately + (client as ClientWithInternals).isReady.value = true; + + // Initially, analytics is running + await client.running.set(true); + expect(client.running.get()).toBe(true); + const waitingPlugin = new ExampleWaitingPlugin1(); + const stubDestinationPlugin: DestinationPlugin = + new StubDestinationPlugin(); + // Add destination to analytics + client.add({ plugin: stubDestinationPlugin }); + // Add waiting plugin inside destination + stubDestinationPlugin.add(waitingPlugin); + + // Track event + await client.track('foo'); + + // Analytics should pause + expect(client.running.get()).toBe(false); + expect(waitingPlugin.tracked).toBe(false); + await Promise.resolve(); + + jest.advanceTimersByTime(30000); + await jest.runAllTimersAsync(); + + // 🔑 flush remaining promise chains + await Promise.resolve(); + await Promise.resolve(); + + // Analytics resumed + expect(await client.running.get(true)).toBe(true); + // Waiting plugin executed + expect(waitingPlugin.tracked).toBe(true); + + jest.useRealTimers(); + }); + test('timeout force resume on DestinationPlugin', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Set isReady so plugins can be added immediately + (client as ClientWithInternals).isReady.value = true; + + // analytics running initially + await client.running.set(true); + expect(client.running.get()).toBe(true); + + const waitingPlugin = new ExampleWaitingPlugin1(); // no manual resume + const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); + + // add destination + client.add({ plugin: destinationPlugin }); + + // add waiting plugin inside destination + destinationPlugin.add(waitingPlugin); + + // track event + await client.track('foo'); + + // analytics should pause + expect(client.running.get()).toBe(false); + expect(waitingPlugin.tracked).toBe(false); + + await Promise.resolve(); + + jest.advanceTimersByTime(6000); + await jest.runAllTimersAsync(); + + await Promise.resolve(); + await Promise.resolve(); + + // analytics resumed + expect(await client.running.get(true)).toBe(true); + + // waiting plugin executed + expect(waitingPlugin.tracked).toBe(true); + }); + test('test multiple WaitingPlugin on DestinationPlugin', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Set isReady so plugins can be added immediately + (client as ClientWithInternals).isReady.value = true; + + // analytics running initially + await client.running.set(true); + expect(client.running.get()).toBe(true); + + const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); + client.add({ plugin: destinationPlugin }); + + const plugin1 = new ExampleWaitingPlugin1(); + const plugin2 = new ManualResumeWaitingPlugin(); + + destinationPlugin.add(plugin1); + destinationPlugin.add(plugin2); + + // track event + await client.track('foo'); + + // analytics paused + expect(client.running.get()).toBe(false); + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + // Resume the first plugin + await plugin1.resume(); + + jest.advanceTimersByTime(6000); + await Promise.resolve(); + + // still paused because plugin2 not resumed + expect(client.running.get()).toBe(false); + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + plugin2.resume(); + + jest.advanceTimersByTime(6000); + await jest.runAllTimersAsync(); + await Promise.resolve(); + // analytics resumed + expect(await client.running.get(true)).toBe(true); + + // both plugins executed + expect(plugin1.tracked).toBe(true); + expect(plugin2.tracked).toBe(true); + }); +}); diff --git a/packages/core/src/storage/sovranStorage.ts b/packages/core/src/storage/sovranStorage.ts index d85db7ae5..a78bdfac8 100644 --- a/packages/core/src/storage/sovranStorage.ts +++ b/packages/core/src/storage/sovranStorage.ts @@ -41,6 +41,7 @@ type Data = { filters: DestinationFilters; pendingEvents: SegmentEvent[]; enabled: boolean; + running: boolean; }; const INITIAL_VALUES: Data = { @@ -56,6 +57,7 @@ const INITIAL_VALUES: Data = { }, pendingEvents: [], enabled: true, + running: false, }; const isEverythingReady = (state: ReadinessStore) => @@ -190,6 +192,9 @@ export class SovranStorage implements Storage { readonly enabledStore: Store<{ enabled: boolean }>; readonly enabled: Watchable & Settable; + readonly runningStore: Store<{ running: boolean }>; + readonly running: Watchable & Settable; + constructor(config: StorageConfig) { this.storeId = config.storeId; this.storePersistor = config.storePersistor; @@ -201,6 +206,7 @@ export class SovranStorage implements Storage { hasRestoredFilters: false, hasRestoredPendingEvents: false, hasRestoredEnabled: false, + hasRestoredRunning: false, }); const markAsReadyGenerator = (key: keyof ReadinessStore) => () => { @@ -536,6 +542,46 @@ export class SovranStorage implements Storage { }, }; + this.runningStore = createStore( + { running: INITIAL_VALUES.running }, + { + persist: { + storeId: `${this.storeId}-running`, + persistor: this.storePersistor, + saveDelay: this.storePersistorSaveDelay, + onInitialized: markAsReadyGenerator('hasRestoredRunning'), + }, + } + ); + // Accessor object for running + this.running = { + get: createGetter( + () => { + const state = this.runningStore.getState(); + return state.running; + }, + async () => { + const value = await this.runningStore.getState(true); + return value.running; + } + ), + + onChange: (callback: (value: boolean) => void) => { + return this.runningStore.subscribe((store) => { + callback(store.running); + }); + }, + + set: async (value: boolean | ((prev: boolean) => boolean)) => { + const { running } = await this.runningStore.dispatch((state) => { + const newRunning = + value instanceof Function ? value(state.running) : value; + return { running: newRunning }; + }); + return running; + }, + }; + this.fixAnonymousId(); } diff --git a/packages/core/src/storage/types.ts b/packages/core/src/storage/types.ts index 91955b715..734a1b945 100644 --- a/packages/core/src/storage/types.ts +++ b/packages/core/src/storage/types.ts @@ -60,6 +60,7 @@ export interface ReadinessStore { hasRestoredFilters: boolean; hasRestoredPendingEvents: boolean; hasRestoredEnabled: boolean; + hasRestoredRunning: boolean; } /** @@ -93,6 +94,7 @@ export interface Storage { Settable & Queue; readonly enabled: Watchable & Settable; + readonly running: Watchable & Settable; } export type DeepLinkData = { referring_application: string; diff --git a/packages/core/src/test-helpers/exampleWaitingPlugin.ts b/packages/core/src/test-helpers/exampleWaitingPlugin.ts new file mode 100644 index 000000000..c11a544b5 --- /dev/null +++ b/packages/core/src/test-helpers/exampleWaitingPlugin.ts @@ -0,0 +1,83 @@ +import { + PluginType, + SegmentEvent, + SegmentAPISettings, + UpdateType, +} from '../types'; +import { SegmentClient } from '../analytics'; +import { DestinationPlugin, WaitingPlugin } from '../plugin'; + +/** + * Example WaitingPlugin that automatically resumes after 1 second. + * Used for testing the waiting plugin mechanism. + */ +export class ExampleWaitingPlugin extends WaitingPlugin { + public type = PluginType.enrichment; + public tracked = false; + + configure(analytics: SegmentClient) { + super.configure(analytics); + // Simulate async work (network, native module init, etc.) + setTimeout(() => { + this.resume(); + }, 1000); + } + + execute(event: SegmentEvent): SegmentEvent | undefined { + super.execute(event); + this.tracked = true; + return event; + } +} + +/** + * Example WaitingPlugin that resumes after 3 seconds when settings are updated. + * Mimics the Kotlin SDK's ExampleWaitingPlugin behavior. + */ +export class ExampleWaitingPlugin1 extends WaitingPlugin { + public type = PluginType.before; + public tracked = false; + + update(_settings: SegmentAPISettings, type: UpdateType) { + if (type === UpdateType.initial) { + // Simulate async initialization that takes 3 seconds + setTimeout(() => { + this.resume(); + }, 3000); + } + } + + execute(event: SegmentEvent): SegmentEvent | undefined { + this.tracked = true; + return event; + } +} + +/** + * Example WaitingPlugin that requires manual resume() call. + * Used for testing manual control of event processing. + */ +export class ManualResumeWaitingPlugin extends WaitingPlugin { + public type = PluginType.enrichment; + public tracked = false; + + configure(analytics: SegmentClient) { + super.configure(analytics); + } + + execute(event: SegmentEvent): SegmentEvent | undefined { + this.tracked = true; + return event; + } +} + +/** + * Stub destination plugin for testing. + * Always enabled and accepts all events. + */ +export class StubDestinationPlugin extends DestinationPlugin { + key = 'StubDestination'; + protected isEnabled(_event: SegmentEvent): boolean { + return true; + } +} diff --git a/packages/core/src/test-helpers/index.ts b/packages/core/src/test-helpers/index.ts index d8a6aff04..bd33b90ab 100644 --- a/packages/core/src/test-helpers/index.ts +++ b/packages/core/src/test-helpers/index.ts @@ -5,3 +5,4 @@ export * from './mockSegmentStore'; export * from './mockTimeline'; export * from './setupSegmentClient'; export * from './utils'; +export * from './exampleWaitingPlugin'; diff --git a/packages/core/src/test-helpers/mockSegmentStore.ts b/packages/core/src/test-helpers/mockSegmentStore.ts index 2b16f2bbd..c4d46901d 100644 --- a/packages/core/src/test-helpers/mockSegmentStore.ts +++ b/packages/core/src/test-helpers/mockSegmentStore.ts @@ -25,6 +25,7 @@ import { createGetter } from '../storage/helpers'; export type StoreData = { isReady: boolean; enabled: boolean; + running: boolean; context?: DeepPartial; settings: SegmentAPIIntegrations; consentSettings?: SegmentAPIConsentSettings; @@ -38,6 +39,7 @@ export type StoreData = { const INITIAL_VALUES: StoreData = { isReady: true, enabled: true, + running: true, context: undefined, settings: { [SEGMENT_DESTINATION_KEY]: {}, @@ -93,6 +95,7 @@ export class MockSegmentStore implements Storage { deepLinkData: createCallbackManager(), pendingEvents: createCallbackManager(), enabled: createCallbackManager(), + running: createCallbackManager(), }; readonly isReady = { @@ -123,6 +126,23 @@ export class MockSegmentStore implements Storage { }, }; + readonly running = { + get: createMockStoreGetter(() => { + return this.data.running; + }), + onChange: (_callback: (value: boolean) => void) => { + return () => { + return; + }; + }, + set: async (value: boolean | ((prev: boolean) => boolean)) => { + this.data.running = + value instanceof Function ? value(this.data.running ?? true) : value; + this.callbacks.running.run(this.data.running); + return this.data.running; + }, + }; + readonly context: Watchable | undefined> & Settable> = { get: createMockStoreGetter(() => ({ ...this.data.context })),