1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 """
24 Binding incoming pvm-messages to methods.
25 """
26
27 from Biskit.PVM import pvm
28 from threading import Thread
29
# Message tags this module reserves for its own control traffic.

# bound to the _ping handler when a PVM thread is set up
MSG_PING = 999999

# NOTE(review): presumably sent by a slave once its message loop runs -- confirm
MSG_MESSAGELOOP_UP = 999998

# sent to a slave together with its (optional) initialisation parameters
MSG_INITIALIZE = 999997

# NOTE(review): presumably the slave's reply to MSG_INITIALIZE -- confirm
MSG_INITIALIZATION_DONE = 999996

# sent to the registered tasks when a process without parent shuts down
MSG_EXIT = 999995
35
37
48
49
def post(self, message):
    """
    Add a time-stamped message to the log and rewrite the log file.

    A line holding the current time (seconds since the epoch) and the
    message itself are appended to self.lines; afterwards the complete
    content of self.lines is written to self.filename, replacing
    whatever the file contained before.

    @param message: text to be logged
    @type  message: str
    """
    import time

    self.lines.append('\n' + str(time.time()) + ':')
    self.lines.append(message)

    ## the file is opened in 'w' mode on purpose: it always mirrors
    ## the complete in-memory log, not only the latest entry
    log_file = open(self.filename, 'w')
    try:
        log_file.write('\n'.join(self.lines))
    finally:
        ## release the handle even if writing fails
        log_file.close()
60
61
63 print '\n'.join(self.lines)
64
65
70
71
72
73
75 """
76 a simple class binding incoming pvm-messages to methods.
77 """
78
79
81 from threading import Event
82
83 Thread.__init__(self)
84
85 self.__mytid = pvm.mytid()
86
87 try:
88
89 self.__parent = pvm.parent()
90 except:
91 self.__parent = None
92
93 self.__bindings = {}
94 self.__stop = 0
95 self.__tasks = {}
96
97 self.setMessageLoopDelay(0.1)
98 self.__loopEvent = Event()
99
100 if log: self.__log = Logfile()
101 else: self.__log = None
102
103
104
105 self.setPingTimeout(5.)
106 self.bind(MSG_PING, -1, self._ping)
107
108
110 return self.__mytid
111
112
114 """
115 @return: process ID of parent
116 @rtype: int
117 """
118 return self.__parent
119
120
122 return self.__bindings
123
124
125 - def bind(self, message_tag, tid, method):
127
128
129 - def unbind(self, tid_message_tag):
131
132
134 """
135 time is in seconds
136 """
137 self.__delay = delay
138
139
141 return self.__delay
142
143
145 self.__loopEvent.clear()
146
147
149 self.__loopEvent.set()
150
151
153 return not self.__loopEvent.isSet()
154
155
157 self.__pingTimeout = t
158
159
161 return self.__pingTimeout
162
163
168
169
172
173
175 return self.__tasks
176
177
179 for nickname, TID in self.getTasks().items():
180 if TID == tid:
181 return nickname
182
183 raise NameError, 'tid not known'
184
185
def spawn(self, pvm_task, nickname = None):
    """
    Start a new pvm task and remember it under a nickname.

    The items of pvm_task are handed to pvm.spawn as positional
    arguments and the first entry of its result is taken as the tid
    of the new task. Only a tid greater than 0 is registered
    (presumably other values signal a failed spawn -- confirm against
    the pvm documentation).

    @param pvm_task: positional arguments for pvm.spawn
    @param nickname: key under which the tid is registered; the tid
                     itself is used if no nickname is given
                     (default: None)

    @return: the tid reported by pvm.spawn, registered or not
    """
    new_tid = pvm.spawn(*pvm_task)[0]

    ## nothing to register for a non-positive tid
    if not new_tid > 0:
        return new_tid

    key = nickname
    if key is None:
        key = new_tid

    self.__tasks[key] = new_tid

    return new_tid
197
198
203
204
def send(self, task, msg_tag, value = None):
    """
    Send a message to a single task or to several tasks.

    If 'task' is a tuple, the message is sent to every entry of that
    tuple. Each entry is looked up in the nickname table returned by
    getTasks(); an entry that is not a registered nickname is passed
    on unchanged, i.e. it is used as the tid. All messages of one
    call share the same time stamp and are sent as the pair
    (time_stamp, value).

    @param task: nickname or tid of the recipient, or a tuple of them
    @param msg_tag: tag of the message (e.g. one of the MSG_* constants)
    @param value: data sent along with the message (default: None)
    """
    from time import time

    ## anything but a tuple counts as one single recipient
    if not isinstance(task, tuple):
        task = (task,)

    known_tasks = self.getTasks()

    time_stamp = time()

    for t in task:

        try:
            tid = known_tasks[t]
        except (KeyError, TypeError):
            ## not a registered nickname (KeyError) or not usable as
            ## a dictionary key at all (TypeError): take it as tid
            tid = t

        self.send_primitive(tid, msg_tag, (time_stamp, value))
229
230
232 from time import time
233
234 tids = tuple(self.getTasks().values())
235
236 time_stamp = time()
237
238 for tid in tids:
239 self.send_primitive(tid, msg_tag, (time_stamp, value))
240
242 import time
243 from Numeric import argsort
244
245 while not self.isStopped():
246
247 bindings = self.getBindings()
248
249
250
251
252 incoming = {}
253
254 for tid, message in bindings.keys():
255
256 if pvm.probe(tid, message):
257
258 pvm.recv(tid, message)
259
260
261
262 parameters = pvm.unpack()
263
264
265
266 value = (message, parameters[0], parameters[1])
267
268 try:
269 incoming[tid].append(value)
270 except:
271 incoming[tid] = [value]
272
273
274
275
276
277
278
279 for tid, values in incoming.items():
280
281 time_stamps = map(lambda v: v[1], values)
282 indices = argsort(time_stamps)
283
284 for i in indices:
285
286 message = values[i][0]
287 parameters = values[i][2]
288
289
290
291 if parameters is None:
292 bindings[(tid, message)]()
293 else:
294 bindings[(tid, message)](*parameters)
295
296
297
298 time.sleep(self.getMessageLoopDelay())
299
300
301
302
303 self.__loopEvent.wait()
304
305
306 - def ping(self, nickname):
322
323
324 - def _ping(self, sender_tid):
326
327
328
329
331 if self.__log:
332 self.__log.write()
333
334
def post(self, message):
    """
    Hand a message over to the log of this thread.

    Nothing happens if the thread has no log (or the log object is
    false in a boolean context).

    @param message: text passed on to the post method of the log
    """
    if not self.__log:
        return

    self.__log.post(message)
338
339
def post_message_received(self, msg_tag, tid, params):
    """
    Write a note about a received and unpacked message to the log.

    @param msg_tag: tag of the message (formatted with %d)
    @param tid: task id reported next to the tag (formatted with %d)
    @param params: parameters of the message, logged as str(params)
    """
    headline = 'PVMThread: received and unpacked: '
    origin = 'msg_tag = %d, tid = %d\n' % (msg_tag, tid)
    payload = '[params = %s]' % str(params)

    self.post(headline + origin + payload)
345
346
def post_message_sent(self, msg_tag, tid, params):
    """
    Write a note about a sent message to the log.

    @param msg_tag: tag of the message (formatted with %d)
    @param tid: task id reported next to the tag (formatted with %d)
    @param params: parameters of the message, logged as str(params)
    """
    headline = 'PVMThread: message sent: '
    target = 'msg_tag = %d, tid = %d\n' % (msg_tag, tid)
    payload = '[params = %s]' % str(params)

    self.post(headline + target + payload)
352
353
def post_execute_method(self, msg_tag, tid, params):
    """
    Write a note to the log that the method bound to a message is
    executed.

    @param msg_tag: tag of the message (formatted with %d)
    @param tid: task id reported next to the tag (formatted with %d)
    @param params: parameters of the message, logged as str(params)
    """
    headline = 'PVMThread: execute method bound to: '
    binding = 'msg_tag = %d, tid = %d\n' % (msg_tag, tid)
    payload = '[params = %s]' % str(params)

    self.post(headline + binding + payload)
359
361 if self.__log:
362 self.__log.rm()
363
364
366
367 - def __init__(self, verbose=1, *arg, **kw):
380
381
382 - def spawn(self, pvm_task, nickname = None):
396
397
408
409
411 """
413 called by slave when its message-loop has been started
413 """
414
415
416
417 init_params = self.getInitParameters(slave)
418
419 if init_params is not None:
420 init_params = (init_params,)
421
422 self.send(slave, MSG_INITIALIZE, init_params)
423
424
426 return None
427
428
430 pass
431
432
442
443
445 pass
446
447
449 parent = self.getParent()
450
451
452 if parent is None:
453
454 for nickname, tid in self.getTasks().items():
455
456 if tid != self.getTID():
457
458 self.send(nickname, MSG_EXIT, None)
459 if self.verbose: print nickname, 'shutting down...'
460 else:
461 pvm.kill(self.getTID())
462
463
464
465 self.stop()
466