diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java index ca7c10d7bd3292216e3bfb9812314263ec814f59..e92034dd24a9211ecd6f779a7052a00d07227312 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java @@ -28,10 +28,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import static java.util.logging.Level.INFO; -import static java.util.logging.Level.WARNING; import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH; import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.END; -import static org.briarproject.bramble.util.LogUtils.logException; abstract class AbstractMailboxSession { private static final Logger LOG = @@ -50,6 +48,10 @@ abstract class AbstractMailboxSession { private AtomicBoolean remoteSessionFinished = new AtomicBoolean(false); protected AtomicBoolean terminated = new AtomicBoolean(false); + private AtomicBoolean hasActiveIncomingSyncSession = + new AtomicBoolean(false); + private FeedableSyncInputStream syncInputStream = null; + public AbstractMailboxSession(Executor ioExecutor, KeyManager keyManager, SyncSessionFactory syncSessionFactory, @@ -133,27 +135,38 @@ abstract class AbstractMailboxSession { * Must be called if the session wants to receive and handle requests */ public void readRequests() { - while (!terminated.get()) { - try { + try { + while (!terminated.get()) { MailboxRequest req = mailboxProtocol.getNextRequest(); if (req.getType() == END) return; ioExecutor.execute(() -> handleRequest(req)); - } catch (InterruptedException e) { - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - } catch (ProtocolException e) { + + } + } catch (InterruptedException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } catch (ProtocolException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + handleProtocolException(); + } finally { + synchronized (remoteSessionFinished) { + remoteSessionFinished.set(true); + } + + if (hasActiveIncomingSyncSession.get()) { + syncInputStream.close(); if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - handleProtocolException(); - } finally { - synchronized (remoteSessionFinished){ - remoteSessionFinished.set(true); - } + LOG.info("Stopping incoming session"); } } } + protected void handleMailboxProtocolReadingFinished(){ + + } + /** * Calls the appropriate handler for an incoming request * <p> @@ -228,11 +241,37 @@ abstract class AbstractMailboxSession { * @param req MailboxRequestTake */ public void handleSync(MailboxRequestSync req) { - try { - InputStream is = new ByteArrayInputStream(req.getSyncStream()); - syncSessionFactory.createIncomingSession(contactId, is).run(); - } catch (IOException e) { - logException(LOG, WARNING, e); + synchronized (hasActiveIncomingSyncSession) { + if (!hasActiveIncomingSyncSession.get()) { + syncInputStream = new FeedableSyncInputStream(); + ioExecutor.execute(() -> { + try { + syncSessionFactory + .createIncomingSession(contactId, + syncInputStream) + .run(); + } catch (IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } + }); + hasActiveIncomingSyncSession.set(true); + } + + if (req.isEndOfStream()) { + syncInputStream.close(); + return; + } + + if (syncInputStream.isClosed()) + return; + + try { + syncInputStream.feed(req.getSyncStream()); + } catch (IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/FeedableSyncInputStream.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/FeedableSyncInputStream.java new file mode 100644 index 0000000000000000000000000000000000000000..645be7191273e3686fc2c0005c29a2f17892f0f1 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/FeedableSyncInputStream.java @@ -0,0 +1,62 @@ +package org.briarproject.bramble.mailbox; + +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +public class FeedableSyncInputStream extends InputStream { + private ByteArrayOutputStream os = new ByteArrayOutputStream(); + private boolean isEOF = false; + private byte[] activeBuffer = null; + private int activeBufferPointer; + + @Override + public synchronized int read() throws IOException { + while (!hasBytes()) { + try { + this.wait(); + } catch (InterruptedException e) { + throw new IOException(e.toString()); + } + } + + if (isEOF) + throw new EOFException(); + + if (activeBufferPointer >= activeBuffer.length) { + activeBuffer = os.toByteArray(); + activeBufferPointer = 0; + os.reset(); + } + + return activeBuffer[activeBufferPointer++]; + } + + private boolean hasBytes() { + return (activeBuffer != null && + activeBufferPointer < activeBuffer.length) || os.size() > 0; + } + + public synchronized void feed(byte[] buffer) throws IOException { + if (isEOF) + throw new EOFException(); + + if (activeBuffer == null) + activeBuffer = buffer; + + os.write(buffer); + this.notifyAll(); + } + + public synchronized void close() { + isEOF = true; + this.notifyAll(); + } + + public synchronized boolean isClosed() { + return isEOF; + } + + +}