同步了三个分离的线程

我几个星期以来一直陷入一个问题。

我有三个线程,并且已经实现了我打算使用的线程安全堆栈。

ThreadSafe堆栈实现:

#include <exception>
#include <stack>  

struct empty_stack: std::exception
{
  virtual const char* what() const throw()
  {
    return "My exception happened";
  }
};

template<typename T>
class threadsafe_stack
{

  private:
    std::stack<T> data;
    mutable std::mutex m;
  public:
    threadsafe_stack() = default;
    threadsafe_stack(const threadsafe_stack& other)
    {
      std::lock_guard<std::mutex> lock(other.m);
      data = other.data;
    }
    threadsafe_stack& operator = (const threadsafe_stack&) = delete;
    void push(T new_value)
    {
      std::lock_guard<std::mutex> lock(m);
      data.push(std::move(new_value));
    }
    std::shared_ptr<T> pop()
    {
      std::lock_guard<std::mutex> lock(m);
      if(data.empty()) throw empty_stack();
      std::shared_ptr<T> const res(
          std::make_shared<T>(std::move(data.top())));
      data.pop();
      return res;
    }
    void pop(T& value)
    {
      std::lock_guard<std::mutex> lock(m);
      if(data.empty()) throw empty_stack();
      value = std::move(data.top());
      data.pop();
    }
    bool empty() const
    {
      std::lock_guard<std::mutex> lock(m);
      return data.empty();
    }
};

现在,该线程的主要逻辑是可以在任何地方调用。 称为运行的线程,依次控制三个线程的读取,效果,写入。

运行实施:

  std::thread t1(&Filter::read,this);
  std::thread t3(&Filter::applyEffect,this); 
  std::thread t2(&Filter::write,this);
  t1.detach();
  t2.detach();
  t3.detach();
  std::this_thread::sleep_for(std::chrono::seconds(dur)); 

您可以从逻辑中看到每个线程将并行运行:

read会将值压入堆栈。 效果将从堆栈中弹出并推回堆栈中。 写操作将从堆栈中弹出。

定义是

threadsafe_stack<uint8_t*> stack;

阅读实现(如您所见,我尝试了很多方法)

uint8_t* Filter::read()
{ //TODO: Bracket please
  /**  
  *  I<nitialzing init read with a wav File first
  *  should be PMC also when testing logic is done
  */
  // the access to this function is mutually exclusive
  hdr =   genericWAVHeader(44000, 16, 2); //TODO: Indentation 2spaces.
  wrapper.initDevice(SND_PCM_STREAM_CAPTURE,hdr->bits_per_sample,hdr->number_of_channels,hdr->sample_rate);
  while(true)
  { 
    try 
    {
      std::unique_lock<std::mutex> lk(mutex);
      _tcond1.wait(lk,[this]{return !readDone;});
      std::cout << "Read Thread" << std::endl;
      effectDone = true;
      std::cout << "Effect Thread starting" << std::endl; 
      readDone = false;
      lk.unlock();
      _tcond2.notify_one();

      // sem[0].Wait();
      // std::cout << "Reading Thread " << std::endl;
      // sem[1].Post();
      // std::this_thread::sleep_for(std::chrono::milliseconds(2)); 
      // std::unique_lock<std::mutex> lock(mutex);
      // std::cout << "Read Thread" << std::endl;
      // _tcond2.notify_one();
      // _tcond1.wait(lock);
      // TransitionGuard guard(ready_for_task1, ready_for_task2);
      // std::cout << "Read Thread" << std::endl;
      /**
       * @brief 
       * Each time read is run a new promise with the data from wrapper.readFrames();
       * is being made and also should be locked(Why it should be locked?)
       * Locking it makes it so only read at the time has access to the promise
       * Should effect also have access to the promise
       * Yes as it should set it new the data structure after using the philips libraries
       */

    }catch(const std::exception& e)
    {
        std::cout << "[Read exception caught: " << e.what() << "]\n";
    }
    // try
    // {

    //   readyPromise.set_value(wrapper.readFrames());
    //   std::cout << "Ready Promise value set" << std::endl;
    //   std::cout << "Signal Effect to be active now" << std::endl;
    //   std::cout << "-------------------------------" << std::endl;


    // }
    // catch(const std::exception& e)
    // {
    //   std::cout << "[Read exception caught: " << e.what() << "]\n";
    // }

    // std::unique_lock<std::mutex> unique_lock(mutex);
    // data = wrapper.readFrames();
    // cv.wait(unique_lock,[&](){return isDone;});
    // isDone = !isDone;
    // std::cout << "Reading" << std::endl;
    // cv.notify_one();
    // std::unique_lock<std::mutex> unique_lock(mutex);
    // data = wrapper.readFrames();
    // cv.wait(unique_lock,[&](){return isDone;});
    // std::cout << "Reading" << std::endl;

    // isDone = !isDone;
    // cv.notify_one(); 
    // try
    // {
    //    promiseBuffer = std::promise<uint8_t*>();
    //    promiseBuffer.set_value(wrapper.readFrames());
    // }
    // catch(std::exception& e)
    // {
    //   std::cout << "[exception caught: " << e.what() << "]\n";
    // }
  }
  // hdr =   genericWAVHeader(44000, 16, 2); //TODO: Indentation 2spaces.
  // wrapper.initDevice(SND_PCM_STREAM_CAPTURE,hdr->bits_per_sample,hdr->number_of_channels,hdr->sample_ratep);
  // return wrappe9r.readFrames();

}

