Commit 71b1f99b authored by akwizgran's avatar akwizgran

Use regular poll task for expiry.

parent 2982a874
Pipeline #3470 passed with stage
in 8 minutes and 27 seconds
...@@ -140,10 +140,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { ...@@ -140,10 +140,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
private void addPendingContact(PendingContact p) { private void addPendingContact(PendingContact p) {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
long expiry = p.getTimestamp() + RENDEZVOUS_TIMEOUT_MS; long expiry = p.getTimestamp() + RENDEZVOUS_TIMEOUT_MS;
if (expiry > now) { if (expiry <= now) {
scheduler.schedule(() -> expirePendingContactAsync(p.getId()),
expiry - now, MILLISECONDS);
} else {
eventBus.broadcast(new RendezvousFailedEvent(p.getId())); eventBus.broadcast(new RendezvousFailedEvent(p.getId()));
return; return;
} }
...@@ -158,7 +155,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { ...@@ -158,7 +155,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
.deriveRendezvousKey(staticMasterKey); .deriveRendezvousKey(staticMasterKey);
boolean alice = transportCrypto boolean alice = transportCrypto
.isAlice(p.getPublicKey(), handshakeKeyPair); .isAlice(p.getPublicKey(), handshakeKeyPair);
CryptoState cs = new CryptoState(rendezvousKey, alice); CryptoState cs = new CryptoState(rendezvousKey, alice, expiry);
requireNull(cryptoStates.put(p.getId(), cs)); requireNull(cryptoStates.put(p.getId(), cs));
for (PluginState ps : pluginStates.values()) { for (PluginState ps : pluginStates.values()) {
RendezvousEndpoint endpoint = RendezvousEndpoint endpoint =
...@@ -171,28 +168,6 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { ...@@ -171,28 +168,6 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
} }
} }
@Scheduler
private void expirePendingContactAsync(PendingContactId p) {
worker.execute(() -> expirePendingContact(p));
}
// Worker
private void expirePendingContact(PendingContactId p) {
if (removePendingContact(p))
eventBus.broadcast(new RendezvousFailedEvent(p));
}
// Worker
private boolean removePendingContact(PendingContactId p) {
// We can come here twice if a pending contact fails and is then removed
if (cryptoStates.remove(p) == null) return false;
for (PluginState ps : pluginStates.values()) {
RendezvousEndpoint endpoint = ps.endpoints.remove(p);
if (endpoint != null) tryToClose(endpoint, LOG, INFO);
}
return true;
}
@Nullable @Nullable
private RendezvousEndpoint createEndpoint(DuplexPlugin plugin, private RendezvousEndpoint createEndpoint(DuplexPlugin plugin,
PendingContactId p, CryptoState cs) { PendingContactId p, CryptoState cs) {
...@@ -206,10 +181,34 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { ...@@ -206,10 +181,34 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
@Scheduler @Scheduler
private void poll() { private void poll() {
worker.execute(() -> { worker.execute(() -> {
removeExpiredPendingContacts();
for (PluginState ps : pluginStates.values()) poll(ps); for (PluginState ps : pluginStates.values()) poll(ps);
}); });
} }
// Worker
private void removeExpiredPendingContacts() {
long now = clock.currentTimeMillis();
List<PendingContactId> expired = new ArrayList<>();
for (Entry<PendingContactId, CryptoState> e : cryptoStates.entrySet()) {
if (e.getValue().expiry <= now) expired.add(e.getKey());
}
for (PendingContactId p : expired) {
removePendingContact(p);
eventBus.broadcast(new RendezvousFailedEvent(p));
}
}
// Worker
private void removePendingContact(PendingContactId p) {
// We can come here twice if a pending contact expires and is removed
if (cryptoStates.remove(p) == null) return;
for (PluginState ps : pluginStates.values()) {
RendezvousEndpoint endpoint = ps.endpoints.remove(p);
if (endpoint != null) tryToClose(endpoint, LOG, INFO);
}
}
// Worker // Worker
private void poll(PluginState ps) { private void poll(PluginState ps) {
List<Pair<TransportProperties, ConnectionHandler>> properties = List<Pair<TransportProperties, ConnectionHandler>> properties =
...@@ -221,7 +220,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { ...@@ -221,7 +220,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
Handler h = new Handler(e.getKey(), ps.plugin.getId(), false); Handler h = new Handler(e.getKey(), ps.plugin.getId(), false);
properties.add(new Pair<>(props, h)); properties.add(new Pair<>(props, h));
} }
ps.plugin.poll(properties); if (!properties.isEmpty()) ps.plugin.poll(properties);
} }
@Override @Override
...@@ -324,10 +323,13 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { ...@@ -324,10 +323,13 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
private final SecretKey rendezvousKey; private final SecretKey rendezvousKey;
private final boolean alice; private final boolean alice;
private final long expiry;
private CryptoState(SecretKey rendezvousKey, boolean alice) { private CryptoState(SecretKey rendezvousKey, boolean alice,
long expiry) {
this.rendezvousKey = rendezvousKey; this.rendezvousKey = rendezvousKey;
this.alice = alice; this.alice = alice;
this.expiry = expiry;
} }
} }
......
...@@ -35,6 +35,7 @@ import java.util.concurrent.Executor; ...@@ -35,6 +35,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.POLLING_INTERVAL_MS; import static org.briarproject.bramble.rendezvous.RendezvousConstants.POLLING_INTERVAL_MS;
...@@ -92,28 +93,28 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -92,28 +93,28 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
} }
@Test @Test
public void testAddsPendingContactsAndSchedulesExpiryAtStartup() public void testAddsPendingContactsAndSchedulesPollingAtStartup()
throws Exception { throws Exception {
Transaction txn = new Transaction(null, true); Transaction txn = new Transaction(null, true);
long now = pendingContact.getTimestamp() + RENDEZVOUS_TIMEOUT_MS - 1000; long beforeExpiry = pendingContact.getTimestamp()
AtomicReference<Runnable> captureExpiryTask = new AtomicReference<>(); + RENDEZVOUS_TIMEOUT_MS - 1000;
long afterExpiry = beforeExpiry + POLLING_INTERVAL_MS;
AtomicReference<Runnable> capturePollTask = new AtomicReference<>();
// Start the service
context.checking(new DbExpectations() {{ context.checking(new DbExpectations() {{
// Load the pending contacts // Load the pending contacts
oneOf(db).transaction(with(true), withDbRunnable(txn)); oneOf(db).transaction(with(true), withDbRunnable(txn));
oneOf(db).getPendingContacts(txn); oneOf(db).getPendingContacts(txn);
will(returnValue(singletonList(pendingContact))); will(returnValue(singletonList(pendingContact)));
// Schedule the first poll // The pending contact has not expired
oneOf(clock).currentTimeMillis();
will(returnValue(beforeExpiry));
// Capture the poll task, we'll run it later
oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)), oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS)); with(MILLISECONDS));
// Calculate the pending contact's expiry time, 1 second from now will(new CaptureArgumentAction<>(capturePollTask, Runnable.class,
oneOf(clock).currentTimeMillis();
will(returnValue(now));
// Capture the expiry task, we'll run it later
oneOf(scheduler).schedule(with(any(Runnable.class)), with(1000L),
with(MILLISECONDS));
will(new CaptureArgumentAction<>(captureExpiryTask, Runnable.class,
0)); 0));
}}); }});
...@@ -122,32 +123,50 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -122,32 +123,50 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
rendezvousPoller.startService(); rendezvousPoller.startService();
context.assertIsSatisfied(); context.assertIsSatisfied();
// Run the poll task - pending contact expires
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// Run the expiry task oneOf(clock).currentTimeMillis();
will(returnValue(afterExpiry));
oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class))); oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class)));
}}); }});
captureExpiryTask.get().run(); capturePollTask.get().run();
} }
@Test @Test
public void testBroadcastsEventWhenExpiredPendingContactIsAdded() { public void testExpiresPendingContactAtStartup() throws Exception {
long now = pendingContact.getTimestamp() + RENDEZVOUS_TIMEOUT_MS; Transaction txn = new Transaction(null, true);
long atExpiry = pendingContact.getTimestamp() + RENDEZVOUS_TIMEOUT_MS;
context.checking(new Expectations() {{ // Start the service
context.checking(new DbExpectations() {{
// Load the pending contacts
oneOf(db).transaction(with(true), withDbRunnable(txn));
oneOf(db).getPendingContacts(txn);
will(returnValue(singletonList(pendingContact)));
// The pending contact has already expired
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(atExpiry));
oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class))); oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class)));
// Schedule the poll task
oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS));
}}); }});
rendezvousPoller.eventOccurred( rendezvousPoller.startService();
new PendingContactAddedEvent(pendingContact));
} }
@Test @Test
public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndRemoved() public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndRemoved()
throws Exception { throws Exception {
long now = pendingContact.getTimestamp(); long beforeExpiry = pendingContact.getTimestamp();
// Start the service
expectStartupWithNoPendingContacts();
rendezvousPoller.startService();
context.assertIsSatisfied();
// Enable the transport - no endpoints should be created yet // Enable the transport - no endpoints should be created yet
expectGetPlugin(); expectGetPlugin();
...@@ -157,11 +176,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -157,11 +176,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
// Add the pending contact - endpoint should be created and polled // Add the pending contact - endpoint should be created and polled
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// Add pending contact
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(beforeExpiry));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS));
}}); }});
expectDeriveRendezvousKey(); expectDeriveRendezvousKey();
...@@ -196,8 +212,16 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -196,8 +212,16 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
@Test @Test
public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndExpired() public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndExpired()
throws Exception { throws Exception {
long now = pendingContact.getTimestamp(); long beforeExpiry = pendingContact.getTimestamp()
AtomicReference<Runnable> captureExpiryTask = new AtomicReference<>(); + RENDEZVOUS_TIMEOUT_MS - 1000;
long afterExpiry = beforeExpiry + POLLING_INTERVAL_MS;
// Start the service, capturing the poll task
AtomicReference<Runnable> capturePollTask =
expectStartupWithNoPendingContacts();
rendezvousPoller.startService();
context.assertIsSatisfied();
// Enable the transport - no endpoints should be created yet // Enable the transport - no endpoints should be created yet
expectGetPlugin(); expectGetPlugin();
...@@ -207,14 +231,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -207,14 +231,8 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
// Add the pending contact - endpoint should be created and polled // Add the pending contact - endpoint should be created and polled
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// Add pending contact
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(beforeExpiry));
// Capture the expiry task, we'll run it later
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS));
will(new CaptureArgumentAction<>(captureExpiryTask, Runnable.class,
0));
}}); }});
expectDeriveRendezvousKey(); expectDeriveRendezvousKey();
...@@ -233,13 +251,15 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -233,13 +251,15 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
new PendingContactAddedEvent(pendingContact)); new PendingContactAddedEvent(pendingContact));
context.assertIsSatisfied(); context.assertIsSatisfied();
// The pending contact expires - endpoint should be closed // Run the poll task - pending contact expires, endpoint is closed
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(clock).currentTimeMillis();
will(returnValue(afterExpiry));
oneOf(rendezvousEndpoint).close(); oneOf(rendezvousEndpoint).close();
oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class))); oneOf(eventBus).broadcast(with(any(RendezvousFailedEvent.class)));
}}); }});
captureExpiryTask.get().run(); capturePollTask.get().run();
context.assertIsSatisfied(); context.assertIsSatisfied();
// Remove the pending contact - endpoint is already closed // Remove the pending contact - endpoint is already closed
...@@ -254,14 +274,18 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -254,14 +274,18 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
@Test @Test
public void testCreatesAndClosesEndpointsWhenTransportIsEnabledAndDisabled() public void testCreatesAndClosesEndpointsWhenTransportIsEnabledAndDisabled()
throws Exception { throws Exception {
long now = pendingContact.getTimestamp(); long beforeExpiry = pendingContact.getTimestamp();
// Start the service
expectStartupWithNoPendingContacts();
rendezvousPoller.startService();
context.assertIsSatisfied();
// Add the pending contact - no endpoints should be created yet // Add the pending contact - no endpoints should be created yet
context.checking(new DbExpectations() {{ context.checking(new DbExpectations() {{
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(beforeExpiry));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS));
}}); }});
expectDeriveRendezvousKey(); expectDeriveRendezvousKey();
...@@ -290,6 +314,25 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -290,6 +314,25 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
new PendingContactRemovedEvent(pendingContact.getId())); new PendingContactRemovedEvent(pendingContact.getId()));
} }
private AtomicReference<Runnable> expectStartupWithNoPendingContacts()
throws Exception {
Transaction txn = new Transaction(null, true);
AtomicReference<Runnable> capturePollTask = new AtomicReference<>();
context.checking(new DbExpectations() {{
oneOf(db).transaction(with(true), withDbRunnable(txn));
oneOf(db).getPendingContacts(txn);
will(returnValue(emptyList()));
oneOf(scheduler).scheduleAtFixedRate(with(any(Runnable.class)),
with(POLLING_INTERVAL_MS), with(POLLING_INTERVAL_MS),
with(MILLISECONDS));
will(new CaptureArgumentAction<>(capturePollTask, Runnable.class,
0));
}});
return capturePollTask;
}
private void expectDeriveRendezvousKey() throws Exception { private void expectDeriveRendezvousKey() throws Exception {
Transaction txn = new Transaction(null, true); Transaction txn = new Transaction(null, true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment