From 0d11553134178e74dae1244234b334cf71dd6bd7 Mon Sep 17 00:00:00 2001
From: akwizgran <akwizgran@users.sourceforge.net>
Date: Fri, 14 Oct 2011 21:07:49 +0100
Subject: [PATCH] StreamConnectionFactory and ConnectionDispatcherImpl
 (untested).

---
 .../transport/ConnectionReaderFactory.java    |   5 +
 .../transport/ConnectionWriterFactory.java    |   8 +-
 .../batch/BatchConnectionFactory.java         |   6 +-
 .../stream/StreamConnectionFactory.java       |  14 ++
 .../transport/ConnectionDecrypterImpl.java    |  10 ++
 .../transport/ConnectionDispatcherImpl.java   | 142 ++++++++++++++++++
 .../ConnectionReaderFactoryImpl.java          |  15 ++
 .../ConnectionWriterFactoryImpl.java          |  26 ++++
 .../net/sf/briar/transport/IvEncoder.java     |  15 ++
 .../batch/BatchConnectionFactoryImpl.java     |  12 +-
 .../stream/IncomingStreamConnection.java      |  46 ++++++
 .../stream/OutgoingStreamConnection.java      |  54 +++++++
 .../transport/stream/StreamConnection.java    |  51 ++++---
 .../stream/StreamConnectionFactoryImpl.java   |  73 +++++++++
 14 files changed, 441 insertions(+), 36 deletions(-)
 create mode 100644 api/net/sf/briar/api/transport/stream/StreamConnectionFactory.java
 create mode 100644 components/net/sf/briar/transport/ConnectionDispatcherImpl.java
 create mode 100644 components/net/sf/briar/transport/stream/IncomingStreamConnection.java
 create mode 100644 components/net/sf/briar/transport/stream/OutgoingStreamConnection.java
 create mode 100644 components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java

diff --git a/api/net/sf/briar/api/transport/ConnectionReaderFactory.java b/api/net/sf/briar/api/transport/ConnectionReaderFactory.java
index 9c09ac2342..084387eb02 100644
--- a/api/net/sf/briar/api/transport/ConnectionReaderFactory.java
+++ b/api/net/sf/briar/api/transport/ConnectionReaderFactory.java
@@ -2,8 +2,13 @@ package net.sf.briar.api.transport;
 
 import java.io.InputStream;
 
+import net.sf.briar.api.TransportId;
+
 public interface ConnectionReaderFactory {
 
 	ConnectionReader createConnectionReader(InputStream in, byte[] encryptedIv,
 			byte[] secret);
+
+	ConnectionReader createConnectionReader(InputStream in, boolean initiator,
+			TransportId t, long connection, byte[] secret);
 }
diff --git a/api/net/sf/briar/api/transport/ConnectionWriterFactory.java b/api/net/sf/briar/api/transport/ConnectionWriterFactory.java
index a543e2df77..d61b16658e 100644
--- a/api/net/sf/briar/api/transport/ConnectionWriterFactory.java
+++ b/api/net/sf/briar/api/transport/ConnectionWriterFactory.java
@@ -6,7 +6,9 @@ import net.sf.briar.api.TransportId;
 
 public interface ConnectionWriterFactory {
 
-	ConnectionWriter createConnectionWriter(OutputStream out, 
-			long capacity, boolean initiator, TransportId t, long connection,
-			byte[] secret);
+	ConnectionWriter createConnectionWriter(OutputStream out, long capacity,
+			boolean initiator, TransportId t, long connection, byte[] secret);
+
+	ConnectionWriter createConnectionWriter(OutputStream out, long capacity,
+			byte[] encryptedIv, byte[] secret);
 }
diff --git a/api/net/sf/briar/api/transport/batch/BatchConnectionFactory.java b/api/net/sf/briar/api/transport/batch/BatchConnectionFactory.java
index 28148f94bf..590ffc25de 100644
--- a/api/net/sf/briar/api/transport/batch/BatchConnectionFactory.java
+++ b/api/net/sf/briar/api/transport/batch/BatchConnectionFactory.java
@@ -7,9 +7,9 @@ import net.sf.briar.api.transport.BatchTransportWriter;
 
 public interface BatchConnectionFactory {
 
-	Runnable createOutgoingConnection(TransportId t, ContactId c,
-			BatchTransportWriter w);
-
 	Runnable createIncomingConnection(ContactId c, BatchTransportReader r,
 			byte[] encryptedIv);
+
+	Runnable createOutgoingConnection(TransportId t, ContactId c,
+			BatchTransportWriter w);
 }
diff --git a/api/net/sf/briar/api/transport/stream/StreamConnectionFactory.java b/api/net/sf/briar/api/transport/stream/StreamConnectionFactory.java
new file mode 100644
index 0000000000..e255c4acc0
--- /dev/null
+++ b/api/net/sf/briar/api/transport/stream/StreamConnectionFactory.java
@@ -0,0 +1,14 @@
+package net.sf.briar.api.transport.stream;
+
+import net.sf.briar.api.ContactId;
+import net.sf.briar.api.TransportId;
+import net.sf.briar.api.transport.StreamTransportConnection;
+
+public interface StreamConnectionFactory {
+
+	Runnable[] createIncomingConnection(ContactId c,
+			StreamTransportConnection s, byte[] encryptedIv);
+
+	Runnable[] createOutgoingConnection(TransportId t, ContactId c,
+			StreamTransportConnection s);
+}
diff --git a/components/net/sf/briar/transport/ConnectionDecrypterImpl.java b/components/net/sf/briar/transport/ConnectionDecrypterImpl.java
index dc48931a4d..6510538a7f 100644
--- a/components/net/sf/briar/transport/ConnectionDecrypterImpl.java
+++ b/components/net/sf/briar/transport/ConnectionDecrypterImpl.java
@@ -48,6 +48,16 @@ implements ConnectionDecrypter {
 		buf = new byte[IV_LENGTH];
 	}
 
+	ConnectionDecrypterImpl(InputStream in, byte[] iv, Cipher frameCipher,
+			SecretKey frameKey) {
+		super(in);
+		if(iv.length != IV_LENGTH) throw new IllegalArgumentException();
+		this.iv = iv;
+		this.frameCipher = frameCipher;
+		this.frameKey = frameKey;
+		buf = new byte[IV_LENGTH];
+	}
+
 	public InputStream getInputStream() {
 		return this;
 	}
