Jagger
 All Classes Namespaces Files Functions Variables Enumerator Groups Pages
ExclusiveAccessLoadBalancer.java
Go to the documentation of this file.
1 package com.griddynamics.jagger.invoker;
2 
3 import com.google.common.collect.AbstractIterator;
4 import com.griddynamics.jagger.util.Pair;
5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory;
7 
8 import java.util.ArrayList;
9 import java.util.Collections;
10 import java.util.Iterator;
11 import java.util.List;
12 import java.util.Objects;
13 import java.util.Random;
14 import java.util.concurrent.ArrayBlockingQueue;
15 
27 public abstract class ExclusiveAccessLoadBalancer<Q, E> extends PairSupplierFactoryLoadBalancer<Q, E> {
28 
29  private final static Logger log = LoggerFactory.getLogger(ExclusiveAccessLoadBalancer.class);
30 
31  public ExclusiveAccessLoadBalancer(PairSupplierFactory<Q, E> pairSupplierFactory) {
32  super(pairSupplierFactory);
33  }
34 
35  private volatile ArrayBlockingQueue<Pair<Q, E>> pairQueue;
36  private volatile Long randomnessSeed;
37 
38  public void setRandomnessSeed(Long randomnessSeed) {
39  this.randomnessSeed = randomnessSeed;
40  }
41 
42  protected abstract boolean isToCircleAnIteration();
43 
44  protected ArrayBlockingQueue<Pair<Q, E>> getPairQueue() {
45  return pairQueue;
46  }
47 
48  protected abstract Pair<Q, E> pollNext();
49 
50  @Override
51  public Iterator<Pair<Q, E>> provide() {
52  return new AbstractIterator<Pair<Q, E>>() {
53 
54  Pair<Q, E> current = null;
55 
56  @Override
57  protected Pair<Q, E> computeNext() {
58  if (current != null && isToCircleAnIteration()) {
59  log.debug("Returning pair - {}", current);
60  pairQueue.add(current);
61  }
62  current = pollNext();
63 
64  log.debug("Providing pair - {}", current);
65  return current;
66  }
67 
68  @Override
69  public String toString() {
70  return super.getClass() + " iterator";
71  }
72  };
73  }
74 
75  @Override
76  public void init() {
77  synchronized (lock) {
78  if (initialized) {
79  log.debug("already initialized. returning...");
80  return;
81  }
82 
83  super.init();
84 
85  PairSupplier<Q, E> pairSupplier = getPairSupplier();
86  List<Pair<Q, E>> pairList = new ArrayList<>(pairSupplier.size());
87 
88  int index = 0;
89  int step = 1;
90  if (kernelInfo != null) { // then from all the provided pairs pick only those under this node's index.
91  index = kernelInfo.getKernelId();
92  step = kernelInfo.getKernelsNumber();
93  }
94  for (; index < pairSupplier.size(); index += step) {
95  pairList.add(pairSupplier.get(index));
96  }
97 
98  if (Objects.nonNull(randomnessSeed)) {
99  log.info("'randomnessSeed' value is not null. Going to shuffle the pairs");
100  Collections.shuffle(pairList, new Random(randomnessSeed));
101  }
102 
103  log.info("{} pairs on this node to balance", pairList.size());
104  log.debug("Pairs to load balance: {}", pairList);
105 
106  pairQueue = new ArrayBlockingQueue<>(pairSupplier.size(), true, pairList);
107  }
108  }
109 }