1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 """
25 Add some extra functionality to JobMaster
26 """
27
28 from Biskit.PVM.dispatcher import JobMaster
29 import pvmTools as pvm
30 from Biskit.PVM.Status import Status
31 import Biskit.tools as T
32
33 from threading import Thread, RLock, _RLock, Condition, _Condition
34 import time
35 import copy
36
38 """
39 TrackingJobMaster
40
41 This class extends JobMaster with the following extras:
42 - reporting of the average time each slave spends on a job
43 - automatic adding of slave computers to PVM
44 - different ways to be notified of a completed calculation
45 - restarting of interrupted calculations
46
47 The calculation is performed non-blocking in a thread after a call
48 to master.start().
49 The end of calculation is signalled on master.lock / master.lockMsg.
50 The result can then be obtained with getResult().
51
52 Alternatively, a callback method can be registered that is called
53 after the calculation finished (master.setCallback()).
54
55 Perhaps the easiest (but also least flexible) way is to use the
56 calculateResult() method instead. This starts the calculation and blocks
57 execution until the result is returned.
58
59 Consider overriding cleanup(), done() and getResult().
60
61 An interrupted calculation can be restarted from a restart file:
62 - during calculation, pickle the result of getRst() to a file
63 - call the script Biskit/restartPVM -i |file_name|
64
65 Manual restart is possible as follows:
66 1. pickle master.data, master.result, master.status.objects
67 2. master.exit() / Exception / kill, etc.
68 3. initialize master with same parameters as before
69 4. unpickle and re-assign master.data, master.result,
70 master.status.objects
71 5. master.start()
72
73 @note: The master sends out an exit signal to all slaves but doesn't
74 wait for a response (there isn't any) and continues in the finish()
75 method. Since, at the end, the same job is distributed to several slaves,
76 some of them might still be running when cleanup() or done() are
77 executed. The slave script must tolerate errors that, e.g., happen
78 if cleanup() is called while it is running.
79
80 @todo: try finding some solution to the problem where the master
81 sends out an exit signal to all slaves but doesn't wait for
82 a response (see note)
83 @todo: test restart function
84 @todo: restart data are not automatically saved (e.g. in intervals)
85 """
86
def __init__(self, data=None, chunk_size=5,
             hosts=None, niceness=None,
             slave_script='', verbose=1,
             show_output=0, add_hosts=1, redistribute=1):
    """
    Optionally add the hosts to PVM, initialise the underlying JobMaster
    and set up the tracking / notification fields of this master.

    @param data: dict of items to be processed (default: new empty dict)
    @type  data: {str_id:any}
    @param chunk_size: number of items that are processed per job
    @type  chunk_size: int
    @param hosts: list of host-names (default: new empty list)
    @type  hosts: [str]
    @param niceness: host niceness dictionary {str_host-name: int_niceness}
                     (default: {'default':20})
    @type  niceness: {str:int}
    @param slave_script: absolute path to slave-script
    @type  slave_script: str
    @param verbose: verbosity level (default: 1)
    @type  verbose: 1|0
    @param show_output: display one xterm per slave (default: 0)
    @type  show_output: 1|0
    @param add_hosts: add hosts to PVM before starting (default: 1)
    @type  add_hosts: 1|0
    @param redistribute: at the end, send same job out several times
                         (default: 1)
    @type  redistribute: 1|0
    """
    # Build fresh containers per call: mutable default arguments are
    # created once and would be shared by every master instance.
    if data is None:
        data = {}
    if hosts is None:
        hosts = []
    if niceness is None:
        niceness = {'default': 20}

    if add_hosts:
        if verbose:
            T.errWrite('adding %i hosts to pvm...' % len(hosts))
        pvm.addHosts(hosts=hosts)
        if verbose:
            T.errWriteln('done')

    JobMaster.__init__(self, data, chunk_size, hosts, niceness,
                       slave_script, show_output=show_output,
                       redistribute=redistribute, verbose=verbose)

    # per-slave progress records, keyed by slave nickname
    self.progress = {}

    # hosts listed here are rejected when slaves are validated
    self.disabled_hosts = []
    # host name -> slow factor
    self.slow_hosts = {}

    self.verbose = verbose

    # end of calculation is signalled on lockMsg (which wraps lock)
    self.lock = RLock()
    self.lockMsg = Condition(self.lock)

    # optional callback, invoked with this master once it has finished
    self.call_done = None
135
137 """
138 Get nickname of host from TaskID.
139
140 @param slave_tid: slave task tid
141 @type slave_tid: int
142 """
143 nickname = self.nicknameFromTID( slave_tid )
144 return nickname.split('_')[0]
145
146
148 """
149 Override JobMaster method to disable slow nodes on the fly
150
151 @param slave_tid: slave task tid
152 @type slave_tid: int
153 """
154 return self.hostnameFromTID( slave_tid ) not in self.disabled_hosts
155
156
158 """
159 @param host_list: list of hosts
160 @type host_list: [str]
161 @param slow_factor: factor describing the calculation speed of a node
162 @type slow_factor: float
163 """
164 for h in host_list:
165 self.slow_hosts[h] = slow_factor
166
167
169 """
170 Overriding JobMaster method
171
172 @param slave_tid: slave task tid
173 @type slave_tid: int
174 """
175 host = self.nicknameFromTID( slave_tid )
176
177 d = {'given':0, 'done':0, 'time':0 }
178 if self.progress.has_key( host ):
179 d = self.progress[ host ]
180
181 d['given'] += 1
182 d['timeStart'] = time.time()
183
184 self.progress[ host ] = d
185
186
def job_done(self, slave_tid, result):
    """
    Overriding JobMaster method: book-keeping for a finished job.

    Increments the 'done' counter in the progress record of the slave and
    stores the time elapsed since the record's 'timeStart' stamp as 'time'.

    @param slave_tid: slave task tid
    @type  slave_tid: int
    @param result: slave result dictionary (not used here)
    @type  result: dict
    """
    record = self.progress[self.nicknameFromTID(slave_tid)]

    record['done'] += 1
    # wall-clock duration since the start stamp of this slave's record
    record['time'] = time.time() - record['timeStart']
