c++ - 接收音频 block 的异步 TCP 服务器出现访问冲突错误

标签 c++ boost boost-asio mutex sfml

我开始了一个语音聊天服务器项目,使用 boost::asio 和 SMFL 音频库。我认为我的互斥锁逻辑不正确,因为我收到此错误:

Unhandled exception at 0x00B7871D in TCPAudioReceiver.exe: 0xC0000005: Access violation reading location 0x00000000.

调试器在主程序中的 if(mServer.samplesAvailable()) 行停止,我还注意到 if(mSamplesQueue.front().size() > 256) return true; 服务器中的行读取大小为 {???}。我认为锁定服务器的互斥体就足以防止此类问题,但我认为我离好的设计还有一两个概念。

我在这一点上没有特别的原因使用了 recursive_mutex,这更多是为了在我的故障排除中走得更远。

你建议我如何解决这个问题?

主要

#include "TcpAsyncServer.h"
#include "TcpAsyncSession.h"
#include "AudioStream.h"
#include <thread>

int main()
{
  io_service ioService;
  deque<vector<Int16>> mSamplesQueue;
  SoundBuffer mSoundBuffer;
  AudioStream mAudioStream;

  cout << "Starting server..." << endl;
  Server mServer(ioService, PORT);
  thread serviceThread([&](){ioService.run();}); 

  cout << "Starting reception loop..." << endl;
  for(;;) { 
    {
      lock_guard<recursive_mutex> lock(mServer.mServerMutex);
      // Look for new samples
      if(mServer.samplesAvailable()) {            // <-- debugger stops here
        cout << "Samples available..." << endl;
        vector<Int16> wSamples;
        mServer.getNextSamples(wSamples);
        mSamplesQueue.push_back(wSamples);
      }
    }

    // Loading and playing samples
    if((mAudioStream.getStatus() != AudioStream::Playing) && !mSamplesQueue.empty()) {
      cout << "Loading and playing audio stream..." << endl;
      if(mSoundBuffer.loadFromSamples(
        reinterpret_cast<Int16*>(mSamplesQueue.front().data()), 
        mSamplesQueue.front().size(), 2, 48000)
      )
      {
        cout << "SoundBuffer loaded successfully..." << endl;
        mAudioStream.load(mSoundBuffer);
        mAudioStream.play();
        mSamplesQueue.pop_front();
      }
      else cout << "SoundBuffer failed to load..." << endl;
    }

    // Give it some room to play the sound
    while (mAudioStream.getStatus() == AudioStream::Playing) {
      sleep(milliseconds(50));
    }
  }
  serviceThread.join();
}

服务器头

#include <mutex>
#include <deque>
#include <vector>
#include <iostream>
#include <boost/asio.hpp>
#include <SFML/Audio.hpp>

using namespace sf;
using namespace boost::asio;
using namespace boost::asio::ip;
using namespace std;

#define PORT 2112
#define SAMPLE_BUFFER 512

class Server
{

public:
  // Ctor
  Server(io_service & iService, short iPort);

  // Methods
  bool samplesAvailable();
  void getNextSamples(vector<Int16> oSamples);
  void pushSamples(vector<Int16> iSamples);

  // Variables
  recursive_mutex mServerMutex;

private:

  // Methods
  void doAccept();

  // Variables
  tcp::acceptor mAcceptor;
  tcp::socket   mSocket;
  deque<vector<Int16>> mSamplesQueue;
};

服务器类

#include "TcpAsyncServer.h"
#include "TcpAsyncSession.h"

Server::Server(io_service & service, short port)
  : mAcceptor(service, tcp::endpoint(tcp::v4(), port)),
    mSocket(service) {
  doAccept();
}

void Server::doAccept()
{
  mAcceptor.async_accept(mSocket,
    [this]
    (boost::system::error_code error) {
      if(!error) make_shared<Session>(move(mSocket),this)->start();
      doAccept();
    }
  );
}

bool Server::samplesAvailable() {
  lock_guard<recursive_mutex> lock(mServerMutex);
  if(mSamplesQueue.front().size() > 256) return true; // <-- mSamplesQueue.front() has undefined size
  return false;
}

