Package Biskit :: Package PVM :: Module dispatcher
[hide private]
[frames] | no frames]

Source Code for Module Biskit.PVM.dispatcher

  1  ## 
  2  ## Biskit, a toolkit for the manipulation of macromolecular structures 
  3  ## Copyright (C) 2002-2005 Wolfgang Rieping 
  4  ## 
  5  ## This program is free software; you can redistribute it and/or 
  6  ## modify it under the terms of the GNU General Public License as 
  7  ## published by the Free Software Foundation; either version 2 of the 
  8  ## License, or any later version. 
  9  ## 
 10  ## This program is distributed in the hope that it will be useful, 
 11  ## but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 13  ## General Public License for more details. 
 14  ## 
 15  ## You find a copy of the GNU General Public License in the file 
 16  ## license.txt along with this program; if not, write to the Free 
 17  ## Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 19  ## Contributions: Wolfgang Rieping, Raik Gruenberg 
 20  ## $Revision: 2.4 $ 
 21  ## last $Date: 2006/12/18 15:07:22 $ 
 22  ## last $Author: graik $ 
 24  """ 
 25  Manage Master/Slave tasks. 
 26  """ 
 29  from PVMThread import PVMMasterSlave 
 30  import Biskit.settings as settings 
 31  from Status import Status 
 32  from Biskit.PVM import pvmTools 
 33  import pvm, socket 
 35  MSG_JOB_START = 1  ## message tag: master sends a chunk of items to a slave (bound to JobSlave.__go)
 36  MSG_JOB_DONE = 2  ## message tag: slave sends (tid, result) back to the master (bound to JobMaster.__job_done)
