/* EGAD: parallel_egad.cpp

   Navin Pokala and Tracy Handel
   Dept. of Molecular and Cell Biology
   University of California, Berkeley

   Copyright (C) 2003 Regents of the University of California
   GNU Public License
   Aug 12 2003

   Absolutely no warranties are made or are implied with the use of this
   program or its parts.

   This file contains the functions for generating pair energy lookup tables
   in a parallel manner with networked and filesharing computers.
*/

#include "parallel_egad.h"

/* Copies the top of inputfilename into inputfile_top_filename: the first line
   beginning with "START", then every following line up to (not including) the
   first line whose leading keyword contains "_POSITIONS".  Lines whose keyword
   is OUTPUT_PREFIX, PID, PROCESS_ID or END are left out.

   line and keyword are caller-supplied scratch buffers; each must hold at
   least MXLINE_INPUT bytes.

   Returns 1 when the "_POSITIONS" line was reached, 0 when the end of the
   file was hit first (missing START or missing _POSITIONS keyword). */
static int ltm_copy_inputfile_top(char *inputfilename, char *inputfile_top_filename, char *line, char *keyword)
{
    FILE *top_file_ptr, *input_file_ptr;
    int found_start = 0;
    int found_positions = 0;

    top_file_ptr = fopen_file(inputfile_top_filename,"w");
    input_file_ptr = fopen_file(inputfilename,"r");

    /* skip everything above the START line; stop at end-of-file instead of
       re-testing a stale buffer forever */
    while(found_start == 0 && fgets(line,MXLINE_INPUT,input_file_ptr)!=NULL)
    {
        if(strncmp(line,"START",5)==0)
        {
            fprintf(top_file_ptr,"%s",line);
            found_start = 1;
        }
    }

    while(found_start == 1 && found_positions == 0 && fgets(line,MXLINE_INPUT,input_file_ptr)!=NULL)
    {
        /* reset first: sscanf stores nothing for a blank line, and the
           previous line's keyword must not decide this line's fate */
        keyword[0] = '\0';
        sscanf(line,"%s",keyword);
        convert_string_to_all_caps(keyword);

        if(strstr(keyword,"_POSITIONS")!=NULL)
            found_positions = 1;
        else if(strcmp(keyword,"OUTPUT_PREFIX")!=0 && strcmp(keyword,"PID")!=0 &&
                strcmp(keyword,"PROCESS_ID")!=0 && strcmp(keyword,"END")!=0)
            fprintf(top_file_ptr,"%s",line);
    }

    fclose(input_file_ptr);
    fclose(top_file_ptr);

    return found_positions;
}

/* Starts slave inputfile number job_number: writes its name
   (<inputfilename>.<master_pid>.<job_number>.slave.input) into
   slave_inputfilename (a buffer of at least MAXLINE bytes), copies
   inputfile_top_filename to it and appends the keyword lines shared by every
   slave job, ending with the VARIABLE_POSITIONS line. */
static void ltm_begin_slave_inputfile(char *inputfile_top_filename, char *slave_inputfilename,
                                      char *inputfilename, int master_pid, int job_number)
{
    FILE *slave_file_ptr;

    snprintf(slave_inputfilename,MAXLINE,"%s.%d.%d.slave.input", inputfilename, master_pid, job_number);
    cp_file(inputfile_top_filename, slave_inputfilename);

    slave_file_ptr = fopen_file(slave_inputfilename,"a");
    fprintf(slave_file_ptr,"LOGFILE_FLAG 0\n");
    fprintf(slave_file_ptr,"OTHER_RESIDUES none\n");
    fprintf(slave_file_ptr,"OUTPUT_PREFIX %s.%d.%d.slave\n",inputfilename, master_pid, job_number);
    fprintf(slave_file_ptr,"VARIABLE_POSITIONS\n");
    fclose(slave_file_ptr);
}

/* Appends job fragment number fragment_number
   (<inputfilename>.<master_pid>.<fragment_number>.job_frag) to
   slave_inputfilename.  The fragment is touched first, so a fragment that
   received no positions is still a file that can be appended.
   job_frag_filename is a scratch buffer of at least MAXLINE bytes. */
static void ltm_append_job_fragment(char *job_frag_filename, char *slave_inputfilename,
                                    char *inputfilename, int master_pid, int fragment_number)
{
    snprintf(job_frag_filename,MAXLINE,"%s.%d.%d.job_frag", inputfilename, master_pid, fragment_number);
    touch_file(job_frag_filename);
    append_file(job_frag_filename, slave_inputfilename);
}

/* this function creates slave inputfiles for parallel lookuptable generation.
   requires that protein has been initialized with input_stuff.cpp: input_stuff

   inputfilename - the EGAD inputfile; also used as the prefix of every
                   temporary file created here
   protein       - supplies the variable positions, their residue choices and
                   the lookup table parameters

   Outline:
     1. return at once (single CPU mode) when log10_rotamer_combinations <= 20
     2. count the lines of AVAILABLE_PROCESSORS_FILE -> num_processors
     3. copy the top of the inputfile to inputfile_top.<pid>
     4. deal the non-fixed variable positions round-robin into num_processors
        job fragments
     5. build one slave inputfile per fragment and, when disk_lookup_table_flag
        is 1, one more per pair of fragments
     6. launch a lookup_table_foreman through launch_command
     7. poll every 2 seconds until the slave inputfiles have been picked up and
        the .working files are gone (each phase gives up after 150 polls)
     8. remove leftover .done/.working/.input files

   Exits with status 1 when AVAILABLE_PROCESSORS_FILE or EXECUTABLE_FILENAME is
   not set, when the executable does not exist, when memory cannot be
   allocated, or when the inputfile lacks the START / _POSITIONS keywords.
   Returns without doing any work when the processors file is empty. */
