12 #ifndef RD_LEADERPICKER_H
13 #define RD_LEADERPICKER_H
// Stores the given threshold as the default and fixes the default thread
// count at 1.
42 : default_threshold(threshold), default_nthreads(1) {}
// Stores both the given threshold and the given thread count as defaults.
44 : default_threshold(threshold), default_nthreads(nthreads) {}
67 unsigned int pickSize)
const;
71 unsigned int pickSize,
double threshold)
const;
75 unsigned int pickSize,
77 double threshold)
const;
81 unsigned int pickSize,
100 double threshold,
int nthreads)
const {
// Checks the pool size against the requested pick count, then wraps distMat
// in a distmatFunctor and hands the selection off to lazyPick().
103 if (poolSize < pickSize)
105 distmatFunctor functor(distMat);
106 return this->lazyPick(functor, poolSize, pickSize, firstPicks, threshold,
112 unsigned int pickSize)
const {
// Forwards to the longer pick() overload, supplying the stored
// default_threshold.
114 return pick(distMat, poolSize, pickSize, iv, default_threshold,
119 #ifdef USE_THREADED_LEADERPICKER
// Forward declaration of the worker-thread entry function. It is passed to
// pthread_create() by LeaderPickerState, hence the void *(void *) signature.
123 template <
typename T>
124 void *LeaderPickerWork(
void *arg);
// State object that the pthread workers operate on; each worker record
// carries a pointer back to it.
126 template <
typename T>
127 struct LeaderPickerState {
// Block fields (used as block->capacity / block->next by the code below):
// `capacity` is the limit tested before a neighbouring block is merged in;
// `next` holds two successor indices into `blocks`, one per tick/tock slot,
// where 0 means "no successor".
130 unsigned int capacity;
132 unsigned int next[2];
// Worker record field: the state object this worker belongs to.
135 LeaderPickerState<T> *stat;
138 } LeaderPickerThread;
// Worker records and the block table.
140 std::vector<LeaderPickerThread> threads;
141 std::vector<LeaderPickerBlock> blocks;
// Barriers used to rendezvous with the workers; both are initialised for
// nthreads + 1 participants.
142 pthread_barrier_t wait;
143 pthread_barrier_t done;
// Block at which compact_job() starts walking the list.
145 LeaderPickerBlock *head_block;
// Read by each worker after it passes `wait`; non-zero makes it return.
146 unsigned int thread_op;
147 unsigned int nthreads;
// Identity initialisation: slot i of v starts out holding index i.
155 for (
unsigned int i = 0; i < count; i++)
v[i] = i;
// Block count is count / bsize rounded up.
162 bcount = (count + (bsize - 1)) / bsize;
// Never use more threads than ceil(bcount / 2).
163 unsigned int tasks = (bcount + 1) / 2;
165 if (nt > (
int)tasks) nt = tasks;
168 bcount = (count + (bsize - 1)) / bsize;
170 blocks.resize(bcount);
171 head_block = &blocks[0];
// Set up the blocks one by one; `len` starts at the total element count.
176 unsigned int len = count;
177 for (
unsigned int i = 0; i < bcount; i++) {
178 LeaderPickerBlock *block = &blocks[i];
181 block->capacity = bsize;
183 block->next[0] = i + 1;
185 block->capacity = len;
// Single-block layout: the head block spans all of v (capacity and len both
// equal count) and has no successor in either link slot.
194 head_block->capacity = count;
195 head_block->len = count;
196 head_block->next[0] = 0;
197 head_block->next[1] = 0;
198 head_block->ptr = &
v[0];
// Both barriers are sized for nthreads + 1 participants.
204 pthread_barrier_init(&wait, NULL, nthreads + 1);
205 pthread_barrier_init(&done, NULL, nthreads + 1);
// Launch the workers; each one receives a pointer to its own entry in
// `threads`, which in turn points back at this state object.
208 for (
unsigned int i = 0; i < nthreads; i++) {
210 threads[i].stat =
this;
211 pthread_create(&threads[i].tid, NULL, LeaderPickerWork<T>,
212 (
void *)&threads[i]);
// Pass the `wait` barrier, join every worker, then destroy both barriers.
// NOTE(review): workers return after `wait` only when thread_op is non-zero;
// presumably it has been set before this point - confirm.
221 pthread_barrier_wait(&wait);
222 for (
unsigned int i = 0; i < nthreads; i++)
223 pthread_join(threads[i].tid, 0);
224 pthread_barrier_destroy(&wait);
225 pthread_barrier_destroy(&done);
// A head block with a non-zero length still has entries: report false.
// Otherwise follow the head block's link for the current tick; a link of 0
// means there is no further block (report true), any other value becomes the
// new head block.
231 if (head_block->len)
return false;
232 unsigned int next_tick = head_block->next[tick];
233 if (!next_tick)
return true;
234 head_block = &blocks[next_tick];
// Visits the first `len` entries of src, with `count` starting at 0.
// compact_job() assigns the return value to a block's len, and calls this
// with dst == src for in-place use as well as with distinct pointers.
239 unsigned int compact(
int *dst,
int *src,
unsigned int len) {
240 unsigned int count = 0;
241 for (
unsigned int i = 0; i < len; i++) {
// Compacts blocks along the list that starts at head_block. Links are read
// from next[tick] and the updated links are written to next[tock].
247 void compact_job(
unsigned int cycle) {
// tock selects the other link slot (tick with its lowest bit flipped).
250 unsigned int tock = tick ^ 1;
252 LeaderPickerBlock *list = head_block;
254 unsigned int next_tick = list->next[tick];
256 LeaderPickerBlock *next = &blocks[next_tick];
257 unsigned int next_next_tick = next->next[tick];
// Compact the current block in place first.
259 list->len =
compact(list->ptr, list->ptr, list->len);
// If what is left here plus the whole (not yet compacted) neighbour fits in
// this block's capacity, compact the neighbour directly into the free tail
// and link past it.
260 if (list->len + next->len <= list->capacity) {
261 list->len +=
compact(list->ptr + list->len, next->ptr, next->len);
262 list->next[tock] = next_next_tick;
// In-place compaction of the neighbour - presumably the path taken when it
// does not fit above; confirm against the surrounding branches.
264 next->len =
compact(next->ptr, next->ptr, next->len);
266 list->next[tock] = next_tick;
267 next->next[tock] = next_next_tick;
269 list->next[tock] = next_next_tick;
271 cycle = nthreads - 1;
// A link of 0 ends the list; any other value is the block to continue from.
274 if (next_next_tick) {
275 list = &blocks[next_next_tick];
280 list->len =
compact(list->ptr, list->ptr, list->len);
281 list->next[tock] = 0;
// Rendezvous with the workers twice: they run compact_job() between passing
// `wait` and arriving at `done`, so returning from the second call means
// every worker has finished that pass.
292 pthread_barrier_wait(&wait);
293 pthread_barrier_wait(&done);
// pthread entry point for one worker. `arg` is the LeaderPickerThread record
// that was passed to pthread_create(); it supplies both the shared state and
// this worker's id.
306 template <
typename T>
307 void *LeaderPickerWork(
void *arg) {
308 typename LeaderPickerState<T>::LeaderPickerThread *thread;
309 thread = (
typename LeaderPickerState<T>::LeaderPickerThread *)arg;
310 LeaderPickerState<T> *stat = thread->stat;
// Block on `wait`. Once through, a non-zero thread_op ends the worker;
// otherwise it runs compact_job() with its id and then arrives at `done`.
313 pthread_barrier_wait(&stat->wait);
314 if (stat->thread_op)
return (
void *)0;
315 stat->compact_job(thread->id);
316 pthread_barrier_wait(&stat->done);
321 template <
typename T>
331 for (
unsigned int i = 0; i < count; i++)
v[i] = i;
// Filters the first `len` entries of src into dst. An entry is kept only
// when its distance to `query`, as computed by (*func), is strictly greater
// than `threshold`. Kept entries are written to dst in their original order
// and `count` tracks how many have been written.
337 unsigned int compact(
int *dst,
int *src,
unsigned int len) {
338 unsigned int count = 0;
339 for (
unsigned int i = 0; i < len; i++) {
340 double ld = (*func)(
query, src[i]);
342 if (ld >
threshold) dst[count++] = src[i];
// Picking routine templated on the distance functor type T.
362 template <
typename T>
364 unsigned int pickSize,
366 double threshold,
int nthreads)
const {
369 if (poolSize < pickSize)
// A pickSize of 0 is replaced by poolSize, i.e. the whole pool is allowed.
372 if (!pickSize) pickSize = poolSize;
376 stat.threshold = threshold;
379 unsigned int picked = 0;
380 unsigned int pick = 0;
// Handle caller-supplied first picks: each one is converted to unsigned,
// compared against poolSize, and appended to `picks`.
382 if (!firstPicks.empty()) {
383 for (RDKit::INT_VECT::const_iterator pIdx = firstPicks.begin();
384 pIdx != firstPicks.end(); ++pIdx) {
385 pick =
static_cast<unsigned int>(*pIdx);
386 if (
pick >= poolSize) {
389 picks.push_back(
pick);
// Keep taking the next pick from the state until pickSize picks have been
// made or the state reports that it is empty.
395 while (picked < pickSize && !stat.empty()) {
396 pick = stat.compact_next();
397 picks.push_back(
pick);
403 template <
typename T>
405 unsigned int pickSize)
const {
411 template <
typename T>
413 unsigned int pickSize,
414 double threshold)
const {
419 template <
typename T>
421 unsigned int pickSize,
423 double threshold)
const {