From 7f2fd9f943b222ce24e24f51b846dd1b2df2bb8b Mon Sep 17 00:00:00 2001 From: bontric <bontric@riseup.net> Date: Fri, 14 Dec 2018 13:32:52 +0100 Subject: [PATCH] Implement chunking requests, update protocol integration test and remove old code from mailbox session --- .../mailbox/protocol/MailboxMessage.java | 12 +- .../mailbox/protocol/MailboxProtocol.java | 55 ++----- .../mailbox/protocol/MailboxRequest.java | 72 +++++---- .../mailbox/protocol/MailboxRequestChunk.java | 50 +++++++ .../mailbox/protocol/MailboxRequestEnd.java | 9 +- .../protocol/MailboxRequestHandler.java | 9 +- .../mailbox/protocol/MailboxRequestOffer.java | 75 ++++++++++ .../mailbox/protocol/MailboxRequestSync.java | 11 +- .../mailbox/protocol/MailboxResponse.java | 32 ++-- .../sessions/AbstractMailboxSession.java | 9 +- .../sessions/ContactMailboxSession.java | 82 +--------- .../sessions/MailboxContactSession.java | 52 +------ .../mailbox/sessions/MailboxOwnerSession.java | 51 +------ .../sessions/PrivateMailboxSession.java | 64 +------- .../MailboxProtocolIntegrationTest.java | 140 ++++++------------ 15 files changed, 272 insertions(+), 451 deletions(-) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestChunk.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestOffer.java diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxMessage.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxMessage.java index 3d02636d72..251a2a8a3e 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxMessage.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxMessage.java @@ -30,12 +30,12 @@ public interface MailboxMessage { return new MailboxResponse(msg); case SYNC: return new MailboxRequestSync(msg); - case TAKE: - return new MailboxRequestTake(msg); case END: return new MailboxRequestEnd(msg); - case STORE: - return new MailboxRequestStore(msg); + case OFFER: + return new MailboxRequestOffer(msg); + case CHUNK: + return new MailboxRequestChunk(msg); default: throw new ProtocolException( "Unknown message Type received"); @@ -69,8 +69,8 @@ public interface MailboxMessage { enum TYPE { RESPONSE(0), END(1), - STORE(2), - TAKE(3), + OFFER(2), + CHUNK(3), SYNC(4); private int value; diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java index c72d12c0f1..8c3afa3d4c 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java @@ -72,23 +72,6 @@ public class MailboxProtocol implements Runnable { } - public void writeSucessResponse(MailboxRequest req) - throws InterruptedException, IOException { - if (!req.hasResponse()) - throw new RuntimeException( - "Trying to send a response for a request that does not expect a response"); - writeMailboxMessage(req.createSuccessResponse()); - } - - public void writeErrorResponse(MailboxRequest req, String error) - throws InterruptedException, IOException { - if (!req.hasResponse()) - throw new RuntimeException( - "Trying to send a response for a request that does not expect a response"); - - writeMailboxMessage(req.createErrorResponse(error)); - } - private void writeMailboxMessage(MailboxMessage msg) throws InterruptedException, ProtocolException { synchronized (outQueue) { @@ -158,35 +141,27 @@ public class MailboxProtocol implements Runnable { private void handleRequest(MailboxRequest req) { // TODO: Limit number of requests which are handled parallel - String error = null; MailboxRequestHandler handler = requestHandlers.get(req.getType()); - if (handler != null) { - try { - handler.handleRequest(req); - } catch (ProtocolException e) { - if (!req.hasResponse()) - throw new RuntimeException( - "Protocol exception was thrown for request without response"); - error = e.toString(); - } - } else { - error = "Unsupported Request"; + + if (handler == null) { if (LOG.isLoggable(WARNING)) LOG.warning("Received unsupported Request: " + req.getType().name()); + return; } - if (!req.hasResponse()) + if (!req.hasResponse()) { + handler.handleRequest(req); return; + } + + MailboxResponse rsp = handler.handleRequest(req); try { - if (error == null) - writeSucessResponse(req); - else - writeErrorResponse(req, error); - } catch (InterruptedException | IOException e) { + writeMailboxMessage(rsp); + } catch (InterruptedException | ProtocolException e) { if (LOG.isLoggable(INFO)) LOG.info(e.toString()); } @@ -201,11 +176,7 @@ public class MailboxProtocol implements Runnable { return; } - if (msg.isSuccess()) - req.signalSucess(); - else - req.signalError(msg.getErrorMessage()); - + req.signalResponse(msg); } private void writeOutgoingMessages() { @@ -255,9 +226,7 @@ public class MailboxProtocol implements Runnable { // Signal the error to anyone waiting for a response for (Entry<Long, MailboxRequest> entry : pendingRequests.entrySet()) { - MailboxResponse r = new MailboxResponse(entry.getKey(), - "Connection closed"); - entry.getValue().signalError(r.getErrorMessage()); + entry.getValue().signalError("Connection closed"); } // notify all handlers diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java index f913cf7b4f..2641ee012e 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java @@ -3,6 +3,7 @@ package org.briarproject.bramble.mailbox.protocol; import org.briarproject.bramble.api.FormatException; import org.briarproject.bramble.api.data.BdfList; +import java.net.ProtocolException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -10,26 +11,22 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class MailboxRequest implements MailboxMessage { protected long msgId; - private TYPE type; - private String error; private AtomicBoolean responseReceived = new AtomicBoolean(false); - private boolean wasSuccessful = false; static AtomicLong msgIdCounter = new AtomicLong(); + private String error = null; - public MailboxRequest(TYPE type) { - this.type = type; + public MailboxRequest() { this.msgId = msgIdCounter.getAndIncrement(); } - public MailboxRequest(TYPE type, long msgId) throws FormatException { - this.type = type; + public MailboxRequest(long msgId) { this.msgId = msgId; } @Override public BdfList toBdfList() { - return BdfList.of(type.getValue(), getId(), getRequestBody()); + return BdfList.of(getType().getValue(), getId(), getRequestBody()); } @Override @@ -37,29 +34,39 @@ public abstract class MailboxRequest implements MailboxMessage { return msgId; } - @Override - public TYPE getType() { - return type; - } - - - public void signalSucess() { + public void signalResponse(MailboxResponse rsp) { + try { + handleResponse(rsp); + } catch (FormatException e) { + error ="Received invalid response to " + this.getType().toString() + "Request"; + } synchronized (responseReceived) { responseReceived.set(true); - wasSuccessful = true; responseReceived.notifyAll(); } } - public void signalError(String error) { + public void signalError(String err) { synchronized (responseReceived) { - responseReceived.set(true); - wasSuccessful = false; - this.error = error; - responseReceived.notifyAll(); + if (responseReceived.get() != true) { + responseReceived.set(true); + this.error = err; + responseReceived.notifyAll(); + } } } + /** + * Handles an incoming response for the given request + * + * @param rsp + * @NOTE: Override for response handling + */ + public void handleResponse(MailboxResponse rsp) + throws FormatException { + return; + } + /** * @return true if the message expects a response */ @@ -71,7 +78,7 @@ public abstract class MailboxRequest implements MailboxMessage { * @return true if response indicates success, false if not * @throws InterruptedException */ - public boolean awaitAndGetResponse() throws InterruptedException { + public void awaitResponse() throws InterruptedException, ProtocolException { if (!hasResponse()) throw new RuntimeException( "Attempting to wait for a response of a request with no response"); @@ -81,25 +88,12 @@ public abstract class MailboxRequest implements MailboxMessage { responseReceived.wait(); } - return wasSuccessful; - } - - public String getError() { - if (!responseReceived.get() || - (responseReceived.get() && wasSuccessful)) - throw new RuntimeException( - "Trying to get Error from unfinished or successful request"); - return error; - } - - public MailboxResponse createSuccessResponse() { - return new MailboxResponse(msgId, null); - } - - public MailboxResponse createErrorResponse(String error) { - return new MailboxResponse(msgId, error); + if (error != null) { + throw new ProtocolException(error); + } } protected abstract BdfList getRequestBody(); + } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestChunk.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestChunk.java new file mode 100644 index 0000000000..275a7eac54 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestChunk.java @@ -0,0 +1,50 @@ +package org.briarproject.bramble.mailbox.protocol; + +import org.briarproject.bramble.api.FormatException; +import org.briarproject.bramble.api.data.BdfList; + +public class MailboxRequestChunk extends MailboxRequest { + private byte[] chunk; + private long chunkIndex; + + public MailboxRequestChunk(BdfList msg) throws FormatException { + super(msg.getLong(1)); + BdfList body = msg.getList(2); + + if(body.size() != 2) { + throw new FormatException(); + } + + this.chunkIndex = body.getLong(0); + this.chunk = body.getRaw(1); + } + + public MailboxRequestChunk( long chunkIndex, byte[] chunk) { + super(); + this.chunk = chunk; + this.chunkIndex = chunkIndex; + } + + @Override + boolean hasResponse() { + return true; + } + + @Override + protected BdfList getRequestBody() { + return BdfList.of(chunkIndex, chunk); + } + + @Override + public TYPE getType() { + return TYPE.CHUNK; + } + + public byte[] getChunk() { + return chunk; + } + + public MailboxResponse makeResponse() { + return new MailboxResponse(getId(), null); + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestEnd.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestEnd.java index f64f48bb49..6f6a17c4dc 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestEnd.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestEnd.java @@ -8,12 +8,11 @@ import org.briarproject.bramble.api.data.BdfList; */ public class MailboxRequestEnd extends MailboxRequest { public MailboxRequestEnd() { - super(TYPE.END); + super(); } - // Parse END request from bdfList public MailboxRequestEnd(BdfList msg) throws FormatException { - super(TYPE.END, msg.getLong(1)); + super(msg.getLong(1)); if(msg.getList(2).size() != 0) throw new FormatException(); } @@ -28,4 +27,8 @@ public class MailboxRequestEnd extends MailboxRequest { return false; } + @Override + public TYPE getType() { + return TYPE.END; + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestHandler.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestHandler.java index 4cd8b45938..d117ed544a 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestHandler.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestHandler.java @@ -1,7 +1,6 @@ package org.briarproject.bramble.mailbox.protocol; - -import java.net.ProtocolException; +import javax.annotation.Nullable; public interface MailboxRequestHandler { /** @@ -9,11 +8,9 @@ public interface MailboxRequestHandler { * {@link MailboxRequestHandler#getType()} return * * @param request - * @throws ProtocolException If this function throws a Protocol exception, - * an error response will be sent - * NOTE: Ensure that the ProtocolException does not include sensitive information */ - void handleRequest(MailboxRequest request) throws ProtocolException; + @Nullable + MailboxResponse handleRequest(MailboxRequest request); /** * @return Must return the {@link MailboxMessage.TYPE} which can be handled by this handler diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestOffer.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestOffer.java new file mode 100644 index 0000000000..513408e358 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestOffer.java @@ -0,0 +1,75 @@ +package org.briarproject.bramble.mailbox.protocol; + +import org.briarproject.bramble.api.FormatException; +import org.briarproject.bramble.api.data.BdfList; + +import java.util.ArrayList; +import java.util.List; + +class MailboxRequestOffer extends MailboxRequest { + private List<Long> requestedChunks = new ArrayList<>(); + private long fileId; + private long numChunks; + + public MailboxRequestOffer(long fileId, long numChunks) { + super(); + this.numChunks = numChunks; + this.fileId = fileId; + } + + public MailboxRequestOffer(BdfList msg) throws FormatException { + super(msg.getLong(1)); + BdfList body = msg.getList(2); + + if (body.size() != 2) { + throw new FormatException(); + } + this.fileId = body.getLong(0); + this.numChunks = body.getLong(1); + } + + @Override + boolean hasResponse() { + return true; + } + + @Override + protected BdfList getRequestBody() { + return BdfList.of(fileId, numChunks); + } + + @Override + public TYPE getType() { + return TYPE.OFFER; + } + + @Override + public void handleResponse(MailboxResponse rsp) + throws FormatException { + BdfList chunks = rsp.getBody().getList(0); + + if (chunks == null){ + throw new FormatException(); + } + + for (int i = 0 ; i < chunks.size(); i ++){ + requestedChunks.add(chunks.getLong(i)); + } + } + + public MailboxResponse makeResponse(List<Long> chunks){ + return new MailboxResponse(getId(), BdfList.of(chunks)); + } + + public long getFileId() { + return this.fileId; + } + + public long getNumChunks() { + return numChunks; + } + + public List<Long> getRequestedChunks() { + return requestedChunks; + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestSync.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestSync.java index 9d171b77d3..36e8a5cb3f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestSync.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestSync.java @@ -10,16 +10,16 @@ public class MailboxRequestSync extends MailboxRequest { private byte[] syncStream; public MailboxRequestSync(byte[] syncStream) { - super(TYPE.SYNC); + super(); this.syncStream = syncStream; } public MailboxRequestSync() { - super(TYPE.SYNC); + super(); this.syncStream = null; } public MailboxRequestSync(BdfList msg) throws FormatException { - super(TYPE.SYNC, msg.getLong(1)); + super(msg.getLong(1)); BdfList body = msg.getList(2); @@ -46,4 +46,9 @@ public class MailboxRequestSync extends MailboxRequest { public boolean isEndOfStream() { return syncStream == null; } + + @Override + public TYPE getType() { + return TYPE.SYNC; + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxResponse.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxResponse.java index 60837bfdb3..41588b5bef 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxResponse.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxResponse.java @@ -8,23 +8,25 @@ import javax.annotation.Nullable; import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.RESPONSE; public class MailboxResponse implements MailboxMessage { + @Nullable - private String errorMessage; + private BdfList body; + private long msgId; - public MailboxResponse(long msgId, @Nullable String errorMessage) { + public MailboxResponse(long msgId, @Nullable BdfList body) { this.msgId = msgId; - this.errorMessage = errorMessage; + this.body = body; } public MailboxResponse(BdfList msg) throws FormatException { this.msgId = msg.getLong(1); - BdfList body = msg.getList(2); - - if(body.size() != 1) - throw new FormatException(); + this.body = msg.getOptionalList(2); + } - errorMessage = body.getOptionalString(0); + @Override + public BdfList toBdfList() { + return BdfList.of(RESPONSE.getValue(), this.msgId, this.body); } @Override @@ -37,16 +39,8 @@ public class MailboxResponse implements MailboxMessage { return RESPONSE; } - @Override - public BdfList toBdfList() { - return BdfList.of(RESPONSE.getValue(), msgId, BdfList.of(errorMessage)); - } - - public String getErrorMessage() { - return errorMessage; - } - - public boolean isSuccess() { - return errorMessage == null; + @Nullable + public BdfList getBody() { + return body; } } \ No newline at end of file diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/AbstractMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/AbstractMailboxSession.java index d39e05cf54..fa2c92099e 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/AbstractMailboxSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/AbstractMailboxSession.java @@ -19,6 +19,7 @@ import org.briarproject.bramble.mailbox.protocol.MailboxRequest; import org.briarproject.bramble.mailbox.protocol.MailboxRequestEnd; import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler; import org.briarproject.bramble.mailbox.protocol.MailboxRequestSync; +import org.briarproject.bramble.mailbox.protocol.MailboxResponse; import java.io.ByteArrayOutputStream; import java.io.EOFException; @@ -100,11 +101,12 @@ public abstract class AbstractMailboxSession implements MailboxSession { private class ENDHandler implements MailboxRequestHandler { @Override - public void handleRequest(MailboxRequest request) { + public MailboxResponse handleRequest(MailboxRequest request) { synchronized (remoteSessionFinished) { remoteSessionFinished.set(true); remoteSessionFinished.notifyAll(); } + return null; } @Override @@ -188,12 +190,12 @@ public abstract class AbstractMailboxSession implements MailboxSession { * @param request MailboxRequest (SYNC) */ @Override - public void handleRequest(MailboxRequest request) { + public MailboxResponse handleRequest(MailboxRequest request) { MailboxRequestSync syncReq = (MailboxRequestSync) request; if (syncReq.isEndOfStream()) { syncInputStream.close(); - return; + return null; } try { @@ -202,6 +204,7 @@ public abstract class AbstractMailboxSession implements MailboxSession { if (LOG.isLoggable(WARNING)) LOG.warning(e.toString()); } + return null; } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/ContactMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/ContactMailboxSession.java index 9ba85e25c7..429445d994 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/ContactMailboxSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/ContactMailboxSession.java @@ -12,8 +12,6 @@ import org.briarproject.bramble.mailbox.protocol.MailboxMessage; import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; import org.briarproject.bramble.mailbox.protocol.MailboxRequest; import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -24,7 +22,6 @@ import java.util.logging.Logger; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; -import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.TAKE; import static org.briarproject.bramble.util.LogUtils.logException; /** @@ -55,38 +52,12 @@ public class ContactMailboxSession extends AbstractMailboxSession { this.streamReaderFactory = streamReaderFactory; this.mailboxProtocol = mailboxProtocol; this.contactId = contactId; - - mailboxProtocol.registerRequestHandler(new TAKEHandler()); + //TODO: register CHUNK/OFFER handler } @Override public void run() { - // Get messages to send and formulate a STORE request - byte[] encryptedStream; - try { - // Get sync stream if available - encryptedStream = getSyncStreamToStore(contactId); - } catch (IOException | DbException e) { - logException(LOG, WARNING, e); - return; - } - - if (encryptedStream == null) - return; - - // Send a STORE request and wait for a response - MailboxRequestStore req = new MailboxRequestStore(encryptedStream); - try { - mailboxProtocol.writeRequest(req); - if (!req.awaitAndGetResponse()) - throw new ProtocolException(req.getError()); - } catch (InterruptedException | IOException e) { - // TODO: Handle STORE request error! - if (LOG.isLoggable(WARNING)) - LOG.info(e.toString()); - return; - } - + // TODO: Implement chunking store/send // End the session by issuing and END request and wait for peer // to send END request try { @@ -98,52 +69,5 @@ public class ContactMailboxSession extends AbstractMailboxSession { } - /** - * Handles an incoming TAKE request. The request contains an encrypted BSP - * stream. From this stream, the tag is read and a StreamReader is created. - * A BSP IncomingSession is created for the stream - */ - private class TAKEHandler implements MailboxRequestHandler { - - @Override - public void handleRequest(MailboxRequest request) - throws ProtocolException { - MailboxRequestTake takeRequest = (MailboxRequestTake) request; - - if (takeRequest.hasContactId()) - throw new ProtocolException( - "TAKE request from contact mailbox must not have contactId"); - - InputStream in = new ByteArrayInputStream( - takeRequest.getEncryptedSyncStream()); - try { - StreamContext ctx = readTag(in); - - if (!ctx.getContactId().equals(contactId)) - throw new ProtocolException( - "Contact Id from stream does not match expected contactId!"); - - InputStream reader = - streamReaderFactory.createStreamReader(in, ctx); - - syncSessionFactory - .createIncomingSession(ctx.getContactId(), reader) - .run(); - - } catch (DbException | IOException e) { - throw new ProtocolException("Received invalid Stream"); - } - - } - - @Override - public MailboxMessage.TYPE getType() { - return TAKE; - } - - @Override - public void protocolFinished() { - } - } - + // Implement CHUNK/OFFER handlers } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/MailboxContactSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/MailboxContactSession.java index 04c9336870..893b70ccca 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/MailboxContactSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/MailboxContactSession.java @@ -12,8 +12,6 @@ import org.briarproject.bramble.mailbox.protocol.MailboxMessage; import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; import org.briarproject.bramble.mailbox.protocol.MailboxRequest; import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake; import java.io.IOException; import java.net.ProtocolException; @@ -22,7 +20,6 @@ import java.util.logging.Logger; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; -import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.STORE; import static org.briarproject.bramble.util.LogUtils.logException; /** @@ -52,7 +49,8 @@ public class MailboxContactSession extends AbstractMailboxSession { this.contactId = contactId; this.mailboxStorage = mailboxStorage; - mailboxProtocol.registerRequestHandler(new STOREHandler()); + + // TODO: register offer/chunk handlers } @Override @@ -82,49 +80,9 @@ public class MailboxContactSession extends AbstractMailboxSession { // Get stored sync stream if available MailboxStorageStream nextStream = mailboxStorage.getStream(contactId, false); - - while (nextStream != null) { - // Send TAKE request and delete stream if request was successfull - MailboxRequestTake req = - new MailboxRequestTake(nextStream.getStream()); - mailboxProtocol.writeRequest(req); - - if (req.awaitAndGetResponse()) - // delete stream if request was successful - mailboxStorage.deleteStream(nextStream); - else - //TODO: stop sending after failed request? - throw new ProtocolException(req.getError()); - - // Get next stored sync stream if available - nextStream = mailboxStorage.getStream(contactId, false); - - } + // TODO: implement chunking store/ send } +} - private class STOREHandler implements MailboxRequestHandler { - @Override - public void handleRequest(MailboxRequest request) - throws ProtocolException { - MailboxRequestStore storeReq = (MailboxRequestStore) request; - - if (storeReq.hasContactId()) - throw new ProtocolException( - "Contact Id is implicit for contact to mailbox STORE requests"); - - mailboxStorage - .storeStream(contactId, storeReq.getEncryptedSyncStream(), - true); - } - - @Override - public MailboxMessage.TYPE getType() { - return STORE; - } - - @Override - public void protocolFinished() { - } - } -} +// TODO: implement offer/chunk handlers diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/MailboxOwnerSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/MailboxOwnerSession.java index 505aa71f81..5c430013f8 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/MailboxOwnerSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/MailboxOwnerSession.java @@ -12,8 +12,6 @@ import org.briarproject.bramble.mailbox.protocol.MailboxMessage; import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; import org.briarproject.bramble.mailbox.protocol.MailboxRequest; import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake; import java.io.IOException; import java.net.ProtocolException; @@ -21,8 +19,6 @@ import java.util.concurrent.Executor; import java.util.logging.Logger; import static java.util.logging.Level.INFO; -import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.STORE; - /** * This session represents the mailbox side of a connection from the owner * of the mailbox @@ -50,8 +46,8 @@ public class MailboxOwnerSession extends AbstractMailboxSession { this.mailboxProtocol = mailboxProtocol; this.mailboxStorage = mailboxStorage; + //TODO register CHUNK/OFFER handlers enableSYNCHandling(); - mailboxProtocol.registerRequestHandler(new STOREHandler()); } @Override @@ -72,49 +68,8 @@ public class MailboxOwnerSession extends AbstractMailboxSession { // Get stored sync stream if available MailboxStorageStream nextStream = mailboxStorage.getStreamForOwner(); - - // FIXME: If there are no more streams to send at this time, we should - // wait - while (nextStream != null) { - // Send TAKE request and delete stream if request was successfull - MailboxRequestTake req = - new MailboxRequestTake(nextStream.getContactId(), nextStream.getStream()); - mailboxProtocol.writeRequest(req); - - if (req.awaitAndGetResponse()) - // delete stream if request was successful - mailboxStorage.deleteStream(nextStream); - else - //TODO: stop sending after failed request? - throw new ProtocolException(req.getError()); - - // Get next stored sync stream if available - nextStream = mailboxStorage.getStreamForOwner(); - } + // TODO implement Chunk send/store } - private class STOREHandler implements MailboxRequestHandler { - @Override - public void handleRequest(MailboxRequest request) - throws ProtocolException { - MailboxRequestStore storeReq = (MailboxRequestStore) request; - - if (!storeReq.hasContactId()) - throw new ProtocolException( - "Contact Id must be set for owner to mailbox STORE requests"); - - mailboxStorage - .storeStream(storeReq.getContactId(), - storeReq.getEncryptedSyncStream(), false); - } - - @Override - public MailboxMessage.TYPE getType() { - return STORE; - } - - @Override - public void protocolFinished() { - } - } + //TODO implement CHUNK/OFFER handlers } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/PrivateMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/PrivateMailboxSession.java index 6ef03aebdd..8bffebb127 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/PrivateMailboxSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/sessions/PrivateMailboxSession.java @@ -13,8 +13,6 @@ import org.briarproject.bramble.mailbox.protocol.MailboxMessage; import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; import org.briarproject.bramble.mailbox.protocol.MailboxRequest; import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore; -import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -25,7 +23,6 @@ import java.util.logging.Logger; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; -import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.TAKE; import static org.briarproject.bramble.util.LogUtils.logException; /** @@ -55,7 +52,8 @@ public class PrivateMailboxSession extends AbstractMailboxSession { this.streamReaderFactory = streamReaderFactory; this.mailboxProtocol = mailboxProtocol; - mailboxProtocol.registerRequestHandler(new TAKEHandler()); + //TODO: Register CHUNK/OFFER handlers + // Register SYNC handler and run outgoing SYNC session enableSYNCHandling(); } @@ -73,21 +71,7 @@ public class PrivateMailboxSession extends AbstractMailboxSession { logException(LOG, WARNING, e); return; } - - if (encryptedStream == null) - return; - MailboxRequest req = - new MailboxRequestStore(mailboxInfo.getAliasId(), - encryptedStream); - try { - mailboxProtocol.writeRequest(req); - if (!req.awaitAndGetResponse()) - throw new ProtocolException(req.getError()); - } catch (InterruptedException | IOException e) { - // TODO: Handle STORE error! - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - } + // TODO: Implement Chunking store/ send } @Override @@ -101,45 +85,5 @@ public class PrivateMailboxSession extends AbstractMailboxSession { logException(LOG, WARNING, e); } } - - /** - * Handles an incoming TAKE request. The request contains an encrypted BSP - * stream. From this stream, the tag is read and a StreamReader is created. - * A BSP IncomingSession is created for the stream - */ - private class TAKEHandler implements MailboxRequestHandler { - @Override - public void handleRequest(MailboxRequest request) - throws ProtocolException { - MailboxRequestTake takeRequest = (MailboxRequestTake) request; - InputStream in = new ByteArrayInputStream( - takeRequest.getEncryptedSyncStream()); - try { - StreamContext ctx = readTag(in); - - if (ctx.getContactId() != (takeRequest.getContactId())) - throw new ProtocolException( - "Stream does not match expected contactId"); - - InputStream reader = - streamReaderFactory.createStreamReader(in, ctx); - - syncSessionFactory - .createIncomingSession(ctx.getContactId(), reader) - .run(); - } catch (DbException | IOException e) { - logException(LOG, WARNING, e); - throw new ProtocolException("Received invalid stream"); - } - } - - @Override - public MailboxMessage.TYPE getType() { - return TAKE; - } - - @Override - public void protocolFinished() { - } - } + // TODO: Implement handlers for OFFER/CHUNK requests } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java index c7072d8f02..f0d76247bf 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java @@ -14,6 +14,8 @@ import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.net.ProtocolException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionHandler; @@ -25,10 +27,11 @@ import javax.inject.Inject; import static java.util.concurrent.TimeUnit.SECONDS; import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.END; -import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.STORE; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.CHUNK; import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.SYNC; -import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.TAKE; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.OFFER; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class MailboxProtocolIntegrationTest extends BrambleTestCase { @@ -85,7 +88,7 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { @Override - public void handleRequest(MailboxRequest request) { + public MailboxResponse handleRequest(MailboxRequest request) { synchronized (finished) { while (finished.get() != true) { try { @@ -94,11 +97,12 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { } } } + return null; } @Override public MailboxMessage.TYPE getType() { - return STORE; + return CHUNK; } @Override @@ -109,8 +113,7 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { ioExecutor.execute(mailboxProtocol); - MailboxRequest req = - new MailboxRequestStore(new ContactId(123), "test".getBytes()); + MailboxRequest req = new MailboxRequestChunk(0, new byte[] {0}); mailboxProtocol.writeRequest(req); // close "connection" @@ -122,59 +125,33 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { } // pending request should be marked as failed - assertEquals(false, req.awaitAndGetResponse()); - assertEquals("Connection closed", req.getError()); - } - - - @Test - public void errorResponse() throws InterruptedException, IOException { - mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { - @Override - public void handleRequest(MailboxRequest request) - throws ProtocolException { - throw new ProtocolException("Error"); - } - - @Override - public MailboxMessage.TYPE getType() { - return TAKE; - } - - @Override - public void protocolFinished() { - } - }); - - - ioExecutor.execute(mailboxProtocol); - // Test Error Response - MailboxRequestTake req = - new MailboxRequestTake("Test".getBytes()); - mailboxProtocol.writeRequest(req); + try { + req.awaitResponse(); + fail("Exception not thrown // Connection not closed"); + } catch (ProtocolException e) { + assertEquals( "Connection closed", e.getMessage()); + } - assertEquals(false, req.awaitAndGetResponse()); - assertEquals("java.net.ProtocolException: Error", req.getError()); - pipedOS.close(); } @Test - public void takeRequest() throws IOException, InterruptedException { - // Generate and write TAKE request - MailboxRequestTake req = - new MailboxRequestTake("Test".getBytes()); + public void chunkRequest() throws IOException, InterruptedException { + // Generate and write CHUNK request + MailboxRequestChunk req = + new MailboxRequestChunk(0, "Test".getBytes()); mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { @Override - public void handleRequest(MailboxRequest request) { + public MailboxResponse handleRequest(MailboxRequest request) { assertEquals(req.getId(), request.getId()); - assertEquals(TAKE, request.getType()); - assertEquals("Test", new String(req.getEncryptedSyncStream())); + assertEquals(CHUNK, request.getType()); + assertEquals("Test", new String(req.getChunk())); + return req.makeResponse(); } @Override public MailboxMessage.TYPE getType() { - return TAKE; + return CHUNK; } @Override @@ -186,7 +163,8 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { mailboxProtocol.writeRequest(req); - assertEquals(true, req.awaitAndGetResponse()); + req.awaitResponse(); + pipedOS.close(); } @@ -195,10 +173,10 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { MailboxRequest req = new MailboxRequestEnd(); mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { @Override - public void handleRequest(MailboxRequest request) { - + public MailboxResponse handleRequest(MailboxRequest request) { assertEquals(req.getId(), request.getId()); assertEquals(END, request.getType()); + return null; } @Override @@ -219,25 +197,26 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { } @Test - public void storeRequestWithId() throws IOException, InterruptedException { - // Generate and write TAKE request - MailboxRequestStore req = - new MailboxRequestStore(new ContactId(123), "Test".getBytes()); + public void offerRequest() + throws IOException, InterruptedException { + + MailboxRequestOffer req = + new MailboxRequestOffer(0, 1); mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { @Override - public void handleRequest(MailboxRequest request) { - MailboxRequestStore recvReq = (MailboxRequestStore) request; + public MailboxResponse handleRequest(MailboxRequest request) { + MailboxRequestOffer recvReq = (MailboxRequestOffer) request; assertEquals(req.getId(), recvReq.getId()); assertEquals(req.getType(), recvReq.getType()); - assertEquals(true, recvReq.hasContactId()); - assertEquals(req.getContactId(), recvReq.getContactId()); - assertEquals("Test", new String(req.getEncryptedSyncStream())); + assertEquals(recvReq.getFileId(), 0); + assertEquals(recvReq.getNumChunks(), 1); + return recvReq.makeResponse(Arrays.asList((long) 0)); } @Override public MailboxMessage.TYPE getType() { - return STORE; + return OFFER; } @Override @@ -248,41 +227,12 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { ioExecutor.execute(mailboxProtocol); mailboxProtocol.writeRequest(req); - assertEquals(true, req.awaitAndGetResponse()); - - pipedOS.close(); - } - - @Test - public void storeRequestWithoutId() throws IOException, InterruptedException { - // Generate and write TAKE request - MailboxRequestStore req = - new MailboxRequestStore("Test".getBytes()); - - mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { - @Override - public void handleRequest(MailboxRequest request) { - MailboxRequestStore recvReq = (MailboxRequestStore) request; - assertEquals(req.getId(), recvReq.getId()); - assertEquals(req.getType(), recvReq.getType()); - assertEquals(false, recvReq.hasContactId()); - assertEquals("Test", new String(req.getEncryptedSyncStream())); - } - @Override - public MailboxMessage.TYPE getType() { - return STORE; - } - - @Override - public void protocolFinished() { - } - }); - - ioExecutor.execute(mailboxProtocol); + req.awaitResponse(); - mailboxProtocol.writeRequest(req); - assertEquals(true, req.awaitAndGetResponse()); + assertEquals(req.getRequestedChunks().size(), 1); + long index = req.getRequestedChunks().get(0); + assertEquals(index, 0); pipedOS.close(); } @@ -294,13 +244,13 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { mailboxProtocol.registerRequestHandler(new MailboxRequestHandler() { @Override - public void handleRequest(MailboxRequest request) { + public MailboxResponse handleRequest(MailboxRequest request) { MailboxRequestSync recvReq = (MailboxRequestSync) request; assertEquals(req.getId(), recvReq.getId()); assertEquals(SYNC, recvReq.getType()); assertEquals("Test", new String(recvReq.getSyncStream())); assertEquals(false, recvReq.isEndOfStream()); - + return null; } @Override -- GitLab