void Server::getNextSamples(vector<Int16> oSamples) {
  lock_guard<recursive_mutex> lock(mServerMutex);
  oSamples = mSamplesQueue.front();   
  mSamplesQueue.pop_front();
}

void Server::pushSamples(vector<Int16> iSamples) {
  lock_guard<recursive_mutex> lock(mServerMutex);
  mSamplesQueue.push_back(iSamples);
}

session 标题

#include <iostream>
#include <mutex>
#include <deque>
#include <vector>
#include <boost/asio.hpp>
#include <SFML/Audio.hpp>

using namespace std;
using namespace sf;
using namespace boost::asio;
using namespace boost::asio::ip;

#define BUFFER_SIZE 1024

class Server;

class Session : public enable_shared_from_this<Session>
{

public:
  // Ctor
  Session(tcp::socket & iSocket, Server* iServerPtr);

  // Methods
  void start();

private:
  // Methods
  void doRead();

  // Variables
  tcp::socket   mSocket;
  char          mData[BUFFER_SIZE];
  Server*       mServerPtr; 
  deque<vector<Int16>> mSampleBufferQueue;
};

session 类

#include "TcpAsyncSession.h"
#include "TcpAsyncServer.h"

Session::Session(tcp::socket & iSocket, Server* iServerPtr)
  : mSocket(move(iSocket)),
    mServerPtr(iServerPtr) 
{}

void Session::start() {
  doRead();
}

void Session::doRead() {
  shared_ptr<Session> self(shared_from_this());

  mSocket.async_read_some(buffer(mData,BUFFER_SIZE),
    [this,self]
    (boost::system::error_code error, size_t iBytesReceived) {
      if(!error) {
        cout << "Receiving " << iBytesReceived << " bytes..." << endl; 
        vector<Int16> wSamples;

        for(unsigned int i = 0; i < iBytesReceived; i+=2) {
          wSamples.push_back(static_cast<Int16>( mData[i]<<8 | mData[i] ));
        }

        {
          lock_guard<recursive_mutex> lock(mServerPtr->mServerMutex);
          mServerPtr->pushSamples(move(wSamples));
        }

        doRead();
      }
    }
  );
}

最佳答案

哇,好长。

你最明显的错误在这里:

if (mSamplesQueue.front().size() > 256)
    return true; // <-- mSamplesQueue.front() has undefined size

您应该检查mSamplesQueue.empty() 是否为真,否则使用front()UB .

代替

bool Server::samplesAvailable() {
    std::lock_guard<std::recursive_mutex> lock(mServerMutex);

    if (mSamplesQueue.front().size() > 256)
        return true;
    return false;
}

你可以简单地写

bool Server::samplesAvailable() {
    std::lock_guard<std::recursive_mutex> lock(mServerMutex);
    return mSamplesQueue.size() && mSamplesQueue.front().size()>256;
}

审阅:

  • 有UB在

    wSamples.push_back(static_cast<Int16>(mData[i] << 8 | mData[i]));
    

    你需要括号,并且在许多编译器上 char 是有符号的,这导致 UB左移。

  • 永远不要在头文件中使用 using 指令。 特别是不是using namespace std; ( Why is "using namespace std" considered bad practice? )

  • 有很多不必要的缓冲区复制,为什么不在具有适当字节序的平台上重新解释转换 char[] 缓冲区,和/或为什么不 std::在 pushSamples() 中移动:

    mSamplesQueue.push_back(std::move(iSamples));
    
  • 直接使用类成员是一种代码味道 (Law of Demeter)。特别是,这是您在编写时混合抽象级别的明确标志:

    {
        std::lock_guard<std::recursive_mutex> lock(mServerPtr->mServerMutex);
        mServerPtr->pushSamples(std::move(wSamples));
    }
    

    特别是因为你已经拥有

    void Server::pushSamples(std::vector<sf::Int16> iSamples) {
        std::lock_guard<std::recursive_mutex> lock(mServerMutex);
        mSamplesQueue.push_back(std::move(iSamples));
    }
    

    这意味着此时您需要一个递归互斥量,否则就会出现死锁。

  • Session::mSampleBufferQueue 未使用

  • getNextSamples 的签名错误。您永远不会看到任何效果,因为参数是按值传递的。要么声明它:

    void getNextSamples(std::vector<sf::Int16>& oSamples);
    

    std::vector<sf::Int16> getNextSamples();
    
  • 考虑使 samplesAvailable() 等方法成为常量。为此,您需要将互斥体标记为 mutable ( Always declare std::mutex as mutable in C++11? )

