Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dry-streets-exist.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/query-core': patch
---

Made context.signal consume aware with streamedQuery
43 changes: 42 additions & 1 deletion packages/query-core/src/__tests__/streamedQuery.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,12 @@ describe('streamedQuery', () => {
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
streamFn: () => createAsyncNumberGenerator(3),
streamFn: (context) => {
// just consume the signal
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
const numbers = context.signal ? 3 : 0
return createAsyncNumberGenerator(numbers)
},
refetchMode: 'append',
}),
})
Expand Down Expand Up @@ -420,6 +425,42 @@ describe('streamedQuery', () => {
})
})

test('should not abort when signal not consumed', async () => {
const key = queryKey()
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
streamFn: () => createAsyncNumberGenerator(3),
}),
})

const unsubscribe = observer.subscribe(vi.fn())

expect(queryClient.getQueryState(key)).toMatchObject({
status: 'pending',
fetchStatus: 'fetching',
data: undefined,
})

await vi.advanceTimersByTimeAsync(60)

expect(queryClient.getQueryState(key)).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0],
})

unsubscribe()

await vi.advanceTimersByTimeAsync(50)

expect(queryClient.getQueryState(key)).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})
})

test('should support custom reducer', async () => {
const key = queryKey()

Expand Down
25 changes: 11 additions & 14 deletions packages/query-core/src/infiniteQueryBehavior.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { addToEnd, addToStart, ensureQueryFn } from './utils'
import {
addConsumeAwareSignal,
addToEnd,
addToStart,
ensureQueryFn,
} from './utils'
import type { QueryBehavior } from './query'
import type {
InfiniteData,
Expand All @@ -23,19 +28,11 @@ export function infiniteQueryBehavior<TQueryFnData, TError, TData, TPageParam>(
const fetchFn = async () => {
let cancelled = false
const addSignalProperty = (object: unknown) => {
Object.defineProperty(object, 'signal', {
enumerable: true,
get: () => {
if (context.signal.aborted) {
cancelled = true
} else {
context.signal.addEventListener('abort', () => {
cancelled = true
})
}
return context.signal
},
})
addConsumeAwareSignal(
object,
() => context.signal,
() => (cancelled = true),
)
}

const queryFn = ensureQueryFn(context.options, context.fetchOptions)
Expand Down
30 changes: 25 additions & 5 deletions packages/query-core/src/streamedQuery.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { addToEnd } from './utils'
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
import { addConsumeAwareSignal, addToEnd } from './utils'
import type {
OmitKeyof,
QueryFunction,
QueryFunctionContext,
QueryKey,
} from './types'

type BaseStreamedQueryParams<TQueryFnData, TQueryKey extends QueryKey> = {
streamFn: (
Expand Down Expand Up @@ -73,10 +78,25 @@ export function streamedQuery<

let result = initialValue

const stream = await streamFn(context)
let cancelled = false
const streamFnContext = addConsumeAwareSignal<
OmitKeyof<typeof context, 'signal'>
>(
{
client: context.client,
meta: context.meta,
queryKey: context.queryKey,
pageParam: context.pageParam,
direction: context.direction,
},
() => context.signal,
() => (cancelled = true),
)

const stream = await streamFn(streamFnContext)

for await (const chunk of stream) {
if (context.signal.aborted) {
if (cancelled) {
break
}

Expand All @@ -90,7 +110,7 @@ export function streamedQuery<
}

// finalize result: replace-refetching needs to write to the cache
if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) {
if (isRefetch && refetchMode === 'replace' && !cancelled) {
context.client.setQueryData<TData>(context.queryKey, result)
}

Expand Down
29 changes: 29 additions & 0 deletions packages/query-core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,32 @@ export function shouldThrowError<T extends (...args: Array<any>) => boolean>(

return !!throwOnError
}

export function addConsumeAwareSignal<T>(
object: T,
getSignal: () => AbortSignal,
onCancelled: VoidFunction,
): T & { signal: AbortSignal } {
let consumed = false

Object.defineProperty(object, 'signal', {
enumerable: true,
get: () => {
const signal = getSignal()
if (consumed) {
return signal
}

consumed = true
if (signal.aborted) {
onCancelled()
} else {
signal.addEventListener('abort', onCancelled, { once: true })
}

return signal
},
})

return object as T & { signal: AbortSignal }
}