Commit 37f02a40 authored by Torsten Grote's avatar Torsten Grote

Merge branch '1585-temporary-messages' into 'master'

Add support for temporary messages

See merge request !1132
parents dd7accfa 3c8b8c39
Pipeline #3529 passed with stage
in 9 minutes and 13 seconds
......@@ -77,7 +77,7 @@ public interface DatabaseComponent extends TransactionManager {
* Stores a local message.
*/
void addLocalMessage(Transaction txn, Message m, Metadata meta,
boolean shared) throws DbException;
boolean shared, boolean temporary) throws DbException;
/**
* Stores a pending contact.
......@@ -510,6 +510,12 @@ public interface DatabaseComponent extends TransactionManager {
void removePendingContact(Transaction txn, PendingContactId p)
throws DbException;
/**
* Removes all temporary messages (and all associated state) from the
* database.
*/
void removeTemporaryMessages(Transaction txn) throws DbException;
/**
* Removes a transport (and all associated state) from the database.
*/
......@@ -538,6 +544,11 @@ public interface DatabaseComponent extends TransactionManager {
void setGroupVisibility(Transaction txn, ContactId c, GroupId g,
Visibility v) throws DbException;
/**
* Marks the given message as permanent, i.e. not temporary.
*/
void setMessagePermanent(Transaction txn, MessageId m) throws DbException;
/**
* Marks the given message as shared.
*/
......
......@@ -92,7 +92,8 @@ class ClientHelperImpl implements ClientHelper {
public void addLocalMessage(Transaction txn, Message m,
BdfDictionary metadata, boolean shared)
throws DbException, FormatException {
db.addLocalMessage(txn, m, metadataEncoder.encode(metadata), shared);
db.addLocalMessage(txn, m, metadataEncoder.encode(metadata), shared,
false);
}
@Override
......
......@@ -115,7 +115,7 @@ interface Database<T> {
* if the message was created locally.
*/
void addMessage(T txn, Message m, MessageState state, boolean shared,
@Nullable ContactId sender) throws DbException;
boolean temporary, @Nullable ContactId sender) throws DbException;
/**
* Adds a dependency between two messages, where the dependent message is
......@@ -630,6 +630,12 @@ interface Database<T> {
*/
void removePendingContact(T txn, PendingContactId p) throws DbException;
/**
* Removes all temporary messages (and all associated state) from the
* database.
*/
void removeTemporaryMessages(T txn) throws DbException;
/**
* Removes a transport (and all associated state) from the database.
*/
......@@ -671,6 +677,11 @@ interface Database<T> {
void setHandshakeKeyPair(T txn, AuthorId local, PublicKey publicKey,
PrivateKey privateKey) throws DbException;
/**
* Marks the given message as permanent, i.e. not temporary.
*/
void setMessagePermanent(T txn, MessageId m) throws DbException;
/**
* Marks the given message as shared.
*/
......
......@@ -273,13 +273,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Override
public void addLocalMessage(Transaction transaction, Message m,
Metadata meta, boolean shared) throws DbException {
Metadata meta, boolean shared, boolean temporary)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsGroup(txn, m.getGroupId()))
throw new NoSuchGroupException();
if (!db.containsMessage(txn, m.getId())) {
db.addMessage(txn, m, DELIVERED, shared, null);
db.addMessage(txn, m, DELIVERED, shared, temporary, null);
transaction.attach(new MessageAddedEvent(m, null));
transaction.attach(new MessageStateChangedEvent(m.getId(), true,
DELIVERED));
......@@ -800,7 +801,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.raiseSeenFlag(txn, c, m.getId());
db.raiseAckFlag(txn, c, m.getId());
} else {
db.addMessage(txn, m, UNKNOWN, false, c);
db.addMessage(txn, m, UNKNOWN, false, false, c);
transaction.attach(new MessageAddedEvent(m, c));
}
transaction.attach(new MessageToAckEvent(c));
......@@ -908,6 +909,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new PendingContactRemovedEvent(p));
}
@Override
public void removeTemporaryMessages(Transaction transaction)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
db.removeTemporaryMessages(txn);
}
@Override
public void removeTransport(Transaction transaction, TransportId t)
throws DbException {
......@@ -967,6 +976,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
}
@Override
public void setMessagePermanent(Transaction transaction, MessageId m)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
db.setMessagePermanent(txn, m);
}
@Override
public void setMessageShared(Transaction transaction, MessageId m)
throws DbException {
......@@ -975,8 +994,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
if (db.getMessageState(txn, m) != DELIVERED)
throw new IllegalArgumentException(
"Shared undelivered message");
throw new IllegalArgumentException("Shared undelivered message");
db.setMessageShared(txn, m);
transaction.attach(new MessageSharedEvent(m));
}
......
......@@ -62,6 +62,7 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import static java.sql.Types.BINARY;
import static java.sql.Types.BOOLEAN;
......@@ -97,7 +98,7 @@ import static org.briarproject.bramble.util.LogUtils.now;
abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing
static final int CODE_SCHEMA_VERSION = 45;
static final int CODE_SCHEMA_VERSION = 46;
// Time period offsets for incoming transport keys
private static final int OFFSET_PREV = -1;
......@@ -177,6 +178,7 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " timestamp BIGINT NOT NULL,"
+ " state INT NOT NULL,"
+ " shared BOOLEAN NOT NULL,"
+ " temporary BOOLEAN NOT NULL,"
+ " length INT NOT NULL,"
+ " raw BLOB," // Null if message has been deleted
+ " PRIMARY KEY (messageId),"
......@@ -336,25 +338,26 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final Logger LOG =
getLogger(JdbcDatabase.class.getName());
// Different database libraries use different names for certain types
private final MessageFactory messageFactory;
private final Clock clock;
private final DatabaseTypes dbTypes;
// Locking: connectionsLock
private final Lock connectionsLock = new ReentrantLock();
private final Condition connectionsChanged = connectionsLock.newCondition();
@GuardedBy("connectionsLock")
private final LinkedList<Connection> connections = new LinkedList<>();
private int openConnections = 0; // Locking: connectionsLock
private boolean closed = false; // Locking: connectionsLock
@GuardedBy("connectionsLock")
private int openConnections = 0;
@GuardedBy("connectionsLock")
private boolean closed = false;
protected abstract Connection createConnection()
throws DbException, SQLException;
protected abstract void compactAndClose() throws DbException;
private final Lock connectionsLock = new ReentrantLock();
private final Condition connectionsChanged = connectionsLock.newCondition();
JdbcDatabase(DatabaseTypes databaseTypes, MessageFactory messageFactory,
Clock clock) {
this.dbTypes = databaseTypes;
......@@ -457,7 +460,8 @@ abstract class JdbcDatabase implements Database<Connection> {
new Migration41_42(dbTypes),
new Migration42_43(dbTypes),
new Migration43_44(dbTypes),
new Migration44_45()
new Migration44_45(),
new Migration45_46()
);
}
......@@ -777,22 +781,23 @@ abstract class JdbcDatabase implements Database<Connection> {
@Override
public void addMessage(Connection txn, Message m, MessageState state,
boolean messageShared, @Nullable ContactId sender)
boolean shared, boolean temporary, @Nullable ContactId sender)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "INSERT INTO messages (messageId, groupId, timestamp,"
+ " state, shared, length, raw)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?)";
+ " state, shared, temporary, length, raw)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getId().getBytes());
ps.setBytes(2, m.getGroupId().getBytes());
ps.setLong(3, m.getTimestamp());
ps.setInt(4, state.getValue());
ps.setBoolean(5, messageShared);
ps.setBoolean(5, shared);
ps.setBoolean(6, temporary);
byte[] raw = messageFactory.getRawMessage(m);
ps.setInt(6, raw.length);
ps.setBytes(7, raw);
ps.setInt(7, raw.length);
ps.setBytes(8, raw);
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
......@@ -804,8 +809,7 @@ abstract class JdbcDatabase implements Database<Connection> {
boolean offered = removeOfferedMessage(txn, c, m.getId());
boolean seen = offered || c.equals(sender);
addStatus(txn, m.getId(), c, m.getGroupId(), m.getTimestamp(),
raw.length, state, e.getValue(), messageShared,
false, seen);
raw.length, state, e.getValue(), shared, false, seen);
}
// Update denormalised column in messageDependencies if dependency
// is in same group as dependent
......@@ -2876,6 +2880,21 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public void removeTemporaryMessages(Connection txn) throws DbException {
Statement s = null;
try {
String sql = "DELETE FROM messages WHERE temporary = TRUE";
s = txn.createStatement();
int affected = s.executeUpdate(sql);
if (affected < 0) throw new DbStateException();
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public void removeTransport(Connection txn, TransportId t)
throws DbException {
......@@ -3021,6 +3040,24 @@ abstract class JdbcDatabase implements Database<Connection> {
}
}
@Override
public void setMessagePermanent(Connection txn, MessageId m)
throws DbException {
PreparedStatement ps = null;
try {
String sql = "UPDATE messages SET temporary = FALSE"
+ " WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public void setMessageShared(Connection txn, MessageId m)
throws DbException {
......
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
class Migration45_46 implements Migration<Connection> {
private static final Logger LOG = getLogger(Migration45_46.class.getName());
@Override
public int getStartVersion() {
return 45;
}
@Override
public int getEndVersion() {
return 46;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
try {
s = txn.createStatement();
s.execute("ALTER TABLE messages"
+ " ADD COLUMN temporary BOOLEAN NOT NULL"
+ " DEFAULT (FALSE)");
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
}
\ No newline at end of file
......@@ -107,8 +107,11 @@ class LifecycleManagerImpl implements LifecycleManager, MigrationListener {
else logDuration(LOG, "Creating database", start);
db.transaction(false, txn -> {
long start1 = now();
db.removeTemporaryMessages(txn);
logDuration(LOG, "Removing temporary messages", start1);
for (OpenDatabaseHook hook : openDatabaseHooks) {
long start1 = now();
start1 = now();
hook.onDatabaseOpened(txn);
if (LOG.isLoggable(FINE)) {
logDuration(LOG, "Calling open database hook "
......
......@@ -243,7 +243,7 @@ class ClientVersioningManagerImpl implements ClientVersioningManager,
try {
Message m = clientHelper.createMessage(localGroup.getId(), now,
body);
db.addLocalMessage(txn, m, new Metadata(), false);
db.addLocalMessage(txn, m, new Metadata(), false, false);
} catch (FormatException e) {
throw new AssertionError(e);
}
......
......@@ -96,7 +96,7 @@ public class ClientHelperImplTest extends BrambleTestCase {
oneOf(db).transaction(with(false), withDbRunnable(txn));
oneOf(metadataEncoder).encode(dictionary);
will(returnValue(metadata));
oneOf(db).addLocalMessage(txn, message, metadata, shared);
oneOf(db).addLocalMessage(txn, message, metadata, shared, false);
}});
clientHelper.addLocalMessage(message, dictionary, shared);
......
......@@ -61,6 +61,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
......@@ -120,6 +121,9 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
private final Contact contact;
private final KeySetId keySetId;
private final PendingContactId pendingContactId;
private final Random random = new Random();
private final boolean shared = random.nextBoolean();
private final boolean temporary = random.nextBoolean();
public DatabaseComponentImplTest() {
clientId = getClientId();
......@@ -253,7 +257,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
eventExecutor, shutdownManager);
db.transaction(false, transaction ->
db.addLocalMessage(transaction, message, metadata, true));
db.addLocalMessage(transaction, message, metadata, shared,
temporary));
}
@Test
......@@ -265,20 +270,23 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(true));
oneOf(database).containsMessage(txn, messageId);
will(returnValue(false));
oneOf(database).addMessage(txn, message, DELIVERED, true, null);
oneOf(database).addMessage(txn, message, DELIVERED, shared,
temporary, null);
oneOf(database).mergeMessageMetadata(txn, messageId, metadata);
oneOf(database).commitTransaction(txn);
// The message was added, so the listeners should be called
oneOf(eventBus).broadcast(with(any(MessageAddedEvent.class)));
oneOf(eventBus)
.broadcast(with(any(MessageStateChangedEvent.class)));
oneOf(eventBus).broadcast(with(any(MessageSharedEvent.class)));
oneOf(eventBus).broadcast(with(any(
MessageStateChangedEvent.class)));
if (shared)
oneOf(eventBus).broadcast(with(any(MessageSharedEvent.class)));
}});
DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager);
db.transaction(false, transaction ->
db.addLocalMessage(transaction, message, metadata, true));
db.addLocalMessage(transaction, message, metadata, shared,
temporary));
}
@Test
......@@ -569,11 +577,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
throws Exception {
context.checking(new Expectations() {{
// Check whether the message is in the DB (which it's not)
exactly(11).of(database).startTransaction();
exactly(12).of(database).startTransaction();
will(returnValue(txn));
exactly(11).of(database).containsMessage(txn, messageId);
exactly(12).of(database).containsMessage(txn, messageId);
will(returnValue(false));
exactly(11).of(database).abortTransaction(txn);
exactly(12).of(database).abortTransaction(txn);
// Allow other checks to pass
allowing(database).containsContact(txn, contactId);
will(returnValue(true));
......@@ -637,6 +645,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
// Expected
}
try {
db.transaction(false, transaction ->
db.setMessagePermanent(transaction, message.getId()));
fail();
} catch (NoSuchMessageException expected) {
// Expected
}
try {
db.transaction(false, transaction ->
db.setMessageShared(transaction, message.getId()));
......@@ -972,7 +988,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(VISIBLE));
oneOf(database).containsMessage(txn, messageId);
will(returnValue(false));
oneOf(database).addMessage(txn, message, UNKNOWN, false, contactId);
oneOf(database).addMessage(txn, message, UNKNOWN, false, false,
contactId);
// Second time
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
......@@ -1507,6 +1524,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
public void testMessageDependencies() throws Exception {
int shutdownHandle = 12345;
MessageId messageId2 = new MessageId(getRandomId());
context.checking(new Expectations() {{
// open()
oneOf(database).open(key, null);
......@@ -1521,7 +1539,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(true));
oneOf(database).containsMessage(txn, messageId);
will(returnValue(false));
oneOf(database).addMessage(txn, message, DELIVERED, true, null);
oneOf(database).addMessage(txn, message, DELIVERED, shared,
temporary, null);
oneOf(database).mergeMessageMetadata(txn, messageId, metadata);
// addMessageDependencies()
oneOf(database).containsMessage(txn, messageId);
......@@ -1544,7 +1563,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(eventBus).broadcast(with(any(MessageAddedEvent.class)));
oneOf(eventBus).broadcast(with(any(
MessageStateChangedEvent.class)));
oneOf(eventBus).broadcast(with(any(MessageSharedEvent.class)));
if (shared)
oneOf(eventBus).broadcast(with(any(MessageSharedEvent.class)));
// endTransaction()
oneOf(database).commitTransaction(txn);
// close()
......@@ -1555,7 +1575,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
assertFalse(db.open(key, null));
db.transaction(false, transaction -> {
db.addLocalMessage(transaction, message, metadata, true);
db.addLocalMessage(transaction, message, metadata, shared,
temporary);
Collection<MessageId> dependencies = new ArrayList<>(2);
dependencies.add(messageId1);
dependencies.add(messageId2);
......
......@@ -567,8 +567,9 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
MessageState state =
MessageState.fromValue(random.nextInt(4));
boolean shared = random.nextBoolean();
boolean temporary = random.nextBoolean();
ContactId sender = random.nextBoolean() ? c : null;
db.addMessage(txn, m, state, shared, sender);
db.addMessage(txn, m, state, shared, temporary, sender);
if (random.nextBoolean())
db.raiseRequestedFlag(txn, c, m.getId());
Metadata mm = getMetadata(METADATA_KEYS_PER_MESSAGE);
......@@ -597,7 +598,8 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
for (int j = 0; j < MESSAGES_PER_GROUP; j++) {
Message m = getMessage(g.getId());
messages.add(m);
db.addMessage(txn, m, DELIVERED, false, null);
boolean temporary = random.nextBoolean();
db.addMessage(txn, m, DELIVERED, false, temporary, null);
Metadata mm = getMetadata(METADATA_KEYS_PER_MESSAGE);
messageMeta.get(g.getId()).add(mm);
db.mergeMessageMetadata(txn, m.getId(), mm);
......
......@@ -42,6 +42,7 @@ public class LifecycleManagerImplTest extends BrambleMockTestCase {
oneOf(db).open(dbKey, lifecycleManager);
will(returnValue(false));
oneOf(db).transaction(with(false), withDbRunnable(txn));
oneOf(db).removeTemporaryMessages(txn);
allowing(eventBus).broadcast(with(any(LifecycleEvent.class)));
}});
......
......@@ -172,7 +172,7 @@ public class ClientVersioningManagerImplTest extends BrambleMockTestCase {
localVersionsBody);
will(returnValue(localVersions));
oneOf(db).addLocalMessage(txn, localVersions, new Metadata(),
false);
false, false);
// Inform contacts that client versions have changed
oneOf(db).getContacts(txn);
will(returnValue(singletonList(contact)));
......@@ -259,7 +259,7 @@ public class ClientVersioningManagerImplTest extends BrambleMockTestCase {
newLocalVersionsBody);
will(returnValue(newLocalVersions));
oneOf(db).addLocalMessage(txn, newLocalVersions, new Metadata(),
false);
false, false);
// Inform contacts that client versions have changed
oneOf(db).getContacts(txn);
will(returnValue(singletonList(contact)));
......@@ -355,7 +355,7 @@ public class ClientVersioningManagerImplTest extends BrambleMockTestCase {
newLocalVersionsBody);
will(returnValue(newLocalVersions));
oneOf(db).addLocalMessage(txn, newLocalVersions, new Metadata(),
false);
false, false);
// Inform contacts that client versions have changed
oneOf(db).getContacts(txn);
will(returnValue(singletonList(contact)));
......
......@@ -270,7 +270,7 @@ class IntroductionManagerImpl extends ConversationClientImpl
private MessageId createStorageId(Transaction txn) throws DbException {
Message m = clientHelper
.createMessageForStoringMetadata(localGroup.getId());
db.addLocalMessage(txn, m, new Metadata(), false);
db.addLocalMessage(txn, m, new Metadata(), false, false);
return m.getId();
}
......
......@@ -248,7 +248,7 @@ class GroupInvitationManagerImpl extends ConversationClientImpl
private MessageId createStorageId(Transaction txn, GroupId g)
throws DbException {
Message m = clientHelper.createMessageForStoringMetadata(g);
db.addLocalMessage(txn, m, new Metadata(), false);
db.addLocalMessage(txn, m, new Metadata(), false, false);
return m.getId();
}
......
......@@ -240,7 +240,7 @@ abstract class SharingManagerImpl<S extends Shareable>
private MessageId createStorageId(Transaction txn, GroupId g)
throws DbException {
Message m = clientHelper.createMessageForStoringMetadata(g);
db.addLocalMessage(txn, m, new Metadata(), false);
db.addLocalMessage(txn, m, new Metadata(), false, false);
return m.getId();
}
......
......@@ -226,7 +226,7 @@ public class GroupInvitationManagerImplTest extends BrambleMockTestCase {
.createMessageForStoringMetadata(contactGroup.getId());
will(returnValue(storageMessage));
oneOf(db).addLocalMessage(txn, storageMessage, new Metadata(),
false);
false, false);
}});
}
......
......@@ -221,7 +221,8 @@ public class BlogSharingManagerImplTest extends BrambleMockTestCase {
oneOf(clientHelper)
.createMessageForStoringMetadata(contactGroup.getId());
will(returnValue(message));
oneOf(db).addLocalMessage(txn, message, new Metadata(), false);
oneOf(db).addLocalMessage(txn, message, new Metadata(), false,
false);
oneOf(sessionEncoder).encodeSession(with(any(Session.class)));
will(returnValue(sessionDict));
oneOf(clientHelper).mergeMessageMetadata(txn, message.getId(),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment