From f2de23854ef3595bbf560d5039092d3226f80654 Mon Sep 17 00:00:00 2001 From: akwizgran <akwizgran@users.sourceforge.net> Date: Sat, 10 Dec 2011 00:59:29 +0000 Subject: [PATCH] Added BoundedExecutor and documented executor policies. --- ...r.java => IncomingConnectionExecutor.java} | 2 +- .../net/sf/briar/db/DatabaseExecutorImpl.java | 56 ------------------- .../net/sf/briar/db/DatabaseModule.java | 19 ++++++- .../net/sf/briar/plugins/PluginsModule.java | 1 + .../net/sf/briar/protocol/ProtocolModule.java | 20 ++++++- .../protocol/VerificationExecutorImpl.java | 54 ------------------ .../transport/ConnectionDispatcherImpl.java | 12 ++-- .../transport/ConnectionRecogniserImpl.java | 14 ++--- .../sf/briar/transport/TransportModule.java | 5 +- util/net/sf/briar/util/BoundedExecutor.java | 56 +++++++++++++++++++ 10 files changed, 111 insertions(+), 128 deletions(-) rename api/net/sf/briar/api/transport/{ConnectionRecogniserExecutor.java => IncomingConnectionExecutor.java} (89%) delete mode 100644 components/net/sf/briar/db/DatabaseExecutorImpl.java delete mode 100644 components/net/sf/briar/protocol/VerificationExecutorImpl.java create mode 100644 util/net/sf/briar/util/BoundedExecutor.java diff --git a/api/net/sf/briar/api/transport/ConnectionRecogniserExecutor.java b/api/net/sf/briar/api/transport/IncomingConnectionExecutor.java similarity index 89% rename from api/net/sf/briar/api/transport/ConnectionRecogniserExecutor.java rename to api/net/sf/briar/api/transport/IncomingConnectionExecutor.java index 8251c48a40..e3048a1eb5 100644 --- a/api/net/sf/briar/api/transport/ConnectionRecogniserExecutor.java +++ b/api/net/sf/briar/api/transport/IncomingConnectionExecutor.java @@ -14,4 +14,4 @@ import com.google.inject.BindingAnnotation; @BindingAnnotation @Target({ PARAMETER }) @Retention(RUNTIME) -public @interface ConnectionRecogniserExecutor {} \ No newline at end of file +public @interface IncomingConnectionExecutor {} \ No newline at end of file diff --git a/components/net/sf/briar/db/DatabaseExecutorImpl.java b/components/net/sf/briar/db/DatabaseExecutorImpl.java deleted file mode 100644 index f6e0d1cf0f..0000000000 --- a/components/net/sf/briar/db/DatabaseExecutorImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -package net.sf.briar.db; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * An executor that limits the number of concurrent database tasks and the - * number of tasks queued for execution. - */ -class DatabaseExecutorImpl implements Executor { - - // FIXME: Determine suitable values for these constants empirically - - /** - * The maximum number of tasks that can be queued for execution - * before attempting to execute another task will block. - */ - private static final int MAX_QUEUED_TASKS = 10; - - /** The number of idle threads to keep in the pool. */ - private static final int MIN_THREADS = 1; - - /** The maximum number of concurrent tasks. */ - private static final int MAX_THREADS = 10; - - private static final Logger LOG = - Logger.getLogger(DatabaseExecutorImpl.class.getName()); - - private final BlockingQueue<Runnable> queue; - - DatabaseExecutorImpl() { - this(MAX_QUEUED_TASKS, MIN_THREADS, MAX_THREADS); - } - - DatabaseExecutorImpl(int maxQueued, int minThreads, int maxThreads) { - queue = new ArrayBlockingQueue<Runnable>(maxQueued); - new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS, - queue); - } - - public void execute(Runnable r) { - try { - // Block until there's space in the queue - queue.put(r); - } catch(InterruptedException e) { - if(LOG.isLoggable(Level.INFO)) - LOG.info("Interrupted while queueing task"); - Thread.currentThread().interrupt(); - } - } -} diff --git a/components/net/sf/briar/db/DatabaseModule.java b/components/net/sf/briar/db/DatabaseModule.java index 19abe69a7c..9fa851d6d1 100644 --- a/components/net/sf/briar/db/DatabaseModule.java +++ b/components/net/sf/briar/db/DatabaseModule.java @@ -15,6 +15,7 @@ import net.sf.briar.api.protocol.GroupFactory; import net.sf.briar.api.protocol.PacketFactory; import net.sf.briar.api.transport.ConnectionContextFactory; import net.sf.briar.api.transport.ConnectionWindowFactory; +import net.sf.briar.util.BoundedExecutor; import com.google.inject.AbstractModule; import com.google.inject.Provides; @@ -22,11 +23,27 @@ import com.google.inject.Singleton; public class DatabaseModule extends AbstractModule { + // FIXME: Determine suitable values for these constants empirically + + /** + * The maximum number of database tasks that can be queued for execution + * before submitting another task will block. + */ + private static final int MAX_QUEUED_DB_TASKS = 10; + + /** The minimum number of database threads to keep in the pool. */ + private static final int MIN_DB_THREADS = 1; + + /** The maximum number of database threads. */ + private static final int MAX_DB_THREADS = 10; + @Override protected void configure() { bind(DatabaseCleaner.class).to(DatabaseCleanerImpl.class); + // The executor is bounded, so tasks must be independent and short-lived bind(Executor.class).annotatedWith(DatabaseExecutor.class).toInstance( - new DatabaseExecutorImpl()); + new BoundedExecutor(MAX_QUEUED_DB_TASKS, MIN_DB_THREADS, + MAX_DB_THREADS)); } @Provides diff --git a/components/net/sf/briar/plugins/PluginsModule.java b/components/net/sf/briar/plugins/PluginsModule.java index 9ade114d13..fa5ff5d42f 100644 --- a/components/net/sf/briar/plugins/PluginsModule.java +++ b/components/net/sf/briar/plugins/PluginsModule.java @@ -13,6 +13,7 @@ public class PluginsModule extends AbstractModule { @Override protected void configure() { + // The executor is unbounded, so tasks can be dependent or long-lived bind(ExecutorService.class).annotatedWith( PluginExecutor.class).toInstance( Executors.newCachedThreadPool()); diff --git a/components/net/sf/briar/protocol/ProtocolModule.java b/components/net/sf/briar/protocol/ProtocolModule.java index 921d30cd47..aec69bdc63 100644 --- a/components/net/sf/briar/protocol/ProtocolModule.java +++ b/components/net/sf/briar/protocol/ProtocolModule.java @@ -19,12 +19,28 @@ import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.UnverifiedBatch; import net.sf.briar.api.protocol.VerificationExecutor; import net.sf.briar.api.serial.ObjectReader; +import net.sf.briar.util.BoundedExecutor; import com.google.inject.AbstractModule; import com.google.inject.Provides; public class ProtocolModule extends AbstractModule { + // FIXME: Determine suitable values for these constants empirically + + /** + * The maximum number of verification tasks that can be queued for + * execution before submitting another task will block. + */ + private static final int MAX_QUEUED_VERIFIER_TASKS = 10; + + /** The minimum number of verification threads to keep in the pool. */ + private static final int MIN_VERIFIER_THREADS = 1; + + /** The maximum number of verification threads. */ + private static final int MAX_VERIFIER_THREADS = + Runtime.getRuntime().availableProcessors(); + @Override protected void configure() { bind(AuthorFactory.class).to(AuthorFactoryImpl.class); @@ -34,9 +50,11 @@ public class ProtocolModule extends AbstractModule { bind(ProtocolReaderFactory.class).to(ProtocolReaderFactoryImpl.class); bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class); bind(UnverifiedBatchFactory.class).to(UnverifiedBatchFactoryImpl.class); + // The executor is bounded, so tasks must be independent and short-lived bind(Executor.class).annotatedWith( VerificationExecutor.class).toInstance( - new VerificationExecutorImpl()); + new BoundedExecutor(MAX_QUEUED_VERIFIER_TASKS, + MIN_VERIFIER_THREADS, MAX_VERIFIER_THREADS)); } @Provides diff --git a/components/net/sf/briar/protocol/VerificationExecutorImpl.java b/components/net/sf/briar/protocol/VerificationExecutorImpl.java deleted file mode 100644 index 0c8b439a82..0000000000 --- a/components/net/sf/briar/protocol/VerificationExecutorImpl.java +++ /dev/null @@ -1,54 +0,0 @@ -package net.sf.briar.protocol; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * An executor that limits the number of concurrent message verification tasks - * and the number of tasks queued for execution. - */ -class VerificationExecutorImpl implements Executor { - - // FIXME: Determine suitable values for these constants empirically - - /** - * The maximum number of tasks that can be queued for execution - * before attempting to execute another task will block. - */ - private static final int MAX_QUEUED_TASKS = 10; - - /** The number of idle threads to keep in the pool. */ - private static final int MIN_THREADS = 1; - - private static final Logger LOG = - Logger.getLogger(VerificationExecutorImpl.class.getName()); - - private final BlockingQueue<Runnable> queue; - - VerificationExecutorImpl() { - this(MAX_QUEUED_TASKS, MIN_THREADS, - Runtime.getRuntime().availableProcessors()); - } - - VerificationExecutorImpl(int maxQueued, int minThreads, int maxThreads) { - queue = new ArrayBlockingQueue<Runnable>(maxQueued); - new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS, - queue); - } - - public void execute(Runnable r) { - try { - // Block until there's space in the queue - queue.put(r); - } catch(InterruptedException e) { - if(LOG.isLoggable(Level.INFO)) - LOG.info("Interrupted while queueing task"); - Thread.currentThread().interrupt(); - } - } -} diff --git a/components/net/sf/briar/transport/ConnectionDispatcherImpl.java b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java index f390db3d6d..1cb97fdef6 100644 --- a/components/net/sf/briar/transport/ConnectionDispatcherImpl.java +++ b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java @@ -18,7 +18,7 @@ import net.sf.briar.api.transport.BatchTransportWriter; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionDispatcher; import net.sf.briar.api.transport.ConnectionRecogniser; -import net.sf.briar.api.transport.ConnectionRecogniserExecutor; +import net.sf.briar.api.transport.IncomingConnectionExecutor; import net.sf.briar.api.transport.StreamTransportConnection; import net.sf.briar.api.transport.TransportConstants; @@ -29,24 +29,24 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher { private static final Logger LOG = Logger.getLogger(ConnectionDispatcherImpl.class.getName()); - private final Executor executor; + private final Executor connExecutor; private final ConnectionRecogniser recogniser; private final BatchConnectionFactory batchConnFactory; private final StreamConnectionFactory streamConnFactory; @Inject - ConnectionDispatcherImpl(@ConnectionRecogniserExecutor Executor executor, + ConnectionDispatcherImpl(@IncomingConnectionExecutor Executor connExecutor, ConnectionRecogniser recogniser, BatchConnectionFactory batchConnFactory, StreamConnectionFactory streamConnFactory) { - this.executor = executor; + this.connExecutor = connExecutor; this.recogniser = recogniser; this.batchConnFactory = batchConnFactory; this.streamConnFactory = streamConnFactory; } public void dispatchReader(TransportId t, BatchTransportReader r) { - executor.execute(new DispatchBatchConnection(t, r)); + connExecutor.execute(new DispatchBatchConnection(t, r)); } public void dispatchWriter(ContactId c, TransportId t, TransportIndex i, @@ -56,7 +56,7 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher { public void dispatchIncomingConnection(TransportId t, StreamTransportConnection s) { - executor.execute(new DispatchStreamConnection(t, s)); + connExecutor.execute(new DispatchStreamConnection(t, s)); } public void dispatchOutgoingConnection(ContactId c, TransportId t, diff --git a/components/net/sf/briar/transport/ConnectionRecogniserImpl.java b/components/net/sf/briar/transport/ConnectionRecogniserImpl.java index 81753d7d72..1118ebe769 100644 --- a/components/net/sf/briar/transport/ConnectionRecogniserImpl.java +++ b/components/net/sf/briar/transport/ConnectionRecogniserImpl.java @@ -33,7 +33,7 @@ import net.sf.briar.api.protocol.TransportId; import net.sf.briar.api.protocol.TransportIndex; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionRecogniser; -import net.sf.briar.api.transport.ConnectionRecogniserExecutor; +import net.sf.briar.api.transport.IncomingConnectionExecutor; import net.sf.briar.api.transport.ConnectionWindow; import net.sf.briar.util.ByteUtils; @@ -45,7 +45,7 @@ DatabaseListener { private static final Logger LOG = Logger.getLogger(ConnectionRecogniserImpl.class.getName()); - private final Executor executor; + private final Executor connExecutor; private final DatabaseComponent db; private final CryptoComponent crypto; private final Cipher tagCipher; // Locking: this @@ -55,9 +55,9 @@ DatabaseListener { private boolean initialised = false; // Locking: this @Inject - ConnectionRecogniserImpl(@ConnectionRecogniserExecutor Executor executor, + ConnectionRecogniserImpl(@IncomingConnectionExecutor Executor connExecutor, DatabaseComponent db, CryptoComponent crypto) { - this.executor = executor; + this.connExecutor = connExecutor; this.db = db; this.crypto = crypto; tagCipher = crypto.getTagCipher(); @@ -154,7 +154,7 @@ DatabaseListener { if(e instanceof ContactRemovedEvent) { // Remove the expected IVs for the ex-contact final ContactId c = ((ContactRemovedEvent) e).getContactId(); - executor.execute(new Runnable() { + connExecutor.execute(new Runnable() { public void run() { removeContact(c); } @@ -162,7 +162,7 @@ DatabaseListener { } else if(e instanceof TransportAddedEvent) { // Add the expected IVs for the new transport final TransportId t = ((TransportAddedEvent) e).getTransportId(); - executor.execute(new Runnable() { + connExecutor.execute(new Runnable() { public void run() { addTransport(t); } @@ -172,7 +172,7 @@ DatabaseListener { RemoteTransportsUpdatedEvent r = (RemoteTransportsUpdatedEvent) e; final ContactId c = r.getContactId(); final Collection<Transport> transports = r.getTransports(); - executor.execute(new Runnable() { + connExecutor.execute(new Runnable() { public void run() { updateContact(c, transports); } diff --git a/components/net/sf/briar/transport/TransportModule.java b/components/net/sf/briar/transport/TransportModule.java index a4e6ea18e0..9ebc40e8d5 100644 --- a/components/net/sf/briar/transport/TransportModule.java +++ b/components/net/sf/briar/transport/TransportModule.java @@ -7,7 +7,7 @@ import net.sf.briar.api.transport.ConnectionContextFactory; import net.sf.briar.api.transport.ConnectionDispatcher; import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionRecogniser; -import net.sf.briar.api.transport.ConnectionRecogniserExecutor; +import net.sf.briar.api.transport.IncomingConnectionExecutor; import net.sf.briar.api.transport.ConnectionRegistry; import net.sf.briar.api.transport.ConnectionWindowFactory; import net.sf.briar.api.transport.ConnectionWriterFactory; @@ -29,8 +29,9 @@ public class TransportModule extends AbstractModule { ConnectionWindowFactoryImpl.class); bind(ConnectionWriterFactory.class).to( ConnectionWriterFactoryImpl.class); + // The executor is unbounded, so tasks can be dependent or long-lived bind(Executor.class).annotatedWith( - ConnectionRecogniserExecutor.class).toInstance( + IncomingConnectionExecutor.class).toInstance( Executors.newCachedThreadPool()); } } diff --git a/util/net/sf/briar/util/BoundedExecutor.java b/util/net/sf/briar/util/BoundedExecutor.java new file mode 100644 index 0000000000..c019353b8a --- /dev/null +++ b/util/net/sf/briar/util/BoundedExecutor.java @@ -0,0 +1,56 @@ +package net.sf.briar.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An executor that limits the number of concurrently executing tasks and the + * number of tasks queued for execution. + */ +public class BoundedExecutor implements Executor { + + private static final Logger LOG = + Logger.getLogger(BoundedExecutor.class.getName()); + + private final Semaphore semaphore; + private final BlockingQueue<Runnable> queue; + private final Executor executor; + + public BoundedExecutor(int maxQueued, int minThreads, int maxThreads) { + semaphore = new Semaphore(maxQueued + maxThreads); + queue = new LinkedBlockingQueue<Runnable>(); + executor = new ThreadPoolExecutor(minThreads, maxThreads, 60, + TimeUnit.SECONDS, queue); + } + + public void execute(final Runnable r) { + try { + semaphore.acquire(); + executor.execute(new Runnable() { + public void run() { + try { + r.run(); + } finally { + semaphore.release(); + } + } + }); + } catch(InterruptedException e) { + if(LOG.isLoggable(Level.INFO)) + LOG.info("Interrupted while queueing task"); + Thread.currentThread().interrupt(); + throw new RejectedExecutionException(); + } catch(RejectedExecutionException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); + semaphore.release(); + throw e; + } + } +} -- GitLab