Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 114 additions & 45 deletions src/cpp/src/kvs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <fstream>
#include <iostream>
#include <sstream>
#include <filesystem>

// TODO Default Value Handling TBD
// TODO Add Score Logging
Expand Down Expand Up @@ -326,6 +327,7 @@ score::Result<bool> Kvs::key_exists(const std::string_view key)
return result;
}


/* Retrieve the value associated with a key*/
score::Result<KvsValue> Kvs::get_value(const std::string_view key)
{
Expand Down Expand Up @@ -475,6 +477,57 @@ score::ResultBlank Kvs::remove_key(const std::string_view key)
return result;
}

score::Result<size_t> Kvs::get_file_size(const score::filesystem::Path& file_path) {
std::error_code ec;
const auto size = std::filesystem::file_size(file_path.CStr(), ec);

if (ec) {
// Check if the error is "file not found"
if (ec == std::errc::no_such_file_or_directory) {
// File does not exist, its size is 0. This is not an error.
return 0;
}
logger->LogError() << "Error: Could not get size of file " << file_path << ": " << ec.message();
return score::MakeUnexpected(ErrorCode::PhysicalStorageFailure);
}

return size;
}

/* Helper Function to get current storage size of all persisted files (defaults and historical snapshots) */
score::Result<size_t> Kvs::get_current_storage_size() {
size_t total_size = 0;
const std::array<const char*, 2> file_extensions = {".json", ".hash"};

// Add the size of the default files
const std::string default_suffix = "_default";
for (const char* extension : file_extensions) {
const score::filesystem::Path file_path = filename_prefix.Native() + default_suffix + extension;
auto size_result = get_file_size(file_path);
if (!size_result) {
return size_result; // Propagate error directly
}
total_size += size_result.value();
}

// Add the size of historical snapshots (1 to N).
// The loop starts at 1 to intentionally exclude the current working snapshot (index 0),
// allowing the caller to add its size manually for a final check.
for (size_t snapshot_index = 1; snapshot_index <= KVS_MAX_SNAPSHOTS; ++snapshot_index) {
const std::string snapshot_suffix = "_" + to_string(snapshot_index);

for (const char* extension : file_extensions) {
const score::filesystem::Path file_path = filename_prefix.Native() + snapshot_suffix + extension;
auto size_result = get_file_size(file_path);
if (!size_result) {
return size_result; // Propagate error directly
}
total_size += size_result.value();
}
}
return total_size;
}

/* Helper Function to write JSON data to a file for flush process (also adds Hash file)*/
score::ResultBlank Kvs::write_json_data(const std::string& buf)
{
Expand Down Expand Up @@ -521,66 +574,82 @@ score::ResultBlank Kvs::write_json_data(const std::string& buf)
return result;
}

