From 5b8eab60352cc6f544c1aca16fd08d1ee997104e Mon Sep 17 00:00:00 2001
From: akwizgran <akwizgran@users.sourceforge.net>
Date: Wed, 5 Nov 2014 18:28:05 +0000
Subject: [PATCH] Interrupt the other side of a duplex connection if an
 exception occurs.

---
 .../plugins/TransportConnectionReader.java    |   4 +-
 .../transport/ConnectionDispatcherImpl.java   | 346 +++++++++++-------
 2 files changed, 218 insertions(+), 132 deletions(-)

diff --git a/briar-api/src/org/briarproject/api/plugins/TransportConnectionReader.java b/briar-api/src/org/briarproject/api/plugins/TransportConnectionReader.java
index 71bf0cc03f..c4c99dfa37 100644
--- a/briar-api/src/org/briarproject/api/plugins/TransportConnectionReader.java
+++ b/briar-api/src/org/briarproject/api/plugins/TransportConnectionReader.java
@@ -25,8 +25,8 @@ public interface TransportConnectionReader {
 	 * the connection has been marked as closed.
 	 * @param exception true if the connection is being closed because of an
 	 * exception. This may affect how resources are disposed of.
-	 * @param recognised true if the pseudo-random tag was recognised. This may
-	 * affect how resources are disposed of.
+	 * @param recognised true if the connection is definitely a Briar transport
+	 * connection. This may affect how resources are disposed of.
 	 */
 	void dispose(boolean exception, boolean recognised) throws IOException;
 }
diff --git a/briar-core/src/org/briarproject/transport/ConnectionDispatcherImpl.java b/briar-core/src/org/briarproject/transport/ConnectionDispatcherImpl.java
index abae50bff7..63f0503f6d 100644
--- a/briar-core/src/org/briarproject/transport/ConnectionDispatcherImpl.java
+++ b/briar-core/src/org/briarproject/transport/ConnectionDispatcherImpl.java
@@ -69,88 +69,18 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
 		ioExecutor.execute(new DispatchOutgoingDuplexConnection(c, t, d));
 	}
 
-	private StreamContext readAndRecogniseTag(TransportId t,
-			TransportConnectionReader r) {
+	private byte[] readTag(TransportId t, TransportConnectionReader r)
+			throws IOException {
 		// Read the tag
 		byte[] tag = new byte[TAG_LENGTH];
-		try {
-			InputStream in = r.getInputStream();
-			int offset = 0;
-			while(offset < tag.length) {
-				int read = in.read(tag, offset, tag.length - offset);
-				if(read == -1) throw new EOFException();
-				offset += read;
-			}
-		} catch(IOException e) {
-			if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
-			dispose(r, true, false);
-			return null;
-		}
-		// Recognise the tag
-		StreamContext ctx = null;
-		try {
-			ctx = tagRecogniser.recogniseTag(t, tag);
-		} catch(DbException e) {
-			if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
-			dispose(r, true, false);
-			return null;
-		}
-		if(ctx == null) dispose(r, false, false);
-		return ctx;
-	}
-
-	private void runAndDispose(StreamContext ctx, TransportConnectionReader r) {
-		MessagingSession in =
-				messagingSessionFactory.createIncomingSession(ctx, r);
-		ContactId contactId = ctx.getContactId();
-		TransportId transportId = ctx.getTransportId();
-		connectionRegistry.registerConnection(contactId, transportId);
-		try {
-			in.run();
-		} catch(IOException e) {
-			if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
-			dispose(r, true, true);
-			return;
-		} finally {
-			connectionRegistry.unregisterConnection(contactId, transportId);
-		}
-		dispose(r, false, true);
-	}
-
-	private void dispose(TransportConnectionReader r, boolean exception,
-			boolean recognised) {
-		try {
-			r.dispose(exception, recognised);
-		} catch(IOException e) {
-			if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
-		}
-	}
-
-	private void runAndDispose(StreamContext ctx, TransportConnectionWriter w,
-			boolean duplex) {
-		MessagingSession out =
-				messagingSessionFactory.createOutgoingSession(ctx, w, duplex);
-		ContactId contactId = ctx.getContactId();
-		TransportId transportId = ctx.getTransportId();
-		connectionRegistry.registerConnection(contactId, transportId);
-		try {
-			out.run();
-		} catch(IOException e) {
-			if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
-			dispose(w, true);
-			return;
-		} finally {
-			connectionRegistry.unregisterConnection(contactId, transportId);
-		}
-		dispose(w, false);
-	}
-
-	private void dispose(TransportConnectionWriter w, boolean exception) {
-		try {
-			w.dispose(exception);
-		} catch(IOException e) {
-			if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+		InputStream in = r.getInputStream();
+		int offset = 0;
+		while(offset < tag.length) {
+			int read = in.read(tag, offset, tag.length - offset);
+			if(read == -1) throw new EOFException();
+			offset += read;
 		}
+		return tag;
 	}
 
 	private class DispatchIncomingSimplexConnection implements Runnable {
@@ -166,10 +96,46 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
 
 		public void run() {
 			// Read and recognise the tag
-			StreamContext ctx = readAndRecogniseTag(transportId, reader);
-			if(ctx == null) return;
+			StreamContext ctx;
+			try {
+				byte[] tag = readTag(transportId, reader);
+				ctx = tagRecogniser.recogniseTag(transportId, tag);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeReader(true, false);
+				return;
+			} catch(DbException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeReader(true, false);
+				return;
+			}
+			if(ctx == null) {
+				LOG.info("Unrecognised tag");
+				disposeReader(true, false);
+				return;
+			}
+			ContactId contactId = ctx.getContactId();
+			connectionRegistry.registerConnection(contactId, transportId);
 			// Run the incoming session
-			runAndDispose(ctx, reader);
+			MessagingSession incomingSession =
+					messagingSessionFactory.createIncomingSession(ctx, reader);
+			try {
+				incomingSession.run();
+				disposeReader(false, true);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeReader(true, true);
+			} finally {
+				connectionRegistry.unregisterConnection(contactId, transportId);
+			}
+		}
+
+		private void disposeReader(boolean exception, boolean recognised) {
+			try {
+				reader.dispose(exception, recognised);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+			}
 		}
 	}
 
@@ -191,11 +157,32 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
 			StreamContext ctx = keyManager.getStreamContext(contactId,
 					transportId);
 			if(ctx == null) {
-				dispose(writer, false);
+				LOG.warning("Could not allocate stream context");
+				disposeWriter(true);
 				return;
 			}
+			connectionRegistry.registerConnection(contactId, transportId);
 			// Run the outgoing session
-			runAndDispose(ctx, writer, false);
+			MessagingSession outgoingSession =
+					messagingSessionFactory.createOutgoingSession(ctx,
+							writer, false);
+			try {
+				outgoingSession.run();
+				disposeWriter(false);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeWriter(true);
+			} finally {
+				connectionRegistry.unregisterConnection(contactId, transportId);
+			}
+		}
+
+		private void disposeWriter(boolean exception) {
+			try {
+				writer.dispose(exception);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+			}
 		}
 	}
 
@@ -205,6 +192,10 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
 		private final TransportConnectionReader reader;
 		private final TransportConnectionWriter writer;
 
+		private volatile ContactId contactId = null;
+		private volatile MessagingSession incomingSession = null;
+		private volatile MessagingSession outgoingSession = null;
+
 		private DispatchIncomingDuplexConnection(TransportId transportId,
 				DuplexTransportConnection transport) {
 			this.transportId = transportId;
@@ -214,39 +205,85 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
 
 		public void run() {
 			// Read and recognise the tag
-			StreamContext ctx = readAndRecogniseTag(transportId, reader);
-			if(ctx == null) return;
+			StreamContext ctx;
+			try {
+				byte[] tag = readTag(transportId, reader);
+				ctx = tagRecogniser.recogniseTag(transportId, tag);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeReader(true, false);
+				return;
+			} catch(DbException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeReader(true, false);
+				return;
+			}
+			if(ctx == null) {
+				LOG.info("Unrecognised tag");
+				disposeReader(true, false);
+				return;
+			}
+			contactId = ctx.getContactId();
+			connectionRegistry.registerConnection(contactId, transportId);
 			// Start the outgoing session on another thread
-			ioExecutor.execute(new DispatchIncomingDuplexConnectionSide2(
-					ctx.getContactId(), transportId, writer));
+			ioExecutor.execute(new Runnable() {
+				public void run() {
+					runOutgoingSession();
+				}
+			});
 			// Run the incoming session
-			runAndDispose(ctx, reader);
-		}
-	}
-
-	private class DispatchIncomingDuplexConnectionSide2 implements Runnable {
-
-		private final ContactId contactId;
-		private final TransportId transportId;
-		private final TransportConnectionWriter writer;
-
-		private DispatchIncomingDuplexConnectionSide2(ContactId contactId,
-				TransportId transportId, TransportConnectionWriter writer) {
-			this.contactId = contactId;
-			this.transportId = transportId;
-			this.writer = writer;
+			incomingSession = messagingSessionFactory.createIncomingSession(ctx,
+					reader);
+			try {
+				incomingSession.run();
+				disposeReader(false, true);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeReader(true, true);
+			} finally {
+				connectionRegistry.unregisterConnection(contactId, transportId);
+			}
 		}
 
-		public void run() {
+		private void runOutgoingSession() {
 			// Allocate a stream context
 			StreamContext ctx = keyManager.getStreamContext(contactId,
 					transportId);
 			if(ctx == null) {
-				dispose(writer, false);
+				LOG.warning("Could not allocate stream context");
+				disposeWriter(true);
 				return;
 			}
 			// Run the outgoing session
-			runAndDispose(ctx, writer, true);
+			outgoingSession = messagingSessionFactory.createOutgoingSession(ctx,
+					writer, true);
+			try {
+				outgoingSession.run();
+				disposeWriter(false);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeWriter(true);
+			}
+		}
+
+		private void disposeReader(boolean exception, boolean recognised) {
+			if(exception && outgoingSession != null)
+				outgoingSession.interrupt();
+			try {
+				reader.dispose(exception, recognised);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+			}
+		}
+
+		private void disposeWriter(boolean exception) {
+			if(exception && incomingSession != null)
+				incomingSession.interrupt();
+			try {
+				writer.dispose(exception);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+			}
 		}
 	}
 
@@ -257,6 +294,9 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
 		private final TransportConnectionReader reader;
 		private final TransportConnectionWriter writer;
 
+		private volatile MessagingSession incomingSession = null;
+		private volatile MessagingSession outgoingSession = null;
+
 		private DispatchOutgoingDuplexConnection(ContactId contactId,
 				TransportId transportId, DuplexTransportConnection transport) {
 			this.contactId = contactId;
@@ -270,42 +310,88 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
 			StreamContext ctx = keyManager.getStreamContext(contactId,
 					transportId);
 			if(ctx == null) {
-				dispose(writer, false);
+				LOG.warning("Could not allocate stream context");
+				disposeWriter(true);
 				return;
 			}
+			connectionRegistry.registerConnection(contactId, transportId);
 			// Start the incoming session on another thread
-			ioExecutor.execute(new DispatchOutgoingDuplexConnectionSide2(
-					contactId, transportId, reader));
+			ioExecutor.execute(new Runnable() {
+				public void run() {
+					runIncomingSession();
+				}
+			});
 			// Run the outgoing session
-			runAndDispose(ctx, writer, true);
-		}
-	}
-
-	private class DispatchOutgoingDuplexConnectionSide2 implements Runnable {
-
-		private final ContactId contactId;
-		private final TransportId transportId;
-		private final TransportConnectionReader reader;
-
-		private DispatchOutgoingDuplexConnectionSide2(ContactId contactId,
-				TransportId transportId, TransportConnectionReader reader) {
-			this.contactId = contactId;
-			this.transportId = transportId;
-			this.reader = reader;
+			outgoingSession = messagingSessionFactory.createOutgoingSession(ctx,
+					writer, true);
+			try {
+				outgoingSession.run();
+				disposeWriter(false);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeWriter(true);
+			} finally {
+				connectionRegistry.unregisterConnection(contactId, transportId);
+			}
 		}
 
-		public void run() {
+		private void runIncomingSession() {
 			// Read and recognise the tag
-			StreamContext ctx = readAndRecogniseTag(transportId, reader);
-			if(ctx == null) return;
+			StreamContext ctx;
+			try {
+				byte[] tag = readTag(transportId, reader);
+				ctx = tagRecogniser.recogniseTag(transportId, tag);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeReader(true, true);
+				return;
+			} catch(DbException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeReader(true, true);
+				return;
+			}
+			// Unrecognised tags are suspicious in this case
+			if(ctx == null) {
+				LOG.warning("Unrecognised tag for returning stream");
+				disposeReader(true, true);
+				return;
+			}
 			// Check that the stream comes from the expected contact
 			if(!ctx.getContactId().equals(contactId)) {
-				LOG.warning("Wrong contact ID for duplex connection");
-				dispose(reader, true, true);
+				LOG.warning("Wrong contact ID for returning stream");
+				disposeReader(true, true);
 				return;
 			}
 			// Run the incoming session
-			runAndDispose(ctx, reader);
+			incomingSession = messagingSessionFactory.createIncomingSession(ctx,
+					reader);
+			try {
+				incomingSession.run();
+				disposeReader(false, true);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+				disposeReader(true, true);
+			}
+		}
+
+		private void disposeReader(boolean exception, boolean recognised) {
+			if(exception && outgoingSession != null)
+				outgoingSession.interrupt();
+			try {
+				reader.dispose(exception, recognised);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+			}
+		}
+
+		private void disposeWriter(boolean exception) {
+			if(exception && incomingSession != null)
+				incomingSession.interrupt();
+			try {
+				writer.dispose(exception);
+			} catch(IOException e) {
+				if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
+			}
 		}
 	}
 }
\ No newline at end of file
-- 
GitLab