~aleteoryx/muditaos

c11867772aec26b84c71cb8d47250cfd36bd1e1c — Mateusz Piesta 3 years ago 1dcd285
[BH-1457] Simulator audio

Switched to the more efficient way of
spawning pulse audio context/stream.
This change fixed rare issues reported by
address sanitizer and accidental freezes.
M module-audio/board/linux/LinuxAudioDevice.cpp => module-audio/board/linux/LinuxAudioDevice.cpp +30 -19
@@ 5,16 5,29 @@
#include <Audio/Stream.hpp>
#include <log/log.hpp>
#include <cmath>
#include <stdexcept>

namespace
{
    /// Creating and destroying context each time LinuxAudioDevice is spawned seems very unstable. It causes various
    /// problems, from raising sanitizer errors to completely freezing the simulator. As a solution, another approach
    /// has been chosen. Instead of recreating the pulse audio context, we rely on static instance of it, which is valid
    /// throughout the life of the simulator process. During the standard operation, we only create and attach audio
    /// streams, which have proven to be a fast and reliable way of doing things. All the necessary context cleaning is
    /// handled in the context's destructor during the simulator process close.
    static audio::pulse_audio::Context ctx;
    audio::pulse_audio::Context &get_context()
    {
        return ctx;
    }
} // namespace

namespace audio
{
    LinuxAudioDevice::LinuxAudioDevice(const float initialVolume)
        : supportedFormats(
              audio::AudioFormat::makeMatrix(supportedSampleRates, supportedBitWidths, supportedChannelModes)),
          audioProxy("audioProxy", [this](const auto &data) {
              requestedBytes = data;
              onDataSend();
          })
          audioProxy("audioProxy", [this](const auto &data) { onDataSend(); })
    {
        setOutputVolume(initialVolume);
    }


@@ 69,11 82,11 @@ namespace audio
        }
        Sink::_stream->peek(dataSpan);
        scaleVolume(dataSpan);
        pulseAudioWrapper->insert(dataSpan);
        stream->insert(dataSpan);
        Sink::_stream->consume();

        if (pulseAudioWrapper->bytes() >= requestedBytes) {
            pulseAudioWrapper->consume();
        if (stream->bytes() >= requestedBytes) {
            stream->consume();
        }
        else {
            audioProxy.post(requestedBytes);


@@ 88,15 101,17 @@ namespace audio

    void LinuxAudioDevice::enableOutput()
    {
        if (!isSinkConnected()) {
            LOG_ERROR("Output stream is not connected!");
            return;
        }

        currentFormat = Sink::_stream->getOutputTraits().format;

        pulseAudioWrapper = std::make_unique<PulseAudioWrapper>(
            [this](const std::size_t size) { audioProxy.post(size); }, currentFormat);
        stream = get_context().open_stream(currentFormat, [this](const std::size_t size) {
            requestedBytes = size;
            audioProxy.post(requestedBytes);
        });

        if (stream == nullptr) {
            fprintf(stderr, "Failed to open the audio stream");
            std::abort();
        }
    }

    void LinuxAudioDevice::disableInput()


@@ 104,11 119,7 @@ namespace audio

    void LinuxAudioDevice::disableOutput()
    {
        if (!isSinkConnected()) {
            LOG_ERROR("Error while stopping Linux Audio Device! Null stream.");
            return;
        }
        close         = true;
        get_context().close_stream();
        currentFormat = {};
    }
    void LinuxAudioDevice::scaleVolume(audio::AbstractStream::Span data)

M module-audio/board/linux/LinuxAudioDevice.hpp => module-audio/board/linux/LinuxAudioDevice.hpp +2 -3
@@ 1,4 1,4 @@
// Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
// Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#pragma once


@@ 57,10 57,9 @@ namespace audio

        float volumeFactor = 1.0f;

        std::unique_ptr<PulseAudioWrapper> pulseAudioWrapper;
        AudioProxy audioProxy;
        pulse_audio::Stream *stream{nullptr};
        std::size_t requestedBytes{};
        bool close{false};
    };

} // namespace audio

M module-audio/board/linux/PulseAudioWrapper.cpp => module-audio/board/linux/PulseAudioWrapper.cpp +81 -57
@@ 5,33 5,78 @@
#include <pthread.h>
#include <iostream>
#include <utility>
#include <stdexcept>

