Sharing object between threads - need help

green_dq wrote on Friday, September 16, 2005:

Hi!
Is there anything I must keep in mind when sharing one global object between multiple threads?
I have an object which contains a message pool for the threads. It has 2 methods - one to add a message for a thread and one to check for (and read, if it exists) a message.
Data access is guarded by an array of mutexes, but something mysterious happens - sometimes a thread receives an "empty message" - i.e. a check like
bool a = mQ.getMessage(…);
if(a)
{
do something;
}
enters the if block even when the method returns false.
I've sent an example to the author - but maybe he is too busy right now - so I decided to ask about it here.
P.S. I'm using MinGW (from the Dev-Cpp distribution), but I get the same result under Linux.

Here is the code (it produces 5 files with debug info - 1 file per thread. You can see whether a thread received an empty message):

#include <assert.h>
#include <math.h>
#include <stdint.h>

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <fstream>
#include <iostream>

#include <GL/glfw.h>

using namespace std;

// Sizing constants for the demo.
static const size_t NUM_THREADS      = 5;   // worker threads started by main(), one queue each
static const size_t THREAD_QUEUE_LEN = 10;  // message slots in each thread's queue
static const size_t MAX_MSG_LEN      = 248; // payload bytes held by one message
static const size_t MSG_PER_THREAD   = 100; // send/receive rounds each worker performs

// One queued message: who sent it, how many payload bytes are valid, and the payload itself.
struct ThreadMessage
{
    int32_t f_Sender;            // id of the sending thread
    int32_t f_Len;               // number of valid bytes in f_Data
    char    f_Data[MAX_MSG_LEN]; // raw payload (not necessarily NUL-terminated)
};

// Fixed-capacity message queues, one per thread, each guarded by its own mutex.
//
// Every destination thread owns THREAD_QUEUE_LEN message slots. m_wIdx/m_rIdx
// are running counters that are mapped onto a slot with "% THREAD_QUEUE_LEN".
// Init() must be called before sendMessage()/getMessage(); until then both
// simply return false.
class MsgQ
{
protected:
    bool m_was_init; //< true once Init() has created the mutexes

private:
    size_t m_rIdx[NUM_THREADS]; //< next read index for thread [i]
    size_t m_wIdx[NUM_THREADS]; //< next written index for thread [i]

    ThreadMessage m_Data[NUM_THREADS][THREAD_QUEUE_LEN]; //< message slots for thread [i]
    GLFWmutex     m_locks[NUM_THREADS];                  //< mutex guarding queue [i]

    // Not copyable: the destructor destroys the mutex handles, so a copy
    // would destroy the same handles a second time. Declared, never defined.
    MsgQ(const MsgQ &);
    MsgQ &operator=(const MsgQ &);

public:
    MsgQ();
    ~MsgQ();

    // Queue a message for thread dstID; returns false if it could not be stored.
    bool sendMessage(int32_t srcID, int32_t dstID, char *data, size_t dataLen);
    // Fetch the oldest message queued for thread dstID; returns false if there is none.
    bool getMessage(int32_t *srcID, int32_t dstID, char *data, size_t *dataLen);
    // Create the mutexes and reset all indices; calling it again is a no-op.
    void Init();
};

//******************************************************************

// Builds an inactive queue object; the mutexes are not created until Init() runs.
MsgQ::MsgQ()
    : m_was_init(false)
{
}
//******************************************************************

// Destroys the per-thread mutexes, but only when Init() actually created them.
MsgQ::~MsgQ()
{
    if (m_was_init)
    {
        for (size_t slot = 0; slot != NUM_THREADS; ++slot)
            glfwDestroyMutex(m_locks[slot]);
    }
}
//******************************************************************
// Creates one mutex per thread queue and resets every read/write index to zero.
// A second call does nothing.
void MsgQ::Init()
{
    if (!m_was_init)
    {
        size_t t = 0;
        while (t < NUM_THREADS)
        {
            m_locks[t] = glfwCreateMutex();
            assert(m_locks[t]); // a null handle means the mutex could not be created
            m_rIdx[t] = 0;
            m_wIdx[t] = 0;
            ++t;
        }
        m_was_init = true;
    }
}

//******************************************************************
bool MsgQ::sendMessage(int32_t srcID, int32_t dstID, char *data, size_t dataLen)
{
bool isSend=false;
if(!m_was_init) return false;
glfwLockMutex(m_locks[dstID]);
if(m_wIdx[dstID]-m_rIdx[dstID]<THREAD_QUEUE_LEN) // we have space in queue
{
size_t msgIndex = m_wIdx[dstID]%THREAD_QUEUE_LEN; // mapping index to queue position
size_t msgLen=dataLen;
if(msgLen>MAX_MSG_LEN) msgLen=MAX_MSG_LEN; // overflow check

memcpy(m_Data[dstID][msgIndex].f_Data, data, msgLen);
m_Data[dstID][msgIndex].f_Len=msgLen;
m_Data[dstID][msgIndex].f_Sender=srcID;
m_wIdx[dstID]++; // next index to write
isSend=true;
}
glfwUnlockMutex(m_locks[dstID]);
return isSend;
}

