Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {WaitingPlugin, PluginType} from '@segment/analytics-react-native';

import type {
SegmentAPISettings,
SegmentEvent,
UpdateType,
} from '@segment/analytics-react-native';

/**
* Example WaitingPlugin that demonstrates how to pause event processing
* until an async operation completes.
*
* Use cases:
* - Waiting for IDFA/advertising ID permissions
* - Initializing native SDKs or modules
* - Loading required configuration from remote sources
*
* The plugin automatically pauses event processing when added to the client.
* Call resume() when your async operation completes to start processing events.
*/
export class ExampleWaitingPlugin extends WaitingPlugin {
type = PluginType.enrichment;
tracked = false;

/**
* Called when settings are updated from Segment.
* For initial settings, we simulate an async operation and then resume.
*/
update(_settings: SegmentAPISettings, type: UpdateType) {
if (type === UpdateType.initial) {
// Simulate async work (e.g., requesting permissions, loading data)
setTimeout(() => {
// Resume event processing once async work is complete
this.resume();
}, 3000);
}
}

/**
* Called for track events
*/
track(event: SegmentEvent) {
this.tracked = true;
return event;
}
}
106 changes: 102 additions & 4 deletions packages/core/src/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import {
translateHTTPError,
} from './errors';
import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin';
import { WaitingPlugin } from './plugin';

type OnPluginAddedCallback = (plugin: Plugin) => void;

