diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index 04f864d13a0ee4b369c8cf318467f8c69bafaf7c..f5e892405ece5f7569759a1024a04ffff696c222 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -29,6 +29,7 @@ import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import javax.annotation.concurrent.ThreadSafe; @@ -54,7 +55,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { private static final Logger LOG = Logger.getLogger(DuplexOutgoingSession.class.getName()); - private static final ThrowingRunnable<IOException> CLOSE = () -> {}; + private static final ThrowingRunnable<IOException> CLOSE = () -> { + }; private final DatabaseComponent db; private final Executor dbExecutor; @@ -65,6 +67,12 @@ class DuplexOutgoingSession implements SyncSession, EventListener { private final RecordWriter recordWriter; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; + private final AtomicBoolean generateAckQueued = new AtomicBoolean(false); + private final AtomicBoolean generateBatchQueued = new AtomicBoolean(false); + private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false); + private final AtomicBoolean generateRequestQueued = + new AtomicBoolean(false); + private volatile boolean interrupted = false; DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, @@ -87,10 +95,10 @@ class DuplexOutgoingSession implements SyncSession, EventListener { eventBus.addListener(this); try { // Start a query for each type of record - dbExecutor.execute(new GenerateAck()); - dbExecutor.execute(new GenerateBatch()); - dbExecutor.execute(new GenerateOffer()); - dbExecutor.execute(new GenerateRequest()); + generateAck(); + generateBatch(); + generateOffer(); + generateRequest(); long now = clock.currentTimeMillis(); long nextKeepalive = now + maxIdleTime; long nextRetxQuery = now + RETX_QUERY_INTERVAL; @@ -115,8 +123,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { now = clock.currentTimeMillis(); if (now >= nextRetxQuery) { // Check for retransmittable records - dbExecutor.execute(new GenerateBatch()); - dbExecutor.execute(new GenerateOffer()); + generateBatch(); + generateOffer(); nextRetxQuery = now + RETX_QUERY_INTERVAL; } if (now >= nextKeepalive) { @@ -142,6 +150,26 @@ class DuplexOutgoingSession implements SyncSession, EventListener { } } + private void generateAck() { + if (generateAckQueued.compareAndSet(false, true)) + dbExecutor.execute(new GenerateAck()); + } + + private void generateBatch() { + if (generateBatchQueued.compareAndSet(false, true)) + dbExecutor.execute(new GenerateBatch()); + } + + private void generateOffer() { + if (generateOfferQueued.compareAndSet(false, true)) + dbExecutor.execute(new GenerateOffer()); + } + + private void generateRequest() { + if (generateRequestQueued.compareAndSet(false, true)) + dbExecutor.execute(new GenerateRequest()); + } + @Override public void interrupt() { interrupted = true; @@ -154,20 +182,20 @@ class DuplexOutgoingSession implements SyncSession, EventListener { ContactRemovedEvent c = (ContactRemovedEvent) e; if (c.getContactId().equals(contactId)) interrupt(); } else if (e instanceof MessageSharedEvent) { - dbExecutor.execute(new GenerateOffer()); + generateOffer(); } else if (e instanceof GroupVisibilityUpdatedEvent) { GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e; if (g.getAffectedContacts().contains(contactId)) - dbExecutor.execute(new GenerateOffer()); + generateOffer(); } else if (e instanceof MessageRequestedEvent) { if (((MessageRequestedEvent) e).getContactId().equals(contactId)) - dbExecutor.execute(new GenerateBatch()); + generateBatch(); } else if (e instanceof MessageToAckEvent) { if (((MessageToAckEvent) e).getContactId().equals(contactId)) - dbExecutor.execute(new GenerateAck()); + generateAck(); } else if (e instanceof MessageToRequestEvent) { if (((MessageToRequestEvent) e).getContactId().equals(contactId)) - dbExecutor.execute(new GenerateRequest()); + generateRequest(); } else if (e instanceof ShutdownEvent) { interrupt(); } @@ -179,6 +207,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { @Override public void run() { if (interrupted) return; + if (!generateAckQueued.getAndSet(false)) throw new AssertionError(); try { Ack a; Transaction txn = db.startTransaction(false); @@ -212,7 +241,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { if (interrupted) return; recordWriter.writeAck(ack); LOG.info("Sent ack"); - dbExecutor.execute(new GenerateAck()); + generateAck(); } } @@ -222,6 +251,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { @Override public void run() { if (interrupted) return; + if (!generateBatchQueued.getAndSet(false)) + throw new AssertionError(); try { Collection<byte[]> b; Transaction txn = db.startTransaction(false); @@ -256,7 +287,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { if (interrupted) return; for (byte[] raw : batch) recordWriter.writeMessage(raw); LOG.info("Sent batch"); - dbExecutor.execute(new GenerateBatch()); + generateBatch(); } } @@ -266,6 +297,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { @Override public void run() { if (interrupted) return; + if (!generateOfferQueued.getAndSet(false)) + throw new AssertionError(); try { Offer o; Transaction txn = db.startTransaction(false); @@ -300,7 +333,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { if (interrupted) return; recordWriter.writeOffer(offer); LOG.info("Sent offer"); - dbExecutor.execute(new GenerateOffer()); + generateOffer(); } } @@ -310,6 +343,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { @Override public void run() { if (interrupted) return; + if (!generateRequestQueued.getAndSet(false)) + throw new AssertionError(); try { Request r; Transaction txn = db.startTransaction(false); @@ -343,7 +378,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { if (interrupted) return; recordWriter.writeRequest(request); LOG.info("Sent request"); - dbExecutor.execute(new GenerateRequest()); + generateRequest(); } } }