Skip to content
Snippets Groups Projects
Unverified Commit 0c085f13 authored by akwizgran's avatar akwizgran
Browse files

Added "polite" delegating executor.

parent 4123f4a5
No related branches found
No related tags found
No related merge requests found
package org.briarproject.bramble;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import static java.util.logging.Level.FINE;
/**
* An {@link Executor} that delegates its tasks to another {@link Executor}
* while limiting the number of tasks that are delegated concurrently. Tasks
* are delegated in the order they are submitted to this executor.
*/
@NotNullByDefault
public class PoliteExecutor implements Executor {
private static final Level LOG_LEVEL = FINE;
private final Object lock = new Object();
@GuardedBy("lock")
private final Queue<Runnable> queue = new LinkedList<Runnable>();
private final Executor delegate;
private final int maxConcurrentTasks;
private final Logger log;
@GuardedBy("lock")
private int concurrentTasks = 0;
/**
* @param tag the tag to be used for logging
* @param delegate the executor to which tasks will be delegated
* @param maxConcurrentTasks the maximum number of tasks that will be
* delegated concurrently. If this is set to 1, tasks submitted to this
* executor will run in the order they are submitted and will not run
* concurrently
*/
public PoliteExecutor(String tag, Executor delegate,
int maxConcurrentTasks) {
this.delegate = delegate;
this.maxConcurrentTasks = maxConcurrentTasks;
log = Logger.getLogger(tag);
}
@Override
public void execute(final Runnable r) {
final long submitted = System.currentTimeMillis();
Runnable wrapped = new Runnable() {
@Override
public void run() {
if (log.isLoggable(LOG_LEVEL)) {
long queued = System.currentTimeMillis() - submitted;
log.log(LOG_LEVEL, "Queue time " + queued + " ms");
}
try {
r.run();
} finally {
scheduleNext();
}
}
};
synchronized (lock) {
if (concurrentTasks < maxConcurrentTasks) {
concurrentTasks++;
delegate.execute(wrapped);
} else {
queue.add(wrapped);
}
}
}
private void scheduleNext() {
synchronized (lock) {
Runnable next = queue.poll();
if (next == null) concurrentTasks--;
else delegate.execute(next);
}
}
}
package org.briarproject.bramble;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -9,6 +11,7 @@ import java.util.logging.Logger;
import static java.util.logging.Level.FINE;
@NotNullByDefault
public class TimeLoggingExecutor extends ThreadPoolExecutor {
private static final Level LOG_LEVEL = FINE;
......@@ -26,22 +29,21 @@ public class TimeLoggingExecutor extends ThreadPoolExecutor {
@Override
public void execute(final Runnable r) {
final long submitted = System.currentTimeMillis();
super.execute(new Runnable() {
@Override
public void run() {
long started = System.currentTimeMillis();
if (log.isLoggable(LOG_LEVEL)) {
long duration = started - submitted;
log.log(LOG_LEVEL, "Queue time " + duration + " ms");
}
r.run();
long finished = System.currentTimeMillis();
if (log.isLoggable(LOG_LEVEL)) {
long duration = finished - started;
log.log(LOG_LEVEL, "Execution time " + duration + " ms");
if (log.isLoggable(LOG_LEVEL)) {
final long submitted = System.currentTimeMillis();
super.execute(new Runnable() {
@Override
public void run() {
long started = System.currentTimeMillis();
long queued = started - submitted;
log.log(LOG_LEVEL, "Queue time " + queued + " ms");
r.run();
long executing = System.currentTimeMillis() - started;
log.log(LOG_LEVEL, "Execution time " + executing + " ms");
}
}
});
});
} else {
super.execute(r);
}
}
}
......@@ -32,7 +32,7 @@ public class CryptoModule {
public static class EagerSingletons {
@Inject
@CryptoExecutor
Executor cryptoExecutor;
ExecutorService cryptoExecutor;
}
/**
......@@ -86,11 +86,18 @@ public class CryptoModule {
@Provides
@Singleton
@CryptoExecutor
Executor getCryptoExecutor(LifecycleManager lifecycleManager) {
ExecutorService getCryptoExecutorService(
LifecycleManager lifecycleManager) {
lifecycleManager.registerForShutdown(cryptoExecutor);
return cryptoExecutor;
}
@Provides
@CryptoExecutor
Executor getCryptoExecutor() {
return cryptoExecutor;
}
@Provides
SecureRandom getSecureRandom(CryptoComponent crypto) {
return crypto.getSecureRandom();
......
package org.briarproject.bramble;
import org.briarproject.bramble.test.BrambleTestCase;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class PoliteExecutorTest extends BrambleTestCase {
private static final String TAG = "Test";
private static final int TASKS = 10;
@Test
public void testTasksAreDelegatedInOrderOfSubmission() throws Exception {
// Delegate to a single-threaded executor
Executor delegate = Executors.newSingleThreadExecutor();
// Allow all the tasks to be delegated straight away
PoliteExecutor polite = new PoliteExecutor(TAG, delegate, TASKS * 2);
final List<Integer> list = new Vector<Integer>();
final CountDownLatch latch = new CountDownLatch(TASKS);
for (int i = 0; i < TASKS; i++) {
final int result = i;
polite.execute(new Runnable() {
@Override
public void run() {
list.add(result);
latch.countDown();
}
});
}
// Wait for all the tasks to finish
latch.await();
// The tasks should have run in the order they were submitted
assertEquals(ascendingOrder(), list);
}
@Test
public void testQueuedTasksAreDelegatedInOrderOfSubmission()
throws Exception {
// Delegate to a single-threaded executor
Executor delegate = Executors.newSingleThreadExecutor();
// Allow two tasks to be delegated at a time
PoliteExecutor polite = new PoliteExecutor(TAG, delegate, 2);
final List<Integer> list = new Vector<Integer>();
final CountDownLatch latch = new CountDownLatch(TASKS);
for (int i = 0; i < TASKS; i++) {
final int result = i;
polite.execute(new Runnable() {
@Override
public void run() {
list.add(result);
latch.countDown();
}
});
}
// Wait for all the tasks to finish
latch.await();
// The tasks should have run in the order they were submitted
assertEquals(ascendingOrder(), list);
}
@Test
public void testTasksRunInParallelOnDelegate() throws Exception {
// Delegate to a multi-threaded executor
Executor delegate = Executors.newCachedThreadPool();
// Allow all the tasks to be delegated straight away
PoliteExecutor polite = new PoliteExecutor(TAG, delegate, TASKS * 2);
final List<Integer> list = new Vector<Integer>();
final CountDownLatch[] latches = new CountDownLatch[TASKS];
for (int i = 0; i < TASKS; i++) latches[i] = new CountDownLatch(1);
for (int i = 0; i < TASKS; i++) {
final int result = i;
polite.execute(new Runnable() {
@Override
public void run() {
try {
// Each task waits for the next task, if any, to finish
if (result < TASKS - 1) latches[result + 1].await();
list.add(result);
} catch (InterruptedException e) {
fail();
}
latches[result].countDown();
}
});
}
// Wait for all the tasks to finish
for (int i = 0; i < TASKS; i++) latches[i].await();
// The tasks should have finished in reverse order
assertEquals(descendingOrder(), list);
}
@Test
public void testTasksDoNotRunInParallelOnDelegate() throws Exception {
// Delegate to a multi-threaded executor
Executor delegate = Executors.newCachedThreadPool();
// Allow one task to be delegated at a time
PoliteExecutor polite = new PoliteExecutor(TAG, delegate, 1);
final List<Integer> list = new Vector<Integer>();
final CountDownLatch latch = new CountDownLatch(TASKS);
for (int i = 0; i < TASKS; i++) {
final int result = i;
polite.execute(new Runnable() {
@Override
public void run() {
try {
// Each task runs faster than the previous task
Thread.sleep(TASKS - result);
list.add(result);
} catch (InterruptedException e) {
fail();
}
latch.countDown();
}
});
}
// Wait for all the tasks to finish
latch.await();
// The tasks should have finished in the order they were submitted
assertEquals(ascendingOrder(), list);
}
private List<Integer> ascendingOrder() {
Integer[] array = new Integer[TASKS];
for (int i = 0; i < TASKS; i++) array[i] = i;
return Arrays.asList(array);
}
private List<Integer> descendingOrder() {
Integer[] array = new Integer[TASKS];
for (int i = 0; i < TASKS; i++) array[i] = TASKS - 1 - i;
return Arrays.asList(array);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment