I'm trying to write a TCP/IP server that creates a new thread for each new connection.
The problem I'm facing is that the threads are created correctly, but as soon as one thread is asked to stop (when a connection drops, for example), all the threads stop.
The problem does not occur if the threads have nothing to do with communication.
My source code is below. If the variable switch in routine DC_DEMARRE_SERVEUR has the value 1, the created threads have nothing to do with communication and each one stops automatically after a certain period, leaving the other threads alive.
If the variable switch in routine DC_DEMARRE_SERVEUR has the value 2, the created threads receive messages via TCP/IP and, for testing, also stop after a certain period. In that case, however, the first thread to stop takes all the other threads down with it! I cannot figure out why.
Could somebody help me?
Thanks
!> @brief Program .
!> @details .
!> @return none
PROGRAM MTCOM
USE KERNEL32
USE WS2_32
implicit none
!-------------------------------------------------------------------------
! INCLUDED FILES
!-------------------------------------------------------------------------
!-------------------------------------------------------------------------
! LOCAL DATA
!-------------------------------------------------------------------------
logical*1 FOREVER /.TRUE./
INTEGER (KIND = 4) :: WDG_DELAY_S = 10 ! activate watchdog every seconds
real*4 ellapsed_time
integer*2 portDisponible
INTEGER*4 exit_code
integer*4 i,status
INTEGER*4 rcvsize
INTEGER*4 BUFFER
logical*1 DC_DEMARRE_SERVEUR
cx-----------------------------------
INTEGER WINSOCK_V2_2
INTEGER CONNECTION_DROPPED_BY_REMOTE_PARTY
INTEGER SUCCESS
PARAMETER(WINSOCK_V2_2 = X'202',CONNECTION_DROPPED_BY_REMOTE_PARTY = X'05050505',SUCCESS = 0)
TYPE(T_SOCKADDR_IN) listenerInfo,listener1Info
TYPE(T_SOCKADDR_IN) connectionInfo
TYPE(T_WSADATA) wsaInfo
TYPE(T_FD_SET) writeset
TYPE(T_FD_SET) readset
TYPE(T_timeval) timeset
TYPE T_CLIENT_SERVER_MESSAGE
SEQUENCE
UNION
MAP
INTEGER Code
INTEGER dest
INTEGER msgSize
UNION
MAP
CHARACTER*16 dest_buffer
END MAP
MAP
INTEGER dest_code
CHARACTER*12 msg_content
END MAP
END UNION
END MAP
MAP
CHARACTER*1048 buffer
END MAP
END UNION
END TYPE
TYPE(T_CLIENT_SERVER_MESSAGE) clientServerMessage
INTEGER connection,nb_connection
INTEGER*4 connection_list(10) /10*-1/
INTEGER ioctlsocket_bck /-1/
INTEGER listener,NonBlock
CHARACTER*16 host
INTEGER*2 port
INTEGER SendMsg
INTEGER ReceiveMsg
INTEGER fd_zero(64)
data fd_zero /64*0/
integer*4 block_t
integer*4 nb_thread /0/
logical*1 IS_THREAD_SERVEUR_ALIVE
logical*1 connection_found
structure /threadinfo/
integer(INT_PTR_KIND()) ThreadHandle1
integer*4 id
integer*4 duration
integer*4 listener
integer*4 connection
CHARACTER*16 host
INTEGER*2 port
logical*1 stopth
logical*1 startwork
logical*1 THREAD_START_REQ
end structure
record /threadinfo/thread_r(10)
common/thread/thread_r
!-------------------------------------------------------------------------
! CODE
!-------------------------------------------------------------------------
!initialization :
! Winsock v2. not initialized
listener = INVALID_SOCKET
connection = SOCKET_ERROR
host = '127.0.0.1'
port = 5000
ellapsed_time = SECNDS(0.0) ! initialized ellapsed_time
! Initialize Winsock v2. if needed
10 IF (listener .EQ. INVALID_SOCKET) THEN
status = WSAStartup(WINSOCK_V2_2, wsaInfo)
IF (status .NE. SUCCESS) THEN
write(6,'(" WSAStartup error -",I6)')status
ENDIF
listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
IF (listener .EQ. INVALID_SOCKET) THEN
write(6,'(" socket error - ",I6)')INVALID_SOCKET
status = WSACleanup()
GOTO 10
ELSE
listenerInfo%sin_family = AF_INET
listenerInfo%sin_port = htons(port)
listenerInfo%sin_addr%s_addr = inet_addr(host(1:LEN_TRIM(host)))
status = bind(listener,%REF(listenerInfo),SIZEOF(listenerInfo))
IF (status .EQ. SOCKET_ERROR) THEN
write(6,'(" bind error- - ",I6)')WSAGetLastError()
status = WSACleanup()
listener = INVALID_SOCKET
connection = SOCKET_ERROR
GOTO 10
ENDIF
ENDIF
ENDIF
IF (listener .NE. INVALID_SOCKET) THEN
status = listen(listener, 5)
IF (status .EQ. SOCKET_ERROR) THEN
write(6,'(" listen error- - ",I6)')WSAGetLastError()
status = WSACleanup()
listener = INVALID_SOCKET
connection = SOCKET_ERROR
GOTO 10
ENDIF
! Change the socket mode on the listening socket from blocking to
! non-block so the application will not block waiting for requests
NonBlock = 1
status = ioctlsocket(listener, FIONBIO,%ref(NonBlock))
if (status .EQ. SOCKET_ERROR) THEN
ioctlsocket_bck = status
write(6,'(" ioctlsocket failed with error- - ",I6)')WSAGetLastError()
ELSE
if (ioctlsocket_bck .ne. status) then
ioctlsocket_bck = status
write(6,'(" ioctlsocket is OK!")')
readset%fd_count = 0
readset%fd_array = fd_zero
writeset%fd_count =0
writeset%fd_array = fd_zero
readset%fd_count = 1
readset%fd_array(1) = listener
timeset%tv_sec = 0
timeset%tv_usec= 100000
status = select(0,%ref(readset),%ref(writeset),0,%ref(timeset))
if (status .EQ. SOCKET_ERROR) THEN
write(6,'(" Select failed with error- - ",I6)')WSAGetLastError()
end if
end if
nb_connection = 0
DO WHILE (nb_connection.le.10)
connection = accept(listener,%REF(connectionInfo),0)
if (connection .gt. 0 ) then
connection_found =.false.
do i=1,10
if ((connection.eq.connection_list(i)).and.(connection_list(i).ne.-1)) then
connection_found = .true.
end if
end do
if (.not.connection_found) then
write(6,'(" New connection - ",I6," on list ",i6)')connection,listener
nb_connection =nb_connection+1
thread_r(nb_connection).listener = listener
thread_r(nb_connection).connection = connection
thread_r(nb_connection).duration = nb_connection*25
if (.not.DC_DEMARRE_SERVEUR(nb_connection)) then
write(6,'(" Fail to start thread nb ",i6)')nb_connection
end if
end if
end if
END DO
ENDIF
ENDIF
END
logical*1 FUNCTION DC_DEMARRE_SERVEUR(threadnb)
USE IFPORT
USE IFCORE
use IFMT
implicit none
INTERFACE
integer(4) FUNCTION Thread_Proc2(lpThreadParameter)
!DEC$ ATTRIBUTES STDCALL,ALIAS:"_thread_proc2" :: Thread_Proc2
integer(INT_PTR_KIND()) lpThreadParameter
END FUNCTION
END INTERFACE
INTERFACE
integer(4) FUNCTION Thread_Proc1(lpThreadParameter)
!DEC$ ATTRIBUTES STDCALL,ALIAS:"_thread_proc1" :: Thread_Proc1
integer(INT_PTR_KIND()) lpThreadParameter
END FUNCTION
END INTERFACE
CHARACTER*1 kbd
INTEGER(KIND=INT_PTR_KIND( )) handle,handle1
logical*4 stopth,startwork
integer*4 exit_code,i,j,iretlog,iretint,timedate
INTEGER*4 threadnb,switch
integer(INT_PTR_KIND()) ThreadHandle1
structure /threadinfo/
integer(INT_PTR_KIND()) ThreadHandle1
integer*4 id
integer*4 duration
integer*4 listener
integer*4 connection
CHARACTER*16 host
INTEGER*2 port
logical*1 stopth
logical*1 startwork
logical*1 THREAD_START_REQ
end structure
record /threadinfo/thread_r(10)
common/thread/thread_r
switch = 2
thread_r(threadnb).stopth = .false.
thread_r(threadnb).startwork =.false.
thread_r(threadnb).THREAD_START_REQ =.true.
if (switch.eq.1) then
ThreadHandle1 = CreateThread(0,0,Thread_Proc1,loc(threadnb),CREATE_SUSPENDED,thread_r(threadnb).id)
else
ThreadHandle1 = CreateThread(0,0,Thread_Proc2,loc(threadnb),CREATE_SUSPENDED,thread_r(threadnb).id)
end if
iretlog = SetThreadPriority(ThreadHandle1,THREAD_PRIORITY_BELOW_NORMAL )
iretint = ResumeThread(ThreadHandle1)
WRITE(6,*)" THREAD ",threadnb," Nb ",ThreadHandle1," ID ",thread_r(threadnb).id," CREATED"
i=0
iretlog = GetExitCodeThread( ThreadHandle1, exit_code )
if(exit_code.eq.STILL_ACTIVE) then
thread_r(threadnb).ThreadHandle1 = ThreadHandle1
DC_DEMARRE_SERVEUR = .TRUE.
else
DC_DEMARRE_SERVEUR = .FALSE.
endif
END FUNCTION DC_DEMARRE_SERVEUR
logical*1 FUNCTION IS_THREAD_SERVEUR_ALIVE(threadnb)
USE IFPORT
USE IFCORE
use IFMT
implicit none
integer*4 exit_code,iretlog
INTEGER*4 threadnb
structure /threadinfo/
integer(INT_PTR_KIND()) ThreadHandle1
integer*4 id
integer*4 duration
integer*4 listener
integer*4 connection
CHARACTER*16 host
INTEGER*2 port
logical*1 stopth
logical*1 startwork
logical*1 THREAD_START_REQ
end structure
record /threadinfo/thread_r(10)
common/thread/thread_r
iretlog = GetExitCodeThread( thread_r(threadnb).ThreadHandle1, exit_code )
if(exit_code.eq.STILL_ACTIVE) then
IS_THREAD_SERVEUR_ALIVE = .TRUE.
else
IS_THREAD_SERVEUR_ALIVE = .FALSE.
endif
END FUNCTION IS_THREAD_SERVEUR_ALIVE
integer(4) function Thread_Proc2(lpThreadParameter)
!DEC$ ATTRIBUTES STDCALL, ALIAS:"_thread_proc2" :: Thread_Proc2
USE IFCORE
USE IFMT
USE WS2_32
implicit none
integer(INT_PTR_KIND()) lpThreadParameter
INTEGER*4 threadnb
POINTER(parg,threadnb)
INTEGER*4 i
INTEGER*4 threadID,index
real*4 t,delta
logical*1 FOREVER /.TRUE./
INTEGER (KIND = 4) :: WDG_DELAY_S = 5 ! activate watchdog every seconds
real*4 ellapsed_time
integer*4 status
integer*4 rcvsize
INTEGER*4 BUFFER
cx-----------------------------------
INTEGER CONNECTION_DROPPED_BY_REMOTE_PARTY
INTEGER SUCCESS
PARAMETER(CONNECTION_DROPPED_BY_REMOTE_PARTY = X'05050505',SUCCESS = 0)
TYPE T_CLIENT_SERVER_MESSAGE
SEQUENCE
UNION
MAP
INTEGER Code
INTEGER dest
INTEGER msgSize
UNION
MAP
CHARACTER*16 dest_buffer
END MAP
MAP
INTEGER dest_code
CHARACTER*12 msg_content
END MAP
END UNION
END MAP
MAP
CHARACTER*1048 buffer
END MAP
END UNION
END TYPE
TYPE(T_CLIENT_SERVER_MESSAGE) clientServerMessage
INTEGER connection
INTEGER SendMsg
INTEGER ReceiveMsg
integer*4 counter
structure /threadinfo/
integer(INT_PTR_KIND()) ThreadHandle1
integer*4 id
integer*4 duration
integer*4 listener
integer*4 connection
CHARACTER*16 host
INTEGER*2 port
logical*1 stopth
logical*1 startwork
logical*1 THREAD_START_REQ
end structure
record /threadinfo/thread_r(10)
common/thread/thread_r
parg = lpThreadParameter
write(6,*) "The value of the Thread_Proc1 argument is ",threadnb
threadID = GetCurrentThreadId()
write(6,'(" NS THREAD ID -",I12)')threadID
do i=1,10
if (thread_r(i).id.eq.threadID) then
index = i
end if
end do
write(6,*) " START SERVER PART OF THREAD ---- ",index," Nb ",thread_r(index).ThreadHandle1, " ID ",thread_r(index).id
ellapsed_time = SECNDS(0.0) ! initialized ellapsed_time
t = SECNDS(0.0)
write(6,*) "reset t ",t,' for thread ',thread_r(index).id
counter = 0
FOREVER = .TRUE.
connection = thread_r(index).connection
DO WHILE (FOREVER)
if (thread_r(index).STOPTH) THEN
write(6,'(" NS ",i6," END REQUESTED -",I6)')threadID,counter
FOREVER = .FALSE.
end if
if (INT(SECNDS(ellapsed_time))>WDG_DELAY_S) THEN
write(6,'(" NS ",i6," ID ",i6," counter -",I6," connection ",i6)')index,threadID,counter,connection
counter = counter + 1
ellapsed_time = SECNDS(0.0) ! reset ellapsed_time
end if
delta = SECNDS(t)
if (delta.gt.thread_r(index).duration) then
write(6,*) "got t ",delta,' for thread ',thread_r(index).id,' index',index,' > ',thread_r(index).duration
FOREVER = .FALSE.
end if
call sleepqq(100)
status = ReceiveMsg(connection,clientServerMessage%buffer,1048,rcvsize)
IF (status .EQ. SUCCESS) THEN
100 continue
if (rcvsize.le.SIZEOF(clientServerMessage)) then
write(6,'(" NS ReceiveMsg - ",A6)')clientServerMessage%buffer(6:8)
if (clientServerMessage%buffer(6:8).eq.'000') then
WRITE (clientServerMessage%buffer,'(A9,I4.4)') '0013/100/',5000
status = SendMsg(connection,clientServerMessage%buffer(1:13),13)
write(6,'(" NS SendMsg - ",I6)')status
end if
else
end if
ELSE IF (status .EQ. 10035) THEN !nothing to read
cdg WRITE(*, *) 'nothing to read - ', status
ELSE
IF (status.EQ.CONNECTION_DROPPED_BY_REMOTE_PARTY) THEN
write(6,'(" NS ReceiveMsg - Connection dropped by remote party")')
write(6,'(" NS ",i6," ID ",i6," STOP SERVEUR - connection ",i6)')index,threadID,connection
FOREVER = .FALSE.
ELSE
write(6,'(" NS ReceiveMsg - ",I6)')status
ENDIF
cdg status = closesocket(connection)
cdg status = closesocket(listener)
cdg write(6,'(" NS closesocket - ",I6)')status
cdg connection = SOCKET_ERROR
ENDIF
END DO
write(6,*) " END OF THREAD soon---- ",index," Nb ",thread_r(index).ThreadHandle1," ID",thread_r(index).id
call sleepqq(10000)
write(6,*) " END OF THREAD now ---- ",index," Nb ",thread_r(index).ThreadHandle1," ID",thread_r(index).id
Thread_Proc2 = 0
call EXITTHREAD(0)
end function
integer(4) function Thread_Proc1(lpThreadParameter)
!DEC$ ATTRIBUTES STDCALL, ALIAS:"_thread_proc1" :: Thread_Proc1
USE IFPORT
USE IFCORE
USE IFMT
implicit none
integer(INT_PTR_KIND()) lpThreadParameter
INTEGER*4 threadnb
POINTER(parg,threadnb)
INTEGER*4 i
INTEGER*4 threadID,index
real*4 t
structure /threadinfo/
integer(INT_PTR_KIND()) ThreadHandle1
integer*4 id
integer*4 duration
integer*4 listener
integer*4 connection
CHARACTER*16 host
INTEGER*2 port
logical*1 stopth
logical*1 startwork
logical*1 THREAD_START_REQ
end structure
record /threadinfo/thread_r(10)
common/thread/thread_r
parg = lpThreadParameter
write(6,*) "The value of the Thread_Proc1 argument is ",threadnb
threadID = GetCurrentThreadId()
do i=1,10
if (thread_r(i).id.eq.threadID) then
index = i
end if
end do
write(6,*) " START SERVER PART OF THREAD ---- ",index," Nb ",thread_r(index).ThreadHandle1, " ID ",thread_r(index).id
CALL NEWSERVEUR1(index)
write(6,*) " END OF THREAD soon---- ",index," Nb ",thread_r(index).ThreadHandle1," ID",thread_r(index).id
call sleepqq(10000)
write(6,*) " END OF THREAD now ---- ",index," Nb ",thread_r(index).ThreadHandle1," ID",thread_r(index).id
Thread_Proc1 = 0
call ExitThread(0)
end function
SUBROUTINE NEWSERVEUR1(threadnb)
use psapi
USE IFPORT
USE KERNEL32
USE WS2_32
implicit none
logical*1 FOREVER /.TRUE./
real*4 ellapsed_time
INTEGER*4 exit_code,threadnb
real*4 t,delta
integer(DWORD) :: threadID
integer*4 duration,index
structure /threadinfo/
integer(INT_PTR_KIND()) ThreadHandle1
integer*4 id
integer*4 duration
integer*4 listener
integer*4 connection
CHARACTER*16 host
INTEGER*2 port
logical*1 stopth
logical*1 startwork
logical*1 THREAD_START_REQ
end structure
record /threadinfo/thread_r(10)
common/thread/thread_r
!-------------------------------------------------------------------------
! CODE
!-------------------------------------------------------------------------
cdg threadID = GetCurrentThreadId()
cdg do i=1,3
cdg if (thread_r(i).id.eq.threadID) then
duration = thread_r(threadnb).duration
write(6,*) "Assign duration ",duration,' for thread ',thread_r(threadnb).id
t = SECNDS(0.0)
write(6,*) "reset t ",t,' for thread ',thread_r(threadnb).id
index = threadnb
cdg end if
cdg end do
DO WHILE (.TRUE.)
call sleepqq(200)
delta = SECNDS(t)
if (delta.gt.duration) then
write(6,*) "got t ",delta,' for thread ',thread_r(threadnb).id,' index',index,' > ',duration
goto 9999
end if
ENDDO
9999 CONTINUE
write(6,*) "Stop thread ",threadnb,'ID ',thread_r(threadnb).id
RETURN
END
!> @brief FUNCTION SendMsg.
!> @details send message via TCP/IP communication.
!> @return none
INTEGER FUNCTION SendMsg(connection, buffer, size)
USE WS2_32
IMPLICIT NONE
INTEGER SUCCESS
PARAMETER(SUCCESS = 0)
INTEGER connection
INTEGER size
CHARACTER*(size) buffer
INTEGER bytesSent
INTEGER bytesSentTotal
INTEGER status
SendMsg = SUCCESS
bytesSent = 0
bytesSentTotal = 0
DO WHILE (bytesSentTotal < size)
bytesSent = send(connection,
+ buffer(bytesSentTotal + 1:bytesSentTotal + 1),
+ (size - bytesSentTotal),
+ 0)
IF (bytesSent .EQ. SOCKET_ERROR) THEN
SendMsg = WSAGetLastError()
RETURN
ENDIF
bytesSentTotal = bytesSentTotal + bytesSent
END DO
RETURN
END
!> @brief FUNCTION ReceiveMsg.
!> @details Receive message via TCP/IP communication.
!> @return none
INTEGER FUNCTION ReceiveMsg(connection, buffer, size,rcvsize)
USE WS2_32
IMPLICIT NONE
INTEGER CONNECTION_DROPPED_BY_REMOTE_PARTY
INTEGER SUCCESS
PARAMETER(CONNECTION_DROPPED_BY_REMOTE_PARTY = X'05050505',SUCCESS = 0)
INTEGER connection
INTEGER size,rcvsize
CHARACTER*(size) buffer
INTEGER bytesReceived
INTEGER bytesReceivedTotal
INTEGER status
ReceiveMsg = SUCCESS
bytesReceived = 0
bytesReceivedTotal = 0
DO WHILE (bytesReceivedTotal < size)
bytesReceived = recv(connection,
+ buffer(bytesReceivedTotal + 1:bytesReceivedTotal + 1),
+ (size - bytesReceivedTotal),
+ 0)
IF (bytesReceived .EQ. SOCKET_ERROR) THEN
ReceiveMsg = WSAGetLastError()
rcvsize = bytesReceivedTotal
If ((ReceiveMsg.eq.10035).and.(bytesReceivedTotal.gt.0)) then
ReceiveMsg = 0
end if
RETURN
ELSEIF (bytesReceived .EQ. 0) THEN
ReceiveMsg = CONNECTION_DROPPED_BY_REMOTE_PARTY
RETURN
ENDIF
bytesReceivedTotal = bytesReceivedTotal + bytesReceived
END DO
RETURN
END
Some of the local variables in your procedures implicitly have the SAVE attribute because they are initialized in their declarations, in particular the variable FOREVER, which controls whether the thread procedure keeps looping. SAVE'd variables are typically implemented using static storage (rather than being stored on the stack). Because static storage is used, that variable is shared amongst all threads: as soon as one thread sets it to .FALSE., all threads will exit.
The style of the code makes it very difficult to follow.
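To make the implied SAVE concrete, here is a minimal single-threaded sketch in standard Fortran. The program and routine names are made up for illustration and are not taken from the posted code. It only shows that an initialized local keeps its value between calls; with threads, that single static copy is what every thread ends up reading and writing.

```fortran
program implicit_save_demo
  implicit none
  integer :: passes

  ! The initialized flag is .true. only for the very first call.
  call worker_with_initializer(passes)
  print *, 'initializer, call 1: passes =', passes
  call worker_with_initializer(passes)
  print *, 'initializer, call 2: passes =', passes

  ! The flag assigned at run time is reset on every call.
  call worker_with_assignment(passes)
  print *, 'assignment,  call 1: passes =', passes
  call worker_with_assignment(passes)
  print *, 'assignment,  call 2: passes =', passes

contains

  subroutine worker_with_initializer(passes)
    integer, intent(out) :: passes
    ! Initialization in the declaration implies SAVE: the value is set
    ! once, before execution starts, and persists between calls.
    logical :: forever = .true.

    passes = 0
    do while (forever)
      passes = passes + 1
      forever = .false.          ! "stop requested"
    end do
  end subroutine worker_with_initializer

  subroutine worker_with_assignment(passes)
    integer, intent(out) :: passes
    logical :: forever           ! no initializer, so no implied SAVE

    forever = .true.             ! executable assignment, runs on every call
    passes = 0
    do while (forever)
      passes = passes + 1
      forever = .false.
    end do
  end subroutine worker_with_assignment

end program implicit_save_demo
```

The first routine reports 1 pass on its first call and 0 on its second, because the flag is still .FALSE. from the earlier call. The second routine reports 1 pass both times. Dropping the initializer and keeping the executable assignment is the same kind of change that would apply to FOREVER in the thread procedure.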
You should not call ExitThread to terminate the thread. Instead, you should return from the thread entry point function. (In other words, have any nested functions called from your thread entry point function return an error status, so that the entry point itself can return.)
Jim Dempsey
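A rough sketch of that control flow, in standard Fortran and without any real thread or socket calls, might look like the following. All names here (thread_entry, poll_connection, STATUS_OK, STATUS_DROPPED) are hypothetical stand-ins. In the real program the entry point would be the STDCALL function passed to CreateThread, and its function result would become the thread's exit code.

```fortran
program return_from_entry_demo
  implicit none
  ! Hypothetical status codes, used only in this sketch.
  integer, parameter :: STATUS_OK = 0
  integer, parameter :: STATUS_DROPPED = 1
  integer :: exit_code

  exit_code = thread_entry(3)
  print *, 'entry point returned', exit_code

contains

  ! Stand-in for the thread entry point function.
  integer function thread_entry(max_polls)
    integer, intent(in) :: max_polls
    integer :: status, polls

    polls = 0
    do
      call poll_connection(polls, max_polls, status)
      ! The nested routine reports failure through its status argument;
      ! the entry point leaves the loop instead of calling ExitThread.
      if (status /= STATUS_OK) exit
    end do

    ! Cleanup such as closing the socket would go here.
    thread_entry = status        ! returning ends the thread normally
  end function thread_entry

  ! Stand-in for a receive call: pretends the peer drops the
  ! connection on the max_polls-th poll.
  subroutine poll_connection(polls, max_polls, status)
    integer, intent(inout) :: polls
    integer, intent(in)    :: max_polls
    integer, intent(out)   :: status

    polls = polls + 1
    if (polls >= max_polls) then
      status = STATUS_DROPPED
    else
      status = STATUS_OK
    end if
  end subroutine poll_connection

end program return_from_entry_demo
```

Because every exit path goes through the end of thread_entry, any cleanup placed there runs no matter which nested call detected the problem.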
You also need to make sure that you compile with the necessary options to make your thread procedures re-entrant (perhaps also add RECURSIVE).
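One caveat worth illustrating: RECURSIVE gives each invocation its own copy of ordinary locals, but a local that is initialized in its declaration still has the SAVE attribute. The sketch below uses made-up names and plain recursion rather than threads, so it shows only the language rule, not the threading behaviour itself.

```fortran
program recursive_save_demo
  implicit none
  integer :: total

  call descend(1, 3, total)
  print *, 'invocations counted by the initialized local:', total

contains

  recursive subroutine descend(depth, max_depth, total)
    integer, intent(in)  :: depth, max_depth
    integer, intent(out) :: total
    integer :: mine              ! plain local: one copy per invocation
    integer :: calls = 0         ! initialized local: implied SAVE, one shared copy

    mine = depth
    calls = calls + 1
    if (depth < max_depth) call descend(depth + 1, max_depth, total)

    ! The deeper invocations had their own "mine", so this one is intact.
    if (mine /= depth) error stop 'automatic local was overwritten'

    ! "calls" was incremented by every invocation, because all of them
    ! share the single saved copy.
    total = calls
  end subroutine descend

end program recursive_save_demo
```

The program reports 3 invocations, because all three levels of the recursion updated the same saved counter even though the routine is RECURSIVE.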
Thanks for the answers, but:
1 - Adding the RECURSIVE option does not change the behavior. With or without it, if switch = 1, stopping one thread does not stop the others, which is correct; but with switch = 2, stopping one thread stops all the threads, which is not correct.
2 - I do not understand how to stop a thread without using the ExitThread function. As far as I understand, this is the normal way to stop a thread when we want the thread to no longer exist.
Thanks IanH, that was the point: removing the initialization of FOREVER solved my problem.