~aleteoryx/muditaos

b5749912a9393e0b21cd55d31b6807f648d3b291 — Marek Niepieklo 4 years ago 08cc7e0
[CP-825] Interrupted transfer to FS endpoint may corrupt the filesystem

Using std::fstream for file I/O at every file access in FileContext.
Reworked file opening/closing to happen on each file chunk
read/write, so that a file isn’t left open in case of an error
or transfer cancelation.
Separeted RxID and TxID for reliable cancelation of stalled xfers.
M module-services/service-desktop/endpoints/filesystem/FileContext.cpp => module-services/service-desktop/endpoints/filesystem/FileContext.cpp +32 -26
@@ 4,11 4,11 @@
#include <endpoints/filesystem/FileContext.hpp>
#include <log/log.hpp>
#include <utility>
#include <fstream>

FileContext::FileContext(const std::filesystem::path &path,
                         std::size_t size,
                         std::size_t chunkSize,
                         const std::string &openMode,
                         std::size_t offset)
    : path(path), size(size), offset(offset), chunkSize(chunkSize)
{


@@ 16,28 16,18 @@ FileContext::FileContext(const std::filesystem::path &path,
        throw std::invalid_argument("Invalid FileContext arguments");
    }

    file = std::fopen(path.c_str(), openMode.c_str());
    if (!file) {
        throw std::runtime_error("File open error");
    }

    constexpr size_t streamBufferSize = 64 * 1024;
    streamBuffer                      = std::make_unique<char[]>(streamBufferSize);
    setvbuf(file, streamBuffer.get(), _IOFBF, streamBufferSize);

    runningCrc32Digest.reset();
}

FileContext::~FileContext()
{
    std::fclose(file);
}

FileReadContext::FileReadContext(const std::filesystem::path &path,
                                 std::size_t size,
                                 std::size_t chunkSize,
                                 std::size_t offset)
    : FileContext(path, size, chunkSize, "rb", offset)
    : FileContext(path, size, chunkSize, offset)
{}

FileReadContext::~FileReadContext()


@@ 48,7 38,7 @@ FileWriteContext::FileWriteContext(const std::filesystem::path &path,
                                   std::size_t chunkSize,
                                   std::string crc32Digest,
                                   std::size_t offset)
    : FileContext(path, size, chunkSize, "wb", offset), crc32Digest(std::move(crc32Digest))
    : FileContext(path, size, chunkSize, offset), crc32Digest(std::move(crc32Digest))
{}

FileWriteContext::~FileWriteContext()


@@ 93,23 83,30 @@ auto FileReadContext::read() -> std::vector<std::uint8_t>
{
    LOG_DEBUG("Getting file data");

    std::fseek(file, offset, SEEK_SET);
    std::ifstream file(path, std::ios::binary);

    if (!file.is_open() || file.fail()) {
        LOG_ERROR("File %s open error", path.c_str());
        throw std::runtime_error("File open error");
    }

    file.seekg(offset);

    auto dataLeft = std::min(static_cast<std::size_t>(chunkSize), (size - offset));

    std::vector<std::uint8_t> buffer(dataLeft);

    auto dataRead = std::fread(buffer.data(), sizeof(int8_t), dataLeft, file);
    file.read(reinterpret_cast<char *>(buffer.data()), dataLeft);

    if (dataRead != dataLeft) {
    if (file.bad()) {
        LOG_ERROR("File %s read error", path.c_str());
        throw std::runtime_error("File read error");
    }

    runningCrc32Digest.add(buffer.data(), dataRead);
    runningCrc32Digest.add(buffer.data(), dataLeft);

    LOG_DEBUG("Read %u bytes", static_cast<unsigned int>(dataRead));
    advanceFileOffset(dataRead);
    LOG_DEBUG("Read %u bytes", static_cast<unsigned int>(dataLeft));
    advanceFileOffset(dataLeft);

    if (reachedEOF()) {
        LOG_INFO("Reached EOF");


@@ 122,22 119,30 @@ auto FileWriteContext::write(const std::vector<std::uint8_t> &data) -> void
{
    LOG_DEBUG("Sending file data");

    std::fseek(file, offset, SEEK_SET);
    std::ofstream file(path, std::ios::binary | std::ios::app);

    if (!file.is_open() || file.fail()) {
        LOG_ERROR("File %s open error", path.c_str());
        throw std::runtime_error("File open error");
    }

    file.seekp(offset);

    auto dataLeft = std::min(static_cast<std::size_t>(chunkSize), (size - offset));

    auto dataWritten = std::fwrite(reinterpret_cast<const char *>(data.data()), sizeof(int8_t), dataLeft, file);
    file.write(reinterpret_cast<const char *>(data.data()), dataLeft);
    file.flush();

    if (dataWritten != dataLeft) {
    if (file.bad()) {
        LOG_ERROR("File %s write error", path.c_str());
        throw std::runtime_error("File write error");
    }

    runningCrc32Digest.add(data.data(), dataWritten);
    runningCrc32Digest.add(data.data(), dataLeft);

    LOG_DEBUG("Written %u bytes", static_cast<unsigned int>(dataWritten));
    LOG_DEBUG("Written %u bytes", static_cast<unsigned int>(dataLeft));

    advanceFileOffset(dataWritten);
    advanceFileOffset(dataLeft);

    if (reachedEOF()) {
        LOG_INFO("Reached EOF of %s", path.c_str());


@@ 152,5 157,6 @@ auto FileWriteContext::crc32Matches() const -> bool

auto FileWriteContext::removeFile() -> void
{
    std::filesystem::remove(path);
    std::error_code ec;
    std::filesystem::remove(path, ec);
}

M module-services/service-desktop/endpoints/filesystem/FileOperations.cpp => module-services/service-desktop/endpoints/filesystem/FileOperations.cpp +12 -6
@@ 14,7 14,7 @@ FileOperations &FileOperations::instance()
auto FileOperations::createReceiveIDForFile(const std::filesystem::path &file) -> std::pair<transfer_id, std::size_t>
{
    cancelTimedOutReadTransfer();
    const auto rxID = ++runningXfrId;
    const auto rxID = ++runningRxId;
    const auto size = std::filesystem::file_size(file);

    if (size == 0) {


@@ 31,11 31,11 @@ auto FileOperations::createReceiveIDForFile(const std::filesystem::path &file) -

void FileOperations::cancelTimedOutReadTransfer()
{
    if (!runningXfrId) {
    if (runningRxId == 0) {
        return;
    }

    auto timedOutXfer       = runningXfrId - 1;
    auto timedOutXfer       = runningRxId.load();
    const auto fileCtxEntry = readTransfers.find(timedOutXfer);

    if (fileCtxEntry == readTransfers.end()) {


@@ 43,17 43,19 @@ void FileOperations::cancelTimedOutReadTransfer()
        return;
    }

    fileCtxEntry->second.reset();

    LOG_DEBUG("Canceling timed out rxID %u", static_cast<unsigned>(timedOutXfer));
    readTransfers.erase(timedOutXfer);
}

void FileOperations::cancelTimedOutWriteTransfer()
{
    if (!runningXfrId) {
    if (runningTxId == 0) {
        return;
    }

    auto timedOutXfer       = runningXfrId - 1;
    auto timedOutXfer       = runningTxId.load();
    const auto fileCtxEntry = writeTransfers.find(timedOutXfer);

    if (fileCtxEntry == writeTransfers.end()) {


@@ 61,6 63,10 @@ void FileOperations::cancelTimedOutWriteTransfer()
        return;
    }

    fileCtxEntry->second->removeFile();

    fileCtxEntry->second.reset();

    LOG_DEBUG("Canceling timed out rxID %u", static_cast<unsigned>(timedOutXfer));
    writeTransfers.erase(timedOutXfer);
}


@@ 165,7 171,7 @@ auto FileOperations::createTransmitIDForFile(const std::filesystem::path &file,
                                             const std::string &Crc32) -> transfer_id
{
    cancelTimedOutWriteTransfer();
    const auto txID = ++runningXfrId;
    const auto txID = ++runningTxId;

    LOG_DEBUG("Creating txID %u", static_cast<unsigned>(txID));


M module-services/service-desktop/endpoints/include/endpoints/filesystem/FileContext.hpp => module-services/service-desktop/endpoints/include/endpoints/filesystem/FileContext.hpp +0 -3
@@ 19,7 19,6 @@ class FileContext
    explicit FileContext(const std::filesystem::path &path,
                         std::size_t size,
                         std::size_t chunkSize,
                         const std::string &openMode,
                         std::size_t offset = 0);

    virtual ~FileContext();


@@ 40,8 39,6 @@ class FileContext

  protected:
    std::filesystem::path path{};
    std::FILE *file{};
    std::unique_ptr<char[]> streamBuffer;
    std::size_t size{};
    std::size_t offset{};
    std::size_t chunkSize{};

M module-services/service-desktop/endpoints/include/endpoints/filesystem/FileOperations.hpp => module-services/service-desktop/endpoints/include/endpoints/filesystem/FileOperations.hpp +2 -1
@@ 23,7 23,8 @@ class FileOperations
    std::map<transfer_id, std::unique_ptr<FileReadContext>> readTransfers;
    std::map<transfer_id, std::unique_ptr<FileWriteContext>> writeTransfers;

    std::atomic<transfer_id> runningXfrId{0};
    std::atomic<transfer_id> runningRxId{0};
    std::atomic<transfer_id> runningTxId{0};

    auto createFileReadContextFor(const std::filesystem::path &file, std::size_t fileSize, transfer_id xfrId) -> void;


M module-utils/log/Logger.cpp => module-utils/log/Logger.cpp +7 -1
@@ 136,7 136,13 @@ namespace Log
    /// @return:   1 - log flush successflul
    auto Logger::dumpToFile(std::filesystem::path logPath) -> int
    {
        auto firstDump = !std::filesystem::exists(logPath);
        std::error_code errorCode;
        auto firstDump = !std::filesystem::exists(logPath, errorCode);
        if (errorCode) {
            LOG_ERROR("Failed to check if file %s exists, error: %d", logPath.c_str(), errorCode.value());
            return -EIO;
        }

        if (const bool maxSizeExceeded = !firstDump && std::filesystem::file_size(logPath) > maxFileSize;
            maxSizeExceeded) {
            LOG_DEBUG("Max log file size exceeded. Rotating log files...");