From 31427796f4ea0abc6d7ab003800c52a81ac09c6c Mon Sep 17 00:00:00 2001 From: bontric <benjohnwie@gmail.com> Date: Wed, 5 Sep 2018 20:41:39 +0200 Subject: [PATCH] Mailbox Protocol fixes --- .../bramble/mailbox/MailboxSession.java | 18 ++-- .../mailbox/protocol/MailboxProtocol.java | 91 ++++++++++++------- .../mailbox/protocol/MailboxRequest.java | 19 ++-- .../mailbox/protocol/MailboxRequestEnd.java | 6 +- .../mailbox/protocol/MailboxRequestTake.java | 5 +- 5 files changed, 86 insertions(+), 53 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSession.java index aecd0eb55..c8c2f1bb0 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSession.java @@ -172,7 +172,7 @@ abstract class MailboxSession implements Runnable { public void readRequests() { while (!remoteSessionFinished.get()) { try { - MailboxRequest req = mailboxProtocol.readNextRequest(); + MailboxRequest req = mailboxProtocol.getNextRequest(); ioExecutor.execute(() -> handleRequest(req)); } catch (IOException | InterruptedException e) { // TODO: Handle interruptedException differently ? @@ -201,17 +201,23 @@ abstract class MailboxSession implements Runnable { "Unsupported request type"); } } catch (MailboxSessionHandleException e) { - logException(LOG, WARNING, e); + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); error = e.toString(); } if (!req.hasResponse()) return; - if (error == null) - mailboxProtocol.writeSucessResponse(req); - else - mailboxProtocol.writeErrorResponse(req, error); + try { + if (error == null) + mailboxProtocol.writeSucessResponse(req); + else + mailboxProtocol.writeErrorResponse(req, error); + } catch (InterruptedException | IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } } private void handleEnd(MailboxRequestEnd req) { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java index 746602320..201cf29c9 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocol.java @@ -17,7 +17,6 @@ import java.util.logging.Logger; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; -import static org.briarproject.bramble.util.LogUtils.logException; public class MailboxProtocol implements Runnable { @@ -32,7 +31,7 @@ public class MailboxProtocol implements Runnable { volatile BlockingQueue<MailboxMessage> outQueue = new LinkedBlockingQueue<>(); - volatile BlockingQueue<BdfList> inQueue = + volatile BlockingQueue<MailboxRequest> inQueue = new LinkedBlockingQueue<>(); @@ -59,18 +58,27 @@ public class MailboxProtocol implements Runnable { outQueue.put(req); } - public void writeSucessResponse(MailboxRequest request) { + public void writeSucessResponse(MailboxRequest request) + throws InterruptedException, IOException { if (!request.hasResponse()) throw new RuntimeException( "Trying to send a response for a request that does not expect a response"); - request.makeSuccessResponse(); + + if (aborted.get()) + throw new IOException("Connection closed"); + outQueue.put(request.makeSuccessResponse()); } - public void writeErrorResponse(MailboxRequest request, String error) { + public void writeErrorResponse(MailboxRequest request, String error) + throws InterruptedException, IOException { if (!request.hasResponse()) throw new RuntimeException( "Trying to send a response for a request that does not expect a response"); - request.makeErrorResponse(error); + + if (aborted.get()) + throw new IOException("Connection closed"); + + outQueue.put(request.makeErrorResponse(error)); } @Override @@ -80,13 +88,41 @@ public class MailboxProtocol implements Runnable { } private void readIncomingMessages() { + synchronized (aborted) { + if (aborted.get()) + return; + } + readingThread = Thread.currentThread(); try { BdfList msg; while (!aborted.get()) { try { msg = mailboxBdfReader.readList(); - inQueue.put(msg); + TYPE mType = TYPE.fromLong(msg.getLong(0)); + + switch (mType) { + case RESPONSE: + handleResponse(new MailboxResponse(msg)); + continue; + case SYNC: + inQueue.put(new MailboxRequestSync(msg)); + break; + case TAKE: + inQueue.put(new MailboxRequestTake(msg)); + break; + case END: + inQueue.put(new MailboxRequestEnd(msg)); + break; + case STORE: + inQueue.put(new MailboxRequestStore(msg)); + break; + default: + if (LOG.isLoggable(INFO)) + LOG.warning("Invalid message Type received"); + throw new FormatException(); + } + } catch (IOException e) { writingThread.interrupt(); throw e; @@ -104,9 +140,13 @@ public class MailboxProtocol implements Runnable { } private void writeOutgoingMessages() { - MailboxMessage message; + synchronized (aborted) { + if (aborted.get()) + return; + } writingThread = Thread.currentThread(); + MailboxMessage message; try { while (!aborted.get()) { message = outQueue.take(); @@ -128,32 +168,13 @@ public class MailboxProtocol implements Runnable { } } - public MailboxRequest readNextRequest() + public MailboxRequest getNextRequest() throws InterruptedException, IOException { if (aborted.get()) throw new IOException("Connection closed"); - BdfList msg = inQueue.take(); - - TYPE mType = TYPE.fromLong(msg.getLong(0)); - - switch (mType) { - case RESPONSE: - handleResponse(new MailboxResponse(msg)); - case SYNC: - return new MailboxRequestSync(msg); - case TAKE: - return new MailboxRequestTake(msg); - case STORE: - return new MailboxRequestStore(msg); - default: - if (LOG.isLoggable(INFO)) - LOG.warning("Invalid message Type received"); - throw new FormatException(); - } - - + return inQueue.take(); } private void handleResponse(MailboxResponse msg) { @@ -181,10 +202,14 @@ public class MailboxProtocol implements Runnable { } public void stop() { - if (aborted.compareAndSet(false, true)) { - readingThread.interrupt(); - writingThread.interrupt(); - terminatePendingRequests(); + synchronized (aborted) { + if (aborted.compareAndSet(false, true)) { + if (readingThread != null) + readingThread.interrupt(); + if (writingThread != null) + writingThread.interrupt(); + terminatePendingRequests(); + } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java index 78740702f..a68618369 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequest.java @@ -2,23 +2,20 @@ package org.briarproject.bramble.mailbox.protocol; import org.briarproject.bramble.api.FormatException; import org.briarproject.bramble.api.data.BdfList; +import org.briarproject.bramble.api.mailbox.MailboxConstants; -import java.io.IOException; import java.util.Arrays; import java.util.Random; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.annotation.Nullable; +import java.util.concurrent.atomic.AtomicBoolean; public abstract class MailboxRequest implements MailboxMessage { - long msgId; - boolean wasSuccessfull; + protected long msgId; private TYPE type; private String error; - private Boolean hasResponse = false; + private AtomicBoolean hasResponse = new AtomicBoolean(false); + private boolean wasSuccessfull = false; public MailboxRequest(TYPE type) { this.type = type; @@ -51,14 +48,14 @@ public abstract class MailboxRequest implements MailboxMessage { public void signalSucess() { synchronized (hasResponse) { - hasResponse = true; + hasResponse.set(true); wasSuccessfull = true; hasResponse.notifyAll(); } } public void signalError(String error){ synchronized (hasResponse) { - hasResponse = true; + hasResponse.set(true); wasSuccessfull = false; this.error = error; hasResponse.notifyAll(); @@ -67,7 +64,7 @@ public abstract class MailboxRequest implements MailboxMessage { public void awaitResponse() throws InterruptedException { synchronized (hasResponse) { - while (!hasResponse) { + while (!hasResponse.get()) { hasResponse.wait(); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestEnd.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestEnd.java index 3d77220a5..dde179f1e 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestEnd.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestEnd.java @@ -11,9 +11,13 @@ public class MailboxRequestEnd extends MailboxRequest { super(TYPE.END); } + public MailboxRequestEnd(BdfList msg) throws FormatException { + super(msg); + } + @Override protected BdfList makeRequestBody() { - return null; + return new BdfList(); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestTake.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestTake.java index a45433733..5fae3ae92 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestTake.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/protocol/MailboxRequestTake.java @@ -11,8 +11,9 @@ public class MailboxRequestTake extends MailboxRequest { private ContactId contactId; private byte [] encryptedSyncStream = null; - public MailboxRequestTake(byte [] encryptedSyncStream){ + public MailboxRequestTake(ContactId contactId,byte [] encryptedSyncStream){ super(TYPE.TAKE); + this.contactId = contactId; this.encryptedSyncStream = encryptedSyncStream; } @@ -23,7 +24,7 @@ public class MailboxRequestTake extends MailboxRequest { @Override protected BdfList makeRequestBody() { - return new BdfList(Arrays.asList(contactId, encryptedSyncStream)); + return new BdfList(Arrays.asList(contactId.getInt(), encryptedSyncStream)); } @Override -- GitLab