diff --git a/api/net/sf/briar/api/plugins/Plugin.java b/api/net/sf/briar/api/plugins/Plugin.java index 6d476087ca1cc63877cd86f8c057b551e3624e53..a3ae0b87dbfa1a584b03540c2343339be29820ec 100644 --- a/api/net/sf/briar/api/plugins/Plugin.java +++ b/api/net/sf/briar/api/plugins/Plugin.java @@ -28,9 +28,8 @@ public interface Plugin { long getPollingInterval(); /** - * Attempts to establish connections using the current transport and - * configuration properties, and passes any created connections to the - * callback. + * Attempts to establish connections to all contacts, passing any created + * connections to the callback. */ void poll(); diff --git a/api/net/sf/briar/api/protocol/ProtocolWriter.java b/api/net/sf/briar/api/protocol/ProtocolWriter.java index 2e7c6aaf8cf26413809f9a5a5d55371412db38c6..fa07127b89650c04b79da4c43b44c289240b714d 100644 --- a/api/net/sf/briar/api/protocol/ProtocolWriter.java +++ b/api/net/sf/briar/api/protocol/ProtocolWriter.java @@ -4,6 +4,8 @@ import java.io.IOException; public interface ProtocolWriter { + void flush() throws IOException; + int getMaxBatchesForAck(long capacity); int getMaxMessagesForOffer(long capacity); diff --git a/api/net/sf/briar/api/protocol/ProtocolWriterFactory.java b/api/net/sf/briar/api/protocol/ProtocolWriterFactory.java index fbed9d08577c3360d6f457d19eb941c3e7c57317..8c0d465ebf2f3dd08fac8a60fef58888562bd10f 100644 --- a/api/net/sf/briar/api/protocol/ProtocolWriterFactory.java +++ b/api/net/sf/briar/api/protocol/ProtocolWriterFactory.java @@ -4,5 +4,5 @@ import java.io.OutputStream; public interface ProtocolWriterFactory { - ProtocolWriter createProtocolWriter(OutputStream out); + ProtocolWriter createProtocolWriter(OutputStream out, boolean flush); } diff --git a/api/net/sf/briar/api/transport/BatchTransportWriter.java b/api/net/sf/briar/api/transport/BatchTransportWriter.java index 4f2c6f087732ca427d7aada1fae3358bd1670def..0476543d47506271b0ba682c46f0fb2ada051b27 100644 --- a/api/net/sf/briar/api/transport/BatchTransportWriter.java +++ b/api/net/sf/briar/api/transport/BatchTransportWriter.java @@ -14,6 +14,11 @@ public interface BatchTransportWriter { /** Returns an output stream for writing to the transport. */ OutputStream getOutputStream(); + /** + * Returns true if the output stream should be flushed after each packet. + */ + boolean shouldFlush(); + /** * Closes the writer and disposes of any associated resources. The * argument indicates whether the writer is being closed because of an diff --git a/api/net/sf/briar/api/transport/StreamTransportConnection.java b/api/net/sf/briar/api/transport/StreamTransportConnection.java index 337aae9300ed9613d68ec91eaea84986d450a0db..528341541775a45974ee1d666bd05d6a9298eb0c 100644 --- a/api/net/sf/briar/api/transport/StreamTransportConnection.java +++ b/api/net/sf/briar/api/transport/StreamTransportConnection.java @@ -17,6 +17,11 @@ public interface StreamTransportConnection { /** Returns an output stream for writing to the connection. */ OutputStream getOutputStream() throws IOException; + /** + * Returns true if the output stream should be flushed after each packet. + */ + boolean shouldFlush(); + /** * Closes the connection and disposes of any associated resources. The * first argument indicates whether the connection is being closed because diff --git a/components/net/sf/briar/plugins/AbstractPlugin.java b/components/net/sf/briar/plugins/AbstractPlugin.java index 546c6b2d2893c1bd02fa1fe9fbccdf96025ade84..24b08006c8324c7513a110d6e7a64b250b56ebb3 100644 --- a/components/net/sf/briar/plugins/AbstractPlugin.java +++ b/components/net/sf/briar/plugins/AbstractPlugin.java @@ -10,19 +10,19 @@ public abstract class AbstractPlugin implements Plugin { protected final Executor pluginExecutor; - protected boolean started = false; // Locking: this + protected boolean running = false; // Locking: this protected AbstractPlugin(@PluginExecutor Executor pluginExecutor) { this.pluginExecutor = pluginExecutor; } public synchronized void start() throws IOException { - if(started) throw new IllegalStateException(); - started = true; + if(running) throw new IllegalStateException(); + running = true; } public synchronized void stop() throws IOException { - if(!started) throw new IllegalStateException(); - started = false; + if(!running) throw new IllegalStateException(); + running = false; } } diff --git a/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java b/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java index 749ab8ea37512ae9800cd603f81b405b47219d99..ea1faa626b215b8d1c05ffc8b963c076b0a69f0f 100644 --- a/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java +++ b/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java @@ -78,30 +78,17 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } throw new IOException(e.toString()); } - pluginExecutor.execute(createContactSocketBinder()); - } - - @Override - public synchronized void stop() throws IOException { - super.stop(); - if(socket != null) { - socket.close(); - socket = null; - } - } - - private Runnable createContactSocketBinder() { - return new Runnable() { + pluginExecutor.execute(new Runnable() { public void run() { bindContactSocket(); } - }; + }); } private void bindContactSocket() { String uuid; synchronized(this) { - if(!started) return; + if(!running) return; uuid = getUuid(); makeDeviceDiscoverable(); } @@ -115,7 +102,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { return; } synchronized(this) { - if(!started) { + if(!running) { try { scn.close(); } catch(IOException e) { @@ -134,7 +121,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } private synchronized String getUuid() { - assert started; + assert running; TransportProperties p = callback.getLocalProperties(); String uuid = p.get("uuid"); if(uuid == null) { @@ -149,7 +136,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } private synchronized void makeDeviceDiscoverable() { - assert started; + assert running; // Try to make the device discoverable (requires root on Linux) try { localDevice.setDiscoverable(DiscoveryAgent.GIAC); @@ -169,7 +156,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { StreamConnectionNotifier scn; StreamConnection s; synchronized(this) { - if(!started) return; + if(!running) return; scn = socket; } try { @@ -185,6 +172,15 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } } + @Override + public synchronized void stop() throws IOException { + super.stop(); + if(socket != null) { + socket.close(); + socket = null; + } + } + public boolean shouldPoll() { return true; } @@ -193,21 +189,24 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { return pollingInterval; } - public synchronized void poll() { - if(!started) return; - pluginExecutor.execute(createConnectors()); - } - - private Runnable createConnectors() { - return new Runnable() { + public void poll() { + synchronized(this) { + if(!running) return; + } + pluginExecutor.execute(new Runnable() { public void run() { connectAndCallBack(); } - }; + }); } private void connectAndCallBack() { - Map<ContactId, String> discovered = discoverContactUrls(); + Map<ContactId, TransportProperties> remote; + synchronized(this) { + if(!running) return; + remote = callback.getRemoteProperties(); + } + Map<ContactId, String> discovered = discoverContactUrls(remote); for(Entry<ContactId, String> e : discovered.entrySet()) { ContactId c = e.getKey(); String url = e.getValue(); @@ -216,13 +215,12 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } } - private Map<ContactId, String> discoverContactUrls() { + private Map<ContactId, String> discoverContactUrls( + Map<ContactId, TransportProperties> remote) { DiscoveryAgent discoveryAgent; - Map<ContactId, TransportProperties> remote; synchronized(this) { - if(!started) return Collections.emptyMap(); + if(!running) return Collections.emptyMap(); discoveryAgent = localDevice.getDiscoveryAgent(); - remote = callback.getRemoteProperties(); } Map<String, ContactId> addresses = new HashMap<String, ContactId>(); Map<ContactId, String> uuids = new HashMap<ContactId, String>(); @@ -236,18 +234,17 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { uuids.put(c, uuid); } } + if(addresses.isEmpty()) return Collections.emptyMap(); ContactListener listener = new ContactListener(discoveryAgent, Collections.unmodifiableMap(addresses), Collections.unmodifiableMap(uuids)); synchronized(discoveryLock) { try { discoveryAgent.startInquiry(DiscoveryAgent.GIAC, listener); + return listener.waitForUrls(); } catch(BluetoothStateException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); return Collections.emptyMap(); - } - try { - return listener.waitForUrls(); } catch(InterruptedException e) { if(LOG.isLoggable(Level.INFO)) LOG.info("Interrupted while waiting for URLs"); @@ -259,12 +256,10 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { private StreamTransportConnection connect(ContactId c, String url) { synchronized(this) { - if(!started) return null; + if(!running) return null; } try { - if(LOG.isLoggable(Level.INFO)) LOG.info("Connecting to " + url); StreamConnection s = (StreamConnection) Connector.open(url); - if(LOG.isLoggable(Level.INFO)) LOG.info("Connected"); return new BluetoothTransportConnection(s); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); @@ -273,7 +268,14 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } public StreamTransportConnection createConnection(ContactId c) { - String url = discoverContactUrls().get(c); + Map<ContactId, TransportProperties> remote; + synchronized(this) { + if(!running) return null; + remote = callback.getRemoteProperties(); + } + if(!remote.containsKey(c)) return null; + remote = Collections.singletonMap(c, remote.get(c)); + String url = discoverContactUrls(remote).get(c); return url == null ? null : connect(c, url); } @@ -325,7 +327,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { private void createInvitationConnection(ConnectionCallback c) { DiscoveryAgent discoveryAgent; synchronized(this) { - if(!started) return; + if(!running) return; discoveryAgent = localDevice.getDiscoveryAgent(); } // Try to discover the other party until the invitation times out @@ -350,7 +352,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } } synchronized(this) { - if(!started) return; + if(!running) return; } } if(url == null) return; @@ -365,7 +367,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { private void bindInvitationSocket(final ConnectionCallback c) { synchronized(this) { - if(!started) return; + if(!running) return; makeDeviceDiscoverable(); } // Bind the socket @@ -400,7 +402,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { private void acceptInvitationConnection(ConnectionCallback c, StreamConnectionNotifier scn) { synchronized(this) { - if(!started) return; + if(!running) return; } try { StreamConnection s = scn.acceptAndOpen(); diff --git a/components/net/sf/briar/plugins/bluetooth/BluetoothTransportConnection.java b/components/net/sf/briar/plugins/bluetooth/BluetoothTransportConnection.java index 9b04eb437a8366da73ad940ae1cc7dc49dbf3474..c136875dcca2d724e19de51b89be861ee8e50d27 100644 --- a/components/net/sf/briar/plugins/bluetooth/BluetoothTransportConnection.java +++ b/components/net/sf/briar/plugins/bluetooth/BluetoothTransportConnection.java @@ -29,6 +29,10 @@ class BluetoothTransportConnection implements StreamTransportConnection { return stream.openOutputStream(); } + public boolean shouldFlush() { + return true; + } + public void dispose(boolean exception, boolean recognised) { try { stream.close(); diff --git a/components/net/sf/briar/plugins/file/FilePlugin.java b/components/net/sf/briar/plugins/file/FilePlugin.java index 7b13fc8a166d904a1de67cd29b065536e916eb17..52e0998d3934a4b84c0305fd38843891131cef84 100644 --- a/components/net/sf/briar/plugins/file/FilePlugin.java +++ b/components/net/sf/briar/plugins/file/FilePlugin.java @@ -64,7 +64,7 @@ abstract class FilePlugin extends AbstractPlugin implements BatchPlugin { private BatchTransportWriter createWriter(String filename) { synchronized(this) { - if(!started) return null; + if(!running) return null; } File dir = chooseOutputDirectory(); if(dir == null || !dir.exists() || !dir.isDirectory()) return null; @@ -86,7 +86,7 @@ abstract class FilePlugin extends AbstractPlugin implements BatchPlugin { } protected synchronized void createReaderFromFile(final File f) { - if(!started) return; + if(!running) return; pluginExecutor.execute(new ReaderCreator(f)); } diff --git a/components/net/sf/briar/plugins/file/FileTransportWriter.java b/components/net/sf/briar/plugins/file/FileTransportWriter.java index fee646fc1240cd09a4f0797720f6f0746afd123e..a7eaacebd203ad095bff933210880826d6abec4c 100644 --- a/components/net/sf/briar/plugins/file/FileTransportWriter.java +++ b/components/net/sf/briar/plugins/file/FileTransportWriter.java @@ -34,6 +34,10 @@ class FileTransportWriter implements BatchTransportWriter { return out; } + public boolean shouldFlush() { + return false; + } + public void dispose(boolean exception) { try { out.close(); diff --git a/components/net/sf/briar/plugins/file/RemovableDrivePlugin.java b/components/net/sf/briar/plugins/file/RemovableDrivePlugin.java index 31b575aefc6cbd97e8198c3033a64527c296a5b0..9bea390ea94e2750636cde207baf76bdfd5bf2ea 100644 --- a/components/net/sf/briar/plugins/file/RemovableDrivePlugin.java +++ b/components/net/sf/briar/plugins/file/RemovableDrivePlugin.java @@ -58,7 +58,7 @@ implements RemovableDriveMonitor.Callback { } public long getPollingInterval() { - return 0L; + throw new UnsupportedOperationException(); } public void poll() { diff --git a/components/net/sf/briar/plugins/socket/SimpleSocketPlugin.java b/components/net/sf/briar/plugins/socket/SimpleSocketPlugin.java index bc650249eeb890a7dfd57961d948d79c8f4b5b25..a38f982387d729f93bc09eb8cee3bff1a6e71731 100644 --- a/components/net/sf/briar/plugins/socket/SimpleSocketPlugin.java +++ b/components/net/sf/briar/plugins/socket/SimpleSocketPlugin.java @@ -53,19 +53,20 @@ class SimpleSocketPlugin extends SocketPlugin { @Override protected Socket createClientSocket() throws IOException { - assert started; + assert running; return new Socket(); } @Override protected ServerSocket createServerSocket() throws IOException { - assert started; + assert running; return new ServerSocket(); } + // Locking: this @Override - protected synchronized SocketAddress getLocalSocketAddress() { - assert started; + protected SocketAddress getLocalSocketAddress() { + assert running; SocketAddress addr = createSocketAddress(callback.getLocalProperties()); if(addr == null) { try { @@ -87,9 +88,10 @@ class SimpleSocketPlugin extends SocketPlugin { boolean link = addr.isLinkLocalAddress(); boolean site = addr.isSiteLocalAddress(); if(lan == (link || site)) { - if(LOG.isLoggable(Level.INFO)) + if(LOG.isLoggable(Level.INFO)) { LOG.info("Choosing interface " + addr.getHostAddress()); + } return addr; } } @@ -99,9 +101,10 @@ class SimpleSocketPlugin extends SocketPlugin { for(NetworkInterface iface : ifaces) { for(InetAddress addr : Collections.list(iface.getInetAddresses())) { if(!addr.isLoopbackAddress()) { - if(LOG.isLoggable(Level.INFO)) + if(LOG.isLoggable(Level.INFO)) { LOG.info("Accepting interface " + addr.getHostAddress()); + } return addr; } } @@ -109,16 +112,17 @@ class SimpleSocketPlugin extends SocketPlugin { throw new IOException("No suitable interfaces"); } + // Locking: this @Override - protected synchronized SocketAddress getRemoteSocketAddress(ContactId c) { - assert started; + protected SocketAddress getRemoteSocketAddress(ContactId c) { + assert running; TransportProperties p = callback.getRemoteProperties().get(c); return p == null ? null : createSocketAddress(p); } - private synchronized SocketAddress createSocketAddress( - TransportProperties p) { - assert started; + // Locking: this + private SocketAddress createSocketAddress(TransportProperties p) { + assert running; assert p != null; String host = p.get("external"); if(host == null) host = p.get("internal"); @@ -133,9 +137,10 @@ class SimpleSocketPlugin extends SocketPlugin { return new InetSocketAddress(host, port); } + // Locking: this @Override - protected synchronized void setLocalSocketAddress(SocketAddress s) { - assert started; + protected void setLocalSocketAddress(SocketAddress s) { + assert running; if(!(s instanceof InetSocketAddress)) throw new IllegalArgumentException(); InetSocketAddress i = (InetSocketAddress) s; diff --git a/components/net/sf/briar/plugins/socket/SocketPlugin.java b/components/net/sf/briar/plugins/socket/SocketPlugin.java index 552acc468773f95794b3a5853d1e6bd975d3d221..505375dd59c62e9c3f803838729a07d9c11082b4 100644 --- a/components/net/sf/briar/plugins/socket/SocketPlugin.java +++ b/components/net/sf/briar/plugins/socket/SocketPlugin.java @@ -4,11 +4,13 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; +import java.util.Map; import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; import net.sf.briar.api.ContactId; +import net.sf.briar.api.TransportProperties; import net.sf.briar.api.plugins.PluginExecutor; import net.sf.briar.api.plugins.StreamPlugin; import net.sf.briar.api.plugins.StreamPluginCallback; @@ -54,7 +56,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { ServerSocket ss = null; try { synchronized(this) { - if(!started) return; + if(!running) return; addr = getLocalSocketAddress(); ss = createServerSocket(); if(addr == null || ss == null) return; @@ -77,7 +79,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { return; } synchronized(this) { - if(!started) { + if(!running) { try { ss.close(); } catch(IOException e) { @@ -93,7 +95,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { while(true) { Socket s; synchronized(this) { - if(!started) return; + if(!running) return; } try { s = ss.accept(); @@ -119,21 +121,19 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { if(socket != null) socket.close(); } - public synchronized void poll() { - // Subclasses may not support polling - if(!shouldPoll()) throw new UnsupportedOperationException(); - if(!started) return; - for(ContactId c : callback.getRemoteProperties().keySet()) { - pluginExecutor.execute(createConnector(c)); + public void poll() { + Map<ContactId, TransportProperties> remote; + synchronized(this) { + if(!running) return; + remote = callback.getRemoteProperties(); + } + for(final ContactId c : remote.keySet()) { + pluginExecutor.execute(new Runnable() { + public void run() { + connectAndCallBack(c); + } + }); } - } - - private Runnable createConnector(final ContactId c) { - return new Runnable() { - public void run() { - connectAndCallBack(c); - } - }; } private void connectAndCallBack(ContactId c) { @@ -146,7 +146,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { Socket s; try { synchronized(this) { - if(!started) return null; + if(!running) return null; addr = getRemoteSocketAddress(c); s = createClientSocket(); if(addr == null || s == null) return null; diff --git a/components/net/sf/briar/plugins/socket/SocketTransportConnection.java b/components/net/sf/briar/plugins/socket/SocketTransportConnection.java index f855f3310081c1cbd4340fa8fee10c765bed5c1c..2890ec63f8706910a314656fc0b3b2121021c662 100644 --- a/components/net/sf/briar/plugins/socket/SocketTransportConnection.java +++ b/components/net/sf/briar/plugins/socket/SocketTransportConnection.java @@ -28,6 +28,10 @@ class SocketTransportConnection implements StreamTransportConnection { return socket.getOutputStream(); } + public boolean shouldFlush() { + return true; + } + public void dispose(boolean exception, boolean recognised) { try { socket.close(); diff --git a/components/net/sf/briar/protocol/ProtocolWriterFactoryImpl.java b/components/net/sf/briar/protocol/ProtocolWriterFactoryImpl.java index 1327769f3a53654e81dd1a55afb3856f4b24bbd2..a2d40759e4fecc5b83303cf02af388eb9bca7cb7 100644 --- a/components/net/sf/briar/protocol/ProtocolWriterFactoryImpl.java +++ b/components/net/sf/briar/protocol/ProtocolWriterFactoryImpl.java @@ -21,7 +21,8 @@ class ProtocolWriterFactoryImpl implements ProtocolWriterFactory { this.writerFactory = writerFactory; } - public ProtocolWriter createProtocolWriter(OutputStream out) { - return new ProtocolWriterImpl(serial, writerFactory, out); + public ProtocolWriter createProtocolWriter(OutputStream out, + boolean flush) { + return new ProtocolWriterImpl(serial, writerFactory, out, flush); } } diff --git a/components/net/sf/briar/protocol/ProtocolWriterImpl.java b/components/net/sf/briar/protocol/ProtocolWriterImpl.java index 4dc1d44ece277d611b50e7e905609a097d04dce9..ba8aebfaa872e3ffa7dc8ab0fd720ca76814b3a3 100644 --- a/components/net/sf/briar/protocol/ProtocolWriterImpl.java +++ b/components/net/sf/briar/protocol/ProtocolWriterImpl.java @@ -27,12 +27,14 @@ class ProtocolWriterImpl implements ProtocolWriter { private final SerialComponent serial; private final OutputStream out; + private final boolean flush; private final Writer w; ProtocolWriterImpl(SerialComponent serial, WriterFactory writerFactory, - OutputStream out) { + OutputStream out, boolean flush) { this.serial = serial; this.out = out; + this.flush = flush; w = writerFactory.createWriter(out); } @@ -67,6 +69,7 @@ class ProtocolWriterImpl implements ProtocolWriter { w.writeListStart(); for(BatchId b : a.getBatchIds()) w.writeBytes(b.getBytes()); w.writeListEnd(); + if(flush) out.flush(); } public void writeBatch(RawBatch b) throws IOException { @@ -74,6 +77,7 @@ class ProtocolWriterImpl implements ProtocolWriter { w.writeListStart(); for(byte[] raw : b.getMessages()) out.write(raw); w.writeListEnd(); + if(flush) out.flush(); } public void writeOffer(Offer o) throws IOException { @@ -81,6 +85,7 @@ class ProtocolWriterImpl implements ProtocolWriter { w.writeListStart(); for(MessageId m : o.getMessageIds()) w.writeBytes(m.getBytes()); w.writeListEnd(); + if(flush) out.flush(); } public void writeRequest(Request r) throws IOException { @@ -100,6 +105,7 @@ class ProtocolWriterImpl implements ProtocolWriter { w.writeStructId(Types.REQUEST); w.writeUint7((byte) (bytes * 8 - length)); w.writeBytes(bitmap); + if(flush) out.flush(); } public void writeSubscriptionUpdate(SubscriptionUpdate s) @@ -112,6 +118,7 @@ class ProtocolWriterImpl implements ProtocolWriter { } w.writeMapEnd(); w.writeInt64(s.getTimestamp()); + if(flush) out.flush(); } private void writeGroup(Writer w, Group g) throws IOException { @@ -133,5 +140,10 @@ class ProtocolWriterImpl implements ProtocolWriter { } w.writeListEnd(); w.writeInt64(t.getTimestamp()); + if(flush) out.flush(); + } + + public void flush() throws IOException { + out.flush(); } } diff --git a/components/net/sf/briar/protocol/batch/OutgoingBatchConnection.java b/components/net/sf/briar/protocol/batch/OutgoingBatchConnection.java index 1e78871fb8d6e4226f744be104cbd225180158b0..720b0d32352b927c6ee2f2946754b00cd65b272a 100644 --- a/components/net/sf/briar/protocol/batch/OutgoingBatchConnection.java +++ b/components/net/sf/briar/protocol/batch/OutgoingBatchConnection.java @@ -55,7 +55,8 @@ class OutgoingBatchConnection { transport.getOutputStream(), transport.getCapacity(), ctx.getSecret()); OutputStream out = conn.getOutputStream(); - ProtocolWriter writer = protoFactory.createProtocolWriter(out); + ProtocolWriter writer = protoFactory.createProtocolWriter(out, + transport.shouldFlush()); // There should be enough space for a packet long capacity = conn.getRemainingCapacity(); if(capacity < MAX_PACKET_LENGTH) throw new EOFException(); @@ -88,8 +89,7 @@ class OutgoingBatchConnection { capacity = writer.getMessageCapacityForBatch(capacity); b = db.generateBatch(contactId, (int) capacity); } - // Flush the output stream - out.flush(); + writer.flush(); transport.dispose(false); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); diff --git a/components/net/sf/briar/protocol/stream/IncomingStreamConnection.java b/components/net/sf/briar/protocol/stream/IncomingStreamConnection.java index e7229ab7ac5f94b83cb3323ff8bebe7c5e8e3df3..846ac80de250fd110b0033d5ec9e672e8722438f 100644 --- a/components/net/sf/briar/protocol/stream/IncomingStreamConnection.java +++ b/components/net/sf/briar/protocol/stream/IncomingStreamConnection.java @@ -26,11 +26,11 @@ class IncomingStreamConnection extends StreamConnection { ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, - ConnectionContext ctx, StreamTransportConnection connection, + ConnectionContext ctx, StreamTransportConnection transport, byte[] tag) { super(dbExecutor, verificationExecutor, db, connReaderFactory, connWriterFactory, protoReaderFactory, protoWriterFactory, - ctx.getContactId(), connection); + ctx.getContactId(), transport); this.ctx = ctx; this.tag = tag; } diff --git a/components/net/sf/briar/protocol/stream/OutgoingStreamConnection.java b/components/net/sf/briar/protocol/stream/OutgoingStreamConnection.java index c27724f1424b624f044bfe0b23f2271d975f2137..68c33157adea4ac25a0b68d9ce37aa56396b056f 100644 --- a/components/net/sf/briar/protocol/stream/OutgoingStreamConnection.java +++ b/components/net/sf/briar/protocol/stream/OutgoingStreamConnection.java @@ -31,10 +31,10 @@ class OutgoingStreamConnection extends StreamConnection { ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ContactId contactId, TransportIndex transportIndex, - StreamTransportConnection connection) { + StreamTransportConnection transport) { super(dbExecutor, verificationExecutor, db, connReaderFactory, connWriterFactory, protoReaderFactory, protoWriterFactory, - contactId, connection); + contactId, transport); this.transportIndex = transportIndex; } diff --git a/components/net/sf/briar/protocol/stream/StreamConnection.java b/components/net/sf/briar/protocol/stream/StreamConnection.java index 22486cdc06fe53c10e0dfc18a711d472e694986f..7d378db49fe3c49d59e33e1b296c23455e41cf54 100644 --- a/components/net/sf/briar/protocol/stream/StreamConnection.java +++ b/components/net/sf/briar/protocol/stream/StreamConnection.java @@ -191,7 +191,8 @@ abstract class StreamConnection implements DatabaseListener { try { db.addListener(this); OutputStream out = createConnectionWriter().getOutputStream(); - writer = protoWriterFactory.createProtocolWriter(out); + writer = protoWriterFactory.createProtocolWriter(out, + transport.shouldFlush()); // Send the initial packets: transports, subs, acks, offer dbExecutor.execute(new GenerateTransportUpdate()); dbExecutor.execute(new GenerateSubscriptionUpdate()); @@ -203,6 +204,7 @@ abstract class StreamConnection implements DatabaseListener { if(task == CLOSE) break; task.run(); } + writer.flush(); if(!disposed.getAndSet(true)) transport.dispose(false, true); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); diff --git a/components/net/sf/briar/transport/ConnectionDispatcherImpl.java b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java index b78c2a55c50ff8823d76ecfb1827002577ca7737..8a713928565b1cf168f22f04486f80fad8fa1da7 100644 --- a/components/net/sf/briar/transport/ConnectionDispatcherImpl.java +++ b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java @@ -77,53 +77,58 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher { private class DispatchBatchConnection implements Runnable { - private final TransportId t; - private final BatchTransportReader r; + private final TransportId transportId; + private final BatchTransportReader transport; - private DispatchBatchConnection(TransportId t, BatchTransportReader r) { - this.t = t; - this.r = r; + private DispatchBatchConnection(TransportId transportId, + BatchTransportReader transport) { + this.transportId = transportId; + this.transport = transport; } public void run() { try { - byte[] tag = readTag(r.getInputStream()); - ConnectionContext ctx = recogniser.acceptConnection(t, tag); - if(ctx == null) r.dispose(false, false); - else batchConnFactory.createIncomingConnection(ctx, r, tag); + byte[] tag = readTag(transport.getInputStream()); + ConnectionContext ctx = recogniser.acceptConnection(transportId, + tag); + if(ctx == null) transport.dispose(false, false); + else batchConnFactory.createIncomingConnection(ctx, transport, + tag); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); - r.dispose(true, false); + transport.dispose(true, false); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); - r.dispose(true, false); + transport.dispose(true, false); } } } private class DispatchStreamConnection implements Runnable { - private final TransportId t; - private final StreamTransportConnection s; + private final TransportId transportId; + private final StreamTransportConnection transport; - private DispatchStreamConnection(TransportId t, - StreamTransportConnection s) { - this.t = t; - this.s = s; + private DispatchStreamConnection(TransportId transportId, + StreamTransportConnection transport) { + this.transportId = transportId; + this.transport = transport; } public void run() { try { - byte[] tag = readTag(s.getInputStream()); - ConnectionContext ctx = recogniser.acceptConnection(t, tag); - if(ctx == null) s.dispose(false, false); - else streamConnFactory.createIncomingConnection(ctx, s, tag); + byte[] tag = readTag(transport.getInputStream()); + ConnectionContext ctx = recogniser.acceptConnection(transportId, + tag); + if(ctx == null) transport.dispose(false, false); + else streamConnFactory.createIncomingConnection(ctx, transport, + tag); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); - s.dispose(true, false); + transport.dispose(true, false); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); - s.dispose(true, false); + transport.dispose(true, false); } } } diff --git a/test/net/sf/briar/ProtocolIntegrationTest.java b/test/net/sf/briar/ProtocolIntegrationTest.java index 6e4559538598bd72afe5df527c698d1072af8121..43590eec301c247abb0ff8b1db57869a52cc4b24 100644 --- a/test/net/sf/briar/ProtocolIntegrationTest.java +++ b/test/net/sf/briar/ProtocolIntegrationTest.java @@ -139,10 +139,11 @@ public class ProtocolIntegrationTest extends TestCase { ConnectionWriter conn = connectionWriterFactory.createConnectionWriter( out, Long.MAX_VALUE, secret.clone()); OutputStream out1 = conn.getOutputStream(); - ProtocolWriter proto = protocolWriterFactory.createProtocolWriter(out1); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out1, + false); Ack a = packetFactory.createAck(Collections.singletonList(ack)); - proto.writeAck(a); + writer.writeAck(a); Collection<byte[]> batch = Arrays.asList(new byte[][] { message.getSerialised(), @@ -151,7 +152,7 @@ public class ProtocolIntegrationTest extends TestCase { message3.getSerialised() }); RawBatch b = packetFactory.createBatch(batch); - proto.writeBatch(b); + writer.writeBatch(b); Collection<MessageId> offer = Arrays.asList(new MessageId[] { message.getId(), @@ -160,13 +161,13 @@ public class ProtocolIntegrationTest extends TestCase { message3.getId() }); Offer o = packetFactory.createOffer(offer); - proto.writeOffer(o); + writer.writeOffer(o); BitSet requested = new BitSet(4); requested.set(1); requested.set(3); Request r = packetFactory.createRequest(requested, 4); - proto.writeRequest(r); + writer.writeRequest(r); // Use a LinkedHashMap for predictable iteration order Map<Group, Long> subs = new LinkedHashMap<Group, Long>(); @@ -174,13 +175,13 @@ public class ProtocolIntegrationTest extends TestCase { subs.put(group1, 0L); SubscriptionUpdate s = packetFactory.createSubscriptionUpdate(subs, timestamp); - proto.writeSubscriptionUpdate(s); + writer.writeSubscriptionUpdate(s); TransportUpdate t = packetFactory.createTransportUpdate(transports, timestamp); - proto.writeTransportUpdate(t); + writer.writeTransportUpdate(t); - out1.flush(); + writer.flush(); return out.toByteArray(); } @@ -188,20 +189,19 @@ public class ProtocolIntegrationTest extends TestCase { InputStream in = new ByteArrayInputStream(connectionData); byte[] tag = new byte[TAG_LENGTH]; assertEquals(TAG_LENGTH, in.read(tag, 0, TAG_LENGTH)); - ConnectionReader r = connectionReaderFactory.createConnectionReader(in, - secret.clone(), tag); - in = r.getInputStream(); - ProtocolReader protocolReader = - protocolReaderFactory.createProtocolReader(in); + ConnectionReader conn = connectionReaderFactory.createConnectionReader( + in, secret.clone(), tag); + InputStream in1 = conn.getInputStream(); + ProtocolReader reader = protocolReaderFactory.createProtocolReader(in1); // Read the ack - assertTrue(protocolReader.hasAck()); - Ack a = protocolReader.readAck(); + assertTrue(reader.hasAck()); + Ack a = reader.readAck(); assertEquals(Collections.singletonList(ack), a.getBatchIds()); // Read and verify the batch - assertTrue(protocolReader.hasBatch()); - Batch b = protocolReader.readBatch().verify(); + assertTrue(reader.hasBatch()); + Batch b = reader.readBatch().verify(); Collection<Message> messages = b.getMessages(); assertEquals(4, messages.size()); Iterator<Message> it = messages.iterator(); @@ -211,8 +211,8 @@ public class ProtocolIntegrationTest extends TestCase { checkMessageEquality(message3, it.next()); // Read the offer - assertTrue(protocolReader.hasOffer()); - Offer o = protocolReader.readOffer(); + assertTrue(reader.hasOffer()); + Offer o = reader.readOffer(); Collection<MessageId> offered = o.getMessageIds(); assertEquals(4, offered.size()); Iterator<MessageId> it1 = offered.iterator(); @@ -222,8 +222,8 @@ public class ProtocolIntegrationTest extends TestCase { assertEquals(message3.getId(), it1.next()); // Read the request - assertTrue(protocolReader.hasRequest()); - Request req = protocolReader.readRequest(); + assertTrue(reader.hasRequest()); + Request req = reader.readRequest(); BitSet requested = req.getBitmap(); assertFalse(requested.get(0)); assertTrue(requested.get(1)); @@ -233,8 +233,8 @@ public class ProtocolIntegrationTest extends TestCase { assertEquals(2, requested.cardinality()); // Read the subscription update - assertTrue(protocolReader.hasSubscriptionUpdate()); - SubscriptionUpdate s = protocolReader.readSubscriptionUpdate(); + assertTrue(reader.hasSubscriptionUpdate()); + SubscriptionUpdate s = reader.readSubscriptionUpdate(); Map<Group, Long> subs = s.getSubscriptions(); assertEquals(2, subs.size()); assertEquals(Long.valueOf(0L), subs.get(group)); @@ -242,8 +242,8 @@ public class ProtocolIntegrationTest extends TestCase { assertTrue(s.getTimestamp() == timestamp); // Read the transport update - assertTrue(protocolReader.hasTransportUpdate()); - TransportUpdate t = protocolReader.readTransportUpdate(); + assertTrue(reader.hasTransportUpdate()); + TransportUpdate t = reader.readTransportUpdate(); assertEquals(transports, t.getTransports()); assertTrue(t.getTimestamp() == timestamp); diff --git a/test/net/sf/briar/protocol/ConstantsTest.java b/test/net/sf/briar/protocol/ConstantsTest.java index 14b5e3ca565a198081c097e4f7d9c51514f2a24e..2d02098f0754456f7b88d985aae74c712e103efd 100644 --- a/test/net/sf/briar/protocol/ConstantsTest.java +++ b/test/net/sf/briar/protocol/ConstantsTest.java @@ -84,7 +84,8 @@ public class ConstantsTest extends TestCase { private void testBatchesFitIntoAck(int length) throws Exception { // Create an ack with as many batch IDs as possible ByteArrayOutputStream out = new ByteArrayOutputStream(length); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); int maxBatches = writer.getMaxBatchesForAck(length); Collection<BatchId> acked = new ArrayList<BatchId>(); for(int i = 0; i < maxBatches; i++) { @@ -116,7 +117,8 @@ public class ConstantsTest extends TestCase { // Add the message to a batch ByteArrayOutputStream out = new ByteArrayOutputStream(MAX_PACKET_LENGTH); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); RawBatch b = packetFactory.createBatch(Collections.singletonList( message.getSerialised())); writer.writeBatch(b); @@ -140,7 +142,8 @@ public class ConstantsTest extends TestCase { private void testMessagesFitIntoOffer(int length) throws Exception { // Create an offer with as many message IDs as possible ByteArrayOutputStream out = new ByteArrayOutputStream(length); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); int maxMessages = writer.getMaxMessagesForOffer(length); Collection<MessageId> offered = new ArrayList<MessageId>(); for(int i = 0; i < maxMessages; i++) { @@ -165,7 +168,8 @@ public class ConstantsTest extends TestCase { // Add the subscriptions to an update ByteArrayOutputStream out = new ByteArrayOutputStream(MAX_PACKET_LENGTH); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); SubscriptionUpdate s = packetFactory.createSubscriptionUpdate(subs, Long.MAX_VALUE); writer.writeSubscriptionUpdate(s); @@ -194,7 +198,8 @@ public class ConstantsTest extends TestCase { // Add the transports to an update ByteArrayOutputStream out = new ByteArrayOutputStream(MAX_PACKET_LENGTH); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); TransportUpdate t = packetFactory.createTransportUpdate(transports, Long.MAX_VALUE); writer.writeTransportUpdate(t); diff --git a/test/net/sf/briar/protocol/ProtocolReadWriteTest.java b/test/net/sf/briar/protocol/ProtocolReadWriteTest.java index 880e45438eaf703bbacc839fc3afdc1bc6eefa25..d74d910814539a4c1f89a1e55da5b907a785d417 100644 --- a/test/net/sf/briar/protocol/ProtocolReadWriteTest.java +++ b/test/net/sf/briar/protocol/ProtocolReadWriteTest.java @@ -80,7 +80,7 @@ public class ProtocolReadWriteTest extends TestCase { public void testWriteAndRead() throws Exception { // Write ByteArrayOutputStream out = new ByteArrayOutputStream(); - ProtocolWriter writer = writerFactory.createProtocolWriter(out); + ProtocolWriter writer = writerFactory.createProtocolWriter(out, true); Ack a = packetFactory.createAck(Collections.singletonList(batchId)); writer.writeAck(a); diff --git a/test/net/sf/briar/protocol/ProtocolWriterImplTest.java b/test/net/sf/briar/protocol/ProtocolWriterImplTest.java index c558bda32e52f9540ff743ee2cb59f1ab9c713c0..fa1139e85945a63aec0580c536ce4cc8d10f4107 100644 --- a/test/net/sf/briar/protocol/ProtocolWriterImplTest.java +++ b/test/net/sf/briar/protocol/ProtocolWriterImplTest.java @@ -37,7 +37,8 @@ public class ProtocolWriterImplTest extends TestCase { @Test public void testWriteBitmapNoPadding() throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); - ProtocolWriter w = new ProtocolWriterImpl(serial, writerFactory, out); + ProtocolWriter w = new ProtocolWriterImpl(serial, writerFactory, out, + true); BitSet b = new BitSet(); // 11011001 = 0xD9 b.set(0); @@ -61,7 +62,8 @@ public class ProtocolWriterImplTest extends TestCase { @Test public void testWriteBitmapWithPadding() throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); - ProtocolWriter w = new ProtocolWriterImpl(serial, writerFactory, out); + ProtocolWriter w = new ProtocolWriterImpl(serial, writerFactory, out, + true); BitSet b = new BitSet(); // 01011001 = 0x59 b.set(1); diff --git a/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java b/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java index 2df1d1035bff026a98380ac5799bfd27f57fd3f7..d2916e7bda75393e7cc458039c5c59e073f75ec3 100644 --- a/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java +++ b/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java @@ -112,7 +112,7 @@ public class BatchConnectionReadWriteTest extends TestCase { ProtocolWriterFactory protoFactory = alice.getInstance(ProtocolWriterFactory.class); TestBatchTransportWriter transport = new TestBatchTransportWriter(out, - Long.MAX_VALUE); + Long.MAX_VALUE, false); OutgoingBatchConnection batchOut = new OutgoingBatchConnection(db, connFactory, protoFactory, contactId, transportIndex, transport); diff --git a/test/net/sf/briar/protocol/batch/OutgoingBatchConnectionTest.java b/test/net/sf/briar/protocol/batch/OutgoingBatchConnectionTest.java index 0e8759eaf063494db6694e333f0a2ba89b112447..7fbf325b6625adf0f98a1f4446ecab2831f341d3 100644 --- a/test/net/sf/briar/protocol/batch/OutgoingBatchConnectionTest.java +++ b/test/net/sf/briar/protocol/batch/OutgoingBatchConnectionTest.java @@ -73,7 +73,7 @@ public class OutgoingBatchConnectionTest extends TestCase { public void testConnectionTooShort() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); TestBatchTransportWriter transport = new TestBatchTransportWriter(out, - ProtocolConstants.MAX_PACKET_LENGTH); + ProtocolConstants.MAX_PACKET_LENGTH, true); OutgoingBatchConnection connection = new OutgoingBatchConnection(db, connFactory, protoFactory, contactId, transportIndex, transport); @@ -97,7 +97,7 @@ public class OutgoingBatchConnectionTest extends TestCase { public void testNothingToSend() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); TestBatchTransportWriter transport = new TestBatchTransportWriter(out, - TransportConstants.MIN_CONNECTION_LENGTH); + TransportConstants.MIN_CONNECTION_LENGTH, true); OutgoingBatchConnection connection = new OutgoingBatchConnection(db, connFactory, protoFactory, contactId, transportIndex, transport); @@ -133,7 +133,7 @@ public class OutgoingBatchConnectionTest extends TestCase { public void testSomethingToSend() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); TestBatchTransportWriter transport = new TestBatchTransportWriter(out, - TransportConstants.MIN_CONNECTION_LENGTH); + TransportConstants.MIN_CONNECTION_LENGTH, true); OutgoingBatchConnection connection = new OutgoingBatchConnection(db, connFactory, protoFactory, contactId, transportIndex, transport); diff --git a/test/net/sf/briar/protocol/batch/TestBatchTransportWriter.java b/test/net/sf/briar/protocol/batch/TestBatchTransportWriter.java index 80bafcb4f0c2cba6e6f7b78beac87a2bda33a484..c3afcd2906c7d861278635c96a87e0151cccae36 100644 --- a/test/net/sf/briar/protocol/batch/TestBatchTransportWriter.java +++ b/test/net/sf/briar/protocol/batch/TestBatchTransportWriter.java @@ -9,12 +9,15 @@ class TestBatchTransportWriter implements BatchTransportWriter { private final ByteArrayOutputStream out; private final long capacity; + private final boolean flush; private boolean disposed = false, exception = false; - TestBatchTransportWriter(ByteArrayOutputStream out, long capacity) { + TestBatchTransportWriter(ByteArrayOutputStream out, long capacity, + boolean flush) { this.out = out; this.capacity = capacity; + this.flush = flush; } public long getCapacity() { @@ -25,6 +28,10 @@ class TestBatchTransportWriter implements BatchTransportWriter { return out; } + public boolean shouldFlush() { + return flush; + } + public void dispose(boolean exception) { assert !disposed; disposed = true;