Expand Down Expand Up @@ -120,6 +121,10 @@ export class SegmentClient {
* Access or subscribe to client enabled
*/
readonly enabled: Watchable<boolean> & Settable<boolean>;
/**
* Access or subscribe to running state (controls event processing)
*/
readonly running: Watchable<boolean> & Settable<boolean>;
/**
* Access or subscribe to client context
*/
Expand Down Expand Up @@ -258,6 +263,12 @@ export class SegmentClient {
onChange: this.store.enabled.onChange,
};

this.running = {
get: this.store.running.get,
set: this.store.running.set,
onChange: this.store.running.onChange,
};

// add segment destination plugin unless
// asked not to via configuration.
if (this.config.autoAddSegmentDestination === true) {
Expand Down Expand Up @@ -295,7 +306,6 @@ export class SegmentClient {
if ((await this.store.isReady.get(true)) === false) {
await this.storageReady();
}

// Get new settings from segment
// It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins
// which make use of the settings object to initialize
Expand All @@ -309,7 +319,8 @@ export class SegmentClient {
]);
await this.onReady();
this.isReady.value = true;

// Set running to true to start event processing
await this.store.running.set(true);
// Process all pending events
await this.processPendingEvents();
// Trigger manual flush
Expand Down Expand Up @@ -465,7 +476,6 @@ export class SegmentClient {
settings
);
}

if (!this.isReady.value) {
this.pluginsToAdd.push(plugin);
} else {
Expand All @@ -476,6 +486,11 @@ export class SegmentClient {
private addPlugin(plugin: Plugin) {
plugin.configure(this);
this.timeline.add(plugin);
//check for waiting plugin here
if (plugin instanceof WaitingPlugin) {
this.pauseEventProcessingForPlugin(plugin);
}

this.triggerOnPluginLoaded(plugin);
}

Expand All @@ -494,6 +509,11 @@ export class SegmentClient {
if (this.enabled.get() === false) {
return;
}
if (!this.running.get()) {
// If not running, queue the event for later processing
await this.store.pendingEvents.add(event);
return event;
}
if (this.isReady.value) {
return this.startTimelineProcessing(event);
} else {
Expand All @@ -512,7 +532,7 @@ export class SegmentClient {
): Promise<SegmentEvent | undefined> {
const event = await this.applyContextData(incomingEvent);
this.flushPolicyExecuter.notify(event);
return this.timeline.process(event);
return await this.timeline.process(event);
}

private async trackDeepLinks() {
Expand Down Expand Up @@ -1027,4 +1047,82 @@ export class SegmentClient {

return totalEventsCount;
}
private resumeTimeoutId?: ReturnType<typeof setTimeout>;
private waitingPlugins = new Set<WaitingPlugin>();

/**
* Pause event processing for a specific WaitingPlugin.
* Events will be buffered until all waiting plugins resume.
*
* @param plugin - The WaitingPlugin requesting the pause
* @internal This is called automatically when a WaitingPlugin is added
*/
pauseEventProcessingForPlugin(plugin?: WaitingPlugin) {
if (plugin) {
this.waitingPlugins.add(plugin);
}
this.pauseEventProcessing();
}

/**
* Resume event processing for a specific WaitingPlugin.
* If all waiting plugins have resumed, buffered events will be processed.
*
* @param plugin - The WaitingPlugin that has completed its async work
* @internal This is called automatically when a WaitingPlugin calls resume()
*/
async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) {
if (plugin) {
this.waitingPlugins.delete(plugin);
}
if (this.waitingPlugins.size > 0) {
return; // still blocked by other waiting plugins
}

await this.resumeEventProcessing();
}

/**
* Pause event processing globally.
* New events will be buffered in memory until resumeEventProcessing() is called.
* Automatically resumes after the specified timeout to prevent permanent blocking.
*
* @param timeout - Milliseconds to wait before auto-resuming (default: 30000)
*/
pauseEventProcessing(timeout = 30000) {
// IMPORTANT: ignore repeated pauses
const running = this.store.running.get();
if (!running) {
return;
}

// Fire-and-forget: state is updated synchronously in-memory, persistence happens async
void this.store.running.set(false);

// Only set timeout if not already set (prevents multiple waiting plugins from overwriting)
if (!this.resumeTimeoutId) {
this.resumeTimeoutId = setTimeout(async () => {
await this.resumeEventProcessing();
}, timeout);
}
}

/**
* Resume event processing and process all buffered events.
* This is called automatically by WaitingPlugins when they complete,
* or after the timeout expires.
*/
async resumeEventProcessing() {
const running = this.store.running.get();
if (running) {
return;
}

if (this.resumeTimeoutId) {
clearTimeout(this.resumeTimeoutId);
this.resumeTimeoutId = undefined;
}
await this.store.running.set(true);
await this.processPendingEvents();
}
}
90 changes: 88 additions & 2 deletions packages/core/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ export class DestinationPlugin extends EventPlugin {
key = '';

timeline = new Timeline();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
store: any;

private hasSettings() {
return this.analytics?.settings.get()?.[this.key] !== undefined;
}

private isEnabled(event: SegmentEvent): boolean {
protected isEnabled(event: SegmentEvent): boolean {
let customerDisabled = false;
if (event.integrations?.[this.key] === false) {
customerDisabled = true;
Expand All @@ -140,6 +142,10 @@ export class DestinationPlugin extends EventPlugin {
if (analytics) {
plugin.configure(analytics);
}

if (analytics && plugin instanceof WaitingPlugin) {
analytics.pauseEventProcessingForPlugin(plugin);
}
this.timeline.add(plugin);
return plugin;
}
Expand Down Expand Up @@ -179,7 +185,6 @@ export class DestinationPlugin extends EventPlugin {
type: PluginType.before,
event,
});

if (beforeResult === undefined) {
return;
}
Expand Down Expand Up @@ -210,3 +215,84 @@ export class UtilityPlugin extends EventPlugin {}

// For internal platform-specific bits
export class PlatformPlugin extends Plugin {}

export { PluginType };

/**
* WaitingPlugin - A base class for plugins that need to pause event processing
* until an asynchronous operation completes.
*
* When a WaitingPlugin is added to the Analytics client, it automatically pauses
* event processing. Events are buffered in memory until the plugin calls resume().
* If resume() is not called within 30 seconds, event processing automatically resumes.
*
* @example
* ```typescript
* class IDFAPlugin extends WaitingPlugin {
* type = PluginType.enrichment;
*
* configure(analytics: SegmentClient) {
* super.configure(analytics);
* // Request IDFA permission
* requestTrackingPermission().then((status) => {
* if (status === 'authorized') {
* // Add IDFA to context
* }
* this.resume(); // Resume event processing
* });
* }
*
* track(event: SegmentEvent) {
* // Enrich event with IDFA if available
* return event;
* }
* }
* ```
*
* Common use cases:
* - Waiting for user permissions (IDFA, location, notifications)
* - Initializing native SDKs that provide enrichment data
* - Loading remote configuration required for event processing
* - Waiting for authentication state before sending events
*
* @remarks
* Multiple WaitingPlugins can be active simultaneously. Event processing
* only resumes when ALL waiting plugins have called resume() or timed out.
*
* WaitingPlugins can be added at any plugin type (before, enrichment, destination).
* They can also be added to DestinationPlugins to pause only that destination's
* event processing.
*/
export class WaitingPlugin extends Plugin {
constructor() {
super();
}

/**
* Configure the plugin with the Analytics client.
* Automatically pauses event processing when called.
* Override this method to perform async initialization, then call resume().
*
* @param analytics - The Analytics client instance
*/
configure(analytics: SegmentClient) {
super.configure(analytics);
}

/**
* Manually pause event processing.
* Generally not needed as adding a WaitingPlugin automatically pauses processing.
*/
pause() {
this.analytics?.pauseEventProcessingForPlugin(this);
}

/**
* Resume event processing for this plugin.
* Call this method when your async operation completes.
* If all WaitingPlugins have resumed, buffered events will be processed.
*/
async resume() {
await this.analytics?.resumeEventProcessingForPlugin(this);
}
}
7 changes: 7 additions & 0 deletions packages/core/src/plugins/QueueFlushingPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ export class QueueFlushingPlugin extends UtilityPlugin {
* Calls the onFlush callback with the events in the queue
*/
async flush() {
// Check if event processing is running
const running = this.analytics?.running.get();
if (running === false) {
this.analytics?.logger.info('Event processing is paused, skipping flush');
return;
}

// Wait for the queue to be restored
try {
await this.isRestored;
Expand Down
Loading