From 38ee2221cdfcf0efa0bd0e0de9e31365cb205c64 Mon Sep 17 00:00:00 2001 From: bontric <benjohnwie@gmail.com> Date: Fri, 7 Sep 2018 19:58:54 +0200 Subject: [PATCH] Refactor MailboxSession and add event handling --- .../bramble/mailbox/MailboxManagerImpl.java | 35 ++- .../bramble/mailbox/MailboxServiceImpl.java | 220 ++++++++++-------- 2 files changed, 152 insertions(+), 103 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxManagerImpl.java index 5caa90198..05fa3a3d2 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxManagerImpl.java @@ -9,6 +9,7 @@ import org.briarproject.bramble.api.data.BdfWriterFactory; import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.mailbox.MailboxConstants; +import org.briarproject.bramble.api.mailbox.MailboxInfo; import org.briarproject.bramble.api.mailbox.MailboxManager; import org.briarproject.bramble.api.plugin.ConnectionRegistry; import org.briarproject.bramble.api.plugin.TransportConnectionReader; @@ -48,6 +49,8 @@ public class MailboxManagerImpl implements MailboxManager { private final BdfWriterFactory bdfWriterFactory; private KeyManager keyManager; + private PrivateMailboxSession privateMailboxSession = null; + @Inject public MailboxManagerImpl(@IoExecutor Executor ioExecutor, KeyManager keyManager, MailboxSessionFactory mailboxSessionFactory, @@ -95,6 +98,17 @@ public class MailboxManagerImpl implements MailboxManager { ContactType.MAILBOX_OWNER)); } + @Override + public void handleOwnerContactWithoutMailbox(MailboxInfo mailboxInfo) { + if (null == privateMailboxSession || + privateMailboxSession.isTerminated()) { + return; + } + + ioExecutor.execute( + () -> privateMailboxSession.handleOwnerContact(mailboxInfo)); + } + private class ManageMailboxConnection implements Runnable { private boolean incoming; @@ -167,18 +181,26 @@ public class MailboxManagerImpl implements MailboxManager { new MailboxProtocol(ioExecutor, mailboxBdfWriter, mailboxBdfReader); - MailboxSession mailboxSession = mailboxSessionFactory + AbstractMailboxSession mailboxSession = mailboxSessionFactory .createMailboxSession(mailboxProtocol, contactId, contactType, writer.getMaxLatency(), writer.getMaxIdleTime()); ioExecutor.execute(mailboxProtocol); + //TODO: Too hacky? + if (contactType == ContactType.PRIVATE_MAILBOX) + privateMailboxSession = (PrivateMailboxSession) mailboxSession; - mailboxSession.run(); - mailboxProtocol.stop(); - - disposeConnection(false); + try { + mailboxSession.run(); + mailboxProtocol.stop(); + disposeConnection(false); + } catch (IOException e) { + if (LOG.isLoggable(INFO)) + LOG.info(e.toString()); + disposeConnection(true); + } connectionRegistry .unregisterConnection(contactId, MailboxConstants.ID, @@ -186,7 +208,8 @@ public class MailboxManagerImpl implements MailboxManager { } - private void handleOutgoingStream() throws DbException, IOException { + private void handleOutgoingStream() + throws DbException, IOException { // Allocate a stream context StreamContext ctx = keyManager.getStreamContext(contactId, transportId); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java index 81de9c3af..a64350255 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxServiceImpl.java @@ -1,10 +1,15 @@ package org.briarproject.bramble.mailbox; +import org.briarproject.bramble.api.contact.Contact; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactType; +import org.briarproject.bramble.api.contact.event.ContactAddedEvent; import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.db.Transaction; +import org.briarproject.bramble.api.event.Event; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.event.EventListener; import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.ServiceException; import org.briarproject.bramble.api.mailbox.MailboxConstants; @@ -17,6 +22,8 @@ import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.plugin.event.TransportDisabledEvent; +import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent; import org.briarproject.bramble.api.properties.TransportProperties; import org.briarproject.bramble.api.properties.TransportPropertyManager; import org.briarproject.bramble.api.system.Scheduler; @@ -24,10 +31,10 @@ import org.briarproject.bramble.api.system.Scheduler; import java.security.SecureRandom; import java.util.Collection; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import javax.inject.Inject; @@ -35,18 +42,20 @@ import javax.inject.Inject; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; +import static org.briarproject.bramble.api.contact.ContactType.PRIVATE_MAILBOX; import static org.briarproject.bramble.api.mailbox.MailboxConstants.POLLING_INTERVALL; import static org.briarproject.bramble.util.LogUtils.logException; /** * The Mailbox Service runs to poll mailboxes */ -public class MailboxServiceImpl implements MailboxService { +public class MailboxServiceImpl implements MailboxService, EventListener { private static final Logger LOG = Logger.getLogger(MailboxServiceImpl.class.getName()); private final Executor ioExecutor; private final ScheduledExecutorService scheduler; + private EventBus eventBus; private final DatabaseComponent db; private final ConnectionRegistry connectionRegistry; private final PluginManager pluginManager; @@ -55,12 +64,13 @@ public class MailboxServiceImpl implements MailboxService { private final SecureRandom random; - private volatile Future PrivateMailboxFuture; - private volatile Future ContactMailboxFuture; + private volatile Future mailboxLanFuture; + private volatile ContactId privateMailboxId = null; + private volatile AtomicBoolean hasPrivateMailbox = new AtomicBoolean(false); @Inject MailboxServiceImpl(@IoExecutor Executor ioExecutor, @Scheduler - ScheduledExecutorService scheduler, + ScheduledExecutorService scheduler, EventBus eventBus, DatabaseComponent db, ConnectionRegistry connectionRegistry, PluginManager pluginManager, MailboxManager mailboxManager, @@ -68,12 +78,14 @@ public class MailboxServiceImpl implements MailboxService { SecureRandom random) { this.ioExecutor = ioExecutor; this.scheduler = scheduler; + this.eventBus = eventBus; this.db = db; this.connectionRegistry = connectionRegistry; this.pluginManager = pluginManager; this.mailboxManager = mailboxManager; this.transportPropertyManager = transportPropertyManager; this.random = random; + } @Override @@ -81,126 +93,114 @@ public class MailboxServiceImpl implements MailboxService { if (LOG.isLoggable(INFO)) LOG.info("Starting Mailbox Service"); - //NOTE/TODO: Only using LAN connections for now - - // TODO: Find a useful polling interval and randomize (should have - // a minimum polling interval) - - // TODO: Only schedule if there is a private mailbox - PrivateMailboxFuture = - schedule(new ConnectPrivateMailboxTask(LanTcpConstants.ID, - POLLING_INTERVALL), - POLLING_INTERVALL); - - //TODO: Only schedule if there are contact mailboxes? - ContactMailboxFuture = - schedule(new ConnectContactMailboxesTask(LanTcpConstants.ID, - POLLING_INTERVALL), - POLLING_INTERVALL); + checkForPrivateMailbox(); + tryToRunLanMailboxFuture(); + this.eventBus.addListener(this); } - private Future schedule(Runnable task, int delay) { - return scheduler - .schedule(() -> ioExecutor.execute(task), delay, MILLISECONDS); - } - - private class ConnectPrivateMailboxTask implements Runnable { - - private final int delay; - private TransportId transportId; + @Override + public void eventOccurred(Event e) { + if (e instanceof TransportEnabledEvent) { + if (((TransportEnabledEvent) e).getTransportId() + .equals(LanTcpConstants.ID)) { + if (mailboxLanFuture == null || mailboxLanFuture.isCancelled()) + tryToRunLanMailboxFuture(); + } + } - private ConnectPrivateMailboxTask(TransportId transportId, int delay) { - this.delay = - POLLING_INTERVALL; - this.transportId = transportId; + if (e instanceof TransportDisabledEvent) { + if (((TransportDisabledEvent) e).getTransportId() + .equals(LanTcpConstants.ID)) { + if (mailboxLanFuture != null) + mailboxLanFuture.cancel(false); + } } - private void reschedule() { - PrivateMailboxFuture = schedule(this, delay); + if (e instanceof ContactAddedEvent) { + if (!hasPrivateMailbox.get()) + ioExecutor.execute(() -> checkForPrivateMailbox()); } + } + private void tryToRunLanMailboxFuture() { + //NOTE/TODO: Only using LAN connections for now + DuplexPlugin plugin = + (DuplexPlugin) pluginManager.getPlugin(LanTcpConstants.ID); + // TODO: Find a useful polling interval and randomize (should have + // a minimum polling interval) + if (plugin != null && plugin.isRunning()) + mailboxLanFuture = schedule( + new ConnectMailboxesTask(LanTcpConstants.ID, + POLLING_INTERVALL, plugin), POLLING_INTERVALL); + } - @Override - public void run() { - DuplexPlugin plugin = - (DuplexPlugin) pluginManager.getPlugin(transportId); - if (plugin == null) { - reschedule(); + private void checkForPrivateMailbox() { + synchronized (hasPrivateMailbox) { + if (hasPrivateMailbox.get()) return; - } - if (LOG.isLoggable(INFO)) - LOG.info("Trying to connect to private Mailbox"); - Map<ContactId, TransportProperties> contacts; + + Transaction txn = null; + Collection<Contact> privateMb; try { - contacts = transportPropertyManager - .getRemotePropertiesByType(transportId, - ContactType.PRIVATE_MAILBOX); - } catch (DbException e) { - logException(LOG, WARNING, e); - // TODO: reschedule on DB excetpion? - reschedule(); - return; - } + txn = db.startTransaction(true); + privateMb = db.getContactsByType(txn, PRIVATE_MAILBOX); + db.commitTransaction(txn); - if (contacts.isEmpty()) { - if (LOG.isLoggable(INFO)) - LOG.info("No private mailbox available"); - // TODO: Should not be scheduled if no private MB is available - reschedule(); - return; - } + if (privateMb.size() > 1) + throw new RuntimeException( + "Multiple Private Mailboxes exist!"); - //TODO: Check if multiple private MB's exist? - for (Entry<ContactId, TransportProperties> entry : contacts - .entrySet()) { - if (!connectionRegistry.isConnected(entry.getKey())) { - if (LOG.isLoggable(INFO)) - LOG.info("Connecting to private mailbox"); - - DuplexTransportConnection mailBoxConn = - plugin.createConnection(entry.getValue()); - - if (mailBoxConn != null) - mailboxManager - .handleOutgoingPrivateMailboxConnection( - entry.getKey(), mailBoxConn, - transportId); - } else { - if (LOG.isLoggable(INFO)) { - LOG.info("Private Mailbox already connected"); - } + if (!privateMb.isEmpty()) { + privateMailboxId = privateMb.iterator().next().getId(); + hasPrivateMailbox.set(true); } + } catch (DbException dbe) { + logException(LOG, WARNING, dbe); + } finally { + if (txn != null) + db.endTransaction(txn); } - reschedule(); } } + private Future schedule(Runnable task, int delay) { + return scheduler + .schedule(() -> ioExecutor.execute(task), delay, MILLISECONDS); + } - private class ConnectContactMailboxesTask implements Runnable { + private class ConnectMailboxesTask implements Runnable { private TransportId transportId; private int delay; + private DuplexPlugin plugin; + private TransportProperties privateMailboxProperties = null; - private ConnectContactMailboxesTask(TransportId transportId, - int delay) { + private ConnectMailboxesTask(TransportId transportId, + int delay, DuplexPlugin plugin) { this.transportId = transportId; this.delay = delay; + this.plugin = plugin; } private void reschedule() { - ContactMailboxFuture = schedule(this, delay); + mailboxLanFuture = schedule(this, delay); } + @Override public void run() { - DuplexPlugin plugin = - (DuplexPlugin) pluginManager.getPlugin(transportId); - if (plugin == null) { - reschedule(); - return; + if (hasPrivateMailbox.get() && + connectionRegistry.isConnected(privateMailboxId)) { + try { + connectPrivateMailbox(); + } catch (DbException e) { + logException(LOG, WARNING, e); + return; + } } + // Poll Contact mailboxes - Collection<MailboxInfo> contactMailboxes; Map<ContactId, TransportProperties> contacts; + Collection<MailboxInfo> contactMailboxes; Transaction txn = null; try { txn = db.startTransaction(true); @@ -211,12 +211,12 @@ public class MailboxServiceImpl implements MailboxService { db.commitTransaction(txn); } catch (DbException e) { logException(LOG, WARNING, e); - reschedule(); // TODO Reschedule on Db exception? return; } finally { if (txn != null) db.endTransaction(txn); } + for (MailboxInfo mailboxInfo : contactMailboxes) { if (mailboxInfo.getMailboxId() == null) continue; @@ -232,24 +232,50 @@ public class MailboxServiceImpl implements MailboxService { mailboxTP)); } } + reschedule(); + } private void connectContactMailbox(DuplexPlugin plugin, ContactId contactId, - TransportProperties mailboxTransportProperties) { + TransportProperties transportProperties) { DuplexTransportConnection mailBoxConn = - plugin.createConnection(mailboxTransportProperties); + plugin.createConnection(transportProperties); if (mailBoxConn != null) mailboxManager.handleOutgoingContactMailboxConnection( contactId, mailBoxConn, transportId); } + + private void connectPrivateMailbox() throws DbException { + if (privateMailboxProperties == null) { + privateMailboxProperties = transportPropertyManager + .getRemoteProperties(privateMailboxId, transportId); + } + + if (LOG.isLoggable(INFO)) + LOG.info("Connecting to private mailbox"); + + DuplexTransportConnection mailBoxConn = + plugin.createConnection(privateMailboxProperties); + + if (mailBoxConn != null) { + mailboxManager + .handleOutgoingPrivateMailboxConnection( + privateMailboxId, mailBoxConn, + transportId); + } else { + if (LOG.isLoggable(INFO)) + LOG.info("Unable to connect to private mailbox"); + } + } } @Override public void stopService() throws ServiceException { - PrivateMailboxFuture.cancel(true); - ContactMailboxFuture.cancel(true); + if (mailboxLanFuture != null && !mailboxLanFuture.isCancelled()) + mailboxLanFuture.cancel(true); + eventBus.removeListener(this); } } -- GitLab