diff --git a/briar-core/src/org/briarproject/db/DatabaseModule.java b/briar-core/src/org/briarproject/db/DatabaseModule.java index d5770151db161c1294f5b36a0e6684f5f2f9c38c..a76155bdb6b0e91deb2090e57204b46991a1578c 100644 --- a/briar-core/src/org/briarproject/db/DatabaseModule.java +++ b/briar-core/src/org/briarproject/db/DatabaseModule.java @@ -55,8 +55,13 @@ public class DatabaseModule extends AbstractModule { } @Provides @Singleton @DatabaseExecutor - Executor getDatabaseExecutor(LifecycleManager lifecycleManager) { + ExecutorService getDatabaseExecutor(LifecycleManager lifecycleManager) { lifecycleManager.registerForShutdown(databaseExecutor); return databaseExecutor; } + + @Provides @Singleton @DatabaseExecutor + Executor getDatabaseExecutor(@DatabaseExecutor ExecutorService dbExecutor) { + return dbExecutor; + } } diff --git a/briar-core/src/org/briarproject/transport/KeyManagerImpl.java b/briar-core/src/org/briarproject/transport/KeyManagerImpl.java index c25008141caea4424cf1cfe85b8b5ec4412e1ce5..7c7123637ab254f49620f7150ab753ae0d6a3fb5 100644 --- a/briar-core/src/org/briarproject/transport/KeyManagerImpl.java +++ b/briar-core/src/org/briarproject/transport/KeyManagerImpl.java @@ -22,7 +22,7 @@ import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.logging.Logger; import javax.inject.Inject; @@ -36,7 +36,7 @@ class KeyManagerImpl implements KeyManager, EventListener { private final DatabaseComponent db; private final CryptoComponent crypto; - private final Executor dbExecutor; + private final ExecutorService dbExecutor; private final EventBus eventBus; private final Timer timer; private final Clock clock; @@ -44,7 +44,7 @@ class KeyManagerImpl implements KeyManager, EventListener { @Inject KeyManagerImpl(DatabaseComponent db, CryptoComponent crypto, - @DatabaseExecutor Executor dbExecutor, EventBus eventBus, + @DatabaseExecutor ExecutorService dbExecutor, EventBus eventBus, Timer timer, Clock clock) { this.db = db; this.crypto = crypto; diff --git a/briar-core/src/org/briarproject/transport/TransportKeyManager.java b/briar-core/src/org/briarproject/transport/TransportKeyManager.java index 19e49a89cc62e35725fbec4eee113e61adfcab68..7b6b5011c078651a283c332e9f55b341016a9c76 100644 --- a/briar-core/src/org/briarproject/transport/TransportKeyManager.java +++ b/briar-core/src/org/briarproject/transport/TransportKeyManager.java @@ -17,7 +17,10 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.TimerTask; -import java.util.concurrent.Executor; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; @@ -33,7 +36,7 @@ class TransportKeyManager extends TimerTask { private final DatabaseComponent db; private final CryptoComponent crypto; - private final Executor dbExecutor; + private final ExecutorService dbExecutor; private final Timer timer; private final Clock clock; private final TransportId transportId; @@ -46,7 +49,7 @@ class TransportKeyManager extends TimerTask { private final Map<ContactId, MutableTransportKeys> keys; TransportKeyManager(DatabaseComponent db, CryptoComponent crypto, - Executor dbExecutor, Timer timer, Clock clock, + ExecutorService dbExecutor, Timer timer, Clock clock, TransportId transportId, long maxLatency) { this.db = db; this.crypto = crypto; @@ -119,7 +122,8 @@ class TransportKeyManager extends TimerTask { } } - private void saveTransportKeys(final Map<ContactId, TransportKeys> rotated) { + private void saveTransportKeys( + final Map<ContactId, TransportKeys> rotated) { dbExecutor.execute(new Runnable() { public void run() { try { @@ -159,6 +163,7 @@ class TransportKeyManager extends TimerTask { StreamContext getStreamContext(ContactId c) { StreamContext ctx; + Future<Void> saved; lock.lock(); try { // Look up the outgoing keys for the contact @@ -170,30 +175,44 @@ class TransportKeyManager extends TimerTask { outKeys.getHeaderKey(), outKeys.getStreamCounter()); // Increment the stream counter and write it back to the DB outKeys.incrementStreamCounter(); - saveIncrementedStreamCounter(c, outKeys.getRotationPeriod()); + saved = saveIncrementedStreamCounter(c, + outKeys.getRotationPeriod()); } finally { lock.unlock(); } - // TODO: Wait for save to complete, return null if it fails + // Wait for the save to complete before returning the stream context + try { + saved.get(); + } catch (InterruptedException e) { + LOG.warning("Interrupted while incrementing stream counter"); + Thread.currentThread().interrupt(); + return null; + } catch (ExecutionException e) { + if (LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + return null; + } return ctx; } - private void saveIncrementedStreamCounter(final ContactId c, + private Future<Void> saveIncrementedStreamCounter(final ContactId c, final long rotationPeriod) { - dbExecutor.execute(new Runnable() { - public void run() { + return dbExecutor.submit(new Callable<Void>() { + public Void call() { try { db.incrementStreamCounter(c, transportId, rotationPeriod); } catch (DbException e) { if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } + return null; } }); } StreamContext recogniseTag(byte[] tag) { StreamContext ctx; + Future<Void> saved; lock.lock(); try { // Look up the incoming keys for the tag @@ -221,19 +240,31 @@ class TransportKeyManager extends TimerTask { inContexts.remove(new Bytes(removeTag)); } // Write the window back to the DB - saveReorderingWindow(tagCtx.contactId, inKeys.getRotationPeriod(), - window.getBase(), window.getBitmap()); + saved = saveReorderingWindow(tagCtx.contactId, + inKeys.getRotationPeriod(), window.getBase(), + window.getBitmap()); } finally { lock.unlock(); } - // TODO: Wait for save to complete, return null if it fails + // Wait for the save to complete before returning the stream context + try { + saved.get(); + } catch (InterruptedException e) { + LOG.warning("Interrupted while updating reordering window"); + Thread.currentThread().interrupt(); + return null; + } catch (ExecutionException e) { + if (LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + return null; + } return ctx; } - private void saveReorderingWindow(final ContactId c, + private Future<Void> saveReorderingWindow(final ContactId c, final long rotationPeriod, final long base, final byte[] bitmap) { - dbExecutor.execute(new Runnable() { - public void run() { + return dbExecutor.submit(new Callable<Void>() { + public Void call() { try { db.setReorderingWindow(c, transportId, rotationPeriod, base, bitmap); @@ -241,6 +272,7 @@ class TransportKeyManager extends TimerTask { if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } + return null; } }); }