Skip to content
Snippets Groups Projects
Commit ee457142 authored by bontric's avatar bontric
Browse files

Overhaul mailboxprotocol concurrency and add a test

parent acdefddd
No related branches found
No related tags found
No related merge requests found
......@@ -6,7 +6,9 @@ import org.briarproject.bramble.api.data.BdfReader;
import org.briarproject.bramble.api.data.BdfWriter;
import org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE;
import java.io.EOFException;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
......@@ -17,6 +19,7 @@ import java.util.logging.Logger;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.RESPONSE;
public class MailboxProtocol implements Runnable {
......@@ -38,7 +41,7 @@ public class MailboxProtocol implements Runnable {
volatile ConcurrentHashMap<Long, MailboxRequest> pendingRequests =
new ConcurrentHashMap<>();
private volatile AtomicBoolean aborted = new AtomicBoolean(false);
private volatile AtomicBoolean stopped = new AtomicBoolean(false);
private volatile Thread writingThread;
private volatile Thread readingThread;
......@@ -53,32 +56,41 @@ public class MailboxProtocol implements Runnable {
public void writeRequest(MailboxMessage req)
throws InterruptedException, IOException {
if (aborted.get())
throw new IOException("Connection closed");
outQueue.put(req);
if (req.hasResponse())
pendingRequests.put(req.getId(), (MailboxRequest) req);
writeMailboxMessage(req);
}
public void writeSucessResponse(MailboxRequest request)
public void writeSucessResponse(MailboxRequest req)
throws InterruptedException, IOException {
if (!request.hasResponse())
if (!req.hasResponse())
throw new RuntimeException(
"Trying to send a response for a request that does not expect a response");
if (aborted.get())
throw new IOException("Connection closed");
outQueue.put(request.createSuccessResponse());
writeMailboxMessage(req.createSuccessResponse());
}
public void writeErrorResponse(MailboxRequest request, String error)
public void writeErrorResponse(MailboxRequest req, String error)
throws InterruptedException, IOException {
if (!request.hasResponse())
if (!req.hasResponse())
throw new RuntimeException(
"Trying to send a response for a request that does not expect a response");
if (aborted.get())
throw new IOException("Connection closed");
writeMailboxMessage(req.createErrorResponse(error));
}
private void writeMailboxMessage(MailboxMessage msg)
throws InterruptedException, ProtocolException {
synchronized (outQueue) {
if (stopped.get())
throw new ProtocolException("Protocol has stopped");
outQueue.put(msg);
}
}
outQueue.put(request.createErrorResponse(error));
public MailboxRequest getNextRequest()
throws InterruptedException {
return inQueue.take();
}
@Override
......@@ -87,94 +99,119 @@ public class MailboxProtocol implements Runnable {
readIncomingMessages();
}
private void readIncomingMessages() {
synchronized (aborted) {
if (aborted.get())
return;
}
/**
* Call once the protocol should terminate, before the connection is closed
*/
public void stop() {
if (!stopped.compareAndSet(false, true))
throw new RuntimeException("Stopping already stopped protocol");
}
private void readIncomingMessages() {
readingThread = Thread.currentThread();
try {
BdfList msg;
while (!aborted.get()) {
try {
msg = mailboxBdfReader.readList();
TYPE mType = TYPE.fromLong(msg.getLong(0));
switch (mType) {
case RESPONSE:
handleResponse(new MailboxResponse(msg));
continue;
case SYNC:
inQueue.put(new MailboxRequestSync(msg));
break;
case TAKE:
inQueue.put(new MailboxRequestTake(msg));
break;
case END:
inQueue.put(new MailboxRequestEnd(msg));
break;
case STORE:
inQueue.put(new MailboxRequestStore(msg));
break;
default:
if (LOG.isLoggable(INFO))
LOG.warning("Invalid message Type received");
throw new FormatException();
}
} catch (IOException e) {
writingThread.interrupt();
throw e;
}
BdfList bdfMsg;
MailboxMessage mailboxMessage;
while (!stopped.get()) {
try {
if (mailboxBdfReader.eof())
throw new EOFException();
bdfMsg = mailboxBdfReader.readList();
} catch (IOException e) {
readingThread.interrupt();
handleIOException(true, e);
return;
}
try {
mailboxMessage = parseMessage(bdfMsg);
} catch (ProtocolException e) {
LOG.info(e.toString());
continue;
}
} catch (IOException | InterruptedException e) {
if (aborted.compareAndSet(false, true)) {
writingThread.interrupt();
if (LOG.isLoggable(INFO))
LOG.info(e.toString());
terminatePendingRequests();
if (mailboxMessage.getType() == RESPONSE) {
handleResponse((MailboxResponse) mailboxMessage);
continue;
}
inQueue.add((MailboxRequest) mailboxMessage);
}
}
private void writeOutgoingMessages() {
synchronized (aborted) {
if (aborted.get())
return;
}
writingThread = Thread.currentThread();
MailboxMessage message;
try {
while (!aborted.get()) {
message = outQueue.take();
if (message.hasResponse())
pendingRequests
.put(message.getId(), (MailboxRequest) message);
while (!stopped.get()) {
try {
message = outQueue.take();
} catch (InterruptedException e) {
writingThread.interrupt();
handleIOException(false,
new IOException("Writer Interrupted"));
return;
}
try {
mailboxBdfWriter.writeList(message.toBdfList());
mailboxBdfWriter.flush();
}
} catch (IOException | InterruptedException e) {
if (aborted.compareAndSet(false, true)) {
readingThread.interrupt();
if (LOG.isLoggable(INFO))
LOG.info(e.toString());
terminatePendingRequests();
} catch (IOException e) {
writingThread.interrupt();
handleIOException(false, e);
return;
}
}
}
public MailboxRequest getNextRequest()
throws InterruptedException, IOException {
if (aborted.get())
throw new IOException("Connection closed");
private void handleIOException(boolean isReadingThread, IOException e) {
if (isReadingThread && writingThread != null)
writingThread.interrupt();
return inQueue.take();
// Exception occurred after the protocol was stopped externally
if (stopped.getAndSet(true))
return;
if (LOG.isLoggable(INFO))
LOG.info("Mailbox Protocol was closed: " + e.toString());
// Signal that the session has ended to a thread waiting for requests
inQueue.clear();
inQueue.add(new MailboxRequestEnd());
// Ensure that threads waiting to write a request terminate
outQueue.clear();
// Signal the error to anyone waiting for a response
for (Entry<Long, MailboxRequest> entry : pendingRequests.entrySet()) {
MailboxResponse r = new MailboxResponse(entry.getKey(), false,
"Connection closed");
entry.getValue().signalError(r.getErrorMessage());
}
}
private MailboxMessage parseMessage(BdfList msg) throws ProtocolException {
try {
TYPE mType = TYPE.fromLong(msg.getLong(0));
switch (mType) {
case RESPONSE:
return new MailboxResponse(msg);
case SYNC:
return new MailboxRequestSync(msg);
case TAKE:
return new MailboxRequestTake(msg);
case END:
return new MailboxRequestEnd(msg);
case STORE:
return new MailboxRequestStore(msg);
default:
throw new ProtocolException(
"Unknown message Type received");
}
} catch (FormatException e) {
throw new ProtocolException("Invalid MailboxMessageReceived");
}
}
private void handleResponse(MailboxResponse msg) {
......@@ -193,25 +230,4 @@ public class MailboxProtocol implements Runnable {
}
public void terminatePendingRequests() {
for (Entry<Long, MailboxRequest> e : pendingRequests.entrySet()) {
MailboxResponse r = new MailboxResponse(e.getKey(), false,
"Request was terminated");
e.getValue().signalError(r.getErrorMessage());
}
}
public void stop() {
synchronized (aborted) {
if (aborted.compareAndSet(false, true)) {
if (readingThread != null)
readingThread.interrupt();
if (writingThread != null)
writingThread.interrupt();
terminatePendingRequests();
}
}
}
}
......@@ -13,7 +13,7 @@ public abstract class MailboxRequest implements MailboxMessage {
protected long msgId;
private TYPE type;
private String error;
private AtomicBoolean hasResponse = new AtomicBoolean(false);
private AtomicBoolean responseReceived = new AtomicBoolean(false);
private boolean wasSuccessful = false;
public MailboxRequest(TYPE type) {
......@@ -47,19 +47,19 @@ public abstract class MailboxRequest implements MailboxMessage {
public void signalSucess() {
synchronized (hasResponse) {
hasResponse.set(true);
synchronized (responseReceived) {
responseReceived.set(true);
wasSuccessful = true;
hasResponse.notifyAll();
responseReceived.notifyAll();
}
}
public void signalError(String error) {
synchronized (hasResponse) {
hasResponse.set(true);
synchronized (responseReceived) {
responseReceived.set(true);
wasSuccessful = false;
this.error = error;
hasResponse.notifyAll();
responseReceived.notifyAll();
}
}
......@@ -70,9 +70,13 @@ public abstract class MailboxRequest implements MailboxMessage {
* @throws InterruptedException
*/
public boolean awaitAndGetResponse() throws InterruptedException {
synchronized (hasResponse) {
while (!hasResponse.get()) {
hasResponse.wait();
if (!hasResponse())
throw new RuntimeException(
"Attempting to wait for a response of a request with no response");
synchronized (responseReceived) {
while (!responseReceived.get()) {
responseReceived.wait();
}
}
......@@ -80,7 +84,8 @@ public abstract class MailboxRequest implements MailboxMessage {
}
public String getError() {
if (!hasResponse.get() || (hasResponse.get() && !wasSuccessful))
if (!responseReceived.get() ||
(responseReceived.get() && wasSuccessful))
throw new RuntimeException(
"Trying to get Error from unfinished or successful request");
return error;
......
......@@ -7,6 +7,7 @@ import org.briarproject.bramble.api.data.BdfWriter;
import org.briarproject.bramble.api.data.BdfWriterFactory;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.test.BrambleTestCase;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
......@@ -21,6 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Inject;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.END;
import static org.junit.Assert.assertEquals;
public class MailboxProtocolIntegrationTest extends BrambleTestCase {
......@@ -35,15 +37,15 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase {
BdfWriterFactory bdfWriterFactory;
private final BdfWriter bdfWriter;
private final BdfReader bdfReader;
private final PipedOutputStream pipedOS;
private final PipedInputStream pipedIS;
private BdfWriter bdfWriter;
private BdfReader bdfReader;
private PipedOutputStream pipedOS;
private PipedInputStream pipedIS;
private final MailboxProtocol mailboxProtocol;
private MailboxProtocol mailboxProtocol;
public MailboxProtocolIntegrationTest() throws Exception {
public MailboxProtocolIntegrationTest() {
// The thread pool is unbounded, so use direct handoff
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
// Discard tasks that are submitted during shutdown
......@@ -57,6 +59,11 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase {
DaggerMailboxProtocolIntegrationTestComponent.builder().build();
component.inject(this);
}
@Before
public void initialize() throws IOException {
pipedIS = new PipedInputStream();
pipedOS = new PipedOutputStream(pipedIS);
......@@ -66,6 +73,28 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase {
mailboxProtocol = new MailboxProtocol(ioExecutor, bdfWriter, bdfReader);
}
@Test
public void testProtocolTerminationOnClosedConnection()
throws IOException, InterruptedException {
ioExecutor.execute(mailboxProtocol);
MailboxRequest req = new MailboxRequestStore(new ContactId(123), "test".getBytes());
mailboxProtocol.writeRequest(req);
// read request to ensure writing thread is running
mailboxProtocol.getNextRequest();
// close "connection"
pipedOS.close();
// And "End" request should be the next read request
MailboxRequest recvReq = mailboxProtocol.getNextRequest();
assertEquals(END, recvReq.getType());
// pending request should be marked as failed
assertEquals(false, req.awaitAndGetResponse());
assertEquals("Connection closed", req.getError());
}
@Test
public void testRequestsAndHandling()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment