Sample script for TCL socket load distribution across many CPUs / hosts by jbrearley
Posted: 26 Nov 12

Here is a demo script for others to reuse or learn from. The server process hands out work assignments to multiple child processes to balance the load across multiple CPUs / hosts. The child processes in this sample script don't do any real work, but occasionally create an error to demonstrate the error handling and recovery. Enjoy!
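
As a rough illustration of the hand-out rule, here is a minimal sketch that is not part of the script below. It assumes a hypothetical proc next_task and a list of dicts for the task table (the script itself keeps this state in the global ::work array), and it needs TCL8.5 or later for dict. A task is handed out when it has no start time and has been tried fewer than max_retry times.

```tcl
# Hypothetical helper for illustration only, not part of the real script.
# Returns the first file that is unassigned (empty start time) and still
# has tries left, or an empty string when nothing can be handed out.
proc next_task {tasks max_retry} {
   foreach task $tasks {
      set start [dict get $task start]
      set try [dict get $task try]
      if {$start eq "" && $try < $max_retry} {
         return [dict get $task file]
      }
   }
   return ""
}

# Made-up sample data: f1 is already running, f3 has used all its tries,
# f6 has not been assigned yet.
set tasks [list \
   [dict create file f1 start 1353900000 try 1] \
   [dict create file f3 start "" try 3] \
   [dict create file f6 start "" try 0]]

puts [next_task $tasks 3]
```

With the sample data above, f6 is the task handed out. Because a failed task gets its start time cleared but keeps its try count, the same rule covers both first-time assignment and reassignment after an error.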

CODE

# Shows how a server process can hand out task assignments 
# to related client processes. This is useful for distributing
# tasks of varying sizes amongst multiple CPUs on the same 
# host, or possibly multiple CPUs across many separate hosts.

# When a client has finished its current assignment, it will 
# ask the server for the next available task. This allows the
# server to distribute a group of tasks to make full use of
# the CPUs available. This avoids the issue of overloading
# one CPU while other CPUs are available but idle. The result
# is that a group of tasks is completed in the shortest possible
# elapsed real time.

# Written by John Brearley  Nov 2012.

# NB: On WinVista, early versions of TCL8.6 had issues of server
# socket not getting event notification of new client socket
# from same host. Fixed in TCL8.6B3. Never an issue on WinXP.

# Not yet tested on Linux or Win7 or higher.

# Parameters user might wish to customize
set child_timeout_sec 15 ;# Maximum time in seconds a child process is allowed for processing a file
set max_children 4 ;# maximum parallel running child processes allowed
set max_retry 3 ;# maximum times to try recovering from an error
set port_start 10500 ;# TCP port range to use
set port_end 10600

# Define list of dummy files to be processed by child processes.
# In a production script, you would probably get the list of filenames
# from a directory calling token.
# Files will be handed out for processing in the order specified.
# In case of error, it is assumed that a file can be reprocessed
# by itself, with no dependencies on other results.
set work_list "f1 f3 f6 f9 f2 f11 f12 f0 f4 f20 f19 f18 f30 f100 g1 g3 g6 g9 g2 g11 g12 g0 g4 g20 g19 g18 g30 g100 "

# ================ Common procs ==============================

# ================ check_child_timeout =======================
# If a child process is taking too long to process a file,
# closes the child socket to trigger reassignment of that file.
#
# Calling parameters: none
#
# Returns: null
# ============================================================
proc check_child_timeout { } {

   # Look for files started, but not finished yet.
   # puts "check_child_timeout processing"
   set now_sec [clock seconds]
   set timeout 0 ;# count number of timeouts this run
   for {set i 1} {$i <= $::work_max} {incr i} {
      set file $::work($i,file)
      set start $::work($i,start)
      set finish $::work($i,finish)
      # puts "check_child_timeout i=$i file=$file start=$start finish=$finish"
      # Skip files not started yet or already finished.
      if {$start == "" || $finish != ""} {
         # puts "check_child_timeout skipping i=$i file=$file start=$start finish=$finish"
         continue
      }

      # Has file been processing too long?
      set run_time [expr {$now_sec - $start}]
      if {$run_time > $::child_timeout_sec} {
         # An error will get logged when the socket is closed and cleaned up.
         # We DON'T log an error here, so that there is only one error generated
         # for a single timeout, not two.
         incr timeout
         puts "check_child_timeout i=$i file=$file has run for $run_time seconds, taking recovery action."
         set ch_server $::work($i,ch_server)
         close_server_socket $ch_server TIME
      } else {
         # puts "check_child_timeout i=$i file=$file run_time=$run_time seconds OK"
      }
   }

   # May need to update the server.
   if {$timeout != 0} {
      update_server
   }
   # puts "check_child_timeout done"
   return
}


# ================ close_server_socket =======================
# Closes server socket, updates status with reason text.
#
# Calling parameters: channel reason
#
# Returns: null
# ============================================================
proc close_server_socket {ch reason} {

   # Close the socket, update the status.
   set reason [string toupper $reason]
   puts "close_server_socket ch=$ch reason=$reason"
   set catch_resp [catch {close $ch} catch_msg]
   if {$catch_resp == 0} {
      set ::socket_info($ch,status) "$reason Close OK"
   } else {
      set ::socket_info($ch,status) "$reason Close ERR"
      incr ::errors
      puts "close_server_socket ERROR: close $ch got: $catch_msg"
   }
   set ::socket_info($ch,finish) [clock seconds]

   # Need to clean up work array so any unfinished files associated
   # with this channel will be reassigned to new children.
   for {set i 1} {$i <= $::work_max} {incr i} {
      set ch_server $::work($i,ch_server)
      if {$ch != $ch_server} {
         # file is associated with different channel
         continue 
      }

      # This file is associated with the channel just closed.
      set finish $::work($i,finish)
      if {$finish != ""} {
         # file was completed OK
         continue 
      }

      set file $::work($i,file)
      set child_num $::work($i,child_num)
      set child_pid $::work($i,child_pid)
      puts "close_server_socket ERROR: $reason clean up for i=$i file=$file ch=$ch child_num=$child_num child_pid=$child_pid"
      incr ::errors
      # NB: DONT reset the try counter!
      set ::work($i,child_num) ""
      set ::work($i,child_pid) ""
      set ::work($i,ch_child) ""
      set ::work($i,ch_server) ""
      set ::work($i,start) ""
      set ::work($i,finish) ""
      set ::work($i,errors) 0
      set ::work($i,warnings) 0
   }
   return
}

# ================ display_socket_info =======================
# Displays available socket info collected by the server process.
#
# Calling parameters: server_port
#
# Returns: null
# ============================================================
proc display_socket_info {port} {

   # Show column titles
   puts "\ndisplay_socket_info server_port: $port"
   set f1 "%-9s %-13s %5s %-8s %5s %5s %-13s %5s %4s"
   puts "[format $f1 SrvSock ChHost ChPrt ChSock ChNum ChPID Status Delay Time]"
   puts "[format $f1 ======= ====== ===== ====== ===== ===== ====== ===== ====]"

   # Format data for each server channel.
   # Server channel is extracted from the chan,addr key pair
   set names [array names ::socket_info]
   set names [lsort $names]
   foreach item $names {
      # puts "display_socket_info item=$item"
      if {[regexp -nocase {(.*),addr} $item - chan]} {
         # puts "display_socket_info chan=$chan"
         set addr $::socket_info($chan,addr)
         set addr [lindex [split $addr ":"] end] ;# shorten IPV6 address
         set port $::socket_info($chan,port)
         set status $::socket_info($chan,status)
         set delay $::socket_info($chan,delay)
         if {$delay != "-"} {
            append delay ms
         }
         set ch_child $::socket_info($chan,ch_child)
         set child_num $::socket_info($chan,child_num)
         set child_pid $::socket_info($chan,child_pid)
         set start $::socket_info($chan,start)
         set finish $::socket_info($chan,finish)
         # set finish "" ;# test code
         if {[regexp {^\d+$} $start] && [regexp {^\d+$} $finish]} {
            set time [expr {$finish - $start}]
            append time s
         } else {
            set time "-"
         }
         puts "[format "$f1" $chan $addr $port $ch_child $child_num $child_pid $status $delay $time]"
      }
   }
   return
}

# ================ display_work_info =========================
# Displays available work info collected by the server process.
#
# Calling parameters: none
#
# Returns: null
# ============================================================
proc display_work_info { } {

   # Show column titles
   puts "\ndisplay_work_info"
   set f1 "%-16s %3s %5s %5s %-8s %-8s %4s %3s %4s"
   puts "[format $f1 Filename Try ChNum ChPID ChSock SrvSock Time Err Warn]"
   puts "[format $f1 ======== === ===== ===== ====== ======= ==== === ====]"

   # Format data for work item processed.
   for {set i 1} {$i <= $::work_max} {incr i} {
      set file $::work($i,file)
      set try $::work($i,try)
      set child_num $::work($i,child_num)
      set child_pid $::work($i,child_pid)
      set ch_child $::work($i,ch_child)
      set ch_server $::work($i,ch_server)
      set start $::work($i,start)
      set finish $::work($i,finish)
      if {[regexp {^\d+$} $start] && [regexp {^\d+$} $finish]} {
         set time [expr {$finish - $start}]
         append time s
      } else {
         set time "FAIL"
      }
      set errors $::work($i,errors)
      set warnings $::work($i,warnings)
      puts "[format "$f1" $file $try $child_num $child_pid $ch_child $ch_server $time $errors $warnings]"
   }
   return
}

# ================ launch_children ===========================
# Launch the children as independently running parallel processes.
#
# Calling parameters: new_children host port
# new_children is an integer, the total number of new children
# to be created.
#
# Returns: null or throws error
# ============================================================
proc launch_children {new_children host port} {

   # Counter keeps track of how many times we launch children.
   if {![info exists ::child_launch_cnt]} {
      set ::child_launch_cnt 0 
   }
   incr ::child_launch_cnt

   # Counter is used to keep track of child numbers used.
   # The stdout information is easier to read when you keep
   # the child numbers unique. So we try to manage this value
   # automatically.
   if {![info exists ::current_child_max]} {
      # For server on remote host, we start child numbers at
      # a higher value. May or may not work out to be unique
      # depending on how many remote hosts access the same
      # server process
      if {$::host == $::local_host} {
         set ::current_child_max 0
      } else {
         set ::current_child_max [pid]
      }
   }
   incr ::current_child_max

   # Limit the number of new children launched at one time.
   if {$new_children > $::max_children} {
      set new_children $::max_children
   }
   set max_loop [expr {$::current_child_max + $new_children - 1}]
   puts "launch_children launching children $::current_child_max - $max_loop to $host:$port"

   # Launch the children
   for {set i $::current_child_max} {$i <= $max_loop} {incr i} {
      # $i is used as the child number to distinguish processes from each other.
      set ::current_child_max $i
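      # Braced script avoids double substitution, so a script path
      # containing spaces is passed to exec as a single argument.
      set catch_resp [catch {set pid [exec tclsh $::argv0 $host $port $i &]} catch_msg]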
      if {$catch_resp == 0} {
         puts "launch_children launched child=$i pid=$pid"
      } else {
         error "launch_children ERROR: launching child=$i: $catch_msg"
      }
   }
   return
}

# ================ new_client ================================
# Procedure is called from the event loop (entered via vwait) when a
# new incoming client socket is accepted.
#
# Calling parameters: 3 arguments are supplied by the socket -server callback
#
# Returns: null
# ============================================================
proc new_client {new_ch new_addr new_port} {

   # Save newly assigned parameters
   save_new_client $new_ch $new_addr $new_port

   # Set up socket so it is non-blocking, and buffers a line.
   # If the socket is blocking (the default), then the server
   # can handle only 1 call at a time, which is interesting,
   # but not much practical use.
   fconfigure $new_ch -blocking 0 -buffering line -buffersize 100000

   # Set up file event handler. Script to execute, along with 
   # parameters, must be passed as a single string or list.
   # This allows the specified routine to be run each time that
   # the socket has data ready to read.
   fileevent $new_ch readable [list process_server_input $new_ch]

   # puts "\nserver: [fconfigure $new_ch]\n" ;# shows more data
   return
}

# ================ normal_child_exit =========================
# Normal exit routine for a child process.
#
# Calling parameters: child_num channel errors
#
# Returns: exits the child script
# ============================================================
proc normal_child_exit {child_num ch errors} {

   # Close the socket.
   set catch_resp [catch {close $ch} catch_msg]
   if {$catch_resp != 0} {
      incr errors
      puts "normal_child_exit ERROR: child_num=$child_num close $ch got: $catch_msg"
   }

   # Child process is done.
   puts "normal_child_exit child=$child_num ch=$ch errors=$errors time=[clock format [clock seconds] -format %H:%M:%S]"
   exit $errors
}

# ================ open_client_socket ========================
# Opens socket to specified host & port. Tries multiple times
# before giving up. 
#
# Calling parameters: host port child_num
# child_num is just for clarity in the messages
#
# Returns: socket_id or throws error
# ============================================================
proc open_client_socket {host port child_num} {

   # Define how often & how many times to try opening socket.
   set max_delay 3 ;# time in seconds between tries
   set max_tries 5

   # Open socket to server.
   for {set i 1} {$i <= $max_tries} {incr i} { 
      puts "open_client_socket host=$host port=$port child_num=$child_num Try #$i"
      set catch_resp [catch {set ch [socket $host $port]} catch_msg]
      if {$catch_resp == 0} {
         # Set the socket to non-blocking. This allows the gets loop
         # later on to get multiple lines of response from the server.
         fconfigure $ch -blocking 0
         puts "open_client_socket host=$host port=$port child_num=$child_num Try #$i OK ch=$ch"
         # puts "\nclient: [fconfigure $ch]\n" ;# shows more data
         return $ch

      } else {
         puts "open_client_socket host=$host port=$port child_num=$child_num Try #$i failed: $catch_msg"
         if {$i < $max_tries} {
            after [expr {$max_delay * 1000}]
         }
      }
   }
   error "open_client_socket ERROR: open socket to host=$host port=$port child_num=$child_num failed, tried $max_tries times, $catch_msg"
}

# ================ process_server_input ======================
# Called when fileevent indicates there is data on the
# specified channel to be read and processed. 
#
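# Calling parameters: channel
#
# Each input line has the form: seq_num cmd args
# Commands handled (args as parsed here and in request_work / work_done):
#   child_info   ch_child child_num child_pid
#   request_work ch_child child_num child_pid
#   work_done    ch_child child_num child_pid file delay ?errors? ?warnings?
#
# Returns: null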
# ============================================================
proc process_server_input {ch} {

   # Read a line first: eof is only meaningful after a read attempt,
   # and on a non-blocking socket gets returns -1 when no complete
   # line is available yet.
   if {[gets $ch input] < 0} {
      if {[eof $ch]} {
         close_server_socket $ch EOF
         update_server
      }
      return
   }

   # Parse the data from the socket.
   # Sequence numbers or strings are echoed back in response, as is.
   set input [string trim $input]
   set len [string length $input]
   set seq_num [string trim [lindex $input 0]]
   # test code to force sequence number error
   # if {[regexp {^\d+$} $seq_num]} {
   #    incr seq_num 
   # }
   set cmd [string tolower [lindex $input 1]]
   set args [lrange $input 2 end]
   # puts "process_server_input ch=$ch seq_num=$seq_num cmd=$cmd args=$args" ;# echo data to screen

   # Process cmd
   if {$input == ""} {
      # Blank lines are ignored, but still get an OK response.
      # puts "process_server_input ch=$ch ignoring blank line"
      send_data process_server_input $ch "$seq_num OK"

   } elseif {$cmd == "child_info"} {
      # Children are expected to provide their details.
      set temp $::socket_info($ch,ch_child)
      # puts "process_server_input temp=$temp"
      if {$temp == ""} {
         # NB: null child info is caught later in request_work.
         puts "process_server_input registering seq_num=$seq_num cmd=$cmd args=$args ch=$ch"
         set ::socket_info($ch,ch_child) [lindex $args 0]
         set ::socket_info($ch,child_num) [lindex $args 1]
         set ::socket_info($ch,child_pid) [lindex $args 2]
         send_data process_server_input $ch "$seq_num OK"
      } else {
         incr ::errors
         send_data process_server_input $ch "$seq_num ERROR: already registered seq_num=$seq_num cmd=$cmd args=$args ch=$ch"
      }

   } elseif {$cmd == "request_work"} {
      eval request_work $ch $seq_num $args

   } elseif {$cmd == "work_done"} {
      eval work_done $ch $seq_num $args

   } else {
      # Unknown cmd ==> error
      incr ::errors
      send_data process_server_input $ch "$seq_num ERROR: unknown seq_num=$seq_num cmd=$cmd args=$args ch=$ch"
   }
   return
}

# ================ random_child_failure ======================
# This routine is used for testing error recovery. It creates
# random children failures. Do NOT call this routine in normal
# circumstances!
#
# Calling parameters: channel ch_num file
# These parameters are for ease of tracking log messages only.
# 
# Returns: null or exits script
# ============================================================
proc random_child_failure {ch ch_num file} {

   # For production script, should return immediately!
   # return 

   # 10% chance for process to exit.
   if {rand() <= 0.10} {
      puts "******** random_child_failure ch=$ch ch_num=$ch_num file=$file exiting ********"
      exit
   }

   # 10% chance for long delay / timeout
   if {rand() <= 0.10} {
      puts "******** random_child_failure ch=$ch ch_num=$ch_num file=$file sleeping ********"
      after 1000000000
   }
   return
}

# ================ request_work ==============================
# This routine finds the next task to be handed out to a child
# process. Sends work assignment to child process, or all_done.
#
# Calling parameters: ch seq_num args
# 
# Returns: null
# ============================================================
proc request_work {ch seq_num args} {
   # puts "request_work ch=$ch seq_num=$seq_num args=$args"
   
   # Is there any work to be done?
   set file ""
   set found ""
   for {set i 1} {$i <= $::work_max} {incr i} {
      set start $::work($i,start)
      set try $::work($i,try)
      if {($start == "") && ($try < $::max_retry)} {
         # This task needs to be (re)assigned.
         set found $i
         set file $::work($found,file)
         break
      }
   }

   # If no work left, send all done response.
   if {$found == ""} {
      send_data request_work $ch "$seq_num all_done seq_num=$seq_num args=$args"
      return
   }

   # Did the child register its info as expected?
   set cur_ch [lindex $args 0]
   set cur_num [lindex $args 1]
   set cur_pid [lindex $args 2]
   # puts "request_work cur_ch=$cur_ch cur_num=$cur_num cur_pid=$cur_pid"
   set reg_ch [string trim $::socket_info($ch,ch_child)]
   set reg_num [string trim $::socket_info($ch,child_num)]
   set reg_pid [string trim $::socket_info($ch,child_pid)]
   # puts "request_work reg_ch=$reg_ch reg_num=$reg_num reg_pid=$reg_pid"
   if {($reg_ch == "") || ($reg_num == "") || ($reg_pid == "")} {
      incr ::errors
      send_data request_work $ch "$seq_num ERROR: child info not registered, kicking child out, ch=$ch seq_num=$seq_num ch_chan=$cur_ch child_num=$cur_num child_pid=$cur_pid"
      after 100 ;# let child catch up
      close_server_socket $ch NOINFO
      update_server
      return
   }

   # Does child info in the current work request match the info already registered?
   if {($cur_ch != $reg_ch) || ($cur_num != $reg_num) || ($cur_pid != $reg_pid)} {
      incr ::errors
      send_data request_work $ch "$seq_num ERROR: child info mismatch: $cur_ch $cur_num $cur_pid NE $reg_ch $reg_num $reg_pid, seq_num=$seq_num"
      return
   }

   # Assign the found task to this child process.
   incr ::work($found,try)
   set ::work($found,child_num) $cur_num
   set ::work($found,child_pid) $cur_pid
   set ::work($found,ch_child) $cur_ch
   set ::work($found,ch_server) $ch
   set ::work($found,start) [clock seconds]
   send_data request_work $ch "$seq_num assigned_work file=$file try=$::work($found,try) assigned to ch=$ch ch_child=$cur_ch child_num=$cur_num child_pid=$cur_pid"
   return
}

# ================ save_new_client ===========================
# When recovering from errors and launching new clients, TCL 
# will happily reuse a socket name, assuming that the socket
# has been closed. When you try to keep track of socket info,
# this can lead to the older socket instance data being
# overwritten by the current socket instance data. So this
# routine will detect this condition and shuffle the older
# data into new entries in the ::socket_info array to preserve
# the older data.
#
# Calling parameters: new_ch new_addr new_port
#
# Returns: null
# ============================================================
proc save_new_client {new_ch new_addr new_port} {

   # Display newly assigned parameters
   puts "save_new_client new_ch=$new_ch new_addr=$new_addr new_port=$new_port"

   # Do we already have data for this channel?
   if {[info exists ::socket_info($new_ch,addr)]} {
      # Try to find an unused channel name.
      set found 0
      for {set i 1} {$i < 100} {incr i} {
         set temp "${i}${new_ch}"
         if {![info exists ::socket_info($temp,addr)]} {
            set found 1
            break
         }
      }

      # Move the older existing data to the unused channel name in array.
      if {$found == 0} {
         incr ::errors
         puts "save_new_client ERROR: socket $temp data will be overwritten!"
      }
      puts "save_new_client moving older $new_ch data to $temp"
      set ::socket_info($temp,addr) $::socket_info($new_ch,addr)
      set ::socket_info($temp,ch_child) $::socket_info($new_ch,ch_child)
      set ::socket_info($temp,child_num) $::socket_info($new_ch,child_num)
      set ::socket_info($temp,child_pid) $::socket_info($new_ch,child_pid)
      set ::socket_info($temp,port) $::socket_info($new_ch,port)
      set ::socket_info($temp,status) $::socket_info($new_ch,status)
      set ::socket_info($temp,delay) $::socket_info($new_ch,delay)
      set ::socket_info($temp,start) $::socket_info($new_ch,start)
      set ::socket_info($temp,finish) $::socket_info($new_ch,finish)
   }

   # Save the new client data
   set ::socket_info($new_ch,addr) $new_addr
   set ::socket_info($new_ch,ch_child) ""
   set ::socket_info($new_ch,child_num) ""
   set ::socket_info($new_ch,child_pid) ""
   set ::socket_info($new_ch,port) $new_port
   set ::socket_info($new_ch,status) ""
   set ::socket_info($new_ch,delay) "-"
   set ::socket_info($new_ch,start) [clock seconds]
   set ::socket_info($new_ch,finish) ""
   return
}

# ================ send_data =================================
# Routine sends data to specified socket, checks for errors.
#
# Calling parameters: calling_name channel data
# calling_name is for log info & error traceability.
#
# Returns: null or throws error
# ============================================================
proc send_data {calling_name ch data} {

   # Display info on terminal
   if {![regexp {OK$} $data]} {
      # Suppress routine OK only messages.
      puts "$calling_name $ch $data"
   }

   # Send data on channel, check for errors
   # set ch zzz ;# test code
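   # Braced script: $data is passed to puts as is, so quotes, brackets
   # or dollar signs in the data are not substituted a second time.
   set catch_resp [catch {puts $ch $data} catch_msg]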
   if {$catch_resp != 0} {
      error "ERROR: $calling_name $ch $data puts got: $catch_msg"
   }

   # Flush data, check for errors.
   # If you don't flush after each write, the socket buffers data locally.
   # close $ch ;# test code
   set catch_resp [catch {flush $ch} catch_msg]
   if {$catch_resp != 0} {
      error "ERROR: $calling_name $ch $data flush got: $catch_msg"
   }
   return
}

# ================ send_server_get_resp ======================
# Sends request string to server, waits for server response.
#
# Calling parameters: channel seq_num request_string
#
# For enhanced error checking, the sequence number or id is sent
# at the start of the request to the server. The server is
# expected to use this sequence number or id at the start of
# the response. If the response sequence number or id does
# NOT match that of the request, an error is thrown.
#
# Returns: response string from server, or throws error
# ============================================================
proc send_server_get_resp {ch seq_num request} {

   # Send sequence number and request to server.
   set start_ms [clock milliseconds] ;# milliseconds available in TCL8.5 and up
   send_data send_server_get_resp $ch "$seq_num $request"

   # Get server response and display it.
   # The response may be multiple lines.
   set max_sec 60 ;# timeout for getting response
   set start_sec [clock seconds]
   set response ""
   while {1} {
      # Check for timeout on socket
      after 50
      set now_sec [clock seconds]
      set delta [expr {$now_sec - $start_sec}]
      if {$delta >= $max_sec} {
         error "send_server_get_resp ERROR: timeout, waited $delta seconds for server response on ch=$ch seq_num=$seq_num request=$request"
      }

      # Collect data until we get some data followed by a null response.
      gets $ch data
      if {$response != "" && $data ==  ""} {
         break
      } else {
         append response $data
      }
   }

   # Validate response sequence number or id.
   # NB: lindex chokes on unmatched quotes in text string, so use regexp!
   # puts "send_server_get_resp seq_num=$seq_num response=$response"
   if {![regexp {^(.*?)\s} $response - response_seq]} {
      set response_seq "ERROR"
   }
   # puts "send_server_get_resp seq_num=$seq_num response_seq=$response_seq response=$response "
   if {$seq_num != $response_seq} {
      error "send_server_get_resp ERROR: sequence number $seq_num != $response_seq, request=$request response=$response"
   } 

   # Show results & timing.
   set stop_ms [clock milliseconds]
   set delta_ms [expr {$stop_ms - $start_ms}]
   if {$delta_ms > $::max_delay} {
      set ::max_delay $delta_ms
   }
   puts "send_server_get_resp ch=$ch seq_num=$seq_num request=$request response=$response delay=$delta_ms ms"
   return $response
}

# ================ start_server ==============================
# Starts the server listening process.
#
# Calling parameters: none
#
# Returns: TCP port number being listened to, or throws error
# ============================================================
proc start_server {} {

   # To allow for multiple server instances running on the same
   # host, we dynamically try to find an unused port number.
   # socket -server new_client $::port_start ;# test code
   for {set port $::port_start} {$port <= $::port_end} {incr port} { 
      # Start server socket. It is here that we specify the routine
      # that we want to process incoming socket calls. In this case,
      # I have called the routine new_client.
      puts "start_server trying port $port"
      set catch_resp [catch {socket -server new_client $port} catch_msg]
      if {$catch_resp == 0} {
         puts "start_server listening on port $port, $catch_msg"
         return $port
      } else {
         puts "start_server port $port is not available, $catch_msg"
         after 500
      }
   }
   error "start_server ERROR: could not listen to any of ports $::port_start - $::port_end"
}

# ================ update_server =============================
# When all work is done, shuts down server. When necessary,
# starts more child processes.
#
# Calling parameters: none
#
# Returns: null
# ============================================================
proc update_server { } {

   # Is there any work left to do?
   set work_files ""
   set work_indices ""
   for {set i 1} {$i <= $::work_max} {incr i} {
      set file $::work($i,file)
      set finish $::work($i,finish)
      set try $::work($i,try)
      # puts "update_server i=$i file=$file finish=$finish try=$try"
      if {$finish == "" && $try < $::max_retry} {
         # This file is not finished yet, possibly not yet assigned either.
         lappend work_files $file
         lappend work_indices $i
      }
   }
   puts "update_server work_files=$work_files work_indices=$work_indices"

   # Are there any sockets still open?
   set sockets_open ""
   set names [array names ::socket_info]
   foreach item $names {
      # puts "update_server item=$item"
      if {[regexp -nocase {^(.*),addr$} $item - chan]} {
         set status $::socket_info($chan,status)
         # puts "update_server item=$item chan=$chan status=$status"
         if {![regexp -nocase {close} $status]} {
            # puts "update_server chan=$chan status=$status is still in use"
            lappend sockets_open $chan
         }
      }
   }
   puts "update_server sockets_open=$sockets_open"

   # When there is work to do and sockets are open, carry on. This is normal.
   if {$work_indices != "" && $sockets_open != ""} {
      puts "update_server keep working..."
      return
   }

   # When there is no work to do and sockets are still open, close the sockets.
   # Children should have done this themselves.
   if {$work_indices == "" && $sockets_open != ""} {
      puts "update_server no more work to do, closing sockets, shutting down server"
      foreach chan $sockets_open {
         close_server_socket $chan DONE
      }
      set ::doneit shutdown ;# vwait will recognize the variable has been updated
      return
   }

   # When there is no work and all sockets are closed. This is the expected
   # normal end. Trigger vwait to end the server process.
   if {$work_indices == "" && $sockets_open == ""} {
      puts "update_server no more work to do, all channels are closed, shutting down server"
      set ::doneit shutdown ;# vwait will recognize the variable has been updated
      return
   }

   # Oops! There are no sockets, but we still have work to do!!!
   # Have we already hit this error?
   if {$::child_launch_cnt < $::max_retry} {
      # Launch more children
      after 1000 ;# don't be too aggressive
      incr ::errors
      puts "update_server ERROR: No sockets open, but we still have work to do: $work_files"
      set cnt [llength $work_indices]
      launch_children $cnt $::host $::port
      return

   } else {
      incr ::errors
      puts "update_server ERROR: launched children $::child_launch_cnt times already, shutting server down!"
      set ::doneit shutdown ;# vwait will recognize the variable has been updated
      return
   }
}

# ================ work_done =================================
# This routine logs the results of processing a specific task.
#
# Calling parameters: ch seq_num args
# args can include optional error & warning counts from
# processing the specific file. These are added to the server
# running totals. 
# 
# Returns: null
# ============================================================
proc work_done {ch seq_num args} {
   # puts "work_done ch=$ch seq_num=$seq_num args=$args"
   
   # Parse calling data
   set cur_ch [lindex $args 0]
   set cur_num [lindex $args 1]
   set cur_pid [lindex $args 2]
   set cur_file [lindex $args 3]
   set cur_delay [lindex $args 4]
   set cur_err [lindex $args 5] ;# optional
   set cur_warn [lindex $args 6] ;# optional
   # puts "work_done cur_ch=$cur_ch cur_num=$cur_num cur_pid=$cur_pid cur_file=$cur_file cur_delay=$cur_delay cur_err=$cur_err cur_warn=$cur_warn"

   # Get work data for file being reported as done processing.
   set found ""
   for {set i 1} {$i <= $::work_max} {incr i} {
      set work_file $::work($i,file)
      if {$work_file == $cur_file} {
         # Valid file, get more data
         set found $i
         set work_ch_child $::work($found,ch_child)
         set work_ch_server $::work($found,ch_server)
         set work_child_num $::work($found,child_num)
         set work_child_pid $::work($found,child_pid)
         set work_finish $::work($found,finish)
         break
      }
   }

   # File not found => error.
   if {$found == ""} {
      incr ::errors
      send_data work_done $ch "$seq_num ERROR: file=$cur_file not in work list, ch=$ch seq_num=$seq_num args=$args"
      return
   }

   # Verify work being reported on was assigned to this child.
   if {($ch != $work_ch_server) || ($cur_ch != $work_ch_child) || ($cur_num != $work_child_num) || ($cur_pid != $work_child_pid)} {
      incr ::errors
      send_data work_done $ch "$seq_num ERROR: file=$cur_file work assignment mismatch: $ch $cur_ch $cur_num $cur_pid NE $work_ch_server $work_ch_child $work_child_num $work_child_pid seq_num=$seq_num"
      return
   }

   # Verify work has not already been reported on.
   if {$work_finish != ""} {
      incr ::errors
      send_data work_done $ch "$seq_num ERROR: file=$cur_file assignment already reported done: ch=$ch seq_num=$seq_num args=$args"
      return
   }

   # This is a valid work done report. Log the finish time.
   set ::work($found,finish) [clock seconds]

   # Delay stats are now collected as part of each work done report.
   # This avoids race conditions where both the server & children 
   # processes exit and some delay stats get lost in flight.
   set ::socket_info($ch,delay) $cur_delay

   # Log optional errors & warnings
   if {[regexp {^\d+$} $cur_err] && $cur_err > 0} {
      incr ::errors $cur_err
      puts "work_done ERROR: file=$cur_file reported $cur_err errors"
      set ::work($found,errors) $cur_err
   }
   if {[regexp {^\d+$} $cur_warn] && $cur_warn > 0} {
      incr ::warnings $cur_warn
      puts "work_done WARNING: file=$cur_file reported $cur_warn warnings"
      set ::work($found,warnings) $cur_warn
   }

   # Send response.
   send_data work_done $ch "$seq_num OK ch=$ch seq_num=$seq_num args=$args"
   return
}

# ============================================================
# Main program
# ============================================================
# Initialization
set errors 0
set local_host [string tolower [exec hostname]] ;# Works on Windows & Linux; lowercased to match $host
set max_delay 0 ;# tracks max server response time, in milliseconds
set self [file root [file tail $::argv0]]
set warnings 0

# Online help
set x [lindex $argv 0]
set x [string range $x 0 1]
set x [string tolower $x]
# puts "x=$x"
if {$x == "-h" || $x == "/?"} {
   puts "Basic usage: $self \[host\] \[port\]"
   puts " "
   puts "Demo of task load balancing between processes on"
   puts "multiple CPUs."
   puts " " 
   puts "By default, host is localhost and port is $port_start."
   puts "port is only used when accessing server on a remote host."
   exit 1
}

# Get command line tokens
set host [string tolower [string trim [lindex $argv 0]]]
if {$host == "" || $host == "localhost"} {
   set host $local_host
}
set port [string tolower [string trim [lindex $argv 1]]]
if {$port == ""} {
   set port $port_start
}
# Child_num distinguishes child process from server process.
set child_num [string tolower [string trim [lindex $argv 2]]]
# puts "main host=$host port=$port child_num=$child_num"

# Child code goes here.
if {$child_num != ""} {

   # Open socket to server
   set ch [open_client_socket $host $port $child_num]

   # Sequence numbers on transactions are used for enhanced error checking.
   # Usually numeric, could be alpha string. Key point is to be unique.
   # If sequence number is not unique, then enhanced error checking is defeated.
   set seq_num 0

   # Send child info to server
   incr seq_num
   set child_pid [pid]
   set child_info "$ch $child_num $child_pid"
   send_server_get_resp $ch $seq_num "child_info $child_info"

   # Process work assigned by server process.
   while { 1 } {

      # Ask server for next assigned task
      incr seq_num
      set server_resp [send_server_get_resp $ch $seq_num "request_work $child_info"]
      set resp [lindex $server_resp 1] ;# NB: token 0 is seq_num
      set file [lindex [split [lindex $server_resp 2] "="] end] ;# token is formatted: file=name
      # puts "child_num=$child_num resp=$resp file=$file"
      if {$resp == "all_done"} {
         puts "child_num=$child_num got $resp"
         break ;# that's it, nothing more to do.
      }
      if {$resp == "ERROR:" || $file == ""} {
         incr errors
         puts "ERROR: child_num=$child_num got resp=$resp file=$file"
         break ;# exit due to errors.
      }

      # For testing purposes only, add random errors & timeouts to simulation.
      # Do NOT call this routine in a production script!
      random_child_failure $ch $child_num $file

      # This is where the real processing would be done. 
      # Could be a call to external routine or TCL packages.
      set delay [expr {int(rand()*10)}]
      puts "child=$child_num delay=$delay sec ch=$ch file=$file"
      after [expr {$delay * 1000}]

      # Tell server we are done with this assignment.
      # We can pass back optional error & warning counts for the server to collate.
      set err 0
      set warn 0
      incr seq_num
      send_server_get_resp $ch $seq_num "work_done $child_info $file $max_delay $err $warn"
   }

   # Child process is now done.
   normal_child_exit $child_num $ch $errors
}

# Remaining code is run only by the server process.

# Start server when running on localhost.
# Server port will override the command line port.
if {$host == $local_host} {
   set port [start_server]
   puts "main port=$port"

   # Set up work_list array for server's use.
   set work_max 0
   foreach file $work_list {
      incr work_max
      set work($work_max,file) $file ;# filename to be processed
      set work($work_max,try) 0 ;# counter to track how many times file has been assigned
      set work($work_max,child_num) "" ;# child number assigned to process this file
      set work($work_max,child_pid) "" ;# child process id assigned to process this file
      set work($work_max,ch_child) "" ;# child channel assigned to process this file
      set work($work_max,ch_server) "" ;# server channel assigned to process this file
      set work($work_max,start) "" ;# start time, in seconds, that task was assigned
      set work($work_max,finish) "" ;# finish time, in seconds, that task was reported finished
      set work($work_max,errors) 0 ;# child processing code may report an error count
      set work($work_max,warnings) 0 ;# child processing code may report a warning count
   }
   puts "main work_max=$work_max"
   # display_work_info
}

# Launch the children as independently running parallel processes.
launch_children $max_children $host $port
if {$host != $local_host} {
   puts "$self main children will keep running..."
   exit
}

# Create dummy variable for vwait to watch.
set doneit "running"  
# puts "main doneit=$doneit"

# Set child timeout poll timer to 1/10 of child_timeout_sec.
set poll_ms [expr {int($child_timeout_sec * 1000 / 10)}]

# Vwait processes socket events until dummy variable doneit is updated.
puts "main starting vwait..."
while {$doneit != "shutdown" } {

   # Set the periodic timer so we are guaranteed to be able to go check
   # for children timeouts. 
   after $poll_ms {set doneit check}

   # Let vwait handle socket events.
   vwait doneit

   # When the periodic timer has fired, go look for children taking too long.
   if {$doneit == "check"} {
      check_child_timeout
      # NB: update_server may alter value of doneit, so be careful here!
      if {$doneit == "check"} {
         set doneit running
      }
   }
}
puts "main vwait finished doneit=$doneit"

# Trace info
after 1000
display_socket_info $port
display_work_info

# That's it, we are done.
puts "\n$self main all done, $errors errors, $warnings warnings."
exit $errors 
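
The main loop's pattern of re-arming a timer before each vwait, so the event loop wakes up periodically even when no socket traffic arrives, can be tried on its own. What follows is only a minimal sketch and is not part of the script above: the names poll_demo, state and ticks are made up for illustration, the tick counter stands in for check_child_timeout, and the interval is shortened so the demo finishes quickly. It also cancels the timer after each vwait, which the original loop does not do.

```tcl
# Minimal sketch of the poll-timer pattern used by the main loop.
# poll_demo, state and ticks are hypothetical names for illustration only.
proc poll_demo {poll_ms max_ticks} {
   set ::state running
   set ticks 0
   while {$::state ne "shutdown"} {
      # Re-arm the timer on every pass so vwait always has a reason to return.
      set timer [after $poll_ms {set ::state check}]

      # Process events until something writes to ::state.
      vwait ::state

      # Cancel the timer in case vwait returned for some other reason.
      after cancel $timer

      if {$::state eq "check"} {
         incr ticks
         # Stand-in for check_child_timeout: stop after max_ticks polls.
         if {$ticks >= $max_ticks} {
            set ::state shutdown
         } else {
            set ::state running
         }
      }
   }
   return $ticks
}

puts "polled [poll_demo 50 3] times"
```

Cancelling the timer is harmless when it has already fired, and it keeps stale timers from piling up if some other event handler writes to the watched variable first.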

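The field order that work_done expects in args (child channel, child number, pid, file, delay, then the optional error and warning counts) can also be unpacked in one step with lassign. The helper below is a sketch under assumptions, not code from the script: parse_work_report is a hypothetical name, it needs Tcl 8.5 or later for lassign and dict, and it treats a missing or non-numeric count as zero using the same digits-only check as work_done.

```tcl
# Sketch only: parse_work_report is a hypothetical helper, not part of the script.
# Requires Tcl 8.5 or later (lassign, dict).
proc parse_work_report {report} {
   # Field order matches what the child sends after "work_done":
   # channel, child number, pid, file, delay, optional error & warning counts.
   # lassign sets any variable without a matching value to an empty string.
   lassign $report ch num pid file delay err warn

   # Missing or non-numeric counts are treated as zero.
   foreach name {err warn} {
      if {![regexp {^\d+$} [set $name]]} {
         set $name 0
      }
   }
   return [dict create ch $ch num $num pid $pid file $file \
      delay $delay errors $err warnings $warn]
}

# Example report with an error count but no warning count.
set r [parse_work_report {sock5 2 1234 data.txt 17 3}]
puts "file=[dict get $r file] errors=[dict get $r errors] warnings=[dict get $r warnings]"
```

Returning a dict keeps the parsed fields together, so a caller can pick out only the ones it needs.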