Allow retransmission if faster.

* This commit introduces an estimated time of arrival (eta) to the
message status which helps to decide whether a message should be
retransmitted over a faster transport.
parent 6925dfcb
package org.briarproject.bramble.test;
import org.briarproject.bramble.api.system.Clock;
public class ArrayClock implements Clock {
private final long[] times;
private int index = 0;
public ArrayClock(long... times) {
this.times = times;
}
@Override
public long currentTimeMillis() {
return times[index++];
}
@Override
public void sleep(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds);
}
}
......@@ -421,7 +421,7 @@ interface Database<T> {
* Read-only.
*/
Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
int maxMessages) throws DbException;
int maxMessages, int maxLatency) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be requested from
......@@ -438,8 +438,8 @@ interface Database<T> {
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength)
throws DbException;
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength,
int maxLatency) throws DbException;
/**
* Returns the IDs of any messages that need to be validated.
......@@ -482,7 +482,7 @@ interface Database<T> {
* Read-only.
*/
Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c,
int maxLength) throws DbException;
int maxLength, int maxLatency) throws DbException;
/**
* Returns all settings in the given namespace.
......@@ -647,11 +647,11 @@ interface Database<T> {
throws DbException;
/**
* Updates the transmission count and expiry time of the given message
* with respect to the given contact, using the latency of the transport
* over which it was sent.
* Updates the transmission count, expiry time and estimated time of arrival
* of the given message with respect to the given contact, using the latency
* of the transport over which it was sent.
*/
void updateExpiryTime(T txn, ContactId c, MessageId m, int maxLatency)
void updateExpiryTimeAndEta(T txn, ContactId c, MessageId m, int maxLatency)
throws DbException;
/**
......
......@@ -313,11 +313,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToSend(txn, c, maxLength);
Collection<MessageId> ids =
db.getMessagesToSend(txn, c, maxLength, maxLatency);
List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
messages.add(db.getMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency);
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
......@@ -333,9 +334,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToOffer(txn, c, maxMessages);
Collection<MessageId> ids =
db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
if (ids.isEmpty()) return null;
for (MessageId m : ids) db.updateExpiryTime(txn, c, m, maxLatency);
for (MessageId m : ids)
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
return new Offer(ids);
}
......@@ -362,12 +365,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids = db.getRequestedMessagesToSend(txn, c,
maxLength);
Collection<MessageId> ids =
db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency);
List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
messages.add(db.getMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency);
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
......@@ -855,7 +858,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
if (db.getMessageState(txn, m) != DELIVERED)
throw new IllegalArgumentException("Shared undelivered message");
throw new IllegalArgumentException(
"Shared undelivered message");
db.setMessageShared(txn, m);
transaction.attach(new MessageSharedEvent(m));
}
......@@ -881,7 +885,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchMessageException();
State dependentState = db.getMessageState(txn, dependent.getId());
for (MessageId dependency : dependencies) {
db.addMessageDependency(txn, dependent, dependency, dependentState);
db.addMessageDependency(txn, dependent, dependency,
dependentState);
}
}
......@@ -913,7 +918,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction);
for (KeySet ks : keys) {
TransportId t = ks.getTransportKeys().getTransportId();
if (db.containsTransport(txn, t)) db.updateTransportKeys(txn, ks);
if (db.containsTransport(txn, t))
db.updateTransportKeys(txn, ks);
}
}
}
......@@ -38,6 +38,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
......@@ -55,7 +56,6 @@ import java.util.logging.Logger;
import javax.annotation.Nullable;
import static java.sql.Types.INTEGER;
import static java.util.Collections.singletonList;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.api.db.Metadata.REMOVE;
......@@ -79,7 +79,7 @@ import static org.briarproject.bramble.util.LogUtils.logException;
abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing
static final int CODE_SCHEMA_VERSION = 39;
static final int CODE_SCHEMA_VERSION = 40;
// Rotation period offsets for incoming transport keys
private static final int OFFSET_PREV = -1;
......@@ -219,6 +219,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " requested BOOLEAN NOT NULL,"
+ " expiry BIGINT NOT NULL,"
+ " txCount INT NOT NULL,"
+ " eta BIGINT NOT NULL,"
+ " PRIMARY KEY (messageId, contactId),"
+ " FOREIGN KEY (messageId)"
+ " REFERENCES messages (messageId)"
......@@ -397,7 +398,7 @@ abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing
List<Migration<Connection>> getMigrations() {
return singletonList(new Migration38_39());
return Arrays.asList(new Migration38_39(), new Migration39_40());
}
private void storeSchemaVersion(Connection txn, int version)
......@@ -805,8 +806,9 @@ abstract class JdbcDatabase implements Database<Connection> {
try {
String sql = "INSERT INTO statuses (messageId, contactId, groupId,"
+ " timestamp, length, state, groupShared, messageShared,"
+ " deleted, ack, seen, requested, expiry, txCount)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, FALSE, 0, 0)";
+ " deleted, ack, seen, requested, expiry, txCount, eta)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, FALSE, 0, 0,"
+ " 0)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
......@@ -1869,8 +1871,9 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override
public Collection<MessageId> getMessagesToOffer(Connection txn,
ContactId c, int maxMessages) throws DbException {
ContactId c, int maxMessages, int maxLatency) throws DbException {
long now = clock.currentTimeMillis();
long eta = now + maxLatency * 2;
PreparedStatement ps = null;
ResultSet rs = null;
try {
......@@ -1879,13 +1882,14 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = FALSE"
+ " AND expiry < ?"
+ " AND (expiry < ? OR eta > ?)"
+ " ORDER BY timestamp LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now);
ps.setInt(4, maxMessages);
ps.setLong(4, eta);
ps.setInt(5, maxMessages);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
......@@ -1926,8 +1930,9 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override
public Collection<MessageId> getMessagesToSend(Connection txn, ContactId c,
int maxLength) throws DbException {
int maxLength, int maxLatency) throws DbException {
long now = clock.currentTimeMillis();
long eta = now + maxLatency * 2;
PreparedStatement ps = null;
ResultSet rs = null;
try {
......@@ -1936,12 +1941,13 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE"
+ " AND expiry < ?"
+ " AND (expiry < ? OR eta > ?)"
+ " ORDER BY timestamp";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now);
ps.setLong(4, eta);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
int total = 0;
......@@ -2055,8 +2061,9 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override
public Collection<MessageId> getRequestedMessagesToSend(Connection txn,
ContactId c, int maxLength) throws DbException {
ContactId c, int maxLength, int maxLatency) throws DbException {
long now = clock.currentTimeMillis();
long eta = now + maxLatency * 2;
PreparedStatement ps = null;
ResultSet rs = null;
try {
......@@ -2065,12 +2072,13 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = TRUE"
+ " AND expiry < ?"
+ " AND (expiry < ? OR eta > ?)"
+ " ORDER BY timestamp";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setLong(3, now);
ps.setLong(4, eta);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
int total = 0;
......@@ -2881,7 +2889,7 @@ abstract class JdbcDatabase implements Database<Connection> {
}
@Override
public void updateExpiryTime(Connection txn, ContactId c, MessageId m,
public void updateExpiryTimeAndEta(Connection txn, ContactId c, MessageId m,
int maxLatency) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
......@@ -2897,13 +2905,16 @@ abstract class JdbcDatabase implements Database<Connection> {
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
sql = "UPDATE statuses SET expiry = ?, txCount = txCount + 1"
sql = "UPDATE statuses"
+ " SET expiry = ?, txCount = txCount + 1, eta = ?"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
long now = clock.currentTimeMillis();
long eta = now + maxLatency * 2;
ps.setLong(1, calculateExpiry(now, maxLatency, txCount));
ps.setBytes(2, m.getBytes());
ps.setInt(3, c.getInt());
ps.setLong(2, eta);
ps.setBytes(3, m.getBytes());
ps.setInt(4, c.getInt());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
......
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.util.LogUtils.logException;
class Migration39_40 implements Migration<Connection> {
private static final Logger LOG =
Logger.getLogger(Migration39_40.class.getName());
@Override
public int getStartVersion() {
return 39;
}
@Override
public int getEndVersion() {
return 40;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
try {
s = txn.createStatement();
s.execute("ALTER TABLE statuses"
+ " ADD eta BIGINT");
s.execute("UPDATE statuses SET eta = 0");
s.execute("ALTER TABLE statuses"
+ " ALTER COLUMN eta"
+ " SET NOT NULL");
} catch (SQLException e) {
tryToClose(s);
throw new DbException(e);
}
}
private void tryToClose(@Nullable Statement s) {
try {
if (s != null) s.close();
} catch (SQLException e) {
logException(LOG, WARNING, e);
}
}
}
......@@ -11,6 +11,8 @@ import java.util.HashSet;
import java.util.Set;
import static junit.framework.TestCase.assertTrue;
import org.briarproject.bramble.test.ArrayClock;
import static org.briarproject.bramble.test.TestUtils.getRandomBytes;
import static org.briarproject.bramble.util.StringUtils.getRandomString;
import static org.junit.Assert.assertEquals;
......@@ -74,24 +76,4 @@ public class ScryptKdfTest extends BrambleTestCase {
PasswordBasedKdf kdf = new ScryptKdf(clock);
assertEquals(256, kdf.chooseCostParameter());
}
private static class ArrayClock implements Clock {
private final long[] times;
private int index = 0;
private ArrayClock(long... times) {
this.times = times;
}
@Override
public long currentTimeMillis() {
return times[index++];
}
@Override
public void sleep(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds);
}
}
}
......@@ -871,15 +871,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).getMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2);
MAX_MESSAGE_LENGTH * 2, maxLatency);
will(returnValue(ids));
oneOf(database).getMessage(txn, messageId);
will(returnValue(message));
oneOf(database).updateExpiryTime(txn, contactId, messageId,
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
maxLatency);
oneOf(database).getMessage(txn, messageId1);
will(returnValue(message1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1,
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1,
maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids);
oneOf(database).commitTransaction(txn);
......@@ -907,11 +907,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).getMessagesToOffer(txn, contactId, 123);
oneOf(database).getMessagesToOffer(txn, contactId, 123, maxLatency);
will(returnValue(ids));
oneOf(database).updateExpiryTime(txn, contactId, messageId,
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
maxLatency);
oneOf(database).updateExpiryTime(txn, contactId, messageId1,
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1,
maxLatency);
oneOf(database).commitTransaction(txn);
}});
......@@ -967,15 +967,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).getRequestedMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2);
MAX_MESSAGE_LENGTH * 2, maxLatency);
will(returnValue(ids));
oneOf(database).getMessage(txn, messageId);
will(returnValue(message));
oneOf(database).updateExpiryTime(txn, contactId, messageId,
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
maxLatency);
oneOf(database).getMessage(txn, messageId1);
will(returnValue(message1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1,
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId1,
maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids);
oneOf(database).commitTransaction(txn);
......
......@@ -96,6 +96,9 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
*/
private static final int STEADY_STATE_BLOCKS = 5;
// All our transports use a maximum latency of 30 seconds
private static final int MAX_LATENCY = 30 * 1000;
protected final File testDir = getTestDirectory();
private final File resultsFile = new File(getTestName() + ".tsv");
protected final Random random = new Random();
......@@ -448,7 +451,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
benchmark(name, db -> {
Connection txn = db.startTransaction();
db.getMessagesToOffer(txn, pickRandom(contacts).getId(),
MAX_MESSAGE_IDS);
MAX_MESSAGE_IDS, MAX_LATENCY);
db.commitTransaction(txn);
});
}
......@@ -470,7 +473,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
benchmark(name, db -> {
Connection txn = db.startTransaction();
db.getMessagesToSend(txn, pickRandom(contacts).getId(),
MAX_MESSAGE_IDS);
MAX_MESSAGE_IDS, MAX_LATENCY);
db.commitTransaction(txn);
});
}
......@@ -521,7 +524,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
benchmark(name, db -> {
Connection txn = db.startTransaction();
db.getRequestedMessagesToSend(txn, pickRandom(contacts).getId(),
MAX_MESSAGE_IDS);
MAX_MESSAGE_IDS, MAX_LATENCY);
db.commitTransaction(txn);
});
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment