diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java index a7f16952cf43a3a6fa4a69a3e904e504f694eaa7..410fa78c2ffd3a218e87e29d54aac7acfbc49d47 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java @@ -16,31 +16,28 @@ import org.briarproject.bramble.mailbox.protocol.MailboxRequest; import org.briarproject.bramble.mailbox.protocol.MailboxRequestEnd; import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler; import org.briarproject.bramble.mailbox.protocol.MailboxRequestSync; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.net.ProtocolException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; +import javax.annotation.Nullable; + import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH; import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.END; import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.SYNC; -import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.TAKE; -abstract class AbstractMailboxSession { +public abstract class AbstractMailboxSession implements Runnable { private static final Logger LOG = Logger.getLogger(AbstractMailboxSession.class.getName()); - protected final Executor ioExecutor; - protected final ContactId contactId; + private final Executor ioExecutor; + private final ContactId contactId; private KeyManager keyManager; private SyncSessionFactory syncSessionFactory; private final StreamWriterFactory streamWriterFactory; @@ -51,7 +48,9 @@ abstract class AbstractMailboxSession { // Used to handle graceful session termination with END request private AtomicBoolean remoteSessionFinished = new AtomicBoolean(false); - protected AtomicInteger activeHandlers = new AtomicInteger(0); + + @Nullable + private SYNCHandler syncHandler = null; public AbstractMailboxSession(Executor ioExecutor, @@ -74,11 +73,10 @@ abstract class AbstractMailboxSession { this.transportMaxIdleTime = transportMaxIdleTime; this.contactId = contactId; - registerSupportedRequest(END); + // All sessions must enable END handling + mailboxProtocol.registerRequestHandler(new ENDHandler()); } - abstract void run(); - /** * Must be called once at the end of a AbstractMailboxSession to signal the end of * the session to the peer. This call blocks until the remote session @@ -96,41 +94,6 @@ abstract class AbstractMailboxSession { } } - protected void waitForHandlersToFinish() throws InterruptedException { - synchronized (activeHandlers) { - while (activeHandlers.get() > 0) { - activeHandlers.wait(); - } - } - } - - protected void registerSupportedRequest(MailboxMessage.TYPE t) { - LOG.info("Handler registered: " + activeHandlers.get()); - activeHandlers.incrementAndGet(); - switch (t) { - case SYNC: - mailboxProtocol.registerRequestHandler(new SYNCHandler()); - break; - case TAKE: - mailboxProtocol.registerRequestHandler(new TAKEHandler()); - break; - case END: - mailboxProtocol.registerRequestHandler(new ENDHandler()); - break; - default: - throw new RuntimeException( - "Unable to register request: " + t.toString()); - } - } - - private void unregisterHandler() { - LOG.info("Handler unregistered: " + activeHandlers.get()); - synchronized (activeHandlers) { - if (activeHandlers.decrementAndGet() <= 0) - activeHandlers.notifyAll(); - } - } - private class ENDHandler implements MailboxRequestHandler { @Override public void handleRequest(MailboxRequest request) { @@ -147,7 +110,6 @@ abstract class AbstractMailboxSession { @Override public void protocolFinished() { - unregisterHandler(); } } @@ -156,14 +118,34 @@ abstract class AbstractMailboxSession { * this runs an incoming session which is fed data from incoming * requests and runs and outgoing Session which writes SYNC requests * <p> - * Note: The stream is not encrypted, since the MailboxProtocol is responsible - * for encrypting/decrypting requests + * NOTE: This function terminates until the connection is closed */ + protected void enableSYNCHandling() { + syncHandler = new SYNCHandler(); + mailboxProtocol.registerRequestHandler(syncHandler); + } + + protected void awaitSYNCHandlerFinished() throws InterruptedException { + if (syncHandler == null) + throw new RuntimeException("SYNC handling is not enabled"); + + synchronized (syncHandler.isFinished) { + while (!syncHandler.isFinished.get()) + syncHandler.isFinished.wait(); + } + } + + private class SYNCHandler implements MailboxRequestHandler { private FeedableSyncInputStream syncInputStream; private MailboxSyncRequestWriter syncWriter; private SyncSession outgoingSession; + private AtomicBoolean isFinished = new AtomicBoolean(false); + /* + * Note: The stream is not encrypted, since the MailboxProtocol is responsible + * for encrypting/decrypting requests + */ private SYNCHandler() { syncWriter = new MailboxSyncRequestWriter(mailboxProtocol); syncInputStream = new FeedableSyncInputStream(); @@ -226,71 +208,12 @@ abstract class AbstractMailboxSession { @Override public void protocolFinished() { syncInputStream.close(); - try { - syncWriter.close(); - } catch (IOException e) { - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - } outgoingSession.interrupt(); LOG.info("SYNC request handler terminated"); - unregisterHandler(); - - } - } - - private class TAKEHandler implements MailboxRequestHandler { - /** - * Handles an incoming TAKE request. The request contains an encrypted BSP - * stream. From this stream, the tag is read and a StreamReader is created. - * A BSP IncomingSession is created for the stream - * - * @param request MailboxRequestTake - */ - @Override - public void handleRequest(MailboxRequest request) - throws ProtocolException { - MailboxRequestTake takeRequest = (MailboxRequestTake) request; - InputStream in = new ByteArrayInputStream( - takeRequest.getEncryptedSyncStream()); - byte[] tag = new byte[TAG_LENGTH]; - try { - // read tag from input stream - int offset = 0; - while (offset < tag.length) { - int read = in.read(tag, offset, tag.length - offset); - if (read == -1) throw new EOFException(); - offset += read; - } - - StreamContext ctx = - keyManager.getStreamContext(MailboxConstants.ID, tag); - - if (ctx == null) { - throw new IOException( - "Received stream with unrecognisable tag"); - } - - InputStream reader = - streamReaderFactory.createStreamReader(in, ctx); - - syncSessionFactory - .createIncomingSession(ctx.getContactId(), reader) - .run(); - } catch (DbException | IOException e) { - throw new ProtocolException(e.toString()); + synchronized (isFinished) { + isFinished.set(true); + isFinished.notifyAll(); } - - } - - @Override - public MailboxMessage.TYPE getType() { - return TAKE; - } - - @Override - public void protocolFinished() { - unregisterHandler(); } }