Commit 7f2fd9f9 authored by bontric's avatar bontric

Implement chunking requests, update protocol integration test and

remove old code from mailbox session
parent 92e47a16
Pipeline #2907 passed with stage
in 14 minutes and 44 seconds
......@@ -30,12 +30,12 @@ public interface MailboxMessage {
return new MailboxResponse(msg);
case SYNC:
return new MailboxRequestSync(msg);
case TAKE:
return new MailboxRequestTake(msg);
case END:
return new MailboxRequestEnd(msg);
case STORE:
return new MailboxRequestStore(msg);
case OFFER:
return new MailboxRequestOffer(msg);
case CHUNK:
return new MailboxRequestChunk(msg);
default:
throw new ProtocolException(
"Unknown message Type received");
......@@ -69,8 +69,8 @@ public interface MailboxMessage {
enum TYPE {
RESPONSE(0),
END(1),
STORE(2),
TAKE(3),
OFFER(2),
CHUNK(3),
SYNC(4);
private int value;
......
......@@ -72,23 +72,6 @@ public class MailboxProtocol implements Runnable {
}
public void writeSucessResponse(MailboxRequest req)
throws InterruptedException, IOException {
if (!req.hasResponse())
throw new RuntimeException(
"Trying to send a response for a request that does not expect a response");
writeMailboxMessage(req.createSuccessResponse());
}
public void writeErrorResponse(MailboxRequest req, String error)
throws InterruptedException, IOException {
if (!req.hasResponse())
throw new RuntimeException(
"Trying to send a response for a request that does not expect a response");
writeMailboxMessage(req.createErrorResponse(error));
}
private void writeMailboxMessage(MailboxMessage msg)
throws InterruptedException, ProtocolException {
synchronized (outQueue) {
......@@ -158,35 +141,27 @@ public class MailboxProtocol implements Runnable {
private void handleRequest(MailboxRequest req) {
// TODO: Limit number of requests which are handled parallel
String error = null;
MailboxRequestHandler handler =
requestHandlers.get(req.getType());
if (handler != null) {
try {
handler.handleRequest(req);
} catch (ProtocolException e) {
if (!req.hasResponse())
throw new RuntimeException(
"Protocol exception was thrown for request without response");
error = e.toString();
}
} else {
error = "Unsupported Request";
if (handler == null) {
if (LOG.isLoggable(WARNING))
LOG.warning("Received unsupported Request: " +
req.getType().name());
return;
}
if (!req.hasResponse())
if (!req.hasResponse()) {
handler.handleRequest(req);
return;
}
MailboxResponse rsp = handler.handleRequest(req);
try {
if (error == null)
writeSucessResponse(req);
else
writeErrorResponse(req, error);
} catch (InterruptedException | IOException e) {
writeMailboxMessage(rsp);
} catch (InterruptedException | ProtocolException e) {
if (LOG.isLoggable(INFO))
LOG.info(e.toString());
}
......@@ -201,11 +176,7 @@ public class MailboxProtocol implements Runnable {
return;
}
if (msg.isSuccess())
req.signalSucess();
else
req.signalError(msg.getErrorMessage());
req.signalResponse(msg);
}
private void writeOutgoingMessages() {
......@@ -255,9 +226,7 @@ public class MailboxProtocol implements Runnable {
// Signal the error to anyone waiting for a response
for (Entry<Long, MailboxRequest> entry : pendingRequests.entrySet()) {
MailboxResponse r = new MailboxResponse(entry.getKey(),
"Connection closed");
entry.getValue().signalError(r.getErrorMessage());
entry.getValue().signalError("Connection closed");
}
// notify all handlers
......
......@@ -3,6 +3,7 @@ package org.briarproject.bramble.mailbox.protocol;
import org.briarproject.bramble.api.FormatException;
import org.briarproject.bramble.api.data.BdfList;
import java.net.ProtocolException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
......@@ -10,26 +11,22 @@ import java.util.concurrent.atomic.AtomicLong;
public abstract class MailboxRequest implements MailboxMessage {
protected long msgId;
private TYPE type;
private String error;
private AtomicBoolean responseReceived = new AtomicBoolean(false);
private boolean wasSuccessful = false;
static AtomicLong msgIdCounter = new AtomicLong();
private String error = null;
public MailboxRequest(TYPE type) {
this.type = type;
public MailboxRequest() {
this.msgId = msgIdCounter.getAndIncrement();
}
public MailboxRequest(TYPE type, long msgId) throws FormatException {
this.type = type;
public MailboxRequest(long msgId) {
this.msgId = msgId;
}
@Override
public BdfList toBdfList() {
return BdfList.of(type.getValue(), getId(), getRequestBody());
return BdfList.of(getType().getValue(), getId(), getRequestBody());
}
@Override
......@@ -37,29 +34,39 @@ public abstract class MailboxRequest implements MailboxMessage {
return msgId;
}
@Override
public TYPE getType() {
return type;
}
public void signalSucess() {
public void signalResponse(MailboxResponse rsp) {
try {
handleResponse(rsp);
} catch (FormatException e) {
error ="Received invalid response to " + this.getType().toString() + "Request";
}
synchronized (responseReceived) {
responseReceived.set(true);
wasSuccessful = true;
responseReceived.notifyAll();
}
}
public void signalError(String error) {
public void signalError(String err) {
synchronized (responseReceived) {
responseReceived.set(true);
wasSuccessful = false;
this.error = error;
responseReceived.notifyAll();
if (responseReceived.get() != true) {
responseReceived.set(true);
this.error = err;
responseReceived.notifyAll();
}
}
}
/**
* Handles an incoming response for the given request
*
* @param rsp
* @NOTE: Override for response handling
*/
public void handleResponse(MailboxResponse rsp)
throws FormatException {
return;
}
/**
* @return true if the message expects a response
*/
......@@ -71,7 +78,7 @@ public abstract class MailboxRequest implements MailboxMessage {
* @return true if response indicates success, false if not
* @throws InterruptedException
*/
public boolean awaitAndGetResponse() throws InterruptedException {
public void awaitResponse() throws InterruptedException, ProtocolException {
if (!hasResponse())
throw new RuntimeException(
"Attempting to wait for a response of a request with no response");
......@@ -81,25 +88,12 @@ public abstract class MailboxRequest implements MailboxMessage {
responseReceived.wait();
}
return wasSuccessful;
}
public String getError() {
if (!responseReceived.get() ||
(responseReceived.get() && wasSuccessful))
throw new RuntimeException(
"Trying to get Error from unfinished or successful request");
return error;
}
public MailboxResponse createSuccessResponse() {
return new MailboxResponse(msgId, null);
}
public MailboxResponse createErrorResponse(String error) {
return new MailboxResponse(msgId, error);
if (error != null) {
throw new ProtocolException(error);
}
}
protected abstract BdfList getRequestBody();
}
package org.briarproject.bramble.mailbox.protocol;
import org.briarproject.bramble.api.FormatException;
import org.briarproject.bramble.api.data.BdfList;
public class MailboxRequestChunk extends MailboxRequest {
private byte[] chunk;
private long chunkIndex;
public MailboxRequestChunk(BdfList msg) throws FormatException {
super(msg.getLong(1));
BdfList body = msg.getList(2);
if(body.size() != 2) {
throw new FormatException();
}
this.chunkIndex = body.getLong(0);
this.chunk = body.getRaw(1);
}
public MailboxRequestChunk( long chunkIndex, byte[] chunk) {
super();
this.chunk = chunk;
this.chunkIndex = chunkIndex;
}
@Override
boolean hasResponse() {
return true;
}
@Override
protected BdfList getRequestBody() {
return BdfList.of(chunkIndex, chunk);
}
@Override
public TYPE getType() {
return TYPE.CHUNK;
}
public byte[] getChunk() {
return chunk;
}
public MailboxResponse makeResponse() {
return new MailboxResponse(getId(), null);
}
}
......@@ -8,12 +8,11 @@ import org.briarproject.bramble.api.data.BdfList;
*/
public class MailboxRequestEnd extends MailboxRequest {
public MailboxRequestEnd() {
super(TYPE.END);
super();
}
// Parse END request from bdfList
public MailboxRequestEnd(BdfList msg) throws FormatException {
super(TYPE.END, msg.getLong(1));
super(msg.getLong(1));
if(msg.getList(2).size() != 0)
throw new FormatException();
}
......@@ -28,4 +27,8 @@ public class MailboxRequestEnd extends MailboxRequest {
return false;
}
@Override
public TYPE getType() {
return TYPE.END;
}
}
package org.briarproject.bramble.mailbox.protocol;
import java.net.ProtocolException;
import javax.annotation.Nullable;
public interface MailboxRequestHandler {
/**
......@@ -9,11 +8,9 @@ public interface MailboxRequestHandler {
* {@link MailboxRequestHandler#getType()} return
*
* @param request
* @throws ProtocolException If this function throws a Protocol exception,
* an error response will be sent
* NOTE: Ensure that the ProtocolException does not include sensitive information
*/
void handleRequest(MailboxRequest request) throws ProtocolException;
@Nullable
MailboxResponse handleRequest(MailboxRequest request);
/**
* @return Must return the {@link MailboxMessage.TYPE} which can be handled by this handler
......
package org.briarproject.bramble.mailbox.protocol;
import org.briarproject.bramble.api.FormatException;
import org.briarproject.bramble.api.data.BdfList;
import java.util.ArrayList;
import java.util.List;
class MailboxRequestOffer extends MailboxRequest {
private List<Long> requestedChunks = new ArrayList<>();
private long fileId;
private long numChunks;
public MailboxRequestOffer(long fileId, long numChunks) {
super();
this.numChunks = numChunks;
this.fileId = fileId;
}
public MailboxRequestOffer(BdfList msg) throws FormatException {
super(msg.getLong(1));
BdfList body = msg.getList(2);
if (body.size() != 2) {
throw new FormatException();
}
this.fileId = body.getLong(0);
this.numChunks = body.getLong(1);
}
@Override
boolean hasResponse() {
return true;
}
@Override
protected BdfList getRequestBody() {
return BdfList.of(fileId, numChunks);
}
@Override
public TYPE getType() {
return TYPE.OFFER;
}
@Override
public void handleResponse(MailboxResponse rsp)
throws FormatException {
BdfList chunks = rsp.getBody().getList(0);
if (chunks == null){
throw new FormatException();
}
for (int i = 0 ; i < chunks.size(); i ++){
requestedChunks.add(chunks.getLong(i));
}
}
public MailboxResponse makeResponse(List<Long> chunks){
return new MailboxResponse(getId(), BdfList.of(chunks));
}
public long getFileId() {
return this.fileId;
}
public long getNumChunks() {
return numChunks;
}
public List<Long> getRequestedChunks() {
return requestedChunks;
}
}
......@@ -10,16 +10,16 @@ public class MailboxRequestSync extends MailboxRequest {
private byte[] syncStream;
public MailboxRequestSync(byte[] syncStream) {
super(TYPE.SYNC);
super();
this.syncStream = syncStream;
}
public MailboxRequestSync() {
super(TYPE.SYNC);
super();
this.syncStream = null;
}
public MailboxRequestSync(BdfList msg) throws FormatException {
super(TYPE.SYNC, msg.getLong(1));
super(msg.getLong(1));
BdfList body = msg.getList(2);
......@@ -46,4 +46,9 @@ public class MailboxRequestSync extends MailboxRequest {
public boolean isEndOfStream() {
return syncStream == null;
}
@Override
public TYPE getType() {
return TYPE.SYNC;
}
}
......@@ -8,23 +8,25 @@ import javax.annotation.Nullable;
import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.RESPONSE;
public class MailboxResponse implements MailboxMessage {
@Nullable
private String errorMessage;
private BdfList body;
private long msgId;
public MailboxResponse(long msgId, @Nullable String errorMessage) {
public MailboxResponse(long msgId, @Nullable BdfList body) {
this.msgId = msgId;
this.errorMessage = errorMessage;
this.body = body;
}
public MailboxResponse(BdfList msg) throws FormatException {
this.msgId = msg.getLong(1);
BdfList body = msg.getList(2);
if(body.size() != 1)
throw new FormatException();
this.body = msg.getOptionalList(2);
}
errorMessage = body.getOptionalString(0);
@Override
public BdfList toBdfList() {
return BdfList.of(RESPONSE.getValue(), this.msgId, this.body);
}
@Override
......@@ -37,16 +39,8 @@ public class MailboxResponse implements MailboxMessage {
return RESPONSE;
}
@Override
public BdfList toBdfList() {
return BdfList.of(RESPONSE.getValue(), msgId, BdfList.of(errorMessage));
}
public String getErrorMessage() {
return errorMessage;
}
public boolean isSuccess() {
return errorMessage == null;
@Nullable
public BdfList getBody() {
return body;
}
}
\ No newline at end of file
......@@ -19,6 +19,7 @@ import org.briarproject.bramble.mailbox.protocol.MailboxRequest;
import org.briarproject.bramble.mailbox.protocol.MailboxRequestEnd;
import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler;
import org.briarproject.bramble.mailbox.protocol.MailboxRequestSync;
import org.briarproject.bramble.mailbox.protocol.MailboxResponse;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
......@@ -100,11 +101,12 @@ public abstract class AbstractMailboxSession implements MailboxSession {
private class ENDHandler implements MailboxRequestHandler {
@Override
public void handleRequest(MailboxRequest request) {
public MailboxResponse handleRequest(MailboxRequest request) {
synchronized (remoteSessionFinished) {
remoteSessionFinished.set(true);
remoteSessionFinished.notifyAll();
}
return null;
}
@Override
......@@ -188,12 +190,12 @@ public abstract class AbstractMailboxSession implements MailboxSession {
* @param request MailboxRequest (SYNC)
*/
@Override
public void handleRequest(MailboxRequest request) {
public MailboxResponse handleRequest(MailboxRequest request) {
MailboxRequestSync syncReq = (MailboxRequestSync) request;
if (syncReq.isEndOfStream()) {
syncInputStream.close();
return;
return null;
}
try {
......@@ -202,6 +204,7 @@ public abstract class AbstractMailboxSession implements MailboxSession {
if (LOG.isLoggable(WARNING))
LOG.warning(e.toString());
}
return null;
}
@Override
......
......@@ -12,8 +12,6 @@ import org.briarproject.bramble.mailbox.protocol.MailboxMessage;
import org.briarproject.bramble.mailbox.protocol.MailboxProtocol;
import org.briarproject.bramble.mailbox.protocol.MailboxRequest;
import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler;
import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore;
import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake;
import java.io.ByteArrayInputStream;
import java.io.IOException;
......@@ -24,7 +22,6 @@ import java.util.logging.Logger;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.TAKE;
import static org.briarproject.bramble.util.LogUtils.logException;
/**
......@@ -55,38 +52,12 @@ public class ContactMailboxSession extends AbstractMailboxSession {
this.streamReaderFactory = streamReaderFactory;
this.mailboxProtocol = mailboxProtocol;
this.contactId = contactId;
mailboxProtocol.registerRequestHandler(new TAKEHandler());
//TODO: register CHUNK/OFFER handler
}
@Override
public void run() {
// Get messages to send and formulate a STORE request
byte[] encryptedStream;
try {
// Get sync stream if available
encryptedStream = getSyncStreamToStore(contactId);
} catch (IOException | DbException e) {
logException(LOG, WARNING, e);
return;
}
if (encryptedStream == null)
return;
// Send a STORE request and wait for a response
MailboxRequestStore req = new MailboxRequestStore(encryptedStream);
try {
mailboxProtocol.writeRequest(req);
if (!req.awaitAndGetResponse())
throw new ProtocolException(req.getError());
} catch (InterruptedException | IOException e) {
// TODO: Handle STORE request error!
if (LOG.isLoggable(WARNING))
LOG.info(e.toString());
return;
}
// TODO: Implement chunking store/send
// End the session by issuing and END request and wait for peer
// to send END request
try {
......@@ -98,52 +69,5 @@ public class ContactMailboxSession extends AbstractMailboxSession {
}
/**
* Handles an incoming TAKE request. The request contains an encrypted BSP
* stream. From this stream, the tag is read and a StreamReader is created.
* A BSP IncomingSession is created for the stream
*/
private class TAKEHandler implements MailboxRequestHandler {
@Override
public void handleRequest(MailboxRequest request)
throws ProtocolException {
MailboxRequestTake takeRequest = (MailboxRequestTake) request;
if (takeRequest.hasContactId())
throw new ProtocolException(
"TAKE request from contact mailbox must not have contactId");
InputStream in = new ByteArrayInputStream(
takeRequest.getEncryptedSyncStream());
try {
StreamContext ctx = readTag(in);
if (!ctx.getContactId().equals(contactId))
throw new ProtocolException(