diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index ff6ef3885b8835e063be8fe2542dde15b8d77fdf..a76138c1f98b83e0230ae782e2ab7f4a8012380f 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -378,6 +378,16 @@ public interface DatabaseComponent { MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m) throws DbException; + /* + * Returns the next time (in milliseconds since the Unix epoch) when a + * message is due to be sent to the given contact. The returned value may + * be zero if a message is due to be sent immediately, or Long.MAX_VALUE if + * no messages are scheduled to be sent. + * <p/> + * Read-only. + */ + long getNextSendTime(Transaction txn, ContactId c) throws DbException; + /** * Returns all settings in the given namespace. * <p/> diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index ea695a5fb9ce682aa66cf47f2ce2282519c3ef87..9d1c7636498a869c856ad57a134514725fdd2dbc 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -449,6 +449,16 @@ interface Database<T> { Collection<MessageId> getMessagesToShare(T txn, ClientId c) throws DbException; + /** + * Returns the next time (in milliseconds since the Unix epoch) when a + * message is due to be sent to the given contact. The returned value may + * be zero if a message is due to be sent immediately, or Long.MAX_VALUE + * if no messages are scheduled to be sent. + * <p/> + * Read-only. + */ + long getNextSendTime(T txn, ContactId c) throws DbException; + /** * Returns the message with the given ID, in serialised form, or null if * the message has been deleted. diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 53b379073d5d4c36cec8f50d93e526f596c5f334..af80da92b3c7f8d7499b1a0e126d0af9f75f42e0 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -569,6 +569,13 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageDependents(txn, m); } + @Override + public long getNextSendTime(Transaction transaction, ContactId c) + throws DbException { + T txn = unbox(transaction); + return db.getNextSendTime(txn, c); + } + @Override public Settings getSettings(Transaction transaction, String namespace) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index dcd758f9e3208a89044db35427ebfba5bb9dbf8d..809e8f9b2d8b4215678bb169eb0e50cc0ec949db 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -1929,6 +1929,37 @@ abstract class JdbcDatabase implements Database<Connection> { } } + @Override + public long getNextSendTime(Connection txn, ContactId c) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT expiry FROM statuses" + + " WHERE contactId = ? AND state = ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE" + + " AND seen = FALSE AND requested = FALSE" + + " ORDER BY expiry LIMIT 1"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + rs = ps.executeQuery(); + long nextSendTime = Long.MAX_VALUE; + if (rs.next()) { + nextSendTime = rs.getLong(1); + if (rs.next()) throw new AssertionError(); + } + rs.close(); + ps.close(); + return nextSendTime; + } catch (SQLException e) { + tryToClose(rs); + tryToClose(ps); + throw new DbException(e); + } + } + @Override @Nullable public byte[] getRawMessage(Connection txn, MessageId m) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index f5e892405ece5f7569759a1024a04ffff696c222..3fdabc01dcc239959f3c54d339c050cc7e6d4536 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -30,6 +30,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import javax.annotation.concurrent.ThreadSafe; @@ -50,13 +51,14 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD @NotNullByDefault class DuplexOutgoingSession implements SyncSession, EventListener { - // Check for retransmittable records once every 60 seconds - private static final int RETX_QUERY_INTERVAL = 60 * 1000; private static final Logger LOG = Logger.getLogger(DuplexOutgoingSession.class.getName()); private static final ThrowingRunnable<IOException> CLOSE = () -> { }; + private static final ThrowingRunnable<IOException> + NEXT_SEND_TIME_DECREASED = () -> { + }; private final DatabaseComponent db; private final Executor dbExecutor; @@ -72,6 +74,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false); private final AtomicBoolean generateRequestQueued = new AtomicBoolean(false); + private final AtomicLong nextSendTime = new AtomicLong(Long.MAX_VALUE); private volatile boolean interrupted = false; @@ -101,15 +104,15 @@ class DuplexOutgoingSession implements SyncSession, EventListener { generateRequest(); long now = clock.currentTimeMillis(); long nextKeepalive = now + maxIdleTime; - long nextRetxQuery = now + RETX_QUERY_INTERVAL; boolean dataToFlush = true; // Write records until interrupted try { while (!interrupted) { // Work out how long we should wait for a record now = clock.currentTimeMillis(); - long wait = Math.min(nextKeepalive, nextRetxQuery) - now; - if (wait < 0) wait = 0; + long keepaliveWait = Math.max(0, nextKeepalive - now); + long sendWait = Math.max(0, nextSendTime.get() - now); + long wait = Math.min(keepaliveWait, sendWait); // Flush any unflushed data if we're going to wait if (wait > 0 && dataToFlush && writerTasks.isEmpty()) { recordWriter.flush(); @@ -121,20 +124,25 @@ class DuplexOutgoingSession implements SyncSession, EventListener { MILLISECONDS); if (task == null) { now = clock.currentTimeMillis(); - if (now >= nextRetxQuery) { - // Check for retransmittable records + if (now >= nextSendTime.get()) { + // Check for retransmittable messages + LOG.info("Checking for retransmittable messages"); + setNextSendTime(Long.MAX_VALUE); generateBatch(); generateOffer(); - nextRetxQuery = now + RETX_QUERY_INTERVAL; } if (now >= nextKeepalive) { // Flush the stream to keep it alive + LOG.info("Sending keepalive"); recordWriter.flush(); dataToFlush = false; nextKeepalive = now + maxIdleTime; } } else if (task == CLOSE) { + LOG.info("Closed"); break; + } else if (task == NEXT_SEND_TIME_DECREASED) { + LOG.info("Next send time decreased"); } else { task.run(); dataToFlush = true; @@ -170,6 +178,11 @@ class DuplexOutgoingSession implements SyncSession, EventListener { dbExecutor.execute(new GenerateRequest()); } + private void setNextSendTime(long time) { + long old = nextSendTime.getAndSet(time); + if (time < old) writerTasks.add(NEXT_SEND_TIME_DECREASED); + } + @Override public void interrupt() { interrupted = true; @@ -259,6 +272,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { try { b = db.generateRequestedBatch(txn, contactId, MAX_RECORD_PAYLOAD_LENGTH, maxLatency); + setNextSendTime(db.getNextSendTime(txn, contactId)); db.commitTransaction(txn); } finally { db.endTransaction(txn); @@ -305,6 +319,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { try { o = db.generateOffer(txn, contactId, MAX_MESSAGE_IDS, maxLatency); + setNextSendTime(db.getNextSendTime(txn, contactId)); db.commitTransaction(txn); } finally { db.endTransaction(txn); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java index f71f54f68c04ef231a3d81b7bed90ac821e4db9d..a0f040c2067dac2d86137d95de328c543ccbc493 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java @@ -1605,7 +1605,6 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { @Test public void testSetMessageState() throws Exception { - Database<Connection> db = open(false); Connection txn = db.startTransaction(); @@ -1626,6 +1625,52 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { db.close(); } + @Test + public void testGetNextSendTime() throws Exception { + long now = System.currentTimeMillis(); + Database<Connection> db = open(false, new StoppedClock(now)); + Connection txn = db.startTransaction(); + + // Add a contact, a group and a message + db.addLocalAuthor(txn, localAuthor); + assertEquals(contactId, db.addContact(txn, author, localAuthor.getId(), + true, true)); + db.addGroup(txn, group); + db.addMessage(txn, message, UNKNOWN, false, null); + + // There should be no messages to send + assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId)); + + // Share the group with the contact - still no messages to send + db.addGroupVisibility(txn, contactId, groupId, true); + assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId)); + + // Set the message's state to DELIVERED - still no messages to send + db.setMessageState(txn, messageId, DELIVERED); + assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId)); + + // Share the message - now it should be sendable immediately + db.setMessageShared(txn, messageId); + assertEquals(0, db.getNextSendTime(txn, contactId)); + + // Update the message's expiry time as though we sent it - now the + // message should be sendable after one round-trip + db.updateExpiryTime(txn, contactId, messageId, 1000); + assertEquals(now + 2000, db.getNextSendTime(txn, contactId)); + + // Update the message's expiry time again - now it should be sendable + // after two round-trips + db.updateExpiryTime(txn, contactId, messageId, 1000); + assertEquals(now + 4000, db.getNextSendTime(txn, contactId)); + + // Delete the message - there should be no messages to send + db.deleteMessage(txn, messageId); + assertEquals(Long.MAX_VALUE, db.getNextSendTime(txn, contactId)); + + db.commitTransaction(txn); + db.close(); + } + @Test public void testExceptionHandling() throws Exception { Database<Connection> db = open(false); @@ -1643,8 +1688,13 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { } private Database<Connection> open(boolean resume) throws Exception { + return open(resume, new SystemClock()); + } + + private Database<Connection> open(boolean resume, Clock clock) + throws Exception { Database<Connection> db = createDatabase( - new TestDatabaseConfig(testDir, MAX_SIZE), new SystemClock()); + new TestDatabaseConfig(testDir, MAX_SIZE), clock); if (!resume) TestUtils.deleteTestDirectory(testDir); db.open(); return db; @@ -1674,4 +1724,23 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { public void tearDown() { TestUtils.deleteTestDirectory(testDir); } + + private static class StoppedClock implements Clock { + + private final long time; + + private StoppedClock(long time) { + this.time = time; + } + + @Override + public long currentTimeMillis() { + return time; + } + + @Override + public void sleep(long milliseconds) throws InterruptedException { + Thread.sleep(milliseconds); + } + } }