Commit 2982a874 authored by Torsten Grote's avatar Torsten Grote

Merge branch '1232-rendezvous-poller-cleanup' into 'master'

Small cleanups for rendezvous poller

See merge request !1126
parents a94ffd41 ea228164
Pipeline #3465 passed with stage
in 7 minutes and 34 seconds
...@@ -47,6 +47,7 @@ import java.util.Map; ...@@ -47,6 +47,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
...@@ -79,6 +80,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { ...@@ -79,6 +80,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
private final EventBus eventBus; private final EventBus eventBus;
private final Clock clock; private final Clock clock;
private final AtomicBoolean used = new AtomicBoolean(false);
// Executor that runs one task at a time // Executor that runs one task at a time
private final Executor worker; private final Executor worker;
// The following fields are only accessed on the worker // The following fields are only accessed on the worker
...@@ -113,6 +115,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener { ...@@ -113,6 +115,7 @@ class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
@Override @Override
public void startService() throws ServiceException { public void startService() throws ServiceException {
if (used.getAndSet(true)) throw new IllegalStateException();
try { try {
db.transaction(true, txn -> { db.transaction(true, txn -> {
Collection<PendingContact> pending = db.getPendingContacts(txn); Collection<PendingContact> pending = db.getPendingContacts(txn);
......
...@@ -115,20 +115,10 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -115,20 +115,10 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
with(MILLISECONDS)); with(MILLISECONDS));
will(new CaptureArgumentAction<>(captureExpiryTask, Runnable.class, will(new CaptureArgumentAction<>(captureExpiryTask, Runnable.class,
0)); 0));
// Load our handshake key pair
oneOf(db).transactionWithResult(with(true), withDbCallable(txn));
will(returnValue(handshakeKeyPair));
// Derive the rendezvous key
oneOf(transportCrypto).deriveStaticMasterKey(
pendingContact.getPublicKey(), handshakeKeyPair);
will(returnValue(staticMasterKey));
oneOf(rendezvousCrypto).deriveRendezvousKey(staticMasterKey);
will(returnValue(rendezvousKey));
oneOf(transportCrypto).isAlice(pendingContact.getPublicKey(),
handshakeKeyPair);
will(returnValue(alice));
}}); }});
expectDeriveRendezvousKey();
rendezvousPoller.startService(); rendezvousPoller.startService();
context.assertIsSatisfied(); context.assertIsSatisfied();
...@@ -157,45 +147,27 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -157,45 +147,27 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
@Test @Test
public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndRemoved() public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndRemoved()
throws Exception { throws Exception {
Transaction txn = new Transaction(null, true);
long now = pendingContact.getTimestamp(); long now = pendingContact.getTimestamp();
// Enable the transport - no endpoints should be created yet // Enable the transport - no endpoints should be created yet
context.checking(new Expectations() {{ expectGetPlugin();
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
oneOf(plugin).supportsRendezvous();
will(returnValue(true));
allowing(plugin).getId();
will(returnValue(transportId));
}});
rendezvousPoller.eventOccurred(new TransportEnabledEvent(transportId)); rendezvousPoller.eventOccurred(new TransportEnabledEvent(transportId));
context.assertIsSatisfied(); context.assertIsSatisfied();
// Add the pending contact - endpoint should be created and polled // Add the pending contact - endpoint should be created and polled
context.checking(new DbExpectations() {{ context.checking(new Expectations() {{
// Add pending contact // Add pending contact
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS)); with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS));
oneOf(db).transactionWithResult(with(true), withDbCallable(txn)); }});
will(returnValue(handshakeKeyPair));
oneOf(transportCrypto).deriveStaticMasterKey( expectDeriveRendezvousKey();
pendingContact.getPublicKey(), handshakeKeyPair); expectCreateEndpoint();
will(returnValue(staticMasterKey));
oneOf(rendezvousCrypto).deriveRendezvousKey(staticMasterKey); context.checking(new Expectations() {{
will(returnValue(rendezvousKey));
oneOf(transportCrypto).isAlice(pendingContact.getPublicKey(),
handshakeKeyPair);
will(returnValue(alice));
oneOf(rendezvousCrypto).createKeyMaterialSource(rendezvousKey,
transportId);
will(returnValue(keyMaterialSource));
oneOf(plugin).createRendezvousEndpoint(with(keyMaterialSource),
with(alice), with(any(ConnectionHandler.class)));
will(returnValue(rendezvousEndpoint));
// Poll newly added pending contact // Poll newly added pending contact
oneOf(rendezvousEndpoint).getRemoteTransportProperties(); oneOf(rendezvousEndpoint).getRemoteTransportProperties();
will(returnValue(transportProperties)); will(returnValue(transportProperties));
...@@ -224,25 +196,17 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -224,25 +196,17 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
@Test @Test
public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndExpired() public void testCreatesAndClosesEndpointsWhenPendingContactIsAddedAndExpired()
throws Exception { throws Exception {
Transaction txn = new Transaction(null, true);
long now = pendingContact.getTimestamp(); long now = pendingContact.getTimestamp();
AtomicReference<Runnable> captureExpiryTask = new AtomicReference<>(); AtomicReference<Runnable> captureExpiryTask = new AtomicReference<>();
// Enable the transport - no endpoints should be created yet // Enable the transport - no endpoints should be created yet
context.checking(new Expectations() {{ expectGetPlugin();
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
oneOf(plugin).supportsRendezvous();
will(returnValue(true));
allowing(plugin).getId();
will(returnValue(transportId));
}});
rendezvousPoller.eventOccurred(new TransportEnabledEvent(transportId)); rendezvousPoller.eventOccurred(new TransportEnabledEvent(transportId));
context.assertIsSatisfied(); context.assertIsSatisfied();
// Add the pending contact - endpoint should be created and polled // Add the pending contact - endpoint should be created and polled
context.checking(new DbExpectations() {{ context.checking(new Expectations() {{
// Add pending contact // Add pending contact
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(now)); will(returnValue(now));
...@@ -251,22 +215,12 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -251,22 +215,12 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS)); with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS));
will(new CaptureArgumentAction<>(captureExpiryTask, Runnable.class, will(new CaptureArgumentAction<>(captureExpiryTask, Runnable.class,
0)); 0));
oneOf(db).transactionWithResult(with(true), withDbCallable(txn)); }});
will(returnValue(handshakeKeyPair));
oneOf(transportCrypto).deriveStaticMasterKey( expectDeriveRendezvousKey();
pendingContact.getPublicKey(), handshakeKeyPair); expectCreateEndpoint();
will(returnValue(staticMasterKey));
oneOf(rendezvousCrypto).deriveRendezvousKey(staticMasterKey); context.checking(new Expectations() {{
will(returnValue(rendezvousKey));
oneOf(transportCrypto).isAlice(pendingContact.getPublicKey(),
handshakeKeyPair);
will(returnValue(alice));
oneOf(rendezvousCrypto).createKeyMaterialSource(rendezvousKey,
transportId);
will(returnValue(keyMaterialSource));
oneOf(plugin).createRendezvousEndpoint(with(keyMaterialSource),
with(alice), with(any(ConnectionHandler.class)));
will(returnValue(rendezvousEndpoint));
// Poll newly added pending contact // Poll newly added pending contact
oneOf(rendezvousEndpoint).getRemoteTransportProperties(); oneOf(rendezvousEndpoint).getRemoteTransportProperties();
will(returnValue(transportProperties)); will(returnValue(transportProperties));
...@@ -300,7 +254,6 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -300,7 +254,6 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
@Test @Test
public void testCreatesAndClosesEndpointsWhenTransportIsEnabledAndDisabled() public void testCreatesAndClosesEndpointsWhenTransportIsEnabledAndDisabled()
throws Exception { throws Exception {
Transaction txn = new Transaction(null, true);
long now = pendingContact.getTimestamp(); long now = pendingContact.getTimestamp();
// Add the pending contact - no endpoints should be created yet // Add the pending contact - no endpoints should be created yet
...@@ -309,6 +262,38 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -309,6 +262,38 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
will(returnValue(now)); will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), oneOf(scheduler).schedule(with(any(Runnable.class)),
with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS)); with(RENDEZVOUS_TIMEOUT_MS), with(MILLISECONDS));
}});
expectDeriveRendezvousKey();
rendezvousPoller.eventOccurred(
new PendingContactAddedEvent(pendingContact));
context.assertIsSatisfied();
// Enable the transport - endpoint should be created
expectGetPlugin();
expectCreateEndpoint();
rendezvousPoller.eventOccurred(new TransportEnabledEvent(transportId));
context.assertIsSatisfied();
// Disable the transport - endpoint should be closed
context.checking(new Expectations() {{
oneOf(rendezvousEndpoint).close();
}});
rendezvousPoller.eventOccurred(new TransportDisabledEvent(transportId));
context.assertIsSatisfied();
// Remove the pending contact - endpoint is already closed
rendezvousPoller.eventOccurred(
new PendingContactRemovedEvent(pendingContact.getId()));
}
private void expectDeriveRendezvousKey() throws Exception {
Transaction txn = new Transaction(null, true);
context.checking(new DbExpectations() {{
oneOf(db).transactionWithResult(with(true), withDbCallable(txn)); oneOf(db).transactionWithResult(with(true), withDbCallable(txn));
will(returnValue(handshakeKeyPair)); will(returnValue(handshakeKeyPair));
oneOf(transportCrypto).deriveStaticMasterKey( oneOf(transportCrypto).deriveStaticMasterKey(
...@@ -320,19 +305,10 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -320,19 +305,10 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
handshakeKeyPair); handshakeKeyPair);
will(returnValue(alice)); will(returnValue(alice));
}}); }});
}
rendezvousPoller.eventOccurred( private void expectCreateEndpoint() {
new PendingContactAddedEvent(pendingContact));
context.assertIsSatisfied();
// Enable the transport - endpoint should be created
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
oneOf(plugin).supportsRendezvous();
will(returnValue(true));
allowing(plugin).getId();
will(returnValue(transportId));
oneOf(rendezvousCrypto).createKeyMaterialSource(rendezvousKey, oneOf(rendezvousCrypto).createKeyMaterialSource(rendezvousKey,
transportId); transportId);
will(returnValue(keyMaterialSource)); will(returnValue(keyMaterialSource));
...@@ -340,20 +316,16 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase { ...@@ -340,20 +316,16 @@ public class RendezvousPollerImplTest extends BrambleMockTestCase {
with(alice), with(any(ConnectionHandler.class))); with(alice), with(any(ConnectionHandler.class)));
will(returnValue(rendezvousEndpoint)); will(returnValue(rendezvousEndpoint));
}}); }});
}
rendezvousPoller.eventOccurred(new TransportEnabledEvent(transportId)); private void expectGetPlugin() {
context.assertIsSatisfied();
// Disable the transport - endpoint should be closed
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(rendezvousEndpoint).close(); oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
oneOf(plugin).supportsRendezvous();
will(returnValue(true));
allowing(plugin).getId();
will(returnValue(transportId));
}}); }});
rendezvousPoller.eventOccurred(new TransportDisabledEvent(transportId));
context.assertIsSatisfied();
// Remove the pending contact - endpoint is already closed
rendezvousPoller.eventOccurred(
new PendingContactRemovedEvent(pendingContact.getId()));
} }
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment