Skip to content
Snippets Groups Projects
Commit cac7ee40 authored by bontric's avatar bontric
Browse files

Update Storage-testing implementation

parent 65ef4893
No related branches found
No related tags found
No related merge requests found
......@@ -77,7 +77,7 @@ public class MailboxContactSession extends AbstractMailboxSession {
void sendStoredStreams() throws IOException, InterruptedException {
// Get stored sync stream if available
MailboxStorage.MailboxStorageStream nextStream =
mailboxStorage.getStream(contactId);
mailboxStorage.getStream(contactId, false);
while (nextStream != null) {
// Send TAKE request and delete stream if request was successfull
......@@ -93,7 +93,7 @@ public class MailboxContactSession extends AbstractMailboxSession {
throw new ProtocolException(req.getError());
// Get next stored sync stream if available
nextStream = mailboxStorage.getStream(contactId);
nextStream = mailboxStorage.getStream(contactId, false);
}
}
......@@ -110,7 +110,7 @@ public class MailboxContactSession extends AbstractMailboxSession {
"Contact Id is implicit for contact to mailbox STORE requests");
mailboxStorage
.storeStream(contactId, storeReq.getEncryptedSyncStream());
.storeStream(contactId, storeReq.getEncryptedSyncStream(), true);
}
@Override
......@@ -119,8 +119,6 @@ public class MailboxContactSession extends AbstractMailboxSession {
}
@Override
public void protocolFinished() {
mailboxStorage.close();
}
public void protocolFinished() {}
}
}
......@@ -23,13 +23,15 @@ import static java.util.logging.Level.INFO;
import static org.briarproject.bramble.mailbox.protocol.MailboxMessage.TYPE.STORE;
/**
* This session represents a connection from the mailbox owner to their own mailbox
* This session represents the mailbox side of a connection from the owner
* of the mailbox
*/
public class MailboxOwnerSession extends AbstractMailboxSession {
private static final Logger LOG =
Logger.getLogger(MailboxOwnerSession.class.getName());
private final MailboxStorage mailboxStorage;
private MailboxProtocol mailboxProtocol;
public MailboxOwnerSession(ContactId contactId, Executor ioExecutor,
KeyManager keyManager,
......@@ -37,11 +39,12 @@ public class MailboxOwnerSession extends AbstractMailboxSession {
StreamWriterFactory streamWriterFactory,
StreamReaderFactory streamReaderFactory,
MailboxProtocol mailboxProtocol, int transportMaxLatency,
int transportMaxIdleTime, MailboxStorage mailboxStorage) {
int transportMaxIdleTime, MailboxStorage mailboxStorage, DatabaseComponent db) {
super(ioExecutor, keyManager, syncSessionFactory, streamWriterFactory,
super(ioExecutor, db, keyManager, syncSessionFactory, streamWriterFactory,
streamReaderFactory, mailboxProtocol, transportMaxLatency,
transportMaxIdleTime, contactId);
this.mailboxProtocol = mailboxProtocol;
this.mailboxStorage = mailboxStorage;
......@@ -54,13 +57,37 @@ public class MailboxOwnerSession extends AbstractMailboxSession {
@Override
public void run() {
try {
sendStoredStreams();
awaitSYNCHandlerFinished();
} catch (InterruptedException e) {
} catch (InterruptedException | IOException e) {
if (LOG.isLoggable(INFO))
LOG.info(e.toString());
}
}
void sendStoredStreams() throws IOException, InterruptedException {
// Get stored sync stream if available
MailboxStorage.MailboxStorageStream nextStream =
mailboxStorage.getStreamForOwner();
while (nextStream != null) {
// Send TAKE request and delete stream if request was successfull
MailboxRequestTake req =
new MailboxRequestTake(nextStream.getEncryptedStream());
mailboxProtocol.writeRequest(req);
if (req.awaitAndGetResponse())
// delete stream if request was successful
mailboxStorage.deleteStream(nextStream);
else
//TODO: stop sending after failed request?
throw new ProtocolException(req.getError());
// Get next stored sync stream if available
nextStream = mailboxStorage.getStreamForOwner();
}
}
private class STOREHandler implements MailboxRequestHandler {
@Override
public void handleRequest(MailboxRequest request)
......@@ -73,7 +100,7 @@ public class MailboxOwnerSession extends AbstractMailboxSession {
mailboxStorage
.storeStream(storeReq.getContactId(),
storeReq.getEncryptedSyncStream());
storeReq.getEncryptedSyncStream(), false);
}
@Override
......@@ -82,8 +109,6 @@ public class MailboxOwnerSession extends AbstractMailboxSession {
}
@Override
public void protocolFinished() {
mailboxStorage.close();
}
public void protocolFinished() {}
}
}
......@@ -2,37 +2,59 @@ package org.briarproject.bramble.mailbox;
import org.briarproject.bramble.api.contact.ContactId;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import javax.annotation.Nullable;
class MailboxStorage {
private static final Logger LOG = Logger.getLogger(MailboxStorage.class.getName());
static volatile AtomicLong streamCounter = new AtomicLong();
// Dummy implementation with a hashmap for now
LinkedHashMap<ContactId, LinkedList<MailboxStorageStream>> contactStreams =
new LinkedHashMap();
private boolean isClosed;
synchronized void storeStream(ContactId contactId, byte[] encryptedStream,
boolean forOwner) {
LOG.info("Storing STREAM");
synchronized void storeStream(ContactId contactId, byte[] encryptedStream) {
MailboxStorageStream stream =
new MailboxStorageStream(contactId, encryptedStream);
new MailboxStorageStream(contactId, encryptedStream, forOwner);
if (!contactStreams.containsKey(contactId))
contactStreams.put(contactId, new LinkedList<>());
contactStreams.get(contactId).add(stream);
if(forOwner)
this.notifyAll();
}
@Nullable
synchronized MailboxStorageStream getStream(ContactId contactId) {
synchronized MailboxStorageStream getStream(ContactId contactId,
boolean forOwner) {
if (contactStreams.get(contactId) == null)
return null;
for (MailboxStorageStream s : contactStreams.get(contactId)) {
if (!s.pending) {
if (!s.pending && (s.forOwner == forOwner)) {
s.pending = true;
return s;
}
}
return null;
}
public MailboxStorageStream getStreamForOwner() {
for (LinkedList<MailboxStorageStream> streams : contactStreams
.values()) {
for (MailboxStorageStream s : streams)
if (!s.pending && s.forOwner) {
s.pending = true;
return s;
}
......@@ -49,23 +71,22 @@ class MailboxStorage {
}
}
public void close() {
}
protected class MailboxStorageStream {
private final Long id;
private final byte[] encryptedStream;
private final ContactId contactId;
private final boolean forOwner;
private boolean pending = false;
protected MailboxStorageStream(
ContactId contactId,
byte[] encryptedStream) {
byte[] encryptedStream, boolean forOwner) {
this.contactId = contactId;
this.id = streamCounter.getAndIncrement();
this.encryptedStream = encryptedStream;
this.forOwner = forOwner;
}
byte[] getEncryptedStream() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment