diff --git a/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPollerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPollerImpl.java index 11340b2338175b71e9f37db7eb6870f658d29277..b0d0c34619f387a5e09a774995f45d21a75d5cd1 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPollerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/rendezvous/RendezvousPollerImpl.java @@ -140,10 +140,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { private void addPendingContact(PendingContact p) { long now = clock.currentTimeMillis(); long expiry = p.getTimestamp() + RENDEZVOUS_TIMEOUT_MS; - if (expiry > now) { - scheduler.schedule(() -> expirePendingContactAsync(p.getId()), - expiry - now, MILLISECONDS); - } else { + if (expiry <= now) { eventBus.broadcast(new RendezvousFailedEvent(p.getId())); return; } @@ -158,7 +155,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { .deriveRendezvousKey(staticMasterKey); boolean alice = transportCrypto .isAlice(p.getPublicKey(), handshakeKeyPair); - CryptoState cs = new CryptoState(rendezvousKey, alice); + CryptoState cs = new CryptoState(rendezvousKey, alice, expiry); requireNull(cryptoStates.put(p.getId(), cs)); for (PluginState ps : pluginStates.values()) { RendezvousEndpoint endpoint = @@ -171,28 +168,6 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { } } - @Scheduler - private void expirePendingContactAsync(PendingContactId p) { - worker.execute(() -> expirePendingContact(p)); - } - - // Worker - private void expirePendingContact(PendingContactId p) { - if (removePendingContact(p)) - eventBus.broadcast(new RendezvousFailedEvent(p)); - } - - // Worker - private boolean removePendingContact(PendingContactId p) { - // We can come here twice if a pending contact fails and is then removed - if (cryptoStates.remove(p) == null) return false; - for (PluginState ps : pluginStates.values()) { - RendezvousEndpoint endpoint = ps.endpoints.remove(p); - if (endpoint != null) tryToClose(endpoint, LOG, INFO); - } - return true; - } - @Nullable private RendezvousEndpoint createEndpoint(DuplexPlugin plugin, PendingContactId p, CryptoState cs) { @@ -206,10 +181,34 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { @Scheduler private void poll() { worker.execute(() -> { + removeExpiredPendingContacts(); for (PluginState ps : pluginStates.values()) poll(ps); }); } + // Worker + private void removeExpiredPendingContacts() { + long now = clock.currentTimeMillis(); + List<PendingContactId> expired = new ArrayList<>(); + for (Entry<PendingContactId, CryptoState> e : cryptoStates.entrySet()) { + if (e.getValue().expiry <= now) expired.add(e.getKey()); + } + for (PendingContactId p : expired) { + removePendingContact(p); + eventBus.broadcast(new RendezvousFailedEvent(p)); + } + } + + // Worker + private void removePendingContact(PendingContactId p) { + // We can come here twice if a pending contact expires and is removed + if (cryptoStates.remove(p) == null) return; + for (PluginState ps : pluginStates.values()) { + RendezvousEndpoint endpoint = ps.endpoints.remove(p); + if (endpoint != null) tryToClose(endpoint, LOG, INFO); + } + } + // Worker private void poll(PluginState ps) { List<Pair<TransportProperties, ConnectionHandler>> properties = @@ -221,7 +220,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { Handler h = new Handler(e.getKey(), ps.plugin.getId(), false); properties.add(new Pair<>(props, h)); } - ps.plugin.poll(properties); + if (!properties.isEmpty()) ps.plugin.poll(properties); } @Override @@ -324,10 +323,13 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { private final SecretKey rendezvousKey; private final boolean alice; + private final long expiry; - private CryptoState(SecretKey rendezvousKey, boolean alice) { + private CryptoState(SecretKey rendezvousKey, boolean alice, + long expiry) { this.rendezvousKey = rendezvousKey; this.alice = alice; + this.expiry = expiry; } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/rendezvous/RendezvousPollerImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/rendezvous/RendezvousPollerImplTest.java index 4832ff8704e5fd1f868559e4db874d964e10382c..e5b4a6463fd5d5aae223add01b7307b55270d7f1 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/rendezvous/RendezvousPollerImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/rendezvous/RendezvousPollerImplTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; +import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.briarproject.bramble.rendezvous.RendezvousConstants.POLLING_INTERVAL_MS; @@ -92,28 +93,28 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { } @Test - public void testAddsPendingContactsAndSchedulesExpiryAtStartup() + public void testAddsPendingContactsAndSchedulesPollingAtStartup() throws Exception { Transaction txn = new Transaction(null, true); - long now = pendingContact.getTimestamp() + RENDEZVOUS_TIMEOUT_MS - 1000; - AtomicReference<Runnable> captureExpiryTask = new AtomicReference<>(); + long beforeExpiry = pendingContact.getTimestamp() + + RENDEZVOUS_TIMEOUT_MS - 1000; + long afterExpiry = beforeExpiry + POLLING_INTERVAL_MS; + AtomicReference<Runnable> capturePollTask = new AtomicReference<>(); + // Start the service context.checking(new DbExpectations() {{ // Load the pending contacts oneOf(db).transaction(with(true), withDbRunnable(txn)); oneOf(db).getPendingContacts(txn); will(returnValue(singletonList(pendingContact))); - // Schedule the first poll + // The pending contact has not expired + oneOf(clock).currentTimeMillis(); + will(returnValue(beforeExpiry)); + // Capture the poll task, we'll run it later oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)), with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS), with(MILLISECONDS)); - // Calculate the pending contact's expiry time, 1 second from now - oneOf(clock).currentTimeMillis(); - will(returnValue(now)); - // Capture the expiry task, we'll run it later - oneOf(scheduler).schedule(with(any(Runnable.class)), with(1000L), - with(MILLISECONDS)); - will(new CaptureArgumentAction<>(captureExpiryTask, Runnable.class, + will(new CaptureArgumentAction<>(capturePollTask, Runnable.class, 0)); }}); @@ -122,32 +123,50 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { rendezvousPoller.startService(); context.assertIsSatisfied(); + // Run the poll task - pending contact expires context.checking(new Expectations() {{ - // Run the expiry task + oneOf(clock).currentTimeMillis(); + will(returnValue(afterExpiry)); oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class))); }}); - captureExpiryTask.get().run(); + capturePollTask.get().run(); } @Test - public void testBroadcastsEventWhenExpiredPendingContactIsAdded() { - long now = pendingContact.getTimestamp() + RENDEZVOUS_TIMEOUT_MS; + public void testExpiresPendingContactAtStartup() throws Exception { + Transaction txn = new Transaction(null, true); + long atExpiry = pendingContact.getTimestamp() + RENDEZVOUS_TIMEOUT_MS; - context.checking(new Expectations() {{ + // Start the service + context.checking(new DbExpectations() {{ + // Load the pending contacts + oneOf(db).transaction(with(true), withDbRunnable(txn)); + oneOf(db).getPendingContacts(txn); + will(returnValue(singletonList(pendingContact))); + // The pending contact has already expired oneOf(clock).currentTimeMillis(); - will(returnValue(now)); + will(returnValue(atExpiry)); oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class))); + // Schedule the poll task + oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)), + with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS), + with(MILLISECONDS)); }}); - rendezvousPoller.eventOccurred( - new PendingContactAddedEvent(pendingContact)); + rendezvousPoller.startService(); } @Test public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndRemoved() throws Exception { - long now = pendingContact.getTimestamp(); + long beforeExpiry = pendingContact.getTimestamp(); + + // Start the service + expectStartupWithNoPendingContacts(); + + rendezvousPoller.startService(); + context.assertIsSatisfied(); // Enable the transport - no endpoints should be created yet expectGetPlugin(); @@ -157,11 +176,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { // Add the pending contact - endpoint should be created and polled context.checking(new Expectations() {{ - // Add pending contact oneOf(clock).currentTimeMillis(); - will(returnValue(now)); - oneOf(scheduler).schedule(with(any(Runnable.class)), - with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS)); + will(returnValue(beforeExpiry)); }}); expectDeriveRendezvousKey(); @@ -196,8 +212,16 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { @Test public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndExpired() throws Exception { - long now = pendingContact.getTimestamp(); - AtomicReference<Runnable> captureExpiryTask = new AtomicReference<>(); + long beforeExpiry = pendingContact.getTimestamp() + + RENDEZVOUS_TIMEOUT_MS - 1000; + long afterExpiry = beforeExpiry + POLLING_INTERVAL_MS; + + // Start the service, capturing the poll task + AtomicReference<Runnable> capturePollTask = + expectStartupWithNoPendingContacts(); + + rendezvousPoller.startService(); + context.assertIsSatisfied(); // Enable the transport - no endpoints should be created yet expectGetPlugin(); @@ -207,14 +231,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { // Add the pending contact - endpoint should be created and polled context.checking(new Expectations() {{ - // Add pending contact oneOf(clock).currentTimeMillis(); - will(returnValue(now)); - // Capture the expiry task, we'll run it later - oneOf(scheduler).schedule(with(any(Runnable.class)), - with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS)); - will(new CaptureArgumentAction<>(captureExpiryTask, Runnable.class, - 0)); + will(returnValue(beforeExpiry)); }}); expectDeriveRendezvousKey(); @@ -233,13 +251,15 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { new PendingContactAddedEvent(pendingContact)); context.assertIsSatisfied(); - // The pending contact expires - endpoint should be closed + // Run the poll task - pending contact expires, endpoint is closed context.checking(new Expectations() {{ + oneOf(clock).currentTimeMillis(); + will(returnValue(afterExpiry)); oneOf(rendezvousEndpoint).close(); oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class))); }}); - captureExpiryTask.get().run(); + capturePollTask.get().run(); context.assertIsSatisfied(); // Remove the pending contact - endpoint is already closed @@ -254,14 +274,18 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { @Test public void testCreatesAndClosesEndpointsWhenTransportIsEnabledAndDisabled() throws Exception { - long now = pendingContact.getTimestamp(); + long beforeExpiry = pendingContact.getTimestamp(); + + // Start the service + expectStartupWithNoPendingContacts(); + + rendezvousPoller.startService(); + context.assertIsSatisfied(); // Add the pending contact - no endpoints should be created yet context.checking(new DbExpectations() {{ oneOf(clock).currentTimeMillis(); - will(returnValue(now)); - oneOf(scheduler).schedule(with(any(Runnable.class)), - with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS)); + will(returnValue(beforeExpiry)); }}); expectDeriveRendezvousKey(); @@ -290,6 +314,25 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { new PendingContactRemovedEvent(pendingContact.getId())); } + private AtomicReference<Runnable> expectStartupWithNoPendingContacts() + throws Exception { + Transaction txn = new Transaction(null, true); + AtomicReference<Runnable> capturePollTask = new AtomicReference<>(); + + context.checking(new DbExpectations() {{ + oneOf(db).transaction(with(true), withDbRunnable(txn)); + oneOf(db).getPendingContacts(txn); + will(returnValue(emptyList())); + oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)), + with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS), + with(MILLISECONDS)); + will(new CaptureArgumentAction<>(capturePollTask, Runnable.class, + 0)); + }}); + + return capturePollTask; + } + private void expectDeriveRendezvousKey() throws Exception { Transaction txn = new Transaction(null, true);