diff --git a/components/net/sf/briar/transport/ConnectionDispatcherImpl.java b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java
new file mode 100644
index 0000000000..1f1185bb9c
--- /dev/null
+++ b/components/net/sf/briar/transport/ConnectionDispatcherImpl.java
@@ -0,0 +1,142 @@
+package net.sf.briar.transport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.sf.briar.api.ContactId;
+import net.sf.briar.api.TransportId;
+import net.sf.briar.api.db.DbException;
+import net.sf.briar.api.transport.BatchTransportReader;
+import net.sf.briar.api.transport.BatchTransportWriter;
+import net.sf.briar.api.transport.ConnectionDispatcher;
+import net.sf.briar.api.transport.ConnectionRecogniser;
+import net.sf.briar.api.transport.ConnectionRecogniserFactory;
+import net.sf.briar.api.transport.StreamTransportConnection;
+import net.sf.briar.api.transport.TransportConstants;
+import net.sf.briar.api.transport.batch.BatchConnectionFactory;
+import net.sf.briar.api.transport.stream.StreamConnectionFactory;
+
+public class ConnectionDispatcherImpl implements ConnectionDispatcher {
+
+	private static final Logger LOG =
+		Logger.getLogger(ConnectionDispatcherImpl.class.getName());
+
+	private final Executor executor;
+	private final ConnectionRecogniserFactory recFactory;
+	private final BatchConnectionFactory batchConnFactory;
+	private final StreamConnectionFactory streamConnFactory;
+	private final Map<TransportId, ConnectionRecogniser> recognisers;
+
+	ConnectionDispatcherImpl(Executor executor,
+			ConnectionRecogniserFactory recFactory,
+			BatchConnectionFactory batchConnFactory,
+			StreamConnectionFactory streamConnFactory) {
+		this.executor = executor;
+		this.recFactory = recFactory;
+		this.batchConnFactory = batchConnFactory;
+		this.streamConnFactory = streamConnFactory;
+		recognisers = new HashMap<TransportId, ConnectionRecogniser>();
+	}
+
+	public void dispatchReader(TransportId t, BatchTransportReader r) {
+		// Read the encrypted IV
+		byte[] encryptedIv;
+		try {
+			encryptedIv = readIv(r.getInputStream());
+		} catch(IOException e) {
+			if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
+			r.dispose(false);
+			return;
+		}
+		// Get the contact ID, or null if the IV wasn't expected
+		ContactId c;
+		try {
+			ConnectionRecogniser rec = getRecogniser(t);
+			c = rec.acceptConnection(encryptedIv);
+		} catch(DbException e) {
+			if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
+			r.dispose(false);
+			return;
+		}
+		if(c == null) {
+			r.dispose(false);
+			return;
+		}
+		// Pass the connection to the executor and return
+		executor.execute(batchConnFactory.createIncomingConnection(c, r,
+				encryptedIv));
+	}
+
+	private byte[] readIv(InputStream in) throws IOException {
+		byte[] b = new byte[TransportConstants.IV_LENGTH];
+		int offset = 0;
+		while(offset < b.length) {
+			int read = in.read(b, offset, b.length - offset);
+			if(read == -1) throw new IOException();
+			offset += read;
+		}
+		return b;
+	}
+
+	private ConnectionRecogniser getRecogniser(TransportId t) {
+		synchronized(recognisers) {
+			ConnectionRecogniser rec = recognisers.get(t);
+			if(rec == null) {
+				rec = recFactory.createConnectionRecogniser(t);
+				recognisers.put(t, rec);
+			}
+			return rec;
+		}
+	}
+
+	public void dispatchWriter(TransportId t, ContactId c,
+			BatchTransportWriter w) {
+		executor.execute(batchConnFactory.createOutgoingConnection(t, c, w));
+	}
+
+	public void dispatchIncomingConnection(TransportId t,
+			StreamTransportConnection s) {
+		// Read the encrypted IV
+		byte[] encryptedIv;
+		try {
+			encryptedIv = readIv(s.getInputStream());
+		} catch(IOException e) {
+			if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
+			s.dispose(false);
+			return;
+		}
+		// Get the contact ID, or null if the IV wasn't expected
+		ContactId c;
+		try {
+			ConnectionRecogniser rec = getRecogniser(t);
+			c = rec.acceptConnection(encryptedIv);
+		} catch(DbException e) {
+			if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
+			s.dispose(false);
+			return;
+		}
+		if(c == null) {
+			s.dispose(false);
+			return;
+		}
+		// Pass the connection to the executor and return
+		Runnable[] r = streamConnFactory.createIncomingConnection(c, s,
+				encryptedIv);
+		assert r.length == 2;
+		executor.execute(r[0]); // Write
+		executor.execute(r[1]); // Read
+	}
+
+	public void dispatchOutgoingConnection(TransportId t, ContactId c,
+			StreamTransportConnection s) {
+		Runnable[] r = streamConnFactory.createOutgoingConnection(t, c, s);
+		assert r.length == 2;
+		executor.execute(r[0]); // Write
+		executor.execute(r[1]); // Read
+	}
+}
diff --git a/components/net/sf/briar/transport/ConnectionReaderFactoryImpl.java b/components/net/sf/briar/transport/ConnectionReaderFactoryImpl.java
index df592f7602..23c5354a98 100644
--- a/components/net/sf/briar/transport/ConnectionReaderFactoryImpl.java
+++ b/components/net/sf/briar/transport/ConnectionReaderFactoryImpl.java
@@ -6,6 +6,7 @@ import javax.crypto.Cipher;
 import javax.crypto.Mac;
 import javax.crypto.SecretKey;
 
+import net.sf.briar.api.TransportId;
 import net.sf.briar.api.crypto.CryptoComponent;
 import net.sf.briar.api.transport.ConnectionReader;
 import net.sf.briar.api.transport.ConnectionReaderFactory;
@@ -35,4 +36,18 @@ class ConnectionReaderFactoryImpl implements ConnectionReaderFactory {
 		SecretKey macKey = crypto.deriveIncomingMacKey(secret);
 		return new ConnectionReaderImpl(decrypter, mac, macKey);
 	}
+
+	public ConnectionReader createConnectionReader(InputStream in,
+			boolean initiator, TransportId t, long connection, byte[] secret) {
+		byte[] iv = IvEncoder.encodeIv(initiator, t, connection);
+		// Create the decrypter
+		Cipher frameCipher = crypto.getFrameCipher();
+		SecretKey frameKey = crypto.deriveIncomingFrameKey(secret);
+		ConnectionDecrypter decrypter = new ConnectionDecrypterImpl(in, iv,
+				frameCipher, frameKey);
+		// Create the reader
+		Mac mac = crypto.getMac();
+		SecretKey macKey = crypto.deriveIncomingMacKey(secret);
+		return new ConnectionReaderImpl(decrypter, mac, macKey);
+	}
 }
diff --git a/components/net/sf/briar/transport/ConnectionWriterFactoryImpl.java b/components/net/sf/briar/transport/ConnectionWriterFactoryImpl.java
index ab3b3e3de8..883d129a3c 100644
--- a/components/net/sf/briar/transport/ConnectionWriterFactoryImpl.java
+++ b/components/net/sf/briar/transport/ConnectionWriterFactoryImpl.java
@@ -1,8 +1,11 @@
 package net.sf.briar.transport;
 
 import java.io.OutputStream;
+import java.security.InvalidKeyException;
 
+import javax.crypto.BadPaddingException;
 import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
 import javax.crypto.Mac;
 import javax.crypto.SecretKey;
 
@@ -38,4 +41,27 @@ class ConnectionWriterFactoryImpl implements ConnectionWriterFactory {
 		SecretKey macKey = crypto.deriveOutgoingMacKey(secret);
 		return new ConnectionWriterImpl(encrypter, mac, macKey);
 	}
+
+	public ConnectionWriter createConnectionWriter(OutputStream out,
+			long capacity, byte[] encryptedIv, byte[] secret) {
+		// Decrypt the IV
+		Cipher ivCipher = crypto.getIvCipher();
+		SecretKey ivKey = crypto.deriveIncomingIvKey(secret);
+		byte[] iv;
+		try {
+			ivCipher.init(Cipher.DECRYPT_MODE, ivKey);
+			iv = ivCipher.doFinal(encryptedIv);
+		} catch(BadPaddingException badCipher) {
+			throw new RuntimeException(badCipher);
+		} catch(IllegalBlockSizeException badCipher) {
+			throw new RuntimeException(badCipher);
+		} catch(InvalidKeyException badKey) {
+			throw new RuntimeException(badKey);
+		}
+		boolean initiator = IvEncoder.getInitiatorFlag(iv);
+		TransportId t = new TransportId(IvEncoder.getTransportId(iv));
+		long connection = IvEncoder.getConnectionNumber(iv);
+		return createConnectionWriter(out, capacity, initiator, t, connection,
+				secret);
+	}
 }
diff --git a/components/net/sf/briar/transport/IvEncoder.java b/components/net/sf/briar/transport/IvEncoder.java
index 48276760ca..eb18824014 100644
--- a/components/net/sf/briar/transport/IvEncoder.java
+++ b/components/net/sf/briar/transport/IvEncoder.java
@@ -23,4 +23,19 @@ class IvEncoder {
 		// Encode the frame number as an unsigned 32-bit integer
 		ByteUtils.writeUint32(frame, iv, 10);
 	}
+
+	static boolean getInitiatorFlag(byte[] iv) {
+		if(iv.length != IV_LENGTH) throw new IllegalArgumentException();
+		return (iv[3] & 1) == 1;
+	}
+
+	static int getTransportId(byte[] iv) {
+		if(iv.length != IV_LENGTH) throw new IllegalArgumentException();
+		return ByteUtils.readUint16(iv, 4);
+	}
+
+	static long getConnectionNumber(byte[] iv) {
+		if(iv.length != IV_LENGTH) throw new IllegalArgumentException();
+		return ByteUtils.readUint32(iv, 6);
+	}
 }
diff --git a/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java b/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java
index 598e83e2d6..bc741c2f6e 100644
--- a/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java
+++ b/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java
@@ -33,15 +33,15 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
 		this.protoWriterFactory = protoWriterFactory;
 	}
 
-	public Runnable createOutgoingConnection(TransportId t, ContactId c,
-			BatchTransportWriter w) {
-		return new OutgoingBatchConnection(connWriterFactory, db,
-				protoWriterFactory, t, c, w);
-	}
-
 	public Runnable createIncomingConnection(ContactId c,
 			BatchTransportReader r, byte[] encryptedIv) {
 		return new IncomingBatchConnection(connReaderFactory, db,
 				protoReaderFactory, c, r, encryptedIv);
 	}
+
+	public Runnable createOutgoingConnection(TransportId t, ContactId c,
+			BatchTransportWriter w) {
+		return new OutgoingBatchConnection(connWriterFactory, db,
+				protoWriterFactory, t, c, w);
+	}
 }
diff --git a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java
new file mode 100644
index 0000000000..a8b4575728
--- /dev/null
+++ b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java
@@ -0,0 +1,46 @@
+package net.sf.briar.transport.stream;
+
+import java.io.IOException;
+
+import net.sf.briar.api.ContactId;
+import net.sf.briar.api.db.DatabaseComponent;
+import net.sf.briar.api.db.DbException;
+import net.sf.briar.api.protocol.ProtocolReaderFactory;
+import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
+import net.sf.briar.api.transport.ConnectionReader;
+import net.sf.briar.api.transport.ConnectionReaderFactory;
+import net.sf.briar.api.transport.ConnectionWriter;
+import net.sf.briar.api.transport.ConnectionWriterFactory;
+import net.sf.briar.api.transport.StreamTransportConnection;
+
+public class IncomingStreamConnection extends StreamConnection {
+
+	private final byte[] encryptedIv;
+
+	IncomingStreamConnection(ConnectionReaderFactory connReaderFactory,
+			ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
+			ProtocolReaderFactory protoReaderFactory,
+			ProtocolWriterFactory protoWriterFactory, ContactId contactId,
+			StreamTransportConnection connection, byte[] encryptedIv) {
+		super(connReaderFactory, connWriterFactory, db, protoReaderFactory,
+				protoWriterFactory, contactId, connection);
+		this.encryptedIv = encryptedIv;
+	}
+
+	@Override
+	protected ConnectionReader createConnectionReader() throws DbException,
+	IOException {
+		byte[] secret = db.getSharedSecret(contactId);
+		return connReaderFactory.createConnectionReader(
+				connection.getInputStream(), encryptedIv, secret);
+	}
+
+	@Override
+	protected ConnectionWriter createConnectionWriter() throws DbException,
+	IOException {
+		byte[] secret = db.getSharedSecret(contactId);
+		return connWriterFactory.createConnectionWriter(
+				connection.getOutputStream(), Long.MAX_VALUE, encryptedIv,
+				secret);
+	}
+}
diff --git a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java
new file mode 100644
index 0000000000..146f925851
--- /dev/null
+++ b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java
@@ -0,0 +1,54 @@
+package net.sf.briar.transport.stream;
+
+import java.io.IOException;
+
+import net.sf.briar.api.ContactId;
+import net.sf.briar.api.TransportId;
+import net.sf.briar.api.db.DatabaseComponent;
+import net.sf.briar.api.db.DbException;
+import net.sf.briar.api.protocol.ProtocolReaderFactory;
+import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
+import net.sf.briar.api.transport.ConnectionReader;
+import net.sf.briar.api.transport.ConnectionReaderFactory;
+import net.sf.briar.api.transport.ConnectionWriter;
+import net.sf.briar.api.transport.ConnectionWriterFactory;
+import net.sf.briar.api.transport.StreamTransportConnection;
+
+public class OutgoingStreamConnection extends StreamConnection {
+
+	private final TransportId transportId;
+
+	private long connectionNum = -1L;
+
+	OutgoingStreamConnection(ConnectionReaderFactory connReaderFactory,
+			ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
+			ProtocolReaderFactory protoReaderFactory,
+			ProtocolWriterFactory protoWriterFactory, ContactId contactId,
+			StreamTransportConnection connection, TransportId transportId) {
+		super(connReaderFactory, connWriterFactory, db, protoReaderFactory,
+				protoWriterFactory, contactId, connection);
+		this.transportId = transportId;
+	}
+
+	@Override
+	protected ConnectionReader createConnectionReader() throws DbException,
+	IOException {
+		if(connectionNum == -1L)
+			connectionNum = db.getConnectionNumber(contactId, transportId);
+		byte[] secret = db.getSharedSecret(contactId);
+		return connReaderFactory.createConnectionReader(
+				connection.getInputStream(), false, transportId, connectionNum,
+				secret);
+	}
+
+	@Override
+	protected ConnectionWriter createConnectionWriter() throws DbException,
+	IOException {
+		if(connectionNum == -1L)
+			connectionNum = db.getConnectionNumber(contactId, transportId);
+		byte[] secret = db.getSharedSecret(contactId);
+		return connWriterFactory.createConnectionWriter(
+				connection.getOutputStream(), Long.MAX_VALUE, true, transportId,
+				connectionNum, secret);
+	}
+}
diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java
index 057459e754..652b44c0f1 100644
--- a/components/net/sf/briar/transport/stream/StreamConnection.java
+++ b/components/net/sf/briar/transport/stream/StreamConnection.java
@@ -37,7 +37,7 @@ import net.sf.briar.api.transport.ConnectionWriter;
 import net.sf.briar.api.transport.ConnectionWriterFactory;
 import net.sf.briar.api.transport.StreamTransportConnection;
 
