1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 """
26 Thread - save job handling for JobMaster (see L{dispatcher}).
27 """
28
29 from threading import Thread, RLock, Condition
30
32 """
33 Keep track of objects that are processed by JobMaster.
34
35 Thread-savety:
36 - next_chunk() is synchronized.
37 - deactivate() must be synchronized to the same Status.lock
38 - activate() is not any longer supposed to be called from outside.
39 """
40
41 - def __init__(self, objects, redistribute=1):
42 """
43 @param objects: e.g. job IDs
44 @type objects: any
45 @param redistribute: send out started but not yet finished jobs
46 to idle slaves (to not wait for slow slaves)
47 (default: 1)
48 @type redistribute: 1|0
49 """
50 self.objects = {}
51 map(lambda k, o = self.objects: o.update({k: None}), objects)
52
53 self.redistribute = redistribute
54
55 self.lock = RLock()
56
57
59 """
60 not thread-save, synchronize on self.lock!
61 """
62 if self.objects[item] is None:
63 self.objects[item] = 1
64 else:
65 self.objects[item] += 1
66
67
69 """
70 mark item as processed.
71 not thread-save, synchronize on self.lock!
72 """
73 if self.objects[item] is None:
74 raise '%s has never been activated.' % item
75
76 self.objects[item] = 0
77
78
80 """
81 Not yet finished items. The not-yet-started come first, then
82 come the active items ordered by how often they have already been
83 distributed.
84
85 @return: list of not yet finished items (started or not)
86 @rtype: [object]
87 """
88 o = self.objects
89 r = [ k for k in o.keys() if (o[k] != 0) ]
90
91
92 pairs = [(o[k], k) for k in r ]
93 pairs.sort()
94 r = [ x[1] for x in pairs ]
95
96 return r
97
98
100 """
101 @return: list of not yet started items
102 @rtype: [object]
103 """
104 r = filter(lambda key, s = self: s.objects[key] is None,
105 self.objects.keys())
106
107 return r
108
109
111 """
112 @return: list of currently active (started) items
113 @rtype: [object]
114 """
115 o = self.objects
116 r = [ k for k in o.keys() if (o[k] != None) and (o[k] > 0) ]
117
118 return r
119
120
122 """
123 @return: 1 if all items have been processed and finished
124 @rtype: 1|0
125 """
126 r = not len(filter(lambda v: v <> 0, self.objects.values()))
127
128 return r
129
130
132 """
133 Get next chunk of at most nmax items that need to be processed.
134 Thread-save.
135
136 @param nmax: size of chunk
137 @type nmax: int
138
139 @return: chunk of items to be processed, number of unproc OR
140 None if all items have been processed
141 @rtype: ([object], int) OR None
142 """
143 self.lock.acquire()
144
145 queue = self.not_started()
146
147
148 if not queue and self.redistribute:
149 queue = self.not_done()
150
151 n_left = len(queue)
152
153 if n_left > nmax:
154 queue = queue[:nmax]
155
156 for item in queue:
157 self.__activate(item)
158
159 self.lock.release()
160
161 return queue, n_left
162
163
165 names = { None: 'not_started',
166 0: 'finished',
167 1: 'active'}
168
169 l = []
170
171 for key, value in self.objects.items():
172
173 key = str(key)
174
175 l.append(key + ': ' + names.get(value, 'active+%s'%str(value)))
176
177 l.append('Status: ' + 'not ' * (not self.done()) + 'done')
178
179 return '\n'.join(l)
180
181 __repr__ = __str__
182