diff --git a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java index 36c975eac891d079d7e2e5ce93133135b14f3446..cfdf931ea02c8405d796a3f93ea9bb82a0e4afd9 100644 --- a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java +++ b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java @@ -36,13 +36,13 @@ class IncomingStreamConnection extends StreamConnection { @Override protected ConnectionReader createConnectionReader() throws IOException { return connReaderFactory.createConnectionReader( - connection.getInputStream(), ctx.getSecret(), tag); + transport.getInputStream(), ctx.getSecret(), tag); } @Override protected ConnectionWriter createConnectionWriter() throws IOException { return connWriterFactory.createConnectionWriter( - connection.getOutputStream(), Long.MAX_VALUE, ctx.getSecret(), + transport.getOutputStream(), Long.MAX_VALUE, ctx.getSecret(), tag); } } diff --git a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java index 89252da5c0cc00c18f204c3f726e714eacb6f130..a24bce3400d41c73beb5ec4d75de3915439d04a5 100644 --- a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java +++ b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java @@ -43,7 +43,7 @@ class OutgoingStreamConnection extends StreamConnection { ctx = db.getConnectionContext(contactId, transportIndex); } return connReaderFactory.createConnectionReader( - connection.getInputStream(), ctx.getSecret()); + transport.getInputStream(), ctx.getSecret()); } @Override @@ -54,6 +54,6 @@ class OutgoingStreamConnection extends StreamConnection { ctx = db.getConnectionContext(contactId, transportIndex); } return connWriterFactory.createConnectionWriter( - connection.getOutputStream(), Long.MAX_VALUE, ctx.getSecret()); + transport.getOutputStream(), Long.MAX_VALUE, ctx.getSecret()); } } diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java index c638afe3fb70718f42d8a97907a27a28d10154da..7544d729a6ac6a49370a14af71f598ffcb4fd9e2 100644 --- a/components/net/sf/briar/transport/stream/StreamConnection.java +++ b/components/net/sf/briar/transport/stream/StreamConnection.java @@ -50,11 +50,6 @@ abstract class StreamConnection implements DatabaseListener { private static final Logger LOG = Logger.getLogger(StreamConnection.class.getName()); - // A canary indicating that the connection should be closed - private static final Runnable CLOSE_CONNECTION = new Runnable() { - public void run() {} - }; - protected final Executor dbExecutor; protected final DatabaseComponent db; protected final ConnectionReaderFactory connReaderFactory; @@ -62,21 +57,22 @@ abstract class StreamConnection implements DatabaseListener { protected final ProtocolReaderFactory protoReaderFactory; protected final ProtocolWriterFactory protoWriterFactory; protected final ContactId contactId; - protected final StreamTransportConnection connection; + protected final StreamTransportConnection transport; - private final AtomicBoolean canSendOffer = new AtomicBoolean(false); + private final AtomicBoolean canSendOffer; private final LinkedList<Runnable> writerTasks; // Locking: this private Collection<MessageId> offered = null; // Locking: this private volatile ProtocolWriter writer = null; + private volatile boolean closed = false; StreamConnection(@DatabaseExecutor Executor dbExecutor, DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ContactId contactId, - StreamTransportConnection connection) { + StreamTransportConnection transport) { this.dbExecutor = dbExecutor; this.db = db; this.connReaderFactory = connReaderFactory; @@ -84,7 +80,8 @@ abstract class StreamConnection implements DatabaseListener { this.protoReaderFactory = protoReaderFactory; this.protoWriterFactory = protoWriterFactory; this.contactId = contactId; - this.connection = connection; + this.transport = transport; + canSendOffer = new AtomicBoolean(false); writerTasks = new LinkedList<Runnable>(); } @@ -99,12 +96,7 @@ abstract class StreamConnection implements DatabaseListener { dbExecutor.execute(new GenerateAcks()); } else if(e instanceof ContactRemovedEvent) { ContactId c = ((ContactRemovedEvent) e).getContactId(); - if(contactId.equals(c)) { - synchronized(this) { - writerTasks.add(CLOSE_CONNECTION); - notifyAll(); - } - } + if(contactId.equals(c)) closed = true; } else if(e instanceof MessagesAddedEvent) { if(canSendOffer.getAndSet(false)) dbExecutor.execute(new GenerateOffer()); @@ -162,13 +154,13 @@ abstract class StreamConnection implements DatabaseListener { throw new FormatException(); } } - connection.dispose(true); + transport.dispose(true); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } } @@ -198,7 +190,7 @@ abstract class StreamConnection implements DatabaseListener { dbExecutor.execute(new GenerateAcks()); dbExecutor.execute(new GenerateOffer()); // Main loop - while(true) { + while(!closed) { Runnable task = null; synchronized(this) { while(writerTasks.isEmpty()) { @@ -210,16 +202,15 @@ abstract class StreamConnection implements DatabaseListener { } task = writerTasks.poll(); } - if(task == CLOSE_CONNECTION) break; task.run(); } - connection.dispose(true); + transport.dispose(true); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } } @@ -299,7 +290,7 @@ abstract class StreamConnection implements DatabaseListener { writer.writeRequest(request); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } } } @@ -394,7 +385,7 @@ abstract class StreamConnection implements DatabaseListener { writer.writeAck(ack); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } } } @@ -453,7 +444,7 @@ abstract class StreamConnection implements DatabaseListener { } } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } } } @@ -499,7 +490,7 @@ abstract class StreamConnection implements DatabaseListener { writer.writeOffer(offer); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } } } @@ -537,7 +528,7 @@ abstract class StreamConnection implements DatabaseListener { writer.writeSubscriptionUpdate(update); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } } } @@ -575,7 +566,7 @@ abstract class StreamConnection implements DatabaseListener { writer.writeTransportUpdate(update); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - connection.dispose(false); + transport.dispose(false); } } }