class JobMaster(PVMMasterSlave):
    """
    Master side of the job dispatcher. Spawns one slave task per entry
    of the host list, hands chunks of items (obtained from L{Status}) to
    the slaves and merges the results they send back into C{self.result}.
    """

    def __init__(self, data, chunk_size, hosts, niceness, slave_script,
                 show_output = 0, result = None, redistribute=1, verbose=1 ):
        """
        @param data: dict of items to be processed {id:object}.
        @type  data: dict
        @param chunk_size: number of items that are processed by a job
        @type  chunk_size: int
        @param hosts: list of host-names
        @type  hosts: [str]
        @param niceness: nice dictionary [host-name: niceness]
        @type  niceness: dict
        @param slave_script: absolute path to slave-script
        @type  slave_script: str
        @param show_output: run each slave inside an xterm (default: 0)
        @type  show_output: 1|0
        @param result: items which have already been processed
                       (ie. they are contained in result) are not
                       processed again.
        @type  result: dict
        @param redistribute: at the end, send same job out several times
                             (default: 1)
        @type  redistribute: 1|0
        @param verbose: verbosity level (default: 1)
        @type  verbose: 1|0
        """
        PVMMasterSlave.__init__(self, verbose=verbose)

        ## hosts that are listed several times get numbered nicknames
        self.hosts = self.__unique_hosts(hosts)
        self.niceness = niceness
        self.data = data
        self.slave_script = slave_script
        self.chunk_size = chunk_size

        self.current_pos = 0
        self.show_output = show_output

        self.verbose = verbose

        if result is None:
            result = {}

        self.result = result

        ## set-up status for results; skip items that already have a result
        items = [key for key in data.keys() if key not in self.result]

        self.status = Status(items, redistribute=redistribute )

        ## guard that makes __finish call finish() only once
        self.__finished = 0

        if verbose:
            print('Processing %d items ...' % len(items))


    def __unique_hosts(self, hosts):
        """
        Give every entry of the host list a unique nickname. A host that
        occurs only once keeps its name as nickname; a host that occurs
        n times gets the nicknames host_0 ... host_(n-1).

        @param hosts: list of host-names, possibly with duplicates
        @type  hosts: [str]

        @return: one dict {'host':str, 'nickname':str} per list entry,
                 grouped by host in order of first appearance
        @rtype: [dict]
        """
        counts = {}
        order = []

        for host in hosts:
            if host in counts:
                counts[host] += 1
            else:
                counts[host] = 1
                order.append(host)

        unique_list = []

        for host in order:
            number = counts[host]

            if number > 1:
                for i in range(number):
                    unique_list.append({'host': host,
                                        'nickname': host + '_%d' % i})
            else:
                unique_list.append({'host': host,
                                    'nickname': host})

        return unique_list


    def start(self):
        """
        Start the master, its message loop and spawn all slaves.
        """
        ## NOTE(review): this sets the public attribute 'finished', not the
        ## private '__finished' guard used by __finish -- confirm that this
        ## is intended before changing it.
        self.finished = 0

        PVMMasterSlave.start(self)

        self.startMessageLoop()
        self.spawnAll(self.niceness, self.show_output)


    def getInitParameters(self, slave_tid):
        """
        Override to collect slave initiation parameters.

        @param slave_tid: slave task tid
        @type  slave_tid: int

        @return: None in this default implementation
        """
        return None


    def done(self):
        """
        Override to do something after last job result has been received.
        """
        print("Done")


    def finish( self ):
        """
        This method is called one time, after the master has
        received the last missing result. The default calls L{done}.
        """
        self.done()


    def __finish(self):
        """
        Call finish(); but only call it once.
        """
        self.status.lock.acquire()

        ## release the lock even if finish() raises
        try:
            if not self.__finished:
                self.finish()
                self.__finished = 1
        finally:
            self.status.lock.release()


    def bindMessages(self, slave_tid):
        """
        Route MSG_JOB_DONE messages of the given slave to L{__job_done}.

        @param slave_tid: slave task tid
        @type  slave_tid: int
        """
        self.bind(MSG_JOB_DONE, slave_tid, self.__job_done)


    def spawn(self, host, nickname, niceness, show_output = 0):
        """
        Spawn a job.

        @param host: host name
        @type  host: str
        @param nickname: host nickname (for uniqueness, i.e. more than
                         one job will be run on a multiple cpu machine)
        @type  nickname: str
        @param niceness: niceness value, handed to the slave script as
                         its command line argument
        @type  niceness: int
        @param show_output: run the slave inside an xterm that is shown
                            on display :0.0 of this host (default: 0)
        @type  show_output: 1|0

        @return: slave task tid
        @rtype: int
        """
        if show_output:
            display = socket.gethostname() + ':0.0'

            command = settings.xterm_bin
            argv = ['-title', str(host), '-geometry', '30x10',
                    '-display', display, '-e',
                    settings.python_bin, '-i', self.slave_script,
                    str(niceness)]
        else:
            command = settings.python_bin
            argv = ['-i', self.slave_script, str(niceness)]

        args = (command, argv, pvm.spawnOpts['TaskHost'], host, 1)

        return PVMMasterSlave.spawn(self, args, nickname)


    def spawnAll(self, niceness, show_output = 0):
        """
        Spawn many jobs, one for each entry of C{self.hosts}. Slaves that
        were spawned successfully are registered in C{self.slaves}.

        @param niceness: nice dictionary [host-name: niceness]; the entry
                         'default' (or 0) is used for hosts without entry
        @type  niceness: dict
        @param show_output: run each slave inside an xterm (default: 0)
        @type  show_output: 1|0
        """
        self.slaves = {}

        for d in self.hosts:
            host = d['host']
            nickname = d['nickname']

            nice = niceness.get(host, niceness.get('default', 0))

            slave_tid = self.spawn(host, nickname, nice, show_output)

            if slave_tid <= 0:
                print('error spawning %s' % (host,))
                try:
                    print('\t %s' % (pvmTools.pvmerrors[ slave_tid ],))
                except Exception as error:
                    print('unknown error %s' % (error,))

            else:
                self.bindMessages(slave_tid)
                self.slaves[slave_tid] = d
                if self.verbose:
                    print('%s %s spawned.' % (slave_tid, nickname))


    def initializationDone(self, slave_tid):
        """
        is called by a slave that has been initialized and
        is now ready for start-up.

        @param slave_tid: slave task tid
        @type  slave_tid: int
        """
        ## start processing
        self.__start_job(slave_tid)


    def start_job(self, slave_tid):
        """
        Called when a new job is about to be started. Override to add
        other startup tasks than the default, see L{__start_job}

        @param slave_tid: slave task tid
        @type  slave_tid: int
        """
        pass


    def get_slave_chunk(self, data_keys ):
        """
        Assemble task dictionary that is sent to the slave for a single job.
        Override this, if the values of self.data are to be changed/created
        on the fly.

        @param data_keys: subset of keys to self.data
        @type  data_keys: [any]

        @return: dict mapping the data keys to the actual data values
        @rtype: {any:any}
        """
        chunk = {}

        for key in data_keys:
            chunk[key] = self.data[key]

        return chunk


    def __start_job(self, slave_tid):
        """
        Tasks performed before the job is launched: call L{start_job},
        fetch the next chunk of unprocessed items and send it to the slave.
        Nothing is sent if no items are left.

        @param slave_tid: slave task tid
        @type  slave_tid: int
        """
        self.start_job(slave_tid)

        ## get items that have not been processed
        queue, n_left = self.status.next_chunk( self.chunk_size)

        if not queue:
            return

        nickname = self.slaves[slave_tid]['nickname']
        if self.verbose:
            print('%d (%s) %d items left'%(slave_tid, nickname, n_left))

        chunk = self.get_slave_chunk( queue )

        self.send(slave_tid, MSG_JOB_START, (chunk,))


    def is_valid_slave(self, slave_tid):
        """
        Checked each time before a new job is given to a slave, if 0, the
        job is given to another slave. Override.

        @param slave_tid: slave task tid
        @type  slave_tid: int

        @return: 1 in this default implementation
        @rtype: 1|0
        """
        return 1


    def job_done(self, slave_tid, result):
        """
        Override to add tasks to be performed when the job is done
        (other than the default, see L{__job_done}).

        @param slave_tid: slave task tid
        @type  slave_tid: int
        @param result: slave result dictionary
        @type  result: dict
        """
        pass


    def __job_done(self, slave_tid, result):
        """
        Tasks that are performed when the job is done: merge the result,
        mark its items as finished and give the slave the next chunk or,
        if everything is done, call L{__finish}.

        @param slave_tid: slave task tid
        @type  slave_tid: int
        @param result: slave result dictionary
        @type  result: dict
        """
        ## synchronize on internal lock of Status to avoid the distribution
        ## of new items while processed ones are not yet marked "finished"
        self.status.lock.acquire()

        ## release the lock even if the job_done() hook or the update raises
        try:
            self.job_done(slave_tid, result)

            self.result.update(result)

            ## mark result as finished.
            for item in result.keys():
                self.status.deactivate(item)
        finally:
            self.status.lock.release()

        ## once again
        if not self.is_valid_slave(slave_tid):
            return

        if not self.status.done():
            self.__start_job(slave_tid)
        else:
            self.__finish()