应用效果实施:

uint8_t* Filter::applyEffect()
{

  /**
  * @brief 
  * ApplyEffect should be done here
  * void BeClear_SuperHandsFree_Main
    (
       const float * const * const spks,
      const float * const * const mics,
      float * const * const out_com,
      float * const * const out_sr
    );
    float modified_buffer_in[480] = { };
float modified_buffer_out[480] = { };
int RNNoiseProcessBufferIn(struct audio_stream_in *stream, int16_t *in_buffer, size_t bytes)
{
  struct stream_in *in = (struct stream_in *)stream;
  size_t frames_rq = bytes / audio_stream_in_frame_size(&in->stream.common);

  for (int channel = 0; channel < STEREO; channel++)
  {
    for (int i = 0; i < frames_rq; i++)
    {
      modified_buffer_in[i] = in_buffer[STEREO * i + channel];
    }

    rnnoise_process_frame(denoised_state_in[channel], modified_buffer_in, modified_buffer_in);

    for (int i = 0; i < frames_rq; i++)
    {
      in_buffer[STEREO * i + channel] = (int16_t)modified_buffer_in[i];
    }
  }

  return 0;
}
  * 
  * 
  * 
  * 
  */
  /**
   * 
  while(true){
    // try
    // {
    //   std::unique_lock<std::mutex> unique_lock(mutex);
    //   readyForEffect = readyPromise.get_future();
    //   data = readyForEffect.get();
    //   cv.wait(unique_lock,[&](){return effectDone;});
    //   effectDone = !effectDone;
    //   cv.notify_one();

    //   /**
    //    * @brief 
    //    * Philips library goes here.
    //    */
    //   //BeClear_SuperHandsFree_Main();

    // }
    // catch(const std::exception& e)
    // {
    //     std::cout << "*************************" << std::endl;
    //     std::cout << "[Effect exception caught: " << e.what() << "]\n";

    // }
    try
    {
      std::unique_lock<std::mutex> lk(mutex);
      _tcond2.wait(lk);
      std::cout << "Read Effect Starting" << std::endl;
      lk.unlock();
      _tcond1.notify_one();
      // std::unique_lock<std::mutex> lock(mutex);
      // std::cout << "Effect Thread" << std::endl;
      // _tcond3.notify_one();
      // _tcond2.wait(lock);
      // TransitionGuard guard( ready_for_task2, ready_for_task3 );
      // std::cout << "Effect Thread" << std::endl;
      // sem[1].Wait();
      // std::cout << "Effect Thread " << std::endl;
      // sem[0].Post();
      // std::this_thread::sleep_for(std::chrono::milliseconds(2)); 
      //stack.pop();
      //stack.push();
    // {
    //   readyForEffect = std::shared_future<uint8_t*>(readyPromise.get_future());
    //   uint8_t* d = readyForEffect.get();
    //   //BeClear_SuperHandsFree_Main();
    //   readyPromise = std::promise<uint8_t*>();
    //   readyPromise.set_value(d);
    //   writePromise.set_value(d);
    //   readyPromise = std::promise<uint8_t*>();
    //   readyForEffect.wait();
    }
    catch(const std::exception& e)
    {
      std::cout << "*************************" << std::endl;
      std::cout << "[Effect exception caught: " << e.what() << "]\n";
    };


    // std::unique_lock<std::mutex> unique_lock(mutex);
    // /**
    //  * @brief 
    //  * This is where philips library is supposed to be used
    //  */
    // cv.wait(unique_lock,[&](){return !isDone;}); // || efDone;});
    // try
    // {
    //    promiseBuffer = std::promise<uint8_t*>();
    //    promiseBuffer.set_value(data);
    // }
    // catch(std::exception& e)
    // {
    //   std::cout << "[ ApplyEffect exception caught: " << e.what() << "]\n";
    // }
    // isDone = !isDone;
    // //efDone = !efDone;
    // cv.notify_one(); 
    // try
    // {
    //   effectPromise = std::promise<uint8_t*>();
    //   std::future<uint8_t*> futureBuffer = promiseBuffer.get_future();
    //   uint8_t* d = futureBuffer.get();
    //   // d -> actually the data
    //   // phillips;
    //   effectPromise.set_value(d);
    // }
    // catch(std::exception& e)
    // {
    //   std::cout << "[exception caught: " << e.what() << "]\n";
    // }

  }

