From ccfdcfc2769b372e663b9655180f1370f92d4d27 Mon Sep 17 00:00:00 2001
From: bontric <benjohnwie@gmail.com>
Date: Wed, 12 Sep 2018 18:52:48 +0200
Subject: [PATCH] implementsyncsession reuse (WIP) (abandoning this task, maybe
 useful in the future)

---
 .../mailbox/AbstractMailboxSession.java       | 95 +++++++++++++++++--
 .../bramble/mailbox/MailboxServiceImpl.java   |  1 +
 2 files changed, 88 insertions(+), 8 deletions(-)

diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java
index ca7c10d7b..ddd5cf20a 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java
@@ -22,7 +22,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.net.ProtocolException;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
@@ -46,6 +50,9 @@ abstract class AbstractMailboxSession {
 	private final int transportMaxLatency;
 	private final int transportMaxIdleTime;
 
+	private LinkedHashMap<ContactId, PipedOutputStream>
+			incomingSyncSessionWriter = new LinkedHashMap<>();
+
 	// Used to handle graceful session termination with END request
 	private AtomicBoolean remoteSessionFinished = new AtomicBoolean(false);
 	protected AtomicBoolean terminated = new AtomicBoolean(false);
@@ -147,8 +154,27 @@ abstract class AbstractMailboxSession {
 					LOG.info(e.toString());
 				handleProtocolException();
 			} finally {
-				synchronized (remoteSessionFinished){
-					remoteSessionFinished.set(true);
+				readingFinished();
+			}
+		}
+	}
+
+	/**
+	 * Called when no more requests will be received from the remote
+	 */
+	protected void readingFinished() {
+		synchronized (remoteSessionFinished) {
+			remoteSessionFinished.set(true);
+		}
+
+		synchronized (incomingSyncSessionWriter) {
+			//Close all Incoming SyncSessions assosciated with this MailboxSession
+			for (Map.Entry<ContactId, PipedOutputStream> entry : incomingSyncSessionWriter
+					.entrySet()) {
+				try {
+					entry.getValue().close();
+				} catch (IOException e) {
+					logException(LOG, WARNING, e);
 				}
 			}
 		}
@@ -228,14 +254,62 @@ abstract class AbstractMailboxSession {
 	 * @param req MailboxRequestTake
 	 */
 	public void handleSync(MailboxRequestSync req) {
+		//
+		if (req.isEndOfStream())
+			closeStreamForIncomingSession(contactId);
+
 		try {
-			InputStream is = new ByteArrayInputStream(req.getSyncStream());
-			syncSessionFactory.createIncomingSession(contactId, is).run();
+			receiveSyncStream(contactId, req.getSyncStream());
 		} catch (IOException e) {
 			logException(LOG, WARNING, e);
 		}
 	}
 
+	private void closeStreamForIncomingSession(ContactId id) {
+		synchronized (incomingSyncSessionWriter) {
+			if (!incomingSyncSessionWriter.containsKey(contactId)) {
+				if (LOG.isLoggable(WARNING))
+					LOG.warning(
+							"Received end of stream for nonexistent session");
+				return;
+			}
+
+			try {
+				incomingSyncSessionWriter.get(contactId).close();
+			} catch (IOException e) {
+				if (LOG.isLoggable(INFO))
+					LOG.info(e.toString());
+			}
+			incomingSyncSessionWriter.remove(id);
+		}
+	}
+
+	private void receiveSyncStream(ContactId id, byte[] syncStream)
+			throws IOException {
+		PipedOutputStream os;
+		synchronized (incomingSyncSessionWriter) {
+			if (!incomingSyncSessionWriter.containsKey(contactId)) {
+				os = new PipedOutputStream();
+				PipedInputStream is = new PipedInputStream(os);
+				incomingSyncSessionWriter.put(contactId, os);
+				ioExecutor.execute(() -> {
+					try {
+						syncSessionFactory.createIncomingSession(contactId, is)
+								.run();
+					} catch (IOException e) {
+						if (LOG.isLoggable(INFO))
+							LOG.info(e.toString());
+					}
+				});
+			} else {
+				os = incomingSyncSessionWriter.get(id);
+			}
+		}
+		synchronized (os) {
+			os.write(syncStream);
+		}
+	}
+
 	/**
 	 * Handles an incoming TAKE request. The request contains an encrypted BSP
 	 * stream. From this stream, the tag is read and a StreamReader is created.
@@ -265,11 +339,13 @@ abstract class AbstractMailboxSession {
 						"Received stream with unrecognisable tag");
 			}
 
+
 			InputStream reader =
 					streamReaderFactory.createStreamReader(in, ctx);
 
-			syncSessionFactory.createIncomingSession(ctx.getContactId(), reader)
-					.run();
+			byte[] syncStream = new byte[reader.available()];
+			reader.read(syncStream);
+			receiveSyncStream(ctx.getContactId(), syncStream);
 		} catch (DbException | IOException e) {
 			throw new MailboxSessionHandleException(e.toString());
 		}
@@ -280,10 +356,13 @@ abstract class AbstractMailboxSession {
 		// TODO: mailbox storage for Mailbox implementation
 	}
 
-	public abstract void run() throws IOException;
-
+	/**
+	 * Called when a protocol exception occurs when reading a request
+	 */
 	protected abstract void handleProtocolException();
 
+	public abstract void run() throws IOException;
+
 	protected class MailboxSessionHandleException extends Exception {
 		public MailboxSessionHandleException(String error) {
 			super(error);
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java
index afe06b590..a0610dfe0 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java
@@ -48,6 +48,7 @@ import static org.briarproject.bramble.util.LogUtils.logException;
 
 /**
  * The Mailbox Service runs to poll mailboxes
+ * TODO/FIXME REFACTOR!
  */
 public class MailboxServiceImpl implements MailboxService, EventListener {
 	private static final Logger LOG =
-- 
GitLab