1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 """
25 Manage Master/Slave tasks.
26 """
27
28
29 from PVMThread import PVMMasterSlave
30 import Biskit.settings as settings
31 from Status import Status
32 from Biskit.PVM import pvmTools
33 import pvm, socket
34
## Message tags used between master and slave: the master sends
## MSG_JOB_START together with a chunk of work, the slave sends
## MSG_JOB_DONE together with its tid and the result.
35 MSG_JOB_START = 1
36 MSG_JOB_DONE = 2
37
39
40 - def __init__(self, data, chunk_size, hosts, niceness, slave_script,
41 show_output = 0, result = None, redistribute=1, verbose=1 ):
42 """
43 @param data: dict of items to be proessed {id:object}.
44 @type data: dict
45 @param chunk_size: number of items that are processed by a job
46 @type chunk_size: int
47 @param hosts: list of host-names
48 @type hosts: [str]
49 @param niceness: nice dictionary [host-name: niceness]
50 @type niceness: dict
51 @param slave_script: absolute path to slave-script
52 @type slave_script: str
53 @param result: items which have already been processed
54 (ie. they are contained in result) are not
55 processed again.
56 @type result: dict
57 @param redistribute: at the end, send same job out several times
58 (default: 1)
59 @type redistribute: 1|0
60 @param verbose: verbosity level (default: 1)
61 @type verbose: 1|0
62 """
63 PVMMasterSlave.__init__(self, verbose=verbose)
64
65
66 d = {}
67
68 for host in hosts:
69 if host in d:
70 d[host] += 1
71 else:
72 d[host] = 1
73
74 unique_list = []
75
76 for host, number in d.items():
77 if number > 1:
78
79 for i in range(number):
80 nickname = host + '_%d' % i
81
82 d = {'host': host,
83 'nickname': nickname}
84
85 unique_list.append(d)
86 else:
87 d = {'host': host,
88 'nickname': host}
89
90 unique_list.append(d)
91
92 self.hosts = unique_list
93 self.niceness = niceness
94 self.data = data
95 self.slave_script = slave_script
96 self.chunk_size = chunk_size
97
98 self.current_pos = 0
99 self.show_output = show_output
100
101 self.verbose = verbose
102
103 if result is None:
104 result = {}
105
106 self.result = result
107
108
109 items = []
110
111 for key in data.keys():
112 if not key in self.result:
113 items.append(key)
114
115 self.status = Status(items, redistribute=redistribute )
116
117 self.__finished = 0
118
119 if verbose: print 'Processing %d items ...' % len(items)
120
121
132
133
135 """
136 Override to collect slave initiation parameters.
137 """
138 return None
139
140
142 """
143 Override to do something after last job result has been received.
144 """
145 print "Done"
146
147
149 """
150 This method is called one time, after the master has
151 received the last missing result.
152 """
153 self.done()
154
155
157 """
158 Call finish(); but only call it once.
159 """
160 self.status.lock.acquire()
161
162 if not self.__finished:
163 self.finish()
164 self.__finished = 1
165
166 self.status.lock.release()
167
168
170 """
171 @param slave_tid: slave task tid
172 @type slave_tid: int
173 """
174 self.bind(MSG_JOB_DONE, slave_tid, self.__job_done)
175
176
def spawn(self, host, nickname, niceness, show_output = 0):
    """
    Spawn a single slave task on the given host.

    The slave script is run by the python interpreter given in the
    settings. With show_output, the interpreter is started inside an
    xterm that is sent to display :0.0 of the local machine.

    @param host: host name
    @type  host: str
    @param nickname: host nickname (for uniqueness, i.e. more than
                     one job will be run on a multiple cpu machine)
    @type  nickname: str
    @param niceness: nice value, passed to the slave script as its
                     command line argument
    @type  niceness: int
    @param show_output: run the slave inside an xterm (default: 0)
    @type  show_output: 1|0

    @return: slave task tid
    @rtype: int
    """
    ## interpreter arguments are the same with and without xterm
    python_args = ['-i', self.slave_script, str(niceness)]

    if not show_output:
        command = settings.python_bin
        argv = python_args
    else:
        ## wrap the interpreter call into an xterm on the local display
        display = socket.gethostname() + ':0.0'

        command = settings.xterm_bin
        argv = ['-title', str(host), '-geometry', '30x10',
                '-display', display, '-e',
                settings.python_bin] + python_args

    args = (command, argv, pvm.spawnOpts['TaskHost'], host, 1)

    return PVMMasterSlave.spawn(self, args, nickname)
207
208
209 - def spawnAll(self, niceness, show_output = 0):
210 """
211 Spawn many jobs.
212
213 @param niceness: nice dictionary [host-name: niceness]
214 @type niceness: dict
215 """
216 self.slaves = {}
217
218 for d in self.hosts:
219 host = d['host']
220 nickname = d['nickname']
221
222 try:
223 nice = niceness[host]
224 except:
225 nice = niceness.get('default', 0)
226
227 slave_tid = self.spawn(host, nickname, nice, show_output)
228
229 if slave_tid <= 0:
230 print 'error spawning', host
231 try:
232 print '\t', pvmTools.pvmerrors[ slave_tid ]
233 except Exception, error:
234 print 'unknown error', error
235
236 else:
237 self.bindMessages(slave_tid)
238 self.slaves[slave_tid] = d
239 if self.verbose: print slave_tid, nickname, 'spawned.'
240
241
243 """
244 Called by a slave that has been initialized and
245 is now ready for start-up.
246
247 @param slave_tid: slave task tid
248 @type slave_tid: int
249 """
250
251 self.__start_job(slave_tid)
252
253
255 """
256 Called when a new job is about to be started. Override to add
257 other startup tasks than the default, see L{__start_job}
258
259 @param slave_tid: slave task tid
260 @type slave_tid: int
261 """
262 pass
263
264
266 """
267 Assemble task dictionary that is sent to the slave for a single job.
268 Override this, if the values of self.data are to be changed/created on
269 the fly.
270
271 @param data_keys: subset of keys to self.data
272 @type data_keys: [any]
273
274 @return: dict mapping the data keys to the actual data values
275 @rtype: {any:any}
276 """
277 chunk = {}
278
279 for id in data_keys:
280 chunk[id] = self.data[id]
281
282 return chunk
283
284
286 """
287 Tasks performed before the job is launched.
288
289 @param slave_tid: slave task tid
290 @type slave_tid: int
291 """
292 self.start_job(slave_tid)
293
294
295 queue, n_left = self.status.next_chunk( self.chunk_size)
296
297 if not queue:
298 return
299
300 nickname = self.slaves[slave_tid]['nickname']
301 if self.verbose:
302 print '%d (%s) %d items left'%(slave_tid, nickname, n_left)
303
304 chunk = self.get_slave_chunk( queue )
305
306 self.send(slave_tid, MSG_JOB_START, (chunk,))
307
308
310 """
311 Checked each time before a new job is given to a slave, if 0, the
312 job is given to another slave. Override.
313
314 @param slave_tid: slave task tid
315 @type slave_tid: int
316 """
317 return 1
318
319
321 """
322 Override to add tasks to be performed when the job is done
323 (other than the default, see L{__job_done}).
324
325 @param slave_tid: slave task tid
326 @type slave_tid: int
327 """
328 pass
329
330
332 """
333 Tasks that are performed when the job is done.
334
335 @param slave_tid: slave task tid
336 @type slave_tid: int
337 @param result: slave result dictionary
338 @type result: dict
339 """
340
341
342 self.status.lock.acquire()
343
344 self.job_done(slave_tid, result)
345
346 self.result.update(result)
347
348
349 for item in result.keys():
350 self.status.deactivate(item)
351
352 self.status.lock.release()
353
354
355 if not self.is_valid_slave(slave_tid):
356 return
357
358 if not self.status.done():
359 self.__start_job(slave_tid)
360 else:
361 self.__finish()
362
363
365
371
372
380
381
387
388
390 """
391 Automatically invoked by parent after slave's
392 message-loop is up. Override to use.
393 """
394 pass
395
396
def go(self, *args, **kw):
    """
    Hook for the actual work of the slave -- must be overridden.
    The value returned here is what L{__go} sends back as result.
    This default implementation does nothing.

    @param args: arguments
    @type  args: (any)
    @param kw: dictionary with key=value pairs
    @type  kw: {key:value}

    @return: None, unless overridden
    """
    return None
409
410
def __go(self, *args, **kw):
    """
    Run L{go} with the given arguments and send its result, together
    with the tid of this task, to the parent as MSG_JOB_DONE message.

    @param args: arguments
    @type  args: (any)
    @param kw: dictionary with key=value pairs
    @type  kw: {key:value}
    """
    job_result = self.go(*args, **kw)

    ## the message content is the pair (own tid, result)
    answer = ( self.getTID(), job_result )

    self.send(self.getParent(), MSG_JOB_DONE, answer)
426
427
if __name__ == '__main__':

    import os
    import sys

    ## exactly one command line argument is taken as the nice increment
    ## for this process (L{spawn} passes the niceness that way)
    cmd_args = sys.argv[1:]

    if len(cmd_args) == 1:
        niceness = int(cmd_args[0])
        os.nice(niceness)

    slave = JobSlave()
    slave.start()
439