/* */ /* PIT - Parallelize It! Copyright (C) 2005 Kenneth R. Koehler */ /* */ /* e-mail address - kenneth.koehler@uc.edu */ /* */ /* */ /* This program is free software; you can redistribute it and/or modify it under the terms of the */ /* GNU General Public License as published by the Free Software Foundation; either version 2 of */ /* the License, or (at your option) any later version. */ /* */ /* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; */ /* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR */ /* PURPOSE. See the GNU General Public License for more details. */ /* */ /* You should have received a copy of the GNU General Public License along with this program; if */ /* not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ /* */ /* */ /* pit.c */ /* */ /* Workload must be partitioned into a set of parcels, each described by an entry in the */ /* workload file consisting of the command line necessary to process the parcel. */ /* */ /* The master instance of pit sends these parcels to the slave instances; as each completes, */ /* the slave notifies the master and the next parcel is transmitted. */ /* */ /* If a slave process dies (or TCP Keep Alive fails), the master reassigns the parcel. */ /* */ /* If the master is terminated before the workload is complete, it will recover */ /* via the progress file, rescheduling parcels that were in progress when the */ /* termination occurred. There are two progress files, used alternately: command line */ /* option determines which file is used. */ /* */ /* SIGHUP causes master process to dump current status, */ /* slave to cleanly exit after current parcel */ /* */ /* */ /* NB: Workload string parameters should have no spaces or be enclosed by "" or ''. */ /* */ /* */ /* NB: Uses parallel TCP port assigned to GAUSS (tm) */ /* */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define EXPECTED_DEFAULT 64 #define FALSE 0 #define NULL_SOCKET 0 #define PARCEL_FINISHED 0 #define SLAVE_BUSY 1 #define SLAVE_IDLE 0 #define SLAVE_SHUTDOWN -1 #define TRUE 1 /* prototypes for master processing */ /* workload management */ int assign_parcel (int slave_index); /* assign and send a parcel to a slave; returns slave==OK */ /* called by main */ void get_workload (unsigned long int id, char *work); /* read sequentially from workload file */ /* called by main, assign_parcel */ void save_progress (); /* called by assign_parcel, slave_down */ /* slave and socket control (die can also shutdown slave's socket) */ void add_slave (); /* called by main */ void slave_down (int slave_index); /* called by main, assign_parcel */ void close_sockets (); /* called by main, die */ /* manipulate parcel array */ int add_parcel (unsigned long int id); /* called by main, assign_parcel */ void remove_parcel (unsigned long int id); /* called by main, assign_parcel */ void dump_status (); /* called by catch_sighup, catch_sigint */ /* generic prototypes */ void catch_sighup (int signal_value); void catch_sigint (int signal_value); void die (); void die_msg (char *error_code); void time_stamp (); void time_stamp_msg (char *comment); /* global variables for master processing */ char progress_file_name [256] = "pit.progress"; /* used in main, save_progress */ char progress_file_suffix [3] = ".a"; int expected = EXPECTED_DEFAULT; /* maximum established sockets waiting to be accepted */ /* also dynamic array lengths are a multiple of this */ /* used in main, add_parcel, add_slave */ int listen_socket; /* used in main, close_sockets */ int no_more_work_in_file = FALSE; /* used in main, assign_parcel, get_workload */ int slave_socket; /* used in main, add_slave, die */ long number_of_parcels = 0; /* used in those plus add_parcel, remove_parcel, assign_parcel */ long number_of_slaves = 0; /* used in main, save_progress, add_slave, dump_status */ long max_number_of_parcels = EXPECTED_DEFAULT; /* used in main, add_parcels */ long max_number_of_slaves = EXPECTED_DEFAULT; /* used in main, add_slaves */ unsigned long int current_parcel = 0; /* used in assign_parcel, get_workload*/ unsigned long int earliest_parcel = 0; /* used in main, add_parcel */ unsigned long int latest_parcel = 0; typedef struct { char slave_ip_address [16]; char workload [256]; unsigned long int identifier; } parcel; typedef struct { char ip_address [16]; int slave_socket; int status; unsigned long int parcel_identifier; } slave; parcel *parcel_array; /* used in those, plus add_parcel, remove_parcel */ struct pollfd *poll_array; /* used in main, add_slave */ slave *slave_array; /* used in main, assign_parcel, slave_down, save_progress, add_slave, dump_status */ FILE *workload_file_descriptor; /* used in main, get_workload */ /* global variables used for both */ char output_string [512]; /* used in slave_down, dump_status, time_stamp* */ int debug = FALSE; /* used in main, assign_parcel */ int done = FALSE; /* used in main, catch_sighup */ int master = FALSE; /* used in main, catch_sighup */ int parent = TRUE; /* use in main, die */ int socket_keepalive = 1; /* used in main, add_slave */ long start_time_seconds = -1; /* used in time_stamp */ long start_time_microseconds; struct protoent *tcp_protocol_entry; /* used in main, add_slave */ /* main program */ int main (int argument_count, char *arguments[]) { /* local variables for master processing */ char workload_file_name [256] = "pit.workload"; int all_slaves_idle = FALSE; int parcel_index; int slave_index; int ps_index; /* used to index into both slave and poll arrays */ int slave_socket_address_length; unsigned long int parcel_id; unsigned long int this_identifier; FILE *progress_file_descriptor = NULL; struct sockaddr_in slave_socket_address; /* local variables for slave processing */ char master_host_name [256]; char *workload_array [128]; /* array of pointers to parameters for execvp */ int child_status; int param_begin_index; /* these are used for parsing workload for execvp */ int param_end_index; int workload_array_index; int workload_length; char *index_ptr; struct hostent *host_entry; /* local variables used for both */ char fini [5] = "fini"; char workload [256]; int input_length; int option_letter; int recv_length; struct servent *parallel_service_entry; struct sockaddr_in master_socket_address; extern char *optarg; extern int optind; time_stamp (); /* initialize time value */ /* process command line options */ option_letter = 0; while (option_letter != -1) { option_letter = getopt (argument_count, arguments, "dhmn:p:"); if (option_letter == '?') die_msg ("unknown option"); if (option_letter == ':') die_msg ("missing option parameter"); if (option_letter == 'd') debug = TRUE; if (option_letter == 'h') { printf ("usage: pit -m (-n expected_number_of_slaves) (-p ) ()\n"); printf (" pit \n"); printf (" pit -h\n"); printf (" may also add -d for parcel debugging\n"); printf ("defaults: n = %d, = pit.progress, = pit.workload\n", EXPECTED_DEFAULT); exit (0); } if (option_letter == 'm') master = TRUE; if (option_letter == 'n') { sscanf (optarg, "%d", &expected); max_number_of_slaves = expected; max_number_of_parcels = expected; } if (option_letter == 'p') { if (strlen (optarg) > 253) die_msg ("progress file name is too long"); else strcpy (progress_file_name, optarg); } } /* process positional parameter */ if (master) { if (optind < argument_count) { if (strlen (arguments[optind]) > 255) die_msg ("workload file name is too long"); else strcpy (workload_file_name, arguments[optind]); } } else { if (optind < argument_count) { if (strlen (arguments[optind]) > 255) die_msg ("master host name or ip address is too long"); else strcpy (master_host_name, arguments[optind]); } else die_msg ("usage: pit (-d) "); } /* generic initializations for both master and slave processes */ signal (SIGHUP, catch_sighup); signal (SIGINT, catch_sigint); tcp_protocol_entry = getprotobyname ("tcp"); if (tcp_protocol_entry == NULL) die_msg ("getting tcp protocol entry"); parallel_service_entry = getservbyname ("parallel", "tcp"); if (parallel_service_entry == NULL) die_msg ("getting parallel service entry"); if (master) { /* code for master process */ /* allocate slave and poll arrays - there is a 1:1 correspondence between their elements */ slave_array = (slave *) calloc (max_number_of_slaves, sizeof(slave)); if (slave_array == NULL) { perror ("allocating initial slave array"); die (); } poll_array = (struct pollfd *) calloc (max_number_of_slaves + 1, sizeof(slave)); if (poll_array == NULL) { perror ("allocating initial poll array"); die (); } /* recover parcels left from last partial run */ parcel_array = (parcel *) calloc (max_number_of_parcels, sizeof(parcel)); if (parcel_array == NULL) { perror ("allocating reschedule parcel array"); die (); } progress_file_descriptor = fopen (progress_file_name, "r"); if (progress_file_descriptor != NULL) { while (fscanf (progress_file_descriptor, "%ld\n", &(this_identifier)) == 1) add_parcel (this_identifier); if (fclose (progress_file_descriptor) != 0) { perror ("closing progress file"); die (); }} /* open workload file */ workload_file_descriptor = fopen (workload_file_name, "r"); if (workload_file_descriptor == NULL) { perror ("error opening workload file"); die (); } if (earliest_parcel > 0) { /* read workloads for rescheduled parcels */ for (parcel_id = earliest_parcel; parcel_id <= latest_parcel; parcel_id++) { get_workload (parcel_id, workload); if (no_more_work_in_file) die_msg ("premature end of workload file"); parcel_index = 0; while (parcel_index < number_of_parcels) { if (parcel_array[parcel_index].identifier == parcel_id) strcpy (parcel_array[parcel_index].workload, workload); parcel_index +=1; } } } /* allocate socket, bind and listen */ listen_socket = socket (PF_INET, SOCK_STREAM, tcp_protocol_entry->p_proto); if (listen_socket == NULL_SOCKET) { perror ("allocating listen socket"); die (); } memset (&master_socket_address, 0, sizeof (master_socket_address)); master_socket_address.sin_family = AF_INET; master_socket_address.sin_addr.s_addr = INADDR_ANY; master_socket_address.sin_port = parallel_service_entry->s_port; if (bind (listen_socket, (struct sockaddr *)(&master_socket_address), sizeof(master_socket_address)) != 0) { perror ("binding socket"); die (); } if (listen (listen_socket, expected) != 0) { perror ("listening"); die (); } time_stamp_msg ("Beginning processing."); /* master process main loop */ while (! done) { /* poll for new slaves, input or hangups from slaves in progress */ /* listen socket takes last position in poll_array - there is no slave array element for it */ poll_array[number_of_slaves].fd = listen_socket; poll_array[number_of_slaves].events = POLLIN; if (poll (poll_array, number_of_slaves + 1 , -1) < 0) { perror ("polling"); die (); } /* examine poll_array elements for existing slaves */ ps_index = 0; while (ps_index < number_of_slaves) { if (poll_array[ps_index].revents & POLLHUP) /* process a hung up slave */ slave_down (ps_index); else { if (poll_array[ps_index].revents & POLLIN) { /* process input from a slave */ for (input_length = 0; input_length < 4; input_length += recv_length) { recv_length = recv (slave_array[ps_index].slave_socket, &fini, 4 - input_length, MSG_WAITALL); if (recv_length <= 0) { if ((recv_length == 0) || (errno == EPIPE) || (errno == ETIMEDOUT)) { slave_down (ps_index); break; } else { perror ("recving fini from slave"); die (); } } } if (recv_length > 0) { if (debug) { sprintf (output_string, "Slave at IP address %s sent %s.", slave_array[ps_index].ip_address, fini); time_stamp (); } /* remove finished parcel and assign a new one */ remove_parcel (slave_array[ps_index].parcel_identifier); if (assign_parcel (ps_index)) ps_index += 1; } } else { if (poll_array[ps_index].revents != 0) die ("bad return from poll"); else ps_index += 1; } } } /* accept connection from new slave */ if (poll_array[number_of_slaves].revents & POLLIN) { slave_socket_address_length = sizeof(slave_socket_address); slave_socket = accept (listen_socket, (struct sockaddr *)(&slave_socket_address), &slave_socket_address_length); if (slave_socket < 0) { perror ("accepting"); die (); } /* get it ready and assign it some work to do */ add_slave (); assign_parcel (number_of_slaves - 1); } /* see if there are any idle slaves to assign work to */ slave_index = 0; while (slave_index < number_of_slaves) { if (slave_array[slave_index].status == SLAVE_IDLE) { if (assign_parcel (slave_index)) slave_index += 1; } else slave_index += 1; } /* if there's no more work and all slaves are idle, we're done */ if (no_more_work_in_file && (number_of_parcels == 0)) { slave_index = 0; all_slaves_idle = TRUE; while (slave_index < number_of_slaves) { all_slaves_idle = all_slaves_idle && ((slave_array[slave_index].status == SLAVE_IDLE) || (slave_array[slave_index].status == SLAVE_SHUTDOWN)); slave_index += 1; } if (all_slaves_idle) done = TRUE; } } close_sockets (); time_stamp_msg ("Processing finished."); return 0; } else { /* code for slave process */ /* do DNS lookup */ memset (&master_socket_address, 0, sizeof (master_socket_address)); master_socket_address.sin_family = AF_INET; host_entry = gethostbyname (master_host_name); if (host_entry == NULL) die_msg ("unable to identify master IP address"); memcpy (&master_socket_address.sin_addr, host_entry->h_addr, host_entry->h_length); master_socket_address.sin_port = parallel_service_entry->s_port; /* allocate, connect and set socket option */ slave_socket = socket (PF_INET, SOCK_STREAM, tcp_protocol_entry->p_proto); if (slave_socket == NULL_SOCKET) { perror ("allocating slave socket"); die (); } if (connect (slave_socket, (struct sockaddr *)(&master_socket_address), sizeof(master_socket_address)) < 0) { perror ("connecting"); die (); } if (setsockopt (slave_socket, tcp_protocol_entry->p_proto, SO_KEEPALIVE, &(socket_keepalive), sizeof(socket_keepalive)) != 0) { perror ("setting keep alive socket option"); die (); } time_stamp_msg ("Beginning processing."); while (! done) { /* recv length of next parcel workload */ for (input_length = 0; input_length < sizeof(int); input_length += recv_length) { recv_length = recv (slave_socket, &workload_length, sizeof(int) - input_length, MSG_WAITALL); if (recv_length <= 0) { if ((recv_length == 0) || (errno == EPIPE) || (errno == ETIMEDOUT)) { time_stamp_msg ("Master process died."); exit (1); } else { perror ("recving workload length"); die (); } } } if (workload_length == 0) { /* master has indicated we're done */ done = TRUE; break; } /* recv next parcel workload */ if (workload_length > 255) die_msg ("excessive workload length"); for (input_length = 0; input_length < workload_length; input_length += recv_length) { recv_length = recv (slave_socket, &workload, workload_length - input_length, MSG_WAITALL); if (recv_length <= 0) { if ((recv_length == 0) || (errno == EPIPE) || (errno == ETIMEDOUT)) { time_stamp_msg ("Master process died."); exit (1); } else { perror ("recving workload"); die (); } } } if (debug) { strcpy (output_string, "Received : "); strncat (output_string, workload, workload_length); time_stamp (); } /* parse workload into workload_array for execvp */ workload_array_index = 0; param_begin_index = 0; memset (&(workload[workload_length]), 0, 1); /* so index works properly */ while (param_begin_index < workload_length) { if (workload[param_begin_index] == '\'') { /* allow for parameters quoted with apostrophes */ param_begin_index += 1; index_ptr = index (&(workload[param_begin_index + 1]), '\''); if (index_ptr == NULL) die ("malformed workload apostrophe"); param_end_index = (index_ptr - &(workload[0]) - 1); } else { if (workload[param_begin_index] == '"') { /* allow for quoted parameters */ param_begin_index += 1; index_ptr = index (&(workload[param_begin_index + 1]), '"'); if (index_ptr == NULL) die ("malformed workload quote"); param_end_index = (index_ptr - &(workload[0]) - 1); } else { index_ptr = index (&(workload[param_begin_index]), ' '); /* space-delimited parameters */ if (index_ptr == NULL) param_end_index = (workload_length - 1); else param_end_index = (index_ptr - &(workload[0]) - 1); } } workload_array[workload_array_index] = &(workload[param_begin_index]); workload_array_index += 1; if (workload_array_index > 126) die ("workload_array needs to be larger"); memset (&(workload[param_end_index + 1]), 0, 1); /* break workload up into separate strings */ param_begin_index = param_end_index + 2; while (workload[param_begin_index] == ' ') /* find start of next parameter */ param_begin_index += 1; } workload_array[workload_array_index] = NULL; /* execvp requires NULL pointer to terminate array */ /* execute it */ switch (fork()) { case 0: /* child task */ parent = FALSE; if (close (slave_socket) != 0) { /* close socket inherited from parent */ perror ("closing socket"); die (); } execvp (workload, workload_array); /* only returns on error */ perror ("execvp failed"); die (); break; case -1: die_msg("creating child process"); break; default: /* parent task */ break; } /* wait for it to finish */ if (wait (&child_status) < 0) { perror ("waiting for child to terminate"); die (); } if (WIFEXITED (child_status) && (WEXITSTATUS (child_status) != 0)) { sprintf (output_string, "child exit status was %d.\n", WEXITSTATUS (child_status)); die (output_string); } if (WIFSIGNALED (child_status)) { sprintf (output_string, "child exited due to signal %d.\n", WTERMSIG (child_status)); die (output_string); } /* send fini to master */ if (send (slave_socket, fini, 4, MSG_NOSIGNAL) < 0) { if ((errno == EPIPE) || (errno == ETIMEDOUT)) { time_stamp_msg ("Master process died."); exit (1); } perror ("sending fini"); die (); } if (debug) time_stamp_msg ("Fini sent."); } time_stamp_msg ("Processing finished."); if (shutdown (slave_socket, SHUT_WR) < 0) { perror ("shutting down socket"); die (); } return 0; } } /* procedures used by master process */ int assign_parcel (int slave_index) { int parcel_index = 0; int workload_length; unsigned long int parcel_id = PARCEL_FINISHED; if (! (no_more_work_in_file && (number_of_parcels == 0))) { /* entire workload not complete */ while (parcel_index < number_of_parcels) { /* look for more parcels already scheduled */ if ((parcel_array[parcel_index].identifier != PARCEL_FINISHED) && (strlen (parcel_array[parcel_index].slave_ip_address) == 0)) { /* but not already in progress */ parcel_id = parcel_array[parcel_index].identifier; break; } parcel_index += 1; } if ((! no_more_work_in_file) && (parcel_id == PARCEL_FINISHED)) { /* otherwise get next parcel in sequence */ parcel_id = current_parcel + 1; parcel_index = add_parcel (parcel_id); get_workload (parcel_id, parcel_array[parcel_index].workload); if (no_more_work_in_file) { remove_parcel (parcel_id); parcel_id = PARCEL_FINISHED; } } } if (parcel_id == PARCEL_FINISHED) { /* no work to do right now but not all slaves necessarily done */ if (number_of_parcels == 0) { /* but if they ARE... */ if (slave_array[slave_index].status != SLAVE_SHUTDOWN) { /* if we haven't already, */ slave_array[slave_index].parcel_identifier = PARCEL_FINISHED; slave_array[slave_index].status = SLAVE_SHUTDOWN; /* tell slave it's finished */ workload_length = 0; } else return TRUE; } else { /* nothing to do right now but there will be if a slave dies */ slave_array[slave_index].parcel_identifier = PARCEL_FINISHED; slave_array[slave_index].status = SLAVE_IDLE; return TRUE; } } else { /* we have one, so send it */ slave_array[slave_index].parcel_identifier = parcel_id; slave_array[slave_index].status = SLAVE_BUSY; workload_length = strlen(parcel_array[parcel_index].workload) - 1; } /* do not send \n */ /* send length of workload to slave, followed by workload (if more to do) */ if (send (slave_array[slave_index].slave_socket, &workload_length, sizeof(int), MSG_NOSIGNAL) < 0) { if ((errno == EPIPE) || (errno == ETIMEDOUT)) { slave_down (slave_index); return FALSE; } else { perror ("writing workload length to slave"); die (); } } if (workload_length > 0) { if (send (slave_array[slave_index].slave_socket, parcel_array[parcel_index].workload, workload_length, MSG_NOSIGNAL) < 0) { if ((errno == EPIPE) || (errno == ETIMEDOUT)) { slave_down (slave_index); return FALSE; } else { perror ("writing workload to slave"); die (); } } /* store ip address of slave assigned to this parcel */ strncpy (parcel_array[parcel_index].slave_ip_address, slave_array[slave_index].ip_address, 16); } save_progress (); /* keep track of where we are */ if (debug) { if (workload_length > 0) { sprintf (output_string, "Slave at IP address %s has been sent ", slave_array[slave_index].ip_address); strncat (output_string, parcel_array[parcel_index].workload, workload_length); } else sprintf (output_string, "Slave at IP address %s has been sent end of run.", slave_array[slave_index].ip_address); time_stamp (); } return TRUE; } void get_workload (unsigned long int id, char *work) { if (current_parcel >= id) die_msg ("unable to scan backwards"); while (current_parcel < id) { current_parcel += 1; if (fgets (work, 256, workload_file_descriptor) != work) no_more_work_in_file = TRUE; } } void save_progress () { /* do this everytime a slave dies or a parcel is assigned */ int parcel_index = 0; int progress_file_name_length = strlen (progress_file_name); FILE *progress_file_descriptor = NULL; strcat (progress_file_name, progress_file_suffix); progress_file_descriptor = fopen (progress_file_name, "w"); if (progress_file_descriptor == NULL) { perror ("error opening progress file to write"); die (); } while (parcel_index < number_of_parcels) { if (parcel_array[parcel_index].identifier != PARCEL_FINISHED) if (fprintf (progress_file_descriptor, "%ld\n", parcel_array[parcel_index].identifier) < 0) { perror ("error writing waiting parcel to progress file"); die (); } parcel_index += 1; } if (fclose (progress_file_descriptor) != 0) { perror ("closing progress file"); die (); } sync (); /* alternate between a and b file extensions for fallback resilience */ progress_file_name[progress_file_name_length] = 0; if (progress_file_suffix[1] == 'a') progress_file_suffix[1] = 'b'; else progress_file_suffix[1] = 'a'; } void add_slave () { int slave_socket_address_length = sizeof(struct sockaddr_in); slave *new_slave_array = NULL; struct pollfd *new_poll_array; struct sockaddr_in slave_socket_address; /* set keep alive option in socket */ if (setsockopt (slave_socket, tcp_protocol_entry->p_proto, SO_KEEPALIVE, &(socket_keepalive), sizeof(socket_keepalive)) != 0) { perror ("setting keep alive socket option"); die (); } /* allocate larger slave and poll arrays if necessary */ if (number_of_slaves >= max_number_of_slaves) { new_slave_array = (slave *) realloc (slave_array, expected * sizeof(slave)); if (new_slave_array == NULL) die_msg ("allocating new slave array"); slave_array = new_slave_array; new_poll_array = (struct pollfd *) realloc (poll_array, expected * sizeof(struct pollfd)); if (new_poll_array == NULL) die_msg ("allocating new poll array"); poll_array = new_poll_array; max_number_of_slaves += expected;} /* add new slave to slave and poll arrays */ memset (slave_array[number_of_slaves].ip_address, 0, 16); slave_array[number_of_slaves].parcel_identifier = PARCEL_FINISHED; slave_array[number_of_slaves].slave_socket = slave_socket; slave_array[number_of_slaves].status = SLAVE_IDLE; poll_array[number_of_slaves].events = POLLIN; poll_array[number_of_slaves].fd = slave_socket; poll_array[number_of_slaves].revents = 0; /* get slave IP address for future reference */ if (getpeername (slave_socket, (struct sockaddr *)(&slave_socket_address), &slave_socket_address_length) != 0) { perror ("getting peer name for slave"); die (); } strncpy (slave_array[number_of_slaves].ip_address, inet_ntoa (slave_socket_address.sin_addr), 16); sprintf (output_string, "Slave at IP address %s now available.", slave_array[number_of_slaves].ip_address); time_stamp (); number_of_slaves += 1; } void slave_down (int slave_index) { int parcel_index = 0; if (slave_array[slave_index].status != SLAVE_SHUTDOWN) { /* expect it from finished slaves */ sprintf (output_string, "Slave at IP address %s went down.", slave_array[slave_index].ip_address); time_stamp (); /* clear slave_ip_address for this parcel so it can be resent */ while (parcel_index < number_of_parcels) { if (parcel_array[parcel_index].identifier == slave_array[slave_index].parcel_identifier) { strcpy (parcel_array[parcel_index].slave_ip_address, ""); break; } parcel_index += 1; } save_progress (); } if (close (slave_array[slave_index].slave_socket) != 0) { perror ("closing hung up socket"); die (); } /* remove slave from slave and poll arrays */ if (slave_index != (number_of_slaves - 1)) /* contract slave array if necessary */ memmove (&(slave_array[slave_index]), &(slave_array[slave_index + 1]), ((number_of_slaves - slave_index - 1) * sizeof(slave))); /* always contract poll array because we may be called before listen socket has been checked */ memmove (&(poll_array[slave_index]), &(poll_array[slave_index + 1]), ((number_of_slaves - slave_index) * sizeof(struct pollfd))); number_of_slaves -= 1; } void close_sockets () { int slave_index; /* close all open sockets, but don't die on error since we may have been called by die */ slave_index = 0; while (slave_index < number_of_slaves) { if (close (slave_array[slave_index].slave_socket) != 0) perror ("closing socket"); slave_index += 1; } if (close (listen_socket) != 0) perror ("closing listen socket"); } int add_parcel (unsigned long int id) { int parcel_index = 0; parcel *new_array = NULL; if ((earliest_parcel == 0) || (id < earliest_parcel)) /* so we can re-initialize workloads */ earliest_parcel = id; if ((latest_parcel == 0) || (id > latest_parcel)) latest_parcel = id; while (parcel_index < number_of_parcels) { if (parcel_array[parcel_index].identifier == PARCEL_FINISHED) { /* re-use entry */ parcel_array[parcel_index].identifier = id; strcpy (parcel_array[parcel_index].slave_ip_address, ""); strcpy (parcel_array[parcel_index].workload, ""); return parcel_index; } parcel_index += 1; } if (number_of_parcels >= max_number_of_parcels) { /* allocate and initialize larger parcel array if necessary */ new_array = (parcel *) realloc (parcel_array, expected * sizeof(parcel)); if (new_array == NULL) die_msg ("allocating new parcel array"); parcel_array = new_array; max_number_of_parcels += expected; for (parcel_index = number_of_parcels; parcel_index < max_number_of_parcels; parcel_index++) { parcel_array[parcel_index].identifier = PARCEL_FINISHED; strcpy (parcel_array[parcel_index].slave_ip_address, ""); strcpy (parcel_array[parcel_index].workload, ""); } } parcel_array[number_of_parcels].identifier = id; /* add it to the end of the array */ strcpy (parcel_array[number_of_parcels].slave_ip_address, ""); strcpy (parcel_array[number_of_parcels].workload, ""); number_of_parcels += 1; return number_of_parcels - 1; } void remove_parcel (unsigned long int id) { /* simply marks them finished so they can be re-used */ int parcel_index = 0; while (parcel_index < number_of_parcels) { if (parcel_array[parcel_index].identifier == id) { parcel_array[parcel_index].identifier = PARCEL_FINISHED; strcpy (parcel_array[parcel_index].slave_ip_address, ""); strcpy (parcel_array[parcel_index].workload, ""); break; } parcel_index += 1; } /* fix number of parcels */ number_of_parcels = max_number_of_parcels - 1; while (number_of_parcels >= 0) if (parcel_array[number_of_parcels].identifier != PARCEL_FINISHED) break; else number_of_parcels -= 1; number_of_parcels += 1; } void dump_status () { int parcel_index = 0; while (parcel_index < number_of_parcels) { if (parcel_array[parcel_index].identifier != PARCEL_FINISHED) { sprintf (output_string, "Slave at IP address %s processing parcel id %ld, workload follows.", parcel_array[parcel_index].slave_ip_address, parcel_array[parcel_index].identifier); time_stamp (); printf (" %s\n", parcel_array[parcel_index].workload); } parcel_index += 1; } fflush (NULL); } /* procedures used for both master and slave processes */ void catch_sighup (int signal_value) { if (master) dump_status (); else done = TRUE; } void catch_sigint (int signal_value) { dump_status (); die_msg ("SIGINT"); } void die () { /* close sockets so counterpart knows we're dying */ if (master) close_sockets (); else { if (parent && (shutdown (slave_socket, SHUT_WR) < 0)) perror ("shutting down socket"); } memset (output_string, 0, 1); time_stamp (); abort (); } void die_msg (char *error_code) { char error_string [256]; if (master) close_sockets (); else { if (parent && (shutdown (slave_socket, SHUT_WR) < 0)) perror ("shutting down socket"); } memset (output_string, 0, 1); time_stamp (); strcpy (error_string, "Error *** "); strcat (error_string, error_code); strcat (error_string, " *** \n"); printf (error_string); abort (); } void time_stamp () { long adjusted_seconds = start_time_seconds; long adjusted_microseconds = start_time_microseconds; struct timeval now; gettimeofday (&now, NULL); if (start_time_seconds == -1) { start_time_seconds = now.tv_sec; start_time_microseconds = now.tv_usec; return; } if ((now.tv_usec - adjusted_microseconds) < 0) { adjusted_microseconds -= 1000000; adjusted_seconds += 1; } sprintf (output_string + strlen (output_string), " Total elapsed time is %ld.%06ld.\n", (now.tv_sec - adjusted_seconds), (now.tv_usec - adjusted_microseconds)); printf (output_string); fflush (NULL); } void time_stamp_msg (char *comment) { strcpy (output_string, comment); time_stamp (); }