namespace audio
namespace audio::pulse_audio
{
    PulseAudioWrapper::PulseAudioWrapper(WriteCallback write_cb, AudioFormat audio_format)
        : write_cb{std::move(write_cb)}, audio_format(audio_format)
    Stream::Stream(AudioFormat audio_format_, pa_context *ctx) : audio_format{audio_format_}
    {
        /// PulseAudio requests at least SampleRate bytes of data to start playback
        /// Due to how module-audio's streams operate we can load a bit more data to the cache than needed.
        /// Doubling the buffer size is cheap and simple solution.
        cache.reserve(audio_format.getSampleRate() * 2);

        pa_sample_spec sample_spec;
        sample_spec.channels = audio_format.getChannels();
        sample_spec.rate     = audio_format.getSampleRate();
        sample_spec.format   = PA_SAMPLE_S16LE;

        if ((stream = pa_stream_new(ctx, "PureOSStream", &sample_spec, nullptr)) == nullptr) {
            throw std::runtime_error(pa_strerror(pa_context_errno(ctx)));
        }
    }

    Stream::~Stream()
    {
        pa_stream_disconnect(stream);
        pa_stream_unref(stream);
    }

    void Stream::insert(AbstractStream::Span span)
    {
        std::copy(span.data, span.dataEnd(), &cache[cache_pos]);
        cache_pos += span.dataSize;
    }

    void Stream::consume()
    {
        if (pa_stream_write(stream, cache.data(), cache_pos, NULL, 0, PA_SEEK_RELATIVE) < 0) {
            fprintf(stderr, "pa_stream_write() failed\n");
            return;
        }
        cache_pos = 0;
    }

    std::size_t Stream::bytes() const
    {
        return cache_pos;
    }

    pa_stream *Stream::raw()
    {
        return stream;
    }

    Context::Context()
    {
        pthread_create(
            &tid,
            nullptr,
            [](void *arg) -> void * {
                /// Mask all available signals to not interfere with the FreeRTOS simulator port
                /// Any 'external' threads that are not part of the FreeRTOS simulator port must be treated as a
                /// interrupt source and have to have all the signals masked in order to not 'steal' work from the
                /// threads managed by the port.
                sigset_t set;
                sigfillset(&set);
                pthread_sigmask(SIG_SETMASK, &set, NULL);

                auto *inst = static_cast<PulseAudioWrapper *>(arg);
                auto *inst = static_cast<Context *>(arg);
                return inst->worker();
            },
            this);
    }

    void *PulseAudioWrapper::worker()
    void *Context::worker()
    {
        mainloop     = pa_mainloop_new();
        mainloop_api = pa_mainloop_get_api(mainloop);


@@ 39,7 84,7 @@ namespace audio
        pa_context_set_state_callback(
            context,
            [](pa_context *, void *arg) {
                auto *inst = static_cast<PulseAudioWrapper *>(arg);
                auto *inst = static_cast<Context *>(arg);
                inst->context_state_callback();
            },
            this);


@@ 54,7 99,7 @@ namespace audio
        return nullptr;
    }