/* Flush the key-value store*/
score::ResultBlank Kvs::flush()
{
score::ResultBlank result = score::MakeUnexpected(ErrorCode::UnmappedError);
/* Create JSON Object */
score::Result<std::string> Kvs::serialize_and_check() {
score::json::Object root_obj;
bool error = false;

// 1. Serialize the current KVS map to a buffer
{
std::unique_lock<std::mutex> lock(kvs_mutex, std::try_to_lock);
if (lock.owns_lock())
{
for (const auto& [key, value] : kvs)
{
auto conv = kvsvalue_to_any(value);
if (!conv)
{
result = score::MakeUnexpected(static_cast<ErrorCode>(*conv.error()));
error = true;
break;
}
else
{
root_obj.emplace(key, std::move(conv.value()) /*emplace in map uses move operator*/
);
if (!conv) {
return score::MakeUnexpected(static_cast<ErrorCode>(*conv.error()));
}
root_obj.emplace(key, std::move(conv.value()));
}
}
else
{
result = score::MakeUnexpected(ErrorCode::MutexLockFailed);
error = true;
} else {
return score::MakeUnexpected(ErrorCode::MutexLockFailed);
}
}

if (!error)
{
/* Serialize Buffer */
auto buf_res = writer->ToBuffer(root_obj);
if (!buf_res)
{
result = score::MakeUnexpected(ErrorCode::JsonGeneratorError);
}
else
{
/* Rotate Snapshots */
auto rotate_result = snapshot_rotate();
if (!rotate_result)
{
result = rotate_result;
}
else
{
/* Write JSON Data */
std::string buf = std::move(buf_res.value());
result = write_json_data(buf);
}
}
auto buf_res = writer->ToBuffer(root_obj);
if (!buf_res) {
return score::MakeUnexpected(ErrorCode::JsonGeneratorError);
}
const std::string& buf = buf_res.value();

return result;
// 2. Get the size of all other persisted files
auto current_size_res = get_current_storage_size();
if (!current_size_res) {
return score::MakeUnexpected(static_cast<ErrorCode>(*current_size_res.error()));
}

// 3. Calculate the potential total size
const size_t total_size = current_size_res.value() + buf.size() + HASH_FILE_SIZE;

// 4. Check against the limit
if (total_size > KVS_MAX_STORAGE_BYTES) {
logger->LogError() << "error: KVS storage limit would be exceeded. total_size:" << total_size
<< " KVS_MAX_STORAGE_BYTES:" << KVS_MAX_STORAGE_BYTES;
return score::MakeUnexpected(ErrorCode::OutOfStorageSpace);
}

return buf;
}

/* Flush the key-value store*/
score::ResultBlank Kvs::flush() {
auto result = serialize_and_check();
if (!result) {
return score::MakeUnexpected(static_cast<ErrorCode>(*result.error()));
}

auto rotate_result = snapshot_rotate();
if (!rotate_result) {
return rotate_result;
}

return write_json_data(result.value());
}

/* Performs a 'dry run' to check the potential storage size */
score::Result<size_t> Kvs::calculate_potential_size() {
auto result = serialize_and_check();
if (!result) {
return score::MakeUnexpected(static_cast<ErrorCode>(*result.error()));
}

// Re-calculate size to return it, as serialize_and_check only returns the buffer
const std::string& buf = result.value();
auto current_size_res = get_current_storage_size();
if (!current_size_res) {
return score::MakeUnexpected(static_cast<ErrorCode>(*current_size_res.error()));
}

return current_size_res.value() + buf.size() + HASH_FILE_SIZE;
}

/* Retrieve the snapshot count*/
Expand Down
72 changes: 46 additions & 26 deletions src/cpp/src/kvs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
#include <vector>

#define KVS_MAX_SNAPSHOTS 3
#define KVS_MAX_STORAGE_BYTES (10000) /* Max total storage size for all snapshots including hash files in bytes */
static constexpr size_t HASH_FILE_SIZE = 4;

namespace score::mw::per::kvs
{

{
struct InstanceId
{
size_t id;
Expand Down Expand Up @@ -352,38 +354,56 @@ class Kvs final
*/
score::Result<score::filesystem::Path> get_hash_filename(const SnapshotId& snapshot_id) const;

private:
/* Private constructor to prevent direct instantiation */
Kvs();
/**
* @brief Performs a 'dry run' to check if the current in-memory store would
* exceed the storage limit upon flushing.
*
* This function serializes the current key-value data to a temporary buffer
* and calculates the potential total storage size. It checks this size against
* the compile-time `KVS_MAX_STORAGE_BYTES` limit.
*
* @return A score::Result object containing either:
* - On success: The estimated total size (size_t) that the KVS would occupy after a flush.
* - On failure: An `OutOfStorageSpace` error if the limit would be exceeded,
* or another ErrorCode for other failures (e.g., serialization).
*/
score::Result<size_t> calculate_potential_size();

private:
/* Private constructor to prevent direct instantiation */
Kvs();

/* Internal storage and configuration details.*/
std::mutex kvs_mutex;
std::unordered_map<std::string, KvsValue> kvs;
/* Internal storage and configuration details.*/
std::mutex kvs_mutex;
std::unordered_map<std::string, KvsValue> kvs;

/* Optional default values */
std::unordered_map<std::string, KvsValue> default_values;
/* Optional default values */
std::unordered_map<std::string, KvsValue> default_values;

/* Filename prefix */
score::filesystem::Path filename_prefix;
/* Filename prefix */
score::filesystem::Path filename_prefix;

/* Filesystem handling */
std::unique_ptr<score::filesystem::Filesystem> filesystem;
/* Filesystem handling */
std::unique_ptr<score::filesystem::Filesystem> filesystem;

/* Json handling */
std::unique_ptr<score::json::IJsonParser> parser;
std::unique_ptr<score::json::IJsonWriter> writer;
/* Json handling */
std::unique_ptr<score::json::IJsonParser> parser;
std::unique_ptr<score::json::IJsonWriter> writer;

/* Logging */
std::unique_ptr<score::mw::log::Logger> logger;
/* Logging */
std::unique_ptr<score::mw::log::Logger> logger;

/* Private Methods */
score::ResultBlank snapshot_rotate();
score::Result<std::unordered_map<std::string, KvsValue>> parse_json_data(const std::string& data);
score::Result<std::unordered_map<std::string, KvsValue>> open_json(const score::filesystem::Path& prefix,
OpenJsonNeedFile need_file);
score::ResultBlank write_json_data(const std::string& buf);
};
/* Private Methods */
score::Result<std::string> serialize_and_check();
score::Result<size_t> get_file_size(const score::filesystem::Path& file_path);
score::Result<size_t> get_current_storage_size();
score::ResultBlank snapshot_rotate();
score::Result<std::unordered_map<std::string, KvsValue>> parse_json_data(const std::string& data);
score::Result<std::unordered_map<std::string, KvsValue>> open_json(const score::filesystem::Path& prefix, OpenJsonNeedFile need_file);
score::ResultBlank write_json_data(const std::string& buf);

};

} /* namespace score::mw::per::kvs */
}/* namespace score::mw::per::kvs */

#endif /* SCORE_LIB_KVS_KVS_HPP */
88 changes: 88 additions & 0 deletions src/cpp/tests/test_kvs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1222,3 +1222,91 @@ TEST(kvs_get_filename, get_hashname_failure)

cleanup_environment();
}


TEST(kvs, flush_fails_when_storage_limit_exceeded) {
// Use a unique directory for this test to avoid conflicts
const std::string test_dir = "./kvs_storage_test/";
// Ensure the directory is clean before starting
std::filesystem::remove_all(test_dir);

// 1. Setup KVS
KvsBuilder builder(instance_id);
builder.dir("./kvs_storage_test/");
auto open_res = builder.build();
ASSERT_TRUE(open_res);
Kvs kvs = std::move(open_res.value());

// 2. Add data that is close to the limit.
// There is overhead for the JSON structure (key, type info, braces, etc.) and the hash file (4 bytes).
// We will make the data payload a bit smaller than the max to account for this.
size_t overhead_estimate = 100;
size_t data_size = KVS_MAX_STORAGE_BYTES - overhead_estimate;
std::string large_data(data_size, 'a');

auto set_res1 = kvs.set_value("large_data", KvsValue(large_data.c_str()));
ASSERT_TRUE(set_res1);

// 3. Flush the first batch of data and assert it succeeds.
auto flush_res1 = kvs.flush();
ASSERT_TRUE(flush_res1);

// 4. Add a little more data, which should push the total size over the limit.
auto set_res2 = kvs.set_value("extra_data", KvsValue("this should not fit"));
ASSERT_TRUE(set_res2);

// 5. The second flush should fail because the storage limit is exceeded.
auto flush_res2 = kvs.flush();
ASSERT_FALSE(flush_res2);
ASSERT_EQ(static_cast<ErrorCode>(*flush_res2.error()), ErrorCode::OutOfStorageSpace);

// Cleanup the test directory
std::filesystem::remove_all(test_dir);
}


TEST(kvs_check_size, check_size_scenarios) {
const std::string test_dir = "./kvs_check_size_test/";
std::filesystem::remove_all(test_dir);

// --- SCENARIO 1: Success on data within limits ---
{
KvsBuilder builder(InstanceId(1));
builder.dir(std::string(test_dir));
auto open_res = builder.build();
ASSERT_TRUE(open_res);
Kvs kvs = std::move(open_res.value());

// Add a small amount of data
auto set_res = kvs.set_value("key", KvsValue("some_data"));
ASSERT_TRUE(set_res);

// Check the size
auto check_res = kvs.calculate_potential_size();
ASSERT_TRUE(check_res) << "check_size should succeed for data within limits";
// Check if the returned size is plausible
EXPECT_GT(check_res.value(), 0);
EXPECT_LT(check_res.value(), KVS_MAX_STORAGE_BYTES);
}

// --- SCENARIO 2: Failure on data exceeding limits ---
{
KvsBuilder builder(InstanceId(2));
builder.dir(std::string(test_dir));
auto open_res = builder.build();
ASSERT_TRUE(open_res);
Kvs kvs = std::move(open_res.value());

// Add data that will certainly exceed the storage limit
std::string large_data(KVS_MAX_STORAGE_BYTES, 'x');
auto set_res = kvs.set_value("oversized_key", KvsValue(large_data.c_str()));
ASSERT_TRUE(set_res);

// Check the size, expecting a failure
auto check_res = kvs.calculate_potential_size();
ASSERT_FALSE(check_res) << "check_size should fail when storage limit is exceeded";
EXPECT_EQ(static_cast<ErrorCode>(*check_res.error()), ErrorCode::OutOfStorageSpace);
}

std::filesystem::remove_all(test_dir);
}
Loading