- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I'm trying to write a tcp/ip server that create new thread on each new connection.
The problem I'm facing is that the thread are correctly created, but as soon as one thread is requested to stop (upon connection drop for exemple) all thread are stopped.
The problem do not occur if the thread have nothing to do with communication.
here below is my source code, if the variable switch in routine DC_DEMARRE_SERVEUR have for value 1 then the thread created have nothing to do with communication and created thread stop automaticaly after a certain period, keeping the other thread alive.
If the variable switch in routine DC_DEMARRE_SERVEUR have for value 2 then the thread created are receiving message via TCP/IP and for test also stop after a certain period, but in that case, the first thread being stop, will stop all the thread!! I can not figure out why.
Could somebody help me?
Thanks
!> @brief Program . !> @details . !> @return none PROGRAM MTCOM USE KERNEL32 USE WS2_32 implicit none !------------------------------------------------------------------------- ! INCLUDED FILES !------------------------------------------------------------------------- !------------------------------------------------------------------------- ! LOCAL DATA !------------------------------------------------------------------------- logical*1 FOREVER /.TRUE./ INTEGER (KIND = 4) :: WDG_DELAY_S = 10 ! activate watchdog every seconds real*4 ellapsed_time integer*2 portDisponible INTEGER*4 exit_code integer*4 i,status INTEGER*4 rcvsize INTEGER*4 BUFFER logical*1 DC_DEMARRE_SERVEUR cx----------------------------------- INTEGER WINSOCK_V2_2 INTEGER CONNECTION_DROPPED_BY_REMOTE_PARTY INTEGER SUCCESS PARAMETER(WINSOCK_V2_2 = X'202',CONNECTION_DROPPED_BY_REMOTE_PARTY = X'05050505',SUCCESS = 0) TYPE(T_SOCKADDR_IN) listenerInfo,listener1Info TYPE(T_SOCKADDR_IN) connectionInfo TYPE(T_WSADATA) wsaInfo TYPE(T_FD_SET) writeset TYPE(T_FD_SET) readset TYPE(T_timeval) timeset TYPE T_CLIENT_SERVER_MESSAGE SEQUENCE UNION MAP INTEGER Code INTEGER dest INTEGER msgSize UNION MAP CHARACTER*16 dest_buffer END MAP MAP INTEGER dest_code CHARACTER*12 msg_content END MAP END UNION END MAP MAP CHARACTER*1048 buffer END MAP END UNION END TYPE TYPE(T_CLIENT_SERVER_MESSAGE) clientServerMessage INTEGER connection,nb_connection INTEGER*4 connection_list(10) /10*-1/ INTEGER ioctlsocket_bck /-1/ INTEGER listener,NonBlock CHARACTER*16 host INTEGER*2 port INTEGER SendMsg INTEGER ReceiveMsg INTEGER fd_zero(64) data fd_zero /64*0/ integer*4 block_t integer*4 nb_thread /0/ logical*1 IS_THREAD_SERVEUR_ALIVE logical*1 connection_found structure /threadinfo/ integer(INT_PTR_KIND()) ThreadHandle1 integer*4 id integer*4 duration integer*4 listener integer*4 connection CHARACTER*16 host INTEGER*2 port logical*1 stopth logical*1 startwork logical*1 THREAD_START_REQ end structure record /threadinfo/thread_r(10) common/thread/thread_r !------------------------------------------------------------------------- ! CODE !------------------------------------------------------------------------- !initialization : ! Winsock v2. not initialized listener = INVALID_SOCKET connection = SOCKET_ERROR host = '127.0.0.1' port = 5000 ellapsed_time = SECNDS(0.0) ! initialized ellapsed_time ! Initialize Winsock v2. if needed 10 IF (listener .EQ. INVALID_SOCKET) THEN status = WSAStartup(WINSOCK_V2_2, wsaInfo) IF (status .NE. SUCCESS) THEN write(6,'(" WSAStartup error -",I6)')status ENDIF listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) IF (listener .EQ. INVALID_SOCKET) THEN write(6,'(" socket error - ",I6)')INVALID_SOCKET status = WSACleanup() GOTO 10 ELSE listenerInfo%sin_family = AF_INET listenerInfo%sin_port = htons(port) listenerInfo%sin_addr%s_addr = inet_addr(host(1:LEN_TRIM(host))) status = bind(listener,%REF(listenerInfo),SIZEOF(listenerInfo)) IF (status .EQ. SOCKET_ERROR) THEN write(6,'(" bind error- - ",I6)')WSAGetLastError() status = WSACleanup() listener = INVALID_SOCKET connection = SOCKET_ERROR GOTO 10 ENDIF ENDIF ENDIF IF (listener .NE. INVALID_SOCKET) THEN status = listen(listener, 5) IF (status .EQ. SOCKET_ERROR) THEN write(6,'(" listen error- - ",I6)')WSAGetLastError() status = WSACleanup() listener = INVALID_SOCKET connection = SOCKET_ERROR GOTO 10 ENDIF ! Change the socket mode on the listening socket from blocking to ! non-block so the application will not block waiting for requests NonBlock = 1 status = ioctlsocket(listener, FIONBIO,%ref(NonBlock)) if (status .EQ. SOCKET_ERROR) THEN ioctlsocket_bck = status write(6,'(" ioctlsocket failed with error- - ",I6)')WSAGetLastError() ELSE if (ioctlsocket_bck .ne. status) then ioctlsocket_bck = status write(6,'(" ioctlsocket is OK!")') readset%fd_count = 0 readset%fd_array = fd_zero writeset%fd_count =0 writeset%fd_array = fd_zero readset%fd_count = 1 readset%fd_array(1) = listener timeset%tv_sec = 0 timeset%tv_usec= 100000 status = select(0,%ref(readset),%ref(writeset),0,%ref(timeset)) if (status .EQ. SOCKET_ERROR) THEN write(6,'(" Select failed with error- - ",I6)')WSAGetLastError() end if end if nb_connection = 0 DO WHILE (nb_connection.le.10) connection = accept(listener,%REF(connectionInfo),0) if (connection .gt. 0 ) then connection_found =.false. do i=1,10 if ((connection.eq.connection_list(i)).and.(connection_list(i).ne.-1)) then connection_found = .true. end if end do if (.not.connection_found) then write(6,'(" New connection - ",I6," on list ",i6)')connection,listener nb_connection =nb_connection+1 thread_r(nb_connection).listener = listener thread_r(nb_connection).connection = connection thread_r(nb_connection).duration = nb_connection*25 if (.not.DC_DEMARRE_SERVEUR(nb_connection)) then write(6,'(" Fail to start thread nb ",i6)')nb_connection end if end if end if END DO ENDIF ENDIF END logical*1 FUNCTION DC_DEMARRE_SERVEUR(threadnb) USE IFPORT USE IFCORE use IFMT implicit none INTERFACE integer(4) FUNCTION Thread_Proc2(lpThreadParameter) !DEC$ ATTRIBUTES STDCALL,ALIAS:"_thread_proc2" :: Thread_Proc2 integer(INT_PTR_KIND()) lpThreadParameter END FUNCTION END INTERFACE INTERFACE integer(4) FUNCTION Thread_Proc1(lpThreadParameter) !DEC$ ATTRIBUTES STDCALL,ALIAS:"_thread_proc1" :: Thread_Proc1 integer(INT_PTR_KIND()) lpThreadParameter END FUNCTION END INTERFACE CHARACTER*1 kbd INTEGER(KIND=INT_PTR_KIND( )) handle,handle1 logical*4 stopth,startwork integer*4 exit_code,i,j,iretlog,iretint,timedate INTEGER*4 threadnb,switch integer(INT_PTR_KIND()) ThreadHandle1 structure /threadinfo/ integer(INT_PTR_KIND()) ThreadHandle1 integer*4 id integer*4 duration integer*4 listener integer*4 connection CHARACTER*16 host INTEGER*2 port logical*1 stopth logical*1 startwork logical*1 THREAD_START_REQ end structure record /threadinfo/thread_r(10) common/thread/thread_r switch = 2 thread_r(threadnb).stopth = .false. thread_r(threadnb).startwork =.false. thread_r(threadnb).THREAD_START_REQ =.true. if (switch.eq.1) then ThreadHandle1 = CreateThread(0,0,Thread_Proc1,loc(threadnb),CREATE_SUSPENDED,thread_r(threadnb).id) else ThreadHandle1 = CreateThread(0,0,Thread_Proc2,loc(threadnb),CREATE_SUSPENDED,thread_r(threadnb).id) end if iretlog = SetThreadPriority(ThreadHandle1,THREAD_PRIORITY_BELOW_NORMAL ) iretint = ResumeThread(ThreadHandle1) WRITE(6,*)" THREAD ",threadnb," Nb ",ThreadHandle1," ID ",thread_r(threadnb).id," CREATED" i=0 iretlog = GetExitCodeThread( ThreadHandle1, exit_code ) if(exit_code.eq.STILL_ACTIVE) then thread_r(threadnb).ThreadHandle1 = ThreadHandle1 DC_DEMARRE_SERVEUR = .TRUE. else DC_DEMARRE_SERVEUR = .FALSE. endif END FUNCTION DC_DEMARRE_SERVEUR logical*1 FUNCTION IS_THREAD_SERVEUR_ALIVE(threadnb) USE IFPORT USE IFCORE use IFMT implicit none integer*4 exit_code,iretlog INTEGER*4 threadnb structure /threadinfo/ integer(INT_PTR_KIND()) ThreadHandle1 integer*4 id integer*4 duration integer*4 listener integer*4 connection CHARACTER*16 host INTEGER*2 port logical*1 stopth logical*1 startwork logical*1 THREAD_START_REQ end structure record /threadinfo/thread_r(10) common/thread/thread_r iretlog = GetExitCodeThread( thread_r(threadnb).ThreadHandle1, exit_code ) if(exit_code.eq.STILL_ACTIVE) then IS_THREAD_SERVEUR_ALIVE = .TRUE. else IS_THREAD_SERVEUR_ALIVE = .FALSE. endif END FUNCTION IS_THREAD_SERVEUR_ALIVE integer(4) function Thread_Proc2(lpThreadParameter) !DEC$ ATTRIBUTES STDCALL, ALIAS:"_thread_proc2" :: Thread_Proc2 USE IFCORE USE IFMT USE WS2_32 implicit none integer(INT_PTR_KIND()) lpThreadParameter INTEGER*4 threadnb POINTER(parg,threadnb) INTEGER*4 i INTEGER*4 threadID,index real*4 t,delta logical*1 FOREVER /.TRUE./ INTEGER (KIND = 4) :: WDG_DELAY_S = 5 ! activate watchdog every seconds real*4 ellapsed_time integer*4 status integer*4 rcvsize INTEGER*4 BUFFER cx----------------------------------- INTEGER CONNECTION_DROPPED_BY_REMOTE_PARTY INTEGER SUCCESS PARAMETER(CONNECTION_DROPPED_BY_REMOTE_PARTY = X'05050505',SUCCESS = 0) TYPE T_CLIENT_SERVER_MESSAGE SEQUENCE UNION MAP INTEGER Code INTEGER dest INTEGER msgSize UNION MAP CHARACTER*16 dest_buffer END MAP MAP INTEGER dest_code CHARACTER*12 msg_content END MAP END UNION END MAP MAP CHARACTER*1048 buffer END MAP END UNION END TYPE TYPE(T_CLIENT_SERVER_MESSAGE) clientServerMessage INTEGER connection INTEGER SendMsg INTEGER ReceiveMsg integer*4 counter structure /threadinfo/ integer(INT_PTR_KIND()) ThreadHandle1 integer*4 id integer*4 duration integer*4 listener integer*4 connection CHARACTER*16 host INTEGER*2 port logical*1 stopth logical*1 startwork logical*1 THREAD_START_REQ end structure record /threadinfo/thread_r(10) common/thread/thread_r parg = lpThreadParameter write(6,*) "The value of the Thread_Proc1 argument is ",threadnb threadID = GetCurrentThreadId() write(6,'(" NS THREAD ID -",I12)')threadID do i=1,10 if (thread_r(i).id.eq.threadID) then index = i end if end do write(6,*) " START SERVER PART OF THREAD ---- ",index," Nb ",thread_r(index).ThreadHandle1, " ID ",thread_r(index).id ellapsed_time = SECNDS(0.0) ! initialized ellapsed_time t = SECNDS(0.0) write(6,*) "reset t ",t,' for thread ',thread_r(index).id counter = 0 FOREVER = .TRUE. connection = thread_r(index).connection DO WHILE (FOREVER) if (thread_r(index).STOPTH) THEN write(6,'(" NS ",i6," END REQUESTED -",I6)')threadID,counter FOREVER = .FALSE. end if if (INT(SECNDS(ellapsed_time))>WDG_DELAY_S) THEN write(6,'(" NS ",i6," ID ",i6," counter -",I6," connection ",i6)')index,threadID,counter,connection counter = counter + 1 ellapsed_time = SECNDS(0.0) ! reset ellapsed_time end if delta = SECNDS(t) if (delta.gt.thread_r(index).duration) then write(6,*) "got t ",delta,' for thread ',thread_r(index).id,' index',index,' > ',thread_r(index).duration FOREVER = .FALSE. end if call sleepqq(100) status = ReceiveMsg(connection,clientServerMessage%buffer,1048,rcvsize) IF (status .EQ. SUCCESS) THEN 100 continue if (rcvsize.le.SIZEOF(clientServerMessage)) then write(6,'(" NS ReceiveMsg - ",A6)')clientServerMessage%buffer(6:8) if (clientServerMessage%buffer(6:8).eq.'000') then WRITE (clientServerMessage%buffer,'(A9,I4.4)') '0013/100/',5000 status = SendMsg(connection,clientServerMessage%buffer(1:13),13) write(6,'(" NS SendMsg - ",I6)')status end if else end if ELSE IF (status .EQ. 10035) THEN !nothing to read cdg WRITE(*, *) 'nothing to read - ', status ELSE IF (status.EQ.CONNECTION_DROPPED_BY_REMOTE_PARTY) THEN write(6,'(" NS ReceiveMsg - Connection dropped by remote party")') write(6,'(" NS ",i6," ID ",i6," STOP SERVEUR - connection ",i6)')index,threadID,connection FOREVER = .FALSE. ELSE write(6,'(" NS ReceiveMsg - ",I6)')status ENDIF cdg status = closesocket(connection) cdg status = closesocket(listener) cdg write(6,'(" NS closesocket - ",I6)')status cdg connection = SOCKET_ERROR ENDIF END DO write(6,*) " END OF THREAD soon---- ",index," Nb ",thread_r(index).ThreadHandle1," ID",thread_r(index).id call sleepqq(10000) write(6,*) " END OF THREAD now ---- ",index," Nb ",thread_r(index).ThreadHandle1," ID",thread_r(index).id Thread_Proc2 = 0 call EXITTHREAD(0) end function integer(4) function Thread_Proc1(lpThreadParameter) !DEC$ ATTRIBUTES STDCALL, ALIAS:"_thread_proc1" :: Thread_Proc1 USE IFPORT USE IFCORE USE IFMT implicit none integer(INT_PTR_KIND()) lpThreadParameter INTEGER*4 threadnb POINTER(parg,threadnb) INTEGER*4 i INTEGER*4 threadID,index real*4 t structure /threadinfo/ integer(INT_PTR_KIND()) ThreadHandle1 integer*4 id integer*4 duration integer*4 listener integer*4 connection CHARACTER*16 host INTEGER*2 port logical*1 stopth logical*1 startwork logical*1 THREAD_START_REQ end structure record /threadinfo/thread_r(10) common/thread/thread_r parg = lpThreadParameter write(6,*) "The value of the Thread_Proc1 argument is ",threadnb threadID = GetCurrentThreadId() do i=1,10 if (thread_r(i).id.eq.threadID) then index = i end if end do write(6,*) " START SERVER PART OF THREAD ---- ",index," Nb ",thread_r(index).ThreadHandle1, " ID ",thread_r(index).id CALL NEWSERVEUR1(index) write(6,*) " END OF THREAD soon---- ",index," Nb ",thread_r(index).ThreadHandle1," ID",thread_r(index).id call sleepqq(10000) write(6,*) " END OF THREAD now ---- ",index," Nb ",thread_r(index).ThreadHandle1," ID",thread_r(index).id Thread_Proc1 = 0 call ExitThread(0) end function SUBROUTINE NEWSERVEUR1(threadnb) use psapi USE IFPORT USE KERNEL32 USE WS2_32 implicit none logical*1 FOREVER /.TRUE./ real*4 ellapsed_time INTEGER*4 exit_code,threadnb real*4 t,delta integer(DWORD) :: threadID integer*4 duration,index structure /threadinfo/ integer(INT_PTR_KIND()) ThreadHandle1 integer*4 id integer*4 duration integer*4 listener integer*4 connection CHARACTER*16 host INTEGER*2 port logical*1 stopth logical*1 startwork logical*1 THREAD_START_REQ end structure record /threadinfo/thread_r(10) common/thread/thread_r !------------------------------------------------------------------------- ! CODE !------------------------------------------------------------------------- cdg threadID = GetCurrentThreadId() cdg do i=1,3 cdg if (thread_r(i).id.eq.threadID) then duration = thread_r(threadnb).duration write(6,*) "Assign duration ",duration,' for thread ',thread_r(threadnb).id t = SECNDS(0.0) write(6,*) "reset t ",t,' for thread ',thread_r(threadnb).id index = threadnb cdg end if cdg end do DO WHILE (.TRUE.) call sleepqq(200) delta = SECNDS(t) if (delta.gt.duration) then write(6,*) "got t ",delta,' for thread ',thread_r(threadnb).id,' index',index,' > ',duration goto 9999 end if ENDDO 9999 CONTINUE write(6,*) "Stop thread ",threadnb,'ID ',thread_r(threadnb).id RETURN END !> @brief FUNCTION SendMsg. !> @details send message via TCP/IP communication. !> @return none INTEGER FUNCTION SendMsg(connection, buffer, size) USE WS2_32 IMPLICIT NONE INTEGER SUCCESS PARAMETER(SUCCESS = 0) INTEGER connection INTEGER size CHARACTER*(size) buffer INTEGER bytesSent INTEGER bytesSentTotal INTEGER status SendMsg = SUCCESS bytesSent = 0 bytesSentTotal = 0 DO WHILE (bytesSentTotal < size) bytesSent = send(connection, + buffer(bytesSentTotal + 1:bytesSentTotal + 1), + (size - bytesSentTotal), + 0) IF (bytesSent .EQ. SOCKET_ERROR) THEN SendMsg = WSAGetLastError() RETURN ENDIF bytesSentTotal = bytesSentTotal + bytesSent END DO RETURN END !> @brief FUNCTION ReceiveMsg. !> @details Receive message via TCP/IP communication. !> @return none INTEGER FUNCTION ReceiveMsg(connection, buffer, size,rcvsize) USE WS2_32 IMPLICIT NONE INTEGER CONNECTION_DROPPED_BY_REMOTE_PARTY INTEGER SUCCESS PARAMETER(CONNECTION_DROPPED_BY_REMOTE_PARTY = X'05050505',SUCCESS = 0) INTEGER connection INTEGER size,rcvsize CHARACTER*(size) buffer INTEGER bytesReceived INTEGER bytesReceivedTotal INTEGER status ReceiveMsg = SUCCESS bytesReceived = 0 bytesReceivedTotal = 0 DO WHILE (bytesReceivedTotal < size) bytesReceived = recv(connection, + buffer(bytesReceivedTotal + 1:bytesReceivedTotal + 1), + (size - bytesReceivedTotal), + 0) IF (bytesReceived .EQ. SOCKET_ERROR) THEN ReceiveMsg = WSAGetLastError() rcvsize = bytesReceivedTotal If ((ReceiveMsg.eq.10035).and.(bytesReceivedTotal.gt.0)) then ReceiveMsg = 0 end if RETURN ELSEIF (bytesReceived .EQ. 0) THEN ReceiveMsg = CONNECTION_DROPPED_BY_REMOTE_PARTY RETURN ENDIF bytesReceivedTotal = bytesReceivedTotal + bytesReceived END DO RETURN END
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Some of your local variables in your procedures implicitly have the SAVE attribute because they have initialization - in particular the variable FOREVER, which controls whether the thread procedure continues looping or not. SAVE'd variables are typically implemented using static storage (rather than being stored on the stack), because static storage is used that variable will be shared amongst all threads, as soon as one thread sets it to .FALSE., all threads will exit.
The style of the code makes it very difficult to follow.
Link Copied
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
You should not call ExitThread to terminate the thread. Instead, you should return from the thread entry point function. (IOW have error returns from any nested functions called from your thread startup/entry point function.
Jim Dempsey
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
You also need to make sure that you compile with the necessary options to make your thread procedures re-entrant (perhaps also add RECURSIVE).
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Thanks for the answer but:
1 - making the recursive option do not change anything on the behavior, with or without this option if switch = 1 stopping one thread do not stop the other witch is correct, but with swicth=2, stopping one thread do stop all the thread witch is not correct.
2 - I do not understand how to stop thread whitout using exitthread function, As far as I understand this is the normal way to stop thread when we whan that the thread do not exist anymore.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Some of your local variables in your procedures implicitly have the SAVE attribute because they have initialization - in particular the variable FOREVER, which controls whether the thread procedure continues looping or not. SAVE'd variables are typically implemented using static storage (rather than being stored on the stack), because static storage is used that variable will be shared amongst all threads, as soon as one thread sets it to .FALSE., all threads will exit.
The style of the code makes it very difficult to follow.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Thanks IanH, that was the point, removing the initialization of FOREVER do solve my Pb.
- Subscribe to RSS Feed
- Mark Topic as New
- Mark Topic as Read
- Float this Topic for Current User
- Bookmark
- Subscribe
- Printer Friendly Page