概念问题

从概念上讲,接受并发客户端存在问题。如果真的发生这种情况,您将最终播放随机交错的流。

修正版

  • main.cpp

    //#include "AudioStream.h"
    #include "TcpAsyncServer.h"
    #include "TcpAsyncSession.h"
    #include <thread>
    
    struct AudioStream {
        enum Status { Playing, Buffering };
        Status getStatus() const { return Buffering; }
        void load(sf::SoundBuffer const& buf) {
            // 
            std::cout << __PRETTY_FUNCTION__ 
                << " rate=" << buf.getSampleRate() 
                << " channels=" << buf.getChannelCount() 
                << " duration=" << buf.getDuration().asSeconds() << "s "
                << " samples=" << buf.getSampleCount()
                << "\n";
        }
        void play() {}
    };
    
    int main() {
        boost::asio::io_service ioService;
        std::deque<std::vector<sf::Int16> > mSamplesQueue;
        sf::SoundBuffer mSoundBuffer;
        AudioStream mAudioStream;
    
        std::cout << "Starting server..." << std::endl;
        Server mServer(ioService, PORT); // start async accept loop
        std::thread serviceThread([&]() { ioService.run(); });
    
        std::cout << "Starting reception loop..." << std::endl;
        for (;;) {
            {
                std::lock_guard<std::recursive_mutex> lock(mServer.mServerMutex);
    
                // Look for new samples
                if (mServer.samplesAvailable()) {
                    std::cout << "Samples available..." << std::endl;
                    std::vector<sf::Int16> wSamples;
                    mServer.getNextSamples(wSamples);
                    mSamplesQueue.push_back(wSamples);
                }
            }
    
            // Loading and playing samples
            if ((mAudioStream.getStatus() != AudioStream::Playing) && !mSamplesQueue.empty()) {
                std::cout << "Loading and playing audio stream..." << std::endl;
                if (mSoundBuffer.loadFromSamples(reinterpret_cast<sf::Int16 *>(mSamplesQueue.front().data()),
                                                 mSamplesQueue.front().size(), 2, 48000)) {
                    std::cout << "SoundBuffer loaded successfully..." << std::endl;
                    mAudioStream.load(mSoundBuffer);
                    mAudioStream.play();
                    mSamplesQueue.pop_front();
                } else
                    std::cout << "SoundBuffer failed to load..." << std::endl;
            }
    
            // Give it some room to play the sound
            while (mAudioStream.getStatus() == AudioStream::Playing) {
                sleep(sf::milliseconds(50));
            }
        }
        serviceThread.join();
    }
    
  • TcpAsyncServer.h

    #include <SFML/Audio.hpp>
    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <vector>
    
    #define PORT 2113
    #define SAMPLE_BUFFER 5120000
    
    class Server {
        using tcp = boost::asio::ip::tcp;
    
      public:
        // Ctor
        Server(boost::asio::io_service &iService, short iPort);
    
        // Methods
        bool samplesAvailable() const;
        void getNextSamples(std::vector<sf::Int16>& oSamples);
        void pushSamples(std::vector<sf::Int16> iSamples);
    
        // Variables
        mutable std::recursive_mutex mServerMutex;
    
      private:
        // Methods
        void doAccept();
    
        // Variables
        tcp::acceptor mAcceptor;
        tcp::socket mSocket;
        std::deque<std::vector<sf::Int16> > mSamplesQueue;
    };
    
  • TcpAsyncSession.h

    #include <boost/asio.hpp>
    
    #define BUFFER_SIZE 10240000
    
    class Server;
    
    class Session : public std::enable_shared_from_this<Session> {
        using tcp = boost::asio::ip::tcp;
    
      public:
        // Ctor
        Session(tcp::socket &&iSocket, Server *iServerPtr);
    
        // Methods
        void start();
    
      private:
        // Methods
        void doRead();
    
        // Variables
        tcp::socket mSocket;
        uint8_t mData[BUFFER_SIZE];
        Server *mServerPtr;
    };
    
  • TcpAsyncServer.cpp

    #include "TcpAsyncServer.h"
    #include "TcpAsyncSession.h"
    
    Server::Server(boost::asio::io_service &service, short port) : mAcceptor(service, tcp::endpoint(tcp::v4(), port)), mSocket(service) {
        mAcceptor.set_option(tcp::acceptor::reuse_address());
        doAccept();
    }
    
    void Server::doAccept() {
        mAcceptor.async_accept(mSocket, [this](boost::system::error_code error) {
            if (!error)
                std::make_shared<Session>(std::move(mSocket), this)->start();
            doAccept();
        });
    }
    
    bool Server::samplesAvailable() const {
        std::lock_guard<std::recursive_mutex> lock(mServerMutex);
        return mSamplesQueue.size() && mSamplesQueue.front().size()>256;
    }
    
    void Server::getNextSamples(std::vector<sf::Int16>& oSamples) {
        std::lock_guard<std::recursive_mutex> lock(mServerMutex);
        oSamples = std::move(mSamplesQueue.front());
        mSamplesQueue.pop_front();
    }
    
    void Server::pushSamples(std::vector<sf::Int16> iSamples) {
        std::lock_guard<std::recursive_mutex> lock(mServerMutex);
        mSamplesQueue.push_back(std::move(iSamples));
    }
    
  • TcpAsyncSession.cpp

    #include "TcpAsyncServer.h"
    #include "TcpAsyncSession.h"
    
    //#include <SFML/Audio.hpp>
    #include <iostream>
    
    Session::Session(tcp::socket &&iSocket, Server *iServerPtr) : mSocket(std::move(iSocket)), mServerPtr(iServerPtr) {}
    
    void Session::start() { doRead(); }
    
    void Session::doRead() {
    
        std::shared_ptr<Session> self(shared_from_this());
    
        mSocket.async_read_some(
                boost::asio::buffer(mData, BUFFER_SIZE),
                [this, self](boost::system::error_code error, size_t iBytesReceived) {
                    if (error)
                        return;
                    std::cout << "Receiving " << iBytesReceived << " bytes..." << std::endl;
                    std::vector<sf::Int16> wSamples;
    
                    for (unsigned int i = 0; i < iBytesReceived; i += 2) {
                        wSamples.push_back(static_cast<sf::Int16>((mData[i] << 8) | mData[i]));
                    }
    
                    mServerPtr->pushSamples(std::move(wSamples));
    
                    doRead();
            });
    }
    

演示运行

给它一个 138M 的 mp3:

Starting server...
Starting reception loop...
Receiving 8192 bytes...
Samples available...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=0.042666s  samples=4096
Receiving 562829 bytes...
Samples available...
Receiving 745525 bytes...
Loading and playing audio stream...
An internal OpenAL call failed in SoundBuffer.cpp(265).
Expression:
   alBufferData(m_buffer, format, &m_samples[0], size, sampleRate)
Error description:
   AL_INVALID_VALUE
   A numeric argument is out of range.

SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=2.93141s  samples=281415
Samples available...
Loading and playing audio stream...
Receiving 2815769 bytes...
An internal OpenAL call failed in SoundBuffer.cpp(265).
Expression:
   alBufferData(m_buffer, format, &m_samples[0], size, sampleRate)
Error description:
   AL_INVALID_VALUE
   A numeric argument is out of range.

SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=3.88295s  samples=372763
Samples available...
Receiving 4978211 bytes...
Loading and playing audio stream...
An internal OpenAL call failed in SoundBuffer.cpp(265).
Expression:
   alBufferData(m_buffer, format, &m_samples[0], size, sampleRate)
Error description:
   AL_INVALID_VALUE
   A numeric argument is out of range.

SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=14.6655s  samples=1407885
Samples available...
Receiving 5632954 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=25.9282s  samples=2489106
Samples available...
Receiving 5893470 bytes...
Loading and playing audio stream...
An internal OpenAL call failed in SoundBuffer.cpp(265).
Expression:
   alBufferData(m_buffer, format, &m_samples[0], size, sampleRate)
