diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java index 0f15587109fa5bab77bb1d7c90790ced9ef76dc7..12fbff73d7ec8e6f9f0e90eb97be40ce3b685160 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java @@ -6,7 +6,9 @@ import org.briarproject.bramble.api.data.BdfReader; import org.briarproject.bramble.api.data.BdfWriter; import org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE; +import java.io.EOFException; import java.io.IOException; +import java.net.ProtocolException; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -17,6 +19,7 @@ import java.util.logging.Logger; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.RESPONSE; public class MailboxProtocol implements Runnable { @@ -38,7 +41,7 @@ public class MailboxProtocol implements Runnable { volatile ConcurrentHashMap<Long, MailboxRequest> pendingRequests = new ConcurrentHashMap<>(); - private volatile AtomicBoolean aborted = new AtomicBoolean(false); + private volatile AtomicBoolean stopped = new AtomicBoolean(false); private volatile Thread writingThread; private volatile Thread readingThread; @@ -53,32 +56,41 @@ public class MailboxProtocol implements Runnable { public void writeRequest(MailboxMessage req) throws InterruptedException, IOException { - if (aborted.get()) - throw new IOException("Connection closed"); - outQueue.put(req); + if (req.hasResponse()) + pendingRequests.put(req.getId(), (MailboxRequest) req); + writeMailboxMessage(req); + } - public void writeSucessResponse(MailboxRequest request) + public void writeSucessResponse(MailboxRequest req) throws InterruptedException, IOException { - if (!request.hasResponse()) + if (!req.hasResponse()) throw new RuntimeException( "Trying to send a response for a request that does not expect a response"); - - if (aborted.get()) - throw new IOException("Connection closed"); - outQueue.put(request.createSuccessResponse()); + writeMailboxMessage(req.createSuccessResponse()); } - public void writeErrorResponse(MailboxRequest request, String error) + public void writeErrorResponse(MailboxRequest req, String error) throws InterruptedException, IOException { - if (!request.hasResponse()) + if (!req.hasResponse()) throw new RuntimeException( "Trying to send a response for a request that does not expect a response"); - if (aborted.get()) - throw new IOException("Connection closed"); + writeMailboxMessage(req.createErrorResponse(error)); + } + + private void writeMailboxMessage(MailboxMessage msg) + throws InterruptedException, ProtocolException { + synchronized (outQueue) { + if (stopped.get()) + throw new ProtocolException("Protocol has stopped"); + outQueue.put(msg); + } + } - outQueue.put(request.createErrorResponse(error)); + public MailboxRequest getNextRequest() + throws InterruptedException { + return inQueue.take(); } @Override @@ -87,94 +99,119 @@ public class MailboxProtocol implements Runnable { readIncomingMessages(); } - private void readIncomingMessages() { - synchronized (aborted) { - if (aborted.get()) - return; - } + /** + * Call once the protocol should terminate, before the connection is closed + */ + public void stop() { + if (!stopped.compareAndSet(false, true)) + throw new RuntimeException("Stopping already stopped protocol"); + } + private void readIncomingMessages() { readingThread = Thread.currentThread(); - try { - BdfList msg; - while (!aborted.get()) { - try { - msg = mailboxBdfReader.readList(); - TYPE mType = TYPE.fromLong(msg.getLong(0)); - - switch (mType) { - case RESPONSE: - handleResponse(new MailboxResponse(msg)); - continue; - case SYNC: - inQueue.put(new MailboxRequestSync(msg)); - break; - case TAKE: - inQueue.put(new MailboxRequestTake(msg)); - break; - case END: - inQueue.put(new MailboxRequestEnd(msg)); - break; - case STORE: - inQueue.put(new MailboxRequestStore(msg)); - break; - default: - if (LOG.isLoggable(INFO)) - LOG.warning("Invalid message Type received"); - throw new FormatException(); - } - - } catch (IOException e) { - writingThread.interrupt(); - throw e; - } + BdfList bdfMsg; + MailboxMessage mailboxMessage; + + while (!stopped.get()) { + try { + if (mailboxBdfReader.eof()) + throw new EOFException(); + bdfMsg = mailboxBdfReader.readList(); + } catch (IOException e) { + readingThread.interrupt(); + handleIOException(true, e); + return; + } + try { + mailboxMessage = parseMessage(bdfMsg); + } catch (ProtocolException e) { + LOG.info(e.toString()); + continue; } - } catch (IOException | InterruptedException e) { - if (aborted.compareAndSet(false, true)) { - writingThread.interrupt(); - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - terminatePendingRequests(); + + if (mailboxMessage.getType() == RESPONSE) { + handleResponse((MailboxResponse) mailboxMessage); + continue; } + + inQueue.add((MailboxRequest) mailboxMessage); } } private void writeOutgoingMessages() { - synchronized (aborted) { - if (aborted.get()) - return; - } writingThread = Thread.currentThread(); - MailboxMessage message; - try { - while (!aborted.get()) { - message = outQueue.take(); - if (message.hasResponse()) - pendingRequests - .put(message.getId(), (MailboxRequest) message); + while (!stopped.get()) { + try { + message = outQueue.take(); + } catch (InterruptedException e) { + writingThread.interrupt(); + handleIOException(false, + new IOException("Writer Interrupted")); + return; + } + try { mailboxBdfWriter.writeList(message.toBdfList()); mailboxBdfWriter.flush(); - } - } catch (IOException | InterruptedException e) { - if (aborted.compareAndSet(false, true)) { - readingThread.interrupt(); - if (LOG.isLoggable(INFO)) - LOG.info(e.toString()); - terminatePendingRequests(); + } catch (IOException e) { + writingThread.interrupt(); + handleIOException(false, e); + return; } } } - public MailboxRequest getNextRequest() - throws InterruptedException, IOException { - if (aborted.get()) - throw new IOException("Connection closed"); + private void handleIOException(boolean isReadingThread, IOException e) { + if (isReadingThread && writingThread != null) + writingThread.interrupt(); - return inQueue.take(); + // Exception occurred after the protocol was stopped externally + if (stopped.getAndSet(true)) + return; + + if (LOG.isLoggable(INFO)) + LOG.info("Mailbox Protocol was closed: " + e.toString()); + + // Signal that the session has ended to a thread waiting for requests + inQueue.clear(); + inQueue.add(new MailboxRequestEnd()); + + // Ensure that threads waiting to write a request terminate + outQueue.clear(); + + // Signal the error to anyone waiting for a response + for (Entry<Long, MailboxRequest> entry : pendingRequests.entrySet()) { + MailboxResponse r = new MailboxResponse(entry.getKey(), false, + "Connection closed"); + entry.getValue().signalError(r.getErrorMessage()); + } + } + + private MailboxMessage parseMessage(BdfList msg) throws ProtocolException { + try { + TYPE mType = TYPE.fromLong(msg.getLong(0)); + switch (mType) { + case RESPONSE: + return new MailboxResponse(msg); + case SYNC: + return new MailboxRequestSync(msg); + case TAKE: + return new MailboxRequestTake(msg); + case END: + return new MailboxRequestEnd(msg); + case STORE: + return new MailboxRequestStore(msg); + default: + throw new ProtocolException( + "Unknown message Type received"); + } + } catch (FormatException e) { + throw new ProtocolException("Invalid MailboxMessageReceived"); + } } private void handleResponse(MailboxResponse msg) { @@ -193,25 +230,4 @@ public class MailboxProtocol implements Runnable { } - public void terminatePendingRequests() { - for (Entry<Long, MailboxRequest> e : pendingRequests.entrySet()) { - MailboxResponse r = new MailboxResponse(e.getKey(), false, - "Request was terminated"); - e.getValue().signalError(r.getErrorMessage()); - } - } - - public void stop() { - synchronized (aborted) { - if (aborted.compareAndSet(false, true)) { - if (readingThread != null) - readingThread.interrupt(); - if (writingThread != null) - writingThread.interrupt(); - terminatePendingRequests(); - } - } - } - - } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java index 425d97c6622642320f6b457a5efc044ae8aeb813..1a6313eabfbc6fbcd15335712a4d921f85f56150 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java @@ -13,7 +13,7 @@ public abstract class MailboxRequest implements MailboxMessage { protected long msgId; private TYPE type; private String error; - private AtomicBoolean hasResponse = new AtomicBoolean(false); + private AtomicBoolean responseReceived = new AtomicBoolean(false); private boolean wasSuccessful = false; public MailboxRequest(TYPE type) { @@ -47,19 +47,19 @@ public abstract class MailboxRequest implements MailboxMessage { public void signalSucess() { - synchronized (hasResponse) { - hasResponse.set(true); + synchronized (responseReceived) { + responseReceived.set(true); wasSuccessful = true; - hasResponse.notifyAll(); + responseReceived.notifyAll(); } } public void signalError(String error) { - synchronized (hasResponse) { - hasResponse.set(true); + synchronized (responseReceived) { + responseReceived.set(true); wasSuccessful = false; this.error = error; - hasResponse.notifyAll(); + responseReceived.notifyAll(); } } @@ -70,9 +70,13 @@ public abstract class MailboxRequest implements MailboxMessage { * @throws InterruptedException */ public boolean awaitAndGetResponse() throws InterruptedException { - synchronized (hasResponse) { - while (!hasResponse.get()) { - hasResponse.wait(); + if (!hasResponse()) + throw new RuntimeException( + "Attempting to wait for a response of a request with no response"); + + synchronized (responseReceived) { + while (!responseReceived.get()) { + responseReceived.wait(); } } @@ -80,7 +84,8 @@ public abstract class MailboxRequest implements MailboxMessage { } public String getError() { - if (!hasResponse.get() || (hasResponse.get() && !wasSuccessful)) + if (!responseReceived.get() || + (responseReceived.get() && wasSuccessful)) throw new RuntimeException( "Trying to get Error from unfinished or successful request"); return error; diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java index d7eeecdf25b406ec0997ad3985a6e9b8d00638e2..38a061187aadb42112250d8ffb0c7170c5f4f1a4 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java @@ -7,6 +7,7 @@ import org.briarproject.bramble.api.data.BdfWriter; import org.briarproject.bramble.api.data.BdfWriterFactory; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.test.BrambleTestCase; +import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -21,6 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor; import javax.inject.Inject; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.END; import static org.junit.Assert.assertEquals; public class MailboxProtocolIntegrationTest extends BrambleTestCase { @@ -35,15 +37,15 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { BdfWriterFactory bdfWriterFactory; - private final BdfWriter bdfWriter; - private final BdfReader bdfReader; - private final PipedOutputStream pipedOS; - private final PipedInputStream pipedIS; + private BdfWriter bdfWriter; + private BdfReader bdfReader; + private PipedOutputStream pipedOS; + private PipedInputStream pipedIS; - private final MailboxProtocol mailboxProtocol; + private MailboxProtocol mailboxProtocol; - public MailboxProtocolIntegrationTest() throws Exception { + public MailboxProtocolIntegrationTest() { // The thread pool is unbounded, so use direct handoff BlockingQueue<Runnable> queue = new SynchronousQueue<>(); // Discard tasks that are submitted during shutdown @@ -57,6 +59,11 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { DaggerMailboxProtocolIntegrationTestComponent.builder().build(); component.inject(this); + } + + @Before + public void initialize() throws IOException { + pipedIS = new PipedInputStream(); pipedOS = new PipedOutputStream(pipedIS); @@ -66,6 +73,28 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { mailboxProtocol = new MailboxProtocol(ioExecutor, bdfWriter, bdfReader); } + @Test + public void testProtocolTerminationOnClosedConnection() + throws IOException, InterruptedException { + ioExecutor.execute(mailboxProtocol); + MailboxRequest req = new MailboxRequestStore(new ContactId(123), "test".getBytes()); + mailboxProtocol.writeRequest(req); + + // read request to ensure writing thread is running + mailboxProtocol.getNextRequest(); + + // close "connection" + pipedOS.close(); + + // And "End" request should be the next read request + MailboxRequest recvReq = mailboxProtocol.getNextRequest(); + assertEquals(END, recvReq.getType()); + + // pending request should be marked as failed + assertEquals(false, req.awaitAndGetResponse()); + assertEquals("Connection closed", req.getError()); + } + @Test public void testRequestsAndHandling()