1
2
3
4
5
6
"""
This module allows for asynchronous execution of FDist and
splitting of loads.

FDistAsync allows for the execution of FDist.

SplitFDist splits a single FDist execution into several, taking advantage
of multi-core architectures.

"""
17
18 import os
19 import shutil
20 import thread
21 from time import sleep
22 from Bio.PopGen.Async import Local
23 from Bio.PopGen.FDist.Controller import FDistController
24
26 """Asynchronous FDist execution.
27 """
28
def __init__(self, fdist_dir="", ext=None):
    """Create the asynchronous FDist controller.

    Parameters:
    fdist_dir - Directory where fdist can be found; if "", the
                binaries are expected to be on the path.
    ext - Extension of the binary names (e.g. nothing on Unix,
          ".exe" on Windows).
    """
    # Both arguments are forwarded unchanged to the parent controller.
    FDistController.__init__(self, fdist_dir, ext)
39
def run_job(self, parameters, input_files):
    """Run one FDist simulation described by a parameter dictionary.

    The dictionary is unpacked into the positional arguments of
    run_fdist. 'npops', 'nsamples', 'fst' and 'sample_size' must be
    present (a missing one raises KeyError); every other entry falls
    back to a default value. input_files is accepted but not used.

    Returns a tuple (fst, output_files): fst is whatever run_fdist
    returned and output_files maps 'out.dat' to that file, opened for
    reading from the data directory. The file is returned open, so
    closing it is left to the caller.
    """
    mandatory = ('npops', 'nsamples', 'fst', 'sample_size')
    optional = (
        ('mut', 0),
        ('num_sims', 20000),
        ('data_dir', '.'),
        ('is_dominant', False),
        ('theta', 0.06),
        ('beta', (0.25, 0.25)),
        ('max_freq', 0.99),
    )

    # Mandatory keys are read first so a missing one fails immediately.
    values = {}
    for key in mandatory:
        values[key] = parameters[key]
    for key, fallback in optional:
        values[key] = parameters.get(key, fallback)

    # run_fdist takes the values positionally, in declaration order.
    order = mandatory + tuple([key for key, _ in optional])
    result_fst = self.run_fdist(*[values[key] for key in order])

    out_path = values['data_dir'] + os.sep + 'out.dat'
    return result_fst, {'out.dat': open(out_path, 'r')}
65
67 """Splits a FDist run.
68
69 The idea is to split a certain number of simulations in smaller
70 numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
71 allows to run simulations in parallel, thus taking advantage
72 of multi-core CPUs.
73
74 Each SplitFDist object can only be used to run a single FDist
75 simulation.
76 """
77 - def __init__(self, report_fun = None,
78 num_thr = 2, split_size = 1000, fdist_dir = '', ext = None):
79 """Constructor.
80
81 Parameters:
82 report_fun - Function that is called when a single packet is
83 run, it should have a single parameter: Fst.
84 num_thr - Number of desired threads, typically the number
85 of cores.
86 split_size - Size that a full simulation will be split in.
87 ext - Binary extension name (e.g. nothing on Unix, '.exe' on
88 Windows).
89 """
90 self.async = Local.Local(num_thr)
91 self.async.hooks['fdist'] = FDistAsync(fdist_dir, ext)
92 self.report_fun = report_fun
93 self.split_size = split_size
94
95
97 """Monitors and reports (using report_fun) execution.
98
99 Every time a partial simulation ends, calls report_fun.
100 IMPORTANT: monitor calls can be concurrent with other
101 events, ie, a tasks might end while report_fun is being
102 called. This means that report_fun should be consider that
103 other events might be happening while it is running (it
104 can call acquire/release if necessary).
105 """
106 while(True):
107 sleep(1)
108 self.async.access_ds.acquire()
109 keys = self.async.done.keys()[:]
110 self.async.access_ds.release()
111 for done in keys:
112 self.async.access_ds.acquire()
113 fst, files = self.async.done[done]
114 del self.async.done[done]
115 out_dat = files['out.dat']
116 f = open(self.data_dir + os.sep + 'out.dat','a')
117 f.writelines(out_dat.readlines())
118 f.close()
119 out_dat.close()
120 self.async.access_ds.release()
121 for file in os.listdir(self.parts[done]):
122 os.remove (self.parts[done] + os.sep + file)
123 os.rmdir(self.parts[done])
124
125 if self.report_fun:
126 self.report_fun(fst)
127 self.async.access_ds.acquire()
128 if len(self.async.waiting) == 0 and len(self.async.running) == 0 \
129 and len(self.async.done) == 0:
130 break
131 self.async.access_ds.release()
132
133
134
135
137 """Allows the external acquisition of the lock.
138 """
139 self.async.access_ds.acquire()
140
142 """Allows the external release of the lock.
143 """
144 self.async.access_ds.release()
145
146
147 - def run_fdist(self, npops, nsamples, fst, sample_size,
148 mut = 0, num_sims = 20000, data_dir='.',
149 is_dominant = False, theta = 0.06, beta = (0.25, 0.25),
150 max_freq = 0.99):
151 """Runs FDist.
152
153 Parameters can be seen on FDistController.run_fdist.
154
155 It will split a single execution in several parts and
156 create separated data directories.
157 """
158 num_parts = num_sims/self.split_size
159 self.parts = {}
160 self.data_dir = data_dir
161 for directory in range(num_parts):
162 full_path = data_dir + os.sep + str(directory)
163 try:
164 os.mkdir(full_path)
165 except OSError:
166 pass
167 if "ss_file" in os.listdir(data_dir):
168 shutil.copy(data_dir + os.sep + "ss_file", full_path)
169 id = self.async.run_program('fdist', {
170 'npops' : npops,
171 'nsamples' : nsamples,
172 'fst' : fst,
173 'sample_size' : sample_size,
174 'mut' : mut,
175 'num_sims' : self.split_size,
176 'data_dir' : full_path,
177 'is_dominant' : is_dominant,
178 'theta' : theta,
179 'beta' : beta,
180 'max_freq' : max_freq
181 }, {})
182 self.parts[id] = full_path
183 thread.start_new_thread(self.monitor, ())
184