void lookup_table_master(char *inputfilename, PROTEIN *protein)
{
    int i,j,k,num_jobs;
    int num_processors;
    int num_residues_calculate;
    int num_inputs_left, working_file_found, all_jobs_done_flag;
    int written;
    size_t line_capacity, line_length;
    char *command, *line, *dummystring;
    FILE *file_ptr;
    char *inputfile_top_filename, *slave_inputfilename, *working_inputfilename, *job_frag_filename;
    extern char *AVAILABLE_PROCESSORS_FILE, *EXECUTABLE_FILENAME, *CURRENT_WORKING_DIRECTORY, *INPUTFILENAME;
    extern int GET_PID;

    if(protein->parameters.log10_rotamer_combinations <= 20)
    {
        fprintf(stderr,"This is a small problem of 10^%lf rotamer combinations; not worth parallelization overhead for lookup_table generation\n",protein->parameters.log10_rotamer_combinations);
        fprintf(stderr,"Switching to single CPU mode\n");
        return;
    }

    if(INPUTFILENAME==NULL)
        INPUTFILENAME = inputfilename;

    if(AVAILABLE_PROCESSORS_FILE==NULL)
    {
        fprintf(stderr,"ERROR For lookup_table_master, CPU_FILE must be defined or NUM_CPU > 0\n");
        exit(1);
    }

    if(EXECUTABLE_FILENAME==NULL)
    {
        fprintf(stderr,"ERROR For lookup_table_master, EXECUTABLE_FILENAME must be defined\n");
        exit(1);
    }
    else
    {
        if(does_this_file_exist(EXECUTABLE_FILENAME)==0)
        {
            fprintf(stderr,"ERROR EXECUTABLE_FILENAME %s does not exist.\n",EXECUTABLE_FILENAME);
            exit(1);
        }
    }

    /* line and dummystring receive inputfile lines read with MXLINE_INPUT as
       the fgets limit, so they must be at least that large as well as MAXLINE */
    line_capacity = (MXLINE_INPUT > MAXLINE) ? MXLINE_INPUT : MAXLINE;

    /* the casts on calloc are kept so the file also compiles as C++ */
    line = (char *)calloc(line_capacity,sizeof(char));
    if(line==NULL)
    {
        fprintf(stderr,"ERROR lookup_table_master could not allocate memory\n");
        exit(1);
    }

    /* one processor per line of the processors file */
    num_processors=0;
    file_ptr=fopen_file(AVAILABLE_PROCESSORS_FILE,"r");
    while(fgets(line,MAXLINE,file_ptr)!=NULL)
        ++num_processors;
    fclose(file_ptr);

    /* with no processors the round-robin below would never wrap and no slave
       job would ever be created, so stop before touching the disk */
    if(num_processors < 1)
    {
        fprintf(stderr,"WARNING For lookup_table_master, %s lists no processors; skipping parallel lookup_table generation\n",AVAILABLE_PROCESSORS_FILE);
        free_memory(line);
        return;
    }

    /* allocate memory */
    command = (char *)calloc(2*MAXLINE,sizeof(char));
    dummystring = (char *)calloc(line_capacity,sizeof(char));
    inputfile_top_filename = (char *)calloc(MAXLINE,sizeof(char));
    job_frag_filename = (char *)calloc(MAXLINE,sizeof(char));
    slave_inputfilename = (char *)calloc(MAXLINE, sizeof(char));
    working_inputfilename = (char *)calloc(MAXLINE, sizeof(char));

    if(command==NULL || dummystring==NULL || inputfile_top_filename==NULL ||
       job_frag_filename==NULL || slave_inputfilename==NULL || working_inputfilename==NULL)
    {
        fprintf(stderr,"ERROR lookup_table_master could not allocate memory\n");
        exit(1);
    }

    /* copy inputfile top (above VARIABLE_POSITIONS line) to inputfile_top.pid */
    snprintf(inputfile_top_filename,MAXLINE,"inputfile_top.%d",GET_PID);
    if(ltm_copy_inputfile_top(inputfilename, inputfile_top_filename, line, dummystring)==0)
    {
        fprintf(stderr,"ERROR For lookup_table_master, %s has no START line followed by a _POSITIONS line\n",inputfilename);
        rm_file(inputfile_top_filename);
        exit(1);
    }

    /* create slave inputfile job_fragments (ie: inputfile stuff below VARIABLE_POSITIONS);
       spread evenly over num_processors fragments */
    i=1;
    k=1;
    while(protein->var_pos[i].seq_position != ENDFLAG)
    {
        if(protein->var_pos[i].fixed_flag == 0)
        {
            snprintf(job_frag_filename,MAXLINE,"%s.%d.%d.job_frag", inputfilename, GET_PID, k);
            file_ptr = fopen_file(job_frag_filename,"a");

            /* the fragment line starts with the position as the user typed it */
            snprintf(line,line_capacity,"%s \t",
                     protein->seqpos_text_map[seqpos_to_inputted_string(protein->var_pos[i].seq_position, protein->seqpos_text_map)].seqpos_text);
            line_length = strlen(line);

            num_residues_calculate=0;
            for(j=1;j<=protein->var_pos[i].number_of_choices;++j)
            {
                snprintf(dummystring,line_capacity,"%s/var_fix/%d/%s.%d.var_fix_energy",
                         protein->parameters.lookup_energy_table_directory,
                         protein->var_pos[i].seq_position,
                         protein->var_pos[i].choice[j].resparam_ptr->residuetype,
                         protein->var_pos[i].seq_position);

                /* the side-bkbn for this file does not exist, so it needs to be made */
                /* precalculating all rotamer-rotamer energies, so go ahead and include it */
                if(does_this_file_exist(dummystring)==0 || protein->parameters.disk_lookup_table_flag==1)
                    if(protein->var_pos[i].choice[j].resparam_ptr->residuecode > 0)
                    {
                        /* append "<code>," at the tracked end of line; formatting
                           a buffer into itself with sprintf is undefined */
                        written = snprintf(line+line_length, line_capacity-line_length, "%s,",
                                           protein->var_pos[i].choice[j].resparam_ptr->one_letter_code);
                        if(written > 0)
                        {
                            line_length += (size_t)written;
                            if(line_length > line_capacity-1)
                                line_length = line_capacity-1;   /* output was truncated */
                        }
                        ++num_residues_calculate;
                    }
            }

            /* the residue list ends with '.' in place of the last ',' */
            if(num_residues_calculate!=0 && line_length>0)
                line[line_length-1]='.';

            fprintf(file_ptr, "%s\n",line);
            fclose(file_ptr);
        }

        ++i;

        /* the fragment number advances for every position, fixed or not */
        if(k==num_processors)
            k=1;
        else
            ++k;
    }

    /* assemble the final slave inputfiles, part 1 - single job fragments */
    k=0;
    for(i=1;i<=num_processors;++i)
    {
        ++k;
        ltm_begin_slave_inputfile(inputfile_top_filename, slave_inputfilename, inputfilename, GET_PID, k);
        ltm_append_job_fragment(job_frag_filename, slave_inputfilename, inputfilename, GET_PID, i);
    }

    /* assemble the final slave inputfiles, part 2 - pairs of job fragments */
    if(protein->parameters.disk_lookup_table_flag==1)  /* if not 1, then pairs will be calc'd as needed */
    {
        for(i=1;i<=num_processors;++i)
        {
            for(j=(i+1);j<=num_processors;++j)
            {
                ++k;
                ltm_begin_slave_inputfile(inputfile_top_filename, slave_inputfilename, inputfilename, GET_PID, k);
                ltm_append_job_fragment(job_frag_filename, slave_inputfilename, inputfilename, GET_PID, i);
                ltm_append_job_fragment(job_frag_filename, slave_inputfilename, inputfilename, GET_PID, j);
            }
        }
    }

    num_jobs = k;

    /* clean up files */
    rm_file(inputfile_top_filename);
    for(k=1;k<=num_processors;++k)
    {
        snprintf(job_frag_filename,MAXLINE,"%s.%d.%d.job_frag", inputfilename, GET_PID, k);
        rm_file(job_frag_filename);
    }

    /* launch foremen processes on remote machines */
    snprintf(command,2*MAXLINE,"%s %s lookup_table_foreman %d %d", EXECUTABLE_FILENAME, inputfilename, GET_PID, num_jobs);
    launch_command(CURRENT_WORKING_DIRECTORY, command);

    /* wait until all the slaves inputs are working or done */
    k=0;
    all_jobs_done_flag = 0;
    while(all_jobs_done_flag == 0)
    {
        sleep(2);

        /* if an inputfile exists, we're not done yet */
        num_inputs_left = 0;
        for(i=1;i<=num_jobs;++i)
        {
            snprintf(slave_inputfilename,MAXLINE,"%s.%d.%d.slave.input", inputfilename, GET_PID, i);
            if(does_this_file_exist(slave_inputfilename) == 1)
                ++num_inputs_left;
        }

        if(num_inputs_left == 0)
            all_jobs_done_flag = 1;
        else if(num_inputs_left == 1)
        {
            /* only one job left to start: count the polls, and after 150 of
               them (5 min at 2 sec each) assume something is wrong and move
               on to the next step */
            ++k;
            if(k >= 150)
                all_jobs_done_flag = 1;
        }
    }

    /* wait for working slaves to finish */
    k = 0;
    all_jobs_done_flag = 0;
    while(all_jobs_done_flag == 0)
    {
        sleep(2);

        /* if a workingfile exists, we're not done yet */
        working_file_found = 0;
        for(i=1;i<=num_jobs;++i)
        {
            snprintf(working_inputfilename,MAXLINE,"%s.%d.%d.slave.working", inputfilename, GET_PID, i);
            if(does_this_file_exist(working_inputfilename) == 1)
                working_file_found = 1;
        }

        /* a job hung or is taking a long time for some reason, so bail after
           150 polls; the data can be generated later by the master */
        if(working_file_found == 0 || k >= 150)
            all_jobs_done_flag = 1;

        ++k;
    }

    /* clean up files */
    for(i=1;i<=num_jobs;++i)
    {
        snprintf(slave_inputfilename,MAXLINE,"%s.%d.%d.slave.done", inputfilename, GET_PID, i);
        if(does_this_file_exist(slave_inputfilename)==1)
            rm_file(slave_inputfilename);

        snprintf(slave_inputfilename,MAXLINE,"%s.%d.%d.slave.working", inputfilename, GET_PID, i);
        if(does_this_file_exist(slave_inputfilename)==1)
            rm_file(slave_inputfilename);

        /* there shouldn't be any of these, but sometimes bad things happen */
        snprintf(slave_inputfilename,MAXLINE,"%s.%d.%d.slave.input", inputfilename, GET_PID, i);
        if(does_this_file_exist(slave_inputfilename)==1)
            rm_file(slave_inputfilename);
    }

    free_memory(command);
    free_memory(line);
    free_memory(dummystring);
    free_memory(inputfile_top_filename);
    free_memory(job_frag_filename);
    free_memory(slave_inputfilename);
    free_memory(working_inputfilename);
}

/* this is launched from a lookup_table_master job; this function launches
   EGAD lookup_table slaves using the slave inputfiles generated by the
   master. Once all the inputfiles are done or working, exit.
*/
/* lookup_table_foreman

   inputfilename - the EGAD inputfile the master was started with; prefix of
                   the slave job files
   master_pid    - the pid embedded in the slave job file names
   num_jobs      - number of slave jobs, numbered 1..num_jobs

   Each job i is represented by one of three files:
       <inputfilename>.<master_pid>.<i>.slave.input    waiting to be run
       <inputfilename>.<master_pid>.<i>.slave.working  being run
       <inputfilename>.<master_pid>.<i>.slave.done     finished
   A pass renames every .input it finds to .working, runs
   "<EXECUTABLE_FILENAME> <working file> lookup_table_slave" through system(),
   and renames the file to .done on a zero exit status or back to .input
   otherwise.  Passes repeat until every job has a .done file or more than
   num_jobs passes have been made.

   This function does not return: it ends with exit(0). */
void lookup_table_foreman(char *inputfilename, int master_pid, int num_jobs)
{
    int i, all_jobs_done_flag,ctr;
    char *slave_inputfilename, *command, *working_inputfilename, *done_inputfilename, *line;
    char *pwd;
    extern char *EXECUTABLE_FILENAME, *CURRENT_WORKING_DIRECTORY, *INPUTFILENAME;
    extern int QUIET_FLAG;

    if(INPUTFILENAME==NULL)
        INPUTFILENAME = inputfilename;

    /* the casts on calloc are kept so the file also compiles as C++ */
    command = (char *)calloc(2*MAXLINE,sizeof(char));
    if(command==NULL)
    {
        failure_report("ERROR lookup_table_foreman could not allocate memory","exit");
        exit(1);
    }

    if(EXECUTABLE_FILENAME==NULL)
        failure_report("ERROR For lookup_table_master, EXECUTABLE_FILENAME must be defined","exit");
    else
    {
        if(does_this_file_exist(EXECUTABLE_FILENAME)==0)
        {
            snprintf(command,2*MAXLINE,"ERROR EXECUTABLE_FILENAME %s does not exist.",EXECUTABLE_FILENAME);
            failure_report(command,"exit");
        }
    }

    slave_inputfilename = (char *)calloc(MAXLINE,sizeof(char));
    working_inputfilename = (char *)calloc(MAXLINE,sizeof(char));
    done_inputfilename = (char *)calloc(MAXLINE,sizeof(char));
    line = (char *)calloc(MAXLINE,sizeof(char));

    if(slave_inputfilename==NULL || working_inputfilename==NULL || done_inputfilename==NULL || line==NULL)
    {
        failure_report("ERROR lookup_table_foreman could not allocate memory","exit");
        exit(1);
    }

    /* no QUIET line in the inputfile means quiet; otherwise take the integer
       that follows the keyword (the keyword itself is skipped with %*s) */
    if(grep_line_from_file("QUIET",line,MAXLINE,inputfilename)==0)
        QUIET_FLAG=1;
    else
        sscanf(line,"%*s %d",&QUIET_FLAG);

    if(CURRENT_WORKING_DIRECTORY==NULL)
    {
        CURRENT_WORKING_DIRECTORY = (char *)calloc(MAXLINE,sizeof(char));
        if(CURRENT_WORKING_DIRECTORY==NULL)
        {
            failure_report("ERROR lookup_table_foreman could not allocate memory","exit");
            exit(1);
        }
    }

    /* getenv returns NULL when PWD is not set; passing that to %s is
       undefined, so the buffer is left as it is in that case.
       NOTE(review): a buffer set up elsewhere is assumed to hold at least
       MAXLINE bytes, like the one allocated above - confirm */
    pwd = getenv("PWD");
    if(pwd!=NULL)
        snprintf(CURRENT_WORKING_DIRECTORY,MAXLINE,"%s",pwd);

    all_jobs_done_flag = 0;
    ctr=0;
    while(all_jobs_done_flag == 0)
    {
        /* loop through the number of lookup table slave jobs */
        for(i=1;i<=num_jobs;++i)
        {
            snprintf(slave_inputfilename,MAXLINE,"%s.%d.%d.slave.input", inputfilename, master_pid, i);
            snprintf(done_inputfilename,MAXLINE,"%s.%d.%d.slave.done", inputfilename, master_pid, i);
            snprintf(working_inputfilename,MAXLINE,"%s.%d.%d.slave.working", inputfilename, master_pid, i);

            /* the file exists, so it's not being worked on, nor is it done */
            if(does_this_file_exist(slave_inputfilename) == 1)
            {
                /* mv the file to .working so no other slave touches it.
                   NOTE(review): the existence test and the mv are two separate
                   steps, so two foremen could presumably pick the same job -
                   depends on what mv_file does when the source is gone */
                mv_file(slave_inputfilename, working_inputfilename);

                snprintf(command,2*MAXLINE,"%s %s lookup_table_slave",EXECUTABLE_FILENAME, working_inputfilename);

                if(system(command) == 0)
                {
                    /* successful execution, so mark this job as done */
                    mv_file(working_inputfilename, done_inputfilename);
                }
                else
                {
                    /* something bad happened, so place this job back in the
                       queue; let someone else deal with it */
                    mv_file(working_inputfilename, slave_inputfilename);
                }
            }
        }

        sleep(2);

        /* wait for all jobs on other machines to finish; wait in case some other machine */
        all_jobs_done_flag = 1;
        for(i=1;i<=num_jobs;++i)
        {
            snprintf(done_inputfilename,MAXLINE,"%s.%d.%d.slave.done", inputfilename, master_pid, i);

            /* a .done does not exist; we must continue waiting */
            if(does_this_file_exist(done_inputfilename) == 0)
                all_jobs_done_flag = 0;
        }

        /* give up after more than num_jobs passes, finished or not */
        ++ctr;
        if(ctr > num_jobs)
            all_jobs_done_flag = 1;
    }

    free_memory(command);
    free_memory(slave_inputfilename);
    free_memory(working_inputfilename);
    free_memory(done_inputfilename);
    free_memory(line);

    exit(0);
}