1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 """
26 Parallize calculation of pairwise rmsd between the frames of a trajectory
27 """
28 import Biskit.hosts as hosts
29
30 import Biskit.tools as T
31 import Biskit.settings as settings
32 import Biskit.mathUtils as mathUtils
33 from Biskit.Errors import BiskitError
34 from Biskit.LogFile import ErrLog, LogFile
35 from Biskit.EnsembleTraj import EnsembleTraj, traj2ensemble
36
37 import tempfile
38 import Numeric as N
39 import os, time
40
41
42
43 from Biskit.PVM import TrackingJobMaster
44
45
47 pass
48
49
51 """
52 Parallize calculation of pairwise rmsd between the frames of one or two
53 trajectories.
54 """
55
56 slave_script = T.projectRoot() + '/Biskit/TrajFlexSlave.py'
57
58 - def __init__(self, traj1, traj2=None, aMask=None, hosts=hosts.cpus_all,
59 niceness=hosts.nice_dic, show_output=0, add_hosts=0,
60 log=None, slaveLog=None, verbose=1,
61 only_off_diagonal=1, only_cross_member=0):
62 """
63 @param traj1: Trajectory or EnsembleTraj, traj1 and 2 must have the
64 same atom content. If only traj1 is given, the pairwise
65 rms is calculated between its frames.
66 @type traj1: Trajectory OR EnsembleTraj
67 @param traj2: see traj1
68 @type traj2: Trajectory OR EnsembleTraj
69 @param aMask: atom mask, consider only subset of atoms (default: all)
70 @type aMask: [0|1]
71 @param hosts: slave hosts to be used
72 (default: L{Biskit.hosts.cpus_all})
73 @type hosts: [str]
74 @param niceness: { str:int, 'default':int }, nice value for each
75 host
76 @type niceness: dict
77 @param show_output: open xterm window for each slave (default: 0)
78 @type show_output: 0|1
79 @param add_hosts: add hosts to PVM before starting (default: 0)
80 @type add_hosts: 1|0
81 @param log: log file for master (default: None-> StdErr )
82 @type log: LogFile
83 @param slaveLog: slave log (default: None->'TrajFlexSlave_errors.log')
84 @type slaveLog: LogFile
85 @param verbose: print progress infos (default: 1)
86 @type verbose: 0|1
87 @param only_off_diagonal: Don't calculate self-rms of frames.
88 Only considered for a single trajectory
89 (default: 1)
90 @type only_off_diagonal: 0|1
91 @param only_cross_member: Don't calculate rms between frames from
92 same member trajectory only considered for
93 a single trajectory(requires EnsembleTraj)
94 (default: 0)
95 @type only_cross_member: 0|1
96 """
97
98 self.outFolder = tempfile.mktemp('trajFlex_',
99 dir=settings.tempDirShared )
100 os.mkdir( self.outFolder )
101
102 self.log = log or ErrLog()
103 self.slaveLog = slaveLog or LogFile('TrajFlexSlave_errors.log')
104 self.verbose = verbose
105 self.hosts = hosts
106
107 self.traj_1 = traj1
108 self.traj_2 = traj2
109
110 self.only_off_diagonal = traj2 is None and only_off_diagonal
111 self.only_cross_member = traj2 is None and only_cross_member
112
113 self.trajMap = None
114 if traj2 is None:
115 self.trajMap = self.memberMap( traj1 )
116
117
118 frame_files_1 = self.__dumpFrames( traj1, self.outFolder, 'traj1' )
119
120 frame_files_2 = self.__dumpFrames( traj2, self.outFolder, 'traj2' )
121
122
123 self.tasks = self.__taskDict( frame_files_1, frame_files_2)
124
125 chunk_size = 1
126 TrackingJobMaster.__init__(self, self.tasks, chunk_size,
127 hosts, niceness, self.slave_script,
128 show_output=show_output, verbose=verbose,
129 add_hosts=add_hosts)
130
131
133 """
134 @param slave_tid: slave task id
135 @type slave_tid: int
136
137 @return: dictionary with init parameters
138 @rtype: {param:value}
139 """
140 return {'ferror':self.slaveLog.fname,
141 'trajMap':self.trajMap,
142 'only_off_diagonal':self.only_off_diagonal,
143 'only_cross_member':self.only_cross_member }
144
145
147 """
148 @param n_per_node: how many chunks should be generated per node
149 @type n_per_node: int
150 @param n_nodes: number of slave nodes
151 @type n_nodes: int
152 @param n_frames: length of trajectory
153 @type n_frames: int
154
155 @return: calculate number of frames per chunk
156 @rtype: int
157 """
158 r = int(round( n_frames * 1.0 / N.sqrt(n_per_node * n_nodes) ))
159 if r > 25:
160 return r
161
162 return 25
163
164
166 """
167 Tidy up.
168 """
169 T.tryRemove( self.outFolder, verbose=self.verbose, tree=1 )
170
171
173 """
174 Divide frame indices into chunks.
175
176 @param n_frames: number of frames per chunk
177 @type n_frames: int
178
179 @return: list with start and stop frame index of each chunk
180 @rtype: [(int, int)]
181 """
182 if traj is None:
183 return None
184
185 l = len( traj )
186
187 n, rest = l / n_frames, l % n_frames
188 if rest:
189 n += 1
190
191
192 i_windows = []
193
194 for i in range(0,n):
195
196 start, stop = i*n_frames, i*n_frames + n_frames
197
198 if stop > l: stop = l
199 if i == n-1:
200 stop = l
201
202 i_windows.append( (start, stop) )
203
204 return i_windows
205
206
208 """
209 @param f_frames_1: file name of chunk 1 of frames
210 @type f_frames_1: {(int, int) : str}
211 @param f_frames_2: file name of chunk 2 of frames
212 @type f_frames_2: {(int, int) : str}
213
214 @return: { ((int, int),(int, int)) : (str, str) }
215 @rtype: {((int, int),(int, int)) : (str, str)}
216 """
217 intra_traj = f_frames_2 is None
218 if intra_traj:
219 if self.verbose:
220 self.log.add('Intra-trajectory calculation requested.')
221 f_frames_2 = f_frames_1
222
223 if self.verbose: self.log.add_nobreak('setting up task list...')
224
225 i_windows = f_frames_1.keys()
226 j_windows = f_frames_2.keys()
227
228
229 tasks = {}
230
231 for i in range( len(i_windows) ):
232 for j in range( i * intra_traj, len(j_windows) ):
233
234 start_i, stop_i = i_windows[i]
235 start_j, stop_j = j_windows[j]
236
237 key = ((i_windows[i], j_windows[j]))
238
239 tasks[key] = ( f_frames_1[ i_windows[i] ],
240 f_frames_2[ j_windows[j] ] )
241
242 if self.verbose: self.log.add('done')
243
244 return tasks
245
246
248 """
249 @param traj: Trajectory
250 @type traj: Trajectory
251 @param outFolder: folder for pickled arrays
252 @type outFolder: str
253 @param prefix: file name prefix
254 @type prefix: str
255 @return: { (int,int) : str } OR None, if traj is None
256 @rtype: {(int,int) : str}
257 """
258 if traj is None:
259 return None
260
261 if self.verbose: self.log.add_nobreak('dumping frame chunks...')
262
263 n_frames = self.__windowSize( 20, len( self.hosts ), len( traj ) )
264
265 i_windows = self.__getFrameWindows( traj, n_frames )
266
267 r = {}
268
269 for i in range( len(i_windows) ):
270
271 w = i_windows[i]
272
273 a = traj.frames[ w[0]:w[1] ]
274 f = outFolder + '/%s_%i_to_%i.dat' % ((prefix,) + w)
275 T.Dump( a, f )
276 r[w] = f
277
278 if self.verbose and i % (len(i_windows)/50 + 1) == 0:
279 self.log.add_nobreak('#')
280
281 if self.verbose: self.log.add('done')
282
283 return r
284
285
287 """
288 Tell which traj frame belongs to which member trajectory.
289
290 @param traj: Trajectory
291 @type traj: Trajectory
292
293 @return: member index of each frame OR None if traj is
294 not a EnsembleTraj
295 @rtype: [ int ] OR None
296 """
297 if not isinstance( traj, EnsembleTraj ):
298 return None
299
300 r = N.zeros( len(traj), 'i' )
301
302 for i in range( traj.n_members ):
303
304 mi = traj.memberIndices( i )
305 N.put( r, mi, i )
306
307 return r.tolist()
308
309
311 """
312 Get result matrix ordered such as input trajectory.
313
314 @param mirror: mirror the matrix at diagonal (default: 1)
315 (only for intra-traj)
316 @type mirror: 1|0
317
318 @return: array( (n_frames, n_frames), 'f'), matrix of pairwise rms
319 @rtype: array
320 """
321 if self.verbose: self.log.add_nobreak('assembling result matrix...')
322
323 intra_traj = self.traj_2 is None
324
325 n1 = n2 = len( self.traj_1 )
326 if self.traj_2 is not None:
327 n2 = len( self.traj_2 )
328
329 a = N.zeros( (n1,n2), 'f' )
330
331 if self.verbose: self.log.add_nobreak('#')
332
333 for key, value in self.result.items():
334 i_start, i_stop = key[0]
335 j_start, j_stop = key[1]
336
337 window = N.reshape( value, (i_stop-i_start, j_stop-j_start) )
338 window = window.astype('f')
339
340 a[i_start:i_stop, j_start:j_stop] = window
341
342 if self.verbose: self.log.add_nobreak('#')
343
344 if intra_traj:
345 for i in range( N.shape(a)[0] ):
346 for j in range( i, N.shape(a)[1] ):
347 if a[j,i] == 0:
348 a[j,i] = a[i,j]
349 else:
350 a[i,j] = a[j,i]
351
352 if self.verbose: self.log.add_nobreak('#')
353
354 if intra_traj and not mirror:
355 for i in range( N.shape(a)[0] ):
356 for j in range( i, N.shape(a)[1] ):
357 a[j,i] = 0.
358
359 if self.verbose: self.log.add('done')
360
361 return a
362
363
365 """
366 Get result matrix ordered first by member then by time. (requires
367 EnsembleTraj)
368
369 @param mirror: mirror matrix at diagonal (only for intra-traj. rms)
370 (default: 0)
371 @type mirror: 0|1
372
373 @param step: take only every step frame [1]
374 @type step: int
375 """
376 intra_traj = self.traj_2 is None
377
378 m = self.getResult( mirror=intra_traj )
379
380 i1 = i2 = self.traj_1.argsortMember( step=step )
381
382 if self.traj_2 is not None:
383 i2 = self.traj_2.argsortMember( step=step )
384
385 a = N.take( m, i1, 0 )
386 a = N.take( a, i2, 1 )
387
388 if intra_traj and not mirror:
389 for i in range( N.shape(a)[0] ):
390 for j in range( i, N.shape(a)[1] ):
391 a[j,i] = 0.
392
393 return a
394
395
397 """
398 @return: list of all calculated pairwise rms values
399 @rtype: [float]
400
401 @raise FlexError: if there are no results yet
402 """
403 r = []
404 for v in self.result.values():
405 r.extend( v )
406
407 if not r:
408 raise FlexError, "No results yet."
409
410 return r
411
412
414 """
415 @return: average pairwise rmsd and it's standard deviation
416 @rtype: (float, float)
417
418 @raise FlexError: if there are no results yet
419 """
420 r = self.rmsList()
421 return N.average(r), mathUtils.SD(r)
422
423
424
425
426
427
429 """
430 Test class
431 """
432
433 - def run( self, local=0 ):
434 """
435 run function test
436
437 @param local: transfer local variables to global and perform
438 other tasks only when run locally
439 @type local: 1|0
440
441 @return: 1
442 @rtype: int
443 """
444 from Biskit.MatrixPlot import MatrixPlot
445 from RandomArray import random
446
447 traj_1 = T.Load( T.testRoot() + '/lig_pcr_00/traj.dat' )
448 traj_1 = traj2ensemble( traj_1 )
449
450
451
452 frames = []
453 for i in range( len( traj_1 ) ):
454 f = traj_1.frames[i]
455 d = N.zeros( N.shape( f ), 'f')
456 if i > 0:
457 d = random( N.shape( f ) ) * ((i / 10) + 1)
458 frames += [f + d]
459
460 traj_2 = traj_1.clone()
461 traj_2.frames = frames
462
463 master = TrajFlexMaster( traj_1, traj_2,
464 hosts=hosts.cpus_all,
465 show_output=local,
466 add_hosts=1,
467 log=None,
468 slaveLog=None,
469 verbose=local,
470 only_cross_member=0 )
471
472 r = master.calculateResult( mirror=0 )
473
474 if local:
475 p = MatrixPlot( r, palette='plasma2', legend=1 )
476 p.show()
477 globals().update( locals() )
478
479 return 1
480
481
483 """
484 Precalculated result to check for consistent performance.
485
486 @return: 1
487 @rtype: int
488 """
489 return 1
490
491
492
493 if __name__ == '__main__':
494
495 test = Test()
496
497 assert test.run( local=1 ) == test.expected_result()
498