Error description:
   AL_INVALID_VALUE
   A numeric argument is out of range.

SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=29.3383s  samples=2816477
Samples available...
Receiving 5895401 bytes...
Loading and playing audio stream...
An internal OpenAL call failed in SoundBuffer.cpp(265).
Expression:
   alBufferData(m_buffer, format, &m_samples[0], size, sampleRate)
Error description:
   AL_INVALID_VALUE
   A numeric argument is out of range.

SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=30.6952s  samples=2946735
Samples available...
Receiving 5894091 bytes...
Loading and playing audio stream...
An internal OpenAL call failed in SoundBuffer.cpp(265).
Expression:
   alBufferData(m_buffer, format, &m_samples[0], size, sampleRate)
Error description:
   AL_INVALID_VALUE
   A numeric argument is out of range.

SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=30.7052s  samples=2947701
Samples available...
Receiving 5894197 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=30.6984s  samples=2947046
Samples available...
Receiving 5894303 bytes...
Loading and playing audio stream...
An internal OpenAL call failed in SoundBuffer.cpp(265).
Expression:
   alBufferData(m_buffer, format, &m_samples[0], size, sampleRate)
Error description:
   AL_INVALID_VALUE
   A numeric argument is out of range.

SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=30.6989s  samples=2947099
Samples available...
Receiving 5894144 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=30.6995s  samples=2947152
Samples available...
Receiving 5896192 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=30.6987s  samples=2947072
Samples available...
Receiving 5961675 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=30.7093s  samples=2948096
Samples available...
Receiving 5961728 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0504s  samples=2980838
Samples available...
Receiving 5960615 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0507s  samples=2980864
Samples available...
Receiving 5960793 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0449s  samples=2980308
Samples available...
Receiving 5960668 bytes...
Loading and playing audio stream...
An internal OpenAL call failed in SoundBuffer.cpp(265).
Expression:
   alBufferData(m_buffer, format, &m_samples[0], size, sampleRate)
Error description:
   AL_INVALID_VALUE
   A numeric argument is out of range.

SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0458s  samples=2980397
Samples available...
Receiving 5960740 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0451s  samples=2980334
Samples available...
Receiving 5960668 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0455s  samples=2980370
Samples available...
Receiving 5960740 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0451s  samples=2980334
Samples available...
Receiving 5960668 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0455s  samples=2980370
Samples available...
Receiving 5960740 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0451s  samples=2980334
Samples available...
Receiving 5960668 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0455s  samples=2980370
Samples available...
Receiving 5960740 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0451s  samples=2980334
Samples available...
Receiving 5960668 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0455s  samples=2980370
Samples available...
Receiving 5960740 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0451s  samples=2980334
Samples available...
Receiving 4770135 bytes...
Loading and playing audio stream...
SoundBuffer loaded successfully...
void AudioStream::load(const sf::SoundBuffer &) rate=48000 channels=2 duration=31.0455s  samples=2980370
Samples available...
Loading and playing audio stream...
SoundBuffer loaded successfully...

除了我的模拟 AudioStream 没有正确初始化音频库这一事实外,我觉得还不错。

请注意,我大大增加了缓冲区大小,因此日志不会太大。

关于c++ - 接收音频 block 的异步 TCP 服务器出现访问冲突错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49016907/

相关文章:

c++ - Linux 上第一对读写后串口挂起

c++ - xcode 中的 boost 错误

c++ - 将 boost::lockfree 与 c++11 线程支持库一起使用是否安全?

c++ - Boost 和 Python 3.x

c++ - 将 Boost strand 与 std::future 结合使用

c++ - 在 QTreeWidget 中显示文件系统

c++ - 无法在 VS2017 中包含 rapidjson 库

使用 Gibbs 采样器的 GMM 的 C++ 实现,即 Dirichlet 过程高斯混合模型

c++ - TFTP 源代码示例

c++ - boost::asio::serial_port 设置 RTS DTS