Skip to content
Snippets Groups Projects
Commit ccfdcfc2 authored by bontric's avatar bontric
Browse files

implementsyncsession reuse (WIP) (abandoning this task, maybe useful

in the future)
parent dd3f48a7
No related branches found
No related tags found
No related merge requests found
Pipeline #
...@@ -22,7 +22,11 @@ import java.io.ByteArrayOutputStream; ...@@ -22,7 +22,11 @@ import java.io.ByteArrayOutputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.ProtocolException; import java.net.ProtocolException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger; import java.util.logging.Logger;
...@@ -46,6 +50,9 @@ abstract class AbstractMailboxSession { ...@@ -46,6 +50,9 @@ abstract class AbstractMailboxSession {
private final int transportMaxLatency; private final int transportMaxLatency;
private final int transportMaxIdleTime; private final int transportMaxIdleTime;
private LinkedHashMap<ContactId, PipedOutputStream>
incomingSyncSessionWriter = new LinkedHashMap<>();
// Used to handle graceful session termination with END request // Used to handle graceful session termination with END request
private AtomicBoolean remoteSessionFinished = new AtomicBoolean(false); private AtomicBoolean remoteSessionFinished = new AtomicBoolean(false);
protected AtomicBoolean terminated = new AtomicBoolean(false); protected AtomicBoolean terminated = new AtomicBoolean(false);
...@@ -147,8 +154,27 @@ abstract class AbstractMailboxSession { ...@@ -147,8 +154,27 @@ abstract class AbstractMailboxSession {
LOG.info(e.toString()); LOG.info(e.toString());
handleProtocolException(); handleProtocolException();
} finally { } finally {
synchronized (remoteSessionFinished){ readingFinished();
remoteSessionFinished.set(true); }
}
}
/**
* Called when no more requests will be received from the remote
*/
protected void readingFinished() {
synchronized (remoteSessionFinished) {
remoteSessionFinished.set(true);
}
synchronized (incomingSyncSessionWriter) {
//Close all Incoming SyncSessions assosciated with this MailboxSession
for (Map.Entry<ContactId, PipedOutputStream> entry : incomingSyncSessionWriter
.entrySet()) {
try {
entry.getValue().close();
} catch (IOException e) {
logException(LOG, WARNING, e);
} }
} }
} }
...@@ -228,14 +254,62 @@ abstract class AbstractMailboxSession { ...@@ -228,14 +254,62 @@ abstract class AbstractMailboxSession {
* @param req MailboxRequestTake * @param req MailboxRequestTake
*/ */
public void handleSync(MailboxRequestSync req) { public void handleSync(MailboxRequestSync req) {
//
if (req.isEndOfStream())
closeStreamForIncomingSession(contactId);
try { try {
InputStream is = new ByteArrayInputStream(req.getSyncStream()); receiveSyncStream(contactId, req.getSyncStream());
syncSessionFactory.createIncomingSession(contactId, is).run();
} catch (IOException e) { } catch (IOException e) {
logException(LOG, WARNING, e); logException(LOG, WARNING, e);
} }
} }
private void closeStreamForIncomingSession(ContactId id) {
synchronized (incomingSyncSessionWriter) {
if (!incomingSyncSessionWriter.containsKey(contactId)) {
if (LOG.isLoggable(WARNING))
LOG.warning(
"Received end of stream for nonexistent session");
return;
}
try {
incomingSyncSessionWriter.get(contactId).close();
} catch (IOException e) {
if (LOG.isLoggable(INFO))
LOG.info(e.toString());
}
incomingSyncSessionWriter.remove(id);
}
}
private void receiveSyncStream(ContactId id, byte[] syncStream)
throws IOException {
PipedOutputStream os;
synchronized (incomingSyncSessionWriter) {
if (!incomingSyncSessionWriter.containsKey(contactId)) {
os = new PipedOutputStream();
PipedInputStream is = new PipedInputStream(os);
incomingSyncSessionWriter.put(contactId, os);
ioExecutor.execute(() -> {
try {
syncSessionFactory.createIncomingSession(contactId, is)
.run();
} catch (IOException e) {
if (LOG.isLoggable(INFO))
LOG.info(e.toString());
}
});
} else {
os = incomingSyncSessionWriter.get(id);
}
}
synchronized (os) {
os.write(syncStream);
}
}
/** /**
* Handles an incoming TAKE request. The request contains an encrypted BSP * Handles an incoming TAKE request. The request contains an encrypted BSP
* stream. From this stream, the tag is read and a StreamReader is created. * stream. From this stream, the tag is read and a StreamReader is created.
...@@ -265,11 +339,13 @@ abstract class AbstractMailboxSession { ...@@ -265,11 +339,13 @@ abstract class AbstractMailboxSession {
"Received stream with unrecognisable tag"); "Received stream with unrecognisable tag");
} }
InputStream reader = InputStream reader =
streamReaderFactory.createStreamReader(in, ctx); streamReaderFactory.createStreamReader(in, ctx);
syncSessionFactory.createIncomingSession(ctx.getContactId(), reader) byte[] syncStream = new byte[reader.available()];
.run(); reader.read(syncStream);
receiveSyncStream(ctx.getContactId(), syncStream);
} catch (DbException | IOException e) { } catch (DbException | IOException e) {
throw new MailboxSessionHandleException(e.toString()); throw new MailboxSessionHandleException(e.toString());
} }
...@@ -280,10 +356,13 @@ abstract class AbstractMailboxSession { ...@@ -280,10 +356,13 @@ abstract class AbstractMailboxSession {
// TODO: mailbox storage for Mailbox implementation // TODO: mailbox storage for Mailbox implementation
} }
public abstract void run() throws IOException; /**
* Called when a protocol exception occurs when reading a request
*/
protected abstract void handleProtocolException(); protected abstract void handleProtocolException();
public abstract void run() throws IOException;
protected class MailboxSessionHandleException extends Exception { protected class MailboxSessionHandleException extends Exception {
public MailboxSessionHandleException(String error) { public MailboxSessionHandleException(String error) {
super(error); super(error);
......
...@@ -48,6 +48,7 @@ import static org.briarproject.bramble.util.LogUtils.logException; ...@@ -48,6 +48,7 @@ import static org.briarproject.bramble.util.LogUtils.logException;
/** /**
* The Mailbox Service runs to poll mailboxes * The Mailbox Service runs to poll mailboxes
* TODO/FIXME REFACTOR!
*/ */
public class MailboxServiceImpl implements MailboxService, EventListener { public class MailboxServiceImpl implements MailboxService, EventListener {
private static final Logger LOG = private static final Logger LOG =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment