From 137992f9b46744c316589f34111ff1cd923c6a9f Mon Sep 17 00:00:00 2001 From: Roman Koshelev Date: Wed, 25 Nov 2020 18:12:57 +0300 Subject: [PATCH 1/3] Removed unused method --- runtime/RuntimeLogger.cc | 33 --------------------------------- runtime/RuntimeLogger.h | 2 -- 2 files changed, 35 deletions(-) diff --git a/runtime/RuntimeLogger.cc b/runtime/RuntimeLogger.cc index 56b7c0f..de170e3 100644 --- a/runtime/RuntimeLogger.cc +++ b/runtime/RuntimeLogger.cc @@ -313,39 +313,6 @@ RuntimeLogger::preallocate() { // the user is already willing to invoke this up front cost. } -/** -* Internal helper function to wait for AIO completion. -*/ -void -RuntimeLogger::waitForAIO() { - if (hasOutstandingOperation) { - if (aio_error(&aioCb) == EINPROGRESS) { - const struct aiocb *const aiocb_list[] = {&aioCb}; - int err = aio_suspend(aiocb_list, 1, NULL); - - if (err != 0) - perror("LogCompressor's Posix AIO suspend operation failed"); - } - - int err = aio_error(&aioCb); - ssize_t ret = aio_return(&aioCb); - - if (err != 0) { - fprintf(stderr, "LogCompressor's POSIX AIO failed with %d: %s\r\n", - err, strerror(err)); - } else if (ret < 0) { - perror("LogCompressor's Posix AIO Write operation failed"); - } - ++numAioWritesCompleted; - hasOutstandingOperation = false; - - if (syncStatus == WAITING_ON_AIO) { - syncStatus = SYNC_COMPLETED; - hintSyncCompleted.notify_one(); - } - } -} - /** * Main compression thread that handles scanning through the StagingBuffers, * compressing log entries, and outputting a compressed log file. diff --git a/runtime/RuntimeLogger.h b/runtime/RuntimeLogger.h index 03ba2b9..dbdb1d0 100644 --- a/runtime/RuntimeLogger.h +++ b/runtime/RuntimeLogger.h @@ -161,8 +161,6 @@ using namespace NanoLog; void setLogFile_internal(const char *filename); - void waitForAIO(); - /** * Allocates thread-local structures if they weren't already allocated. * This is used by the generated C++ code to ensure it has space to From c1373e11dd1cc4175c8b7313e23127333cbdebd1 Mon Sep 17 00:00:00 2001 From: Roman Koshelev Date: Mon, 23 Nov 2020 22:02:48 +0300 Subject: [PATCH 2/3] Wrote my AIO POSIX asynchronous I/O is very sub-optimal. They create threads, dynamically allocate memory, and invoke a lot of system calls (which set the signal mask, priority, and more). Additional locks are also possible (if the AIO is using someone else). Moreover, they are not portable. --- runtime/DoubleBuffer.h | 111 ++++++++++++++++++++++++++++ runtime/Portability.h | 8 ++ runtime/RuntimeLogger.cc | 154 ++++++++++++++++++--------------------- runtime/RuntimeLogger.h | 30 ++++---- 4 files changed, 204 insertions(+), 99 deletions(-) create mode 100644 runtime/DoubleBuffer.h diff --git a/runtime/DoubleBuffer.h b/runtime/DoubleBuffer.h new file mode 100644 index 0000000..062f474 --- /dev/null +++ b/runtime/DoubleBuffer.h @@ -0,0 +1,111 @@ + +#pragma once +#include +#include +#include +#include +#include + +#include + +#include "Config.h" +#include "Portability.h" + +class DoubleBuffer +{ + struct Deleter { + void operator()(char *__ptr) const { free(__ptr); } + }; + + using MemPtrT = std::unique_ptr; + + static MemPtrT allocateMemory() + { + char *tmp = (char *)operator new(NanoLogConfig::OUTPUT_BUFFER_SIZE, + std::align_val_t(512), std::nothrow); + if (tmp == nullptr) { + perror( + "The NanoLog system was not able to allocate enough memory " + "to support its operations. Quitting...\n"); + std::exit(-1); + } + return MemPtrT{tmp}; + }; + + static char *accessOnce(const MemPtrT &ptr) + { + NANOLOG_READ_WRITE_BARRIER; + return ptr.get(); + } + + MemPtrT freeBuffer; + MemPtrT compressingBuffer; + MemPtrT writeBuffer; + unsigned size; + int errorCode; + + std::condition_variable condition; + std::mutex mutex; + +public: + DoubleBuffer() + : freeBuffer(allocateMemory()), + compressingBuffer(allocateMemory()), + writeBuffer(nullptr), + size(0), + errorCode(0), + condition(), + mutex(){}; + + char *getCompressingBuffer() noexcept { return compressingBuffer.get(); } + + bool writeInProgress() const { return accessOnce(freeBuffer) == nullptr; } + + int swapBuffer(unsigned count) noexcept + { + while (accessOnce(writeBuffer) != nullptr) {} + + { + std::unique_lock lock(mutex); + size = count; + std::swap(writeBuffer, compressingBuffer); + if (freeBuffer == nullptr) { + condition.wait(lock, [this]() { return freeBuffer != nullptr; }); + } else { + condition.notify_one(); + } + std::swap(freeBuffer, compressingBuffer); + return errorCode; + } + } + + void writeToFile(int file) noexcept + { + unsigned tmp_size = 0; + MemPtrT tmp_ptr = nullptr; + + { + std::unique_lock lock(mutex); + condition.wait(lock, [this]() { return writeBuffer != nullptr; }); + tmp_size = size; + std::swap(writeBuffer, tmp_ptr); + } + + int res = 0; + if (tmp_size != 0) { + res = write(file, tmp_ptr.get(), tmp_size); + res = (res < 0) ? errno : 0; + } + + while (accessOnce(freeBuffer) != nullptr) {} + + { + std::unique_lock lock(mutex); + errorCode = res; + std::swap(freeBuffer, tmp_ptr); + if (compressingBuffer == nullptr) { + condition.notify_one(); + } + } + } +}; \ No newline at end of file diff --git a/runtime/Portability.h b/runtime/Portability.h index 129c051..59cd097 100644 --- a/runtime/Portability.h +++ b/runtime/Portability.h @@ -49,4 +49,12 @@ #define NANOLOG_PRINTF_FORMAT_ATTR(string_index, first_to_check) #endif +#if defined(_MSC_VER) +extern "C" void _ReadWriteBarrier(void); +#pragma intrinsic(_ReadWriteBarrier) +#define NANOLOG_READ_WRITE_BARRIER _ReadWriteBarrier() +#elif defined(__GNUC__) +#define NANOLOG_READ_WRITE_BARRIER __asm__ __volatile__("" ::: "memory") +#endif + #endif /* NANOLOG_PORTABILITY_H */ \ No newline at end of file diff --git a/runtime/RuntimeLogger.cc b/runtime/RuntimeLogger.cc index de170e3..871ca0f 100644 --- a/runtime/RuntimeLogger.cc +++ b/runtime/RuntimeLogger.cc @@ -43,14 +43,13 @@ RuntimeLogger::RuntimeLogger() , compressionThread() , hasOutstandingOperation(false) , compressionThreadShouldExit(false) + , writerThreadShouldExit(false) , syncStatus(SYNC_COMPLETED) , condMutex() , workAdded() , hintSyncCompleted() , outputFd(-1) - , aioCb() - , compressingBuffer(nullptr) - , outputDoubleBuffer(nullptr) + , buffer() , currentLogLevel(NOTICE) , cycleAtThreadStart(0) , cyclesAtLastAIOStart(0) @@ -82,26 +81,9 @@ RuntimeLogger::RuntimeLogger() std::exit(-1); } - memset(&aioCb, 0, sizeof(aioCb)); - - int err = posix_memalign(reinterpret_cast(&compressingBuffer), - 512, NanoLogConfig::OUTPUT_BUFFER_SIZE); - if (err) { - perror("The NanoLog system was not able to allocate enough memory " - "to support its operations. Quitting...\r\n"); - std::exit(-1); - } - - err = posix_memalign(reinterpret_cast(&outputDoubleBuffer), - 512, NanoLogConfig::OUTPUT_BUFFER_SIZE); - if (err) { - perror("The NanoLog system was not able to allocate enough memory " - "to support its operations. Quitting...\r\n"); - std::exit(-1); - } - #ifndef BENCHMARK_DISCARD_ENTRIES_AT_STAGINGBUFFER compressionThread = std::thread(&RuntimeLogger::compressionThreadMain, this); + writerThread = std::thread(&RuntimeLogger::writerThreadMain, this); #endif } @@ -116,19 +98,16 @@ RuntimeLogger::~RuntimeLogger() { nanoLogSingleton.workAdded.notify_all(); } + // Stop the writer thread completely + writerThreadShouldExit = true; + buffer.swapBuffer(0); + writerThread.join(); + if (nanoLogSingleton.compressionThread.joinable()) nanoLogSingleton.compressionThread.join(); - // Free all the data structures - if (compressingBuffer) { - free(compressingBuffer); - compressingBuffer = nullptr; - } - - if (outputDoubleBuffer) { - free(outputDoubleBuffer); - outputDoubleBuffer = nullptr; - } + if (nanoLogSingleton.writerThread.joinable()) + nanoLogSingleton.writerThread.join(); if (outputFd > 0) close(outputFd); @@ -314,8 +293,8 @@ RuntimeLogger::preallocate() { } /** -* Main compression thread that handles scanning through the StagingBuffers, -* compressing log entries, and outputting a compressed log file. +* Main compression thread that handles scanning through the StagingBuffers and +* compressing log entries. */ void RuntimeLogger::compressionThreadMain() { @@ -329,7 +308,7 @@ RuntimeLogger::compressionThreadMain() { cycleAtThreadStart = cyclesAwakeStart; // Manages the state associated with compressing log messages - Log::Encoder encoder(compressingBuffer, NanoLogConfig::OUTPUT_BUFFER_SIZE); + Log::Encoder encoder(buffer.getCompressingBuffer(), NanoLogConfig::OUTPUT_BUFFER_SIZE); // Indicates whether a compression operation failed or not due // to insufficient space in the outputBuffer @@ -502,51 +481,35 @@ RuntimeLogger::compressionThreadMain() { } if (hasOutstandingOperation) { - if (aio_error(&aioCb) == EINPROGRESS) { - const struct aiocb *const aiocb_list[] = {&aioCb}; - if (outputBufferFull) { - // If the output buffer is full and we're not done, - // wait for completion - cyclesActive += PerfUtils::Cycles::rdtsc() - cyclesAwakeStart; - int err = aio_suspend(aiocb_list, 1, NULL); + if (buffer.writeInProgress() && !outputBufferFull) { + // If there's no new data, go to sleep. + if (bytesConsumedThisIteration == 0 && + NanoLogConfig::POLL_INTERVAL_DURING_IO_US > 0) + { + std::unique_lock lock(condMutex); + cyclesActive += PerfUtils::Cycles::rdtsc() - + cyclesAwakeStart; + workAdded.wait_for(lock, std::chrono::microseconds( + NanoLogConfig::POLL_INTERVAL_DURING_IO_US)); cyclesAwakeStart = PerfUtils::Cycles::rdtsc(); - if (err != 0) - perror("LogCompressor's Posix AIO " - "suspend operation failed"); - } else { - // If there's no new data, go to sleep. - if (bytesConsumedThisIteration == 0 && - NanoLogConfig::POLL_INTERVAL_DURING_IO_US > 0) - { - std::unique_lock lock(condMutex); - cyclesActive += PerfUtils::Cycles::rdtsc() - - cyclesAwakeStart; - workAdded.wait_for(lock, std::chrono::microseconds( - NanoLogConfig::POLL_INTERVAL_DURING_IO_US)); - cyclesAwakeStart = PerfUtils::Cycles::rdtsc(); - } - - if (aio_error(&aioCb) == EINPROGRESS) - continue; } - } - - // Finishing up the IO - int err = aio_error(&aioCb); - ssize_t ret = aio_return(&aioCb); - if (err != 0) { - fprintf(stderr, "LogCompressor's POSIX AIO failed" - " with %d: %s\r\n", err, strerror(err)); - } else if (ret < 0) { - perror("LogCompressor's Posix AIO Write failed"); + if (buffer.writeInProgress()) + continue; } - ++numAioWritesCompleted; - hasOutstandingOperation = false; - cyclesDiskIO_upperBound += (start - cyclesAtLastAIOStart); // We've completed an AIO, check if we need to notify if (syncStatus == WAITING_ON_AIO) { + cyclesActive += PerfUtils::Cycles::rdtsc() - cyclesAwakeStart; + int err = buffer.swapBuffer(0); + cyclesAwakeStart = PerfUtils::Cycles::rdtsc(); + if (err != 0) + fprintf(stderr, "LogCompressor's write failed with %d: %s\n", + err, strerror(err)); + + ++numAioWritesCompleted; + hasOutstandingOperation = false; + cyclesDiskIO_upperBound += (start - cyclesAtLastAIOStart); std::unique_lock lock(nanoLogSingleton.condMutex); if (syncStatus == WAITING_ON_AIO) { syncStatus = SYNC_COMPLETED; @@ -567,27 +530,32 @@ RuntimeLogger::compressionThreadMain() { ssize_t bytesOver = bytesToWrite % 512; if (bytesOver != 0) { - memset(compressingBuffer, 0, 512 - bytesOver); + memset(buffer.getCompressingBuffer(), 0, 512 - bytesOver); bytesToWrite = bytesToWrite + 512 - bytesOver; padBytesWritten += (512 - bytesOver); } } - aioCb.aio_fildes = outputFd; - aioCb.aio_buf = compressingBuffer; - aioCb.aio_nbytes = bytesToWrite; totalBytesWritten += bytesToWrite; - cyclesAtLastAIOStart = PerfUtils::Cycles::rdtsc(); - if (aio_write(&aioCb) == -1) - fprintf(stderr, "Error at aio_write(): %s\n", strerror(errno)); + uint64_t tmp = PerfUtils::Cycles::rdtsc(); + int err = buffer.swapBuffer(bytesToWrite); + + if (hasOutstandingOperation) { + cyclesActive += tmp - cyclesAwakeStart; + cyclesAwakeStart = PerfUtils::Cycles::rdtsc(); + if (err != 0) + fprintf(stderr, "LogCompressor's write failed with %d: %s\n", err, + strerror(err)); - hasOutstandingOperation = true; + ++numAioWritesCompleted; + cyclesDiskIO_upperBound += (start - cyclesAtLastAIOStart); + } - // Swap buffers - encoder.swapBuffer(outputDoubleBuffer, + cyclesAtLastAIOStart = tmp; + hasOutstandingOperation = true; + encoder.swapBuffer(buffer.getCompressingBuffer(), NanoLogConfig::OUTPUT_BUFFER_SIZE); - std::swap(outputDoubleBuffer, compressingBuffer); outputBufferFull = false; } @@ -595,6 +563,16 @@ RuntimeLogger::compressionThreadMain() { cyclesActive += PerfUtils::Cycles::rdtsc() - cyclesAwakeStart; } +/** +* Main writer thread that outputting a compressed log file. +*/ +void +RuntimeLogger::writerThreadMain() { + while (!writerThreadShouldExit) { + buffer.writeToFile(outputFd); + } +} + // Documentation in NanoLog.h void RuntimeLogger::setLogFile_internal(const char *filename) { @@ -625,9 +603,17 @@ RuntimeLogger::setLogFile_internal(const char *filename) { workAdded.notify_all(); } + // Stop the writer thread completely + writerThreadShouldExit = true; + buffer.swapBuffer(0); + writerThread.join(); + if (compressionThread.joinable()) compressionThread.join(); + if (writerThread.joinable()) + writerThread.join(); + if (outputFd > 0) close(outputFd); outputFd = newFd; @@ -635,8 +621,10 @@ RuntimeLogger::setLogFile_internal(const char *filename) { // Relaunch thread nextInvocationIndexToBePersisted = 0; // Reset the dictionary compressionThreadShouldExit = false; + writerThreadShouldExit = false; #ifndef BENCHMARK_DISCARD_ENTRIES_AT_STAGINGBUFFER compressionThread = std::thread(&RuntimeLogger::compressionThreadMain, this); + writerThread = std::thread(&RuntimeLogger::writerThreadMain, this); #endif } diff --git a/runtime/RuntimeLogger.h b/runtime/RuntimeLogger.h index dbdb1d0..65dcee0 100644 --- a/runtime/RuntimeLogger.h +++ b/runtime/RuntimeLogger.h @@ -16,7 +16,6 @@ #ifndef RUNTIME_NANOLOG_H #define RUNTIME_NANOLOG_H -#include #include #include @@ -27,6 +26,7 @@ #include "Config.h" #include "Common.h" +#include "DoubleBuffer.h" #include "Fence.h" #include "Log.h" #include "NanoLog.h" @@ -159,6 +159,8 @@ using namespace NanoLog; void compressionThreadMain(); + void writerThreadMain(); + void setLogFile_internal(const char *filename); /** @@ -192,10 +194,13 @@ using namespace NanoLog; // Protects reads and writes to threadBuffers std::mutex bufferMutex; - // Background thread that polls the various staging buffers, compresses - // the staged log messages, and outputs it to a file. + // Background thread that polls the various staging buffers and compresses + // the staged log messages. std::thread compressionThread; + // Background thread that writes log to file + std::thread writerThread; + // Indicates there's an operation in aioCb that should be waited on bool hasOutstandingOperation; @@ -203,6 +208,10 @@ using namespace NanoLog; // typically only set in testing or when the application is exiting. bool compressionThreadShouldExit; + // Flag signaling the writerThread to stop running. This is + // typically only set in testing or when the application is exiting. + bool writerThreadShouldExit; + // Marks the progress of flushing all log messages to disk after a user // invokes the sync() API. To complete the operation, the background // thread has to make two passes through the staging buffers and wait @@ -228,19 +237,8 @@ using namespace NanoLog; // construction of the LogCompressor int outputFd; - // POSIX AIO structure used to communicate async IO requests - struct aiocb aioCb; - - // Used to stage the compressed log messages before passing it on to the - // POSIX AIO library. - - // Dynamically allocated buffer to stage compressed log message before - // handing it over to the POSIX AIO library for output. - char *compressingBuffer; - - // Dynamically allocated double buffer that is swapped with the - // compressingBuffer when the latter is passed to the POSIX AIO library. - char *outputDoubleBuffer; + // Buffer to stage compressed log message + DoubleBuffer buffer; // Minimum log level that RuntimeLogger will accept. Anything lower will // be dropped. From 1ae869fe98572bc90ad0b3c5c6a5ae066c65f97f Mon Sep 17 00:00:00 2001 From: Roman Koshelev Date: Wed, 2 Dec 2020 21:24:04 +0300 Subject: [PATCH 3/3] Rewritten file I / O for portable C API Apart from portability, it also provides more guarantees (for example, the write system call does not guarantee that the entire buffer will be written, unlike fwrite, which guarantees that the entire buffer will be written without errors). --- runtime/Config.h | 2 +- runtime/DoubleBuffer.h | 22 +++--------- runtime/FSyncPortable.h | 27 +++++++++++++++ runtime/RuntimeLogger.cc | 72 ++++++++++++++++------------------------ runtime/RuntimeLogger.h | 7 ++-- 5 files changed, 63 insertions(+), 67 deletions(-) create mode 100644 runtime/FSyncPortable.h diff --git a/runtime/Config.h b/runtime/Config.h index 3ee6d87..28ee813 100644 --- a/runtime/Config.h +++ b/runtime/Config.h @@ -25,7 +25,7 @@ namespace NanoLogConfig { // Controls in what mode the compressed log file will be opened - static const int FILE_PARAMS = O_APPEND|O_RDWR|O_CREAT|O_NOATIME|O_DSYNC; + static const char FILE_PARAMS[] = "a+"; // Location of the initial log file static const char DEFAULT_LOG_FILE[] = "./compressedLog"; diff --git a/runtime/DoubleBuffer.h b/runtime/DoubleBuffer.h index 062f474..6d3bfbd 100644 --- a/runtime/DoubleBuffer.h +++ b/runtime/DoubleBuffer.h @@ -13,23 +13,11 @@ class DoubleBuffer { - struct Deleter { - void operator()(char *__ptr) const { free(__ptr); } - }; - - using MemPtrT = std::unique_ptr; + using MemPtrT = std::unique_ptr; static MemPtrT allocateMemory() { - char *tmp = (char *)operator new(NanoLogConfig::OUTPUT_BUFFER_SIZE, - std::align_val_t(512), std::nothrow); - if (tmp == nullptr) { - perror( - "The NanoLog system was not able to allocate enough memory " - "to support its operations. Quitting...\n"); - std::exit(-1); - } - return MemPtrT{tmp}; + return MemPtrT{new char[NanoLogConfig::OUTPUT_BUFFER_SIZE]}; }; static char *accessOnce(const MemPtrT &ptr) @@ -79,7 +67,7 @@ class DoubleBuffer } } - void writeToFile(int file) noexcept + void writeToFile(FILE* file) noexcept { unsigned tmp_size = 0; MemPtrT tmp_ptr = nullptr; @@ -93,8 +81,8 @@ class DoubleBuffer int res = 0; if (tmp_size != 0) { - res = write(file, tmp_ptr.get(), tmp_size); - res = (res < 0) ? errno : 0; + size_t wsize = fwrite(tmp_ptr.get(), 1u, tmp_size, file); + res = (wsize != tmp_size) ? errno : 0; } while (accessOnce(freeBuffer) != nullptr) {} diff --git a/runtime/FSyncPortable.h b/runtime/FSyncPortable.h new file mode 100644 index 0000000..4f2b464 --- /dev/null +++ b/runtime/FSyncPortable.h @@ -0,0 +1,27 @@ +#pragma once + +#ifndef _WIN32 + +#include + +#else + +#include +#include + +inline int fsync(int fd) { + HANDLE h = (HANDLE)_get_osfhandle(fd); + if (h == INVALID_HANDLE_VALUE) { + return -1; + } + if (!FlushFileBuffers(h)) { + return -1; + } + return 0; +} + +inline int fdatasync(int fd) { + return fsync(fd); +} + +#endif \ No newline at end of file diff --git a/runtime/RuntimeLogger.cc b/runtime/RuntimeLogger.cc index 871ca0f..305532f 100644 --- a/runtime/RuntimeLogger.cc +++ b/runtime/RuntimeLogger.cc @@ -48,7 +48,7 @@ RuntimeLogger::RuntimeLogger() , condMutex() , workAdded() , hintSyncCompleted() - , outputFd(-1) + , outputFile(nullptr) , buffer() , currentLogLevel(NOTICE) , cycleAtThreadStart(0) @@ -60,7 +60,6 @@ RuntimeLogger::RuntimeLogger() , cyclesDiskIO_upperBound(0) , totalBytesRead(0) , totalBytesWritten(0) - , padBytesWritten(0) , logsProcessed(0) , numAioWritesCompleted(0) , coreId(-1) @@ -72,14 +71,15 @@ RuntimeLogger::RuntimeLogger() stagingBufferPeekDist[i] = 0; const char *filename = NanoLogConfig::DEFAULT_LOG_FILE; - outputFd = open(filename, NanoLogConfig::FILE_PARAMS, 0666); - if (outputFd < 0) { + outputFile = fopen(filename, NanoLogConfig::FILE_PARAMS); + if (outputFile == nullptr) { fprintf(stderr, "NanoLog could not open the default file location " "for the log file (\"%s\").\r\n Please check the permissions " "or use NanoLog::setLogFile(const char* filename) to " "specify a different log file.\r\n", filename); std::exit(-1); } + setvbuf(outputFile, nullptr, _IONBF, 0); #ifndef BENCHMARK_DISCARD_ENTRIES_AT_STAGINGBUFFER compressionThread = std::thread(&RuntimeLogger::compressionThreadMain, this); @@ -109,10 +109,10 @@ RuntimeLogger::~RuntimeLogger() { if (nanoLogSingleton.writerThread.joinable()) nanoLogSingleton.writerThread.join(); - if (outputFd > 0) - close(outputFd); + if (outputFile != nullptr) + fclose(outputFile); - outputFd = 0; + outputFile = nullptr; } // Documentation in NanoLog.h @@ -122,7 +122,7 @@ RuntimeLogger::getStats() { char buffer[1024]; // Leaks abstraction, but basically flush so we get all the time uint64_t start = PerfUtils::Cycles::rdtsc(); - fdatasync(nanoLogSingleton.outputFd); + fdatasync(fileno(nanoLogSingleton.outputFile)); uint64_t stop = PerfUtils::Cycles::rdtsc(); nanoLogSingleton.cyclesDiskIO_upperBound += (stop - start); @@ -136,8 +136,6 @@ RuntimeLogger::getStats() { nanoLogSingleton.totalBytesWritten); double totalBytesReadDouble = static_cast( nanoLogSingleton.totalBytesRead); - double padBytesWrittenDouble = static_cast( - nanoLogSingleton.padBytesWritten); double numEventsProcessedDouble = static_cast( nanoLogSingleton.logsProcessed); @@ -197,14 +195,11 @@ RuntimeLogger::getStats() { compressTime * 1.0e9 / numEventsProcessedDouble); out << buffer; - snprintf(buffer, 1024, "The compression ratio was %0.2lf-%0.2lfx " - "(%lu bytes in, %lu bytes out, %lu pad bytes)\n", - 1.0 * totalBytesReadDouble / (totalBytesWrittenDouble - + padBytesWrittenDouble), + snprintf(buffer, 1024, "The compression ratio was %0.2lf " + "(%lu bytes in, %lu bytes out)\n", 1.0 * totalBytesReadDouble / totalBytesWrittenDouble, nanoLogSingleton.totalBytesRead, - nanoLogSingleton.totalBytesWritten, - nanoLogSingleton.padBytesWritten); + nanoLogSingleton.totalBytesWritten); out << buffer; return out.str(); @@ -525,17 +520,6 @@ RuntimeLogger::compressionThreadMain() { if (bytesToWrite == 0) continue; - // Pad the output if necessary - if (NanoLogConfig::FILE_PARAMS & O_DIRECT) { - ssize_t bytesOver = bytesToWrite % 512; - - if (bytesOver != 0) { - memset(buffer.getCompressingBuffer(), 0, 512 - bytesOver); - bytesToWrite = bytesToWrite + 512 - bytesOver; - padBytesWritten += (512 - bytesOver); - } - } - totalBytesWritten += bytesToWrite; uint64_t tmp = PerfUtils::Cycles::rdtsc(); @@ -569,29 +553,29 @@ RuntimeLogger::compressionThreadMain() { void RuntimeLogger::writerThreadMain() { while (!writerThreadShouldExit) { - buffer.writeToFile(outputFd); + buffer.writeToFile(outputFile); } } // Documentation in NanoLog.h void RuntimeLogger::setLogFile_internal(const char *filename) { - // Check if it exists and is readable/writeable - if (access(filename, F_OK) == 0 && access(filename, R_OK | W_OK) != 0) { - std::string err = "Unable to read/write from new log file: "; - err.append(filename); - throw std::ios_base::failure(err); - } - // Try to open the file - int newFd = open(filename, NanoLogConfig::FILE_PARAMS, 0666); - if (newFd < 0) { - std::string err = "Unable to open file new log file: '"; - err.append(filename); - err.append("': "); - err.append(strerror(errno)); + FILE* newFile = fopen(filename, NanoLogConfig::FILE_PARAMS); + if (newFile == nullptr) { + std::string err; + if (errno == EACCES) { + err = "Unable to read/write from new log file: "; + err.append(filename); + } else { + err = "Unable to open file new log file: '"; + err.append(filename); + err.append("': "); + err.append(strerror(errno)); + } throw std::ios_base::failure(err); } + setvbuf(outputFile, nullptr, _IONBF, 0); // Everything seems okay, stop the background thread and change files sync(); @@ -614,9 +598,9 @@ RuntimeLogger::setLogFile_internal(const char *filename) { if (writerThread.joinable()) writerThread.join(); - if (outputFd > 0) - close(outputFd); - outputFd = newFd; + if (outputFile != nullptr) + fclose(outputFile); + outputFile = newFile; // Relaunch thread nextInvocationIndexToBePersisted = 0; // Reset the dictionary diff --git a/runtime/RuntimeLogger.h b/runtime/RuntimeLogger.h index 65dcee0..c44ff0e 100644 --- a/runtime/RuntimeLogger.h +++ b/runtime/RuntimeLogger.h @@ -17,6 +17,7 @@ #define RUNTIME_NANOLOG_H #include +#include #include #include @@ -235,7 +236,7 @@ using namespace NanoLog; // File handle for the output file; should only be opened once at the // construction of the LogCompressor - int outputFd; + FILE* outputFile; // Buffer to stage compressed log message DoubleBuffer buffer; @@ -276,10 +277,6 @@ using namespace NanoLog; // Metric: Number of bytes written to the output file (includes padding) uint64_t totalBytesWritten; - // Metric: Number of pad bytes written to round the file to the nearest - // 512B - uint64_t padBytesWritten; - // Metric: Number of log statements compressed and outputted. uint64_t logsProcessed;