kathyayini
Programmer
Below is the code which is mainly used 1) main and 2) RECEIVEBUFFER and client code.
If i use context then progran works fine for 13 times but 14th time fopen function
in RECEIVEBUFFER fails. If i do not use context then program runs without any problem.
there is some problem in the way i am deploying context.
i have to use context because i am using multithreading.
//*****************************************************************************
Server Code :
void *RECEIVEBUFFER(void* data)
{
struct sqlca sqlca;
int src_count;
sql_context ctx=params->ctx;
EXEC SQL BEGIN DECLARE SECTION;
char *filename [101];
int socketId = 0;
char cRefNo [15];
char cInput [4000];
char tstr [10];
int condition = 0;
EXEC SQL END DECLARE SECTION;
EXEC SQL CONTEXT USE :ctx;
EXEC SQL COMMIT;
pthread_cond_init(&tCounter.cond,tCounter.attr);
maptable inputMap;
maptable outputMap;
maptable::iterator pMapIter;
while( condition < 10 )
{
pthread_mutex_lock(&(tCounter.mutex));
while( condition == 10 )
pthread_cond_wait( &tCounter.cond, &tCounter.mutex);
condition ++;
pthread_cond_signal( &tCounter.cond );
pthread_mutex_unlock(&(tCounter.mutex));
}
socketId = ((struct sdata*) (data))->socket_id;
int nbytes;
memset(filename,0,101);
nbytes = recv(socketId, filename, 101, MSG_NOSIGNAL);
int rc, i,rv;
char sendBuffer [10000];
char msg [2001];
FILE *fp;
char itransNo [15];
if (access((char*)filename, F_OK) != 0)
{
printf("Error while accessing file\n"
;
pthread_exit((void *)-3);
}
fp = fopen((char*)filename, "r"
;
if (fp == (FILE *)NULL) pthread_exit((void *)-2);
pthread_mutex_lock(&(tCounter.mutex));
strcpy(msg,""
;
pthread_mutex_unlock(&(tCounter.mutex));
memset(msg, '\0', sizeof(msg));
fgets(msg, sizeof(msg)-1, fp);
fclose (fp);
pthread_mutex_lock(&(tCounter.mutex));
strcpy(cInput,""
;
inputMap.erase(inputMap.begin(), inputMap.end());
outputMap.erase(outputMap.begin(), outputMap.end());
inputMap.clear();
outputMap.clear();
pthread_mutex_unlock(&(tCounter.mutex));
rc = DecodeISO8583(msg,inputMap,pMapIter);
if (rc < 0) { DisConnectDB(); pthread_exit((void *)rc); }
pthread_mutex_lock(&(tCounter.mutex));
printf("Message Length = %d\n", rc);
printf("Version = %d\n", iVersion);
printf("Message Type = %d\n", iInMsgType);
printf("Msg = "
;
for (i=0; i<rc; i++) if(msg < ' ') printf("."
; else printf("%c", msg);
printf("\n"
;
for (pMapIter = inputMap.begin(); pMapIter != inputMap.end(); pMapIter++)
printf("Field %03d, Value %s\n", pMapIter->first, pMapIter->second.c_str());
pthread_mutex_unlock(&(tCounter.mutex));
switch (iInMsgType)
{
pthread_mutex_lock(&(tCounter.mutex));
case 200 : iOutMsgType = 210; break;
case 220 :
case 221 : iOutMsgType = 230; break;
case 420 :
case 421 : iOutMsgType = 430; break;
case 604 : iOutMsgType = 614; break;
case 624 :
case 625 : iOutMsgType = 634; break;
case 804 : iOutMsgType = 814;
if (iFunctionCode == 801 || iFunctionCode == 802 || iFunctionCode == 831 || iFunctionCode == 880)
strcpy(tstr, "800"
;
else strcpy(tstr, "902"
;
outputMap.insert(maptable::value_type(39, tstr));
break;
default : pthread_exit((void *)-4);
pthread_mutex_unlock(&(tCounter.mutex));
}
rc = CopyFields(inputMap,outputMap,pMapIter);
if (rc < 0)
{
DisConnectDB();
pthread_exit((void *)rc);
}
if (iInMsgType != 804)
{
pthread_mutex_lock(&(tCounter.mutex));
pMapIter = inputMap.find(37);
strcpy(cRefNo, (char *)pMapIter->second.c_str());
pthread_mutex_unlock(&(tCounter.mutex));
pthread_mutex_lock(&(tCounter.mutex));
EXEC SQL SELECT INPUT INTO :cInput FROM LOG_TRANSACTION WHERE REF_NO = :cRefNo AND STATUS in('S','D');
pthread_mutex_unlock(&(tCounter.mutex));
if(sqlca.sqlerrd[2] == 0)
{
rc=InsertInTable(inputMap,pMapIter,cRefNo);
if (rc < 0) { DisConnectDB(); pthread_exit((void *)rc); }
}
else if (sqlca.sqlcode < 0)
{
printf("Error %d while fetching record***\n",sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
pthread_mutex_lock(&(tCounter.mutex));
strcpy(cInput,""
;
EXEC SQL SELECT OUTPUT INTO :cInput FROM LOG_TRANSACTION WHERE REF_NO= :cRefNo AND STATUS = 'D';
pthread_mutex_unlock(&(tCounter.mutex));
if (sqlca.sqlerrd[2] > 0)
{
rc=GenerateOutputMap(cInput,outputMap,pMapIter);
if (rc < 0) { DisConnectDB(); pthread_exit((void *)rc); }
else
{
pthread_mutex_lock(&(tCounter.mutex));
EXEC SQL UPDATE LOG_TRANSACTION SET STATUS='N' WHERE REF_NO=:cRefNo AND STATUS = 'D';
EXEC SQL COMMIT;
pthread_mutex_unlock(&(tCounter.mutex));
if (sqlca.sqlcode < 0)
{
printf("Error %d while updating record in LOG_TRANSACTION table@.\n", sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
else
{
pthread_exit((void *)rc);
}
}
pthread_exit((void *)rc);
}
else if (sqlca.sqlcode < 0)
{
printf("Error %d while ***fetching record.\n",sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
rc = initTuxedo();
if (rc != 0) { DisConnectDB(); pthread_exit((void *)rc); }
rc = authenticateServer(0);
if (rc != 0) { tpterm(); DisConnectDB(); pthread_exit((void *)rc); }
pthread_mutex_lock(&(tCounter.mutex));
strcpy((char *)vcSourceProduct.arr, "SWIT"
;
vcSourceProduct.len = strlen((char *)vcSourceProduct.arr);
pthread_mutex_unlock(&(tCounter.mutex));
memset(sendBuffer, ' ', MLHEADER_LEN);
sendBuffer[MLHEADER_LEN] = '\0';
strcat(sendBuffer, "00000"
;
rc = MapTransactionData(sendBuffer,inputMap,pMapIter,cRefNo);
if (rc < 0)
{
tpterm();
DisConnectDB();
pthread_exit((void*)rc);
}
rc = CallGenericService(sendBuffer);
if ((rc == -5) || (rc == -6))
{
tpterm();
DisConnectDB();
pthread_exit((void*)rc);
}
else
{
pthread_mutex_lock(&(tCounter.mutex));
EXEC SQL
UPDATE LOG_TRANSACTION SET STATUS = 'D' WHERE REF_NO=:cRefNo AND STATUS = 'S';
EXEC SQL COMMIT;
pthread_mutex_unlock(&(tCounter.mutex));
if (sqlca.sqlcode < 0)
{
printf("Error %d while updating LOG_TRANSACTION table",sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
}
if (rc == 0)
{
rv=SetFields(sendBuffer,itransNo,outputMap,pMapIter);
if (rv < 0)
{
DisConnectDB();
pthread_exit((void*)rv);
}
else
{
rv= UpdateTable(itransNo,outputMap,pMapIter,cRefNo);
if (rv < 0)
{
DisConnectDB();
pthread_exit((void*)rv);
}
}
}
else if(rc == -1)
{
if(tpurcode == 0)
{
outputMap.insert(maptable::value_type(39,"912"
);
rv= UpdateTable(itransNo,outputMap,pMapIter,cRefNo);
if (rv < 0) { DisConnectDB(); pthread_exit((void *)rv); }
}
else if(tpurcode == -1)
{
rv=SetFields(sendBuffer,itransNo,outputMap,pMapIter);
if (rv < 0)
{
DisConnectDB(); pthread_exit((void *)rv);
}
else
{
rv= UpdateTable(itransNo,outputMap,pMapIter,cRefNo);
if (rv < 0) { DisConnectDB(); pthread_exit((void *)rv); }
}
}
}
}
rc = EncodeISO8583(msg,outputMap,pMapIter);
if (iInMsgType != 804) tpterm();
DisConnectDB();
pthread_mutex_lock(&(tCounter.mutex));
printf("Message Length = %d\n", rc);
printf("Version = %d\n", iVersion);
printf("Message Type = %d\n", iOutMsgType);
printf("Msg = "
;
for (i=0; i<rc; i++) if(msg < ' ') printf("."
; else printf("%c", msg);
printf("\n"
;
for (pMapIter = outputMap.begin(); pMapIter != outputMap.end(); pMapIter++)
printf("Field %03d, Value %s\n", pMapIter->first, pMapIter->second.c_str());
pthread_mutex_unlock(&(tCounter.mutex));
pthread_exit((void*)rc);
return (NULL);
}
int main()
{
int rc, i,rv;
int serverSocket;
int addrLen;
struct sockaddr_in sockAddr;
int threadCount=0;
int max_concurrent_alwd=0;
int nextIndex;
pthread_t receiveBufferThread[MAX_THREADS],cur;
pthread_attr_t attr;
size_t stacksize;
int segment_size=0;
int optval;
int total;
int iCtxCount=1;
EXEC SQL BEGIN DECLARE SECTION;
sql_context ctx[THREADS];
int sock_id = 0;
EXEC SQL END DECLARE SECTION;
pthread_t thread_id[THREADS];
pthread_attr_t patr;
EXEC SQL ENABLE THREADS;
pthread_mutex_init(&tCounter.mutex,NULL);
pthread_attr_init(&attr);
stacksize = 500000;
pthread_attr_setstacksize (&attr, stacksize);
sockAddr.sin_family=AF_INET;
sockAddr.sin_addr.s_addr=INADDR_ANY;
sockAddr.sin_port=htons(PORT_NO);
addrLen=sizeof(struct sockaddr_in);
if((serverSocket=socket(AF_INET,SOCK_STREAM,0))== -1)
{
printf("Failed to create Socket\n"
;
return -1;
}
else
{
printf("Socket created successfully\n"
;
}
optval = 1;
rc = setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR,(char *)&optval, sizeof(optval));
if(bind(serverSocket,(struct sockaddr_in*)&sockAddr,sizeof(sockAddr))==-1)
{
printf("Failed to bind at port %d\n",PORT_NO);
return -1;
}
else
{
printf("Binding at port %d done successfully\n",PORT_NO);
}
tCounter.threadCount=0;
if(listen(serverSocket,1000) == -1)
{
printf("Failed to listen at port %d\n",PORT_NO);
return -1;
}
else
{
printf("Listening at port %d\n",PORT_NO);
}
while(1)
{
sock_id = accept(serverSocket,(struct sockaddr_in*)&sockAddr,&addrLen);
pthread_mutex_lock(&(tCounter.mutex));
threadCount = tCounter.threadCount;
pthread_mutex_unlock(&(tCounter.mutex));
if(sock_id == -1)
{
if(errno == EINTR)
{
printf("Signal Recived coming out \n"
;
break;
}
else
{
continue;
}
}
nextIndex = getFree();
newSocket[nextIndex].socket_id = sock_id;
newSocket[nextIndex].segment_size=segment_size;
for(i=0;i<1;i++)
{
EXEC SQL CONTEXT ALLOCATE :ctx;
EXEC SQL CONTEXT USE :ctx;
EXEC SQL CONNECT :CONNINFO;
printf("Connected!\n"
;
}
for(i=0;i<1;i++)
{
params.ctx=ctx;
params.thread_id=i;
pthread_create(&cur, NULL, RECEIVEBUFFER,(void*)&newSocket[nextIndex]);
send(sock_id,"Success@@@",10,MSG_NOSIGNAL);
}
iCtxCount++;
pthread_detach(cur);
pthread_mutex_lock(&(tCounter.mutex));
tCounter.threadCount++;
pthread_mutex_unlock(&(tCounter.mutex));
}
pthread_mutex_lock(&(tCounter.mutex));
threadCount = tCounter.threadCount;
pthread_mutex_unlock(&(tCounter.mutex));
for(i=0;i<=threadCount;i++)
{
pthread_join(cur,NULL);
}
TP_TERMINATE(serverSocket);
DisConnectDB();
return 0;
}
//*****************************************************************************
Client code :
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#define MESSAGE_LENGTH 101
#define PORT_NO 8891
#define BACKLOG 5
#define SERVER_IP "127.0.0.1"
#define NO_OF_RETRY 5
#define CHUNK_LENGTH 10
int TP_INIT(void);
int TP_CALL(char*,int,int);
int TP_TERMINATE(int);
int TP_INIT(void)
{
int newSocket,retryCount;
struct sockaddr_in sockAddr;
if((newSocket=socket(AF_INET,SOCK_STREAM,0))>0)
//printf("\nSocket created...\n"
;
sockAddr.sin_family=AF_INET;
sockAddr.sin_port=htons(PORT_NO);
inet_pton(AF_INET,SERVER_IP,&sockAddr.sin_addr);
retryCount=0;
while(retryCount<NO_OF_RETRY)
{
if(connect(newSocket,(struct sockaddr_in*)&sockAddr,sizeof(sockAddr))==0)
{
//printf("\nConnection established with the server %s...\n",inet_ntoa(sockAddr.sin_addr));
break;
}
retryCount++;
}
return newSocket;
}
int TP_CALL(char *buffer,int bufferLength,int socketId)
{
char packetLength[5];
int bytesSent;
printf("\nRequest is :=%s Buffer length= %d\n",buffer,bufferLength);
sprintf(packetLength,"%ld",bufferLength);
//send(socketId,packetLength,sizeof(long),0);
bytesSent=send(socketId,buffer,bufferLength,0);
printf("Send data size is :%d\n",bytesSent);
recv(socketId,buffer,MESSAGE_LENGTH,0);
printf("\nAcknowledgement message= %s\n",buffer);
return bytesSent;
}
int TP_TERMINATE(int socketId)
{
return close(socketId);
}
int main(int argc,char * argv[])
{
for(int z=0;z<2;z++)
{
int socket,i;
char buffer[MESSAGE_LENGTH],bufLength=0;
socket=TP_INIT();
memset(buffer,0,MESSAGE_LENGTH-1);
if(strcmp(argv[1] , "a"
== 0)
{
if(z==0)
strcpy(buffer,"CWD_RQST_FROM_ELECTRA"
;
//strcpy(buffer,"qqqqqq"
;
if(z==1)
strcpy(buffer,"ECHOTEST_RQST_FROM_ELECTRA"
;
}
if(strcmp(argv[1] , "b"
== 0)
{
if(z==0)
strcpy(buffer,"ECHOTEST_RQST_FROM_ELECTRA1"
;
if(z==1)
strcpy(buffer,"MINISTMT_RQST_FROM_ELECTRA"
;
}
sleep(1);
TP_CALL(buffer,strlen(buffer),socket);
TP_TERMINATE(socket);
}
return 0;
}
If i use context then progran works fine for 13 times but 14th time fopen function
in RECEIVEBUFFER fails. If i do not use context then program runs without any problem.
there is some problem in the way i am deploying context.
i have to use context because i am using multithreading.
//*****************************************************************************
Server Code :
void *RECEIVEBUFFER(void* data)
{
struct sqlca sqlca;
int src_count;
sql_context ctx=params->ctx;
EXEC SQL BEGIN DECLARE SECTION;
char *filename [101];
int socketId = 0;
char cRefNo [15];
char cInput [4000];
char tstr [10];
int condition = 0;
EXEC SQL END DECLARE SECTION;
EXEC SQL CONTEXT USE :ctx;
EXEC SQL COMMIT;
pthread_cond_init(&tCounter.cond,tCounter.attr);
maptable inputMap;
maptable outputMap;
maptable::iterator pMapIter;
while( condition < 10 )
{
pthread_mutex_lock(&(tCounter.mutex));
while( condition == 10 )
pthread_cond_wait( &tCounter.cond, &tCounter.mutex);
condition ++;
pthread_cond_signal( &tCounter.cond );
pthread_mutex_unlock(&(tCounter.mutex));
}
socketId = ((struct sdata*) (data))->socket_id;
int nbytes;
memset(filename,0,101);
nbytes = recv(socketId, filename, 101, MSG_NOSIGNAL);
int rc, i,rv;
char sendBuffer [10000];
char msg [2001];
FILE *fp;
char itransNo [15];
if (access((char*)filename, F_OK) != 0)
{
printf("Error while accessing file\n"
pthread_exit((void *)-3);
}
fp = fopen((char*)filename, "r"
if (fp == (FILE *)NULL) pthread_exit((void *)-2);
pthread_mutex_lock(&(tCounter.mutex));
strcpy(msg,""
pthread_mutex_unlock(&(tCounter.mutex));
memset(msg, '\0', sizeof(msg));
fgets(msg, sizeof(msg)-1, fp);
fclose (fp);
pthread_mutex_lock(&(tCounter.mutex));
strcpy(cInput,""
inputMap.erase(inputMap.begin(), inputMap.end());
outputMap.erase(outputMap.begin(), outputMap.end());
inputMap.clear();
outputMap.clear();
pthread_mutex_unlock(&(tCounter.mutex));
rc = DecodeISO8583(msg,inputMap,pMapIter);
if (rc < 0) { DisConnectDB(); pthread_exit((void *)rc); }
pthread_mutex_lock(&(tCounter.mutex));
printf("Message Length = %d\n", rc);
printf("Version = %d\n", iVersion);
printf("Message Type = %d\n", iInMsgType);
printf("Msg = "
for (i=0; i<rc; i++) if(msg < ' ') printf("."
printf("\n"
for (pMapIter = inputMap.begin(); pMapIter != inputMap.end(); pMapIter++)
printf("Field %03d, Value %s\n", pMapIter->first, pMapIter->second.c_str());
pthread_mutex_unlock(&(tCounter.mutex));
switch (iInMsgType)
{
pthread_mutex_lock(&(tCounter.mutex));
case 200 : iOutMsgType = 210; break;
case 220 :
case 221 : iOutMsgType = 230; break;
case 420 :
case 421 : iOutMsgType = 430; break;
case 604 : iOutMsgType = 614; break;
case 624 :
case 625 : iOutMsgType = 634; break;
case 804 : iOutMsgType = 814;
if (iFunctionCode == 801 || iFunctionCode == 802 || iFunctionCode == 831 || iFunctionCode == 880)
strcpy(tstr, "800"
else strcpy(tstr, "902"
outputMap.insert(maptable::value_type(39, tstr));
break;
default : pthread_exit((void *)-4);
pthread_mutex_unlock(&(tCounter.mutex));
}
rc = CopyFields(inputMap,outputMap,pMapIter);
if (rc < 0)
{
DisConnectDB();
pthread_exit((void *)rc);
}
if (iInMsgType != 804)
{
pthread_mutex_lock(&(tCounter.mutex));
pMapIter = inputMap.find(37);
strcpy(cRefNo, (char *)pMapIter->second.c_str());
pthread_mutex_unlock(&(tCounter.mutex));
pthread_mutex_lock(&(tCounter.mutex));
EXEC SQL SELECT INPUT INTO :cInput FROM LOG_TRANSACTION WHERE REF_NO = :cRefNo AND STATUS in('S','D');
pthread_mutex_unlock(&(tCounter.mutex));
if(sqlca.sqlerrd[2] == 0)
{
rc=InsertInTable(inputMap,pMapIter,cRefNo);
if (rc < 0) { DisConnectDB(); pthread_exit((void *)rc); }
}
else if (sqlca.sqlcode < 0)
{
printf("Error %d while fetching record***\n",sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
pthread_mutex_lock(&(tCounter.mutex));
strcpy(cInput,""
EXEC SQL SELECT OUTPUT INTO :cInput FROM LOG_TRANSACTION WHERE REF_NO= :cRefNo AND STATUS = 'D';
pthread_mutex_unlock(&(tCounter.mutex));
if (sqlca.sqlerrd[2] > 0)
{
rc=GenerateOutputMap(cInput,outputMap,pMapIter);
if (rc < 0) { DisConnectDB(); pthread_exit((void *)rc); }
else
{
pthread_mutex_lock(&(tCounter.mutex));
EXEC SQL UPDATE LOG_TRANSACTION SET STATUS='N' WHERE REF_NO=:cRefNo AND STATUS = 'D';
EXEC SQL COMMIT;
pthread_mutex_unlock(&(tCounter.mutex));
if (sqlca.sqlcode < 0)
{
printf("Error %d while updating record in LOG_TRANSACTION table@.\n", sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
else
{
pthread_exit((void *)rc);
}
}
pthread_exit((void *)rc);
}
else if (sqlca.sqlcode < 0)
{
printf("Error %d while ***fetching record.\n",sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
rc = initTuxedo();
if (rc != 0) { DisConnectDB(); pthread_exit((void *)rc); }
rc = authenticateServer(0);
if (rc != 0) { tpterm(); DisConnectDB(); pthread_exit((void *)rc); }
pthread_mutex_lock(&(tCounter.mutex));
strcpy((char *)vcSourceProduct.arr, "SWIT"
vcSourceProduct.len = strlen((char *)vcSourceProduct.arr);
pthread_mutex_unlock(&(tCounter.mutex));
memset(sendBuffer, ' ', MLHEADER_LEN);
sendBuffer[MLHEADER_LEN] = '\0';
strcat(sendBuffer, "00000"
rc = MapTransactionData(sendBuffer,inputMap,pMapIter,cRefNo);
if (rc < 0)
{
tpterm();
DisConnectDB();
pthread_exit((void*)rc);
}
rc = CallGenericService(sendBuffer);
if ((rc == -5) || (rc == -6))
{
tpterm();
DisConnectDB();
pthread_exit((void*)rc);
}
else
{
pthread_mutex_lock(&(tCounter.mutex));
EXEC SQL
UPDATE LOG_TRANSACTION SET STATUS = 'D' WHERE REF_NO=:cRefNo AND STATUS = 'S';
EXEC SQL COMMIT;
pthread_mutex_unlock(&(tCounter.mutex));
if (sqlca.sqlcode < 0)
{
printf("Error %d while updating LOG_TRANSACTION table",sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
}
if (rc == 0)
{
rv=SetFields(sendBuffer,itransNo,outputMap,pMapIter);
if (rv < 0)
{
DisConnectDB();
pthread_exit((void*)rv);
}
else
{
rv= UpdateTable(itransNo,outputMap,pMapIter,cRefNo);
if (rv < 0)
{
DisConnectDB();
pthread_exit((void*)rv);
}
}
}
else if(rc == -1)
{
if(tpurcode == 0)
{
outputMap.insert(maptable::value_type(39,"912"
rv= UpdateTable(itransNo,outputMap,pMapIter,cRefNo);
if (rv < 0) { DisConnectDB(); pthread_exit((void *)rv); }
}
else if(tpurcode == -1)
{
rv=SetFields(sendBuffer,itransNo,outputMap,pMapIter);
if (rv < 0)
{
DisConnectDB(); pthread_exit((void *)rv);
}
else
{
rv= UpdateTable(itransNo,outputMap,pMapIter,cRefNo);
if (rv < 0) { DisConnectDB(); pthread_exit((void *)rv); }
}
}
}
}
rc = EncodeISO8583(msg,outputMap,pMapIter);
if (iInMsgType != 804) tpterm();
DisConnectDB();
pthread_mutex_lock(&(tCounter.mutex));
printf("Message Length = %d\n", rc);
printf("Version = %d\n", iVersion);
printf("Message Type = %d\n", iOutMsgType);
printf("Msg = "
for (i=0; i<rc; i++) if(msg < ' ') printf("."
printf("\n"
for (pMapIter = outputMap.begin(); pMapIter != outputMap.end(); pMapIter++)
printf("Field %03d, Value %s\n", pMapIter->first, pMapIter->second.c_str());
pthread_mutex_unlock(&(tCounter.mutex));
pthread_exit((void*)rc);
return (NULL);
}
int main()
{
int rc, i,rv;
int serverSocket;
int addrLen;
struct sockaddr_in sockAddr;
int threadCount=0;
int max_concurrent_alwd=0;
int nextIndex;
pthread_t receiveBufferThread[MAX_THREADS],cur;
pthread_attr_t attr;
size_t stacksize;
int segment_size=0;
int optval;
int total;
int iCtxCount=1;
EXEC SQL BEGIN DECLARE SECTION;
sql_context ctx[THREADS];
int sock_id = 0;
EXEC SQL END DECLARE SECTION;
pthread_t thread_id[THREADS];
pthread_attr_t patr;
EXEC SQL ENABLE THREADS;
pthread_mutex_init(&tCounter.mutex,NULL);
pthread_attr_init(&attr);
stacksize = 500000;
pthread_attr_setstacksize (&attr, stacksize);
sockAddr.sin_family=AF_INET;
sockAddr.sin_addr.s_addr=INADDR_ANY;
sockAddr.sin_port=htons(PORT_NO);
addrLen=sizeof(struct sockaddr_in);
if((serverSocket=socket(AF_INET,SOCK_STREAM,0))== -1)
{
printf("Failed to create Socket\n"
return -1;
}
else
{
printf("Socket created successfully\n"
}
optval = 1;
rc = setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR,(char *)&optval, sizeof(optval));
if(bind(serverSocket,(struct sockaddr_in*)&sockAddr,sizeof(sockAddr))==-1)
{
printf("Failed to bind at port %d\n",PORT_NO);
return -1;
}
else
{
printf("Binding at port %d done successfully\n",PORT_NO);
}
tCounter.threadCount=0;
if(listen(serverSocket,1000) == -1)
{
printf("Failed to listen at port %d\n",PORT_NO);
return -1;
}
else
{
printf("Listening at port %d\n",PORT_NO);
}
while(1)
{
sock_id = accept(serverSocket,(struct sockaddr_in*)&sockAddr,&addrLen);
pthread_mutex_lock(&(tCounter.mutex));
threadCount = tCounter.threadCount;
pthread_mutex_unlock(&(tCounter.mutex));
if(sock_id == -1)
{
if(errno == EINTR)
{
printf("Signal Recived coming out \n"
break;
}
else
{
continue;
}
}
nextIndex = getFree();
newSocket[nextIndex].socket_id = sock_id;
newSocket[nextIndex].segment_size=segment_size;
for(i=0;i<1;i++)
{
EXEC SQL CONTEXT ALLOCATE :ctx;
EXEC SQL CONTEXT USE :ctx;
EXEC SQL CONNECT :CONNINFO;
printf("Connected!\n"
}
for(i=0;i<1;i++)
{
params.ctx=ctx;
params.thread_id=i;
pthread_create(&cur, NULL, RECEIVEBUFFER,(void*)&newSocket[nextIndex]);
send(sock_id,"Success@@@",10,MSG_NOSIGNAL);
}
iCtxCount++;
pthread_detach(cur);
pthread_mutex_lock(&(tCounter.mutex));
tCounter.threadCount++;
pthread_mutex_unlock(&(tCounter.mutex));
}
pthread_mutex_lock(&(tCounter.mutex));
threadCount = tCounter.threadCount;
pthread_mutex_unlock(&(tCounter.mutex));
for(i=0;i<=threadCount;i++)
{
pthread_join(cur,NULL);
}
TP_TERMINATE(serverSocket);
DisConnectDB();
return 0;
}
//*****************************************************************************
Client code :
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#define MESSAGE_LENGTH 101
#define PORT_NO 8891
#define BACKLOG 5
#define SERVER_IP "127.0.0.1"
#define NO_OF_RETRY 5
#define CHUNK_LENGTH 10
int TP_INIT(void);
int TP_CALL(char*,int,int);
int TP_TERMINATE(int);
int TP_INIT(void)
{
int newSocket,retryCount;
struct sockaddr_in sockAddr;
if((newSocket=socket(AF_INET,SOCK_STREAM,0))>0)
//printf("\nSocket created...\n"
sockAddr.sin_family=AF_INET;
sockAddr.sin_port=htons(PORT_NO);
inet_pton(AF_INET,SERVER_IP,&sockAddr.sin_addr);
retryCount=0;
while(retryCount<NO_OF_RETRY)
{
if(connect(newSocket,(struct sockaddr_in*)&sockAddr,sizeof(sockAddr))==0)
{
//printf("\nConnection established with the server %s...\n",inet_ntoa(sockAddr.sin_addr));
break;
}
retryCount++;
}
return newSocket;
}
int TP_CALL(char *buffer,int bufferLength,int socketId)
{
char packetLength[5];
int bytesSent;
printf("\nRequest is :=%s Buffer length= %d\n",buffer,bufferLength);
sprintf(packetLength,"%ld",bufferLength);
//send(socketId,packetLength,sizeof(long),0);
bytesSent=send(socketId,buffer,bufferLength,0);
printf("Send data size is :%d\n",bytesSent);
recv(socketId,buffer,MESSAGE_LENGTH,0);
printf("\nAcknowledgement message= %s\n",buffer);
return bytesSent;
}
int TP_TERMINATE(int socketId)
{
return close(socketId);
}
int main(int argc,char * argv[])
{
for(int z=0;z<2;z++)
{
int socket,i;
char buffer[MESSAGE_LENGTH],bufLength=0;
socket=TP_INIT();
memset(buffer,0,MESSAGE_LENGTH-1);
if(strcmp(argv[1] , "a"
{
if(z==0)
strcpy(buffer,"CWD_RQST_FROM_ELECTRA"
//strcpy(buffer,"qqqqqq"
if(z==1)
strcpy(buffer,"ECHOTEST_RQST_FROM_ELECTRA"
}
if(strcmp(argv[1] , "b"
{
if(z==0)
strcpy(buffer,"ECHOTEST_RQST_FROM_ELECTRA1"
if(z==1)
strcpy(buffer,"MINISTMT_RQST_FROM_ELECTRA"
}
sleep(1);
TP_CALL(buffer,strlen(buffer),socket);
TP_TERMINATE(socket);
}
return 0;
}