-abstract class StreamConnection implements Runnable, DatabaseListener {
+abstract class StreamConnection implements DatabaseListener {
 
 	private static enum State { SEND_OFFER, IDLE, AWAIT_REQUEST, SEND_BATCHES };
 
@@ -113,8 +113,8 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
 						notifyAll();
 					}
 				} else if(proto.hasRequest()) {
-					Collection<MessageId> offered, seen, unseen;
 					Request r = proto.readRequest();
+					Collection<MessageId> offered, seen, unseen;
 					synchronized(this) {
 						if(outgoingOffer == null)
 							throw new IOException("Unexpected request packet");
@@ -200,23 +200,21 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
 						flags = writerFlags;
 						writerFlags = 0;
 					}
-					if((flags & Flags.BATCH_RECEIVED) != 0) {
-						sendAcks(ackWriter);
-					}
+					// Handle the flags in approximate order of urgency
 					if((flags & Flags.CONTACTS_UPDATED) != 0) {
 						if(!db.getContacts().contains(contactId)) {
 							close = true;
 							break;
 						}
 					}
-					if((flags & Flags.MESSAGES_ADDED) != 0) {
-						state = State.SEND_OFFER;
+					if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
+						sendTransports(transportWriter);
 					}
 					if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
 						sendSubscriptions(subscriptionWriter);
 					}
-					if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
-						sendTransports(transportWriter);
+					if((flags & Flags.BATCH_RECEIVED) != 0) {
+						sendAcks(ackWriter);
 					}
 					if((flags & Flags.OFFER_RECEIVED) != 0) {
 						sendRequest(requestWriter);
@@ -224,6 +222,9 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
 					if((flags & Flags.REQUEST_RECEIVED) != 0) {
 						throw new IOException("Unexpected request packet");
 					}
+					if((flags & Flags.MESSAGES_ADDED) != 0) {
+						state = State.SEND_OFFER;
+					}
 					break;
 
 				case AWAIT_REQUEST:
@@ -237,23 +238,21 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
 						flags = writerFlags;
 						writerFlags = 0;
 					}
-					if((flags & Flags.BATCH_RECEIVED) != 0) {
-						sendAcks(ackWriter);
-					}
+					// Handle the flags in approximate order of urgency
 					if((flags & Flags.CONTACTS_UPDATED) != 0) {
 						if(!db.getContacts().contains(contactId)) {
 							close = true;
 							break;
 						}
 					}
-					if((flags & Flags.MESSAGES_ADDED) != 0) {
-						// Ignored in this state
+					if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
+						sendTransports(transportWriter);
 					}
 					if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
 						sendSubscriptions(subscriptionWriter);
 					}
-					if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
-						sendTransports(transportWriter);
+					if((flags & Flags.BATCH_RECEIVED) != 0) {
+						sendAcks(ackWriter);
 					}
 					if((flags & Flags.OFFER_RECEIVED) != 0) {
 						sendRequest(requestWriter);
@@ -261,31 +260,32 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
 					if((flags & Flags.REQUEST_RECEIVED) != 0) {
 						state = State.SEND_BATCHES;
 					}
+					if((flags & Flags.MESSAGES_ADDED) != 0) {
+						// Ignored in this state
+					}
 					break;
 
 				case SEND_BATCHES:
-					// Deal with any flags that have been raised
+					// Check whether any flags have been raised
 					synchronized(this) {
 						flags = writerFlags;
 						writerFlags = 0;
 					}
-					if((flags & Flags.BATCH_RECEIVED) != 0) {
-						sendAcks(ackWriter);
-					}
+					// Handle the flags in approximate order of urgency
 					if((flags & Flags.CONTACTS_UPDATED) != 0) {
 						if(!db.getContacts().contains(contactId)) {
 							close = true;
 							break;
 						}
 					}
-					if((flags & Flags.MESSAGES_ADDED) != 0) {
-						// Ignored in this state
+					if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
+						sendTransports(transportWriter);
 					}
 					if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
 						sendSubscriptions(subscriptionWriter);
 					}
-					if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
-						sendTransports(transportWriter);
+					if((flags & Flags.BATCH_RECEIVED) != 0) {
+						sendAcks(ackWriter);
 					}
 					if((flags & Flags.OFFER_RECEIVED) != 0) {
 						sendRequest(requestWriter);
@@ -293,6 +293,9 @@ abstract class StreamConnection implements Runnable, DatabaseListener {
 					if((flags & Flags.REQUEST_RECEIVED) != 0) {
 						throw new IOException("Unexpected request packet");
 					}
+					if((flags & Flags.MESSAGES_ADDED) != 0) {
+						// Ignored in this state
+					}
 					// Send a batch if possible, otherwise an offer
 					if(!sendBatch(batchWriter)) state = State.SEND_OFFER;
 					break;
diff --git a/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java
new file mode 100644
index 0000000000..0bf511f85f
--- /dev/null
+++ b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java
@@ -0,0 +1,73 @@
+package net.sf.briar.transport.stream;
+
+import net.sf.briar.api.ContactId;
+import net.sf.briar.api.TransportId;
+import net.sf.briar.api.db.DatabaseComponent;
+import net.sf.briar.api.protocol.ProtocolReaderFactory;
+import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
+import net.sf.briar.api.transport.ConnectionReaderFactory;
+import net.sf.briar.api.transport.ConnectionWriterFactory;
+import net.sf.briar.api.transport.StreamTransportConnection;
+import net.sf.briar.api.transport.stream.StreamConnectionFactory;
+
+import com.google.inject.Inject;
+
+public class StreamConnectionFactoryImpl implements StreamConnectionFactory {
+
+	private final ConnectionReaderFactory connReaderFactory;
+	private final ConnectionWriterFactory connWriterFactory;
+	private final DatabaseComponent db;
+	private final ProtocolReaderFactory protoReaderFactory;
+	private final ProtocolWriterFactory protoWriterFactory;
+
+	@Inject
+	StreamConnectionFactoryImpl(ConnectionReaderFactory connReaderFactory,
+			ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
+			ProtocolReaderFactory protoReaderFactory,
+			ProtocolWriterFactory protoWriterFactory) {
+		this.connReaderFactory = connReaderFactory;
+		this.connWriterFactory = connWriterFactory;
+		this.db = db;
+		this.protoReaderFactory = protoReaderFactory;
+		this.protoWriterFactory = protoWriterFactory;
+	}
+
+	public Runnable[] createIncomingConnection(ContactId c,
+			StreamTransportConnection s, byte[] encryptedIv) {
+		final StreamConnection conn = new IncomingStreamConnection(
+				connReaderFactory, connWriterFactory, db, protoReaderFactory,
+				protoWriterFactory, c, s, encryptedIv);
+		Runnable[] runnables = new Runnable[2];
+		runnables[0] = new Runnable() {
+			public void run() {
+				conn.write();
+			}
+		};
+		runnables[1] = new Runnable() {
+			public void run() {
+				conn.read();
+			}
+		};
+		return runnables;
+	}
+
+	public Runnable[] createOutgoingConnection(TransportId t, ContactId c,
+			StreamTransportConnection s) {
+		final StreamConnection conn = new OutgoingStreamConnection(
+				connReaderFactory, connWriterFactory, db, protoReaderFactory,
+				protoWriterFactory, c, s, t);
+		Runnable[] runnables = new Runnable[2];
+		runnables[0] = new Runnable() {
+			public void run() {
+				conn.write();
+			}
+		};
+		runnables[1] = new Runnable() {
+			public void run() {
+				conn.read();
+			}
+		};
+		return runnables;
+	}
+
+}
-- 
GitLab