    void PulseAudioWrapper::context_state_callback()
    void Context::context_state_callback()
    {
        assert(context);



@@ 62,36 107,9 @@ namespace audio
        case PA_CONTEXT_CONNECTING:
        case PA_CONTEXT_AUTHORIZING:
        case PA_CONTEXT_SETTING_NAME:
        case PA_CONTEXT_READY:
            break;

        case PA_CONTEXT_READY: {
            int r;
            pa_sample_spec sample_spec;
            sample_spec.channels = audio_format.getChannels();
            sample_spec.rate     = audio_format.getSampleRate();
            sample_spec.format   = PA_SAMPLE_S16LE;

            if ((stream = pa_stream_new(context, "PureOSStream", &sample_spec, nullptr)) == nullptr) {
                fprintf(stderr, "pa_stream_new() failed: %s\n", pa_strerror(pa_context_errno(context)));
                return;
            }

            pa_stream_set_write_callback(
                stream,
                [](pa_stream *, size_t length, void *arg) {
                    auto *inst = static_cast<PulseAudioWrapper *>(arg);
                    inst->stream_write_cb(length);
                },
                this);

            if ((r = pa_stream_connect_playback(stream, nullptr, nullptr, {}, nullptr, nullptr)) < 0) {
                fprintf(stderr, "pa_stream_connect_playback() failed: %s\n", pa_strerror(pa_context_errno(context)));
                return;
            }

            break;
        }

        case PA_CONTEXT_TERMINATED:
            quit(0);
            fprintf(stderr, "PulseAudio connection terminated.\n");


@@ 104,20 122,16 @@ namespace audio
        }
    }

    void PulseAudioWrapper::stream_write_cb(size_t length)
    void Context::stream_write_cb(size_t length)
    {
        write_cb(length);
    }

    PulseAudioWrapper::~PulseAudioWrapper()
    Context::~Context()
    {
        quit();
        pthread_join(tid, nullptr);

        if (stream != nullptr) {
            pa_stream_unref(stream);
        }

        if (context != nullptr) {
            pa_context_unref(context);
            context = nullptr;


@@ 130,30 144,40 @@ namespace audio
        }
    }

    void PulseAudioWrapper::quit(int ret)
    void Context::quit(int ret)
    {
        if (mainloop_api != nullptr) {
            mainloop_api->quit(mainloop_api, ret);
        }
    }

    void PulseAudioWrapper::insert(audio::AbstractStream::Span span)
    Stream *Context::open_stream(AudioFormat audio_format_, WriteCallback write_cb_)
    {
        std::copy(span.data, span.dataEnd(), &cache[cache_pos]);
        cache_pos += span.dataSize;
    }
        try {
            stream   = std::make_unique<Stream>(audio_format_, context);
            write_cb = write_cb_;

    void PulseAudioWrapper::consume()
    {
        if (pa_stream_write(stream, cache.data(), cache_pos, NULL, 0, PA_SEEK_RELATIVE) < 0) {
            fprintf(stderr, "pa_stream_write() failed\n");
            return;
            pa_stream_set_write_callback(
                stream->raw(),
                [](pa_stream *, size_t length, void *arg) {
                    auto *inst = static_cast<Context *>(arg);
                    inst->stream_write_cb(length);
                },
                this);

            if (pa_stream_connect_playback(stream->raw(), nullptr, nullptr, {}, nullptr, nullptr) < 0) {
                throw std::runtime_error(pa_strerror(pa_context_errno(context)));
            }
        }
        catch (const std::runtime_error &e) {
            fprintf(stderr, e.what());
            return nullptr;
        }
        cache_pos = 0;
    }

    std::size_t PulseAudioWrapper::bytes() const
        return stream.get();
    }
    void Context::close_stream()
    {
        return cache_pos;
        stream = {};
    }
} // namespace audio
} // namespace audio::pulse_audio

M module-audio/board/linux/PulseAudioWrapper.hpp => module-audio/board/linux/PulseAudioWrapper.hpp +28 -13
@@ 1,4 1,4 @@
// Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
// Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
// For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md

#pragma once


@@ 11,20 11,39 @@
#include <functional>
#include <deque>
#include <vector>
#include <memory>

namespace audio
namespace audio::pulse_audio
{
    class PulseAudioWrapper
    class Stream
    {
      public:
        using WriteCallback = std::function<void(const std::size_t size)>;
        PulseAudioWrapper(WriteCallback write_cb, AudioFormat audio_format);
        ~PulseAudioWrapper();
        Stream(AudioFormat audio_format_, pa_context *ctx);
        ~Stream();

        void insert(audio::AbstractStream::Span span);
        void insert(AbstractStream::Span span);
        void consume();
        std::size_t bytes() const;

        pa_stream *raw();

      private:
        std::vector<std::uint8_t> cache{};
        std::uint32_t cache_pos{};
        AudioFormat audio_format;
        pa_stream *stream{nullptr};
    };

    class Context
    {
      public:
        using WriteCallback = std::function<void(const std::size_t size)>;
        Context();
        ~Context();

        Stream *open_stream(AudioFormat audio_format_, WriteCallback write_cb);
        void close_stream();

      private:
        void context_state_callback();
        void *worker();


@@ 32,16 51,12 @@ namespace audio
        void quit(int ret = 0);

        pthread_t tid{};
        pa_stream *stream{nullptr};
        pa_mainloop *mainloop{nullptr};
        pa_mainloop_api *mainloop_api{nullptr};
        pa_context *context{nullptr};
        std::unique_ptr<Stream> stream;

        WriteCallback write_cb;
        AudioFormat audio_format;

        std::vector<std::uint8_t> cache{};
        std::uint32_t cache_pos{};
    };

} // namespace audio
} // namespace audio::pulse_audio