diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxContactSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxContactSession.java index 4d8d3a3588d9aeb3f6a8beb65ad5f47831dd3711..e40b145795d942469b46725ce38250de439a03ba 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxContactSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxContactSession.java @@ -77,7 +77,7 @@ public class MailboxContactSession extends AbstractMailboxSession { void sendStoredStreams() throws IOException, InterruptedException { // Get stored sync stream if available MailboxStorage.MailboxStorageStream nextStream = - mailboxStorage.getStream(contactId); + mailboxStorage.getStream(contactId, false); while (nextStream != null) { // Send TAKE request and delete stream if request was successfull @@ -93,7 +93,7 @@ public class MailboxContactSession extends AbstractMailboxSession { throw new ProtocolException(req.getError()); // Get next stored sync stream if available - nextStream = mailboxStorage.getStream(contactId); + nextStream = mailboxStorage.getStream(contactId, false); } } @@ -110,7 +110,7 @@ public class MailboxContactSession extends AbstractMailboxSession { "Contact Id is implicit for contact to mailbox STORE requests"); mailboxStorage - .storeStream(contactId, storeReq.getEncryptedSyncStream()); + .storeStream(contactId, storeReq.getEncryptedSyncStream(), true); } @Override @@ -119,8 +119,6 @@ public class MailboxContactSession extends AbstractMailboxSession { } @Override - public void protocolFinished() { - mailboxStorage.close(); - } + public void protocolFinished() {} } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxOwnerSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxOwnerSession.java index 257eb884d7b0ee4df2cd5ce09d5282ee45423713..b3c99d3e55acb4e787a4e85f1447715186e52162 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxOwnerSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxOwnerSession.java @@ -23,13 +23,15 @@ import static java.util.logging.Level.INFO; import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.STORE; /** - * This session represents a connection from the mailbox owner to their own mailbox + * This session represents the mailbox side of a connection from the owner + * of the mailbox */ public class MailboxOwnerSession extends AbstractMailboxSession { private static final Logger LOG = Logger.getLogger(MailboxOwnerSession.class.getName()); private final MailboxStorage mailboxStorage; + private MailboxProtocol mailboxProtocol; public MailboxOwnerSession(ContactId contactId, Executor ioExecutor, KeyManager keyManager, @@ -37,11 +39,12 @@ public class MailboxOwnerSession extends AbstractMailboxSession { StreamWriterFactory streamWriterFactory, StreamReaderFactory streamReaderFactory, MailboxProtocol mailboxProtocol, int transportMaxLatency, - int transportMaxIdleTime, MailboxStorage mailboxStorage) { + int transportMaxIdleTime, MailboxStorage mailboxStorage, DatabaseComponent db) { - super(ioExecutor, keyManager, syncSessionFactory, streamWriterFactory, + super(ioExecutor, db, keyManager, syncSessionFactory, streamWriterFactory, streamReaderFactory, mailboxProtocol, transportMaxLatency, transportMaxIdleTime, contactId); + this.mailboxProtocol = mailboxProtocol; this.mailboxStorage = mailboxStorage; @@ -54,13 +57,37 @@ public class MailboxOwnerSession extends AbstractMailboxSession { @Override public void run() { try { + sendStoredStreams(); awaitSYNCHandlerFinished(); - } catch (InterruptedException e) { + } catch (InterruptedException | IOException e) { if (LOG.isLoggable(INFO)) LOG.info(e.toString()); } } + void sendStoredStreams() throws IOException, InterruptedException { + // Get stored sync stream if available + MailboxStorage.MailboxStorageStream nextStream = + mailboxStorage.getStreamForOwner(); + + while (nextStream != null) { + // Send TAKE request and delete stream if request was successfull + MailboxRequestTake req = + new MailboxRequestTake(nextStream.getEncryptedStream()); + mailboxProtocol.writeRequest(req); + + if (req.awaitAndGetResponse()) + // delete stream if request was successful + mailboxStorage.deleteStream(nextStream); + else + //TODO: stop sending after failed request? + throw new ProtocolException(req.getError()); + + // Get next stored sync stream if available + nextStream = mailboxStorage.getStreamForOwner(); + } + } + private class STOREHandler implements MailboxRequestHandler { @Override public void handleRequest(MailboxRequest request) @@ -73,7 +100,7 @@ public class MailboxOwnerSession extends AbstractMailboxSession { mailboxStorage .storeStream(storeReq.getContactId(), - storeReq.getEncryptedSyncStream()); + storeReq.getEncryptedSyncStream(), false); } @Override @@ -82,8 +109,6 @@ public class MailboxOwnerSession extends AbstractMailboxSession { } @Override - public void protocolFinished() { - mailboxStorage.close(); - } + public void protocolFinished() {} } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxStorage.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxStorage.java index d512afe5db4221a30539eba7ae6788e6aafbf725..5e61a7e66d0f6b4a0c3eadf8b6731bb8f2c5dc95 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxStorage.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxStorage.java @@ -2,37 +2,47 @@ package org.briarproject.bramble.mailbox; import org.briarproject.bramble.api.contact.ContactId; -import java.io.IOException; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; import javax.annotation.Nullable; class MailboxStorage { + private static final Logger LOG = Logger.getLogger(MailboxStorage.class.getName()); static volatile AtomicLong streamCounter = new AtomicLong(); // Dummy implementation with a hashmap for now LinkedHashMap<ContactId, LinkedList<MailboxStorageStream>> contactStreams = new LinkedHashMap(); + private boolean isClosed; + + synchronized void storeStream(ContactId contactId, byte[] encryptedStream, + boolean forOwner) { + LOG.info("Storing STREAM"); - synchronized void storeStream(ContactId contactId, byte[] encryptedStream) { MailboxStorageStream stream = - new MailboxStorageStream(contactId, encryptedStream); + new MailboxStorageStream(contactId, encryptedStream, forOwner); if (!contactStreams.containsKey(contactId)) contactStreams.put(contactId, new LinkedList<>()); contactStreams.get(contactId).add(stream); + + if(forOwner) + this.notifyAll(); + } @Nullable - synchronized MailboxStorageStream getStream(ContactId contactId) { + synchronized MailboxStorageStream getStream(ContactId contactId, + boolean forOwner) { if (contactStreams.get(contactId) == null) return null; for (MailboxStorageStream s : contactStreams.get(contactId)) { - if (!s.pending) { + if (!s.pending && (s.forOwner == forOwner)) { s.pending = true; return s; } @@ -40,6 +50,18 @@ class MailboxStorage { return null; } + public MailboxStorageStream getStreamForOwner() { + for (LinkedList<MailboxStorageStream> streams : contactStreams + .values()) { + for (MailboxStorageStream s : streams) + if (!s.pending && s.forOwner) { + s.pending = true; + return s; + } + } + return null; + } + synchronized void deleteStream(MailboxStorageStream stream) { for (MailboxStorageStream s : contactStreams.get(stream.contactId)) { @@ -49,23 +71,22 @@ class MailboxStorage { } } - public void close() { - - } protected class MailboxStorageStream { private final Long id; private final byte[] encryptedStream; private final ContactId contactId; + private final boolean forOwner; private boolean pending = false; protected MailboxStorageStream( ContactId contactId, - byte[] encryptedStream) { + byte[] encryptedStream, boolean forOwner) { this.contactId = contactId; this.id = streamCounter.getAndIncrement(); this.encryptedStream = encryptedStream; + this.forOwner = forOwner; } byte[] getEncryptedStream() {