Commit d8b04edc authored by Torsten Grote's avatar Torsten Grote
Browse files

Merge branch '1240-avoid-raw-messages' into 'master'

Avoid raw messages

See merge request briar/briar!906
parents 3db35f70 0bc07cd0
......@@ -151,13 +151,13 @@ public interface DatabaseComponent {
throws DbException;
/**
* Returns a batch of raw messages for the given contact, with a total
* length less than or equal to the given length, for transmission over a
* Returns a batch of messages for the given contact, with a total length
* less than or equal to the given length, for transmission over a
* transport with the given maximum latency. Returns null if there are no
* sendable messages that fit in the given length.
*/
@Nullable
Collection<byte[]> generateBatch(Transaction txn, ContactId c,
Collection<Message> generateBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
/**
......@@ -178,14 +178,14 @@ public interface DatabaseComponent {
throws DbException;
/**
* Returns a batch of raw messages for the given contact, with a total
* length less than or equal to the given length, for transmission over a
* Returns a batch of messages for the given contact, with a total length
* less than or equal to the given length, for transmission over a
* transport with the given maximum latency. Only messages that have been
* requested by the contact are returned. Returns null if there are no
* sendable messages that fit in the given length.
*/
@Nullable
Collection<byte[]> generateRequestedBatch(Transaction txn, ContactId c,
Collection<Message> generateRequestedBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
/**
......@@ -263,6 +263,15 @@ public interface DatabaseComponent {
*/
Collection<LocalAuthor> getLocalAuthors(Transaction txn) throws DbException;
/**
* Returns the message with the given ID.
* <p/>
* Read-only.
*
* @throws MessageDeletedException if the message has been deleted
*/
Message getMessage(Transaction txn, MessageId m) throws DbException;
/**
* Returns the IDs of all delivered messages in the given group.
* <p/>
......@@ -297,15 +306,6 @@ public interface DatabaseComponent {
Collection<MessageId> getMessagesToShare(Transaction txn)
throws DbException;
/**
* Returns the message with the given ID, in serialised form.
* <p/>
* Read-only.
*
* @throws MessageDeletedException if the message has been deleted
*/
byte[] getRawMessage(Transaction txn, MessageId m) throws DbException;
/**
* Returns the metadata for all delivered messages in the given group.
* <p/>
......
......@@ -50,7 +50,7 @@ public class Message {
/**
* Returns the length of the raw message in bytes.
*/
public int getLength() {
public int getRawLength() {
return raw.length;
}
......
......@@ -9,7 +9,7 @@ public interface SyncRecordWriter {
void writeAck(Ack a) throws IOException;
void writeMessage(byte[] raw) throws IOException;
void writeMessage(Message m) throws IOException;
void writeOffer(Offer o) throws IOException;
......
......@@ -127,7 +127,7 @@ class ClientHelperImpl implements ClientHelper {
@Override
public Message getMessage(Transaction txn, MessageId m) throws DbException {
return messageFactory.createMessage(m, db.getRawMessage(txn, m));
return db.getMessage(txn, m);
}
@Override
......@@ -147,7 +147,7 @@ class ClientHelperImpl implements ClientHelper {
@Override
public BdfList getMessageAsList(Transaction txn, MessageId m)
throws DbException, FormatException {
byte[] raw = db.getRawMessage(txn, m);
byte[] raw = db.getMessage(txn, m).getRaw();
return toList(raw, MESSAGE_HEADER_LENGTH,
raw.length - MESSAGE_HEADER_LENGTH);
}
......
......@@ -298,6 +298,15 @@ interface Database<T> {
*/
Collection<LocalAuthor> getLocalAuthors(T txn) throws DbException;
/**
* Returns the message with the given ID.
* <p/>
* Read-only.
*
* @throws MessageDeletedException if the message has been deleted
*/
Message getMessage(T txn, MessageId m) throws DbException;
/**
* Returns the IDs and states of all dependencies of the given message.
* For missing dependencies and dependencies in other groups, the state
......@@ -465,15 +474,6 @@ interface Database<T> {
*/
long getNextSendTime(T txn, ContactId c) throws DbException;
/**
* Returns the message with the given ID, in serialised form.
* <p/>
* Read-only.
*
* @throws MessageDeletedException if the message has been deleted
*/
byte[] getRawMessage(T txn, MessageId m) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be sent to the
* given contact and have been requested by the contact, up to the given
......
......@@ -307,16 +307,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Nullable
@Override
public Collection<byte[]> generateBatch(Transaction transaction,
public Collection<Message> generateBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToSend(txn, c, maxLength);
List<byte[]> messages = new ArrayList<>(ids.size());
List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
messages.add(db.getRawMessage(txn, m));
messages.add(db.getMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
......@@ -356,7 +356,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Nullable
@Override
public Collection<byte[]> generateRequestedBatch(Transaction transaction,
public Collection<Message> generateRequestedBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
......@@ -364,9 +364,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchContactException();
Collection<MessageId> ids = db.getRequestedMessagesToSend(txn, c,
maxLength);
List<byte[]> messages = new ArrayList<>(ids.size());
List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
messages.add(db.getRawMessage(txn, m));
messages.add(db.getMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
......@@ -457,6 +457,15 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getLocalAuthors(txn);
}
@Override
public Message getMessage(Transaction transaction, MessageId m)
throws DbException {
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
return db.getMessage(txn, m);
}
@Override
public Collection<MessageId> getMessageIds(Transaction transaction,
GroupId g) throws DbException {
......@@ -487,15 +496,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return db.getMessagesToShare(txn);
}
@Override
public byte[] getRawMessage(Transaction transaction, MessageId m)
throws DbException {
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
return db.getRawMessage(txn, m);
}
@Override
public Map<MessageId, Metadata> getMessageMetadata(Transaction transaction,
GroupId g) throws DbException {
......
......@@ -741,7 +741,7 @@ abstract class JdbcDatabase implements Database<Connection> {
boolean offered = removeOfferedMessage(txn, c, m.getId());
boolean seen = offered || (sender != null && c.equals(sender));
addStatus(txn, m.getId(), c, m.getGroupId(), m.getTimestamp(),
m.getLength(), state, e.getValue(), messageShared,
m.getRawLength(), state, e.getValue(), messageShared,
false, seen);
}
// Update denormalised column in messageDependencies if dependency
......@@ -1483,6 +1483,32 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public Message getMessage(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT groupId, timestamp, raw FROM messages"
+ " WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
rs = ps.executeQuery();
if (!rs.next()) throw new DbStateException();
GroupId g = new GroupId(rs.getBytes(1));
long timestamp = rs.getLong(2);
byte[] raw = rs.getBytes(3);
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
if (raw == null) throw new MessageDeletedException();
return new Message(m, g, timestamp, raw);
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
@Override
public Collection<MessageId> getMessageIds(Connection txn, GroupId g)
throws DbException {
......@@ -2019,30 +2045,6 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public byte[] getRawMessage(Connection txn, MessageId m)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT raw FROM messages WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
rs = ps.executeQuery();
if (!rs.next()) throw new DbStateException();
byte[] raw = rs.getBytes(1);
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
if (raw == null) throw new MessageDeletedException();
return raw;
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
@Override
public Collection<MessageId> getRequestedMessagesToSend(Connection txn,
ContactId c, int maxLength) throws DbException {
......
......@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
......@@ -274,7 +275,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (!generateBatchQueued.getAndSet(false))
throw new AssertionError();
try {
Collection<byte[]> b;
Collection<Message> b;
Transaction txn = db.startTransaction(false);
try {
b = db.generateRequestedBatch(txn, contactId,
......@@ -296,9 +297,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private class WriteBatch implements ThrowingRunnable<IOException> {
private final Collection<byte[]> batch;
private final Collection<Message> batch;
private WriteBatch(Collection<byte[]> batch) {
private WriteBatch(Collection<Message> batch) {
this.batch = batch;
}
......@@ -306,7 +307,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override
public void run() throws IOException {
if (interrupted) return;
for (byte[] raw : batch) recordWriter.writeMessage(raw);
for (Message m : batch) recordWriter.writeMessage(m);
LOG.info("Sent batch");
generateBatch();
}
......
......@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.transport.StreamWriter;
......@@ -171,7 +172,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
public void run() {
if (interrupted) return;
try {
Collection<byte[]> b;
Collection<Message> b;
Transaction txn = db.startTransaction(false);
try {
b = db.generateBatch(txn, contactId,
......@@ -193,9 +194,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private class WriteBatch implements ThrowingRunnable<IOException> {
private final Collection<byte[]> batch;
private final Collection<Message> batch;
private WriteBatch(Collection<byte[]> batch) {
private WriteBatch(Collection<Message> batch) {
this.batch = batch;
}
......@@ -203,7 +204,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
@Override
public void run() throws IOException {
if (interrupted) return;
for (byte[] raw : batch) recordWriter.writeMessage(raw);
for (Message m : batch) recordWriter.writeMessage(m);
LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch());
}
......
......@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.record.Record;
import org.briarproject.bramble.api.record.RecordWriter;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.Request;
......@@ -44,8 +45,8 @@ class SyncRecordWriterImpl implements SyncRecordWriter {
}
@Override
public void writeMessage(byte[] raw) throws IOException {
writer.writeRecord(new Record(PROTOCOL_VERSION, MESSAGE, raw));
public void writeMessage(Message m) throws IOException {
writer.writeRecord(new Record(PROTOCOL_VERSION, MESSAGE, m.getRaw()));
}
@Override
......
......@@ -16,7 +16,6 @@ import org.briarproject.bramble.api.sync.Group;
import org.briarproject.bramble.api.sync.InvalidMessageException;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageContext;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.ValidationManager;
import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
......@@ -52,7 +51,6 @@ class ValidationManagerImpl implements ValidationManager, Service,
private final DatabaseComponent db;
private final Executor dbExecutor, validationExecutor;
private final MessageFactory messageFactory;
private final Map<ClientMajorVersion, MessageValidator> validators;
private final Map<ClientMajorVersion, IncomingMessageHook> hooks;
private final AtomicBoolean used = new AtomicBoolean(false);
......@@ -60,12 +58,10 @@ class ValidationManagerImpl implements ValidationManager, Service,
@Inject
ValidationManagerImpl(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor,
@ValidationExecutor Executor validationExecutor,
MessageFactory messageFactory) {
@ValidationExecutor Executor validationExecutor) {
this.db = db;
this.dbExecutor = dbExecutor;
this.validationExecutor = validationExecutor;
this.messageFactory = messageFactory;
validators = new ConcurrentHashMap<>();
hooks = new ConcurrentHashMap<>();
}
......@@ -128,8 +124,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
Transaction txn = db.startTransaction(true);
try {
MessageId id = unvalidated.poll();
byte[] raw = db.getRawMessage(txn, id);
m = messageFactory.createMessage(id, raw);
m = db.getMessage(txn, id);
g = db.getGroup(txn, m.getGroupId());
db.commitTransaction(txn);
} finally {
......@@ -196,8 +191,7 @@ class ValidationManagerImpl implements ValidationManager, Service,
invalidateMessage(txn, id);
invalidate = getDependentsToInvalidate(txn, id);
} else if (allDelivered) {
byte[] raw = db.getRawMessage(txn, id);
Message m = messageFactory.createMessage(id, raw);
Message m = db.getMessage(txn, id);
Group g = db.getGroup(txn, m.getGroupId());
ClientId c = g.getClientId();
int majorVersion = g.getMajorVersion();
......
......@@ -71,7 +71,6 @@ public class ClientHelperImplTest extends BrambleTestCase {
private final Message message = getMessage(groupId);
private final MessageId messageId = message.getId();
private final long timestamp = message.getTimestamp();
private final byte[] rawMessage = message.getRaw();
private final Metadata metadata = new Metadata();
private final BdfList list = BdfList.of("Sign this!", getRandomBytes(42));
private final String label = StringUtils.getRandomString(5);
......@@ -120,8 +119,8 @@ public class ClientHelperImplTest extends BrambleTestCase {
context.checking(new Expectations() {{
oneOf(db).startTransaction(true);
will(returnValue(txn));
oneOf(db).getRawMessage(txn, messageId);
will(returnValue(rawMessage));
oneOf(db).getMessage(txn, messageId);
will(returnValue(message));
oneOf(db).commitTransaction(txn);
oneOf(db).endTransaction(txn);
}});
......@@ -267,7 +266,7 @@ public class ClientHelperImplTest extends BrambleTestCase {
public void testToList() throws Exception {
expectToList(true);
assertEquals(list, clientHelper.toList(rawMessage));
assertEquals(list, clientHelper.toList(getRandomBytes(123)));
context.assertIsSatisfied();
}
......@@ -276,7 +275,7 @@ public class ClientHelperImplTest extends BrambleTestCase {
expectToList(false); // no EOF after list
try {
clientHelper.toList(rawMessage);
clientHelper.toList(getRandomBytes(123));
fail();
} catch (FormatException e) {
// expected
......
......@@ -99,9 +99,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
private final Group group;
private final Author author;
private final LocalAuthor localAuthor;
private final Message message;
private final Message message, message1;
private final MessageId messageId, messageId1;
private final byte[] raw, raw1;
private final Metadata metadata;
private final TransportId transportId;
private final int maxLatency;
......@@ -117,11 +116,9 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
author = getAuthor();
localAuthor = getLocalAuthor();
message = getMessage(groupId);
Message message1 = getMessage(groupId);
message1 = getMessage(groupId);
messageId = message.getId();
messageId1 = message1.getId();
raw = message.getRaw();
raw1 = message1.getRaw();
metadata = new Metadata();
metadata.put("foo", new byte[] {'b', 'a', 'r'});
transportId = getTransportId();
......@@ -647,7 +644,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
transaction = db.startTransaction(false);
try {
db.getRawMessage(transaction, messageId);
db.getMessage(transaction, messageId);
fail();
} catch (NoSuchMessageException expected) {
// Expected
......@@ -867,7 +864,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
@Test
public void testGenerateBatch() throws Exception {
Collection<MessageId> ids = Arrays.asList(messageId, messageId1);
Collection<byte[]> messages = Arrays.asList(raw, raw1);
Collection<Message> messages = Arrays.asList(message, message1);
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
......@@ -876,12 +873,12 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).getMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2);
will(returnValue(ids));
oneOf(database).getRawMessage(txn, messageId);
will(returnValue(raw));
oneOf(database).getMessage(txn, messageId);
will(returnValue(message));
oneOf(database).updateExpiryTime(txn, contactId, messageId,
maxLatency);
oneOf(database).getRawMessage(txn, messageId1);
will(returnValue(raw1));
oneOf(database).getMessage(txn, messageId1);
will(returnValue(message1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1,
maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids);
......@@ -963,7 +960,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
@Test
public void testGenerateRequestedBatch() throws Exception {
Collection<MessageId> ids = Arrays.asList(messageId, messageId1);
Collection<byte[]> messages = Arrays.asList(raw, raw1);
Collection<Message> messages = Arrays.asList(message, message1);
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
......@@ -972,12 +969,12 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).getRequestedMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2);
will(returnValue(ids));
oneOf(database).getRawMessage(txn, messageId);
will(returnValue(raw));
oneOf(database).getMessage(txn, messageId);
will(returnValue(message));
oneOf(database).updateExpiryTime(txn, contactId, messageId,
maxLatency);
oneOf(database).getRawMessage(txn, messageId1);
will(returnValue(raw1));
oneOf(database).getMessage(txn, messageId1);
will(returnValue(message1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1,
maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids);
......
......@@ -506,11 +506,11 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
}
@Test
public void testGetRawMessage() throws Exception {
String name = "getRawMessage(T, MessageId)";
public void testGetMessage() throws Exception {
String name = "getMessage(T, MessageId)";
benchmark(name, db -> {
Connection txn = db.startTransaction();
db.getRawMessage(txn, pickRandom(messages).getId());
db.getMessage(txn, pickRandom(messages).getId());
db.commitTransaction(txn);
});
}
......
......@@ -144,7 +144,8 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(db.containsContact(txn, contactId));
assertTrue(db.containsGroup(txn, groupId));
assertTrue(db.containsMessage(txn, messageId));
assertArrayEquals(message.getRaw(), db.getRawMessage(txn, messageId));
assertArrayEquals(message.getRaw(),
db.getMessage(txn, messageId).getRaw());