201
202
204 """
205 Report how many jobs were processed in what time per host.
206 """
207 if self.verbose:
208 print 'host \tgiven\tdone\t time'
209 for host in self.progress:
210
211 d = self.progress[host]
212 print '%-25s\t%i\t%i\t%6.2f s' %\
213 (host, d['given'], d['done'], d['time'])
214
215
217 """
218 Register function to be called after calculation is finished.
219 @param funct: will be called with an instance of the master
220 as single argument
221 @type funct: function
222 """
223 self.call_done = funct
224
225
227 """
228 Called after exit. Override.
229 """
230 pass
231
232
234 """
235 Called by finish() after exit(), cleanup(), and reportProgress(), but
236 before thread notification (notifyAll() ) and before executing
237 the callBack method. Override.
238 """
239 pass
240
241
243 """
244 Notify thread waiting on self.lockMsg that master has finished.
245 """
246 self.lock.acquire()
247 self.lockMsg.notifyAll()
248 self.lock.release()
249
250
252 """
253 Called one time, after last job result has been received. It should
254 not be necessary to override this further. Override done() instead.
255 """
256 self.exit()
257 self.cleanup()
258
259 self.reportProgress()
260
261 self.done()
262
263 self.notifyAll()
264
265 if self.call_done:
266 self.call_done( self )
267
268
270 """
271 Return result dict, if it is available.
272 Override to return something else - which will also be the return value
273 of calculateResult().
274
275 @param arg: keyword-value pairs, for subclass implementations
276 @type arg: {key:value}
277
278 @return: {any:any}
279 @rtype: {any:any}
280 """
281 return self.result
282
283
285 """
286 Convenience function that is starting the parallel calculation and
287 blocks execution until it is finished.
288
289 @param arg: keyword-value pairs, for subclass implementations
290 @type arg: {key:value}
291
292 @return: the value of getResult() -- by default the result dict
293 @rtype: {any:any} (or whatever an overriding getResult() returns)
294 """
295 self.start()
296
297 self.lock.acquire()
298 self.lockMsg.wait()
299 self.lock.release()
300
301 return self.getResult( **arg )
302
303
305 """
306 Get data necessary for a restart of the running calculation.
307 Locks, file handles and private data are *NOT* saved.
308 Override if necessary but call this method in child method.
309
310 @return: {..}, dict with 'pickleable' fields of master
311 @rtype: dict
312 """
313 self.status.lock.acquire()
314
315
316 rst = {}
317 for k,v in self.__dict__.items():
318
319 skip = 0
320 for t in [ Thread, _RLock, _Condition, Status, file ]:
321 if isinstance( v, t ):
322 skip = 1
323
324 if str(k)[0] == '_':
325 skip = 1
326
327 if not skip:
328 rst[k] = copy.copy( v )
329
330 rst['status_objects'] = copy.deepcopy( self.status.objects )
331 rst['master_class'] = self.__class__
332
333 self.status.lock.release()
334
335 return rst
336
337
339 """
340 Pickle data necessary for a restart of the running calculation.
341
342 @param fname: file name
343 @type fname: str
344 """
345 T.Dump( self.getRst(), fname )
346
347
def setRst(self, rst_data):
    """
    Prepare this master for restart, called by restart().
    Override if necessary but call in child.

    Switches this instance to the class stored under 'master_class' and
    re-assigns the stored 'status_objects' to self.status.objects.
    rst_data itself is left untouched, so the same restart dict can be
    used again.

    @param rst_data: {..}, parameters for master.__dict__ + some
                     special fields ('master_class', 'status_objects')
    @type  rst_data: dict

    @return: {..}, parameters for master.__dict__ without special fields
    @rtype: dict

    @raise KeyError: if one of the special fields is missing
    """
    # NOTE(review): restart data presumably comes from a pickled file and
    # decides the class of this object -- only use trusted restart files.
    self.__class__ = rst_data['master_class']
    self.status.objects = rst_data['status_objects']

    # Return a filtered shallow copy instead of deleting the special
    # fields from the caller's dictionary.
    special = ('master_class', 'status_objects')
    return dict((k, v) for k, v in rst_data.items() if k not in special)
367
368
370 """
371 @param rst_data: restart data
372 @type rst_data: dict
373 @param params: key-value pairs, for subclass implementations
374 @type params: {key:value}
375 """
376
377 master = TrackingJobMaster( **params )
378
379
380 rst_data = master.setRst( rst_data )
381
382
383 master.__dict__.update( rst_data )
384
385 return master
386