diff --git a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java index c8d63980e61ea37f63877aee99ce80ebdd15479d..cf8e00d04e7213623a0c25ceb4d6a74a9aa43b43 100644 --- a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java +++ b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java @@ -3,6 +3,7 @@ package net.sf.briar.transport.batch; import java.io.IOException; import java.security.GeneralSecurityException; import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; import java.util.logging.Level; import java.util.logging.Logger; @@ -23,6 +24,8 @@ import net.sf.briar.api.transport.ConnectionReaderFactory; class IncomingBatchConnection { + private static final int MAX_WAITING_DB_WRITES = 5; + private static final Logger LOG = Logger.getLogger(IncomingBatchConnection.class.getName()); @@ -33,6 +36,7 @@ class IncomingBatchConnection { private final ConnectionContext ctx; private final BatchTransportReader reader; private final byte[] tag; + private final Semaphore semaphore; IncomingBatchConnection(Executor executor, ConnectionReaderFactory connFactory, @@ -45,6 +49,7 @@ class IncomingBatchConnection { this.ctx = ctx; this.reader = reader; this.tag = tag; + semaphore = new Semaphore(MAX_WAITING_DB_WRITES); } void read() { @@ -59,6 +64,7 @@ class IncomingBatchConnection { if(proto.hasAck()) { final Ack a = proto.readAck(); // Store the ack on another thread + semaphore.acquire(); executor.execute(new Runnable() { public void run() { try { @@ -67,11 +73,13 @@ class IncomingBatchConnection { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } + semaphore.release(); } }); } else if(proto.hasBatch()) { final UnverifiedBatch b = proto.readBatch(); // Verify and store the batch on another thread + semaphore.acquire(); executor.execute(new Runnable() { public void run() { try { @@ -83,11 +91,13 @@ class IncomingBatchConnection { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } + semaphore.release(); } }); } else if(proto.hasSubscriptionUpdate()) { final SubscriptionUpdate s = proto.readSubscriptionUpdate(); // Store the update on another thread + semaphore.acquire(); executor.execute(new Runnable() { public void run() { try { @@ -96,11 +106,13 @@ class IncomingBatchConnection { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } + semaphore.release(); } }); } else if(proto.hasTransportUpdate()) { final TransportUpdate t = proto.readTransportUpdate(); // Store the update on another thread + semaphore.acquire(); executor.execute(new Runnable() { public void run() { try { @@ -109,12 +121,15 @@ class IncomingBatchConnection { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } + semaphore.release(); } }); } else { throw new FormatException(); } } + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); reader.dispose(false); diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java index 071868574c47907d68c20cb3c85056cc02353ed7..17839b393b199e84cbc899781a6caab12933e997 100644 --- a/components/net/sf/briar/transport/stream/StreamConnection.java +++ b/components/net/sf/briar/transport/stream/StreamConnection.java @@ -11,6 +11,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; import java.util.logging.Level; import java.util.logging.Logger; @@ -49,6 +50,8 @@ import net.sf.briar.api.transport.StreamTransportConnection; abstract class StreamConnection implements DatabaseListener { + private static final int MAX_WAITING_DB_WRITES = 5; + private static enum State { SEND_OFFER, IDLE, AWAIT_REQUEST, SEND_BATCHES }; private static final Logger LOG = @@ -63,6 +66,8 @@ abstract class StreamConnection implements DatabaseListener { protected final ContactId contactId; protected final StreamTransportConnection connection; + private final Semaphore semaphore; + private int writerFlags = 0; // Locking: this private Collection<MessageId> offered = null; // Locking: this private LinkedList<MessageId> requested = null; // Locking: this @@ -82,6 +87,7 @@ abstract class StreamConnection implements DatabaseListener { this.protoWriterFactory = protoWriterFactory; this.contactId = contactId; this.connection = connection; + semaphore = new Semaphore(MAX_WAITING_DB_WRITES); } protected abstract ConnectionReader createConnectionReader() @@ -126,6 +132,7 @@ abstract class StreamConnection implements DatabaseListener { if(proto.hasAck()) { final Ack a = proto.readAck(); // Store the ack on another thread + semaphore.acquire(); executor.execute(new Runnable() { public void run() { try { @@ -134,11 +141,13 @@ abstract class StreamConnection implements DatabaseListener { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } + semaphore.release(); } }); } else if(proto.hasBatch()) { final UnverifiedBatch b = proto.readBatch(); // Verify and store the batch on another thread + semaphore.acquire(); executor.execute(new Runnable() { public void run() { try { @@ -150,6 +159,7 @@ abstract class StreamConnection implements DatabaseListener { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } + semaphore.release(); } }); } else if(proto.hasOffer()) { @@ -182,6 +192,7 @@ abstract class StreamConnection implements DatabaseListener { // Mark the unrequested messages as seen on another thread final List<MessageId> l = Collections.unmodifiableList(seen); + semaphore.acquire(); executor.execute(new Runnable() { public void run() { try { @@ -190,6 +201,7 @@ abstract class StreamConnection implements DatabaseListener { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } + semaphore.release(); } }); // Store the requested message IDs and notify the writer @@ -203,6 +215,7 @@ abstract class StreamConnection implements DatabaseListener { } else if(proto.hasSubscriptionUpdate()) { final SubscriptionUpdate s = proto.readSubscriptionUpdate(); // Store the update on another thread + semaphore.acquire(); executor.execute(new Runnable() { public void run() { try { @@ -211,11 +224,13 @@ abstract class StreamConnection implements DatabaseListener { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } + semaphore.release(); } }); } else if(proto.hasTransportUpdate()) { final TransportUpdate t = proto.readTransportUpdate(); // Store the update on another thread + semaphore.acquire(); executor.execute(new Runnable() { public void run() { try { @@ -224,6 +239,7 @@ abstract class StreamConnection implements DatabaseListener { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } + semaphore.release(); } }); } else { @@ -233,6 +249,8 @@ abstract class StreamConnection implements DatabaseListener { } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false);