From 179bec390dc3694f0891ced5854baa198aacaee8 Mon Sep 17 00:00:00 2001 From: bontric <benjohnwie@gmail.com> Date: Thu, 20 Sep 2018 18:46:31 +0200 Subject: [PATCH] Implemen sessions for connection between a contact (of the owner) and the mailbox add basic implementation of MailboxStorage (For testing) --- .../mailbox/ContactMailboxSession.java | 146 ++++++++++++++++++ .../mailbox/MailboxContactSession.java | 125 +++++++++++++++ .../mailbox/MailboxSessionFactory.java | 2 +- .../mailbox/MailboxSessionFactoryImpl.java | 29 ++-- .../bramble/mailbox/MailboxStorage.java | 76 +++++++++ 5 files changed, 366 insertions(+), 12 deletions(-) create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxSession.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxContactSession.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxStorage.java diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxSession.java new file mode 100644 index 000000000..60785ad44 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/ContactMailboxSession.java @@ -0,0 +1,146 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.sync.SyncSessionFactory; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriterFactory; +import org.briarproject.bramble.mailbox.protocol.MailboxMessage; +import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; +import org.briarproject.bramble.mailbox.protocol.MailboxRequest; +import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler; +import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore; +import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.ProtocolException; +import java.util.concurrent.Executor; +import java.util.logging.Logger; + +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.TAKE; +import static org.briarproject.bramble.util.LogUtils.logException; + +/** + * This session implements a contact communicating with another contact's + * mailbox + */ +public class ContactMailboxSession extends AbstractMailboxSession { + private static final Logger LOG = + Logger.getLogger(ContactMailboxSession.class.getName()); + private final KeyManager keyManager; + private final SyncSessionFactory syncSessionFactory; + private final StreamReaderFactory streamReaderFactory; + private final MailboxProtocol mailboxProtocol; + private final ContactId contactId; + + public ContactMailboxSession(ContactId contactId, Executor ioExecutor, + KeyManager keyManager, + SyncSessionFactory syncSessionFactory, + StreamWriterFactory streamWriterFactory, + StreamReaderFactory streamReaderFactory, + MailboxProtocol mailboxProtocol, + int transportMaxLatency, int transportMaxIdleTime) { + super(ioExecutor, keyManager, syncSessionFactory, streamWriterFactory, + streamReaderFactory, mailboxProtocol, transportMaxLatency, + transportMaxIdleTime, contactId); + this.keyManager = keyManager; + this.syncSessionFactory = syncSessionFactory; + this.streamReaderFactory = streamReaderFactory; + this.mailboxProtocol = mailboxProtocol; + this.contactId = contactId; + + mailboxProtocol.registerRequestHandler(new TAKEHandler()); + mailboxProtocol.enableRequestHandling(); + } + + @Override + public void run() { + // Get messages to send and formulate a STORE request + try { + sendStreamsToStore(); + } catch (IOException e) { + if (LOG.isLoggable(WARNING)) + LOG.warning(e.toString()); + } catch (DbException | InterruptedException e) { + logException(LOG, WARNING, e); + } + + // End the session by issuing and END request and wait for peer + // to send END request + try { + endSession(); + } catch (InterruptedException | IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } + } + + void sendStreamsToStore() + throws IOException, DbException, InterruptedException { + + // Get sync stream if available + byte[] encryptedStream = getSyncStreamToStore(contactId); + + while (encryptedStream != null || encryptedStream.length > 0) { + // Send a STORE request and wait for a response + MailboxRequestStore req = new MailboxRequestStore(encryptedStream); + mailboxProtocol.writeRequest(req); + if (!req.awaitAndGetResponse()) + // TODO: Handle STORE request error! + throw new ProtocolException(req.getError()); + + // Get next sync stream if available + encryptedStream = getSyncStreamToStore(contactId); + } + } + + /** + * Handles an incoming TAKE request. The request contains an encrypted BSP + * stream. From this stream, the tag is read and a StreamReader is created. + * A BSP IncomingSession is created for the stream + */ + private class TAKEHandler implements MailboxRequestHandler { + + @Override + public void handleRequest(MailboxRequest request) + throws ProtocolException { + MailboxRequestTake takeRequest = (MailboxRequestTake) request; + InputStream in = new ByteArrayInputStream( + takeRequest.getEncryptedSyncStream()); + try { + StreamContext ctx = readTag(in); + + if (!ctx.getContactId().equals(contactId)) + throw new ProtocolException( + "Contact Id from stream does not match expected contactId!"); + + InputStream reader = + streamReaderFactory.createStreamReader(in, ctx); + + syncSessionFactory + .createIncomingSession(ctx.getContactId(), reader) + .run(); + + } catch (DbException | IOException e) { + throw new ProtocolException("Received invalid Stream"); + } + + } + + @Override + public MailboxMessage.TYPE getType() { + return TAKE; + } + + @Override + public void protocolFinished() { + } + } + +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxContactSession.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxContactSession.java new file mode 100644 index 000000000..447496fa4 --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxContactSession.java @@ -0,0 +1,125 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.sync.SyncSessionFactory; +import org.briarproject.bramble.api.transport.KeyManager; +import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriterFactory; +import org.briarproject.bramble.mailbox.protocol.MailboxMessage; +import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; +import org.briarproject.bramble.mailbox.protocol.MailboxRequest; +import org.briarproject.bramble.mailbox.protocol.MailboxRequestHandler; +import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore; +import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake; + +import java.io.IOException; +import java.net.ProtocolException; +import java.util.concurrent.Executor; +import java.util.logging.Logger; + +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.STORE; +import static org.briarproject.bramble.util.LogUtils.logException; + +/** + * This session implements a mailbox communicating with a contact + * of it's owner (which was previously introduced to the mailbox) + */ +public class MailboxContactSession extends AbstractMailboxSession { + private static final Logger LOG = + Logger.getLogger(MailboxContactSession.class.getName()); + + private final MailboxProtocol mailboxProtocol; + private final ContactId contactId; + private final MailboxStorage mailboxStorage; + + public MailboxContactSession(ContactId contactId, Executor ioExecutor, + KeyManager keyManager, SyncSessionFactory syncSessionFactory, + StreamWriterFactory streamWriterFactory, + StreamReaderFactory streamReaderFactory, + MailboxProtocol mailboxProtocol, int transportMaxLatency, + int transportMaxIdleTime, MailboxStorage mailboxStorage) { + super(ioExecutor, keyManager, syncSessionFactory, streamWriterFactory, + streamReaderFactory, mailboxProtocol, transportMaxLatency, + transportMaxIdleTime, contactId); + this.mailboxProtocol = mailboxProtocol; + this.contactId = contactId; + this.mailboxStorage = mailboxStorage; + + mailboxProtocol.registerRequestHandler(new STOREHandler()); + } + + @Override + public void run() { + // Send stored sync streams to connected contact + try { + sendStoredStreams(); + } catch (IOException e) { + if (LOG.isLoggable(WARNING)) + LOG.warning(e.toString()); + } catch (InterruptedException e) { + logException(LOG, WARNING, e); + return; + } + + // End the session by issuing and END request and wait for peer + // to send END request + try { + endSession(); + } catch (InterruptedException | IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + } + } + + void sendStoredStreams() throws IOException, InterruptedException { + // Get stored sync stream if available + MailboxStorage.MailboxStorageStream nextStream = + mailboxStorage.getStream(contactId); + + while (nextStream != null) { + // Send TAKE request and delete stream if request was successfull + MailboxRequestTake req = + new MailboxRequestTake(nextStream.getEncryptedStream()); + mailboxProtocol.writeRequest(req); + + if (req.awaitAndGetResponse()) + // delete stream if request was successful + mailboxStorage.deleteStream(nextStream); + else + //TODO: stop sending after failed request? + throw new ProtocolException(req.getError()); + + // Get next stored sync stream if available + nextStream = mailboxStorage.getStream(contactId); + + } + } + + + private class STOREHandler implements MailboxRequestHandler { + @Override + public void handleRequest(MailboxRequest request) + throws ProtocolException { + MailboxRequestStore storeReq = (MailboxRequestStore) request; + + if (storeReq.hasContactId()) + throw new ProtocolException( + "Contact Id is implicit for contact to mailbox STORE requests"); + + mailboxStorage + .storeStream(contactId, storeReq.getEncryptedSyncStream()); + } + + @Override + public MailboxMessage.TYPE getType() { + return STORE; + } + + @Override + public void protocolFinished() { + mailboxStorage.close(); + } + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSessionFactory.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSessionFactory.java index 5bf20cac9..43db7c4f5 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSessionFactory.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSessionFactory.java @@ -4,7 +4,7 @@ import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactType; import org.briarproject.bramble.mailbox.protocol.MailboxProtocol; -interface MailboxSessionFactory { +public interface MailboxSessionFactory { AbstractMailboxSession createMailboxSession(MailboxProtocol mailboxProtocol, ContactId contactId, ContactType contactType, int transportMaxLatency, diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSessionFactoryImpl.java index 936125e5d..b5a93bff5 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSessionFactoryImpl.java @@ -2,10 +2,7 @@ package org.briarproject.bramble.mailbox; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactType; -import org.briarproject.bramble.api.data.BdfReaderFactory; -import org.briarproject.bramble.api.data.BdfWriterFactory; import org.briarproject.bramble.api.lifecycle.IoExecutor; -import org.briarproject.bramble.api.plugin.ConnectionManager; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamReaderFactory; @@ -18,15 +15,12 @@ import javax.inject.Inject; public class MailboxSessionFactoryImpl implements MailboxSessionFactory { + private final MailboxStorage mailboxStorage; private Executor ioExecutor; private final KeyManager keyManager; private final SyncSessionFactory syncSessionFactory; private final StreamWriterFactory streamWriterFactory; private final StreamReaderFactory streamReaderFactory; - private ConnectionManager connectionManager; - private BdfWriterFactory bdfWriterFactory; - private BdfReaderFactory bdfReaderFactory; - @Inject public MailboxSessionFactoryImpl(@IoExecutor Executor ioExecutor, KeyManager keyManager, @@ -38,28 +32,41 @@ public class MailboxSessionFactoryImpl implements MailboxSessionFactory { this.syncSessionFactory = syncSessionFactory; this.streamWriterFactory = streamWriterFactory; this.streamReaderFactory = streamReaderFactory; + + // FIXME: using temporary storage for now + this.mailboxStorage = new MailboxStorage(); } @Override - public AbstractMailboxSession createMailboxSession(MailboxProtocol mailboxProtocol, + public AbstractMailboxSession createMailboxSession( + MailboxProtocol mailboxProtocol, ContactId contactId, ContactType contactType, int transportMaxLatency, int transportMaxIdleTime) { switch (contactType) { case CONTACT: - break; + throw new RuntimeException( + "A contact of type CONTACT can not be handled by a mailbox session"); case PRIVATE_MAILBOX: return new PrivateMailboxSession(contactId, ioExecutor, keyManager, syncSessionFactory, streamWriterFactory, streamReaderFactory, mailboxProtocol, transportMaxLatency, transportMaxIdleTime); case CONTACT_MAILBOX: - return null; + return new ContactMailboxSession(contactId, ioExecutor, + keyManager, syncSessionFactory, streamWriterFactory, + streamReaderFactory, mailboxProtocol, + transportMaxLatency, transportMaxIdleTime); case MAILBOX_OWNER: return new MailboxOwnerSession(contactId, ioExecutor, keyManager, syncSessionFactory, streamWriterFactory, streamReaderFactory, mailboxProtocol, - transportMaxLatency, transportMaxIdleTime); + transportMaxLatency, transportMaxIdleTime, mailboxStorage); + case MAILBOX_CONTACT: + return new MailboxContactSession(contactId, ioExecutor, + keyManager, syncSessionFactory, streamWriterFactory, + streamReaderFactory, mailboxProtocol, + transportMaxLatency, transportMaxIdleTime, mailboxStorage); } return null; } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxStorage.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxStorage.java new file mode 100644 index 000000000..d512afe5d --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxStorage.java @@ -0,0 +1,76 @@ +package org.briarproject.bramble.mailbox; + +import org.briarproject.bramble.api.contact.ContactId; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.Nullable; + +class MailboxStorage { + static volatile AtomicLong streamCounter = new AtomicLong(); + + // Dummy implementation with a hashmap for now + LinkedHashMap<ContactId, LinkedList<MailboxStorageStream>> contactStreams = + new LinkedHashMap(); + + synchronized void storeStream(ContactId contactId, byte[] encryptedStream) { + MailboxStorageStream stream = + new MailboxStorageStream(contactId, encryptedStream); + + if (!contactStreams.containsKey(contactId)) + contactStreams.put(contactId, new LinkedList<>()); + + contactStreams.get(contactId).add(stream); + } + + @Nullable + synchronized MailboxStorageStream getStream(ContactId contactId) { + if (contactStreams.get(contactId) == null) + return null; + + for (MailboxStorageStream s : contactStreams.get(contactId)) { + if (!s.pending) { + s.pending = true; + return s; + } + } + return null; + } + + + synchronized void deleteStream(MailboxStorageStream stream) { + for (MailboxStorageStream s : contactStreams.get(stream.contactId)) { + if (s.id == stream.id) { + contactStreams.get(stream.contactId).remove(s); + } + } + } + + public void close() { + + } + + protected class MailboxStorageStream { + private final Long id; + private final byte[] encryptedStream; + private final ContactId contactId; + + private boolean pending = false; + + protected MailboxStorageStream( + ContactId contactId, + byte[] encryptedStream) { + this.contactId = contactId; + this.id = streamCounter.getAndIncrement(); + this.encryptedStream = encryptedStream; + } + + byte[] getEncryptedStream() { + return encryptedStream; + } + + } +} -- GitLab