1 package com.griddynamics.jagger.invoker;
3 import com.google.common.collect.AbstractIterator;
4 import com.griddynamics.jagger.util.Pair;
5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory;
8 import java.util.ArrayList;
9 import java.util.Collections;
10 import java.util.Iterator;
11 import java.util.List;
12 import java.util.Objects;
13 import java.util.Random;
14 import java.util.concurrent.ArrayBlockingQueue;
27 public abstract class ExclusiveAccessLoadBalancer<Q, E>
extends PairSupplierFactoryLoadBalancer<Q, E> {
29 private final static Logger log = LoggerFactory.getLogger(ExclusiveAccessLoadBalancer.class);
32 super(pairSupplierFactory);
/**
 * Queue holding the pairs that are currently available to be handed out.
 * It is created during initialization from the pairs selected for this node,
 * with a capacity equal to the pair supplier's size and with the fairness flag set.
 * The iterator adds a previously provided pair back to this queue when
 * {@link #isToCircleAnIteration()} returns {@code true}.
 */
35 private volatile ArrayBlockingQueue<Pair<Q, E>> pairQueue;
/**
 * Optional seed for shuffling. When non-null, the selected pairs are shuffled with
 * {@code new Random(randomnessSeed)} before the queue is created; when {@code null},
 * the pairs keep the order in which they were read from the pair supplier.
 */
36 private volatile Long randomnessSeed;
39 this.randomnessSeed = randomnessSeed;
/**
 * Tells the iterator whether the pair it provided previously should be put back
 * into {@code pairQueue}. The iterator's {@code computeNext()} consults this only
 * when a previous pair exists (its {@code current} field is non-null).
 *
 * @return {@code true} if the previously provided pair must be added back to the queue
 */
42 protected abstract boolean isToCircleAnIteration();
/**
 * Supplies the next pair to be served.
 * NOTE(review): the call site is not visible in this excerpt; presumably this is invoked
 * by the iterator's {@code computeNext()} and takes the pair from {@code pairQueue}.
 * Confirm this, and how a {@code null} result is treated, against the implementations.
 *
 * @return the next pair
 */
48 protected abstract Pair<Q, E> pollNext();
52 return new AbstractIterator<Pair<Q, E>>() {
54 Pair<Q, E> current = null;
57 protected Pair<Q, E> computeNext() {
58 if (current != null && isToCircleAnIteration()) {
59 log.debug(
"Returning pair - {}", current);
60 pairQueue.add(current);
64 log.debug(
"Providing pair - {}", current);
69 public String toString() {
70 return super.getClass() +
" iterator";
79 log.debug(
"already initialized. returning...");
85 PairSupplier<Q, E> pairSupplier = getPairSupplier();
86 List<Pair<Q, E>> pairList =
new ArrayList<>(pairSupplier.size());
90 if (kernelInfo != null) {
91 index = kernelInfo.getKernelId();
92 step = kernelInfo.getKernelsNumber();
94 for (; index < pairSupplier.size(); index += step) {
95 pairList.add(pairSupplier.get(index));
98 if (Objects.nonNull(randomnessSeed)) {
99 log.info(
"'randomnessSeed' value is not null. Going to shuffle the pairs");
100 Collections.shuffle(pairList,
new Random(randomnessSeed));
103 log.info(
"{} pairs on this node to balance", pairList.size());
104 log.debug(
"Pairs to load balance: {}", pairList);
106 pairQueue =
new ArrayBlockingQueue<>(pairSupplier.size(),
true, pairList);