diff --git a/bramble-core/src/main/java/org/briarproject/bramble/PoliteExecutor.java b/bramble-core/src/main/java/org/briarproject/bramble/PoliteExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..02f88c7c1d0ea5e63368323cf57afc0648bb4945 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/PoliteExecutor.java @@ -0,0 +1,84 @@ +package org.briarproject.bramble; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.annotation.concurrent.GuardedBy; + +import static java.util.logging.Level.FINE; + +/** + * An {@link Executor} that delegates its tasks to another {@link Executor} + * while limiting the number of tasks that are delegated concurrently. Tasks + * are delegated in the order they are submitted to this executor. + */ +@NotNullByDefault +public class PoliteExecutor implements Executor { + + private static final Level LOG_LEVEL = FINE; + + private final Object lock = new Object(); + @GuardedBy("lock") + private final Queue<Runnable> queue = new LinkedList<Runnable>(); + private final Executor delegate; + private final int maxConcurrentTasks; + private final Logger log; + + @GuardedBy("lock") + private int concurrentTasks = 0; + + /** + * @param tag the tag to be used for logging + * @param delegate the executor to which tasks will be delegated + * @param maxConcurrentTasks the maximum number of tasks that will be + * delegated concurrently. If this is set to 1, tasks submitted to this + * executor will run in the order they are submitted and will not run + * concurrently + */ + public PoliteExecutor(String tag, Executor delegate, + int maxConcurrentTasks) { + this.delegate = delegate; + this.maxConcurrentTasks = maxConcurrentTasks; + log = Logger.getLogger(tag); + } + + @Override + public void execute(final Runnable r) { + final long submitted = System.currentTimeMillis(); + Runnable wrapped = new Runnable() { + @Override + public void run() { + if (log.isLoggable(LOG_LEVEL)) { + long queued = System.currentTimeMillis() - submitted; + log.log(LOG_LEVEL, "Queue time " + queued + " ms"); + } + try { + r.run(); + } finally { + scheduleNext(); + } + } + }; + synchronized (lock) { + if (concurrentTasks < maxConcurrentTasks) { + concurrentTasks++; + delegate.execute(wrapped); + } else { + queue.add(wrapped); + } + } + } + + private void scheduleNext() { + synchronized (lock) { + Runnable next = queue.poll(); + if (next == null) concurrentTasks--; + else delegate.execute(next); + } + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java b/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java index 77823e5d09a8e78dda77b38bbf824b2f969c8b5f..e7385f59b086c664e1c49f2ca15952f2eb2c6a9f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/TimeLoggingExecutor.java @@ -1,5 +1,7 @@ package org.briarproject.bramble; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; @@ -9,6 +11,7 @@ import java.util.logging.Logger; import static java.util.logging.Level.FINE; +@NotNullByDefault public class TimeLoggingExecutor extends ThreadPoolExecutor { private static final Level LOG_LEVEL = FINE; @@ -26,22 +29,21 @@ public class TimeLoggingExecutor extends ThreadPoolExecutor { @Override public void execute(final Runnable r) { - final long submitted = System.currentTimeMillis(); - super.execute(new Runnable() { - @Override - public void run() { - long started = System.currentTimeMillis(); - if (log.isLoggable(LOG_LEVEL)) { - long duration = started - submitted; - log.log(LOG_LEVEL, "Queue time " + duration + " ms"); - } - r.run(); - long finished = System.currentTimeMillis(); - if (log.isLoggable(LOG_LEVEL)) { - long duration = finished - started; - log.log(LOG_LEVEL, "Execution time " + duration + " ms"); + if (log.isLoggable(LOG_LEVEL)) { + final long submitted = System.currentTimeMillis(); + super.execute(new Runnable() { + @Override + public void run() { + long started = System.currentTimeMillis(); + long queued = started - submitted; + log.log(LOG_LEVEL, "Queue time " + queued + " ms"); + r.run(); + long executing = System.currentTimeMillis() - started; + log.log(LOG_LEVEL, "Execution time " + executing + " ms"); } - } - }); + }); + } else { + super.execute(r); + } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java index 1ac3cc29cf495e8e8ccc340c296f15d8f813ecab..bcad98cda799701c0a742d8a38b270fcb0da3966 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/crypto/CryptoModule.java @@ -32,7 +32,7 @@ public class CryptoModule { public static class EagerSingletons { @Inject @CryptoExecutor - Executor cryptoExecutor; + ExecutorService cryptoExecutor; } /** @@ -86,11 +86,18 @@ public class CryptoModule { @Provides @Singleton @CryptoExecutor - Executor getCryptoExecutor(LifecycleManager lifecycleManager) { + ExecutorService getCryptoExecutorService( + LifecycleManager lifecycleManager) { lifecycleManager.registerForShutdown(cryptoExecutor); return cryptoExecutor; } + @Provides + @CryptoExecutor + Executor getCryptoExecutor() { + return cryptoExecutor; + } + @Provides SecureRandom getSecureRandom(CryptoComponent crypto) { return crypto.getSecureRandom(); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/PoliteExecutorTest.java b/bramble-core/src/test/java/org/briarproject/bramble/PoliteExecutorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..426c4cc1d329b37a37d6093cd1331318adf8ed59 --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/PoliteExecutorTest.java @@ -0,0 +1,142 @@ +package org.briarproject.bramble; + +import org.briarproject.bramble.test.BrambleTestCase; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class PoliteExecutorTest extends BrambleTestCase { + + private static final String TAG = "Test"; + private static final int TASKS = 10; + + @Test + public void testTasksAreDelegatedInOrderOfSubmission() throws Exception { + // Delegate to a single-threaded executor + Executor delegate = Executors.newSingleThreadExecutor(); + // Allow all the tasks to be delegated straight away + PoliteExecutor polite = new PoliteExecutor(TAG, delegate, TASKS * 2); + final List<Integer> list = new Vector<Integer>(); + final CountDownLatch latch = new CountDownLatch(TASKS); + for (int i = 0; i < TASKS; i++) { + final int result = i; + polite.execute(new Runnable() { + @Override + public void run() { + list.add(result); + latch.countDown(); + } + }); + } + // Wait for all the tasks to finish + latch.await(); + // The tasks should have run in the order they were submitted + assertEquals(ascendingOrder(), list); + } + + @Test + public void testQueuedTasksAreDelegatedInOrderOfSubmission() + throws Exception { + // Delegate to a single-threaded executor + Executor delegate = Executors.newSingleThreadExecutor(); + // Allow two tasks to be delegated at a time + PoliteExecutor polite = new PoliteExecutor(TAG, delegate, 2); + final List<Integer> list = new Vector<Integer>(); + final CountDownLatch latch = new CountDownLatch(TASKS); + for (int i = 0; i < TASKS; i++) { + final int result = i; + polite.execute(new Runnable() { + @Override + public void run() { + list.add(result); + latch.countDown(); + } + }); + } + // Wait for all the tasks to finish + latch.await(); + // The tasks should have run in the order they were submitted + assertEquals(ascendingOrder(), list); + } + + @Test + public void testTasksRunInParallelOnDelegate() throws Exception { + // Delegate to a multi-threaded executor + Executor delegate = Executors.newCachedThreadPool(); + // Allow all the tasks to be delegated straight away + PoliteExecutor polite = new PoliteExecutor(TAG, delegate, TASKS * 2); + final List<Integer> list = new Vector<Integer>(); + final CountDownLatch[] latches = new CountDownLatch[TASKS]; + for (int i = 0; i < TASKS; i++) latches[i] = new CountDownLatch(1); + for (int i = 0; i < TASKS; i++) { + final int result = i; + polite.execute(new Runnable() { + @Override + public void run() { + try { + // Each task waits for the next task, if any, to finish + if (result < TASKS - 1) latches[result + 1].await(); + list.add(result); + } catch (InterruptedException e) { + fail(); + } + latches[result].countDown(); + } + }); + } + // Wait for all the tasks to finish + for (int i = 0; i < TASKS; i++) latches[i].await(); + // The tasks should have finished in reverse order + assertEquals(descendingOrder(), list); + } + + @Test + public void testTasksDoNotRunInParallelOnDelegate() throws Exception { + // Delegate to a multi-threaded executor + Executor delegate = Executors.newCachedThreadPool(); + // Allow one task to be delegated at a time + PoliteExecutor polite = new PoliteExecutor(TAG, delegate, 1); + final List<Integer> list = new Vector<Integer>(); + final CountDownLatch latch = new CountDownLatch(TASKS); + for (int i = 0; i < TASKS; i++) { + final int result = i; + polite.execute(new Runnable() { + @Override + public void run() { + try { + // Each task runs faster than the previous task + Thread.sleep(TASKS - result); + list.add(result); + } catch (InterruptedException e) { + fail(); + } + latch.countDown(); + } + }); + } + // Wait for all the tasks to finish + latch.await(); + // The tasks should have finished in the order they were submitted + assertEquals(ascendingOrder(), list); + } + + private List<Integer> ascendingOrder() { + Integer[] array = new Integer[TASKS]; + for (int i = 0; i < TASKS; i++) array[i] = i; + return Arrays.asList(array); + } + + private List<Integer> descendingOrder() { + Integer[] array = new Integer[TASKS]; + for (int i = 0; i < TASKS; i++) array[i] = TASKS - 1 - i; + return Arrays.asList(array); + } +}