// Copyright (c) 2017-2024, Mudita Sp. z.o.o. All rights reserved. // For licensing, see https://github.com/mudita/MuditaOS/blob/master/LICENSE.md #include "Stream.hpp" #include #include using namespace audio; Stream::Stream(AudioFormat format, Allocator &allocator, std::size_t blockSize, unsigned int bufferingSize) : _allocator(allocator), _blockSize(blockSize), _blockCount(bufferingSize), _format(format), _buffer(_allocator.allocate(_blockSize * _blockCount)), _emptyBuffer(_allocator.allocate(_blockSize)), _dataStart(_buffer.get(), _blockSize * _blockCount, _buffer.get(), _blockSize), _dataEnd(_dataStart), _peekPosition(_dataStart), _writeReservationPosition(_dataStart) { std::fill(_emptyBuffer.get(), _emptyBuffer.get() + blockSize, 0); } bool Stream::push(void *data, std::size_t dataSize) { /// wrapper - no synchronization needed return push(Span{.data = static_cast(data), .dataSize = dataSize}); } bool Stream::push(const Span &span) { LockGuard lock; /// sanity - do not store buffers different than internal block size if (span.dataSize != _blockSize) { return false; } /// write reservation in progress if (_dataEnd != _writeReservationPosition) { return false; } /// no space left if (isFull()) { broadcastEvent(Event::StreamOverflow); return false; } auto nextDataBlock = *_dataEnd; std::copy(span.data, span.dataEnd(), nextDataBlock.data); _dataEnd++; _blocksUsed++; _writeReservationPosition = _dataEnd; broadcastStateEvents(); return true; } bool Stream::push() { /// wrapper - no synchronization needed return push(getNullSpan()); } bool Stream::pop(Span &span) { LockGuard lock; /// sanity - do not store buffers different than internal block size if (span.dataSize != _blockSize) { return false; } /// peek in progress if (_dataStart != _peekPosition) { return false; } if (isEmpty()) { span = getNullSpan(); broadcastEvent(Event::StreamUnderFlow); return false; } std::copy((*_dataStart).data, (*_dataStart).dataEnd(), span.data); _dataStart++; _blocksUsed--; _peekPosition = _dataStart; broadcastStateEvents(); return true; } void Stream::consume() { LockGuard lock; _blocksUsed -= _peekCount; _peekCount = 0; _dataStart = _peekPosition; broadcastStateEvents(); } bool Stream::peek(Span &span) { LockGuard lock; if (getPeekedCount() < getUsedBlockCount()) { span = *_peekPosition++; _peekCount++; return true; } span = getNullSpan(); broadcastEvent(Event::StreamUnderFlow); return false; } void Stream::unpeek() { LockGuard lock; _peekPosition = _dataStart; _peekCount = 0; } bool Stream::reserve(Span &span) { LockGuard lock; if (getBlockCount() - getUsedBlockCount() > _reserveCount) { span = *++_writeReservationPosition; _reserveCount++; return true; } // reset data to peek end _blocksUsed = 0; _dataEnd = _peekPosition; // reserve at peek end _reserveCount = 1; _writeReservationPosition = _peekPosition; span = *++_writeReservationPosition; broadcastEvent(Event::StreamOverflow); return false; } void Stream::commit() { LockGuard lock; _blocksUsed += _reserveCount; _reserveCount = 0; _dataEnd = _writeReservationPosition; broadcastStateEvents(); } void Stream::release() { LockGuard lock; _reserveCount = 0; _writeReservationPosition = _dataEnd; } auto Stream::getInputTraits() const noexcept -> Traits { LockGuard lock; return getIOTraits(); } auto Stream::getOutputTraits() const noexcept -> Traits { LockGuard lock; return getIOTraits(); } auto Stream::getIOTraits() const noexcept -> Traits { return Traits{.blockSize = _blockSize, .format = _format}; } void Stream::registerListener(AbstractStream::EventListener *listener) { LockGuard lock; listeners.push_back(std::ref(listener)); } void Stream::unregisterListeners(AbstractStream::EventListener *listener) { LockGuard lock; auto it = std::find(listeners.begin(), listeners.end(), listener); if (it != listeners.end()) { listeners.erase(it); } } void Stream::broadcastEvent(Event event) { for (auto listener : listeners) { listener->onEvent(this, event); } } void Stream::broadcastStateEvents() { if (_blocksUsed == (getBlockCount() / 2)) { broadcastEvent(Event::StreamHalfUsed); } else if (isEmpty()) { broadcastEvent(Event::StreamEmpty); } else if (isFull()) { broadcastEvent(Event::StreamFull); } } std::size_t Stream::getBlockCount() const noexcept { return _blockCount; } std::size_t Stream::getUsedBlockCount() const noexcept { return _blocksUsed; } std::size_t Stream::getPeekedCount() const noexcept { return _peekCount; } std::size_t Stream::getReservedCount() const noexcept { return _reserveCount; } bool Stream::isEmpty() const noexcept { LockGuard lock; return getUsedBlockCount() == 0; } bool Stream::isFull() const noexcept { LockGuard lock; return getUsedBlockCount() == getBlockCount(); } bool Stream::blocksAvailable() const noexcept { return !isFull(); } void Stream::reset() { LockGuard lock; _dataStart = {_buffer.get(), _blockSize * _blockCount, _buffer.get(), _blockSize}; _dataEnd = _dataStart; _peekPosition = _dataStart; _writeReservationPosition = _dataStart; std::fill(_emptyBuffer.get(), _emptyBuffer.get() + _blockSize, 0); _blocksUsed = 0; _peekCount = 0; _reserveCount = 0; } Stream::UniqueStreamBuffer StandardStreamAllocator::allocate(std::size_t size) { return std::make_unique(size); } Stream::UniqueStreamBuffer NonCacheableStreamAllocator::allocate(std::size_t size) { return std::unique_ptr>( allocator.allocate(size), [this, size](std::uint8_t ptr[]) { allocator.deallocate(ptr, size); }); } Stream::RawBlockIterator::RawBlockIterator(std::uint8_t *bufStart, std::size_t bufSize, std::uint8_t *ptr, std::size_t stepSize) : _bufStart(bufStart), _bufEnd(bufStart + bufSize), _curPos(ptr), _stepSize(stepSize) {} Stream::RawBlockIterator &Stream::RawBlockIterator::operator++() { _curPos += _stepSize; if (_curPos == _bufEnd) { _curPos = _bufStart; } return *this; } bool Stream::RawBlockIterator::operator==(const Stream::RawBlockIterator &rhs) { return _curPos == rhs._curPos; } bool Stream::RawBlockIterator::operator!=(const Stream::RawBlockIterator &rhs) { return !operator==(rhs); } Stream::RawBlockIterator Stream::RawBlockIterator::operator++(int) { RawBlockIterator tmp(*this); operator++(); return tmp; } Stream::RawBlockIterator &Stream::RawBlockIterator::operator--() { if (_curPos == _bufStart) { _curPos = _bufEnd - _stepSize; } else { _curPos -= _stepSize; } return *this; } Stream::RawBlockIterator Stream::RawBlockIterator::operator--(int) { RawBlockIterator tmp(*this); operator--(); return tmp; } Stream::Span Stream::RawBlockIterator::operator*() { return Stream::Span{.data = _curPos, .dataSize = _stepSize}; } Stream::Span Stream::getNullSpan() const noexcept { return Span{.data = _emptyBuffer.get(), .dataSize = _blockSize}; }