//******************************************************************
bool MsgQ::getMessage(int32_t *srcID, int32_t dstID, char *data, size_t *dataLen)
{
bool gotMsg = false;

if(!m_was_init) return false;

glfwLockMutex(m_locks[dstID]);
if(m_wIdx[dstID]>m_rIdx[dstID]) // we have new message in queue
{
size_t msgIndex = m_rIdx[dstID]%THREAD_QUEUE_LEN; // mapping index to queue position

memcpy(data, m_Data[dstID][msgIndex].f_Data, m_Data[dstID][msgIndex].f_Len);
*dataLen = m_Data[dstID][msgIndex].f_Len;
*srcID = m_Data[dstID][msgIndex].f_Sender;
m_rIdx[dstID]++; // next index to read
gotMsg=true;
}
glfwUnlockMutex(m_locks[dstID]);
return gotMsg;

}
//******************************************************************
//**** M E S S A G E Q U E U E ********
//******************************************************************

// The single queue object shared by all worker threads; main() calls Init() on it before starting them.
MsgQ mQ;

//******************************************************************
// Placeholder workload: takes the square root of every integer in
// [0, 100000) and throws the results away.
inline void doSomeWork(void)
{
    double root = 0.0;
    int step = 0;
    while (step < 100000)
    {
        root = sqrt(static_cast<double>(step));
        ++step;
    }
    (void)root; // the value is deliberately unused
}

//========================================================================
// Thread function
//========================================================================

void GLFWCALL worker( void *arg )
{
unsigned int myid=*(unsigned int *) arg;
char buf_in[MAX_MSG_LEN];
char buf_out[MAX_MSG_LEN];
size_t send_msg=0;
size_t dataLen;
int32_t srcID;
int32_t a=0;

char dbg\_fname\[64\];

ofstream dbg\_cns; // personal thread debug console
    
sprintf\(dbg\_fname,&quot;dbg\_%i&quot;,myid\);
dbg\_cns.open\(dbg\_fname\);

while\(send\_msg&lt;MSG\_PER\_THREAD\)
\{
 for\(size\_t i=0;i&lt;NUM\_THREADS;i++\)
 \{
  if\(\(NUM\_THREADS&gt;1\) &amp;&amp; \(i==myid\)\) continue; // send a message to self only if alone
  memset\(buf\_out, 0, sizeof\(buf\_out\)\);
  sprintf\(buf\_out,&quot;\{This is a message \[%i\] from %i thread to %i\}&quot;,a , myid, i\);
  a++;
  if\( mQ.sendMessage\(myid, i, buf\_out, strlen\(buf\_out\) \)\)
  \{
   doSomeWork\(\);        
   \}
   else glfwSleep\(0.01\); 
 \}

 memset\(buf\_in, 0, sizeof\(buf\_in\)\);
 srcID=0; dataLen=0;
 
 if\(mQ.getMessage\(&amp;srcID, myid, buf\_in, &amp;dataLen\)\);
 \{
  dbg\_cns&lt;&lt;&quot;\n Received a message from thread \[&quot;&lt;&lt;srcID&lt;&lt;&quot;\] of total len :&quot;&lt;&lt;dataLen;
  dbg\_cns&lt;&lt;&quot;\n|&quot;&lt;&lt;buf\_in&lt;&lt;&quot;|\n&quot;;
 \}                             
 send\_msg++;
\}// end of while
dbg\_cns.close\(\);

}

//******************************************************************

int main(int argc, char *argv[])
{
GLFWthread thread[NUM_THREADS];
int ids[NUM_THREADS];

printf\( &quot;\n Main thread Started\! \n&quot; \);

// Initialise GLFW
if\( \!glfwInit\(\) \)
\{
    return 0;
\}

mQ.Init\(\);

// reset random seed
srand\(\(unsigned int\)time\(\(time\_t \*\)NULL\)\);

// Create threads
for\(size\_t i=0;i&lt;NUM\_THREADS;i++\)
\{
 ids\[i\]=i;                         
 thread\[i\] = glfwCreateThread\( worker, &amp;ids\[i\]\);
\}


// Wait for threads to die
for\(size\_t i=0;i&lt;NUM\_THREADS;i++\)
\{
 glfwWaitThread\( thread\[i\], GLFW\_WAIT \);
 \}
printf\( &quot;\n Main thread finished\! \n&quot; \);


// Terminate GLFW
glfwTerminate\(\);


return EXIT\_SUCCESS;

}

Compiler options:
-I…/include -Wall -ffast-math -g3 -mconsole
Linker options:
-L…/lib/win32 -lglfw -lglu32 -lopengl32

I'm using gcc (GCC) 3.4.2 (mingw-special).

Thank you in advance!

green_dq wrote on Saturday, September 17, 2005:

Thanks to all, problem solved.
The problem was here:
if(mQ.getMessage(&srcID, myid, buf_in, &dataLen)); <<— a stray ";" right after the if condition, so the block that follows always ran, whatever getMessage returned.