Skip to content
Snippets Groups Projects
Verified Commit faa591cc authored by bontric's avatar bontric Committed by Julian Dehm
Browse files

Implement Sync-Session handling for contaccts/owner/mailbox

parent 1947e803
No related branches found
No related tags found
No related merge requests found
Showing
with 164 additions and 41 deletions
......@@ -168,7 +168,8 @@ public class MailboxManagerImpl implements MailboxManager {
MailboxSession mailboxSession = mailboxSessionFactory
.createMailboxSession(mailboxProtocol, contactId,
contactType);
contactType, writer.getMaxLatency(),
writer.getMaxIdleTime());
ioExecutor.execute(mailboxProtocol);
......
package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.plugin.ConnectionManager;
import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.transport.KeyManager;
import org.briarproject.bramble.api.transport.StreamReaderFactory;
......@@ -18,7 +17,6 @@ class MailboxOwnerSession extends MailboxSession {
private static final Logger LOG =
Logger.getLogger(MailboxOwnerSession.class.getName());
private final ContactId contactId;
private final Executor ioExecutor;
private final MailboxProtocol mailboxProtocol;
......@@ -27,19 +25,20 @@ class MailboxOwnerSession extends MailboxSession {
SyncSessionFactory syncSessionFactory,
StreamWriterFactory streamWriterFactory,
StreamReaderFactory streamReaderFactory,
MailboxProtocol mailboxProtocol) {
MailboxProtocol mailboxProtocol, int transportMaxLatency,
int transportMaxIdleTime) {
super(ioExecutor, keyManager, syncSessionFactory, streamWriterFactory,
streamReaderFactory, mailboxProtocol);
streamReaderFactory, mailboxProtocol, transportMaxLatency,
transportMaxIdleTime, contactId);
this.contactId = contactId;
this.ioExecutor = ioExecutor;
this.mailboxProtocol = mailboxProtocol;
}
@Override
public void run() {
ioExecutor.execute(()->readRequests());
ioExecutor.execute(() -> readRequests());
}
@Override
......
......@@ -34,22 +34,27 @@ abstract class MailboxSession implements Runnable {
private static final Logger LOG =
Logger.getLogger(MailboxSession.class.getName());
protected final Executor ioExecutor;
protected final ContactId contactId;
private KeyManager keyManager;
private SyncSessionFactory syncSessionFactory;
private final StreamWriterFactory streamWriterFactory;
private final StreamReaderFactory streamReaderFactory;
private MailboxProtocol mailboxProtocol;
protected ContactId contactId;
private final int transportMaxLatency;
private final int transportMaxIdleTime;
// Used to handle graceful session termination with END request
private AtomicBoolean remoteSessionFinished = new AtomicBoolean(false);
private boolean remoteSessionFinished = false;
public MailboxSession(Executor ioExecutor, KeyManager keyManager,
public MailboxSession(Executor ioExecutor,
KeyManager keyManager,
SyncSessionFactory syncSessionFactory,
StreamWriterFactory streamWriterFactory,
StreamReaderFactory streamReaderFactory,
MailboxProtocol mailboxProtocol) {
MailboxProtocol mailboxProtocol,
int transportMaxLatency,
int transportMaxIdleTime,
ContactId contactId) {
this.ioExecutor = ioExecutor;
this.keyManager = keyManager;
......@@ -57,12 +62,60 @@ abstract class MailboxSession implements Runnable {
this.streamWriterFactory = streamWriterFactory;
this.streamReaderFactory = streamReaderFactory;
this.mailboxProtocol = mailboxProtocol;
this.transportMaxLatency = transportMaxLatency;
this.transportMaxIdleTime = transportMaxIdleTime;
this.contactId = contactId;
}
protected byte[] createOutgoingSyncStream(ContactId contactId)
/**
* Creates and runs a duplex session which is used for sync streams between
* mailbox and owner (user)
*
* @return An Outgoing Duplex Session which writes MailboxProtocol SYNC requests
* using a MailboxSyncRequestWriter
* <p>
* Note: The stream is not encrypted, since the MailboxProtocol is responsible
* for encrypting/decrypting requests
*/
protected void runDuplexOutgoingSession() {
MailboxSyncRequestWriter mbWriter =
new MailboxSyncRequestWriter(mailboxProtocol);
try {
syncSessionFactory
.createDuplexOutgoingSession(contactId, transportMaxLatency,
transportMaxIdleTime, mbWriter).run();
} catch (IOException e) {
logException(LOG, WARNING, e);
}
}
/**
* Handle an incoming sync stream intended for the mailbox/client
*
* @param syncStream (previously decrypted) sync stream
* @throws IOException
*/
protected void handleSyncStream(byte[] syncStream)
throws IOException {
InputStream is = new ByteArrayInputStream(syncStream);
syncSessionFactory.createIncomingSession(contactId, is).run();
}
/**
* Get an encrypted Sync stream which can be stored on the mailbox by
* issuing a STORE request
*
* @param targetContactId The encrypted sync stream is generated for the given
* contactId
* @return Encrypted Sync stream
* @throws DbException
* @throws IOException
*/
protected byte[] getSyncStreamToStore(ContactId targetContactId)
throws DbException, IOException {
StreamContext ctx =
keyManager.getStreamContext(contactId, MailboxConstants.ID);
keyManager
.getStreamContext(targetContactId, MailboxConstants.ID);
if (ctx == null)
throw new IOException("Could not allocated stream context");
......@@ -72,13 +125,22 @@ abstract class MailboxSession implements Runnable {
StreamWriter streamWriter = streamWriterFactory
.createStreamWriter(os, ctx);
syncSessionFactory.createSimplexOutgoingSession(contactId,
syncSessionFactory.createSimplexOutgoingSession(targetContactId,
MailboxConstants.MAX_LATENCY, streamWriter);
return os.toByteArray();
}
protected void handleIncomingSyncStream(byte[] encryptedStream)
/**
* Handles an encrypted stream which was stored on the mailbox and received
* with a TAKE request.
*
* @param encryptedStream
* @throws DbException
* @throws IOException
*/
protected void handleEncryptedSyncStream(byte[] encryptedStream)
throws DbException, IOException {
// read tag from input stream
InputStream in = new ByteArrayInputStream(encryptedStream);
......
......@@ -2,14 +2,11 @@ package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.ContactType;
import org.briarproject.bramble.api.transport.StreamContext;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.mailbox.protocol.MailboxProtocol;
interface MailboxSessionFactory {
MailboxSession createOutgoingMailboxSession(StreamContext ctx,
StreamWriter streamWriter, ContactType cType);
MailboxSession createMailboxSession(MailboxProtocol mailboxProtocol,
ContactId contactId, ContactType contactType);
ContactId contactId, ContactType contactType,
int transportMaxLatency,
int transportMaxIdleTime);
}
......@@ -42,29 +42,26 @@ public class MailboxSessionFactoryImpl implements MailboxSessionFactory {
this.streamReaderFactory = streamReaderFactory;
}
@Override
public MailboxSession createOutgoingMailboxSession(StreamContext ctx,
StreamWriter streamWriter, ContactType cType) {
return null;
}
@Override
public MailboxSession createMailboxSession(MailboxProtocol mailboxProtocol,
ContactId contactId, ContactType contactType) {
ContactId contactId, ContactType contactType,
int transportMaxLatency,
int transportMaxIdleTime) {
switch (contactType) {
case CONTACT:
break;
case PRIVATE_MAILBOX:
return new PrivateMailboxSession(contactId, ioExecutor,
keyManager, syncSessionFactory, streamWriterFactory,
streamReaderFactory, mailboxProtocol);
streamReaderFactory, mailboxProtocol,
transportMaxLatency, transportMaxIdleTime);
case CONTACT_MAILBOX:
return null;
case MAILBOX_OWNER:
return new MailboxOwnerSession(contactId, ioExecutor,
keyManager, syncSessionFactory, streamWriterFactory,
streamReaderFactory, mailboxProtocol);
streamReaderFactory, mailboxProtocol,
transportMaxLatency, transportMaxIdleTime);
}
return null;
}
......
package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.mailbox.protocol.MailboxProtocol;
import org.briarproject.bramble.mailbox.protocol.MailboxRequestSync;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
class MailboxSyncRequestWriter extends OutputStream implements StreamWriter {
private MailboxProtocol mailboxProtocol;
private final ByteArrayOutputStream bufferOS = new ByteArrayOutputStream();
public MailboxSyncRequestWriter(MailboxProtocol mailboxProtocol) {
this.mailboxProtocol = mailboxProtocol;
}
@Override
public OutputStream getOutputStream() {
return this;
}
@Override
public void sendEndOfStream() throws IOException {
flush();
}
@Override
public synchronized void write(int i) {
bufferOS.write(i);
}
@Override
public synchronized void flush() throws IOException {
byte[] syncStream = bufferOS.toByteArray();
MailboxRequestSync req = new MailboxRequestSync(syncStream);
try {
mailboxProtocol.writeRequest(req);
} catch (InterruptedException e) {
throw new IOException(e.toString());
}
}
}
......@@ -10,33 +10,44 @@ import org.briarproject.bramble.mailbox.protocol.MailboxRequestStore;
import org.briarproject.bramble.mailbox.protocol.MailboxRequestSync;
import org.briarproject.bramble.mailbox.protocol.MailboxRequestTake;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.util.LogUtils.logException;
class PrivateMailboxSession extends MailboxSession {
private static final Logger LOG =
Logger.getLogger(PrivateMailboxSession.class.getName());
private final ContactId contactId;
public PrivateMailboxSession(ContactId contactId, Executor ioExecutor,
KeyManager keyManager,
SyncSessionFactory syncSessionFactory,
StreamWriterFactory streamWriterFactory,
StreamReaderFactory streamReaderFactory,
MailboxProtocol mailboxProtocol) {
MailboxProtocol mailboxProtocol, int transportMaxLatency,
int transportMaxIdleTime) {
super(ioExecutor, keyManager, syncSessionFactory, streamWriterFactory,
streamReaderFactory, mailboxProtocol);
this.contactId = contactId;
streamReaderFactory, mailboxProtocol, transportMaxLatency,
transportMaxIdleTime, contactId);
}
@Override
public void run() {
ioExecutor.execute(()->super.readRequests());
ioExecutor.execute(() -> super.readRequests());
ioExecutor.execute(() -> runDuplexOutgoingSession());
}
@Override
public void handleSync(MailboxRequestSync mailboxRequestSync) {
public void handleSync(MailboxRequestSync req) {
try {
handleSyncStream(req.getSyncStream());
} catch (IOException e) {
logException(LOG, WARNING, e);
}
}
@Override
......
......@@ -17,7 +17,6 @@ import java.util.logging.Logger;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.RESPONSE;
import static org.briarproject.bramble.util.LogUtils.logException;
public class MailboxProtocol implements Runnable {
......
......@@ -4,26 +4,37 @@ import org.briarproject.bramble.api.FormatException;
import org.briarproject.bramble.api.data.BdfList;
import java.io.IOException;
import java.util.Arrays;
public class MailboxRequestSync extends MailboxRequest {
private byte[] encryptedStream;
public MailboxRequestSync(byte [] encryptedStream) {
super(TYPE.SYNC);
this.encryptedStream = encryptedStream;
}
public MailboxRequestSync(BdfList lst) throws FormatException {
super(lst);
}
@Override
protected BdfList makeRequestBody() {
return null;
return new BdfList(Arrays.asList(encryptedStream));
}
@Override
public void parseBody(BdfList list) throws FormatException {
public boolean hasResponse() {
return false;
}
@Override
public void parseBody(BdfList list) throws FormatException {
this.encryptedStream = list.getRaw(0);
}
public byte[] getEncryptedStream() {
public byte[] getSyncStream() {
return encryptedStream;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment