green_dq wrote on Friday, September 16, 2005:
Hi!
Is there any things I must take in mind to share one global object between multiple threads?
I have an object, which contain message pool for threads. It has 2 methods - one to add message for thread and one to check (and read if exists) message.
Data access is done via array of mutexes, but some mystery happens - sometime thread receive "empty message" - i.e. check like
bool a = mQ.getMessage(…);
if(a)
{
do something;
}
goes into if, even if method return false.
I`v send an exaple to author - but may be he is too busy now - so I decided to ask here about this.
P.S. I`m using mingw (from Dev-Cpp distribution). But under Linux I have the same result.
Here is the code (it produces 5 files with debug info - 1 file per thread. Ypu can see if thread reseiven an empty message):
#include <cstdlib>
#include <iostream>
#include <fstream>
#include <assert.h>
#include <math.h>
#include <GL/glfw.h>
using namespace std;
static const size_t NUM_THREADS=5;
static const size_t THREAD_QUEUE_LEN=10;
static const size_t MAX_MSG_LEN=248;
static const size_t MSG_PER_THREAD=100;
typedef struct
{
int32_t f_Sender;
int32_t f_Len;
char f_Data[MAX_MSG_LEN];
}ThreadMessage;
class MsgQ
{
protected:
bool m_was_init;
private:
size_t m_rIdx[NUM_THREADS]; //< next read index for thread [i]
size_t m_wIdx[NUM_THREADS];//< next written index for thread [i]
ThreadMessage m\_Data\[NUM\_THREADS\]\[THREAD\_QUEUE\_LEN\];
GLFWmutex m\_locks\[NUM\_THREADS\];
public:
MsgQ();
~MsgQ();
bool sendMessage\(int32\_t srcID, int32\_t dstID, char \*data, size\_t dataLen\);
bool getMessage\(int32\_t \*srcID, int32\_t dstID, char \*data, size\_t \*dataLen\);
void Init\(\);
};
//******************************************************************
MsgQ::MsgQ()
:m_was_init(0)
{
return;
}
//******************************************************************
MsgQ::~MsgQ()
{
if(!m_was_init) return;
for(size_t i=0;i<NUM_THREADS;i++)
{
glfwDestroyMutex(m_locks[i]);
}
}
//******************************************************************
void MsgQ::Init()
{
if(m_was_init) return; // do not init twice
for(size_t i=0; i<NUM_THREADS;i++)
{
m_locks[i]=glfwCreateMutex();
assert(m_locks[i]);
m_rIdx[i]=0;
m_wIdx[i]=0;
}
m_was_init=true;
}
//******************************************************************
bool MsgQ::sendMessage(int32_t srcID, int32_t dstID, char *data, size_t dataLen)
{
bool isSend=false;
if(!m_was_init) return false;
glfwLockMutex(m_locks[dstID]);
if(m_wIdx[dstID]-m_rIdx[dstID]<THREAD_QUEUE_LEN) // we have space in queue
{
size_t msgIndex = m_wIdx[dstID]%THREAD_QUEUE_LEN; // mapping index to queue position
size_t msgLen=dataLen;
if(msgLen>MAX_MSG_LEN) msgLen=MAX_MSG_LEN; // overflow check
memcpy(m_Data[dstID][msgIndex].f_Data, data, msgLen);
m_Data[dstID][msgIndex].f_Len=msgLen;
m_Data[dstID][msgIndex].f_Sender=srcID;
m_wIdx[dstID]++; // next index to write
isSend=true;
}
glfwUnlockMutex(m_locks[dstID]);
return isSend;
}
//******************************************************************
bool MsgQ::getMessage(int32_t *srcID, int32_t dstID, char *data, size_t *dataLen)
{
bool gotMsg = false;
if(!m_was_init) return false;
glfwLockMutex(m_locks[dstID]);
if(m_wIdx[dstID]>m_rIdx[dstID]) // we have new message in queue
{
size_t msgIndex = m_rIdx[dstID]%THREAD_QUEUE_LEN; // mapping index to queue position
memcpy(data, m_Data[dstID][msgIndex].f_Data, m_Data[dstID][msgIndex].f_Len);
*dataLen = m_Data[dstID][msgIndex].f_Len;
*srcID = m_Data[dstID][msgIndex].f_Sender;
m_rIdx[dstID]++; // next index to read
gotMsg=true;
}
glfwUnlockMutex(m_locks[dstID]);
return gotMsg;
}
//******************************************************************
//**** M E S S A G E Q U E U E ********
//******************************************************************
MsgQ mQ;
//******************************************************************
inline void doSomeWork(void)
{
double _stub, _stub2;
for(int i=0;i<1e5;i++)
{
_stub2=(double)i;
_stub=sqrt(_stub2);
}
}
//========================================================================
// Thread function
//========================================================================
void GLFWCALL worker( void *arg )
{
unsigned int myid=*(unsigned int *) arg;
char buf_in[MAX_MSG_LEN];
char buf_out[MAX_MSG_LEN];
size_t send_msg=0;
size_t dataLen;
int32_t srcID;
int32_t a=0;
char dbg\_fname\[64\];
ofstream dbg\_cns; // personal thread debug console
sprintf\(dbg\_fname,"dbg\_%i",myid\);
dbg\_cns.open\(dbg\_fname\);
while\(send\_msg<MSG\_PER\_THREAD\)
\{
for\(size\_t i=0;i<NUM\_THREADS;i++\)
\{
if\(\(NUM\_THREADS>1\) && \(i==myid\)\) continue; // send a message to self only if alone
memset\(buf\_out, 0, sizeof\(buf\_out\)\);
sprintf\(buf\_out,"\{This is a message \[%i\] from %i thread to %i\}",a , myid, i\);
a++;
if\( mQ.sendMessage\(myid, i, buf\_out, strlen\(buf\_out\) \)\)
\{
doSomeWork\(\);
\}
else glfwSleep\(0.01\);
\}
memset\(buf\_in, 0, sizeof\(buf\_in\)\);
srcID=0; dataLen=0;
if\(mQ.getMessage\(&srcID, myid, buf\_in, &dataLen\)\);
\{
dbg\_cns<<"\n Received a message from thread \["<<srcID<<"\] of total len :"<<dataLen;
dbg\_cns<<"\n|"<<buf\_in<<"|\n";
\}
send\_msg++;
\}// end of while
dbg\_cns.close\(\);
}
//******************************************************************
int main(int argc, char *argv[])
{
GLFWthread thread[NUM_THREADS];
int ids[NUM_THREADS];
printf\( "\n Main thread Started\! \n" \);
// Initialise GLFW
if\( \!glfwInit\(\) \)
\{
return 0;
\}
mQ.Init\(\);
// reset random seed
srand\(\(unsigned int\)time\(\(time\_t \*\)NULL\)\);
// Create threads
for\(size\_t i=0;i<NUM\_THREADS;i++\)
\{
ids\[i\]=i;
thread\[i\] = glfwCreateThread\( worker, &ids\[i\]\);
\}
// Wait for threads to die
for\(size\_t i=0;i<NUM\_THREADS;i++\)
\{
glfwWaitThread\( thread\[i\], GLFW\_WAIT \);
\}
printf\( "\n Main thread finished\! \n" \);
// Terminate GLFW
glfwTerminate\(\);
return EXIT\_SUCCESS;
}
Compiler options:
-I…/include -Wall -ffast-math -g3 -mconsole
Linker options:
-L…/lib/win32 -lglfw -lglu32 -lopengl32
I`m using gcc (GCC) 3.4.2 (mingw-special).
Thank you in advance!