From e3242ebb069d95a775fe07ae65cbbdd6c82d2914 Mon Sep 17 00:00:00 2001 From: akwizgran <akwizgran@users.sourceforge.net> Date: Thu, 8 Dec 2011 16:33:48 +0000 Subject: [PATCH] Read the tag on a connection recogniser thread, don't block the plugin. --- api/net/sf/briar/api/ExceptionHandler.java | 6 - .../api/transport/ConnectionRecogniser.java | 15 +-- .../transport/ConnectionDispatcherImpl.java | 120 ++++++++++-------- .../transport/ConnectionRecogniserImpl.java | 18 +-- .../sf/briar/transport/TransportModule.java | 4 +- .../batch/BatchConnectionReadWriteTest.java | 29 +---- 6 files changed, 72 insertions(+), 120 deletions(-) delete mode 100644 api/net/sf/briar/api/ExceptionHandler.java diff --git a/api/net/sf/briar/api/ExceptionHandler.java b/api/net/sf/briar/api/ExceptionHandler.java deleted file mode 100644 index ccc91d094e..0000000000 --- a/api/net/sf/briar/api/ExceptionHandler.java +++ /dev/null @@ -1,6 +0,0 @@ -package net.sf.briar.api; - -public interface ExceptionHandler<E extends Exception> { - - void handleException(E exception); -} diff --git a/api/net/sf/briar/api/transport/ConnectionRecogniser.java b/api/net/sf/briar/api/transport/ConnectionRecogniser.java index aa448648c5..6bcf2f3bd1 100644 --- a/api/net/sf/briar/api/transport/ConnectionRecogniser.java +++ b/api/net/sf/briar/api/transport/ConnectionRecogniser.java @@ -1,6 +1,5 @@ package net.sf.briar.api.transport; -import net.sf.briar.api.ExceptionHandler; import net.sf.briar.api.db.DbException; import net.sf.briar.api.protocol.TransportId; @@ -11,15 +10,9 @@ import net.sf.briar.api.protocol.TransportId; public interface ConnectionRecogniser { /** - * Asynchronously calls one of the callback's connectionAccepted(), - * connectionRejected() or handleException() methods. + * Returns the context for the given connection if the connection was + * expected, or null if the connection was not expected. */ - void acceptConnection(TransportId t, byte[] tag, Callback c); - - interface Callback extends ExceptionHandler<DbException> { - - void connectionAccepted(ConnectionContext ctx); - - void connectionRejected(); - } + ConnectionContext acceptConnection(TransportId t, byte[] tag) + throws DbException; } diff --git a/components/net/sf/briar/transport/ConnectionDispatcherImpl.java b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java index b35d48d5c8..e9be62a634 100644 --- a/components/net/sf/briar/transport/ConnectionDispatcherImpl.java +++ b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java @@ -1,7 +1,9 @@ package net.sf.briar.transport; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; @@ -16,7 +18,7 @@ import net.sf.briar.api.transport.BatchTransportWriter; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionDispatcher; import net.sf.briar.api.transport.ConnectionRecogniser; -import net.sf.briar.api.transport.ConnectionRecogniser.Callback; +import net.sf.briar.api.transport.ConnectionRecogniserExecutor; import net.sf.briar.api.transport.StreamTransportConnection; import net.sf.briar.api.transport.TransportConstants; @@ -27,45 +29,39 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher { private static final Logger LOG = Logger.getLogger(ConnectionDispatcherImpl.class.getName()); + private final Executor executor; private final ConnectionRecogniser recogniser; private final BatchConnectionFactory batchConnFactory; private final StreamConnectionFactory streamConnFactory; @Inject - ConnectionDispatcherImpl(ConnectionRecogniser recogniser, + ConnectionDispatcherImpl(@ConnectionRecogniserExecutor Executor executor, + ConnectionRecogniser recogniser, BatchConnectionFactory batchConnFactory, StreamConnectionFactory streamConnFactory) { + this.executor = executor; this.recogniser = recogniser; this.batchConnFactory = batchConnFactory; this.streamConnFactory = streamConnFactory; } - public void dispatchReader(TransportId t, final BatchTransportReader r) { - // Read the tag - final byte[] tag; - try { - tag = readTag(r.getInputStream()); - } catch(IOException e) { - if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - r.dispose(false); - return; - } - // Get the connection context asynchronously - recogniser.acceptConnection(t, tag, new Callback() { + public void dispatchReader(TransportId t, BatchTransportReader r) { + executor.execute(new DispatchBatchConnection(t, r)); + } - public void connectionAccepted(ConnectionContext ctx) { - batchConnFactory.createIncomingConnection(ctx, r, tag); - } + public void dispatchWriter(ContactId c, TransportIndex i, + BatchTransportWriter w) { + batchConnFactory.createOutgoingConnection(c, i, w); + } - public void connectionRejected() { - r.dispose(true); - } + public void dispatchIncomingConnection(TransportId t, + StreamTransportConnection s) { + executor.execute(new DispatchStreamConnection(t, s)); + } - public void handleException(DbException e) { - if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - r.dispose(false); - } - }); + public void dispatchOutgoingConnection(ContactId c, TransportIndex i, + StreamTransportConnection s) { + streamConnFactory.createOutgoingConnection(c, i, s); } private byte[] readTag(InputStream in) throws IOException { @@ -73,48 +69,62 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher { int offset = 0; while(offset < b.length) { int read = in.read(b, offset, b.length - offset); - if(read == -1) throw new IOException(); + if(read == -1) throw new EOFException(); offset += read; } return b; } - public void dispatchWriter(ContactId c, TransportIndex i, - BatchTransportWriter w) { - batchConnFactory.createOutgoingConnection(c, i, w); - } + private class DispatchBatchConnection implements Runnable { - public void dispatchIncomingConnection(TransportId t, - final StreamTransportConnection s) { - // Read the tag - final byte[] tag; - try { - tag = readTag(s.getInputStream()); - } catch(IOException e) { - if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - s.dispose(false); - return; + private final TransportId t; + private final BatchTransportReader r; + + private DispatchBatchConnection(TransportId t, BatchTransportReader r) { + this.t = t; + this.r = r; } - // Get the connection context asynchronously - recogniser.acceptConnection(t, tag, new Callback() { - public void connectionAccepted(ConnectionContext ctx) { - streamConnFactory.createIncomingConnection(ctx, s, tag); + public void run() { + try { + byte[] tag = readTag(r.getInputStream()); + ConnectionContext ctx = recogniser.acceptConnection(t, tag); + if(ctx == null) r.dispose(true); + else batchConnFactory.createIncomingConnection(ctx, r, tag); + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + r.dispose(false); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + r.dispose(false); } + } + } - public void connectionRejected() { - s.dispose(true); - } + private class DispatchStreamConnection implements Runnable { + + private final TransportId t; + private final StreamTransportConnection s; - public void handleException(DbException e) { + private DispatchStreamConnection(TransportId t, + StreamTransportConnection s) { + this.t = t; + this.s = s; + } + + public void run() { + try { + byte[] tag = readTag(s.getInputStream()); + ConnectionContext ctx = recogniser.acceptConnection(t, tag); + if(ctx == null) s.dispose(true); + else streamConnFactory.createIncomingConnection(ctx, s, tag); + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + s.dispose(false); + } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); s.dispose(false); } - }); - } - - public void dispatchOutgoingConnection(ContactId c, TransportIndex i, - StreamTransportConnection s) { - streamConnFactory.createOutgoingConnection(c, i, s); + } } -} +} \ No newline at end of file diff --git a/components/net/sf/briar/transport/ConnectionRecogniserImpl.java b/components/net/sf/briar/transport/ConnectionRecogniserImpl.java index 4a2e31e8da..c5c29d9ef1 100644 --- a/components/net/sf/briar/transport/ConnectionRecogniserImpl.java +++ b/components/net/sf/briar/transport/ConnectionRecogniserImpl.java @@ -107,23 +107,7 @@ DatabaseListener { return new Bytes(tag); } - public void acceptConnection(final TransportId t, final byte[] tag, - final Callback callback) { - executor.execute(new Runnable() { - public void run() { - try { - ConnectionContext ctx = acceptConnection(t, tag); - if(ctx == null) callback.connectionRejected(); - else callback.connectionAccepted(ctx); - } catch(DbException e) { - callback.handleException(e); - } - } - }); - } - - // Package access for testing - ConnectionContext acceptConnection(TransportId t, byte[] tag) + public ConnectionContext acceptConnection(TransportId t, byte[] tag) throws DbException { if(tag.length != TAG_LENGTH) throw new IllegalArgumentException(); diff --git a/components/net/sf/briar/transport/TransportModule.java b/components/net/sf/briar/transport/TransportModule.java index 1e9debcd77..876ec82387 100644 --- a/components/net/sf/briar/transport/TransportModule.java +++ b/components/net/sf/briar/transport/TransportModule.java @@ -12,7 +12,6 @@ import net.sf.briar.api.transport.ConnectionWindowFactory; import net.sf.briar.api.transport.ConnectionWriterFactory; import com.google.inject.AbstractModule; -import com.google.inject.Singleton; public class TransportModule extends AbstractModule { @@ -23,8 +22,7 @@ public class TransportModule extends AbstractModule { bind(ConnectionDispatcher.class).to(ConnectionDispatcherImpl.class); bind(ConnectionReaderFactory.class).to( ConnectionReaderFactoryImpl.class); - bind(ConnectionRecogniser.class).to(ConnectionRecogniserImpl.class).in( - Singleton.class); + bind(ConnectionRecogniser.class).to(ConnectionRecogniserImpl.class); bind(ConnectionWindowFactory.class).to( ConnectionWindowFactoryImpl.class); bind(ConnectionWriterFactory.class).to( diff --git a/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java b/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java index e753cac020..0ae03f8fbc 100644 --- a/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java +++ b/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java @@ -8,14 +8,12 @@ import java.io.File; import java.util.Collection; import java.util.Collections; import java.util.Random; -import java.util.concurrent.CountDownLatch; import junit.framework.TestCase; import net.sf.briar.TestDatabaseModule; import net.sf.briar.TestUtils; import net.sf.briar.api.ContactId; import net.sf.briar.api.db.DatabaseComponent; -import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.event.DatabaseEvent; import net.sf.briar.api.db.event.DatabaseListener; import net.sf.briar.api.db.event.MessagesAddedEvent; @@ -30,7 +28,6 @@ import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionRecogniser; -import net.sf.briar.api.transport.ConnectionRecogniser.Callback; import net.sf.briar.api.transport.ConnectionWriterFactory; import net.sf.briar.crypto.CryptoModule; import net.sf.briar.db.DatabaseModule; @@ -158,10 +155,7 @@ public class BatchConnectionReadWriteTest extends TestCase { byte[] tag = new byte[TAG_LENGTH]; int read = in.read(tag); assertEquals(tag.length, read); - TestCallback callback = new TestCallback(); - rec.acceptConnection(transportId, tag, callback); - callback.latch.await(); - ConnectionContext ctx = callback.ctx; + ConnectionContext ctx = rec.acceptConnection(transportId, tag); assertNotNull(ctx); assertEquals(contactId, ctx.getContactId()); assertEquals(transportIndex, ctx.getTransportIndex()); @@ -198,25 +192,4 @@ public class BatchConnectionReadWriteTest extends TestCase { if(e instanceof MessagesAddedEvent) messagesAdded = true; } } - - private static class TestCallback implements Callback { - - private final CountDownLatch latch = new CountDownLatch(1); - private ConnectionContext ctx = null; - - public void connectionAccepted(ConnectionContext ctx) { - this.ctx = ctx; - latch.countDown(); - } - - public void connectionRejected() { - fail(); - latch.countDown(); - } - - public void handleException(DbException e) { - fail(); - latch.countDown(); - } - } } -- GitLab