class JobSlave(PVMMasterSlave):
    """
    Slave side of the job dispatcher. Waits for MSG_JOB_START messages
    from its parent task, passes their content to L{go} and sends the
    return value back with a MSG_JOB_DONE message.
    """

    def __init__(self):
        """
        Initialize the PVM base class and set the message loop delay
        to 0.5.
        """
        PVMMasterSlave.__init__(self)
        self.setMessageLoopDelay(0.5)


    def start(self):
        """
        Start the slave: bind the job-start message, then start the
        message loop.
        """
        PVMMasterSlave.start(self)

        self.bindMessages()
        self.startMessageLoop()


    def bindMessages(self):
        """
        Route MSG_JOB_START messages of the parent task to L{__go}.
        """
        self.bind(MSG_JOB_START, self.getParent(), self.__go)


    def initialize(self, params):
        """
        Automatically invoked by parent after slave's
        message-loop is up. Override to use.

        @param params: initiation parameters
        @type  params: any
        """
        return None


    def go(self, *args, **kw):
        """
        Must be overridden in order to do the actual work.
        Result should be returned.
        Default tasks are defined in L{__go}.

        @param args: arguments
        @type  args: (any)
        @param kw: dictionary with key=value pairs
        @type  kw: {key:value}
        """
        return None


    def __go(self, *args, **kw):
        """
        Run L{go} with the received arguments and send its return value,
        together with the tid of this task, back to the parent.

        @param args: arguments
        @type  args: (any)
        @param kw: dictionary with key=value pairs
        @type  kw: {key:value}
        """
        outcome = self.go(*args, **kw)

        ## send result back to parent
        sender_tid = self.getTID()
        parent_tid = self.getParent()

        self.send(parent_tid, MSG_JOB_DONE, (sender_tid, outcome))
if __name__ == '__main__':

    import os
    import sys

    ## a single command line argument is taken as the niceness increment
    ## for this process
    if len(sys.argv) == 2:

        niceness = int(sys.argv[1])
        os.nice(niceness)

    ## NOTE(review): the slave is assumed to start regardless of the
    ## argument check -- confirm against the original indentation
    slave = JobSlave()
    slave.start()