From 2494ff1a1ee83c15f4c1485731d52df08e4d1658 Mon Sep 17 00:00:00 2001 From: akwizgran <akwizgran@users.sourceforge.net> Date: Thu, 8 Dec 2011 22:13:35 +0000 Subject: [PATCH] Let the plugin determine whether to flush the output stream after each packet. --- api/net/sf/briar/api/plugins/Plugin.java | 5 +- .../sf/briar/api/protocol/ProtocolWriter.java | 2 + .../api/protocol/ProtocolWriterFactory.java | 2 +- .../api/transport/BatchTransportWriter.java | 5 ++ .../transport/StreamTransportConnection.java | 5 ++ .../net/sf/briar/plugins/AbstractPlugin.java | 10 +-- .../plugins/bluetooth/BluetoothPlugin.java | 90 ++++++++++--------- .../BluetoothTransportConnection.java | 4 + .../net/sf/briar/plugins/file/FilePlugin.java | 4 +- .../plugins/file/FileTransportWriter.java | 4 + .../plugins/file/RemovableDrivePlugin.java | 2 +- .../plugins/socket/SimpleSocketPlugin.java | 31 ++++--- .../sf/briar/plugins/socket/SocketPlugin.java | 36 ++++---- .../socket/SocketTransportConnection.java | 4 + .../protocol/ProtocolWriterFactoryImpl.java | 5 +- .../sf/briar/protocol/ProtocolWriterImpl.java | 14 ++- .../batch/OutgoingBatchConnection.java | 6 +- .../stream/IncomingStreamConnection.java | 4 +- .../stream/OutgoingStreamConnection.java | 4 +- .../protocol/stream/StreamConnection.java | 4 +- .../transport/ConnectionDispatcherImpl.java | 51 ++++++----- .../net/sf/briar/ProtocolIntegrationTest.java | 50 +++++------ test/net/sf/briar/protocol/ConstantsTest.java | 15 ++-- .../briar/protocol/ProtocolReadWriteTest.java | 2 +- .../protocol/ProtocolWriterImplTest.java | 6 +- .../batch/BatchConnectionReadWriteTest.java | 2 +- .../batch/OutgoingBatchConnectionTest.java | 6 +- .../batch/TestBatchTransportWriter.java | 9 +- 28 files changed, 223 insertions(+), 159 deletions(-) diff --git a/api/net/sf/briar/api/plugins/Plugin.java b/api/net/sf/briar/api/plugins/Plugin.java index 6d476087ca..a3ae0b87db 100644 --- a/api/net/sf/briar/api/plugins/Plugin.java +++ b/api/net/sf/briar/api/plugins/Plugin.java @@ -28,9 +28,8 @@ public interface Plugin { long getPollingInterval(); /** - * Attempts to establish connections using the current transport and - * configuration properties, and passes any created connections to the - * callback. + * Attempts to establish connections to all contacts, passing any created + * connections to the callback. */ void poll(); diff --git a/api/net/sf/briar/api/protocol/ProtocolWriter.java b/api/net/sf/briar/api/protocol/ProtocolWriter.java index 2e7c6aaf8c..fa07127b89 100644 --- a/api/net/sf/briar/api/protocol/ProtocolWriter.java +++ b/api/net/sf/briar/api/protocol/ProtocolWriter.java @@ -4,6 +4,8 @@ import java.io.IOException; public interface ProtocolWriter { + void flush() throws IOException; + int getMaxBatchesForAck(long capacity); int getMaxMessagesForOffer(long capacity); diff --git a/api/net/sf/briar/api/protocol/ProtocolWriterFactory.java b/api/net/sf/briar/api/protocol/ProtocolWriterFactory.java index fbed9d0857..8c0d465ebf 100644 --- a/api/net/sf/briar/api/protocol/ProtocolWriterFactory.java +++ b/api/net/sf/briar/api/protocol/ProtocolWriterFactory.java @@ -4,5 +4,5 @@ import java.io.OutputStream; public interface ProtocolWriterFactory { - ProtocolWriter createProtocolWriter(OutputStream out); + ProtocolWriter createProtocolWriter(OutputStream out, boolean flush); } diff --git a/api/net/sf/briar/api/transport/BatchTransportWriter.java b/api/net/sf/briar/api/transport/BatchTransportWriter.java index 4f2c6f0877..0476543d47 100644 --- a/api/net/sf/briar/api/transport/BatchTransportWriter.java +++ b/api/net/sf/briar/api/transport/BatchTransportWriter.java @@ -14,6 +14,11 @@ public interface BatchTransportWriter { /** Returns an output stream for writing to the transport. */ OutputStream getOutputStream(); + /** + * Returns true if the output stream should be flushed after each packet. + */ + boolean shouldFlush(); + /** * Closes the writer and disposes of any associated resources. The * argument indicates whether the writer is being closed because of an diff --git a/api/net/sf/briar/api/transport/StreamTransportConnection.java b/api/net/sf/briar/api/transport/StreamTransportConnection.java index 337aae9300..5283415417 100644 --- a/api/net/sf/briar/api/transport/StreamTransportConnection.java +++ b/api/net/sf/briar/api/transport/StreamTransportConnection.java @@ -17,6 +17,11 @@ public interface StreamTransportConnection { /** Returns an output stream for writing to the connection. */ OutputStream getOutputStream() throws IOException; + /** + * Returns true if the output stream should be flushed after each packet. + */ + boolean shouldFlush(); + /** * Closes the connection and disposes of any associated resources. The * first argument indicates whether the connection is being closed because diff --git a/components/net/sf/briar/plugins/AbstractPlugin.java b/components/net/sf/briar/plugins/AbstractPlugin.java index 546c6b2d28..24b08006c8 100644 --- a/components/net/sf/briar/plugins/AbstractPlugin.java +++ b/components/net/sf/briar/plugins/AbstractPlugin.java @@ -10,19 +10,19 @@ public abstract class AbstractPlugin implements Plugin { protected final Executor pluginExecutor; - protected boolean started = false; // Locking: this + protected boolean running = false; // Locking: this protected AbstractPlugin(@PluginExecutor Executor pluginExecutor) { this.pluginExecutor = pluginExecutor; } public synchronized void start() throws IOException { - if(started) throw new IllegalStateException(); - started = true; + if(running) throw new IllegalStateException(); + running = true; } public synchronized void stop() throws IOException { - if(!started) throw new IllegalStateException(); - started = false; + if(!running) throw new IllegalStateException(); + running = false; } } diff --git a/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java b/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java index 749ab8ea37..ea1faa626b 100644 --- a/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java +++ b/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java @@ -78,30 +78,17 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } throw new IOException(e.toString()); } - pluginExecutor.execute(createContactSocketBinder()); - } - - @Override - public synchronized void stop() throws IOException { - super.stop(); - if(socket != null) { - socket.close(); - socket = null; - } - } - - private Runnable createContactSocketBinder() { - return new Runnable() { + pluginExecutor.execute(new Runnable() { public void run() { bindContactSocket(); } - }; + }); } private void bindContactSocket() { String uuid; synchronized(this) { - if(!started) return; + if(!running) return; uuid = getUuid(); makeDeviceDiscoverable(); } @@ -115,7 +102,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { return; } synchronized(this) { - if(!started) { + if(!running) { try { scn.close(); } catch(IOException e) { @@ -134,7 +121,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } private synchronized String getUuid() { - assert started; + assert running; TransportProperties p = callback.getLocalProperties(); String uuid = p.get("uuid"); if(uuid == null) { @@ -149,7 +136,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } private synchronized void makeDeviceDiscoverable() { - assert started; + assert running; // Try to make the device discoverable (requires root on Linux) try { localDevice.setDiscoverable(DiscoveryAgent.GIAC); @@ -169,7 +156,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { StreamConnectionNotifier scn; StreamConnection s; synchronized(this) { - if(!started) return; + if(!running) return; scn = socket; } try { @@ -185,6 +172,15 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } } + @Override + public synchronized void stop() throws IOException { + super.stop(); + if(socket != null) { + socket.close(); + socket = null; + } + } + public boolean shouldPoll() { return true; } @@ -193,21 +189,24 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { return pollingInterval; } - public synchronized void poll() { - if(!started) return; - pluginExecutor.execute(createConnectors()); - } - - private Runnable createConnectors() { - return new Runnable() { + public void poll() { + synchronized(this) { + if(!running) return; + } + pluginExecutor.execute(new Runnable() { public void run() { connectAndCallBack(); } - }; + }); } private void connectAndCallBack() { - Map<ContactId, String> discovered = discoverContactUrls(); + Map<ContactId, TransportProperties> remote; + synchronized(this) { + if(!running) return; + remote = callback.getRemoteProperties(); + } + Map<ContactId, String> discovered = discoverContactUrls(remote); for(Entry<ContactId, String> e : discovered.entrySet()) { ContactId c = e.getKey(); String url = e.getValue(); @@ -216,13 +215,12 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } } - private Map<ContactId, String> discoverContactUrls() { + private Map<ContactId, String> discoverContactUrls( + Map<ContactId, TransportProperties> remote) { DiscoveryAgent discoveryAgent; - Map<ContactId, TransportProperties> remote; synchronized(this) { - if(!started) return Collections.emptyMap(); + if(!running) return Collections.emptyMap(); discoveryAgent = localDevice.getDiscoveryAgent(); - remote = callback.getRemoteProperties(); } Map<String, ContactId> addresses = new HashMap<String, ContactId>(); Map<ContactId, String> uuids = new HashMap<ContactId, String>(); @@ -236,18 +234,17 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { uuids.put(c, uuid); } } + if(addresses.isEmpty()) return Collections.emptyMap(); ContactListener listener = new ContactListener(discoveryAgent, Collections.unmodifiableMap(addresses), Collections.unmodifiableMap(uuids)); synchronized(discoveryLock) { try { discoveryAgent.startInquiry(DiscoveryAgent.GIAC, listener); + return listener.waitForUrls(); } catch(BluetoothStateException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); return Collections.emptyMap(); - } - try { - return listener.waitForUrls(); } catch(InterruptedException e) { if(LOG.isLoggable(Level.INFO)) LOG.info("Interrupted while waiting for URLs"); @@ -259,12 +256,10 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { private StreamTransportConnection connect(ContactId c, String url) { synchronized(this) { - if(!started) return null; + if(!running) return null; } try { - if(LOG.isLoggable(Level.INFO)) LOG.info("Connecting to " + url); StreamConnection s = (StreamConnection) Connector.open(url); - if(LOG.isLoggable(Level.INFO)) LOG.info("Connected"); return new BluetoothTransportConnection(s); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); @@ -273,7 +268,14 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } public StreamTransportConnection createConnection(ContactId c) { - String url = discoverContactUrls().get(c); + Map<ContactId, TransportProperties> remote; + synchronized(this) { + if(!running) return null; + remote = callback.getRemoteProperties(); + } + if(!remote.containsKey(c)) return null; + remote = Collections.singletonMap(c, remote.get(c)); + String url = discoverContactUrls(remote).get(c); return url == null ? null : connect(c, url); } @@ -325,7 +327,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { private void createInvitationConnection(ConnectionCallback c) { DiscoveryAgent discoveryAgent; synchronized(this) { - if(!started) return; + if(!running) return; discoveryAgent = localDevice.getDiscoveryAgent(); } // Try to discover the other party until the invitation times out @@ -350,7 +352,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } } synchronized(this) { - if(!started) return; + if(!running) return; } } if(url == null) return; @@ -365,7 +367,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { private void bindInvitationSocket(final ConnectionCallback c) { synchronized(this) { - if(!started) return; + if(!running) return; makeDeviceDiscoverable(); } // Bind the socket @@ -400,7 +402,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { private void acceptInvitationConnection(ConnectionCallback c, StreamConnectionNotifier scn) { synchronized(this) { - if(!started) return; + if(!running) return; } try { StreamConnection s = scn.acceptAndOpen(); diff --git a/components/net/sf/briar/plugins/bluetooth/BluetoothTransportConnection.java b/components/net/sf/briar/plugins/bluetooth/BluetoothTransportConnection.java index 9b04eb437a..c136875dcc 100644 --- a/components/net/sf/briar/plugins/bluetooth/BluetoothTransportConnection.java +++ b/components/net/sf/briar/plugins/bluetooth/BluetoothTransportConnection.java @@ -29,6 +29,10 @@ class BluetoothTransportConnection implements StreamTransportConnection { return stream.openOutputStream(); } + public boolean shouldFlush() { + return true; + } + public void dispose(boolean exception, boolean recognised) { try { stream.close(); diff --git a/components/net/sf/briar/plugins/file/FilePlugin.java b/components/net/sf/briar/plugins/file/FilePlugin.java index 7b13fc8a16..52e0998d39 100644 --- a/components/net/sf/briar/plugins/file/FilePlugin.java +++ b/components/net/sf/briar/plugins/file/FilePlugin.java @@ -64,7 +64,7 @@ abstract class FilePlugin extends AbstractPlugin implements BatchPlugin { private BatchTransportWriter createWriter(String filename) { synchronized(this) { - if(!started) return null; + if(!running) return null; } File dir = chooseOutputDirectory(); if(dir == null || !dir.exists() || !dir.isDirectory()) return null; @@ -86,7 +86,7 @@ abstract class FilePlugin extends AbstractPlugin implements BatchPlugin { } protected synchronized void createReaderFromFile(final File f) { - if(!started) return; + if(!running) return; pluginExecutor.execute(new ReaderCreator(f)); } diff --git a/components/net/sf/briar/plugins/file/FileTransportWriter.java b/components/net/sf/briar/plugins/file/FileTransportWriter.java index fee646fc12..a7eaacebd2 100644 --- a/components/net/sf/briar/plugins/file/FileTransportWriter.java +++ b/components/net/sf/briar/plugins/file/FileTransportWriter.java @@ -34,6 +34,10 @@ class FileTransportWriter implements BatchTransportWriter { return out; } + public boolean shouldFlush() { + return false; + } + public void dispose(boolean exception) { try { out.close(); diff --git a/components/net/sf/briar/plugins/file/RemovableDrivePlugin.java b/components/net/sf/briar/plugins/file/RemovableDrivePlugin.java index 31b575aefc..9bea390ea9 100644 --- a/components/net/sf/briar/plugins/file/RemovableDrivePlugin.java +++ b/components/net/sf/briar/plugins/file/RemovableDrivePlugin.java @@ -58,7 +58,7 @@ implements RemovableDriveMonitor.Callback { } public long getPollingInterval() { - return 0L; + throw new UnsupportedOperationException(); } public void poll() { diff --git a/components/net/sf/briar/plugins/socket/SimpleSocketPlugin.java b/components/net/sf/briar/plugins/socket/SimpleSocketPlugin.java index bc650249ee..a38f982387 100644 --- a/components/net/sf/briar/plugins/socket/SimpleSocketPlugin.java +++ b/components/net/sf/briar/plugins/socket/SimpleSocketPlugin.java @@ -53,19 +53,20 @@ class SimpleSocketPlugin extends SocketPlugin { @Override protected Socket createClientSocket() throws IOException { - assert started; + assert running; return new Socket(); } @Override protected ServerSocket createServerSocket() throws IOException { - assert started; + assert running; return new ServerSocket(); } + // Locking: this @Override - protected synchronized SocketAddress getLocalSocketAddress() { - assert started; + protected SocketAddress getLocalSocketAddress() { + assert running; SocketAddress addr = createSocketAddress(callback.getLocalProperties()); if(addr == null) { try { @@ -87,9 +88,10 @@ class SimpleSocketPlugin extends SocketPlugin { boolean link = addr.isLinkLocalAddress(); boolean site = addr.isSiteLocalAddress(); if(lan == (link || site)) { - if(LOG.isLoggable(Level.INFO)) + if(LOG.isLoggable(Level.INFO)) { LOG.info("Choosing interface " + addr.getHostAddress()); + } return addr; } } @@ -99,9 +101,10 @@ class SimpleSocketPlugin extends SocketPlugin { for(NetworkInterface iface : ifaces) { for(InetAddress addr : Collections.list(iface.getInetAddresses())) { if(!addr.isLoopbackAddress()) { - if(LOG.isLoggable(Level.INFO)) + if(LOG.isLoggable(Level.INFO)) { LOG.info("Accepting interface " + addr.getHostAddress()); + } return addr; } } @@ -109,16 +112,17 @@ class SimpleSocketPlugin extends SocketPlugin { throw new IOException("No suitable interfaces"); } + // Locking: this @Override - protected synchronized SocketAddress getRemoteSocketAddress(ContactId c) { - assert started; + protected SocketAddress getRemoteSocketAddress(ContactId c) { + assert running; TransportProperties p = callback.getRemoteProperties().get(c); return p == null ? null : createSocketAddress(p); } - private synchronized SocketAddress createSocketAddress( - TransportProperties p) { - assert started; + // Locking: this + private SocketAddress createSocketAddress(TransportProperties p) { + assert running; assert p != null; String host = p.get("external"); if(host == null) host = p.get("internal"); @@ -133,9 +137,10 @@ class SimpleSocketPlugin extends SocketPlugin { return new InetSocketAddress(host, port); } + // Locking: this @Override - protected synchronized void setLocalSocketAddress(SocketAddress s) { - assert started; + protected void setLocalSocketAddress(SocketAddress s) { + assert running; if(!(s instanceof InetSocketAddress)) throw new IllegalArgumentException(); InetSocketAddress i = (InetSocketAddress) s; diff --git a/components/net/sf/briar/plugins/socket/SocketPlugin.java b/components/net/sf/briar/plugins/socket/SocketPlugin.java index 552acc4687..505375dd59 100644 --- a/components/net/sf/briar/plugins/socket/SocketPlugin.java +++ b/components/net/sf/briar/plugins/socket/SocketPlugin.java @@ -4,11 +4,13 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; +import java.util.Map; import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; import net.sf.briar.api.ContactId; +import net.sf.briar.api.TransportProperties; import net.sf.briar.api.plugins.PluginExecutor; import net.sf.briar.api.plugins.StreamPlugin; import net.sf.briar.api.plugins.StreamPluginCallback; @@ -54,7 +56,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { ServerSocket ss = null; try { synchronized(this) { - if(!started) return; + if(!running) return; addr = getLocalSocketAddress(); ss = createServerSocket(); if(addr == null || ss == null) return; @@ -77,7 +79,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { return; } synchronized(this) { - if(!started) { + if(!running) { try { ss.close(); } catch(IOException e) { @@ -93,7 +95,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { while(true) { Socket s; synchronized(this) { - if(!started) return; + if(!running) return; } try { s = ss.accept(); @@ -119,21 +121,19 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { if(socket != null) socket.close(); } - public synchronized void poll() { - // Subclasses may not support polling - if(!shouldPoll()) throw new UnsupportedOperationException(); - if(!started) return; - for(ContactId c : callback.getRemoteProperties().keySet()) { - pluginExecutor.execute(createConnector(c)); + public void poll() { + Map<ContactId, TransportProperties> remote; + synchronized(this) { + if(!running) return; + remote = callback.getRemoteProperties(); + } + for(final ContactId c : remote.keySet()) { + pluginExecutor.execute(new Runnable() { + public void run() { + connectAndCallBack(c); + } + }); } - } - - private Runnable createConnector(final ContactId c) { - return new Runnable() { - public void run() { - connectAndCallBack(c); - } - }; } private void connectAndCallBack(ContactId c) { @@ -146,7 +146,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { Socket s; try { synchronized(this) { - if(!started) return null; + if(!running) return null; addr = getRemoteSocketAddress(c); s = createClientSocket(); if(addr == null || s == null) return null; diff --git a/components/net/sf/briar/plugins/socket/SocketTransportConnection.java b/components/net/sf/briar/plugins/socket/SocketTransportConnection.java index f855f33100..2890ec63f8 100644 --- a/components/net/sf/briar/plugins/socket/SocketTransportConnection.java +++ b/components/net/sf/briar/plugins/socket/SocketTransportConnection.java @@ -28,6 +28,10 @@ class SocketTransportConnection implements StreamTransportConnection { return socket.getOutputStream(); } + public boolean shouldFlush() { + return true; + } + public void dispose(boolean exception, boolean recognised) { try { socket.close(); diff --git a/components/net/sf/briar/protocol/ProtocolWriterFactoryImpl.java b/components/net/sf/briar/protocol/ProtocolWriterFactoryImpl.java index 1327769f3a..a2d40759e4 100644 --- a/components/net/sf/briar/protocol/ProtocolWriterFactoryImpl.java +++ b/components/net/sf/briar/protocol/ProtocolWriterFactoryImpl.java @@ -21,7 +21,8 @@ class ProtocolWriterFactoryImpl implements ProtocolWriterFactory { this.writerFactory = writerFactory; } - public ProtocolWriter createProtocolWriter(OutputStream out) { - return new ProtocolWriterImpl(serial, writerFactory, out); + public ProtocolWriter createProtocolWriter(OutputStream out, + boolean flush) { + return new ProtocolWriterImpl(serial, writerFactory, out, flush); } } diff --git a/components/net/sf/briar/protocol/ProtocolWriterImpl.java b/components/net/sf/briar/protocol/ProtocolWriterImpl.java index 4dc1d44ece..ba8aebfaa8 100644 --- a/components/net/sf/briar/protocol/ProtocolWriterImpl.java +++ b/components/net/sf/briar/protocol/ProtocolWriterImpl.java @@ -27,12 +27,14 @@ class ProtocolWriterImpl implements ProtocolWriter { private final SerialComponent serial; private final OutputStream out; + private final boolean flush; private final Writer w; ProtocolWriterImpl(SerialComponent serial, WriterFactory writerFactory, - OutputStream out) { + OutputStream out, boolean flush) { this.serial = serial; this.out = out; + this.flush = flush; w = writerFactory.createWriter(out); } @@ -67,6 +69,7 @@ class ProtocolWriterImpl implements ProtocolWriter { w.writeListStart(); for(BatchId b : a.getBatchIds()) w.writeBytes(b.getBytes()); w.writeListEnd(); + if(flush) out.flush(); } public void writeBatch(RawBatch b) throws IOException { @@ -74,6 +77,7 @@ class ProtocolWriterImpl implements ProtocolWriter { w.writeListStart(); for(byte[] raw : b.getMessages()) out.write(raw); w.writeListEnd(); + if(flush) out.flush(); } public void writeOffer(Offer o) throws IOException { @@ -81,6 +85,7 @@ class ProtocolWriterImpl implements ProtocolWriter { w.writeListStart(); for(MessageId m : o.getMessageIds()) w.writeBytes(m.getBytes()); w.writeListEnd(); + if(flush) out.flush(); } public void writeRequest(Request r) throws IOException { @@ -100,6 +105,7 @@ class ProtocolWriterImpl implements ProtocolWriter { w.writeStructId(Types.REQUEST); w.writeUint7((byte) (bytes * 8 - length)); w.writeBytes(bitmap); + if(flush) out.flush(); } public void writeSubscriptionUpdate(SubscriptionUpdate s) @@ -112,6 +118,7 @@ class ProtocolWriterImpl implements ProtocolWriter { } w.writeMapEnd(); w.writeInt64(s.getTimestamp()); + if(flush) out.flush(); } private void writeGroup(Writer w, Group g) throws IOException { @@ -133,5 +140,10 @@ class ProtocolWriterImpl implements ProtocolWriter { } w.writeListEnd(); w.writeInt64(t.getTimestamp()); + if(flush) out.flush(); + } + + public void flush() throws IOException { + out.flush(); } } diff --git a/components/net/sf/briar/protocol/batch/OutgoingBatchConnection.java b/components/net/sf/briar/protocol/batch/OutgoingBatchConnection.java index 1e78871fb8..720b0d3235 100644 --- a/components/net/sf/briar/protocol/batch/OutgoingBatchConnection.java +++ b/components/net/sf/briar/protocol/batch/OutgoingBatchConnection.java @@ -55,7 +55,8 @@ class OutgoingBatchConnection { transport.getOutputStream(), transport.getCapacity(), ctx.getSecret()); OutputStream out = conn.getOutputStream(); - ProtocolWriter writer = protoFactory.createProtocolWriter(out); + ProtocolWriter writer = protoFactory.createProtocolWriter(out, + transport.shouldFlush()); // There should be enough space for a packet long capacity = conn.getRemainingCapacity(); if(capacity < MAX_PACKET_LENGTH) throw new EOFException(); @@ -88,8 +89,7 @@ class OutgoingBatchConnection { capacity = writer.getMessageCapacityForBatch(capacity); b = db.generateBatch(contactId, (int) capacity); } - // Flush the output stream - out.flush(); + writer.flush(); transport.dispose(false); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); diff --git a/components/net/sf/briar/protocol/stream/IncomingStreamConnection.java b/components/net/sf/briar/protocol/stream/IncomingStreamConnection.java index e7229ab7ac..846ac80de2 100644 --- a/components/net/sf/briar/protocol/stream/IncomingStreamConnection.java +++ b/components/net/sf/briar/protocol/stream/IncomingStreamConnection.java @@ -26,11 +26,11 @@ class IncomingStreamConnection extends StreamConnection { ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, - ConnectionContext ctx, StreamTransportConnection connection, + ConnectionContext ctx, StreamTransportConnection transport, byte[] tag) { super(dbExecutor, verificationExecutor, db, connReaderFactory, connWriterFactory, protoReaderFactory, protoWriterFactory, - ctx.getContactId(), connection); + ctx.getContactId(), transport); this.ctx = ctx; this.tag = tag; } diff --git a/components/net/sf/briar/protocol/stream/OutgoingStreamConnection.java b/components/net/sf/briar/protocol/stream/OutgoingStreamConnection.java index c27724f142..68c33157ad 100644 --- a/components/net/sf/briar/protocol/stream/OutgoingStreamConnection.java +++ b/components/net/sf/briar/protocol/stream/OutgoingStreamConnection.java @@ -31,10 +31,10 @@ class OutgoingStreamConnection extends StreamConnection { ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ContactId contactId, TransportIndex transportIndex, - StreamTransportConnection connection) { + StreamTransportConnection transport) { super(dbExecutor, verificationExecutor, db, connReaderFactory, connWriterFactory, protoReaderFactory, protoWriterFactory, - contactId, connection); + contactId, transport); this.transportIndex = transportIndex; } diff --git a/components/net/sf/briar/protocol/stream/StreamConnection.java b/components/net/sf/briar/protocol/stream/StreamConnection.java index 22486cdc06..7d378db49f 100644 --- a/components/net/sf/briar/protocol/stream/StreamConnection.java +++ b/components/net/sf/briar/protocol/stream/StreamConnection.java @@ -191,7 +191,8 @@ abstract class StreamConnection implements DatabaseListener { try { db.addListener(this); OutputStream out = createConnectionWriter().getOutputStream(); - writer = protoWriterFactory.createProtocolWriter(out); + writer = protoWriterFactory.createProtocolWriter(out, + transport.shouldFlush()); // Send the initial packets: transports, subs, acks, offer dbExecutor.execute(new GenerateTransportUpdate()); dbExecutor.execute(new GenerateSubscriptionUpdate()); @@ -203,6 +204,7 @@ abstract class StreamConnection implements DatabaseListener { if(task == CLOSE) break; task.run(); } + writer.flush(); if(!disposed.getAndSet(true)) transport.dispose(false, true); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); diff --git a/components/net/sf/briar/transport/ConnectionDispatcherImpl.java b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java index b78c2a55c5..8a71392856 100644 --- a/components/net/sf/briar/transport/ConnectionDispatcherImpl.java +++ b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java @@ -77,53 +77,58 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher { private class DispatchBatchConnection implements Runnable { - private final TransportId t; - private final BatchTransportReader r; + private final TransportId transportId; + private final BatchTransportReader transport; - private DispatchBatchConnection(TransportId t, BatchTransportReader r) { - this.t = t; - this.r = r; + private DispatchBatchConnection(TransportId transportId, + BatchTransportReader transport) { + this.transportId = transportId; + this.transport = transport; } public void run() { try { - byte[] tag = readTag(r.getInputStream()); - ConnectionContext ctx = recogniser.acceptConnection(t, tag); - if(ctx == null) r.dispose(false, false); - else batchConnFactory.createIncomingConnection(ctx, r, tag); + byte[] tag = readTag(transport.getInputStream()); + ConnectionContext ctx = recogniser.acceptConnection(transportId, + tag); + if(ctx == null) transport.dispose(false, false); + else batchConnFactory.createIncomingConnection(ctx, transport, + tag); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); - r.dispose(true, false); + transport.dispose(true, false); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); - r.dispose(true, false); + transport.dispose(true, false); } } } private class DispatchStreamConnection implements Runnable { - private final TransportId t; - private final StreamTransportConnection s; + private final TransportId transportId; + private final StreamTransportConnection transport; - private DispatchStreamConnection(TransportId t, - StreamTransportConnection s) { - this.t = t; - this.s = s; + private DispatchStreamConnection(TransportId transportId, + StreamTransportConnection transport) { + this.transportId = transportId; + this.transport = transport; } public void run() { try { - byte[] tag = readTag(s.getInputStream()); - ConnectionContext ctx = recogniser.acceptConnection(t, tag); - if(ctx == null) s.dispose(false, false); - else streamConnFactory.createIncomingConnection(ctx, s, tag); + byte[] tag = readTag(transport.getInputStream()); + ConnectionContext ctx = recogniser.acceptConnection(transportId, + tag); + if(ctx == null) transport.dispose(false, false); + else streamConnFactory.createIncomingConnection(ctx, transport, + tag); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); - s.dispose(true, false); + transport.dispose(true, false); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString()); - s.dispose(true, false); + transport.dispose(true, false); } } } diff --git a/test/net/sf/briar/ProtocolIntegrationTest.java b/test/net/sf/briar/ProtocolIntegrationTest.java index 6e45595385..43590eec30 100644 --- a/test/net/sf/briar/ProtocolIntegrationTest.java +++ b/test/net/sf/briar/ProtocolIntegrationTest.java @@ -139,10 +139,11 @@ public class ProtocolIntegrationTest extends TestCase { ConnectionWriter conn = connectionWriterFactory.createConnectionWriter( out, Long.MAX_VALUE, secret.clone()); OutputStream out1 = conn.getOutputStream(); - ProtocolWriter proto = protocolWriterFactory.createProtocolWriter(out1); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out1, + false); Ack a = packetFactory.createAck(Collections.singletonList(ack)); - proto.writeAck(a); + writer.writeAck(a); Collection<byte[]> batch = Arrays.asList(new byte[][] { message.getSerialised(), @@ -151,7 +152,7 @@ public class ProtocolIntegrationTest extends TestCase { message3.getSerialised() }); RawBatch b = packetFactory.createBatch(batch); - proto.writeBatch(b); + writer.writeBatch(b); Collection<MessageId> offer = Arrays.asList(new MessageId[] { message.getId(), @@ -160,13 +161,13 @@ public class ProtocolIntegrationTest extends TestCase { message3.getId() }); Offer o = packetFactory.createOffer(offer); - proto.writeOffer(o); + writer.writeOffer(o); BitSet requested = new BitSet(4); requested.set(1); requested.set(3); Request r = packetFactory.createRequest(requested, 4); - proto.writeRequest(r); + writer.writeRequest(r); // Use a LinkedHashMap for predictable iteration order Map<Group, Long> subs = new LinkedHashMap<Group, Long>(); @@ -174,13 +175,13 @@ public class ProtocolIntegrationTest extends TestCase { subs.put(group1, 0L); SubscriptionUpdate s = packetFactory.createSubscriptionUpdate(subs, timestamp); - proto.writeSubscriptionUpdate(s); + writer.writeSubscriptionUpdate(s); TransportUpdate t = packetFactory.createTransportUpdate(transports, timestamp); - proto.writeTransportUpdate(t); + writer.writeTransportUpdate(t); - out1.flush(); + writer.flush(); return out.toByteArray(); } @@ -188,20 +189,19 @@ public class ProtocolIntegrationTest extends TestCase { InputStream in = new ByteArrayInputStream(connectionData); byte[] tag = new byte[TAG_LENGTH]; assertEquals(TAG_LENGTH, in.read(tag, 0, TAG_LENGTH)); - ConnectionReader r = connectionReaderFactory.createConnectionReader(in, - secret.clone(), tag); - in = r.getInputStream(); - ProtocolReader protocolReader = - protocolReaderFactory.createProtocolReader(in); + ConnectionReader conn = connectionReaderFactory.createConnectionReader( + in, secret.clone(), tag); + InputStream in1 = conn.getInputStream(); + ProtocolReader reader = protocolReaderFactory.createProtocolReader(in1); // Read the ack - assertTrue(protocolReader.hasAck()); - Ack a = protocolReader.readAck(); + assertTrue(reader.hasAck()); + Ack a = reader.readAck(); assertEquals(Collections.singletonList(ack), a.getBatchIds()); // Read and verify the batch - assertTrue(protocolReader.hasBatch()); - Batch b = protocolReader.readBatch().verify(); + assertTrue(reader.hasBatch()); + Batch b = reader.readBatch().verify(); Collection<Message> messages = b.getMessages(); assertEquals(4, messages.size()); Iterator<Message> it = messages.iterator(); @@ -211,8 +211,8 @@ public class ProtocolIntegrationTest extends TestCase { checkMessageEquality(message3, it.next()); // Read the offer - assertTrue(protocolReader.hasOffer()); - Offer o = protocolReader.readOffer(); + assertTrue(reader.hasOffer()); + Offer o = reader.readOffer(); Collection<MessageId> offered = o.getMessageIds(); assertEquals(4, offered.size()); Iterator<MessageId> it1 = offered.iterator(); @@ -222,8 +222,8 @@ public class ProtocolIntegrationTest extends TestCase { assertEquals(message3.getId(), it1.next()); // Read the request - assertTrue(protocolReader.hasRequest()); - Request req = protocolReader.readRequest(); + assertTrue(reader.hasRequest()); + Request req = reader.readRequest(); BitSet requested = req.getBitmap(); assertFalse(requested.get(0)); assertTrue(requested.get(1)); @@ -233,8 +233,8 @@ public class ProtocolIntegrationTest extends TestCase { assertEquals(2, requested.cardinality()); // Read the subscription update - assertTrue(protocolReader.hasSubscriptionUpdate()); - SubscriptionUpdate s = protocolReader.readSubscriptionUpdate(); + assertTrue(reader.hasSubscriptionUpdate()); + SubscriptionUpdate s = reader.readSubscriptionUpdate(); Map<Group, Long> subs = s.getSubscriptions(); assertEquals(2, subs.size()); assertEquals(Long.valueOf(0L), subs.get(group)); @@ -242,8 +242,8 @@ public class ProtocolIntegrationTest extends TestCase { assertTrue(s.getTimestamp() == timestamp); // Read the transport update - assertTrue(protocolReader.hasTransportUpdate()); - TransportUpdate t = protocolReader.readTransportUpdate(); + assertTrue(reader.hasTransportUpdate()); + TransportUpdate t = reader.readTransportUpdate(); assertEquals(transports, t.getTransports()); assertTrue(t.getTimestamp() == timestamp); diff --git a/test/net/sf/briar/protocol/ConstantsTest.java b/test/net/sf/briar/protocol/ConstantsTest.java index 14b5e3ca56..2d02098f07 100644 --- a/test/net/sf/briar/protocol/ConstantsTest.java +++ b/test/net/sf/briar/protocol/ConstantsTest.java @@ -84,7 +84,8 @@ public class ConstantsTest extends TestCase { private void testBatchesFitIntoAck(int length) throws Exception { // Create an ack with as many batch IDs as possible ByteArrayOutputStream out = new ByteArrayOutputStream(length); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); int maxBatches = writer.getMaxBatchesForAck(length); Collection<BatchId> acked = new ArrayList<BatchId>(); for(int i = 0; i < maxBatches; i++) { @@ -116,7 +117,8 @@ public class ConstantsTest extends TestCase { // Add the message to a batch ByteArrayOutputStream out = new ByteArrayOutputStream(MAX_PACKET_LENGTH); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); RawBatch b = packetFactory.createBatch(Collections.singletonList( message.getSerialised())); writer.writeBatch(b); @@ -140,7 +142,8 @@ public class ConstantsTest extends TestCase { private void testMessagesFitIntoOffer(int length) throws Exception { // Create an offer with as many message IDs as possible ByteArrayOutputStream out = new ByteArrayOutputStream(length); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); int maxMessages = writer.getMaxMessagesForOffer(length); Collection<MessageId> offered = new ArrayList<MessageId>(); for(int i = 0; i < maxMessages; i++) { @@ -165,7 +168,8 @@ public class ConstantsTest extends TestCase { // Add the subscriptions to an update ByteArrayOutputStream out = new ByteArrayOutputStream(MAX_PACKET_LENGTH); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); SubscriptionUpdate s = packetFactory.createSubscriptionUpdate(subs, Long.MAX_VALUE); writer.writeSubscriptionUpdate(s); @@ -194,7 +198,8 @@ public class ConstantsTest extends TestCase { // Add the transports to an update ByteArrayOutputStream out = new ByteArrayOutputStream(MAX_PACKET_LENGTH); - ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out); + ProtocolWriter writer = protocolWriterFactory.createProtocolWriter(out, + true); TransportUpdate t = packetFactory.createTransportUpdate(transports, Long.MAX_VALUE); writer.writeTransportUpdate(t); diff --git a/test/net/sf/briar/protocol/ProtocolReadWriteTest.java b/test/net/sf/briar/protocol/ProtocolReadWriteTest.java index 880e45438e..d74d910814 100644 --- a/test/net/sf/briar/protocol/ProtocolReadWriteTest.java +++ b/test/net/sf/briar/protocol/ProtocolReadWriteTest.java @@ -80,7 +80,7 @@ public class ProtocolReadWriteTest extends TestCase { public void testWriteAndRead() throws Exception { // Write ByteArrayOutputStream out = new ByteArrayOutputStream(); - ProtocolWriter writer = writerFactory.createProtocolWriter(out); + ProtocolWriter writer = writerFactory.createProtocolWriter(out, true); Ack a = packetFactory.createAck(Collections.singletonList(batchId)); writer.writeAck(a); diff --git a/test/net/sf/briar/protocol/ProtocolWriterImplTest.java b/test/net/sf/briar/protocol/ProtocolWriterImplTest.java index c558bda32e..fa1139e859 100644 --- a/test/net/sf/briar/protocol/ProtocolWriterImplTest.java +++ b/test/net/sf/briar/protocol/ProtocolWriterImplTest.java @@ -37,7 +37,8 @@ public class ProtocolWriterImplTest extends TestCase { @Test public void testWriteBitmapNoPadding() throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); - ProtocolWriter w = new ProtocolWriterImpl(serial, writerFactory, out); + ProtocolWriter w = new ProtocolWriterImpl(serial, writerFactory, out, + true); BitSet b = new BitSet(); // 11011001 = 0xD9 b.set(0); @@ -61,7 +62,8 @@ public class ProtocolWriterImplTest extends TestCase { @Test public void testWriteBitmapWithPadding() throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); - ProtocolWriter w = new ProtocolWriterImpl(serial, writerFactory, out); + ProtocolWriter w = new ProtocolWriterImpl(serial, writerFactory, out, + true); BitSet b = new BitSet(); // 01011001 = 0x59 b.set(1); diff --git a/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java b/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java index 2df1d1035b..d2916e7bda 100644 --- a/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java +++ b/test/net/sf/briar/protocol/batch/BatchConnectionReadWriteTest.java @@ -112,7 +112,7 @@ public class BatchConnectionReadWriteTest extends TestCase { ProtocolWriterFactory protoFactory = alice.getInstance(ProtocolWriterFactory.class); TestBatchTransportWriter transport = new TestBatchTransportWriter(out, - Long.MAX_VALUE); + Long.MAX_VALUE, false); OutgoingBatchConnection batchOut = new OutgoingBatchConnection(db, connFactory, protoFactory, contactId, transportIndex, transport); diff --git a/test/net/sf/briar/protocol/batch/OutgoingBatchConnectionTest.java b/test/net/sf/briar/protocol/batch/OutgoingBatchConnectionTest.java index 0e8759eaf0..7fbf325b66 100644 --- a/test/net/sf/briar/protocol/batch/OutgoingBatchConnectionTest.java +++ b/test/net/sf/briar/protocol/batch/OutgoingBatchConnectionTest.java @@ -73,7 +73,7 @@ public class OutgoingBatchConnectionTest extends TestCase { public void testConnectionTooShort() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); TestBatchTransportWriter transport = new TestBatchTransportWriter(out, - ProtocolConstants.MAX_PACKET_LENGTH); + ProtocolConstants.MAX_PACKET_LENGTH, true); OutgoingBatchConnection connection = new OutgoingBatchConnection(db, connFactory, protoFactory, contactId, transportIndex, transport); @@ -97,7 +97,7 @@ public class OutgoingBatchConnectionTest extends TestCase { public void testNothingToSend() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); TestBatchTransportWriter transport = new TestBatchTransportWriter(out, - TransportConstants.MIN_CONNECTION_LENGTH); + TransportConstants.MIN_CONNECTION_LENGTH, true); OutgoingBatchConnection connection = new OutgoingBatchConnection(db, connFactory, protoFactory, contactId, transportIndex, transport); @@ -133,7 +133,7 @@ public class OutgoingBatchConnectionTest extends TestCase { public void testSomethingToSend() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); TestBatchTransportWriter transport = new TestBatchTransportWriter(out, - TransportConstants.MIN_CONNECTION_LENGTH); + TransportConstants.MIN_CONNECTION_LENGTH, true); OutgoingBatchConnection connection = new OutgoingBatchConnection(db, connFactory, protoFactory, contactId, transportIndex, transport); diff --git a/test/net/sf/briar/protocol/batch/TestBatchTransportWriter.java b/test/net/sf/briar/protocol/batch/TestBatchTransportWriter.java index 80bafcb4f0..c3afcd2906 100644 --- a/test/net/sf/briar/protocol/batch/TestBatchTransportWriter.java +++ b/test/net/sf/briar/protocol/batch/TestBatchTransportWriter.java @@ -9,12 +9,15 @@ class TestBatchTransportWriter implements BatchTransportWriter { private final ByteArrayOutputStream out; private final long capacity; + private final boolean flush; private boolean disposed = false, exception = false; - TestBatchTransportWriter(ByteArrayOutputStream out, long capacity) { + TestBatchTransportWriter(ByteArrayOutputStream out, long capacity, + boolean flush) { this.out = out; this.capacity = capacity; + this.flush = flush; } public long getCapacity() { @@ -25,6 +28,10 @@ class TestBatchTransportWriter implements BatchTransportWriter { return out; } + public boolean shouldFlush() { + return flush; + } + public void dispose(boolean exception) { assert !disposed; disposed = true; -- GitLab