diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java index f3d3dbe471783ae3bc4b63eeac74026121046bae..c96467e6ce0ecd418b3180b1684ae632af762aec 100644 --- a/components/net/sf/briar/transport/stream/StreamConnection.java +++ b/components/net/sf/briar/transport/stream/StreamConnection.java @@ -10,7 +10,9 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -52,6 +54,10 @@ abstract class StreamConnection implements DatabaseListener { private static final Logger LOG = Logger.getLogger(StreamConnection.class.getName()); + private static final Runnable CLOSE = new Runnable() { + public void run() {} + }; + protected final DatabaseComponent db; protected final ConnectionReaderFactory connReaderFactory; protected final ConnectionWriterFactory connWriterFactory; @@ -62,12 +68,11 @@ abstract class StreamConnection implements DatabaseListener { private final Executor dbExecutor, verificationExecutor; private final AtomicBoolean canSendOffer; - private final LinkedList<Runnable> writerTasks; // Locking: this + private final BlockingQueue<Runnable> writerTasks; private Collection<MessageId> offered = null; // Locking: this private volatile ProtocolWriter writer = null; - private volatile boolean closed = false; StreamConnection(@DatabaseExecutor Executor dbExecutor, @VerificationExecutor Executor verificationExecutor, @@ -86,7 +91,7 @@ abstract class StreamConnection implements DatabaseListener { this.contactId = contactId; this.transport = transport; canSendOffer = new AtomicBoolean(false); - writerTasks = new LinkedList<Runnable>(); + writerTasks = new LinkedBlockingQueue<Runnable>(); } protected abstract ConnectionReader createConnectionReader() @@ -100,7 +105,7 @@ abstract class StreamConnection implements DatabaseListener { dbExecutor.execute(new GenerateAcks()); } else if(e instanceof ContactRemovedEvent) { ContactId c = ((ContactRemovedEvent) e).getContactId(); - if(contactId.equals(c)) closed = true; + if(contactId.equals(c)) writerTasks.add(CLOSE); } else if(e instanceof MessagesAddedEvent) { if(canSendOffer.getAndSet(false)) dbExecutor.execute(new GenerateOffer()); @@ -133,6 +138,7 @@ abstract class StreamConnection implements DatabaseListener { Request r = reader.readRequest(); // Retrieve the offered message IDs Collection<MessageId> offered = getOfferedMessageIds(); + if(offered == null) throw new FormatException(); // Work out which messages were requested BitSet b = r.getBitmap(); List<MessageId> requested = new LinkedList<MessageId>(); @@ -168,16 +174,13 @@ abstract class StreamConnection implements DatabaseListener { } } - private synchronized Collection<MessageId> getOfferedMessageIds() - throws FormatException { - if(offered == null) throw new FormatException(); // Unexpected request + private synchronized Collection<MessageId> getOfferedMessageIds() { Collection<MessageId> ids = offered; offered = null; return ids; } - // Locking: this - private void setOfferedMessageIds(Collection<MessageId> ids) { + private synchronized void setOfferedMessageIds(Collection<MessageId> ids) { assert offered == null; offered = ids; } @@ -193,19 +196,14 @@ abstract class StreamConnection implements DatabaseListener { dbExecutor.execute(new GenerateAcks()); dbExecutor.execute(new GenerateOffer()); // Main loop - while(!closed) { - Runnable task = null; - synchronized(this) { - while(writerTasks.isEmpty()) { - try { - wait(); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - task = writerTasks.poll(); + while(true) { + try { + Runnable task = writerTasks.take(); + if(task == CLOSE) break; + task.run(); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); } - task.run(); } transport.dispose(true); } catch(DbException e) { @@ -286,10 +284,7 @@ abstract class StreamConnection implements DatabaseListener { public void run() { try { Request r = db.receiveOffer(contactId, offer); - synchronized(StreamConnection.this) { - writerTasks.add(new WriteRequest(r)); - StreamConnection.this.notifyAll(); - } + writerTasks.add(new WriteRequest(r)); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } @@ -379,10 +374,7 @@ abstract class StreamConnection implements DatabaseListener { try { Ack a = db.generateAck(contactId, maxBatches); while(a != null) { - synchronized(StreamConnection.this) { - writerTasks.add(new WriteAck(a)); - StreamConnection.this.notifyAll(); - } + writerTasks.add(new WriteAck(a)); a = db.generateAck(contactId, maxBatches); } } catch(DbException e) { @@ -425,16 +417,8 @@ abstract class StreamConnection implements DatabaseListener { int capacity = writer.getMessageCapacityForBatch(Long.MAX_VALUE); try { RawBatch b = db.generateBatch(contactId, capacity, requested); - if(b == null) { - // No batch to write - send another offer - new GenerateOffer().run(); - } else { - // Write the batch - synchronized(StreamConnection.this) { - writerTasks.add(new WriteBatch(b, requested)); - StreamConnection.this.notifyAll(); - } - } + if(b == null) new GenerateOffer().run(); + else writerTasks.add(new WriteBatch(b, requested)); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } @@ -456,13 +440,8 @@ abstract class StreamConnection implements DatabaseListener { assert writer != null; try { writer.writeBatch(batch); - if(requested.isEmpty()) { - // No more batches to send - send another offer - dbExecutor.execute(new GenerateOffer()); - } else { - // Send another batch - dbExecutor.execute(new GenerateBatch(requested)); - } + if(requested.isEmpty()) dbExecutor.execute(new GenerateOffer()); + else dbExecutor.execute(new GenerateBatch(requested)); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); transport.dispose(false); @@ -482,13 +461,10 @@ abstract class StreamConnection implements DatabaseListener { // No messages to offer - wait for some to be added canSendOffer.set(true); } else { - synchronized(StreamConnection.this) { - // Store the offered message IDs - setOfferedMessageIds(o.getMessageIds()); - // Write the offer on the writer thread - writerTasks.add(new WriteOffer(o)); - StreamConnection.this.notifyAll(); - } + // Store the offered message IDs + setOfferedMessageIds(o.getMessageIds()); + // Write the offer on the writer thread + writerTasks.add(new WriteOffer(o)); } } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); @@ -522,12 +498,7 @@ abstract class StreamConnection implements DatabaseListener { public void run() { try { SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); - if(s != null) { - synchronized(StreamConnection.this) { - writerTasks.add(new WriteSubscriptionUpdate(s)); - StreamConnection.this.notifyAll(); - } - } + if(s != null) writerTasks.add(new WriteSubscriptionUpdate(s)); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } @@ -560,12 +531,7 @@ abstract class StreamConnection implements DatabaseListener { public void run() { try { TransportUpdate t = db.generateTransportUpdate(contactId); - if(t != null) { - synchronized(StreamConnection.this) { - writerTasks.add(new WriteTransportUpdate(t)); - StreamConnection.this.notifyAll(); - } - } + if(t != null) writerTasks.add(new WriteTransportUpdate(t)); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); }