diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java index ca7c10d7bd3292216e3bfb9812314263ec814f59..ddd5cf20a259d69198da11f1dcd77806d178f7aa 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java @@ -22,7 +22,11 @@ import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.net.ProtocolException; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -46,6 +50,9 @@ abstract class AbstractMailboxSession { private final int transportMaxLatency; private final int transportMaxIdleTime; + private LinkedHashMap<ContactId, PipedOutputStream> + incomingSyncSessionWriter = new LinkedHashMap<>(); + // Used to handle graceful session termination with END request private AtomicBoolean remoteSessionFinished = new AtomicBoolean(false); protected AtomicBoolean terminated = new AtomicBoolean(false); @@ -147,8 +154,27 @@ abstract class AbstractMailboxSession { LOG.info(e.toString()); handleProtocolException(); } finally { - synchronized (remoteSessionFinished){ - remoteSessionFinished.set(true); + readingFinished(); + } + } + } + + /** + * Called when no more requests will be received from the remote + */ + protected void readingFinished() { + synchronized (remoteSessionFinished) { + remoteSessionFinished.set(true); + } + + synchronized (incomingSyncSessionWriter) { + //Close all Incoming SyncSessions assosciated with this MailboxSession + for (Map.Entry<ContactId, PipedOutputStream> entry : incomingSyncSessionWriter + .entrySet()) { + try { + entry.getValue().close(); + } catch (IOException e) { + logException(LOG, WARNING, e); } } } @@ -228,14 +254,62 @@ abstract class AbstractMailboxSession { * @param req MailboxRequestTake */ public void handleSync(MailboxRequestSync req) { + // + if (req.isEndOfStream()) + closeStreamForIncomingSession(contactId); + try { - InputStream is = new ByteArrayInputStream(req.getSyncStream()); - syncSessionFactory.createIncomingSession(contactId, is).run(); + receiveSyncStream(contactId, req.getSyncStream()); } catch (IOException e) { logException(LOG, WARNING, e); } } + private void closeStreamForIncomingSession(ContactId id) { + synchronized (incomingSyncSessionWriter) { + if (!incomingSyncSessionWriter.containsKey(contactId)) { + if (LOG.isLoggable(WARNING)) + LOG.warning( + "Received end of stream for nonexistent session"); + return; + } + + try { + incomingSyncSessionWriter.get(contactId).close(); + } catch (IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } + incomingSyncSessionWriter.remove(id); + } + } + + private void receiveSyncStream(ContactId id, byte[] syncStream) + throws IOException { + PipedOutputStream os; + synchronized (incomingSyncSessionWriter) { + if (!incomingSyncSessionWriter.containsKey(contactId)) { + os = new PipedOutputStream(); + PipedInputStream is = new PipedInputStream(os); + incomingSyncSessionWriter.put(contactId, os); + ioExecutor.execute(() -> { + try { + syncSessionFactory.createIncomingSession(contactId, is) + .run(); + } catch (IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } + }); + } else { + os = incomingSyncSessionWriter.get(id); + } + } + synchronized (os) { + os.write(syncStream); + } + } + /** * Handles an incoming TAKE request. The request contains an encrypted BSP * stream. From this stream, the tag is read and a StreamReader is created. @@ -265,11 +339,13 @@ abstract class AbstractMailboxSession { "Received stream with unrecognisable tag"); } + InputStream reader = streamReaderFactory.createStreamReader(in, ctx); - syncSessionFactory.createIncomingSession(ctx.getContactId(), reader) - .run(); + byte[] syncStream = new byte[reader.available()]; + reader.read(syncStream); + receiveSyncStream(ctx.getContactId(), syncStream); } catch (DbException | IOException e) { throw new MailboxSessionHandleException(e.toString()); } @@ -280,10 +356,13 @@ abstract class AbstractMailboxSession { // TODO: mailbox storage for Mailbox implementation } - public abstract void run() throws IOException; - + /** + * Called when a protocol exception occurs when reading a request + */ protected abstract void handleProtocolException(); + public abstract void run() throws IOException; + protected class MailboxSessionHandleException extends Exception { public MailboxSessionHandleException(String error) { super(error); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java index afe06b5903779e5b02275bab7fd9f2b48e02e5f0..a0610dfe03848229558d0b9afd6e10e698338757 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java @@ -48,6 +48,7 @@ import static org.briarproject.bramble.util.LogUtils.logException; /** * The Mailbox Service runs to poll mailboxes + * TODO/FIXME REFACTOR! */ public class MailboxServiceImpl implements MailboxService, EventListener { private static final Logger LOG =