diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java index e92034dd24a9211ecd6f779a7052a00d07227312..60eac959b30906c0554d445665ca2572c1fc0c9d 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/AbstractMailboxSession.java @@ -10,10 +10,11 @@ import org.briarproject.bramble.api.transport.StreamContext; import org.briarproject.bramble.api.transport.StreamReaderFactory; import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriterFactory; +import org.briarproject.bramble.mailbox.protocol.MailboxMessage; import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; import org.briarproject.bramble.mailbox.protocol.MailboxRequest; import org.briarproject.bramble.mailbox.protocol.MailboxRequestEnd; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore; +import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler; import org.briarproject.bramble.mailbox.protocol.MailboxRequestSync; import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake; @@ -25,11 +26,15 @@ import java.io.InputStream; import java.net.ProtocolException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH; import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.END; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.SYNC; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.TAKE; abstract class AbstractMailboxSession { private static final Logger LOG = @@ -46,11 +51,8 @@ abstract class AbstractMailboxSession { // Used to handle graceful session termination with END request private AtomicBoolean remoteSessionFinished = new AtomicBoolean(false); - protected AtomicBoolean terminated = new AtomicBoolean(false); + protected AtomicInteger activeHandlers = new AtomicInteger(0); - private AtomicBoolean hasActiveIncomingSyncSession = - new AtomicBoolean(false); - private FeedableSyncInputStream syncInputStream = null; public AbstractMailboxSession(Executor ioExecutor, KeyManager keyManager, @@ -71,261 +73,255 @@ abstract class AbstractMailboxSession { this.transportMaxLatency = transportMaxLatency; this.transportMaxIdleTime = transportMaxIdleTime; this.contactId = contactId; - } - /** - * @return True if the session is running - */ - public boolean isTerminated() { - return terminated.get(); + registerSupportetRequest(END); } + abstract void run(); /** - * Creates and runs a duplex session which is used for sync streams between - * mailbox and owner (user) - * - * @return An Outgoing Duplex Session which writes MailboxProtocol SYNC requests - * using a MailboxSyncRequestWriter - * <p> - * Note: The stream is not encrypted, since the MailboxProtocol is responsible - * for encrypting/decrypting requests - */ - protected SyncSession createDuplexOutgoingSession() { - MailboxSyncRequestWriter mbWriter = - new MailboxSyncRequestWriter(mailboxProtocol); - - return syncSessionFactory - .createDuplexOutgoingSession(contactId, transportMaxLatency, - transportMaxIdleTime, mbWriter); - } - - /** - * Get an encrypted Sync stream which can be stored on the mailbox by - * issuing a STORE request - * - * @param targetContactId The encrypted sync stream is generated for the given - * contactId - * @return Encrypted Sync stream - * @throws DbException - * @throws IOException + * Must be called once at the end of a AbstractMailboxSession to signal the end of + * the session to the peer. This call blocks until the remote session + * signals that it was ended. */ - protected byte[] getSyncStreamToStore(ContactId targetContactId) - throws DbException, IOException { - StreamContext ctx = - keyManager - .getStreamContext(targetContactId, MailboxConstants.ID); - - if (ctx == null) - throw new IOException("Could not allocated stream context"); - - ByteArrayOutputStream os = new ByteArrayOutputStream(); - - StreamWriter streamWriter = streamWriterFactory - .createStreamWriter(os, ctx); - - syncSessionFactory.createSimplexOutgoingSession(targetContactId, - MailboxConstants.MAX_LATENCY, streamWriter); - - return os.toByteArray(); - } - + protected void endSession() + throws InterruptedException, IOException { - /** - * Must be called if the session wants to receive and handle requests - */ - public void readRequests() { - try { - while (!terminated.get()) { - MailboxRequest req = mailboxProtocol.getNextRequest(); - if (req.getType() == END) - return; - ioExecutor.execute(() -> handleRequest(req)); + mailboxProtocol.writeRequest(new MailboxRequestEnd()); + synchronized (remoteSessionFinished) { + while (!remoteSessionFinished.get()) { + remoteSessionFinished.wait(); } - } catch (InterruptedException e) { - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - } catch (ProtocolException e) { - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - handleProtocolException(); - } finally { - synchronized (remoteSessionFinished) { - remoteSessionFinished.set(true); - } + } + } - if (hasActiveIncomingSyncSession.get()) { - syncInputStream.close(); - if (LOG.isLoggable(INFO)) - LOG.info("Stopping incoming session"); + protected void waitForHandlersToFinish() throws InterruptedException { + synchronized (activeHandlers) { + while (activeHandlers.get() > 0) { + activeHandlers.wait(); } } } - protected void handleMailboxProtocolReadingFinished(){ + protected void registerSupportetRequest(MailboxMessage.TYPE t) { + LOG.info("Handler registered: " + activeHandlers.get()); + activeHandlers.incrementAndGet(); + switch (t) { + case SYNC: + mailboxProtocol.registerRequestHandler(new SYNCHandler()); + break; + case TAKE: + mailboxProtocol.registerRequestHandler(new TAKEHandler()); + break; + case END: + mailboxProtocol.registerRequestHandler(new ENDHandler()); + break; + default: + throw new RuntimeException( + "Unable to register request: " + t.toString()); + } + } + private void unregisterHandler() { + LOG.info("Handler unregistered: " + activeHandlers.get()); + synchronized (activeHandlers) { + if (activeHandlers.decrementAndGet() <= 0) + activeHandlers.notifyAll(); + } } - /** - * Calls the appropriate handler for an incoming request - * <p> - * NOTE: If a session does not support a specific request the appropriate - * handler needs to be overwritten. This handler should throw a - * {@link MailboxSessionHandleException}. - * - * @param req - */ - private void handleRequest(MailboxRequest req) { - - String error = null; - try { - switch (req.getType()) { - case STORE: - handleStore((MailboxRequestStore) req); - break; - case TAKE: - handleTake((MailboxRequestTake) req); - break; - case SYNC: - handleSync((MailboxRequestSync) req); - break; - default: - throw new MailboxSessionHandleException( - "Unsupported request type"); + private class ENDHandler implements MailboxRequestHandler { + @Override + public void handleRequest(MailboxRequest request) { + synchronized (remoteSessionFinished) { + remoteSessionFinished.set(true); + remoteSessionFinished.notifyAll(); } - } catch (MailboxSessionHandleException e) { - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - error = e.toString(); } - if (!req.hasResponse()) - return; - - try { - if (error == null) - mailboxProtocol.writeSucessResponse(req); - else - mailboxProtocol.writeErrorResponse(req, error); - } catch (InterruptedException | IOException e) { - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); + @Override + public MailboxMessage.TYPE getType() { + return END; } - } - - /** - * Must be called once at the end of a AbstractMailboxSession to signal the end of - * the session to the peer. This call blocks until the remote session - * signals that it was ended. - */ - protected void endSession() - throws InterruptedException, IOException { - mailboxProtocol.writeRequest(new MailboxRequestEnd()); - - synchronized (remoteSessionFinished) { - while (!remoteSessionFinished.get()) { - remoteSessionFinished.wait(); - } + @Override + public void protocolFinished() { + unregisterHandler(); } - - terminated.set(true); } - /** - * Handles an incoming SYNC request. The request contains an unencrypted - * BSP stream which is passed to a BSP IncomingSession - * - * @param req MailboxRequestTake + * Enables support for SYNC requests between mailbox and client + * this runs an incoming session which is fed data from incoming + * requests and runs and outgoing Session which writes SYNC requests + * <p> + * Note: The stream is not encrypted, since the MailboxProtocol is responsible + * for encrypting/decrypting requests */ - public void handleSync(MailboxRequestSync req) { - synchronized (hasActiveIncomingSyncSession) { - if (!hasActiveIncomingSyncSession.get()) { - syncInputStream = new FeedableSyncInputStream(); - ioExecutor.execute(() -> { - try { - syncSessionFactory - .createIncomingSession(contactId, - syncInputStream) - .run(); - } catch (IOException e) { - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - } - }); - hasActiveIncomingSyncSession.set(true); - } + private class SYNCHandler implements MailboxRequestHandler { + private FeedableSyncInputStream syncInputStream; + private MailboxSyncRequestWriter syncWriter; + private SyncSession outgoingSession; + + private SYNCHandler() { + syncWriter = new MailboxSyncRequestWriter(mailboxProtocol); + syncInputStream = new FeedableSyncInputStream(); + + // Run an incoming session to handle received SYNC requests + ioExecutor.execute(() -> { + try { + syncSessionFactory + .createIncomingSession(contactId, + syncInputStream) + .run(); + } catch (IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } + }); + outgoingSession = syncSessionFactory + .createDuplexOutgoingSession(contactId, + transportMaxLatency, + transportMaxIdleTime, syncWriter); + // Run an outgoing session to send SYNC requests + ioExecutor.execute(() -> { + try { + outgoingSession.run(); + } catch (IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } + }); + } - if (req.isEndOfStream()) { + /** + * Handles an incoming SYNC request. The request contains an unencrypted + * BSP stream which is passed to a BSP IncomingSession + * + * @param request MailboxRequest (SYNC) + */ + @Override + public void handleRequest(MailboxRequest request) { + MailboxRequestSync syncReq = (MailboxRequestSync) request; + + if (syncReq.isEndOfStream()) { syncInputStream.close(); return; } - if (syncInputStream.isClosed()) - return; + try { + syncInputStream.feed(syncReq.getSyncStream()); + } catch (IOException e) { + if (LOG.isLoggable(WARNING)) + LOG.warning(e.toString()); + } + } + + @Override + public MailboxMessage.TYPE getType() { + return SYNC; + } + @Override + public void protocolFinished() { + syncInputStream.close(); try { - syncInputStream.feed(req.getSyncStream()); + syncWriter.close(); } catch (IOException e) { if (LOG.isLoggable(INFO)) LOG.info(e.toString()); } + outgoingSession.interrupt(); + LOG.info("SYNC request handler terminated"); + unregisterHandler(); + } } - /** - * Handles an incoming TAKE request. The request contains an encrypted BSP - * stream. From this stream, the tag is read and a StreamReader is created. - * A BSP IncomingSession is created for the stream - * - * @param req MailboxRequestTake - * @throws MailboxSessionHandleException - */ - public void handleTake(MailboxRequestTake req) - throws MailboxSessionHandleException { - InputStream in = new ByteArrayInputStream(req.getEncryptedSyncStream()); - byte[] tag = new byte[TAG_LENGTH]; - try { - // read tag from input stream - int offset = 0; - while (offset < tag.length) { - int read = in.read(tag, offset, tag.length - offset); - if (read == -1) throw new EOFException(); - offset += read; + private class TAKEHandler implements MailboxRequestHandler { + /** + * Handles an incoming TAKE request. The request contains an encrypted BSP + * stream. From this stream, the tag is read and a StreamReader is created. + * A BSP IncomingSession is created for the stream + * + * @param request MailboxRequestTake + */ + @Override + public void handleRequest(MailboxRequest request) + throws ProtocolException { + MailboxRequestTake takeRequest = (MailboxRequestTake) request; + InputStream in = new ByteArrayInputStream( + takeRequest.getEncryptedSyncStream()); + byte[] tag = new byte[TAG_LENGTH]; + try { + // read tag from input stream + int offset = 0; + while (offset < tag.length) { + int read = in.read(tag, offset, tag.length - offset); + if (read == -1) throw new EOFException(); + offset += read; + } + + StreamContext ctx = + keyManager.getStreamContext(MailboxConstants.ID, tag); + + if (ctx == null) { + throw new IOException( + "Received stream with unrecognisable tag"); + } + + InputStream reader = + streamReaderFactory.createStreamReader(in, ctx); + + syncSessionFactory + .createIncomingSession(ctx.getContactId(), reader) + .run(); + } catch (DbException | IOException e) { + throw new ProtocolException(e.toString()); } - StreamContext ctx = - keyManager.getStreamContext(MailboxConstants.ID, tag); - - if (ctx == null) { - throw new IOException( - "Received stream with unrecognisable tag"); - } + } - InputStream reader = - streamReaderFactory.createStreamReader(in, ctx); + @Override + public MailboxMessage.TYPE getType() { + return TAKE; + } - syncSessionFactory.createIncomingSession(ctx.getContactId(), reader) - .run(); - } catch (DbException | IOException e) { - throw new MailboxSessionHandleException(e.toString()); + @Override + public void protocolFinished() { + unregisterHandler(); } } - public void handleStore(MailboxRequestStore req) - throws MailboxSessionHandleException { - // TODO: mailbox storage for Mailbox implementation - } + /** + * Get an encrypted Sync stream which can be stored on the mailbox by + * issuing a STORE request + * + * @param targetContactId The encrypted sync stream is generated for the given + * contactId + * @return Encrypted Sync stream + * @throws DbException + * @throws IOException + */ + protected byte[] getSyncStreamToStore(ContactId targetContactId) + throws DbException, IOException { + StreamContext ctx = + keyManager + .getStreamContext(targetContactId, MailboxConstants.ID); - public abstract void run() throws IOException; + if (ctx == null) + throw new IOException("Could not allocated stream context"); - protected abstract void handleProtocolException(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); - protected class MailboxSessionHandleException extends Exception { - public MailboxSessionHandleException(String error) { - super(error); - } + StreamWriter streamWriter = streamWriterFactory + .createStreamWriter(os, ctx); + + syncSessionFactory.createSimplexOutgoingSession(targetContactId, + MailboxConstants.MAX_LATENCY, streamWriter); + + return os.toByteArray(); } + } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxManagerImpl.java index 05fa3a3d2a49d9ea148a2ed6bca0987526f6b9f6..7538b14519953d8573b511143e67f0860b0270a6 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxManagerImpl.java @@ -100,10 +100,8 @@ public class MailboxManagerImpl implements MailboxManager { @Override public void handleOwnerContactWithoutMailbox(MailboxInfo mailboxInfo) { - if (null == privateMailboxSession || - privateMailboxSession.isTerminated()) { + if (null == privateMailboxSession) return; - } ioExecutor.execute( () -> privateMailboxSession.handleOwnerContact(mailboxInfo)); @@ -192,15 +190,9 @@ public class MailboxManagerImpl implements MailboxManager { if (contactType == ContactType.PRIVATE_MAILBOX) privateMailboxSession = (PrivateMailboxSession) mailboxSession; - try { - mailboxSession.run(); - mailboxProtocol.stop(); - disposeConnection(false); - } catch (IOException e) { - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - disposeConnection(true); - } + mailboxSession.run(); + disposeConnection(false); + connectionRegistry .unregisterConnection(contactId, MailboxConstants.ID, diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxOwnerSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxOwnerSession.java index c794731a9eddb17bb319218b99b7102917bfb5dd..03e72afa6e30add9380f0b8982d7989741e60fdc 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxOwnerSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxOwnerSession.java @@ -6,20 +6,21 @@ import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamReaderFactory; import org.briarproject.bramble.api.transport.StreamWriterFactory; +import org.briarproject.bramble.mailbox.protocol.MailboxMessage; import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake; import java.io.IOException; import java.util.concurrent.Executor; +import java.util.logging.Level; import java.util.logging.Logger; +import static java.util.logging.Level.INFO; + class MailboxOwnerSession extends AbstractMailboxSession { private static final Logger LOG = Logger.getLogger(MailboxOwnerSession.class.getName()); - private final Executor ioExecutor; - private final MailboxProtocol mailboxProtocol; - private SyncSession duplexOutgoingSession; public MailboxOwnerSession(ContactId contactId, Executor ioExecutor, KeyManager keyManager, @@ -33,32 +34,19 @@ class MailboxOwnerSession extends AbstractMailboxSession { streamReaderFactory, mailboxProtocol, transportMaxLatency, transportMaxIdleTime, contactId); - this.ioExecutor = ioExecutor; - this.mailboxProtocol = mailboxProtocol; + registerSupportetRequest(MailboxMessage.TYPE.SYNC); + registerSupportetRequest(MailboxMessage.TYPE.TAKE); } @Override - public void run() throws IOException { - ioExecutor.execute(() -> readRequests()); - duplexOutgoingSession = createDuplexOutgoingSession(); - duplexOutgoingSession.run(); + public void run() { try { - endSession(); + waitForHandlersToFinish(); } catch (InterruptedException e) { - throw new IOException("Interrupted while ending session"); + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); } } - @Override - protected void handleProtocolException() { - if (duplexOutgoingSession != null) - duplexOutgoingSession.interrupt(); - } - @Override - public void handleTake(MailboxRequestTake mailboxRequestTake) - throws MailboxSessionHandleException { - throw new MailboxSessionHandleException( - "MailboxOwnerSession does not support the TAKE request"); - } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/PrivateMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/PrivateMailboxSession.java index 9085b834b89af45e0990361f98c821173d97dc0a..8212361ca80d540d76e6318bdd39eabe591ccf71 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/PrivateMailboxSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/PrivateMailboxSession.java @@ -8,6 +8,7 @@ import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamReaderFactory; import org.briarproject.bramble.api.transport.StreamWriterFactory; +import org.briarproject.bramble.mailbox.protocol.MailboxMessage; import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; import org.briarproject.bramble.mailbox.protocol.MailboxRequest; import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore; @@ -25,7 +26,6 @@ class PrivateMailboxSession extends AbstractMailboxSession { private static final Logger LOG = Logger.getLogger(PrivateMailboxSession.class.getName()); private MailboxProtocol mailboxProtocol; - private SyncSession duplexOutgoingSession; public PrivateMailboxSession(ContactId contactId, Executor ioExecutor, KeyManager keyManager, @@ -39,6 +39,8 @@ class PrivateMailboxSession extends AbstractMailboxSession { streamReaderFactory, mailboxProtocol, transportMaxLatency, transportMaxIdleTime, contactId); this.mailboxProtocol = mailboxProtocol; + + registerSupportetRequest(MailboxMessage.TYPE.SYNC); } /** @@ -70,28 +72,12 @@ class PrivateMailboxSession extends AbstractMailboxSession { } @Override - public void run() throws IOException { - ioExecutor.execute(() -> super.readRequests()); - duplexOutgoingSession = createDuplexOutgoingSession(); - duplexOutgoingSession.run(); + public void run() { try { - endSession(); + waitForHandlersToFinish(); } catch (InterruptedException e) { - throw new IOException("Interrupted while ending session"); + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); } } - - @Override - protected void handleProtocolException() { - if (duplexOutgoingSession != null) - duplexOutgoingSession.interrupt(); - } - - @Override - public void handleStore(MailboxRequestStore req) - throws MailboxSessionHandleException { - throw new MailboxSessionHandleException( - "Private Mailbox Session does not support the TAKE request"); - } - } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java index e5db5b2f1e94b0a479772591a59df7cbe8885945..ad4916151a5961dcdb22e1f30c97acef19e81d08 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java @@ -3,10 +3,12 @@ package org.briarproject.bramble.mailbox.protocol; import org.briarproject.bramble.api.data.BdfList; import org.briarproject.bramble.api.data.BdfReader; import org.briarproject.bramble.api.data.BdfWriter; +import org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE; import java.io.EOFException; import java.io.IOException; import java.net.ProtocolException; +import java.util.LinkedHashMap; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -38,6 +40,9 @@ public class MailboxProtocol implements Runnable { volatile ConcurrentHashMap<Long, MailboxRequest> pendingRequests = new ConcurrentHashMap<>(); + private LinkedHashMap<TYPE, MailboxRequestHandler> requestHandlers = + new LinkedHashMap<>(); + private volatile AtomicBoolean stopped = new AtomicBoolean(false); private volatile Thread writingThread; private volatile Thread readingThread; @@ -51,6 +56,10 @@ public class MailboxProtocol implements Runnable { } + public void registerRequestHandler(MailboxRequestHandler handler) { + requestHandlers.put(handler.getType(), handler); + } + public void sendKeepAlive() throws IOException { // flush the writer without pending data to send a keepalive if (stopped.get()) @@ -94,35 +103,22 @@ public class MailboxProtocol implements Runnable { } } - public MailboxRequest getNextRequest() - throws InterruptedException, ProtocolException { - MailboxRequest req = inQueue.take(); - if (stopped.get()) - throw new ProtocolException("Protocol has stopped"); - - return req; - } - + /** + * Run the mailbox protocol. This function terminates when the underlying + * input/output streams are closed + */ @Override public void run() { ioExecutor.execute(() -> writeOutgoingMessages()); readIncomingMessages(); } - /** - * Call once the protocol should terminate, before the connection is closed - */ - public void stop() { - if (!stopped.compareAndSet(false, true)) - throw new RuntimeException("Stopping already stopped protocol"); - } - private void readIncomingMessages() { readingThread = Thread.currentThread(); BdfList bdfMsg; - MailboxMessage mailboxMessage; while (!stopped.get()) { + MailboxMessage mailboxMessage; try { if (mailboxBdfReader.eof()) throw new EOFException(); @@ -143,7 +139,40 @@ public class MailboxProtocol implements Runnable { if (mailboxMessage.getType() == RESPONSE) handleResponse((MailboxResponse) mailboxMessage); else - inQueue.add((MailboxRequest) mailboxMessage); + ioExecutor.execute(() -> handleRequest( + (MailboxRequest) mailboxMessage)); + } + } + + private void handleRequest(MailboxRequest req) { + // TODO: Limit number of requests which are handled parallel + String error = null; + MailboxRequestHandler handler = + requestHandlers.get(req.getType()); + if (handler != null) { + try { + handler.handleRequest(req); + } catch (ProtocolException e) { + error = e.toString(); + } + } else { + error = "Unsupported Request"; + if (LOG.isLoggable(WARNING)) + LOG.warning("Received unsupported Request: " + + req.getType().name()); + } + + if (!req.hasResponse()) + return; + + try { + if (error == null) + writeSucessResponse(req); + else + writeErrorResponse(req, error); + } catch (InterruptedException | IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); } } @@ -214,7 +243,9 @@ public class MailboxProtocol implements Runnable { "Connection closed"); entry.getValue().signalError(r.getErrorMessage()); } - } - + // notify all handlers + for (MailboxRequestHandler handler : requestHandlers.values()) + handler.protocolFinished(); + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestHandler.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..18051b38e811feff777b40b39a8d951994d40dc7 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestHandler.java @@ -0,0 +1,26 @@ +package org.briarproject.bramble.mailbox.protocol; + + +import java.net.ProtocolException; + +public interface MailboxRequestHandler { + /** + * Called if a request is received which matches the {@link MailboxMessage.TYPE} of the + * {@link MailboxRequestHandler#getType()} return + * + * @param request + * @throws ProtocolException If this function throws a Protocol exception, + * an error response will be sent + */ + void handleRequest(MailboxRequest request) throws ProtocolException; + + /** + * @return Must return the {@link MailboxMessage.TYPE} which can be handled by this handler + */ + MailboxMessage.TYPE getType(); + + /** + * Is called when the protocol is finished (once the underlying connection is closed) + */ + void protocolFinished(); +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java index d1e02bbbb2c846a391431f5355ee847855ac6da7..7ae21d14690dedf3bb0968090fc4cc380dd0787b 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java @@ -19,11 +19,15 @@ import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; import static java.util.concurrent.TimeUnit.SECONDS; import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.END; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.STORE; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.SYNC; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.TAKE; import static org.junit.Assert.assertEquals; public class MailboxProtocolIntegrationTest extends BrambleTestCase { @@ -74,28 +78,50 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { mailboxProtocol = new MailboxProtocol(ioExecutor, bdfWriter, bdfReader); } - @Test(expected = ProtocolException.class) + @Test public void testProtocolTerminationOnClosedConnection() throws IOException, InterruptedException { + AtomicBoolean finished = new AtomicBoolean(false); + + mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { + @Override + public void handleRequest(MailboxRequest request) { + synchronized (finished) { + while (finished.get() != true) { + try { + finished.wait(); + } catch (InterruptedException e) { + } + } + } + } + + @Override + public MailboxMessage.TYPE getType() { + return STORE; + } + + @Override + public void protocolFinished() { + + } + }); ioExecutor.execute(mailboxProtocol); MailboxRequest req = new MailboxRequestStore(new ContactId(123), "test".getBytes()); mailboxProtocol.writeRequest(req); - // read request to ensure writing thread is running - mailboxProtocol.getNextRequest(); - // close "connection" pipedOS.close(); - // And "End" request should be the next read request - try { - MailboxRequest recvReq = mailboxProtocol.getNextRequest(); - }finally { - // pending request should be marked as failed - assertEquals(false, req.awaitAndGetResponse()); - assertEquals("Connection closed", req.getError()); + // ensure that no response is written before IS is closed + synchronized (finished) { + finished.set(true); } + + // pending request should be marked as failed + assertEquals(false, req.awaitAndGetResponse()); + assertEquals("Connection closed", req.getError()); } @@ -110,50 +136,88 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { takeRequest(); endRequest(); } finally { - mailboxProtocol.stop(); pipedOS.close(); } } private void errorResponse() throws InterruptedException, IOException { + mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { + @Override + public void handleRequest(MailboxRequest request) + throws ProtocolException { + throw new ProtocolException("Error"); + } + + @Override + public MailboxMessage.TYPE getType() { + return TAKE; + } + + @Override + public void protocolFinished() { + } + }); // Test Error Response MailboxRequestTake req = new MailboxRequestTake(new ContactId(123), "Test".getBytes()); mailboxProtocol.writeRequest(req); - MailboxRequestTake recvReq = - (MailboxRequestTake) mailboxProtocol.getNextRequest(); - mailboxProtocol.writeErrorResponse(recvReq, "Error"); assertEquals(false, req.awaitAndGetResponse()); - assertEquals("Error", req.getError()); + assertEquals("java.net.ProtocolException: Error", req.getError()); } private void takeRequest() throws IOException, InterruptedException { + // Generate and write TAKE request MailboxRequestTake req = new MailboxRequestTake(new ContactId(123), "Test".getBytes()); - mailboxProtocol.writeRequest(req); - MailboxRequestTake recvReq = - (MailboxRequestTake) mailboxProtocol.getNextRequest(); + mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { + @Override + public void handleRequest(MailboxRequest request) { + assertEquals(req.getId(), request.getId()); + assertEquals(TAKE, request.getType()); + assertEquals(req.getContactId(), + ((MailboxRequestTake) request).getContactId()); + assertEquals("Test", new String(req.getEncryptedSyncStream())); + } + + @Override + public MailboxMessage.TYPE getType() { + return TAKE; + } + + @Override + public void protocolFinished() { + } + }); - assertEquals(req.getId(), recvReq.getId()); - assertEquals(req.getType(), recvReq.getType()); - assertEquals(req.getContactId(), recvReq.getContactId()); - assertEquals("Test", new String(req.getEncryptedSyncStream())); - - // Write Success Response - mailboxProtocol.writeSucessResponse(recvReq); + mailboxProtocol.writeRequest(req); assertEquals(true, req.awaitAndGetResponse()); } private void endRequest() throws IOException, InterruptedException { MailboxRequest req = new MailboxRequestEnd(); + mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { + @Override + public void handleRequest(MailboxRequest request){ + + assertEquals(req.getId(), request.getId()); + assertEquals(END, request.getType()); + } + + @Override + public MailboxMessage.TYPE getType() { + return END; + } + + @Override + public void protocolFinished() { + } + }); + mailboxProtocol.writeRequest(req); - MailboxRequest recvReq = mailboxProtocol.getNextRequest(); - assertEquals(req.getId(), recvReq.getId()); - assertEquals(req.getType(), recvReq.getType()); } @@ -161,31 +225,56 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { // Generate and write TAKE request MailboxRequestStore req = new MailboxRequestStore(new ContactId(123), "Test".getBytes()); - mailboxProtocol.writeRequest(req); - - MailboxRequestStore recvReq = - (MailboxRequestStore) mailboxProtocol.getNextRequest(); - assertEquals(req.getId(), recvReq.getId()); - assertEquals(req.getType(), recvReq.getType()); - assertEquals(req.getContactId(), recvReq.getContactId()); - assertEquals("Test", new String(req.getEncryptedSyncStream())); + mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { + @Override + public void handleRequest(MailboxRequest request) { + MailboxRequestStore recvReq = (MailboxRequestStore) request; + assertEquals(req.getId(), recvReq.getId()); + assertEquals(req.getType(), recvReq.getType()); + assertEquals(req.getContactId(), recvReq.getContactId()); + assertEquals("Test", new String(req.getEncryptedSyncStream())); + } + + @Override + public MailboxMessage.TYPE getType() { + return STORE; + } + + @Override + public void protocolFinished() { + } + }); - // Write Success Response - mailboxProtocol.writeSucessResponse(recvReq); + mailboxProtocol.writeRequest(req); assertEquals(true, req.awaitAndGetResponse()); } private void syncRequest() throws IOException, InterruptedException { MailboxRequestSync req = new MailboxRequestSync("Test".getBytes(), false); + + mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { + @Override + public void handleRequest(MailboxRequest request){ + MailboxRequestSync recvReq = (MailboxRequestSync)request; + assertEquals(req.getId(), recvReq.getId()); + assertEquals(SYNC, recvReq.getType()); + assertEquals("Test", new String(recvReq.getSyncStream())); + assertEquals(false, recvReq.isEndOfStream()); + + } + + @Override + public MailboxMessage.TYPE getType() { + return SYNC; + } + + @Override + public void protocolFinished() { + } + }); mailboxProtocol.writeRequest(req); - MailboxRequestSync recvReq = - (MailboxRequestSync) mailboxProtocol.getNextRequest(); - assertEquals(req.getId(), recvReq.getId()); - assertEquals(req.getType(), recvReq.getType()); - assertEquals("Test", new String(recvReq.getSyncStream())); - assertEquals(false, recvReq.isEndOfStream()); }