Commit a9c35cf5 authored by akwizgran's avatar akwizgran

Add database methods for selecting small messages.

parent 4b6ac085
Pipeline #3618 passed with stage
in 8 minutes and 54 seconds
......@@ -163,19 +163,23 @@ public interface DatabaseComponent extends TransactionManager {
* less than or equal to the given length, for transmission over a
* transport with the given maximum latency. Returns null if there are no
* sendable messages that fit in the given length.
*
* @param small True if only single-block messages should be sent
*/
@Nullable
Collection<Message> generateBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
int maxLength, int maxLatency, boolean small) throws DbException;
/**
* Returns an offer for the given contact for transmission over a
* transport with the given maximum latency, or null if there are no
* messages to offer.
*
* @param small True if only single-block messages should be offered
*/
@Nullable
Offer generateOffer(Transaction txn, ContactId c, int maxMessages,
int maxLatency) throws DbException;
int maxLatency, boolean small) throws DbException;
/**
* Returns a request for the given contact, or null if there are no
......
......@@ -29,10 +29,15 @@ public interface SyncConstants {
*/
int MESSAGE_HEADER_LENGTH = UniqueId.LENGTH + 8;
/**
* The maximum length of a block in bytes.
*/
int MAX_BLOCK_LENGTH = 32 * 1024; // 32 KiB
/**
* The maximum length of a message body in bytes.
*/
int MAX_MESSAGE_BODY_LENGTH = 32 * 1024; // 32 KiB
int MAX_MESSAGE_BODY_LENGTH = MAX_BLOCK_LENGTH;
/**
* The maximum length of a message in bytes.
......
......@@ -446,6 +446,15 @@ interface Database<T> {
Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
int maxMessages, int maxLatency) throws DbException;
/**
* Returns the IDs of some single-block messages that are eligible to be
* offered to the given contact, up to the given number of messages.
* <p/>
* Read-only.
*/
Collection<MessageId> getSmallMessagesToOffer(T txn, ContactId c,
int maxMessages, int maxLatency) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be requested from
* the given contact, up to the given number of messages.
......@@ -464,6 +473,15 @@ interface Database<T> {
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength,
int maxLatency) throws DbException;
/**
* Returns the IDs of some single-block messages that are eligible to be
* sent to the given contact, up to the given total length.
* <p/>
* Read-only.
*/
Collection<MessageId> getSmallMessagesToSend(T txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
/**
* Returns the IDs of any messages that need to be validated.
* <p/>
......
......@@ -401,13 +401,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Nullable
@Override
public Collection<Message> generateBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
ContactId c, int maxLength, int maxLatency, boolean small)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids =
db.getMessagesToSend(txn, c, maxLength, maxLatency);
Collection<MessageId> ids;
if (small)
ids = db.getSmallMessagesToSend(txn, c, maxLength, maxLatency);
else ids = db.getMessagesToSend(txn, c, maxLength, maxLatency);
List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
messages.add(db.getMessage(txn, m));
......@@ -422,13 +425,15 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Nullable
@Override
public Offer generateOffer(Transaction transaction, ContactId c,
int maxMessages, int maxLatency) throws DbException {
int maxMessages, int maxLatency, boolean small) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids =
db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
Collection<MessageId> ids;
if (small)
ids = db.getSmallMessagesToOffer(txn, c, maxMessages, maxLatency);
else ids = db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
if (ids.isEmpty()) return null;
for (MessageId m : ids)
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
......@@ -443,8 +448,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToRequest(txn, c,
maxMessages);
Collection<MessageId> ids =
db.getMessagesToRequest(txn, c, maxMessages);
if (ids.isEmpty()) return null;
db.removeOfferedMessages(txn, c, ids);
return new Request(ids);
......
......@@ -75,6 +75,8 @@ import static org.briarproject.bramble.api.db.Metadata.REMOVE;
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED;
import static org.briarproject.bramble.api.sync.validation.MessageState.PENDING;
import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN;
......@@ -178,7 +180,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " state INT NOT NULL,"
+ " shared BOOLEAN NOT NULL,"
+ " temporary BOOLEAN NOT NULL,"
+ " length INT NOT NULL,"
+ " length INT NOT NULL," // Includes message header
+ " deleted BOOLEAN NOT NULL,"
+ " blockCount INT NOT NULL,"
+ " PRIMARY KEY (messageId),"
......@@ -230,7 +232,7 @@ abstract class JdbcDatabase implements Database<Connection> {
"CREATE TABLE blocks"
+ " (messageId _HASH NOT NULL,"
+ " blockNumber INT NOT NULL,"
+ " blockLength INT NOT NULL,"
+ " blockLength INT NOT NULL," // Excludes block header
+ " data BLOB," // Null if message has been deleted
+ " PRIMARY KEY (messageId, blockNumber),"
+ " FOREIGN KEY (messageId)"
......@@ -2093,6 +2095,42 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public Collection<MessageId> getSmallMessagesToOffer(Connection txn,
ContactId c, int maxMessages, int maxLatency) throws DbException {
long now = clock.currentTimeMillis();
long eta = now + maxLatency;
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT messageId FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND length <= ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE AND requested = FALSE"
+ " AND (expiry <= ? OR eta > ?)"
+ " ORDER BY timestamp LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setInt(3, MESSAGE_HEADER_LENGTH + MAX_BLOCK_LENGTH);
ps.setLong(4, now);
ps.setLong(5, eta);
ps.setInt(6, maxMessages);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
rs.close();
ps.close();
return ids;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public Collection<MessageId> getMessagesToRequest(Connection txn,
ContactId c, int maxMessages) throws DbException {
......@@ -2157,6 +2195,47 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public Collection<MessageId> getSmallMessagesToSend(Connection txn,
ContactId c, int maxLength, int maxLatency) throws DbException {
long now = clock.currentTimeMillis();
long eta = now + maxLatency;
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT length, messageId FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND length <= ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE"
+ " AND seen = FALSE"
+ " AND (expiry <= ? OR eta > ?)"
+ " ORDER BY timestamp";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
ps.setInt(3, MESSAGE_HEADER_LENGTH + MAX_BLOCK_LENGTH);
ps.setLong(4, now);
ps.setLong(5, eta);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
int total = 0;
while (rs.next()) {
int length = rs.getInt(1);
if (total + length > maxLength) break;
ids.add(new MessageId(rs.getBytes(2)));
total += length;
}
rs.close();
ps.close();
return ids;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public Collection<MessageId> getMessagesToValidate(Connection txn)
throws DbException {
......
......@@ -52,7 +52,7 @@ class Migration47_48 implements Migration<Connection> {
s.execute(dbTypes.replaceTypes("CREATE TABLE blocks"
+ " (messageId _HASH NOT NULL,"
+ " blockNumber INT NOT NULL,"
+ " blockLength INT NOT NULL,"
+ " blockLength INT NOT NULL," // Excludes block header
+ " data BLOB," // Null if message has been deleted
+ " PRIMARY KEY (messageId, blockNumber),"
+ " FOREIGN KEY (messageId)"
......
......@@ -321,7 +321,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
try {
Offer o = db.transactionWithNullableResult(false, txn -> {
Offer offer = db.generateOffer(txn, contactId,
MAX_MESSAGE_IDS, maxLatency);
MAX_MESSAGE_IDS, maxLatency, true);
setNextSendTime(db.getNextSendTime(txn, contactId));
return offer;
});
......
......@@ -174,7 +174,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
Collection<Message> b =
db.transactionWithNullableResult(false, txn ->
db.generateBatch(txn, contactId,
MAX_RECORD_PAYLOAD_BYTES, maxLatency));
MAX_RECORD_PAYLOAD_BYTES, maxLatency,
true));
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b == null) decrementOutstandingQueries();
......
......@@ -323,7 +323,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
try {
db.transaction(false, transaction ->
db.generateBatch(transaction, contactId, 123, 456));
db.generateBatch(transaction, contactId, 123, 456, true));
fail();
} catch (NoSuchContactException expected) {
// Expected
......@@ -331,7 +331,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
try {
db.transaction(false, transaction ->
db.generateOffer(transaction, contactId, 123, 456));
db.generateOffer(transaction, contactId, 123, 456, true));
fail();
} catch (NoSuchContactException expected) {
// Expected
......@@ -865,7 +865,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).getMessagesToSend(txn, contactId,
oneOf(database).getSmallMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2, maxLatency);
will(returnValue(ids));
oneOf(database).getMessage(txn, messageId);
......@@ -885,7 +885,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
db.transaction(false, transaction ->
assertEquals(messages, db.generateBatch(transaction, contactId,
MAX_MESSAGE_LENGTH * 2, maxLatency)));
MAX_MESSAGE_LENGTH * 2, maxLatency, true)));
}
@Test
......@@ -897,7 +897,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).getMessagesToOffer(txn, contactId, 123, maxLatency);
oneOf(database).getSmallMessagesToOffer(txn, contactId, 123,
maxLatency);
will(returnValue(ids));
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
maxLatency);
......@@ -909,7 +910,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
eventExecutor, shutdownManager);
db.transaction(false, transaction -> {
Offer o = db.generateOffer(transaction, contactId, 123, maxLatency);
Offer o = db.generateOffer(transaction, contactId, 123, maxLatency,
true);
assertNotNull(o);
assertEquals(ids, o.getMessageIds());
});
......
......@@ -61,7 +61,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY));
with(any(int.class)), with(MAX_LATENCY), with(true));
will(returnValue(null));
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
......@@ -98,7 +98,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(msgTxn));
oneOf(db).generateBatch(with(msgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY));
with(any(int.class)), with(MAX_LATENCY), with(true));
will(returnValue(singletonList(message)));
oneOf(recordWriter).writeMessage(message);
// No more acks
......@@ -110,7 +110,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY));
with(any(int.class)), with(MAX_LATENCY), with(true));
will(returnValue(null));
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment