...
 
Commits (8)
package org.briarproject.bramble.api;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
/**
* An item that can be consumed once.
*/
@NotNullByDefault
public class Consumable<T> {
private final AtomicReference<T> reference;
public Consumable(T item) {
reference = new AtomicReference<>(item);
}
@Nullable
public T consume() {
return reference.getAndSet(null);
}
}
......@@ -163,27 +163,23 @@ public interface DatabaseComponent extends TransactionManager {
* less than or equal to the given length, for transmission over a
* transport with the given maximum latency. Returns null if there are no
* sendable messages that fit in the given length.
*
* @param small True if only single-block messages should be sent
*/
@Nullable
Collection<Message> generateBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
int maxLength, int maxLatency, boolean small) throws DbException;
/**
* Returns an offer for the given contact for transmission over a
* transport with the given maximum latency, or null if there are no
* messages to offer.
*
* @param small True if only single-block messages should be offered
*/
@Nullable
Offer generateOffer(Transaction txn, ContactId c, int maxMessages,
int maxLatency) throws DbException;
/**
* Returns a request for the given contact, or null if there are no
* messages to request.
*/
@Nullable
Request generateRequest(Transaction txn, ContactId c, int maxMessages)
throws DbException;
int maxLatency, boolean small) throws DbException;
/**
* Returns a batch of messages for the given contact, with a total length
......
......@@ -29,10 +29,15 @@ public interface SyncConstants {
*/
int MESSAGE_HEADER_LENGTH = UniqueId.LENGTH + 8;
/**
* The maximum length of a block in bytes.
*/
int MAX_BLOCK_LENGTH = 32 * 1024; // 32 KiB
/**
* The maximum length of a message body in bytes.
*/
int MAX_MESSAGE_BODY_LENGTH = 32 * 1024; // 32 KiB
int MAX_MESSAGE_BODY_LENGTH = MAX_BLOCK_LENGTH;
/**
* The maximum length of a message in bytes.
......
......@@ -7,16 +7,16 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when a message is received from, or offered by, a
* contact and needs to be acknowledged.
* An event that is broadcast when one or more messages are received from, or
* offered by, a contact and need to be acknowledged.
*/
@Immutable
@NotNullByDefault
public class MessageToAckEvent extends Event {
public class MessagesToAckEvent extends Event {
private final ContactId contactId;
public MessageToAckEvent(ContactId contactId) {
public MessagesToAckEvent(ContactId contactId) {
this.contactId = contactId;
}
......
package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.Consumable;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageId;
import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when a message is offered by a contact and needs
* to be requested.
* An event that is broadcast when one or more messages are offered by a
* contact and need to be requested.
*/
@Immutable
@NotNullByDefault
public class MessageToRequestEvent extends Event {
public class MessagesToRequestEvent extends Event {
private final ContactId contactId;
private final Consumable<Collection<MessageId>> ids;
public MessageToRequestEvent(ContactId contactId) {
public MessagesToRequestEvent(ContactId contactId,
Collection<MessageId> ids) {
this.contactId = contactId;
this.ids = new Consumable<>(ids);
}
public ContactId getContactId() {
return contactId;
}
@Nullable
public Collection<MessageId> consumeIds() {
return ids.consume();
}
}
......@@ -125,11 +125,6 @@ interface Database<T> {
void addMessageDependency(T txn, Message dependent, MessageId dependency,
MessageState dependentState) throws DbException;
/**
* Records that a message has been offered by the given contact.
*/
void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException;
/**
* Stores a pending contact.
*/
......@@ -218,13 +213,6 @@ interface Database<T> {
boolean containsVisibleMessage(T txn, ContactId c, MessageId m)
throws DbException;
/**
* Returns the number of messages offered by the given contact.
* <p/>
* Read-only.
*/
int countOfferedMessages(T txn, ContactId c) throws DbException;
/**
* Deletes the message with the given ID. Unlike
* {@link #removeMessage(Object, MessageId)}, the message ID and any other
......@@ -447,13 +435,13 @@ interface Database<T> {
int maxMessages, int maxLatency) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be requested from
* the given contact, up to the given number of messages.
* Returns the IDs of some single-block messages that are eligible to be
* offered to the given contact, up to the given number of messages.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToRequest(T txn, ContactId c,
int maxMessages) throws DbException;
Collection<MessageId> getSmallMessagesToOffer(T txn, ContactId c,
int maxMessages, int maxLatency) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be sent to the
......@@ -464,6 +452,15 @@ interface Database<T> {
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength,
int maxLatency) throws DbException;
/**
* Returns the IDs of some single-block messages that are eligible to be
* sent to the given contact, up to the given total length.
* <p/>
* Read-only.
*/
Collection<MessageId> getSmallMessagesToSend(T txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
/**
* Returns the IDs of any messages that need to be validated.
* <p/>
......@@ -626,13 +623,6 @@ interface Database<T> {
*/
void removeMessage(T txn, MessageId m) throws DbException;
/**
* Removes the given offered messages that were offered by the given
* contact.
*/
void removeOfferedMessages(T txn, ContactId c,
Collection<MessageId> requested) throws DbException;
/**
* Removes a pending contact (and all associated state) from the database.
*/
......
......@@ -61,10 +61,10 @@ import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
import org.briarproject.bramble.api.sync.event.MessagesAckedEvent;
import org.briarproject.bramble.api.sync.event.MessagesSentEvent;
import org.briarproject.bramble.api.sync.event.MessagesToAckEvent;
import org.briarproject.bramble.api.sync.event.MessagesToRequestEvent;
import org.briarproject.bramble.api.sync.event.SyncVersionsUpdatedEvent;
import org.briarproject.bramble.api.sync.validation.MessageState;
import org.briarproject.bramble.api.transport.KeySetId;
......@@ -91,7 +91,6 @@ import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED;
import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN;
import static org.briarproject.bramble.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
import static org.briarproject.bramble.util.LogUtils.logDuration;
import static org.briarproject.bramble.util.LogUtils.logException;
import static org.briarproject.bramble.util.LogUtils.now;
......@@ -401,19 +400,22 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Nullable
@Override
public Collection<Message> generateBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
ContactId c, int maxLength, int maxLatency, boolean small)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids =
db.getMessagesToSend(txn, c, maxLength, maxLatency);
Collection<MessageId> ids;
if (small)
ids = db.getSmallMessagesToSend(txn, c, maxLength, maxLatency);
else ids = db.getMessagesToSend(txn, c, maxLength, maxLatency);
if (ids.isEmpty()) return null;
List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
messages.add(db.getMessage(txn, m));
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids));
return messages;
......@@ -422,34 +424,21 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Nullable
@Override
public Offer generateOffer(Transaction transaction, ContactId c,
int maxMessages, int maxLatency) throws DbException {
int maxMessages, int maxLatency, boolean small) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids =
db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
Collection<MessageId> ids;
if (small)
ids = db.getSmallMessagesToOffer(txn, c, maxMessages, maxLatency);
else ids = db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
if (ids.isEmpty()) return null;
for (MessageId m : ids)
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
return new Offer(ids);
}
@Nullable
@Override
public Request generateRequest(Transaction transaction, ContactId c,
int maxMessages) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToRequest(txn, c,
maxMessages);
if (ids.isEmpty()) return null;
db.removeOfferedMessages(txn, c, ids);
return new Request(ids);
}
@Nullable
@Override
public Collection<Message> generateRequestedBatch(Transaction transaction,
......@@ -460,12 +449,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchContactException();
Collection<MessageId> ids =
db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency);
if (ids.isEmpty()) return null;
List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
messages.add(db.getMessage(txn, m));
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids));
return messages;
......@@ -814,7 +803,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.addMessage(txn, m, UNKNOWN, false, false, c);
transaction.attach(new MessageAddedEvent(m, c));
}
transaction.attach(new MessageToAckEvent(c));
transaction.attach(new MessagesToAckEvent(c));
}
}
......@@ -825,21 +814,20 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
boolean ack = false, request = false;
int count = db.countOfferedMessages(txn, c);
boolean ack = false;
List<MessageId> request = new ArrayList<>(o.getMessageIds().size());
for (MessageId m : o.getMessageIds()) {
if (db.containsVisibleMessage(txn, c, m)) {
db.raiseSeenFlag(txn, c, m);
db.raiseAckFlag(txn, c, m);
ack = true;
} else if (count < MAX_OFFERED_MESSAGES) {
db.addOfferedMessage(txn, c, m);
request = true;
count++;
} else {
request.add(m);
}
}
if (ack) transaction.attach(new MessageToAckEvent(c));
if (request) transaction.attach(new MessageToRequestEvent(c));
if (ack) transaction.attach(new MessagesToAckEvent(c));
if (!request.isEmpty())
transaction.attach(new MessagesToRequestEvent(c, request));
}
@Override
......
......@@ -6,13 +6,6 @@ import static java.util.concurrent.TimeUnit.DAYS;
interface DatabaseConstants {
/**
* The maximum number of offered messages from each contact that will be
* stored. If offers arrive more quickly than requests can be sent and this
* limit is reached, additional offers will not be stored.
*/
int MAX_OFFERED_MESSAGES = 1000;
/**
* The namespace of the {@link Settings} where the database schema version
* is stored.
......
......@@ -6,7 +6,6 @@ import org.briarproject.bramble.api.db.TransactionManager;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventExecutor;
import org.briarproject.bramble.api.lifecycle.ShutdownManager;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import java.sql.Connection;
......@@ -22,9 +21,8 @@ public class DatabaseModule {
@Provides
@Singleton
Database<Connection> provideDatabase(DatabaseConfig config,
MessageFactory messageFactory, Clock clock) {
return new H2Database(config, messageFactory, clock);
Database<Connection> provideDatabase(DatabaseConfig config, Clock clock) {
return new H2Database(config, clock);
}
@Provides
......
......@@ -6,7 +6,6 @@ import org.briarproject.bramble.api.db.DbClosedException;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.MigrationListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.util.StringUtils;
......@@ -50,9 +49,8 @@ class H2Database extends JdbcDatabase {
private volatile SecretKey key = null;
@Inject
H2Database(DatabaseConfig config, MessageFactory messageFactory,
Clock clock) {
super(dbTypes, messageFactory, clock);
H2Database(DatabaseConfig config, Clock clock) {
super(dbTypes, clock);
this.config = config;
File dir = config.getDatabaseDirectory();
String path = new File(dir, "db").getAbsolutePath();
......
......@@ -6,7 +6,6 @@ import org.briarproject.bramble.api.db.DbClosedException;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.MigrationListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.util.StringUtils;
......@@ -49,9 +48,8 @@ class HyperSqlDatabase extends JdbcDatabase {
private volatile SecretKey key = null;
@Inject
HyperSqlDatabase(DatabaseConfig config, MessageFactory messageFactory,
Clock clock) {
super(dbTypes, messageFactory, clock);
HyperSqlDatabase(DatabaseConfig config, Clock clock) {
super(dbTypes, clock);
this.config = config;
File dir = config.getDatabaseDirectory();
String path = new File(dir, "db").getAbsolutePath();
......
......@@ -37,6 +37,7 @@ class Migration38_39 implements Migration<Connection> {
s.execute("ALTER TABLE incomingKeys"
+ " ALTER COLUMN contactId"
+ " SET NOT NULL");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -36,6 +36,7 @@ class Migration39_40 implements Migration<Connection> {
s.execute("ALTER TABLE statuses"
+ " ALTER COLUMN eta"
+ " SET NOT NULL");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -38,6 +38,7 @@ class Migration40_41 implements Migration<Connection> {
s = txn.createStatement();
s.execute("ALTER TABLE contacts"
+ dbTypes.replaceTypes(" ADD alias _STRING"));
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -89,6 +89,7 @@ class Migration41_42 implements Migration<Connection> {
+ " FOREIGN KEY (keySetId)"
+ " REFERENCES outgoingHandshakeKeys (keySetId)"
+ " ON DELETE CASCADE)"));
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -44,6 +44,7 @@ class Migration42_43 implements Migration<Connection> {
+ " ADD COLUMN handshakePublicKey _BINARY"));
s.execute("ALTER TABLE contacts"
+ " DROP COLUMN active");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -50,6 +50,7 @@ class Migration43_44 implements Migration<Connection> {
+ " ADD COLUMN rootKey _SECRET"));
s.execute("ALTER TABLE outgoingKeys"
+ " ADD COLUMN alice BOOLEAN");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -31,6 +31,7 @@ class Migration44_45 implements Migration<Connection> {
try {
s = txn.createStatement();
s.execute("ALTER TABLE pendingContacts DROP COLUMN state");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -32,6 +32,7 @@ class Migration45_46 implements Migration<Connection> {
s = txn.createStatement();
s.execute("ALTER TABLE messages"
+ " ADD COLUMN temporary BOOLEAN DEFAULT FALSE NOT NULL");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -39,6 +39,7 @@ class Migration46_47 implements Migration<Connection> {
s.execute(dbTypes.replaceTypes("ALTER TABLE contacts"
+ " ADD COLUMN syncVersions"
+ " _BINARY DEFAULT '00' NOT NULL"));
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import static java.lang.System.arraycopy;
import static java.sql.Types.BINARY;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
class Migration47_48 implements Migration<Connection> {
private static final Logger LOG = getLogger(Migration47_48.class.getName());
private final DatabaseTypes dbTypes;
Migration47_48(DatabaseTypes dbTypes) {
this.dbTypes = dbTypes;
}
@Override
public int getStartVersion() {
return 47;
}
@Override
public int getEndVersion() {
return 48;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
ResultSet rs = null;
PreparedStatement ps = null;
try {
s = txn.createStatement();
s.execute("ALTER TABLE messages"
+ " ADD COLUMN deleted BOOLEAN DEFAULT FALSE NOT NULL");
s.execute("UPDATE messages SET deleted = (raw IS NULL)");
s.execute("ALTER TABLE messages"
+ " ADD COLUMN blockCount INT DEFAULT 1 NOT NULL");
s.execute(dbTypes.replaceTypes("CREATE TABLE blocks"
+ " (messageId _HASH NOT NULL,"
+ " blockNumber INT NOT NULL,"
+ " blockLength INT NOT NULL," // Excludes block header
+ " data BLOB," // Null if message has been deleted
+ " PRIMARY KEY (messageId, blockNumber),"
+ " FOREIGN KEY (messageId)"
+ " REFERENCES messages (messageId)"
+ " ON DELETE CASCADE)"));
rs = s.executeQuery("SELECT messageId, length, raw FROM messages");
ps = txn.prepareStatement("INSERT INTO blocks"
+ " (messageId, blockNumber, blockLength, data)"
+ " VALUES (?, 0, ?, ?)");
int migrated = 0;
while (rs.next()) {
byte[] id = rs.getBytes(1);
int length = rs.getInt(2);
byte[] raw = rs.getBytes(3);
ps.setBytes(1, id);
ps.setInt(2, length - MESSAGE_HEADER_LENGTH);
if (raw == null) {
ps.setNull(3, BINARY);
} else {
byte[] data = new byte[raw.length - MESSAGE_HEADER_LENGTH];
arraycopy(raw, MESSAGE_HEADER_LENGTH, data, 0, data.length);
ps.setBytes(3, data);
}
if (ps.executeUpdate() != 1) throw new DbStateException();
migrated++;
}
ps.close();
rs.close();
s.execute("ALTER TABLE messages DROP COLUMN raw");
s.close();
if (LOG.isLoggable(INFO))
LOG.info("Migrated " + migrated + " messages");
} catch (SQLException e) {
tryToClose(ps, LOG, WARNING);
tryToClose(rs, LOG, WARNING);
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
}
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
class Migration48_49 implements Migration<Connection> {
private static final Logger LOG = getLogger(Migration48_49.class.getName());
@Override
public int getStartVersion() {
return 48;
}
@Override
public int getEndVersion() {
return 49;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
try {
s = txn.createStatement();
s.execute("DROP TABLE offers");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
}
......@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
......@@ -21,8 +22,8 @@ import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
import org.briarproject.bramble.api.sync.event.MessagesToAckEvent;
import org.briarproject.bramble.api.sync.event.MessagesToRequestEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.transport.StreamWriter;
......@@ -79,8 +80,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final AtomicBoolean generateAckQueued = new AtomicBoolean(false);
private final AtomicBoolean generateBatchQueued = new AtomicBoolean(false);
private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false);
private final AtomicBoolean generateRequestQueued =
new AtomicBoolean(false);
private final AtomicLong nextSendTime = new AtomicLong(Long.MAX_VALUE);
private volatile boolean interrupted = false;
......@@ -112,7 +111,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
generateAck();
generateBatch();
generateOffer();
generateRequest();
long now = clock.currentTimeMillis();
long nextKeepalive = now + maxIdleTime;
boolean dataToFlush = true;
......@@ -184,11 +182,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateOffer());
}
private void generateRequest() {
if (generateRequestQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateRequest());
}
private void setNextSendTime(long time) {
long old = nextSendTime.getAndSet(time);
if (time < old) writerTasks.add(NEXT_SEND_TIME_DECREASED);
......@@ -214,12 +207,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof MessageRequestedEvent) {
if (((MessageRequestedEvent) e).getContactId().equals(contactId))
generateBatch();
} else if (e instanceof MessageToAckEvent) {
if (((MessageToAckEvent) e).getContactId().equals(contactId))
} else if (e instanceof MessagesToAckEvent) {
if (((MessagesToAckEvent) e).getContactId().equals(contactId))
generateAck();
} else if (e instanceof MessageToRequestEvent) {
if (((MessageToRequestEvent) e).getContactId().equals(contactId))
generateRequest();
} else if (e instanceof MessagesToRequestEvent) {
MessagesToRequestEvent m = (MessagesToRequestEvent) e;
if (m.getContactId().equals(contactId)) {
Collection<MessageId> ids = m.consumeIds();
if (ids != null)
writerTasks.add(new WriteRequest(new Request(ids)));
}
} else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt();
......@@ -321,7 +318,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
try {
Offer o = db.transactionWithNullableResult(false, txn -> {
Offer offer = db.generateOffer(txn, contactId,
MAX_MESSAGE_IDS, maxLatency);
MAX_MESSAGE_IDS, maxLatency, true);
setNextSendTime(db.getNextSendTime(txn, contactId));
return offer;
});
......@@ -353,27 +350,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
}
}
private class GenerateRequest implements Runnable {
@DatabaseExecutor
@Override
public void run() {
if (interrupted) return;
if (!generateRequestQueued.getAndSet(false))
throw new AssertionError();
try {
Request r = db.transactionWithNullableResult(false, txn ->
db.generateRequest(txn, contactId, MAX_MESSAGE_IDS));
if (LOG.isLoggable(INFO))
LOG.info("Generated request: " + (r != null));
if (r != null) writerTasks.add(new WriteRequest(r));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
}
}
}
private class WriteRequest implements ThrowingRunnable<IOException> {
private final Request request;
......@@ -388,7 +364,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return;
recordWriter.writeRequest(request);
LOG.info("Sent request");
generateRequest();
}
}
}
......@@ -174,7 +174,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
Collection<Message> b =
db.transactionWithNullableResult(false, txn ->
db.generateBatch(txn, contactId,
MAX_RECORD_PAYLOAD_BYTES, maxLatency));
MAX_RECORD_PAYLOAD_BYTES, maxLatency,
true));
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b == null) decrementOutstandingQueries();
......
......@@ -44,10 +44,10 @@ import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
import org.briarproject.bramble.api.sync.event.MessagesAckedEvent;
import org.briarproject.bramble.api.sync.event.MessagesSentEvent;
import org.briarproject.bramble.api.sync.event.MessagesToAckEvent;
import org.briarproject.bramble.api.sync.event.MessagesToRequestEvent;
import org.briarproject.bramble.api.transport.IncomingKeys;
import org.briarproject.bramble.api.transport.KeySetId;
import org.briarproject.bramble.api.transport.OutgoingKeys;
......@@ -76,7 +76,6 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH
import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED;
import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN;
import static org.briarproject.bramble.api.transport.TransportConstants.REORDERING_WINDOW_SIZE;
import static org.briarproject.bramble.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
import static org.briarproject.bramble.test.TestUtils.getAgreementPrivateKey;
import static org.briarproject.bramble.test.TestUtils.getAgreementPublicKey;
import static org.briarproject.bramble.test.TestUtils.getAuthor;
......@@ -295,11 +294,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
throws Exception {
context.checking(new Expectations() {{
// Check whether the contact is in the DB (which it's not)
exactly(18).of(database).startTransaction();
exactly(17).of(database).startTransaction();
will(returnValue(txn));
exactly(18).of(database).containsContact(txn, contactId);
exactly(17).of(database).containsContact(txn, contactId);
will(returnValue(false));
exactly(18).of(database).abortTransaction(txn);
exactly(17).of(database).abortTransaction(txn);
}});
DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager);
......@@ -323,7 +322,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
try {
db.transaction(false, transaction ->
db.generateBatch(transaction, contactId, 123, 456));
db.generateBatch(transaction, contactId, 123, 456, true));
fail();
} catch (NoSuchContactException expected) {
// Expected
......@@ -331,15 +330,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
try {
db.transaction(false, transaction ->
db.generateOffer(transaction, contactId, 123, 456));
fail();
} catch (NoSuchContactException expected) {
// Expected
}
try {
db.transaction(false, transaction ->
db.generateRequest(transaction, contactId, 123));
db.generateOffer(transaction, contactId, 123, 456, true));
fail();
} catch (NoSuchContactException expected) {
// Expected
......@@ -865,7 +856,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).getMessagesToSend(txn, contactId,
oneOf(database).getSmallMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2, maxLatency);
will(returnValue(ids));
oneOf(database).getMessage(txn, messageId);
......@@ -885,7 +876,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
db.transaction(false, transaction ->
assertEquals(messages, db.generateBatch(transaction, contactId,
MAX_MESSAGE_LENGTH * 2, maxLatency)));
MAX_MESSAGE_LENGTH * 2, maxLatency, true)));
}
@Test
......@@ -897,7 +888,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).getMessagesToOffer(txn, contactId, 123, maxLatency);
oneOf(database).getSmallMessagesToOffer(txn, contactId, 123,
maxLatency);
will(returnValue(ids));
oneOf(database).updateExpiryTimeAndEta(txn, contactId, messageId,
maxLatency);
......@@ -909,36 +901,13 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
eventExecutor, shutdownManager);
db.transaction(false, transaction -> {
Offer o = db.generateOffer(transaction, contactId, 123, maxLatency);
Offer o = db.generateOffer(transaction, contactId, 123, maxLatency,
true);
assertNotNull(o);
assertEquals(ids, o.getMessageIds());
});
}
@Test
public void testGenerateRequest() throws Exception {
MessageId messageId1 = new MessageId(getRandomId());
Collection<MessageId> ids = asList(messageId, messageId1);
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).getMessagesToRequest(txn, contactId, 123);
will(returnValue(ids));
oneOf(database).removeOfferedMessages(txn, contactId, ids);
oneOf(database).commitTransaction(txn);
}});
DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager);
db.transaction(false, transaction -> {
Request r = db.generateRequest(transaction, contactId, 123);
assertNotNull(r);
assertEquals(ids, r.getMessageIds());
});
}
@Test
public void testGenerateRequestedBatch() throws Exception {
Collection<MessageId> ids = asList(messageId, messageId1);
......@@ -1018,10 +987,10 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).raiseAckFlag(txn, contactId, messageId);
oneOf(database).commitTransaction(txn);
// First time: the message was received and added
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class)));
oneOf(eventBus).broadcast(with(any(MessagesToAckEvent.class)));
oneOf(eventBus).broadcast(with(any(MessageAddedEvent.class)));
// Second time: the message needs to be acked
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class)));
oneOf(eventBus).broadcast(with(any(MessagesToAckEvent.class)));
}});
DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager);
......@@ -1049,7 +1018,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).raiseAckFlag(txn, contactId, messageId);
oneOf(database).commitTransaction(txn);
// The message was received but not added
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class)));
oneOf(eventBus).broadcast(with(any(MessagesToAckEvent.class)));
}});
DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager);
......@@ -1080,19 +1049,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
public void testReceiveOffer() throws Exception {
MessageId messageId1 = new MessageId(getRandomId());
MessageId messageId2 = new MessageId(getRandomId());
MessageId messageId3 = new MessageId(getRandomId());
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
// There's room for two more offered messages
oneOf(database).countOfferedMessages(txn, contactId);
will(returnValue(MAX_OFFERED_MESSAGES - 2));
// The first message isn't visible - request it
oneOf(database).containsVisibleMessage(txn, contactId, messageId);
will(returnValue(false));
oneOf(database).addOfferedMessage(txn, contactId, messageId);
// The second message is visible - ack it
oneOf(database).containsVisibleMessage(txn, contactId, messageId1);
will(returnValue(true));
......@@ -1101,19 +1066,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
// The third message isn't visible - request it
oneOf(database).containsVisibleMessage(txn, contactId, messageId2);
will(returnValue(false));
oneOf(database).addOfferedMessage(txn, contactId, messageId2);
// The fourth message isn't visible, but there's no room to store it
oneOf(database).containsVisibleMessage(txn, contactId, messageId3);
will(returnValue(false));
oneOf(database).commitTransaction(txn);
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class)));
oneOf(eventBus).broadcast(with(any(MessageToRequestEvent.class)));
oneOf(eventBus).broadcast(with(any(MessagesToAckEvent.class)));
oneOf(eventBus).broadcast(with(any(MessagesToRequestEvent.class)));
}});
DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager);
Offer o = new Offer(asList(messageId, messageId1,
messageId2, messageId3));
Offer o = new Offer(asList(messageId, messageId1, messageId2));
db.transaction(false, transaction ->
db.receiveOffer(transaction, contactId, o));
}
......
......@@ -3,11 +3,9 @@ package org.briarproject.bramble.db;
import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.db.DatabaseConfig;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.system.SystemClock;
import org.briarproject.bramble.test.TestDatabaseConfig;
import org.briarproject.bramble.test.TestMessageFactory;
import org.briarproject.bramble.test.UTest;
import java.io.IOException;
......@@ -32,8 +30,7 @@ public abstract class DatabasePerformanceComparisonTest
private SecretKey databaseKey = getSecretKey();
abstract Database<Connection> createDatabase(boolean conditionA,
DatabaseConfig databaseConfig, MessageFactory messageFactory,
Clock clock);
DatabaseConfig databaseConfig, Clock clock);
@Override
protected void benchmark(String name,
......@@ -76,8 +73,7 @@ public abstract class DatabasePerformanceComparisonTest
private Database<Connection> openDatabase(boolean conditionA)
throws DbException {
Database<Connection> db = createDatabase(conditionA,
new TestDatabaseConfig(testDir), new TestMessageFactory(),
new SystemClock());
new TestDatabaseConfig(testDir), new SystemClock());
db.open(databaseKey, null);
return db;
}
......
......@@ -41,7 +41,6 @@ import static org.briarproject.bramble.test.TestUtils.getGroup;
import static org.briarproject.bramble.test.TestUtils.getIdentity;
import static org.briarproject.bramble.test.TestUtils.getMessage;
import static org.briarproject.bramble.test.TestUtils.getRandomBytes;
import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
import static org.briarproject.bramble.test.UTest.Result.INCONCLUSIVE;
import static org.briarproject.bramble.test.UTest.Z_CRITICAL_0_1;
......@@ -81,7 +80,6 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
private static final int METADATA_KEYS_PER_MESSAGE = 5;
private static final int METADATA_KEY_LENGTH = 10;
private static final int METADATA_VALUE_LENGTH = 100;
private static final int OFFERED_MESSAGES_PER_CONTACT = 100;
/**
* How many benchmark iterations to run in each block.
......@@ -192,16 +190,6 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
});
}
@Test
public void testCountOfferedMessages() throws Exception {
String name = "countOfferedMessages(T, ContactId)";
benchmark(name, db -> {
Connection txn = db.startTransaction();
db.countOfferedMessages(txn, pickRandom(contacts).getId());
db.commitTransaction(txn);
});
}
@Test
public void testGetContact() throws Exception {
String name = "getContact(T, ContactId)";
......@@ -454,17 +442,6 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
});
}
@Test
public void testGetMessagesToRequest() throws Exception {
String name = "getMessagesToRequest(T, ContactId, int)";
benchmark(name, db -> {
Connection txn = db.startTransaction();
db.getMessagesToRequest(txn, pickRandom(contacts).getId(),
MAX_MESSAGE_IDS);
db.commitTransaction(txn);
});
}
@Test
public void testGetMessagesToSend() throws Exception {
String name = "getMessagesToSend(T, ContactId, int)";
......@@ -583,9 +560,6 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
groupMessages.get(g.getId()).add(m.getId());
}
}
for (int j = 0; j < OFFERED_MESSAGES_PER_CONTACT; j++) {
db.addOfferedMessage(txn, c, new MessageId(getRandomId()));
}
}
for (int i = 0; i < LOCAL_GROUPS; i++) {
Group g = getGroup(clientIds.get(i % CLIENTS), 123);
......
......@@ -3,11 +3,9 @@ package org.briarproject.bramble.db;
import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.db.DatabaseConfig;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.system.SystemClock;
import org.briarproject.bramble.test.TestDatabaseConfig;
import org.briarproject.bramble.test.TestMessageFactory;
import org.briarproject.bramble.util.IoUtils;
import java.io.File;
......@@ -26,7 +24,7 @@ public abstract class DatabaseTraceTest extends DatabasePerformanceTest {
private SecretKey databaseKey = getSecretKey();
abstract Database<Connection> createDatabase(DatabaseConfig databaseConfig,
MessageFactory messageFactory, Clock clock);
Clock clock);
@Nullable
protected abstract File getTraceFile();
......@@ -48,8 +46,7 @@ public abstract class DatabaseTraceTest extends DatabasePerformanceTest {
private Database<Connection> openDatabase() throws DbException {
Database<Connection> db = createDatabase(
new TestDatabaseConfig(testDir), new TestMessageFactory(),
new SystemClock());
new TestDatabaseConfig(testDir), new SystemClock());
db.open(databaseKey, null);
return db;
}
......
......@@ -16,6 +16,6 @@ public class H2DatabasePerformanceTest extends SingleDatabasePerformanceTest {
@Override
protected JdbcDatabase createDatabase(DatabaseConfig config,
MessageFactory messageFactory, Clock clock) {
return new H2Database(config, messageFactory, clock);
return new H2Database(config, clock);
}
}
......@@ -9,6 +9,6 @@ public class H2DatabaseTest extends JdbcDatabaseTest {
@Override
protected JdbcDatabase createDatabase(DatabaseConfig config,
MessageFactory messageFactory, Clock clock) {
return new H2Database(config, messageFactory, clock);
return new H2Database(config, clock);
}
}
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DatabaseConfig;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.junit.Ignore;
......@@ -15,8 +14,8 @@ public class H2DatabaseTraceTest extends DatabaseTraceTest {
@Override
Database<Connection> createDatabase(DatabaseConfig databaseConfig,
MessageFactory messageFactory, Clock clock) {
return new H2Database(databaseConfig, messageFactory, clock) {
Clock clock) {
return new H2Database(databaseConfig, clock) {
@Override
@Nonnull
String getUrl() {
......
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DatabaseConfig;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.junit.Ignore;
......@@ -13,11 +12,11 @@ public class H2HyperSqlDatabasePerformanceComparisonTest
@Override
Database<Connection> createDatabase(boolean conditionA,
DatabaseConfig databaseConfig, MessageFactory messageFactory,
DatabaseConfig databaseConfig,
Clock clock) {
if (conditionA)
return new H2Database(databaseConfig, messageFactory, clock);
else return new HyperSqlDatabase(databaseConfig, messageFactory, clock);
return new H2Database(databaseConfig, clock);
else return new HyperSqlDatabase(databaseConfig, clock);
}
@Override
......
......@@ -11,7 +11,7 @@ public class H2MigrationTest extends DatabaseMigrationTest {
@Override
Database<Connection> createDatabase(
List<Migration<Connection>> migrations) {
return new H2Database(config, messageFactory, clock) {
return new H2Database(config, clock) {
@Override
List<Migration<Connection>> getMigrations() {
return migrations;
......
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DatabaseConfig;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.junit.Ignore;
......@@ -18,9 +17,9 @@ public class H2SelfDatabasePerformanceComparisonTest
@Override
Database<Connection> createDatabase(boolean conditionA,
DatabaseConfig databaseConfig, MessageFactory messageFactory,
DatabaseConfig databaseConfig,
Clock clock) {
return new H2Database(databaseConfig, messageFactory, clock);
return new H2Database(databaseConfig, clock);
}
@Override
......
......@@ -3,7 +3,6 @@ package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DatabaseConfig;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.junit.Ignore;
......@@ -20,12 +19,11 @@ public class H2SleepDatabasePerformanceComparisonTest
@Override
Database<Connection> createDatabase(boolean conditionA,
DatabaseConfig databaseConfig, MessageFactory messageFactory,
Clock clock) {
DatabaseConfig databaseConfig, Clock clock) {
if (conditionA) {
return new H2Database(databaseConfig, messageFactory, clock);
return new H2Database(databaseConfig, clock);
} else {
return new H2Database(databaseConfig, messageFactory, clock) {
return new H2Database(databaseConfig, clock) {
@Override
@NotNullByDefault
public void commitTransaction(Connection txn)
......
......@@ -17,6 +17,6 @@ public class HyperSqlDatabasePerformanceTest
@Override
protected JdbcDatabase createDatabase(DatabaseConfig config,
MessageFactory messageFactory, Clock clock) {
return new HyperSqlDatabase(config, messageFactory, clock);
return new HyperSqlDatabase(config, clock);
}
}
......@@ -9,6 +9,6 @@ public class HyperSqlDatabaseTest extends JdbcDatabaseTest {
@Override
protected JdbcDatabase createDatabase(DatabaseConfig config,
MessageFactory messageFactory, Clock clock) {
return new HyperSqlDatabase(config, messageFactory ,clock);
return new HyperSqlDatabase(config, clock);
}
}
......@@ -11,7 +11,7 @@ public class HyperSqlMigrationTest extends DatabaseMigrationTest {
@Override
Database<Connection> createDatabase(
List<Migration<Connection>> migrations) {
return new HyperSqlDatabase(config, messageFactory, clock) {
return new HyperSqlDatabase(config, clock) {
@Override
List<Migration<Connection>> getMigrations() {
return migrations;
......
......@@ -40,7 +40,6 @@ import org.junit.Test;
import java.io.File;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
......@@ -1149,35 +1148,6 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.close();
}
@Test
public void testOfferedMessages() throws Exception {
Database<Connection> db = open(false);
Connection txn = db.startTransaction();
// Add a contact - initially there should be no offered messages
db.addIdentity(txn, identity);
assertEquals(contactId,
db.addContact(txn, author, localAuthor.getId(), null, true));
assertEquals(0, db.countOfferedMessages(txn, contactId));
// Add some offered messages and count them
List<MessageId> ids = new ArrayList<>();
for (int i = 0; i < 10; i++) {
MessageId m = new MessageId(getRandomId());
db.addOfferedMessage(txn, contactId, m);
ids.add(m);
}
assertEquals(10, db.countOfferedMessages(txn, contactId));
// Remove some of the offered messages and count again
List<MessageId> half = ids.subList(0, 5);
db.removeOfferedMessages(txn, contactId, half);
assertEquals(5, db.countOfferedMessages(txn, contactId));
db.commitTransaction(txn);
db.close();
}
@Test
public void testGroupMetadata() throws Exception {
Database<Connection> db = open(false);
......
......@@ -61,7 +61,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY));
with(any(int.class)), with(MAX_LATENCY), with(true));
will(returnValue(null));
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
......@@ -98,7 +98,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(msgTxn));
oneOf(db).generateBatch(with(msgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY));
with(any(int.class)), with(MAX_LATENCY), with(true));
will(returnValue(singletonList(message)));
oneOf(recordWriter).writeMessage(message);
// No more acks
......@@ -110,7 +110,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
oneOf(db).transactionWithNullableResult(with(false),
withNullableDbCallable(noMsgTxn));
oneOf(db).generateBatch(with(noMsgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY));
with(any(int.class)), with(MAX_LATENCY), with(true));
will(returnValue(null));
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
......