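I have been stuck on a problem for several weeks.
I have three threads, and I have implemented a thread-safe stack that I intend to use with them.
Thread-safe stack implementation: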
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack: std::exception
{
const char* what() const noexcept override
{
return "My exception happened";
}
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() = default;
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data;
}
threadsafe_stack& operator = (const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
std::shared_ptr<T> const res(
std::make_shared<T>(std::move(data.top())));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value = std::move(data.top());
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
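Now, the main threading logic can be called from anywhere. A function called run drives the three threads in sequence: read, effect, and write.
Run implementation: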
std::thread t1(&Filter::read,this);
std::thread t3(&Filter::applyEffect,this);
std::thread t2(&Filter::write,this);
t1.detach();
t2.detach();
t3.detach();
std::this_thread::sleep_for(std::chrono::seconds(dur));
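As you can see from the logic, each thread runs in parallel:
read pushes values onto the stack. effect pops from the stack and pushes the result back onto it. write pops from the stack.
The definition is: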
threadsafe_stack<uint8_t*> stack;
Read implementation (as you can see, I have tried many approaches):
uint8_t* Filter::read()
{ //TODO: Bracket please
/**
* Initializing read with a WAV file first;
* should also be PCM once the testing logic is done
*/
// the access to this function is mutually exclusive
hdr = genericWAVHeader(44000, 16, 2); //TODO: Indentation 2spaces.
wrapper.initDevice(SND_PCM_STREAM_CAPTURE,hdr->bits_per_sample,hdr->number_of_channels,hdr->sample_rate);
while(true)
{
try
{
std::unique_lock<std::mutex> lk(mutex);
_tcond1.wait(lk,[this]{return !readDone;});
std::cout << "Read Thread" << std::endl;
effectDone = true;
std::cout << "Effect Thread starting" << std::endl;
readDone = false;
lk.unlock();
_tcond2.notify_one();
// sem[0].Wait();
// std::cout << "Reading Thread " << std::endl;
// sem[1].Post();
// std::this_thread::sleep_for(std::chrono::milliseconds(2));
// std::unique_lock<std::mutex> lock(mutex);
// std::cout << "Read Thread" << std::endl;
// _tcond2.notify_one();
// _tcond1.wait(lock);
// TransitionGuard guard(ready_for_task1, ready_for_task2);
// std::cout << "Read Thread" << std::endl;
/**
* @brief
* Each time read is run a new promise with the data from wrapper.readFrames();
* is being made and also should be locked(Why it should be locked?)
* Locking it makes it so only read at the time has access to the promise
* Should effect also have access to the promise
* Yes as it should set it new the data structure after using the philips libraries
*/
}catch(const std::exception& e)
{
std::cout << "[Read exception caught: " << e.what() << "]\n";
}
// try
// {
// readyPromise.set_value(wrapper.readFrames());
// std::cout << "Ready Promise value set" << std::endl;
// std::cout << "Signal Effect to be active now" << std::endl;
// std::cout << "-------------------------------" << std::endl;
// }
// catch(const std::exception& e)
// {
// std::cout << "[Read exception caught: " << e.what() << "]\n";
// }
// std::unique_lock<std::mutex> unique_lock(mutex);
// data = wrapper.readFrames();
// cv.wait(unique_lock,[&](){return isDone;});
// isDone = !isDone;
// std::cout << "Reading" << std::endl;
// cv.notify_one();
// std::unique_lock<std::mutex> unique_lock(mutex);
// data = wrapper.readFrames();
// cv.wait(unique_lock,[&](){return isDone;});
// std::cout << "Reading" << std::endl;
// isDone = !isDone;
// cv.notify_one();
// try
// {
// promiseBuffer = std::promise<uint8_t*>();
// promiseBuffer.set_value(wrapper.readFrames());
// }
// catch(std::exception& e)
// {
// std::cout << "[exception caught: " << e.what() << "]\n";
// }
}
// hdr = genericWAVHeader(44000, 16, 2); //TODO: Indentation 2spaces.
// wrapper.initDevice(SND_PCM_STREAM_CAPTURE,hdr->bits_per_sample,hdr->number_of_channels,hdr->sample_rate);
// return wrapper.readFrames();
}
applyEffect implementation:
uint8_t* Filter::applyEffect()
{
/**
* @brief
* ApplyEffect should be done here
* void BeClear_SuperHandsFree_Main
(
const float * const * const spks,
const float * const * const mics,
float * const * const out_com,
float * const * const out_sr
);
float modified_buffer_in[480] = { };
float modified_buffer_out[480] = { };
int RNNoiseProcessBufferIn(struct audio_stream_in *stream, int16_t *in_buffer, size_t bytes)
{
struct stream_in *in = (struct stream_in *)stream;
size_t frames_rq = bytes / audio_stream_in_frame_size(&in->stream.common);
for (int channel = 0; channel < STEREO; channel++)
{
for (int i = 0; i < frames_rq; i++)
{
modified_buffer_in[i] = in_buffer[STEREO * i + channel];
}
rnnoise_process_frame(denoised_state_in[channel], modified_buffer_in, modified_buffer_in);
for (int i = 0; i < frames_rq; i++)
{
in_buffer[STEREO * i + channel] = (int16_t)modified_buffer_in[i];
}
}
return 0;
}
*
*
*
*
*/
/**
*
while(true){
// try
// {
// std::unique_lock<std::mutex> unique_lock(mutex);
// readyForEffect = readyPromise.get_future();
// data = readyForEffect.get();
// cv.wait(unique_lock,[&](){return effectDone;});
// effectDone = !effectDone;
// cv.notify_one();
// /**
// * @brief
// * Philips library goes here.
// */
// //BeClear_SuperHandsFree_Main();
// }
// catch(const std::exception& e)
// {
// std::cout << "*************************" << std::endl;
// std::cout << "[Effect exception caught: " << e.what() << "]\n";
// }
try
{
std::unique_lock<std::mutex> lk(mutex);
_tcond2.wait(lk);
std::cout << "Read Effect Starting" << std::endl;
lk.unlock();
_tcond1.notify_one();
// std::unique_lock<std::mutex> lock(mutex);
// std::cout << "Effect Thread" << std::endl;
// _tcond3.notify_one();
// _tcond2.wait(lock);
// TransitionGuard guard( ready_for_task2, ready_for_task3 );
// std::cout << "Effect Thread" << std::endl;
// sem[1].Wait();
// std::cout << "Effect Thread " << std::endl;
// sem[0].Post();
// std::this_thread::sleep_for(std::chrono::milliseconds(2));
//stack.pop();
//stack.push();
// {
// readyForEffect = std::shared_future<uint8_t*>(readyPromise.get_future());
// uint8_t* d = readyForEffect.get();
// //BeClear_SuperHandsFree_Main();
// readyPromise = std::promise<uint8_t*>();
// readyPromise.set_value(d);
// writePromise.set_value(d);
// readyPromise = std::promise<uint8_t*>();
// readyForEffect.wait();
}
catch(const std::exception& e)
{
std::cout << "*************************" << std::endl;
std::cout << "[Effect exception caught: " << e.what() << "]\n";
};
// std::unique_lock<std::mutex> unique_lock(mutex);
// /**
// * @brief
// * This is where philips library is supposed to be used
// */
// cv.wait(unique_lock,[&](){return !isDone;}); // || efDone;});
// try
// {
// promiseBuffer = std::promise<uint8_t*>();
// promiseBuffer.set_value(data);
// }
// catch(std::exception& e)
// {
// std::cout << "[ ApplyEffect exception caught: " << e.what() << "]\n";
// }
// isDone = !isDone;
// //efDone = !efDone;
// cv.notify_one();
// try
// {
// effectPromise = std::promise<uint8_t*>();
// std::future<uint8_t*> futureBuffer = promiseBuffer.get_future();
// uint8_t* d = futureBuffer.get();
// // d -> actually the data
// // phillips;
// effectPromise.set_value(d);
// }
// catch(std::exception& e)
// {
// std::cout << "[exception caught: " << e.what() << "]\n";
// }
}
Write implementation:
bool Filter:: write()
{
/**
* @brief
*
* Should write
*/
#ifndef TESTING
hdr = genericWAVHeader(44000, 16, 2);
wrapper.initWrite(SND_PCM_STREAM_PLAYBACK,hdr->bits_per_sample,hdr->number_of_channels,hdr->sample_rate);
while(true)
{
try
{
/* code */
/**
* @brief
* Signal the other threads to start
* How
*
*/
// std::unique_lock<std::mutex> lock(mutex);
// std::cout << "Write Thread" << std::endl;
// _tcond1.notify_one();
// _tcond3.wait(lock);
TransitionGuard guard( ready_for_task3, ready_for_task1 );
std::cout << "Effect Thread" << std::endl;
//sem[2].Wait(true);
//std::cout << "Writing Thread" << std::endl;
//sem[0].Post();
//std::this_thread::sleep_for(std::chrono::milliseconds(2));
// std::unique_lock<std::mutex> locker(_lockprint3);
// _tcond3.wait(locker);
// std::cout << "Writing thread" << std::endl;
// _tcond1.notify_one();
//readyForRead = std::shared_future<uint8_t*>(readyPromise.get_future());
// readyForWrite = std::shared_future<uint8_t*>(writePromise.get_future());
// readyForWrite.wait();
// uint8_t* p = readyForWrite.get();
// wrapper.writeFrames(p);
// writePromise = std::promise<uint8_t*>();
}
catch(const std::exception& e)
{
std::cout << "[Write exception caught: " << e.what() << "]\n";
}
// std::unique_lock<std::mutex> unique_lock(mutex);
// std::cout << "Writing data" << std::endl;
// cv.wait(unique_lock,[&](){return !effectDone;});
// wrapper.writeFrames(data);
// effectDone = !effectDone;
// cv.notify_one();
// // //std::unique_lock<std::mutex> unique_lock(mutex);
// std::cout << "Writing data" << std::endl;
// //cv.wait(unique_lock,[&](){return !efDone;});
// //efDone = !efDone;
// // cv.notify_one();
// try
// {
// std::future<uint8_t*> effectBuffer = promiseBuffer.get_future();
// // applyEchoCanceling(buffer);
// // std::future<uint8_t*> bufferData = std::async(std::launch::deferred,&Filter::read,this);
// wrapper.writeFrames(effectBuffer.get());
// }
// catch(std::exception& e)
// {
// std::cout << "[Write exception caught: " << e.what() << "]\n";
// }
}
#else
// Open the file first
wavReader wavFile;
// Read the file with async
//std::future<uint8_t*> wavD = std::async(std::launch::deferred,&wavReader::waveLoad,wavFile,"/home/gentbinaku/solaborate_ws/Internal.Audio.tinyalsa_wrapper/test/test_files/3m3f.st_ns5db.pcm128000.wav");
uint8_t* wavData = wavFile.waveLoad("/home/gentbinaku/solaborate_ws/Internal.Audio.tinyalsa_wrapper/test/test_files/3m3f.st_ns5db.pcm128000.wav");
WavInfo wInfo = wavFile.returnWavInfo();
wrapper.initWav(wInfo);
snd_pcm_uframes_t count,frames;
count = 0;
do
{
frames = wrapper.write(wInfo.WavePtr + count,wInfo.WaveSize - count);
// If an error, try to recover from it
if (frames < 0)
{
printf("Error playing wave: %s\n", snd_strerror(frames));
break;
}
//the problem is here
//frames = frames = snd_pcm_writei(PlaybackHandle, WavePtr + count, WaveSize - count);
}while(count < wInfo.WaveSize);
// Read the file with async
// Return the WavInfo File
// Write the WavInfo for WavSize
#endif
// hdr = genericWAVHeader(44000, 16, 2); //TODO: Magic numbers: what are 44000, 16 and 2, and where did they come from? Mars?
// wrapper.initWrite(SND_PCM_STREAM_PLAYBACK,hdr->bits_per_sample,hdr->number_of_channels,hdr->sample_rate);
// //TODO: I understand here, but what is hdr ?
// // Just a random name, no association with something
// //We are already suffering from Kernel naming convention, please spare us :)
// while(true)
// { //TODO: There is a reason why I asked about atomic<bool>: in the demo state you write what you read,
// //in the real-life scenario you have both streams in parallel, independent from one another
// {
// std::unique_lock<std::mutex> guard(m);
// m_CondVar.wait(guard,std::bind(&Filter::DataLoaded,this));
// //TODO: Nice approach with Condition variables,
// //but even in the demo state,
// // you can create the write thread one buffer later,
// //so you don't need to use condition variable, you are only one buffer late with the write
// /* read
// --------|--------|
// write
// |--------|
// at the moment when you read the second buffer, you write the first buffer you read. If you want to use the condition variables,
// then you can use std::atomic<bool> in the while loop, and sync the threads with condition variables
// you can .wait() when, in the scenario above, you read faster than you write (this should not happen)
//
}
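If the above is unclear, I will try to clarify my problem below.
I have tried several approaches:
- using condition variables and a mutex (unsuccessful)
- using std::future and std::promise (also unsuccessful)
So my question is: how can I get the threads to run in this order?
readThread -> effectThread -> writeThread -> readThread -> effectThread -> writeThread ...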
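To make the intended ordering concrete, here is a minimal sketch of a strict read -> effect -> write rotation. It assumes a single shared turn counter guarded by one mutex and one condition variable; the names run_pipeline, stage, turn and trace are hypothetical and are not part of the Filter class above. Each wait uses a predicate, so a notification that arrives before a thread starts waiting is not lost, which is one thing a bare wait(lk) cannot guarantee. The threads are joined rather than detached so the function can return the observed order.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper, not part of the Filter class above.
// Runs three threads that take strict turns: read -> effect -> write -> read ...
// Returns the order in which the stages actually ran.
std::vector<std::string> run_pipeline(int cycles)
{
    assert(cycles >= 0);

    std::mutex m;
    std::condition_variable cv;
    int turn = 0;                       // 0 = read, 1 = effect, 2 = write; guarded by m
    std::vector<std::string> trace;     // guarded by m

    // Each stage waits until it is its turn, does its work, then hands over.
    auto stage = [&](int self, const char* name)
    {
        for (int i = 0; i < cycles; ++i)
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [&] { return turn == self; });
            trace.push_back(name);      // placeholder for the real read/effect/write work
            turn = (self + 1) % 3;      // pass the turn to the next stage
            lk.unlock();
            cv.notify_all();            // two other threads share this cv, so wake both
        }
    };

    std::thread t1(stage, 0, "read");
    std::thread t2(stage, 1, "effect");
    std::thread t3(stage, 2, "write");
    t1.join();
    t2.join();
    t3.join();
    return trace;
}
```

Calling run_pipeline(2) from main returns read, effect, write, read, effect, write. In a real pipeline, the placeholder line would be where each stage touches the shared buffer, and only one stage would ever hold it at a time.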
Thanks in advance.