diff --git a/sdks/unreal/src/SpacetimeDbSdk/Source/SpacetimeDbSdk/Private/Connection/DbConnectionBase.cpp b/sdks/unreal/src/SpacetimeDbSdk/Source/SpacetimeDbSdk/Private/Connection/DbConnectionBase.cpp index 968d9e41c84..a0b9724f489 100644 --- a/sdks/unreal/src/SpacetimeDbSdk/Source/SpacetimeDbSdk/Private/Connection/DbConnectionBase.cpp +++ b/sdks/unreal/src/SpacetimeDbSdk/Source/SpacetimeDbSdk/Private/Connection/DbConnectionBase.cpp @@ -1,6 +1,7 @@ #include "Connection/DbConnectionBase.h" #include "Connection/DbConnectionBuilder.h" #include "Connection/Credentials.h" +#include "Containers/Ticker.h" #include "ModuleBindings/Types/ClientMessageType.g.h" #include "ModuleBindings/Types/SubscribeMultiType.g.h" #include "ModuleBindings/Types/UnsubscribeMultiType.g.h" @@ -23,6 +24,69 @@ UDbConnectionBase::UDbConnectionBase(const FObjectInitializer& ObjectInitializer ProcedureCallbacks = CreateDefaultSubobject(TEXT("ProcedureCallbacks")); } +UDbConnectionBase::~UDbConnectionBase() +{ + // Ensure we unregister from the ticker when destroyed + if (TickerHandle.IsValid()) + { + FTSTicker::GetCoreTicker().RemoveTicker(TickerHandle); + TickerHandle.Reset(); + } +} + +void UDbConnectionBase::SetAutoTicking(bool bAutoTick) +{ + if (bIsAutoTicking == bAutoTick) + { + return; // No change needed + } + + bIsAutoTicking = bAutoTick; + + if (bAutoTick) + { + // Register with FTSTicker for automatic frame ticking + TickerHandle = FTSTicker::GetCoreTicker().AddTicker(FTickerDelegate::CreateUObject(this, &UDbConnectionBase::OnTickerTick)); + } + else + { + // Unregister from FTSTicker + if (TickerHandle.IsValid()) + { + FTSTicker::GetCoreTicker().RemoveTicker(TickerHandle); + TickerHandle.Reset(); + } + } +} + +int32 UDbConnectionBase::GetActiveSubscriptionCount() const +{ + // Thread-safe access to active subscriptions count + return ActiveSubscriptions.Num(); +} + +int32 UDbConnectionBase::GetPendingMessageCount() const +{ + // Return count of pending messages + return PendingMessages.Num(); +} + +int32 UDbConnectionBase::GetPreprocessedMessageCount() const +{ + return PreprocessedMessages.Num(); +} + +bool UDbConnectionBase::OnTickerTick(float DeltaTime) +{ + // Called by FTSTicker each frame when auto-ticking is enabled + if (bIsAutoTicking) + { + FrameTick(); + } + // Return true to continue ticking + return true; +} + void UDbConnectionBase::Disconnect() { if (WebSocket) @@ -87,6 +151,10 @@ void UDbConnectionBase::HandleWSClosed(int32 /*StatusCode*/, const FString& Reas void UDbConnectionBase::HandleWSBinaryMessage(const TArray& Message) { + // Track message stats for memory diagnostics + TotalMessagesReceived.fetch_add(1); + TotalBytesReceived.fetch_add(Message.Num()); + //tag for arrival order const int32 Id = NextPreprocessId.GetValue(); NextPreprocessId.Increment(); @@ -148,30 +216,6 @@ void UDbConnectionBase::FrameTick() ProcessServerMessage(Msg); } } -void UDbConnectionBase::Tick(float DeltaTime) -{ - if (bIsAutoTicking) - { - FrameTick(); - } -} - -TStatId UDbConnectionBase::GetStatId() const -{ - // This is used by the engine to track tickables, we return a unique stat ID for this class - RETURN_QUICK_DECLARE_CYCLE_STAT(UMyTickableObject, STATGROUP_Tickables); -} - -bool UDbConnectionBase::IsTickable() const -{ - return bIsAutoTicking; -} - -bool UDbConnectionBase::IsTickableInEditor() const -{ - return bIsAutoTicking; -} - void UDbConnectionBase::ProcessServerMessage(const FServerMessageType& Message) { @@ -381,9 +425,16 @@ bool UDbConnectionBase::DecompressBrotli(const TArray& InData, TArray& InData, TArray& OutData) { - if (InData.Num() < 4) + if (InData.Num() < 10) // Minimum gzip header size { - UE_LOG(LogTemp, Error, TEXT("Gzip data too small")); + UE_LOG(LogTemp, Warning, TEXT("Gzip data too small (%d bytes), likely incomplete"), InData.Num()); + return false; + } + + // Verify gzip magic header (1F 8B) + if (InData[0] != 0x1F || InData[1] != 0x8B) + { + UE_LOG(LogTemp, Warning, TEXT("Invalid gzip header: %02X %02X (expected 1F 8B)"), InData[0], InData[1]); return false; } @@ -391,12 +442,21 @@ bool UDbConnectionBase::DecompressGzip(const TArray& InData, TArray MaxReasonableSize) + { + UE_LOG(LogTemp, Warning, TEXT("Gzip claims uncompressed size of %u bytes - likely incomplete data, buffering"), OutSize); + return false; + } + OutData.SetNumUninitialized(OutSize); // Attempt to decompress the Gzip data if (!FCompression::UncompressMemory(NAME_Gzip, OutData.GetData(), OutSize, InData.GetData(), InData.Num())) { - UE_LOG(LogTemp, Error, TEXT("Gzip decompression failed")); + UE_LOG(LogTemp, Warning, TEXT("Gzip decompression failed - data may be incomplete")); + OutData.Reset(); return false; } @@ -505,22 +565,73 @@ FServerMessageType UDbConnectionBase::PreProcessMessage(const TArray& Mes { if (Message.Num() == 0) { - UE_LOG(LogTemp, Error, TEXT("Empty message recived from server, ignored")); + UE_LOG(LogTemp, Error, TEXT("Empty message received from server, ignored")); return FServerMessageType{}; } - // Check if the first byte is a valid compression tag - ECompressableQueryUpdateTag Compression = static_cast(Message[0]); - TArray CompressedPayload; - CompressedPayload.Append(Message.GetData() + 1, Message.Num() - 1); - // Decompress the payload based on the compression tag + TArray DataToProcess; + ECompressableQueryUpdateTag Compression; + bool bWasAccumulating = false; + + // Thread-safe access to compressed message accumulation buffer + { + FScopeLock Lock(&CompressedBufferMutex); + + // Check if we're accumulating a fragmented compressed message + if (bAccumulatingCompressedMessage) + { + // Append incoming data to the buffer (no compression tag on continuation) + IncompleteCompressedBuffer.Append(Message.GetData(), Message.Num()); + DataToProcess = IncompleteCompressedBuffer; + Compression = BufferedCompressionType; + bWasAccumulating = true; + } + else + { + // New message - check compression tag + uint8 FirstByte = Message[0]; + if (FirstByte > 2) + { + UE_LOG(LogTemp, Error, TEXT("PreProcessMessage: Invalid compression tag %d"), FirstByte); + return FServerMessageType{}; + } + Compression = static_cast(FirstByte); + DataToProcess.Append(Message.GetData() + 1, Message.Num() - 1); + } + } + + // Decompress the payload based on the compression tag (outside mutex for parallelism) TArray Decompressed; - if (!DecompressPayload(Compression, CompressedPayload, Decompressed)) + if (!DecompressPayload(Compression, DataToProcess, Decompressed)) { + // Decompression failed - if it's a compressed format, buffer for more data + if (Compression == ECompressableQueryUpdateTag::Gzip || Compression == ECompressableQueryUpdateTag::Brotli) + { + FScopeLock Lock(&CompressedBufferMutex); + if (!bAccumulatingCompressedMessage) + { + // Start accumulating + IncompleteCompressedBuffer = DataToProcess; + BufferedCompressionType = Compression; + } + // else: already accumulating, buffer was updated above + bAccumulatingCompressedMessage = true; + return FServerMessageType{}; // Return empty, will process when complete + } UE_LOG(LogTemp, Error, TEXT("Failed to decompress incoming message")); return FServerMessageType{}; } + // Decompression succeeded - clear accumulation state + { + FScopeLock Lock(&CompressedBufferMutex); + if (bAccumulatingCompressedMessage) + { + IncompleteCompressedBuffer.Reset(); + bAccumulatingCompressedMessage = false; + } + } + // Deserialize the decompressed data into a UServerMessageType object FServerMessageType Parsed = UE::SpacetimeDB::Deserialize(Decompressed); @@ -702,4 +813,9 @@ void UDbConnectionBase::ApplyRegisteredTableUpdates(const FDatabaseUpdateType& U // Broadcast the diff for each handler Handler->BroadcastDiff(this, Context); } +} + +int32 UDbConnectionBase::GetPreprocessedTableDataCount() const +{ + return PreprocessedTableData.Num(); } \ No newline at end of file diff --git a/sdks/unreal/src/SpacetimeDbSdk/Source/SpacetimeDbSdk/Public/Connection/DbConnectionBase.h b/sdks/unreal/src/SpacetimeDbSdk/Source/SpacetimeDbSdk/Public/Connection/DbConnectionBase.h index fd7ab5fff02..38c8afa22a7 100644 --- a/sdks/unreal/src/SpacetimeDbSdk/Source/SpacetimeDbSdk/Public/Connection/DbConnectionBase.h +++ b/sdks/unreal/src/SpacetimeDbSdk/Source/SpacetimeDbSdk/Public/Connection/DbConnectionBase.h @@ -2,6 +2,7 @@ #include "CoreMinimal.h" #include "UObject/NoExportTypes.h" +#include "Containers/Ticker.h" // For FTSTicker - thread-safe ticker for manual tick registration #include "Types/Builtins.h" #include "Websocket.h" #include "Subscription.h" @@ -10,6 +11,7 @@ #include "HAL/CriticalSection.h" #include "Containers/Queue.h" #include "HAL/ThreadSafeBool.h" +#include #include "BSATN/UEBSATNHelpers.h" #include "Connection/SetReducerFlags.h" #include "Connection/Callback.h" @@ -69,16 +71,29 @@ FORCEINLINE uint32 GetTypeHash(const FPreprocessedTableKey& Key) return HashCombine(GetTypeHash(Key.TableId), GetTypeHash(Key.TableName)); } +/** + * ROOT CAUSE FIX: Removed FTickableGameObject inheritance entirely. + * + * FTickableGameObject registers itself in its constructor BEFORE our constructor body runs. + * Even with ETickableTickType::Never, UE's GENERATED_BODY() macro may interfere with + * base class initialization order, causing the default constructor to be called instead. + * + * Instead, we use FTSTicker (Thread-Safe Ticker) for manual tick registration. + * This gives us complete control over when ticking is enabled/disabled. + */ UCLASS() -class SPACETIMEDBSDK_API UDbConnectionBase : public UObject, public FTickableGameObject +class SPACETIMEDBSDK_API UDbConnectionBase : public UObject { GENERATED_BODY() public: - /** The default constructor is private to prevent instantiation without using the builder. */ + /** Default constructor. Ticking is NOT enabled by default - call SetAutoTicking(true) to enable. */ explicit UDbConnectionBase(const FObjectInitializer& ObjectInitializer = FObjectInitializer::Get()); + /** Destructor - ensures ticker delegate is unbound. */ + virtual ~UDbConnectionBase(); + /** Disconnect from the server. */ UFUNCTION(BlueprintCallable, Category="SpacetimeDB") void Disconnect(); @@ -90,8 +105,13 @@ class SPACETIMEDBSDK_API UDbConnectionBase : public UObject, public FTickableGam UFUNCTION(BlueprintCallable, Category="SpacetimeDB") void FrameTick(); + /** + * Enables or disables automatic ticking for this connection. + * When enabled, FrameTick() will be called automatically each frame. + * This uses SetTickableTickType() to properly register/unregister with the tick system. + */ UFUNCTION(BlueprintCallable, Category="SpacetimeDB") - void SetAutoTicking(bool bAutoTick) { bIsAutoTicking = bAutoTick; } + void SetAutoTicking(bool bAutoTick); /** Send a raw JSON message to the server. */ bool SendRawMessage(const FString& Message); @@ -110,6 +130,41 @@ class SPACETIMEDBSDK_API UDbConnectionBase : public UObject, public FTickableGam UFUNCTION(BlueprintPure, Category = "SpacetimeDB") FSpacetimeDBConnectionId GetConnectionId() const; + // ============ Diagnostic Methods (for memory debugging) ============ + + /** Get count of active subscriptions (for diagnostics) */ + UFUNCTION(BlueprintPure, Category = "SpacetimeDB") + int32 GetActiveSubscriptionCount() const; + + /** Get count of pending messages in queue (for diagnostics) */ + UFUNCTION(BlueprintPure, Category = "SpacetimeDB") + int32 GetPendingMessageCount() const; + + /** Get count of preprocessed messages waiting (for diagnostics) */ + UFUNCTION(BlueprintPure, Category = "SpacetimeDB") + int32 GetPreprocessedMessageCount() const; + + /** Get size of incomplete compressed buffer (for diagnostics) */ + int32 GetIncompleteCompressedBufferSize() const { return IncompleteCompressedBuffer.Num(); } + + /** Get size of preprocessed table data map (for diagnostics) */ + int32 GetPreprocessedTableDataCount() const; + + /** Get the next expected release ID (for diagnosing stuck reorder buffer) */ + int32 GetNextReleaseId() const { return NextReleaseId; } + + /** Get the next preprocess ID (for diagnosing stuck reorder buffer) */ + int32 GetNextPreprocessId() const { return NextPreprocessId.GetValue(); } + + /** Get the underlying WebSocket manager (for diagnostics) */ + UWebsocketManager* GetWebSocket() const { return WebSocket; } + + /** Get total bytes received since connection (for diagnostics) */ + int64 GetTotalBytesReceived() const { return TotalBytesReceived.load(); } + + /** Get total messages received since connection (for diagnostics) */ + int64 GetTotalMessagesReceived() const { return TotalMessagesReceived.load(); } + // Typed reducer call helper: hides BSATN bytes from callers. template void CallReducerTyped(const FString& Reducer, const ArgsStruct& Args, USetReducerFlagsBase* Flags) @@ -231,13 +286,8 @@ class SPACETIMEDBSDK_API UDbConnectionBase : public UObject, public FTickableGam UFUNCTION() void HandleWSBinaryMessage(const TArray& Message); - virtual void Tick(float DeltaTime) override; - - virtual TStatId GetStatId() const override; - - virtual bool IsTickable() const override; - - virtual bool IsTickableInEditor() const override; + /** Called by FTSTicker each frame when auto-ticking is enabled. */ + bool OnTickerTick(float DeltaTime); /** Internal handler that processes a single server message. */ void ProcessServerMessage(const FServerMessageType& Message); @@ -248,6 +298,18 @@ class SPACETIMEDBSDK_API UDbConnectionBase : public UObject, public FTickableGam bool DecompressGzip(const TArray& InData, TArray& OutData); bool DecompressBrotli(const TArray& InData, TArray& OutData); + /** Mutex protecting the compressed message accumulation buffer. */ + FCriticalSection CompressedBufferMutex; + + /** Buffer for incomplete compressed messages that span multiple WebSocket frames. */ + TArray IncompleteCompressedBuffer; + + /** The compression type of the buffered incomplete message. */ + ECompressableQueryUpdateTag BufferedCompressionType = ECompressableQueryUpdateTag::Uncompressed; + + /** Whether we're currently accumulating a fragmented compressed message. */ + bool bAccumulatingCompressedMessage = false; + /** Pending messages awaiting processing on the game thread. */ TArray PendingMessages; @@ -266,6 +328,10 @@ class SPACETIMEDBSDK_API UDbConnectionBase : public UObject, public FTickableGam /** Id of the next message expected to be released. */ int32 NextReleaseId = 0; + /** Diagnostic counters for memory leak tracking */ + std::atomic TotalBytesReceived{0}; + std::atomic TotalMessagesReceived{0}; + // Map of table name to row deserializer TMap> TableDeserializers; FCriticalSection TableDeserializersMutex; @@ -357,6 +423,9 @@ class SPACETIMEDBSDK_API UDbConnectionBase : public UObject, public FTickableGam UPROPERTY() bool bIsAutoTicking = false; + /** Handle for the FTSTicker delegate - used to unregister when ticking is disabled or object destroyed. */ + FTSTicker::FDelegateHandle TickerHandle; + UPROPERTY() FOnConnectErrorDelegate OnConnectErrorDelegate; UPROPERTY()