From 3265beb40a4c55194f1fbf3d277391ada28a64e0 Mon Sep 17 00:00:00 2001 From: Sunita Prajapati Date: Wed, 26 Nov 2025 15:56:30 +0530 Subject: [PATCH 1/8] chore: added waiting plugin --- packages/core/src/analytics.ts | 65 +++++++++++++++++++++++++++- packages/core/src/plugins/Waiting.ts | 54 +++++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 packages/core/src/plugins/Waiting.ts diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 1ded23b6a..349c566b9 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -72,6 +72,7 @@ import { translateHTTPError, } from './errors'; import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin'; +import { WaitingPlugin } from './plugins/Waiting'; type OnPluginAddedCallback = (plugin: Plugin) => void; @@ -97,6 +98,11 @@ export class SegmentClient { private isAddingPlugins = false; private timeline: Timeline; + // running state (matches Kotlin's running flag) + private isRunning = true; + + // Waiting plugin instance (buffers events while paused) + private waitingPlugin?: WaitingPlugin; private pluginsToAdd: Plugin[] = []; @@ -200,6 +206,18 @@ export class SegmentClient { this.store = store; this.timeline = new Timeline(); + // create and add waiting plugin immediately so early events get buffered. + try { + this.waitingPlugin = new WaitingPlugin(); + // add directly to timeline via addPlugin to ensure configure() is called immediately + this.addPlugin(this.waitingPlugin); + // initial running state false until init completes (mirrors Kotlin semantics) + this.isRunning = false; + } catch (e) { + // if WaitingPlugin instantiation or add fails, fallback to running=true + this.isRunning = true; + } + // Initialize the watchables this.context = { get: this.store.context.get, @@ -296,6 +314,9 @@ export class SegmentClient { await this.storageReady(); } + // Pause pipeline at init start (buffer events until init completes) + this.pauseEventProcessing(); + // Get new settings from segment // It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins // which make use of the settings object to initialize @@ -309,7 +330,8 @@ export class SegmentClient { ]); await this.onReady(); this.isReady.value = true; - + // Resume pipeline before processing pending events so WaitingPlugin flushes + await this.resumeEventProcessing(); // Process all pending events await this.processPendingEvents(); // Trigger manual flush @@ -1027,4 +1049,45 @@ export class SegmentClient { return totalEventsCount; } + /* + * Running / pause/resume helpers (Kotlin parity) + */ + + public running() { + return this.isRunning; + } + /** + * Pause event processing globally. Events will be buffered into pendingEvents and WaitingPlugin. + * An auto-resume will be scheduled after `timeout` ms. + */ + public pauseEventProcessing(timeout = 30000) { + if (!this.isRunning) { + return; + } + + this.isRunning = false; + try { + this.waitingPlugin?.pause(); + } catch { + // ignore if plugin not present + } + + // auto-resume after timeout to avoid permanent blocking + setTimeout(() => { + void this.resumeEventProcessing(); + }, timeout); + } + public async resumeEventProcessing() { + if (this.isRunning) { + return; + } + + this.isRunning = true; + + try { + await this.waitingPlugin?.resume(); + } catch { + // ignore plugin errors during resume + } + } } diff --git a/packages/core/src/plugins/Waiting.ts b/packages/core/src/plugins/Waiting.ts new file mode 100644 index 000000000..848960cf7 --- /dev/null +++ b/packages/core/src/plugins/Waiting.ts @@ -0,0 +1,54 @@ +import { SegmentClient } from 'src'; +import { Plugin } from 'src/plugin'; +import { PluginType, SegmentEvent } from 'src/types'; + +/** + * WaitingPlugin + * Buffers events when paused and releases them when resumed. + */ +export class WaitingPlugin extends Plugin { + public type = PluginType.before; + private paused = true; + private buffer: SegmentEvent[] = []; + + configure(analytics: SegmentClient) { + super.configure(analytics); + } + + isPaused() { + return this.paused; + } + + pause() { + this.paused = true; + } + + async resume() { + if (!this.paused) { + return; + } + + this.paused = false; + + const events = [...this.buffer]; + this.buffer = []; + + for (const event of events) { + try { + if (this.analytics !== undefined) { + await this.analytics.process(event); + } + } catch (err) { + // Ignore individual errors + } + } + } + + execute(event: SegmentEvent): SegmentEvent | undefined { + if (this.paused) { + this.buffer.push(event); + return undefined; + } + return event; + } +} From 8bb3074b2a93b12ce36c4d6e42d1fffc5fee2b16 Mon Sep 17 00:00:00 2001 From: Sunita Prajapati Date: Wed, 24 Dec 2025 13:18:35 +0530 Subject: [PATCH 2/8] feat: waiting plugin add waiting plugin interface. any plugin implement this interface will pause event processing when it's added to analytics any plugin implement this interface can pause and resume event processing as needed --- .../plugins/ExampleWaitingPlugin.tsx | 33 ++ packages/core/src/analytics.ts | 109 +++--- packages/core/src/plugin.ts | 33 +- packages/core/src/plugins/Waiting.ts | 54 --- .../src/plugins/__tests__/Waiting.test.ts | 338 ++++++++++++++++++ .../src/test-helpers/exampleWaitingPlugin.ts | 62 ++++ packages/core/src/test-helpers/index.ts | 2 + 7 files changed, 525 insertions(+), 106 deletions(-) create mode 100644 examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx delete mode 100644 packages/core/src/plugins/Waiting.ts create mode 100644 packages/core/src/plugins/__tests__/Waiting.test.ts create mode 100644 packages/core/src/test-helpers/exampleWaitingPlugin.ts diff --git a/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx b/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx new file mode 100644 index 000000000..1015966e6 --- /dev/null +++ b/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx @@ -0,0 +1,33 @@ +import { + WaitingPlugin, + PluginType, + Plugin, + +} from '@segment/analytics-react-native'; + +import type {SegmentAPISettings, SegmentClient, SegmentEvent, UpdateType} from '@segment/analytics-react-native'; +export class ExampleWaitingPlugin extends WaitingPlugin { + type = PluginType.enrichment; + analytics = undefined; + tracked = false; + + /** + * Called when settings are updated + */ + update(_settings: SegmentAPISettings, _type: UpdateType) { + if (this.type === PluginType.before) { + // delay 3 seconds, then resume event processing + setTimeout(() => { + this.resume(); + }, 3000); + } + } + + /** + * Called for track events + */ + track(event: SegmentEvent) { + this.tracked = true; + return event; + } +} \ No newline at end of file diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 349c566b9..8975eb76a 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -72,7 +72,7 @@ import { translateHTTPError, } from './errors'; import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin'; -import { WaitingPlugin } from './plugins/Waiting'; +import { WaitingPlugin } from './plugin'; type OnPluginAddedCallback = (plugin: Plugin) => void; @@ -98,11 +98,6 @@ export class SegmentClient { private isAddingPlugins = false; private timeline: Timeline; - // running state (matches Kotlin's running flag) - private isRunning = true; - - // Waiting plugin instance (buffers events while paused) - private waitingPlugin?: WaitingPlugin; private pluginsToAdd: Plugin[] = []; @@ -206,17 +201,17 @@ export class SegmentClient { this.store = store; this.timeline = new Timeline(); - // create and add waiting plugin immediately so early events get buffered. - try { - this.waitingPlugin = new WaitingPlugin(); - // add directly to timeline via addPlugin to ensure configure() is called immediately - this.addPlugin(this.waitingPlugin); - // initial running state false until init completes (mirrors Kotlin semantics) - this.isRunning = false; - } catch (e) { - // if WaitingPlugin instantiation or add fails, fallback to running=true - this.isRunning = true; - } + // // create and add waiting plugin immediately so early events get buffered. + // try { + // this.waitingPlugin = new WaitingPlugin(); + // // add directly to timeline via addPlugin to ensure configure() is called immediately + // this.addPlugin(this.waitingPlugin); + // // initial running state false until init completes (mirrors Kotlin semantics) + // this.isRunning = false; + // } catch (e) { + // // if WaitingPlugin instantiation or add fails, fallback to running=true + // this.isRunning = true; + // } // Initialize the watchables this.context = { @@ -313,10 +308,6 @@ export class SegmentClient { if ((await this.store.isReady.get(true)) === false) { await this.storageReady(); } - - // Pause pipeline at init start (buffer events until init completes) - this.pauseEventProcessing(); - // Get new settings from segment // It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins // which make use of the settings object to initialize @@ -330,8 +321,6 @@ export class SegmentClient { ]); await this.onReady(); this.isReady.value = true; - // Resume pipeline before processing pending events so WaitingPlugin flushes - await this.resumeEventProcessing(); // Process all pending events await this.processPendingEvents(); // Trigger manual flush @@ -487,10 +476,11 @@ export class SegmentClient { settings ); } - - if (!this.isReady.value) { + console.log('!this.isReady.value', !this.isReady.value); + if (!this.isReady.value && !(plugin instanceof WaitingPlugin)) { this.pluginsToAdd.push(plugin); } else { + console.log('this.addPlugin'); this.addPlugin(plugin); } } @@ -498,6 +488,12 @@ export class SegmentClient { private addPlugin(plugin: Plugin) { plugin.configure(this); this.timeline.add(plugin); + //check for waiting plugin here + if (plugin instanceof WaitingPlugin) { + console.log('add plugin'); + this.pauseEventProcessingForPlugin(plugin); + } + this.triggerOnPluginLoaded(plugin); } @@ -534,7 +530,7 @@ export class SegmentClient { ): Promise { const event = await this.applyContextData(incomingEvent); this.flushPolicyExecuter.notify(event); - return this.timeline.process(event); + return await this.timeline.process(event); } private async trackDeepLinks() { @@ -1049,45 +1045,58 @@ export class SegmentClient { return totalEventsCount; } - /* - * Running / pause/resume helpers (Kotlin parity) - */ + private resumeTimeoutId?: ReturnType; + private waitingPlugins = new Set(); - public running() { - return this.isRunning; - } /** * Pause event processing globally. Events will be buffered into pendingEvents and WaitingPlugin. * An auto-resume will be scheduled after `timeout` ms. */ - public pauseEventProcessing(timeout = 30000) { - if (!this.isRunning) { + pauseEventProcessingForPlugin(plugin?: WaitingPlugin) { + if (plugin) { + this.waitingPlugins.add(plugin); + } + this.pauseEventProcessing(); + } + async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) { + if (plugin) { + this.waitingPlugins.delete(plugin); + } + if (this.waitingPlugins.size > 0) { + return; // still blocked + } + + await this.resumeEventProcessing(); + } + + pauseEventProcessing(timeout = 30000) { + // IMPORTANT: ignore repeated pauses + if (!this.isReady.value) { return; } - this.isRunning = false; - try { - this.waitingPlugin?.pause(); - } catch { - // ignore if plugin not present + this.isReady.value = false; + + // clear previous timeout if any + if (this.resumeTimeoutId) { + clearTimeout(this.resumeTimeoutId); } - // auto-resume after timeout to avoid permanent blocking - setTimeout(() => { - void this.resumeEventProcessing(); + this.resumeTimeoutId = setTimeout(async () => { + await this.resumeEventProcessing(); }, timeout); } - public async resumeEventProcessing() { - if (this.isRunning) { + async resumeEventProcessing() { + if (this.isReady.value) { return; } - this.isRunning = true; - - try { - await this.waitingPlugin?.resume(); - } catch { - // ignore plugin errors during resume + if (this.resumeTimeoutId) { + clearTimeout(this.resumeTimeoutId); + this.resumeTimeoutId = undefined; } + // this.waitingPlugins.clear(); + this.isReady.value = true; + await this.processPendingEvents(); } } diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index d0328d19f..d1bd1840c 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -115,12 +115,14 @@ export class DestinationPlugin extends EventPlugin { key = ''; timeline = new Timeline(); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + store: any; private hasSettings() { return this.analytics?.settings.get()?.[this.key] !== undefined; } - private isEnabled(event: SegmentEvent): boolean { + protected isEnabled(event: SegmentEvent): boolean { let customerDisabled = false; if (event.integrations?.[this.key] === false) { customerDisabled = true; @@ -140,6 +142,10 @@ export class DestinationPlugin extends EventPlugin { if (analytics) { plugin.configure(analytics); } + + if (analytics && plugin instanceof WaitingPlugin) { + analytics.pauseEventProcessingForPlugin(plugin); + } this.timeline.add(plugin); return plugin; } @@ -179,7 +185,6 @@ export class DestinationPlugin extends EventPlugin { type: PluginType.before, event, }); - if (beforeResult === undefined) { return; } @@ -210,3 +215,27 @@ export class UtilityPlugin extends EventPlugin {} // For internal platform-specific bits export class PlatformPlugin extends Plugin {} + +export { PluginType }; + +/** + * WaitingPlugin + * Buffers events when paused and releases them when resumed. + */ +export class WaitingPlugin extends Plugin { + constructor() { + super(); + } + + configure(analytics: SegmentClient) { + super.configure(analytics); + } + + pause() { + this.analytics?.pauseEventProcessingForPlugin(this); + } + + async resume() { + await this.analytics?.resumeEventProcessingForPlugin(this); + } +} diff --git a/packages/core/src/plugins/Waiting.ts b/packages/core/src/plugins/Waiting.ts deleted file mode 100644 index 848960cf7..000000000 --- a/packages/core/src/plugins/Waiting.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { SegmentClient } from 'src'; -import { Plugin } from 'src/plugin'; -import { PluginType, SegmentEvent } from 'src/types'; - -/** - * WaitingPlugin - * Buffers events when paused and releases them when resumed. - */ -export class WaitingPlugin extends Plugin { - public type = PluginType.before; - private paused = true; - private buffer: SegmentEvent[] = []; - - configure(analytics: SegmentClient) { - super.configure(analytics); - } - - isPaused() { - return this.paused; - } - - pause() { - this.paused = true; - } - - async resume() { - if (!this.paused) { - return; - } - - this.paused = false; - - const events = [...this.buffer]; - this.buffer = []; - - for (const event of events) { - try { - if (this.analytics !== undefined) { - await this.analytics.process(event); - } - } catch (err) { - // Ignore individual errors - } - } - } - - execute(event: SegmentEvent): SegmentEvent | undefined { - if (this.paused) { - this.buffer.push(event); - return undefined; - } - return event; - } -} diff --git a/packages/core/src/plugins/__tests__/Waiting.test.ts b/packages/core/src/plugins/__tests__/Waiting.test.ts new file mode 100644 index 000000000..d97a7e87e --- /dev/null +++ b/packages/core/src/plugins/__tests__/Waiting.test.ts @@ -0,0 +1,338 @@ +import { SegmentClient } from '../../analytics'; +import { DestinationPlugin } from '../../plugin'; + +//import { SegmentDestination } from '../SegmentDestination'; +import { + ExampleWaitingPlugin, + ExampleWaitingPlugin1, + getMockLogger, + ManualResumeWaitingPlugin, + MockSegmentStore, + StubDestinationPlugin, +} from '../../test-helpers'; + +jest.useFakeTimers(); + +describe('WaitingPlugin', () => { + const store = new MockSegmentStore(); + const baseConfig = { + writeKey: 'test-key', + flushAt: 1, + flushInterval: 0, + trackAppLifecycleEvents: false, + autoAddSegmentDestination: false, + }; + + beforeEach(() => { + store.reset(); + jest.clearAllMocks(); + jest.clearAllTimers(); + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + test('test resume after timeout', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + client.pauseEventProcessing(1000); + + expect(client.isReady.value).toBe(false); + + jest.advanceTimersByTime(2000); + + // Allow microtasks from setTimeout → resumeEventProcessing + await Promise.resolve(); + + expect(client.isReady.value).toBe(true); + }); + test('test manual resume', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + client.pauseEventProcessing(); + + expect(client.isReady.value).toBe(false); + + await client.resumeEventProcessing(); + + expect(client.isReady.value).toBe(true); + }); + test('pause does not dispatch timeout if already paused', () => { + const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); + + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + + client.pauseEventProcessing(); + client.pauseEventProcessing(); + client.pauseEventProcessing(); + + expect(setTimeoutSpy).toHaveBeenCalledTimes(1); + }); + test('WaitingPlugin makes analytics wait', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + const plugin = new ExampleWaitingPlugin(); + + client.add({ plugin }); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const trackSpy = jest.spyOn(client as any, 'startTimelineProcessing'); + + client.track('foo'); + + expect(client.isReady.value).toBe(false); + + // Event should NOT be processed while paused + expect(trackSpy).not.toHaveBeenCalled(); + + await Promise.resolve(); + + jest.advanceTimersByTime(1000); + await Promise.resolve(); + + expect(client.isReady.value).toBe(true); + + // Event should now be processed + expect(trackSpy).toHaveBeenCalledTimes(1); + }); + + test('timeout force resume', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + const waitingPlugin = new ManualResumeWaitingPlugin(); + client.add({ plugin: waitingPlugin }); + + client.track('foo'); + + expect(client.isReady.value).toBe(false); + expect(waitingPlugin.tracked).toBe(false); + + await jest.advanceTimersByTimeAsync(30000); + + await Promise.resolve(); + await Promise.resolve(); + + expect(client.isReady.value).toBe(true); + expect(waitingPlugin.tracked).toBe(true); + }); + test('multiple WaitingPlugins', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Initially, analytics is running + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + // Create two waiting plugins + const plugin1 = new ExampleWaitingPlugin1(); + const plugin2 = new ManualResumeWaitingPlugin(); + + // Add plugins to client + client.add({ plugin: plugin1 }); + client.add({ plugin: plugin2 }); + + // Track an event while waiting plugins are active + client.track('foo'); + + // Client should now be paused + expect(client.isReady.value).toBe(false); + + // Plugins should not have tracked the event yet + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + + // Resume the first plugin + await plugin1.resume(); + // Advance timers to simulate any internal delays + jest.advanceTimersByTime(6000); + await Promise.resolve(); + + // Still paused because plugin2 is waiting + expect(client.isReady.value).toBe(false); + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + + // Resume the second plugin + await plugin2.resume(); + // Advance timers to flush + jest.advanceTimersByTime(6000); + await Promise.resolve(); + + // Now analytics should be running + expect(client.isReady.value).toBe(true); + // Both plugins should have tracked the event + expect(plugin1.tracked).toBe(true); + expect(plugin2.tracked).toBe(true); + }); + test('WaitingPlugin makes analytics to wait on DestinationPlugin', async () => { + jest.useFakeTimers(); + + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Initially, analytics is running + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + const waitingPlugin = new ExampleWaitingPlugin1(); + const stubDestinationPlugin: DestinationPlugin = + new StubDestinationPlugin(); + // Add destination to analytics + client.add({ plugin: stubDestinationPlugin }); + // Add waiting plugin inside destination + stubDestinationPlugin.add(waitingPlugin); + + // Track event + await client.track('foo'); + + // Analytics should pause + expect(client.isReady.value).toBe(false); + expect(waitingPlugin.tracked).toBe(false); + await Promise.resolve(); + + jest.advanceTimersByTime(30000); + await jest.runAllTimersAsync(); + + // 🔑 flush remaining promise chains + await Promise.resolve(); + await Promise.resolve(); + + // Analytics resumed + expect(client.isReady.value).toBe(true); + // Waiting plugin executed + expect(waitingPlugin.tracked).toBe(true); + + jest.useRealTimers(); + }); + test('timeout force resume on DestinationPlugin', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // analytics running initially + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + const waitingPlugin = new ExampleWaitingPlugin1(); // no manual resume + const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); + + // add destination + client.add({ plugin: destinationPlugin }); + + // add waiting plugin inside destination + destinationPlugin.add(waitingPlugin); + + // track event + await client.track('foo'); + + // analytics should pause + expect(client.isReady.value).toBe(false); + expect(waitingPlugin.tracked).toBe(false); + + await Promise.resolve(); + + jest.advanceTimersByTime(6000); + await jest.runAllTimersAsync(); + + await Promise.resolve(); + await Promise.resolve(); + + // analytics resumed + expect(client.isReady.value).toBe(true); + + // waiting plugin executed + expect(waitingPlugin.tracked).toBe(true); + }); + test('test multiple WaitingPlugin on DestinationPlugin', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // analytics running initially + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); + client.add({ plugin: destinationPlugin }); + + const plugin1 = new ExampleWaitingPlugin1(); + const plugin2 = new ManualResumeWaitingPlugin(); + + destinationPlugin.add(plugin1); + destinationPlugin.add(plugin2); + + // track event + await client.track('foo'); + + // analytics paused + expect(client.isReady.value).toBe(false); + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + // Resume the first plugin + await plugin1.resume(); + + jest.advanceTimersByTime(6000); + await Promise.resolve(); + + // still paused because plugin2 not resumed + expect(client.isReady.value).toBe(false); + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + plugin2.resume(); + + jest.advanceTimersByTime(6000); + await jest.runAllTimersAsync(); + await Promise.resolve(); + // analytics resumed + expect(client.isReady.value).toBe(true); + + // both plugins executed + expect(plugin1.tracked).toBe(true); + expect(plugin2.tracked).toBe(true); + }); +}); diff --git a/packages/core/src/test-helpers/exampleWaitingPlugin.ts b/packages/core/src/test-helpers/exampleWaitingPlugin.ts new file mode 100644 index 000000000..d47c02b02 --- /dev/null +++ b/packages/core/src/test-helpers/exampleWaitingPlugin.ts @@ -0,0 +1,62 @@ +import { PluginType, SegmentEvent } from '../types'; +import { SegmentClient } from '../analytics'; +import { DestinationPlugin, WaitingPlugin } from '../plugin'; + +export class ExampleWaitingPlugin extends WaitingPlugin { + public type = PluginType.enrichment; + public tracked = false; + + configure(analytics: SegmentClient) { + console.log('exampleWaitingPlugin configure'); + super.configure(analytics); + // Simulate async work (network, native module init, etc.) + setTimeout(() => { + console.log('ExampleWaitingPlugin: ready!'); + void analytics.resumeEventProcessing(); + }, 1000); + } + + execute(event: SegmentEvent): SegmentEvent | undefined { + super.execute(event); + this.tracked = true; + return event; + } +} +export class ExampleWaitingPlugin1 extends WaitingPlugin { + public type = PluginType.before; + public tracked = false; + + constructor() { + super(); + } + configure(analytics: SegmentClient) { + super.configure(analytics); + } + execute(event: SegmentEvent): SegmentEvent | undefined { + console.log('ExampleWaitingPlugin1 received event', event.type); + this.tracked = true; + return event; + } +} + +export class ManualResumeWaitingPlugin extends WaitingPlugin { + public type = PluginType.enrichment; + public tracked = false; + + configure(analytics: SegmentClient) { + super.configure(analytics); + } + + execute(event: SegmentEvent): SegmentEvent | undefined { + console.log('ManualResumeWaitingPlugin received event', event.type); + this.tracked = true; + return event; + } +} + +export class StubDestinationPlugin extends DestinationPlugin { + key = 'StubDestination'; + protected isEnabled(_event: SegmentEvent): boolean { + return true; + } +} diff --git a/packages/core/src/test-helpers/index.ts b/packages/core/src/test-helpers/index.ts index d8a6aff04..90b8d350a 100644 --- a/packages/core/src/test-helpers/index.ts +++ b/packages/core/src/test-helpers/index.ts @@ -5,3 +5,5 @@ export * from './mockSegmentStore'; export * from './mockTimeline'; export * from './setupSegmentClient'; export * from './utils'; +export * from './exampleWaitingPlugin'; + From f1d725e85ed02b7e686ec719d03c2ca136ca8002 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Feb 2026 02:26:22 +0000 Subject: [PATCH 3/8] Initial plan From aff9877e203db60bc44c5ad1394a83fa28a47837 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Feb 2026 02:32:15 +0000 Subject: [PATCH 4/8] Implement review feedback: use running state instead of isReady Co-authored-by: abueide <19354425+abueide@users.noreply.github.com> --- packages/core/src/analytics.ts | 65 ++++++++++------- .../core/src/plugins/QueueFlushingPlugin.ts | 7 ++ .../src/plugins/__tests__/Waiting.test.ts | 72 +++++++++---------- packages/core/src/storage/sovranStorage.ts | 46 ++++++++++++ packages/core/src/storage/types.ts | 2 + 5 files changed, 129 insertions(+), 63 deletions(-) diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 8975eb76a..0d4ad1c31 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -121,6 +121,10 @@ export class SegmentClient { * Access or subscribe to client enabled */ readonly enabled: Watchable & Settable; + /** + * Access or subscribe to running state (controls event processing) + */ + readonly running: Watchable & Settable; /** * Access or subscribe to client context */ @@ -201,17 +205,19 @@ export class SegmentClient { this.store = store; this.timeline = new Timeline(); - // // create and add waiting plugin immediately so early events get buffered. - // try { - // this.waitingPlugin = new WaitingPlugin(); - // // add directly to timeline via addPlugin to ensure configure() is called immediately - // this.addPlugin(this.waitingPlugin); - // // initial running state false until init completes (mirrors Kotlin semantics) - // this.isRunning = false; - // } catch (e) { - // // if WaitingPlugin instantiation or add fails, fallback to running=true - // this.isRunning = true; - // } + /* + * create and add waiting plugin immediately so early events get buffered. + * try { + * this.waitingPlugin = new WaitingPlugin(); + * // add directly to timeline via addPlugin to ensure configure() is called immediately + * this.addPlugin(this.waitingPlugin); + * // initial running state false until init completes (mirrors Kotlin semantics) + * this.isRunning = false; + * } catch (e) { + * // if WaitingPlugin instantiation or add fails, fallback to running=true + * this.isRunning = true; + * } + */ // Initialize the watchables this.context = { @@ -271,6 +277,12 @@ export class SegmentClient { onChange: this.store.enabled.onChange, }; + this.running = { + get: this.store.running.get, + set: this.store.running.set, + onChange: this.store.running.onChange, + }; + // add segment destination plugin unless // asked not to via configuration. if (this.config.autoAddSegmentDestination === true) { @@ -321,6 +333,8 @@ export class SegmentClient { ]); await this.onReady(); this.isReady.value = true; + // Set running to true to start event processing + await this.store.running.set(true); // Process all pending events await this.processPendingEvents(); // Trigger manual flush @@ -476,11 +490,9 @@ export class SegmentClient { settings ); } - console.log('!this.isReady.value', !this.isReady.value); - if (!this.isReady.value && !(plugin instanceof WaitingPlugin)) { + if (!this.isReady.value) { this.pluginsToAdd.push(plugin); } else { - console.log('this.addPlugin'); this.addPlugin(plugin); } } @@ -1048,7 +1060,7 @@ export class SegmentClient { private resumeTimeoutId?: ReturnType; private waitingPlugins = new Set(); - /** + /* * Pause event processing globally. Events will be buffered into pendingEvents and WaitingPlugin. * An auto-resume will be scheduled after `timeout` ms. */ @@ -1071,23 +1083,23 @@ export class SegmentClient { pauseEventProcessing(timeout = 30000) { // IMPORTANT: ignore repeated pauses - if (!this.isReady.value) { + const running = this.store.running.get(); + if (!running) { return; } - this.isReady.value = false; + void this.store.running.set(false); - // clear previous timeout if any - if (this.resumeTimeoutId) { - clearTimeout(this.resumeTimeoutId); + // Only set timeout if not already set (prevents multiple waiting plugins from overwriting) + if (!this.resumeTimeoutId) { + this.resumeTimeoutId = setTimeout(async () => { + await this.resumeEventProcessing(); + }, timeout); } - - this.resumeTimeoutId = setTimeout(async () => { - await this.resumeEventProcessing(); - }, timeout); } async resumeEventProcessing() { - if (this.isReady.value) { + const running = this.store.running.get(); + if (running) { return; } @@ -1095,8 +1107,7 @@ export class SegmentClient { clearTimeout(this.resumeTimeoutId); this.resumeTimeoutId = undefined; } - // this.waitingPlugins.clear(); - this.isReady.value = true; + await this.store.running.set(true); await this.processPendingEvents(); } } diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index 1580ee288..645593adc 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -73,6 +73,13 @@ export class QueueFlushingPlugin extends UtilityPlugin { * Calls the onFlush callback with the events in the queue */ async flush() { + // Check if event processing is running + const running = this.analytics?.running.get(); + if (!running) { + this.analytics?.logger.info('Event processing is paused, skipping flush'); + return; + } + // Wait for the queue to be restored try { await this.isRestored; diff --git a/packages/core/src/plugins/__tests__/Waiting.test.ts b/packages/core/src/plugins/__tests__/Waiting.test.ts index d97a7e87e..dcaa4a59d 100644 --- a/packages/core/src/plugins/__tests__/Waiting.test.ts +++ b/packages/core/src/plugins/__tests__/Waiting.test.ts @@ -41,19 +41,19 @@ describe('WaitingPlugin', () => { store, }); - client.isReady.value = true; - expect(client.isReady.value).toBe(true); + await client.store.running.set(true); + expect(client.store.running.get()).toBe(true); client.pauseEventProcessing(1000); - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); jest.advanceTimersByTime(2000); // Allow microtasks from setTimeout → resumeEventProcessing await Promise.resolve(); - expect(client.isReady.value).toBe(true); + expect(await client.store.running.get(true)).toBe(true); }); test('test manual resume', async () => { const client = new SegmentClient({ @@ -62,18 +62,18 @@ describe('WaitingPlugin', () => { store, }); - client.isReady.value = true; - expect(client.isReady.value).toBe(true); + await client.store.running.set(true); + expect(client.store.running.get()).toBe(true); client.pauseEventProcessing(); - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); await client.resumeEventProcessing(); - expect(client.isReady.value).toBe(true); + expect(await client.store.running.get(true)).toBe(true); }); - test('pause does not dispatch timeout if already paused', () => { + test('pause does not dispatch timeout if already paused', async () => { const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); const client = new SegmentClient({ @@ -82,7 +82,7 @@ describe('WaitingPlugin', () => { store, }); - client.isReady.value = true; + await client.store.running.set(true); client.pauseEventProcessing(); client.pauseEventProcessing(); @@ -97,8 +97,8 @@ describe('WaitingPlugin', () => { store, }); - client.isReady.value = true; - expect(client.isReady.value).toBe(true); + await client.store.running.set(true); + expect(client.store.running.get()).toBe(true); const plugin = new ExampleWaitingPlugin(); @@ -109,7 +109,7 @@ describe('WaitingPlugin', () => { client.track('foo'); - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); // Event should NOT be processed while paused expect(trackSpy).not.toHaveBeenCalled(); @@ -119,7 +119,7 @@ describe('WaitingPlugin', () => { jest.advanceTimersByTime(1000); await Promise.resolve(); - expect(client.isReady.value).toBe(true); + expect(await client.store.running.get(true)).toBe(true); // Event should now be processed expect(trackSpy).toHaveBeenCalledTimes(1); @@ -132,15 +132,15 @@ describe('WaitingPlugin', () => { store, }); - client.isReady.value = true; - expect(client.isReady.value).toBe(true); + await client.store.running.set(true); + expect(client.store.running.get()).toBe(true); const waitingPlugin = new ManualResumeWaitingPlugin(); client.add({ plugin: waitingPlugin }); client.track('foo'); - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); expect(waitingPlugin.tracked).toBe(false); await jest.advanceTimersByTimeAsync(30000); @@ -148,7 +148,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); await Promise.resolve(); - expect(client.isReady.value).toBe(true); + expect(await client.store.running.get(true)).toBe(true); expect(waitingPlugin.tracked).toBe(true); }); test('multiple WaitingPlugins', async () => { @@ -159,8 +159,8 @@ describe('WaitingPlugin', () => { }); // Initially, analytics is running - client.isReady.value = true; - expect(client.isReady.value).toBe(true); + await client.store.running.set(true); + expect(client.store.running.get()).toBe(true); // Create two waiting plugins const plugin1 = new ExampleWaitingPlugin1(); @@ -174,7 +174,7 @@ describe('WaitingPlugin', () => { client.track('foo'); // Client should now be paused - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); // Plugins should not have tracked the event yet expect(plugin1.tracked).toBe(false); @@ -187,7 +187,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // Still paused because plugin2 is waiting - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); expect(plugin1.tracked).toBe(false); expect(plugin2.tracked).toBe(false); @@ -198,7 +198,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // Now analytics should be running - expect(client.isReady.value).toBe(true); + expect(await client.store.running.get(true)).toBe(true); // Both plugins should have tracked the event expect(plugin1.tracked).toBe(true); expect(plugin2.tracked).toBe(true); @@ -213,8 +213,8 @@ describe('WaitingPlugin', () => { }); // Initially, analytics is running - client.isReady.value = true; - expect(client.isReady.value).toBe(true); + await client.store.running.set(true); + expect(client.store.running.get()).toBe(true); const waitingPlugin = new ExampleWaitingPlugin1(); const stubDestinationPlugin: DestinationPlugin = new StubDestinationPlugin(); @@ -227,7 +227,7 @@ describe('WaitingPlugin', () => { await client.track('foo'); // Analytics should pause - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); expect(waitingPlugin.tracked).toBe(false); await Promise.resolve(); @@ -239,7 +239,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // Analytics resumed - expect(client.isReady.value).toBe(true); + expect(await client.store.running.get(true)).toBe(true); // Waiting plugin executed expect(waitingPlugin.tracked).toBe(true); @@ -253,8 +253,8 @@ describe('WaitingPlugin', () => { }); // analytics running initially - client.isReady.value = true; - expect(client.isReady.value).toBe(true); + await client.store.running.set(true); + expect(client.store.running.get()).toBe(true); const waitingPlugin = new ExampleWaitingPlugin1(); // no manual resume const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); @@ -269,7 +269,7 @@ describe('WaitingPlugin', () => { await client.track('foo'); // analytics should pause - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); expect(waitingPlugin.tracked).toBe(false); await Promise.resolve(); @@ -281,7 +281,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // analytics resumed - expect(client.isReady.value).toBe(true); + expect(await client.store.running.get(true)).toBe(true); // waiting plugin executed expect(waitingPlugin.tracked).toBe(true); @@ -294,8 +294,8 @@ describe('WaitingPlugin', () => { }); // analytics running initially - client.isReady.value = true; - expect(client.isReady.value).toBe(true); + await client.store.running.set(true); + expect(client.store.running.get()).toBe(true); const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); client.add({ plugin: destinationPlugin }); @@ -310,7 +310,7 @@ describe('WaitingPlugin', () => { await client.track('foo'); // analytics paused - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); expect(plugin1.tracked).toBe(false); expect(plugin2.tracked).toBe(false); // Resume the first plugin @@ -320,7 +320,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // still paused because plugin2 not resumed - expect(client.isReady.value).toBe(false); + expect(client.store.running.get()).toBe(false); expect(plugin1.tracked).toBe(false); expect(plugin2.tracked).toBe(false); plugin2.resume(); @@ -329,7 +329,7 @@ describe('WaitingPlugin', () => { await jest.runAllTimersAsync(); await Promise.resolve(); // analytics resumed - expect(client.isReady.value).toBe(true); + expect(await client.store.running.get(true)).toBe(true); // both plugins executed expect(plugin1.tracked).toBe(true); diff --git a/packages/core/src/storage/sovranStorage.ts b/packages/core/src/storage/sovranStorage.ts index ff308a027..b14ccf1f4 100644 --- a/packages/core/src/storage/sovranStorage.ts +++ b/packages/core/src/storage/sovranStorage.ts @@ -41,6 +41,7 @@ type Data = { filters: DestinationFilters; pendingEvents: SegmentEvent[]; enabled: boolean; + running: boolean; }; const INITIAL_VALUES: Data = { @@ -56,6 +57,7 @@ const INITIAL_VALUES: Data = { }, pendingEvents: [], enabled: true, + running: false, }; const isEverythingReady = (state: ReadinessStore) => @@ -190,6 +192,9 @@ export class SovranStorage implements Storage { readonly enabledStore: Store<{ enabled: boolean }>; readonly enabled: Watchable & Settable; + readonly runningStore: Store<{ running: boolean }>; + readonly running: Watchable & Settable; + constructor(config: StorageConfig) { this.storeId = config.storeId; this.storePersistor = config.storePersistor; @@ -201,6 +206,7 @@ export class SovranStorage implements Storage { hasRestoredFilters: false, hasRestoredPendingEvents: false, hasRestoredEnabled: false, + hasRestoredRunning: false, }); const markAsReadyGenerator = (key: keyof ReadinessStore) => () => { @@ -537,6 +543,46 @@ export class SovranStorage implements Storage { }; + this.runningStore = createStore( + { running: INITIAL_VALUES.running }, + { + persist: { + storeId: `${this.storeId}-running`, + persistor: this.storePersistor, + saveDelay: this.storePersistorSaveDelay, + onInitialized: markAsReadyGenerator('hasRestoredRunning'), + }, + } + ); + // Accessor object for running + this.running = { + get: createGetter( + () => { + const state = this.runningStore.getState(); + return state.running; + }, + async () => { + const value = await this.runningStore.getState(true); + return value.running; + } + ), + + onChange: (callback: (value: boolean) => void) => { + return this.runningStore.subscribe((store) => { + callback(store.running); + }); + }, + + set: async (value: boolean | ((prev: boolean) => boolean)) => { + const { running } = await this.runningStore.dispatch((state) => { + const newRunning = + value instanceof Function ? value(state.running) : value; + return { running: newRunning }; + }); + return running; + }, + }; + this.fixAnonymousId(); } diff --git a/packages/core/src/storage/types.ts b/packages/core/src/storage/types.ts index 91955b715..734a1b945 100644 --- a/packages/core/src/storage/types.ts +++ b/packages/core/src/storage/types.ts @@ -60,6 +60,7 @@ export interface ReadinessStore { hasRestoredFilters: boolean; hasRestoredPendingEvents: boolean; hasRestoredEnabled: boolean; + hasRestoredRunning: boolean; } /** @@ -93,6 +94,7 @@ export interface Storage { Settable & Queue; readonly enabled: Watchable & Settable; + readonly running: Watchable & Settable; } export type DeepLinkData = { referring_application: string; From 7d815d96187d4eb78f13c69c62cc8c6cf165f711 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Feb 2026 02:37:55 +0000 Subject: [PATCH 5/8] Add running state to MockSegmentStore and update tests Co-authored-by: abueide <19354425+abueide@users.noreply.github.com> --- packages/core/src/analytics.ts | 5 ++ .../src/plugins/__tests__/Waiting.test.ts | 88 +++++++++++-------- .../core/src/test-helpers/mockSegmentStore.ts | 20 +++++ 3 files changed, 78 insertions(+), 35 deletions(-) diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 0d4ad1c31..fe15227f2 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -524,6 +524,11 @@ export class SegmentClient { if (this.enabled.get() === false) { return; } + if (!this.running.get()) { + // If not running, queue the event for later processing + await this.store.pendingEvents.add(event); + return event; + } if (this.isReady.value) { return this.startTimelineProcessing(event); } else { diff --git a/packages/core/src/plugins/__tests__/Waiting.test.ts b/packages/core/src/plugins/__tests__/Waiting.test.ts index dcaa4a59d..abd017a91 100644 --- a/packages/core/src/plugins/__tests__/Waiting.test.ts +++ b/packages/core/src/plugins/__tests__/Waiting.test.ts @@ -41,19 +41,19 @@ describe('WaitingPlugin', () => { store, }); - await client.store.running.set(true); - expect(client.store.running.get()).toBe(true); + await client.running.set(true); + expect(client.running.get()).toBe(true); client.pauseEventProcessing(1000); - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); jest.advanceTimersByTime(2000); // Allow microtasks from setTimeout → resumeEventProcessing await Promise.resolve(); - expect(await client.store.running.get(true)).toBe(true); + expect(await client.running.get(true)).toBe(true); }); test('test manual resume', async () => { const client = new SegmentClient({ @@ -62,16 +62,16 @@ describe('WaitingPlugin', () => { store, }); - await client.store.running.set(true); - expect(client.store.running.get()).toBe(true); + await client.running.set(true); + expect(client.running.get()).toBe(true); client.pauseEventProcessing(); - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); await client.resumeEventProcessing(); - expect(await client.store.running.get(true)).toBe(true); + expect(await client.running.get(true)).toBe(true); }); test('pause does not dispatch timeout if already paused', async () => { const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); @@ -82,7 +82,7 @@ describe('WaitingPlugin', () => { store, }); - await client.store.running.set(true); + await client.running.set(true); client.pauseEventProcessing(); client.pauseEventProcessing(); @@ -97,8 +97,11 @@ describe('WaitingPlugin', () => { store, }); - await client.store.running.set(true); - expect(client.store.running.get()).toBe(true); + // Set isReady so plugins can be added immediately + (client as any).isReady.value = true; + + await client.running.set(true); + expect(client.running.get()).toBe(true); const plugin = new ExampleWaitingPlugin(); @@ -109,7 +112,7 @@ describe('WaitingPlugin', () => { client.track('foo'); - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); // Event should NOT be processed while paused expect(trackSpy).not.toHaveBeenCalled(); @@ -119,7 +122,7 @@ describe('WaitingPlugin', () => { jest.advanceTimersByTime(1000); await Promise.resolve(); - expect(await client.store.running.get(true)).toBe(true); + expect(await client.running.get(true)).toBe(true); // Event should now be processed expect(trackSpy).toHaveBeenCalledTimes(1); @@ -132,15 +135,18 @@ describe('WaitingPlugin', () => { store, }); - await client.store.running.set(true); - expect(client.store.running.get()).toBe(true); + // Set isReady so plugins can be added immediately + (client as any).isReady.value = true; + + await client.running.set(true); + expect(client.running.get()).toBe(true); const waitingPlugin = new ManualResumeWaitingPlugin(); client.add({ plugin: waitingPlugin }); client.track('foo'); - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); expect(waitingPlugin.tracked).toBe(false); await jest.advanceTimersByTimeAsync(30000); @@ -148,7 +154,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); await Promise.resolve(); - expect(await client.store.running.get(true)).toBe(true); + expect(await client.running.get(true)).toBe(true); expect(waitingPlugin.tracked).toBe(true); }); test('multiple WaitingPlugins', async () => { @@ -158,9 +164,12 @@ describe('WaitingPlugin', () => { store, }); + // Set isReady so plugins can be added immediately + (client as any).isReady.value = true; + // Initially, analytics is running - await client.store.running.set(true); - expect(client.store.running.get()).toBe(true); + await client.running.set(true); + expect(client.running.get()).toBe(true); // Create two waiting plugins const plugin1 = new ExampleWaitingPlugin1(); @@ -174,7 +183,7 @@ describe('WaitingPlugin', () => { client.track('foo'); // Client should now be paused - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); // Plugins should not have tracked the event yet expect(plugin1.tracked).toBe(false); @@ -187,7 +196,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // Still paused because plugin2 is waiting - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); expect(plugin1.tracked).toBe(false); expect(plugin2.tracked).toBe(false); @@ -198,7 +207,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // Now analytics should be running - expect(await client.store.running.get(true)).toBe(true); + expect(await client.running.get(true)).toBe(true); // Both plugins should have tracked the event expect(plugin1.tracked).toBe(true); expect(plugin2.tracked).toBe(true); @@ -212,9 +221,12 @@ describe('WaitingPlugin', () => { store, }); + // Set isReady so plugins can be added immediately + (client as any).isReady.value = true; + // Initially, analytics is running - await client.store.running.set(true); - expect(client.store.running.get()).toBe(true); + await client.running.set(true); + expect(client.running.get()).toBe(true); const waitingPlugin = new ExampleWaitingPlugin1(); const stubDestinationPlugin: DestinationPlugin = new StubDestinationPlugin(); @@ -227,7 +239,7 @@ describe('WaitingPlugin', () => { await client.track('foo'); // Analytics should pause - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); expect(waitingPlugin.tracked).toBe(false); await Promise.resolve(); @@ -239,7 +251,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // Analytics resumed - expect(await client.store.running.get(true)).toBe(true); + expect(await client.running.get(true)).toBe(true); // Waiting plugin executed expect(waitingPlugin.tracked).toBe(true); @@ -252,9 +264,12 @@ describe('WaitingPlugin', () => { store, }); + // Set isReady so plugins can be added immediately + (client as any).isReady.value = true; + // analytics running initially - await client.store.running.set(true); - expect(client.store.running.get()).toBe(true); + await client.running.set(true); + expect(client.running.get()).toBe(true); const waitingPlugin = new ExampleWaitingPlugin1(); // no manual resume const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); @@ -269,7 +284,7 @@ describe('WaitingPlugin', () => { await client.track('foo'); // analytics should pause - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); expect(waitingPlugin.tracked).toBe(false); await Promise.resolve(); @@ -281,7 +296,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // analytics resumed - expect(await client.store.running.get(true)).toBe(true); + expect(await client.running.get(true)).toBe(true); // waiting plugin executed expect(waitingPlugin.tracked).toBe(true); @@ -293,9 +308,12 @@ describe('WaitingPlugin', () => { store, }); + // Set isReady so plugins can be added immediately + (client as any).isReady.value = true; + // analytics running initially - await client.store.running.set(true); - expect(client.store.running.get()).toBe(true); + await client.running.set(true); + expect(client.running.get()).toBe(true); const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); client.add({ plugin: destinationPlugin }); @@ -310,7 +328,7 @@ describe('WaitingPlugin', () => { await client.track('foo'); // analytics paused - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); expect(plugin1.tracked).toBe(false); expect(plugin2.tracked).toBe(false); // Resume the first plugin @@ -320,7 +338,7 @@ describe('WaitingPlugin', () => { await Promise.resolve(); // still paused because plugin2 not resumed - expect(client.store.running.get()).toBe(false); + expect(client.running.get()).toBe(false); expect(plugin1.tracked).toBe(false); expect(plugin2.tracked).toBe(false); plugin2.resume(); @@ -329,7 +347,7 @@ describe('WaitingPlugin', () => { await jest.runAllTimersAsync(); await Promise.resolve(); // analytics resumed - expect(await client.store.running.get(true)).toBe(true); + expect(await client.running.get(true)).toBe(true); // both plugins executed expect(plugin1.tracked).toBe(true); diff --git a/packages/core/src/test-helpers/mockSegmentStore.ts b/packages/core/src/test-helpers/mockSegmentStore.ts index 2b16f2bbd..c4d46901d 100644 --- a/packages/core/src/test-helpers/mockSegmentStore.ts +++ b/packages/core/src/test-helpers/mockSegmentStore.ts @@ -25,6 +25,7 @@ import { createGetter } from '../storage/helpers'; export type StoreData = { isReady: boolean; enabled: boolean; + running: boolean; context?: DeepPartial; settings: SegmentAPIIntegrations; consentSettings?: SegmentAPIConsentSettings; @@ -38,6 +39,7 @@ export type StoreData = { const INITIAL_VALUES: StoreData = { isReady: true, enabled: true, + running: true, context: undefined, settings: { [SEGMENT_DESTINATION_KEY]: {}, @@ -93,6 +95,7 @@ export class MockSegmentStore implements Storage { deepLinkData: createCallbackManager(), pendingEvents: createCallbackManager(), enabled: createCallbackManager(), + running: createCallbackManager(), }; readonly isReady = { @@ -123,6 +126,23 @@ export class MockSegmentStore implements Storage { }, }; + readonly running = { + get: createMockStoreGetter(() => { + return this.data.running; + }), + onChange: (_callback: (value: boolean) => void) => { + return () => { + return; + }; + }, + set: async (value: boolean | ((prev: boolean) => boolean)) => { + this.data.running = + value instanceof Function ? value(this.data.running ?? true) : value; + this.callbacks.running.run(this.data.running); + return this.data.running; + }, + }; + readonly context: Watchable | undefined> & Settable> = { get: createMockStoreGetter(() => ({ ...this.data.context })), From 6aebe14dae4f730640f820d4bbb9746745696c72 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Feb 2026 02:39:23 +0000 Subject: [PATCH 6/8] Remove debug console.log from addPlugin Co-authored-by: abueide <19354425+abueide@users.noreply.github.com> --- packages/core/src/analytics.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index fe15227f2..60e546275 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -502,7 +502,6 @@ export class SegmentClient { this.timeline.add(plugin); //check for waiting plugin here if (plugin instanceof WaitingPlugin) { - console.log('add plugin'); this.pauseEventProcessingForPlugin(plugin); } From 7638ade296c3ab609c57451b9330dc64a46d78c0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Feb 2026 02:40:34 +0000 Subject: [PATCH 7/8] Improve documentation and add comment for void operator Co-authored-by: abueide <19354425+abueide@users.noreply.github.com> --- packages/core/src/analytics.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 60e546275..aa191c8ca 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -1064,9 +1064,9 @@ export class SegmentClient { private resumeTimeoutId?: ReturnType; private waitingPlugins = new Set(); - /* - * Pause event processing globally. Events will be buffered into pendingEvents and WaitingPlugin. - * An auto-resume will be scheduled after `timeout` ms. + /** + * Pause event processing for a specific waiting plugin. + * Events will be buffered until all waiting plugins resume. */ pauseEventProcessingForPlugin(plugin?: WaitingPlugin) { if (plugin) { @@ -1092,6 +1092,7 @@ export class SegmentClient { return; } + // Fire-and-forget: state is updated synchronously in-memory, persistence happens async void this.store.running.set(false); // Only set timeout if not already set (prevents multiple waiting plugins from overwriting) From 93ac6afe85e3b58e925743b44dde9c3a7b793b9e Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Wed, 11 Feb 2026 21:02:18 -0600 Subject: [PATCH 8/8] Fix WaitingPlugin implementation and add comprehensive documentation - Fix test helpers to properly call this.resume() instead of bypassing plugin tracking - Add update() method to ExampleWaitingPlugin1 to match Kotlin SDK behavior (3s delay) - Remove console.log statements from test helpers - Fix ExampleWaitingPlugin.tsx logic bug and add comprehensive documentation - Remove commented-out code block from analytics.ts constructor - Add extensive JSDoc to WaitingPlugin class with example usage and use cases - Add JSDoc to pauseEventProcessing/resumeEventProcessing methods - Document common use cases: IDFA permissions, SDK initialization, remote config All 9 WaitingPlugin tests passing. Implementation now has better parity with Kotlin SDK and includes documentation for developers. Co-Authored-By: Claude --- .../plugins/ExampleWaitingPlugin.tsx | 36 ++++++++--- packages/core/src/analytics.ts | 42 ++++++++----- packages/core/src/plugin.ts | 61 ++++++++++++++++++- .../src/test-helpers/exampleWaitingPlugin.ts | 38 ++++++++---- 4 files changed, 138 insertions(+), 39 deletions(-) diff --git a/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx b/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx index 1015966e6..fa933ac9b 100644 --- a/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx +++ b/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx @@ -1,24 +1,40 @@ import { WaitingPlugin, PluginType, - Plugin, - } from '@segment/analytics-react-native'; -import type {SegmentAPISettings, SegmentClient, SegmentEvent, UpdateType} from '@segment/analytics-react-native'; +import type { + SegmentAPISettings, + SegmentEvent, + UpdateType, +} from '@segment/analytics-react-native'; + +/** + * Example WaitingPlugin that demonstrates how to pause event processing + * until an async operation completes. + * + * Use cases: + * - Waiting for IDFA/advertising ID permissions + * - Initializing native SDKs or modules + * - Loading required configuration from remote sources + * + * The plugin automatically pauses event processing when added to the client. + * Call resume() when your async operation completes to start processing events. + */ export class ExampleWaitingPlugin extends WaitingPlugin { type = PluginType.enrichment; - analytics = undefined; tracked = false; /** - * Called when settings are updated + * Called when settings are updated from Segment. + * For initial settings, we simulate an async operation and then resume. */ - update(_settings: SegmentAPISettings, _type: UpdateType) { - if (this.type === PluginType.before) { - // delay 3 seconds, then resume event processing + update(_settings: SegmentAPISettings, type: UpdateType) { + if (type === UpdateType.initial) { + // Simulate async work (e.g., requesting permissions, loading data) setTimeout(() => { - this.resume(); + // Resume event processing once async work is complete + this.resume(); }, 3000); } } @@ -28,6 +44,6 @@ export class ExampleWaitingPlugin extends WaitingPlugin { */ track(event: SegmentEvent) { this.tracked = true; - return event; + return event; } } \ No newline at end of file diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index aa191c8ca..eeac0de07 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -205,20 +205,6 @@ export class SegmentClient { this.store = store; this.timeline = new Timeline(); - /* - * create and add waiting plugin immediately so early events get buffered. - * try { - * this.waitingPlugin = new WaitingPlugin(); - * // add directly to timeline via addPlugin to ensure configure() is called immediately - * this.addPlugin(this.waitingPlugin); - * // initial running state false until init completes (mirrors Kotlin semantics) - * this.isRunning = false; - * } catch (e) { - * // if WaitingPlugin instantiation or add fails, fallback to running=true - * this.isRunning = true; - * } - */ - // Initialize the watchables this.context = { get: this.store.context.get, @@ -1065,8 +1051,11 @@ export class SegmentClient { private waitingPlugins = new Set(); /** - * Pause event processing for a specific waiting plugin. + * Pause event processing for a specific WaitingPlugin. * Events will be buffered until all waiting plugins resume. + * + * @param plugin - The WaitingPlugin requesting the pause + * @internal This is called automatically when a WaitingPlugin is added */ pauseEventProcessingForPlugin(plugin?: WaitingPlugin) { if (plugin) { @@ -1074,17 +1063,32 @@ export class SegmentClient { } this.pauseEventProcessing(); } + + /** + * Resume event processing for a specific WaitingPlugin. + * If all waiting plugins have resumed, buffered events will be processed. + * + * @param plugin - The WaitingPlugin that has completed its async work + * @internal This is called automatically when a WaitingPlugin calls resume() + */ async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) { if (plugin) { this.waitingPlugins.delete(plugin); } if (this.waitingPlugins.size > 0) { - return; // still blocked + return; // still blocked by other waiting plugins } await this.resumeEventProcessing(); } + /** + * Pause event processing globally. + * New events will be buffered in memory until resumeEventProcessing() is called. + * Automatically resumes after the specified timeout to prevent permanent blocking. + * + * @param timeout - Milliseconds to wait before auto-resuming (default: 30000) + */ pauseEventProcessing(timeout = 30000) { // IMPORTANT: ignore repeated pauses const running = this.store.running.get(); @@ -1102,6 +1106,12 @@ export class SegmentClient { }, timeout); } } + + /** + * Resume event processing and process all buffered events. + * This is called automatically by WaitingPlugins when they complete, + * or after the timeout expires. + */ async resumeEventProcessing() { const running = this.store.running.get(); if (running) { diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index d1bd1840c..393385209 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -219,22 +219,79 @@ export class PlatformPlugin extends Plugin {} export { PluginType }; /** - * WaitingPlugin - * Buffers events when paused and releases them when resumed. + * WaitingPlugin - A base class for plugins that need to pause event processing + * until an asynchronous operation completes. + * + * When a WaitingPlugin is added to the Analytics client, it automatically pauses + * event processing. Events are buffered in memory until the plugin calls resume(). + * If resume() is not called within 30 seconds, event processing automatically resumes. + * + * @example + * ```typescript + * class IDFAPlugin extends WaitingPlugin { + * type = PluginType.enrichment; + * + * configure(analytics: SegmentClient) { + * super.configure(analytics); + * // Request IDFA permission + * requestTrackingPermission().then((status) => { + * if (status === 'authorized') { + * // Add IDFA to context + * } + * this.resume(); // Resume event processing + * }); + * } + * + * track(event: SegmentEvent) { + * // Enrich event with IDFA if available + * return event; + * } + * } + * ``` + * + * Common use cases: + * - Waiting for user permissions (IDFA, location, notifications) + * - Initializing native SDKs that provide enrichment data + * - Loading remote configuration required for event processing + * - Waiting for authentication state before sending events + * + * @remarks + * Multiple WaitingPlugins can be active simultaneously. Event processing + * only resumes when ALL waiting plugins have called resume() or timed out. + * + * WaitingPlugins can be added at any plugin type (before, enrichment, destination). + * They can also be added to DestinationPlugins to pause only that destination's + * event processing. */ export class WaitingPlugin extends Plugin { constructor() { super(); } + /** + * Configure the plugin with the Analytics client. + * Automatically pauses event processing when called. + * Override this method to perform async initialization, then call resume(). + * + * @param analytics - The Analytics client instance + */ configure(analytics: SegmentClient) { super.configure(analytics); } + /** + * Manually pause event processing. + * Generally not needed as adding a WaitingPlugin automatically pauses processing. + */ pause() { this.analytics?.pauseEventProcessingForPlugin(this); } + /** + * Resume event processing for this plugin. + * Call this method when your async operation completes. + * If all WaitingPlugins have resumed, buffered events will be processed. + */ async resume() { await this.analytics?.resumeEventProcessingForPlugin(this); } diff --git a/packages/core/src/test-helpers/exampleWaitingPlugin.ts b/packages/core/src/test-helpers/exampleWaitingPlugin.ts index d47c02b02..46751912b 100644 --- a/packages/core/src/test-helpers/exampleWaitingPlugin.ts +++ b/packages/core/src/test-helpers/exampleWaitingPlugin.ts @@ -1,18 +1,20 @@ -import { PluginType, SegmentEvent } from '../types'; +import { PluginType, SegmentEvent, SegmentAPISettings, UpdateType } from '../types'; import { SegmentClient } from '../analytics'; import { DestinationPlugin, WaitingPlugin } from '../plugin'; +/** + * Example WaitingPlugin that automatically resumes after 1 second. + * Used for testing the waiting plugin mechanism. + */ export class ExampleWaitingPlugin extends WaitingPlugin { public type = PluginType.enrichment; public tracked = false; configure(analytics: SegmentClient) { - console.log('exampleWaitingPlugin configure'); super.configure(analytics); // Simulate async work (network, native module init, etc.) setTimeout(() => { - console.log('ExampleWaitingPlugin: ready!'); - void analytics.resumeEventProcessing(); + this.resume(); }, 1000); } @@ -22,23 +24,34 @@ export class ExampleWaitingPlugin extends WaitingPlugin { return event; } } + +/** + * Example WaitingPlugin that resumes after 3 seconds when settings are updated. + * Mimics the Kotlin SDK's ExampleWaitingPlugin behavior. + */ export class ExampleWaitingPlugin1 extends WaitingPlugin { public type = PluginType.before; public tracked = false; - constructor() { - super(); - } - configure(analytics: SegmentClient) { - super.configure(analytics); + update(_settings: SegmentAPISettings, type: UpdateType) { + if (type === UpdateType.initial) { + // Simulate async initialization that takes 3 seconds + setTimeout(() => { + this.resume(); + }, 3000); + } } + execute(event: SegmentEvent): SegmentEvent | undefined { - console.log('ExampleWaitingPlugin1 received event', event.type); this.tracked = true; return event; } } +/** + * Example WaitingPlugin that requires manual resume() call. + * Used for testing manual control of event processing. + */ export class ManualResumeWaitingPlugin extends WaitingPlugin { public type = PluginType.enrichment; public tracked = false; @@ -48,12 +61,15 @@ export class ManualResumeWaitingPlugin extends WaitingPlugin { } execute(event: SegmentEvent): SegmentEvent | undefined { - console.log('ManualResumeWaitingPlugin received event', event.type); this.tracked = true; return event; } } +/** + * Stub destination plugin for testing. + * Always enabled and accepts all events. + */ export class StubDestinationPlugin extends DestinationPlugin { key = 'StubDestination'; protected isEnabled(_event: SegmentEvent): boolean {