编写实现:

bool Filter:: write()
{ 
  /**
   * @brief 
   * 
   *  Should write
   */
  #ifndef TESTING
    hdr = genericWAVHeader(44000, 16, 2);
    wrapper.initWrite(SND_PCM_STREAM_PLAYBACK,hdr->bits_per_sample,hdr->number_of_channels,hdr->sample_rate);
    while(true)
    {
      try
      {
        /* code */
        /**
         * @brief 
         * Signal the other threads to start
         * How
         * 
         */
        // std::unique_lock<std::mutex> lock(mutex);
        // std::cout << "Write Thread" << std::endl;
        // _tcond1.notify_one();
        // _tcond3.wait(lock);
        TransitionGuard guard( ready_for_task3, ready_for_task1 );
        std::cout << "Effect Thread" << std::endl;
        //sem[2].Wait(true);
        //std::cout << "Writing Thread" << std::endl;
        //sem[0].Post();
        //std::this_thread::sleep_for(std::chrono::milliseconds(2)); 
        // std::unique_lock<std::mutex> locker(_lockprint3);
        // _tcond3.wait(locker);
        // std::cout << "Writing thread" << std::endl;
        // _tcond1.notify_one();
        //readyForRead = std::shared_future<uint8_t*>(readyPromise.get_future());
        // readyForWrite = std::shared_future<uint8_t*>(writePromise.get_future());
        // readyForWrite.wait();
        // uint8_t* p = readyForWrite.get();
        // wrapper.writeFrames(p);
        // writePromise = std::promise<uint8_t*>();

      }
      catch(const std::exception& e)
      {
         std::cout << "[Write exception caught: " << e.what() << "]\n";
      }

      // std::unique_lock<std::mutex> unique_lock(mutex);
      // std::cout << "Writing data" << std::endl;
      // cv.wait(unique_lock,[&](){return !effectDone;});
      // wrapper.writeFrames(data);
      // effectDone = !effectDone;
      // cv.notify_one();
      // // //std::unique_lock<std::mutex> unique_lock(mutex);
      // std::cout << "Writing data" << std::endl;
      // //cv.wait(unique_lock,[&](){return !efDone;});
      // //efDone = !efDone;
      // // cv.notify_one();   
      // try
      // {

      //   std::future<uint8_t*> effectBuffer = promiseBuffer.get_future();
      //   // applyEchoCanceling(buffer);
      //   // std::future<uint8_t*> bufferData = std::async(std::launch::deferred,&Filter::read,this);
      //   wrapper.writeFrames(effectBuffer.get());
      // }
      // catch(std::exception& e)
      // {
      //   std::cout << "[Write exception caught: " << e.what() << "]\n";

      // }
    }
  #else
    // Open the file first 
    wavReader wavFile;
    // Read the file with async
    //std::future<uint8_t*> wavD = std::async(std::launch::deferred,&wavReader::waveLoad,wavFile,"/home/gentbinaku/solaborate_ws/Internal.Audio.tinyalsa_wrapper/test/test_files/3m3f.st_ns5db.pcm128000.wav");
    uint8_t* wavData = wavFile.waveLoad("/home/gentbinaku/solaborate_ws/Internal.Audio.tinyalsa_wrapper/test/test_files/3m3f.st_ns5db.pcm128000.wav");
    WavInfo wInfo = wavFile.returnWavInfo();
    wrapper.initWav(wInfo);
    snd_pcm_uframes_t count,frames;
    count = 0;
    do
    {
       frames = wrapper.write(wInfo.WavePtr + count,wInfo.WaveSize - count);
       // If an error, try to recover from it
       if (frames < 0)
       {
         printf("Error playing wave: %s\n", snd_strerror(frames));
         break;
       }
         //qitu osht problem
       //frames = frames = snd_pcm_writei(PlaybackHandle, WavePtr + count, WaveSize - count);
     }while(count < wInfo.WaveSize);



     // Read the file with async
     // Return the WavInfo File 
     // Write the WavInfo for WavSize


  #endif  

  // hdr = genericWAVHeader(44000, 16, 2); //TODO: Magic numbers, what is 44000, 16 and 2, where did the y came from. Mars ?
  // wrapper.initWrite(SND_PCM_STREAM_PLAYBACK,hdr->bits_per_sample,hdr->number_of_channels,hdr->sample_rate); 
  // //TODO: I understand here, but what is hdr ? 
  // // Just a random name, no association with  something
  // //We are already suffering from Kernel naming convention, please spare us :)
  // while(true) 
  // { //TODO: There is a reason why I asked with Atomic<bool>, in the demo state you write what you read, 
  //   //the real life scenario you have both streams in paralel indipendent from one another
  //   {
  //     std::unique_lock<std::mutex> guard(m);
  //     m_CondVar.wait(guard,std::bind(&Filter::DataLoaded,this)); 
  //     //TODO: Nice approach with Condition variables, 
  //     //but even in the demo state,
  //     // you can create the write thread one buffer later, 
  //     //so you don't need to use condition variable, you are only one buffer late with the write
  //     /* read
  //       --------|--------| 
  //       write
  //               |--------|
  //       at the moment when you read the second buffer, you write the first buffer you red, If you want to use the condition variables, 
  //       then you can use std::Atomic<bool> in the while loop, and sync the threads with condition variables
  //       you can .wait() when from the scenario above, you read faster than you write (this should not happen)
  //  
} 

如果以上内容不清楚,下面将尝试阐明我的问题。

您可以尝试许多方法:

  1. 通过使用条件变量和互斥锁(不成功)
  2. 通过使用std :: future和std :: promise(也不成功)

所以我的问题是我如何设法运行线程

readThread-> effectThread-> writeThread-> readThread-> effectThread-> writeThread ...

提前致谢。