diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/native_topic_sampler.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/native_topic_sampler.hpp index b87c1f95..8737cd41 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/native_topic_sampler.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/native_topic_sampler.hpp @@ -278,18 +278,24 @@ class NativeTopicSampler { * @param topic_name Full topic path * @return true if this is a system topic that should be filtered */ - static bool is_system_topic(const std::string & topic_name); + static bool is_system_topic(const std::string & topic_name); - private: - /** - * @brief Get the message type for a topic from the graph - */ - std::string get_topic_type(const std::string & topic_name); + private: + std::string get_topic_type(const std::string & topic_name); rclcpp::Node * node_; - /// Native JSON serializer for topic deserialization std::shared_ptr serializer_; + + // Getter for cached component-topic map + // Returns a copy to avoid exposing internal data while the mutex is released. + std::map + get_component_topic_map(); + + // --- Component topic map cache --- + std::map topic_map_cache_; + size_t cached_graph_change_count_{0}; + mutable std::mutex topic_map_mutex_; }; } // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/src/native_topic_sampler.cpp b/src/ros2_medkit_gateway/src/native_topic_sampler.cpp index 5034bb96..453feaaf 100644 --- a/src/ros2_medkit_gateway/src/native_topic_sampler.cpp +++ b/src/ros2_medkit_gateway/src/native_topic_sampler.cpp @@ -486,11 +486,43 @@ std::map NativeTopicSampler::build_component_topic RCLCPP_DEBUG(node_->get_logger(), "Built topic map for %zu components", component_map.size()); return component_map; } +std::map +NativeTopicSampler::get_component_topic_map() +{ + std::lock_guard lock(topic_map_mutex_); + + // Compute a lightweight fingerprint of the ROS graph (topics+types) + // Note: ROS 2 does not expose a simple graph change counter; using + // get_topic_names_and_types() and hashing the result is a reliable + // way to detect graph changes that affect topic/component mapping. + auto all_topics = node_->get_topic_names_and_types(); + + size_t current_graph_fingerprint = 1469598103934665603ULL; // FNV-1a seed-ish + std::hash hasher; + // Incorporate topic count + current_graph_fingerprint ^= all_topics.size() + 0x9e3779b97f4a7c15ULL + (current_graph_fingerprint << 6) + (current_graph_fingerprint >> 2); + for (const auto & kv : all_topics) { + const auto & topic = kv.first; + const auto & types = kv.second; + current_graph_fingerprint ^= hasher(topic) + 0x9e3779b97f4a7c15ULL + (current_graph_fingerprint << 6) + (current_graph_fingerprint >> 2); + if (!types.empty()) { + current_graph_fingerprint ^= hasher(types[0]) + 0x9e3779b97f4a7c15ULL + (current_graph_fingerprint << 6) + (current_graph_fingerprint >> 2); + } + } + + if (topic_map_cache_.empty() || cached_graph_change_count_ != current_graph_fingerprint) { + topic_map_cache_ = build_component_topic_map(); + cached_graph_change_count_ = current_graph_fingerprint; + } + + // Return a copy to avoid callers holding a reference to internal cache + return topic_map_cache_; +} + ComponentTopics NativeTopicSampler::get_component_topics(const std::string & component_fqn) { // Build full map and extract for this component - // TODO(optimization): Cache the map and invalidate on graph changes - auto full_map = build_component_topic_map(); + const auto full_map = get_component_topic_map(); auto it = full_map.find(component_fqn); if (it != full_map.end()) { diff --git a/src/ros2_medkit_gateway/test/test_auth_manager.cpp b/src/ros2_medkit_gateway/test/test_auth_manager.cpp index 33e11d51..38703815 100644 --- a/src/ros2_medkit_gateway/test/test_auth_manager.cpp +++ b/src/ros2_medkit_gateway/test/test_auth_manager.cpp @@ -1171,6 +1171,443 @@ TEST_F(AuthRequirementPolicyTest, PolicyDescriptions) { EXPECT_NE(write_only.description(), configurable.description()); } +// ============================================================================ +// Cache Invalidation and Coherence Tests +// ============================================================================ +// @verifies REQ_INTEROP_086, REQ_INTEROP_087 +class AuthManagerCacheTest : public ::testing::Test { + protected: + void SetUp() override { + config_ = AuthConfigBuilder() + .with_enabled(true) + .with_jwt_secret("test_secret_key_for_cache_testing_12345") + .with_algorithm(JwtAlgorithm::HS256) + .with_token_expiry(3600) + .with_refresh_token_expiry(86400) + .with_require_auth_for(AuthRequirement::WRITE) + .add_client("cache_test_user", "password", UserRole::OPERATOR) + .add_client("cache_test_admin", "admin_pass", UserRole::ADMIN) + .build(); + + auth_manager_ = std::make_unique(config_); + } + + AuthConfig config_; + std::unique_ptr auth_manager_; +}; + +// Test: Refresh token is cached and retrieved correctly +TEST_F(AuthManagerCacheTest, RefreshTokenCacheHit) { + // Authenticate to create refresh token + auto auth_result = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(auth_result.has_value()); + ASSERT_TRUE(auth_result->refresh_token.has_value()); + + const std::string & refresh_token = auth_result->refresh_token.value(); + + // First validation should succeed + auto validation1 = auth_manager_->validate_token(refresh_token, TokenType::REFRESH); + EXPECT_TRUE(validation1.valid); + EXPECT_EQ(validation1.claims->sub, "cache_test_user"); + + // Second validation (cache hit) should also succeed + auto validation2 = auth_manager_->validate_token(refresh_token, TokenType::REFRESH); + EXPECT_TRUE(validation2.valid); + EXPECT_EQ(validation2.claims->sub, "cache_test_user"); +} + +// Test: Multiple tokens for same user are cached independently +TEST_F(AuthManagerCacheTest, MultipleCachedTokensPerUser) { + // Create multiple tokens for the same user + auto token1 = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(token1.has_value()); + + auto token2 = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(token2.has_value()); + + // Both tokens should be distinct + EXPECT_NE(token1->access_token, token2->access_token); + EXPECT_NE(token1->refresh_token.value_or(""), token2->refresh_token.value_or("")); + + // Both should validate independently + auto val1 = auth_manager_->validate_token(token1->access_token); + auto val2 = auth_manager_->validate_token(token2->access_token); + + EXPECT_TRUE(val1.valid); + EXPECT_TRUE(val2.valid); +} + +// Test: Revoking one token doesn't affect other tokens from same user +TEST_F(AuthManagerCacheTest, RevokeTokenDoesNotAffectOthers) { + // Create two tokens + auto token1 = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(token1.has_value()); + + auto token2 = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(token2.has_value()); + + // Revoke first refresh token + bool revoked = auth_manager_->revoke_refresh_token(token1->refresh_token.value()); + EXPECT_TRUE(revoked); + + // First access token should be invalid (refresh token was revoked) + auto val1 = auth_manager_->validate_token(token1->access_token); + EXPECT_FALSE(val1.valid); + EXPECT_TRUE(val1.error.find("revoked") != std::string::npos); + + // Second access token should still be valid (its refresh token not revoked) + auto val2 = auth_manager_->validate_token(token2->access_token); + EXPECT_TRUE(val2.valid); +} + +// Test: Client state changes invalidate all tokens for that client +TEST_F(AuthManagerCacheTest, DisablingClientInvalidatesAllTokens) { + // Create tokens for two clients + auto token1 = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(token1.has_value()); + + auto token2 = auth_manager_->authenticate("cache_test_admin", "admin_pass"); + ASSERT_TRUE(token2.has_value()); + + // Both tokens should be valid + auto val1 = auth_manager_->validate_token(token1->access_token); + auto val2 = auth_manager_->validate_token(token2->access_token); + EXPECT_TRUE(val1.valid); + EXPECT_TRUE(val2.valid); + + // Disable first client + bool disabled = auth_manager_->disable_client("cache_test_user"); + EXPECT_TRUE(disabled); + + // First token should be invalid + val1 = auth_manager_->validate_token(token1->access_token); + EXPECT_FALSE(val1.valid); + EXPECT_TRUE(val1.error.find("disabled") != std::string::npos); + + // Second token should still be valid + val2 = auth_manager_->validate_token(token2->access_token); + EXPECT_TRUE(val2.valid); +} + +// Test: Re-enabling client restores token validity +TEST_F(AuthManagerCacheTest, ReenablingClientRestoresTokenValidity) { + auto token = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(token.has_value()); + + // Token is valid + auto val = auth_manager_->validate_token(token->access_token); + EXPECT_TRUE(val.valid); + + // Disable client + auth_manager_->disable_client("cache_test_user"); + val = auth_manager_->validate_token(token->access_token); + EXPECT_FALSE(val.valid); + + // Re-enable client + auth_manager_->enable_client("cache_test_user"); + val = auth_manager_->validate_token(token->access_token); + EXPECT_TRUE(val.valid); +} + +// Test: Cleanup removes expired refresh tokens from cache +TEST_F(AuthManagerCacheTest, CleanupRemovesExpiredRefreshTokens) { + // Use short expiry for testing + AuthConfig short_expiry = AuthConfigBuilder() + .with_enabled(true) + .with_jwt_secret("test_secret_key_cleanup_test_12345") + .with_token_expiry(1) + .with_refresh_token_expiry(1) + .add_client("cleanup_user", "password", UserRole::VIEWER) + .build(); + + AuthManager manager(short_expiry); + + // Create tokens + auto result1 = manager.authenticate("cleanup_user", "password"); + ASSERT_TRUE(result1.has_value()); + + // Create another token to ensure we have multiple in cache + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto result2 = manager.authenticate("cleanup_user", "password"); + ASSERT_TRUE(result2.has_value()); + + // Wait for expiration + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Cleanup should remove expired tokens + size_t cleaned = manager.cleanup_expired_tokens(); + EXPECT_GE(cleaned, 2); + + // Cleanup again should remove none (already cleaned) + cleaned = manager.cleanup_expired_tokens(); + EXPECT_EQ(cleaned, 0); +} + +// Test: Cache consistency between refresh token revocation and access token validation +TEST_F(AuthManagerCacheTest, RefreshTokenRevocationConsistency) { + auto auth_result = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(auth_result.has_value()); + ASSERT_TRUE(auth_result->refresh_token.has_value()); + + const auto & access_token = auth_result->access_token; + const auto & refresh_token = auth_result->refresh_token.value(); + + // Both tokens should be initially valid + auto access_val = auth_manager_->validate_token(access_token); + auto refresh_val = auth_manager_->validate_token(refresh_token, TokenType::REFRESH); + EXPECT_TRUE(access_val.valid); + EXPECT_TRUE(refresh_val.valid); + + // Revoke refresh token + bool revoked = auth_manager_->revoke_refresh_token(refresh_token); + EXPECT_TRUE(revoked); + + // Access token should immediately become invalid (cache updated) + access_val = auth_manager_->validate_token(access_token); + EXPECT_FALSE(access_val.valid); + EXPECT_TRUE(access_val.error.find("revoked") != std::string::npos); + + // Refresh token should also be invalid + refresh_val = auth_manager_->validate_token(refresh_token, TokenType::REFRESH); + EXPECT_FALSE(refresh_val.valid); +} + +// Test: Attempting to revoke non-existent token +TEST_F(AuthManagerCacheTest, RevokeNonexistentToken) { + bool revoked = auth_manager_->revoke_refresh_token("nonexistent_token_id"); + EXPECT_FALSE(revoked); +} + +// Test: Client lookup cache returns correct client information +TEST_F(AuthManagerCacheTest, ClientCacheLookup) { + auto client = auth_manager_->get_client("cache_test_user"); + ASSERT_TRUE(client.has_value()); + EXPECT_EQ(client->client_id, "cache_test_user"); + EXPECT_EQ(client->role, UserRole::OPERATOR); + EXPECT_TRUE(client->enabled); + + // Second lookup should return same data (cache hit) + auto client2 = auth_manager_->get_client("cache_test_user"); + ASSERT_TRUE(client2.has_value()); + EXPECT_EQ(client2->client_id, "cache_test_user"); + EXPECT_EQ(client2->role, UserRole::OPERATOR); +} + +// Test: Non-existent client lookup doesn't pollute cache +TEST_F(AuthManagerCacheTest, NonexistentClientLookup) { + auto client = auth_manager_->get_client("nonexistent_client"); + EXPECT_FALSE(client.has_value()); + + // Should still not exist on second lookup + client = auth_manager_->get_client("nonexistent_client"); + EXPECT_FALSE(client.has_value()); +} + +// Test: Dynamic client registration is cached properly +TEST_F(AuthManagerCacheTest, DynamicClientRegistrationCache) { + // Register new client + bool registered = auth_manager_->register_client("dynamic_user", "dynamic_pass", UserRole::VIEWER); + EXPECT_TRUE(registered); + + // Should be available in cache immediately + auto client = auth_manager_->get_client("dynamic_user"); + ASSERT_TRUE(client.has_value()); + EXPECT_EQ(client->client_id, "dynamic_user"); + + // Should authenticate successfully (client in cache) + auto auth_result = auth_manager_->authenticate("dynamic_user", "dynamic_pass"); + EXPECT_TRUE(auth_result.has_value()); + + // Second authentication should use cached client + auth_result = auth_manager_->authenticate("dynamic_user", "dynamic_pass"); + EXPECT_TRUE(auth_result.has_value()); +} + +// Test: Thread-safety of concurrent token validations +TEST_F(AuthManagerCacheTest, ConcurrentTokenValidations) { + auto auth_result = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(auth_result.has_value()); + + const auto & token = auth_result->access_token; + std::vector threads; + std::atomic validation_count(0); + + // Launch 10 concurrent validation operations + for (int i = 0; i < 10; ++i) { + threads.emplace_back([this, &token, &validation_count]() { + auto result = auth_manager_->validate_token(token); + if (result.valid) { + validation_count++; + } + }); + } + + // Wait for all threads + for (auto & t : threads) { + t.join(); + } + + // All validations should succeed + EXPECT_EQ(validation_count, 10); +} + +// Test: Thread-safety of concurrent authentication operations +TEST_F(AuthManagerCacheTest, ConcurrentAuthenticationOperations) { + std::vector threads; + std::atomic success_count(0); + std::mutex results_mutex; + std::vector access_tokens; + + // Launch 5 concurrent authentication operations for different users + for (int i = 0; i < 5; ++i) { + threads.emplace_back([this, &success_count, &results_mutex, &access_tokens, i]() { + std::string client_id = (i % 2 == 0) ? "cache_test_user" : "cache_test_admin"; + std::string password = (i % 2 == 0) ? "password" : "admin_pass"; + + auto result = auth_manager_->authenticate(client_id, password); + if (result.has_value()) { + { + std::lock_guard lock(results_mutex); + access_tokens.emplace_back(result->access_token); + } + success_count++; + } + }); + } + + // Wait for all threads + for (auto & t : threads) { + t.join(); + } + + // All authentications should succeed + EXPECT_EQ(success_count, 5); + + // All tokens should be unique + std::set unique_tokens(access_tokens.begin(), access_tokens.end()); + EXPECT_EQ(unique_tokens.size(), access_tokens.size()); +} + +// Test: Thread-safety during cache invalidation +TEST_F(AuthManagerCacheTest, ConcurrentCacheInvalidation) { + // Create multiple tokens + std::vector refresh_tokens; + for (int i = 0; i < 5; ++i) { + auto result = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(result.has_value()); + refresh_tokens.emplace_back(result->refresh_token.value()); + } + + std::vector threads; + std::atomic revoke_error(false); + + // Concurrently revoke multiple tokens + for (const auto & token : refresh_tokens) { + threads.emplace_back([this, &token, &revoke_error]() { + bool revoked = auth_manager_->revoke_refresh_token(token); + if (!revoked) { + revoke_error = true; + } + }); + } + + // Wait for all threads + for (auto & t : threads) { + t.join(); + } + + // All should be revoked successfully + EXPECT_FALSE(revoke_error); +} + +// Test: Thread-safety of concurrent client enable/disable +TEST_F(AuthManagerCacheTest, ConcurrentClientStateChanges) { + std::vector threads; + std::atomic disable_count(0); + std::atomic enable_count(0); + + // Alternate disable/enable operations + for (int i = 0; i < 10; ++i) { + if (i % 2 == 0) { + threads.emplace_back([this, &disable_count]() { + bool result = auth_manager_->disable_client("cache_test_user"); + if (result) { + disable_count++; + } + }); + } else { + threads.emplace_back([this, &enable_count]() { + bool result = auth_manager_->enable_client("cache_test_user"); + if (result) { + enable_count++; + } + }); + } + } + + // Wait for all threads + for (auto & t : threads) { + t.join(); + } + + // Final state should be consistent (either enabled or disabled) + auto client = auth_manager_->get_client("cache_test_user"); + ASSERT_TRUE(client.has_value()); + // Client should have a valid state + EXPECT_TRUE(client.has_value()); +} + +// Test: Cache behavior with rapid token expiration +TEST_F(AuthManagerCacheTest, RapidExpirationCacheBehavior) { + AuthConfig rapid_expiry = AuthConfigBuilder() + .with_enabled(true) + .with_jwt_secret("test_secret_key_rapid_expiry__12345") + .with_token_expiry(1) + .with_refresh_token_expiry(2) + .add_client("rapid_user", "password", UserRole::VIEWER) + .build(); + + AuthManager manager(rapid_expiry); + + // Create and validate token + auto auth_result = manager.authenticate("rapid_user", "password"); + ASSERT_TRUE(auth_result.has_value()); + + auto token = auth_result->access_token; + auto val1 = manager.validate_token(token); + EXPECT_TRUE(val1.valid); + + // Wait for expiration + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Token should be expired + auto val2 = manager.validate_token(token); + EXPECT_FALSE(val2.valid); + EXPECT_TRUE(val2.error.find("expired") != std::string::npos); +} + +// Test: Cache lookup performance (repeated validations should not slow down) +TEST_F(AuthManagerCacheTest, CachePerformanceMultipleValidations) { + auto auth_result = auth_manager_->authenticate("cache_test_user", "password"); + ASSERT_TRUE(auth_result.has_value()); + + const auto & token = auth_result->access_token; + + // Perform 100 validations (should be fast due to caching) + auto start = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < 100; ++i) { + auto validation = auth_manager_->validate_token(token); + EXPECT_TRUE(validation.valid); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + + // 100 validations should complete in reasonable time (< 1 second) + EXPECT_LT(duration.count(), 1000); +} + int main(int argc, char ** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/src/ros2_medkit_gateway/test/test_native_topic_sampler_cache.cpp b/src/ros2_medkit_gateway/test/test_native_topic_sampler_cache.cpp new file mode 100644 index 00000000..eca7b60c --- /dev/null +++ b/src/ros2_medkit_gateway/test/test_native_topic_sampler_cache.cpp @@ -0,0 +1,146 @@ +// Copyright 2026 +// +// Tests for NativeTopicSampler component-topic map cache behavior + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "ros2_medkit_gateway/native_topic_sampler.hpp" + +using ros2_medkit_gateway::NativeTopicSampler; + +class NativeTopicSamplerCacheTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + rclcpp::init(0, nullptr); + } + + static void TearDownTestSuite() { + rclcpp::shutdown(); + } + + void SetUp() override { + node_ = std::make_shared("test_sampler_node_cache"); + sampler_ = std::make_unique(node_.get()); + + // Publisher node in namespace /cache_test_ns + rclcpp::NodeOptions options; + options.arguments({"--ros-args", "-r", "__ns:=/cache_test_ns"}); + publisher_node_ = std::make_shared("cache_publisher", options); + + pub1_ = publisher_node_->create_publisher("/cache_test_ns/topic1", 10); + + // Allow the ROS graph to update + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + rclcpp::spin_some(node_); + } + + void TearDown() override { + pub1_.reset(); + sampler_.reset(); + publisher_node_.reset(); + node_.reset(); + } + + std::shared_ptr node_; + std::shared_ptr publisher_node_; + std::unique_ptr sampler_; + rclcpp::Publisher::SharedPtr pub1_; +}; + +TEST_F(NativeTopicSamplerCacheTest, CachePopulatedOnFirstCall) { + // Component FQN is namespace + node name + const std::string comp = "/cache_test_ns/cache_publisher"; + + auto topics = sampler_->get_component_topics(comp); + + // Should include our published topic + bool found = false; + for (const auto & t : topics.publishes) { + if (t == "/cache_test_ns/topic1") { + found = true; + break; + } + } + EXPECT_TRUE(found) << "Cache build should discover published topic on first call"; +} + +TEST_F(NativeTopicSamplerCacheTest, CachedDataReturnedOnSubsequentCalls) { + const std::string comp = "/cache_test_ns/cache_publisher"; + + auto first = sampler_->get_component_topics(comp); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + rclcpp::spin_some(node_); + auto second = sampler_->get_component_topics(comp); + + // Results should be equivalent (no graph change) + EXPECT_EQ(first.publishes.size(), second.publishes.size()); + EXPECT_EQ(first.subscribes.size(), second.subscribes.size()); +} + +TEST_F(NativeTopicSamplerCacheTest, CacheInvalidatedWhenGraphChanges) { + const std::string comp = "/cache_test_ns/cache_publisher"; + + // Ensure initial topic present + auto before = sampler_->get_component_topics(comp); + bool found_before = std::any_of(before.publishes.begin(), before.publishes.end(), [](const std::string & t) { + return t == "/cache_test_ns/topic1"; + }); + EXPECT_TRUE(found_before); + + // Remove publisher (simulate graph change) + pub1_.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + rclcpp::spin_some(node_); + + auto after = sampler_->get_component_topics(comp); + + bool found_after = std::any_of(after.publishes.begin(), after.publishes.end(), [](const std::string & t) { + return t == "/cache_test_ns/topic1"; + }); + + EXPECT_FALSE(found_after) << "Cache should be invalidated and rebuilt after publisher removed"; +} + +TEST_F(NativeTopicSamplerCacheTest, ThreadSafetyUnderConcurrentAccess) { + const std::string comp = "/cache_test_ns/cache_publisher"; + + constexpr int NUM_THREADS = 8; + constexpr int ITER = 100; + + std::atomic success_count{0}; + + std::vector threads; + for (int i = 0; i < NUM_THREADS; ++i) { + threads.emplace_back([&]() { + for (int j = 0; j < ITER; ++j) { + auto topics = sampler_->get_component_topics(comp); + // If the cache is functioning, either we find the topic or not depending on graph state + // Increment success if call returned without crashing and produced a valid structure + if (topics.publishes.size() >= 0) { + ++success_count; + } + // Small yield + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + }); + } + + for (auto & t : threads) { + t.join(); + } + + EXPECT_EQ(success_count.load(), NUM_THREADS * ITER); +} + +int main(int argc, char ** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}