...
 
Commits (6)
......@@ -35,14 +35,15 @@ public interface ClientHelper {
Message createMessageForStoringMetadata(GroupId g);
Message getMessage(MessageId m) throws DbException;
Message getSmallMessage(MessageId m) throws DbException;
Message getMessage(Transaction txn, MessageId m) throws DbException;
Message getSmallMessage(Transaction txn, MessageId m) throws DbException;
BdfList getMessageAsList(MessageId m) throws DbException, FormatException;
BdfList getSmallMessageAsList(MessageId m)
throws DbException, FormatException;
BdfList getMessageAsList(Transaction txn, MessageId m) throws DbException,
FormatException;
BdfList getSmallMessageAsList(Transaction txn, MessageId m)
throws DbException, FormatException;
BdfDictionary getGroupMetadataAsDictionary(GroupId g) throws DbException,
FormatException;
......
package org.briarproject.bramble.api.db;
import org.briarproject.bramble.api.Pair;
import org.briarproject.bramble.api.contact.Contact;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.PendingContact;
......@@ -151,50 +152,42 @@ public interface DatabaseComponent extends TransactionManager {
void deleteMessageMetadata(Transaction txn, MessageId m) throws DbException;
/**
* Returns an acknowledgement for the given contact, or null if there are
* no messages to acknowledge.
* Returns a possibly empty ack for the given contact.
* <p/>
* Sync protocol v0.
*/
@Nullable
Ack generateAck(Transaction txn, ContactId c, int maxMessages)
Ack generateAckV0(Transaction txn, ContactId c, int maxMessages)
throws DbException;
/**
* Returns a batch of messages for the given contact, with a total length
* less than or equal to the given length, for transmission over a
* transport with the given maximum latency. Returns null if there are no
* sendable messages that fit in the given length.
* Returns a possibly empty batch of single-block messages for the given
* contact, for transmission over a transport with the given maximum
* latency.
* <p/>
* Sync protocol v0.
*/
@Nullable
Collection<Message> generateBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
Collection<Message> generateBatchV0(Transaction txn, ContactId c,
int maxMessages, int maxLatency) throws DbException;
/**
* Returns an offer for the given contact for transmission over a
* transport with the given maximum latency, or null if there are no
* messages to offer.
* Returns a possibly empty offer for the given contact, for transmission
* over a transport with the given maximum latency.
* <p/>
* Sync protocol v0.
*/
@Nullable
Offer generateOffer(Transaction txn, ContactId c, int maxMessages,
Offer generateOfferV0(Transaction txn, ContactId c, int maxMessages,
int maxLatency) throws DbException;
/**
* Returns a request for the given contact, or null if there are no
* messages to request.
*/
@Nullable
Request generateRequest(Transaction txn, ContactId c, int maxMessages)
throws DbException;
/**
* Returns a batch of messages for the given contact, with a total length
* less than or equal to the given length, for transmission over a
* transport with the given maximum latency. Only messages that have been
* requested by the contact are returned. Returns null if there are no
* sendable messages that fit in the given length.
* Returns a possibly empty batch of single-block messages for the given
* contact, for transmission over a transport with the given maximum
* latency. Only messages that have been requested by the contact are
* returned.
* <p/>
* Sync protocol v0.
*/
@Nullable
Collection<Message> generateRequestedBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
Collection<Message> generateRequestedBatchV0(Transaction txn, ContactId c,
int maxMessages, int maxLatency) throws DbException;
/**
* Returns the contact with the given ID.
......@@ -272,13 +265,14 @@ public interface DatabaseComponent extends TransactionManager {
Collection<Identity> getIdentities(Transaction txn) throws DbException;
/**
* Returns the message with the given ID.
* Returns the single-block message with the given ID.
* <p/>
* Read-only.
*
* @throws MessageDeletedException if the message has been deleted
* @throws MessageTooLargeException if the message has more than one block
*/
Message getMessage(Transaction txn, MessageId m) throws DbException;
Message getSmallMessage(Transaction txn, MessageId m) throws DbException;
/**
* Returns the IDs of all delivered messages in the given group.
......@@ -472,24 +466,35 @@ public interface DatabaseComponent extends TransactionManager {
/**
* Processes an ack from the given contact.
* <p/>
* Sync protocol v0.
*/
void receiveAck(Transaction txn, ContactId c, Ack a) throws DbException;
void receiveAckV0(Transaction txn, ContactId c, Ack a) throws DbException;
/**
* Processes a message from the given contact.
* Processes a single-block message from the given contact.
* <p/>
* Sync protocol v0.
*/
void receiveMessage(Transaction txn, ContactId c, Message m)
void receiveMessageV0(Transaction txn, ContactId c, Message m)
throws DbException;
/**
* Processes an offer from the given contact.
* Processes an offer from the given contact. Returns an ack (which may be
* empty) for any messages that should be acked, and a request (which may
* be empty) for any messages that should be requested.
* <p/>
* Sync protocol v0.
*/
void receiveOffer(Transaction txn, ContactId c, Offer o) throws DbException;
Pair<Ack, Request> receiveOfferV0(Transaction txn, ContactId c, Offer o)
throws DbException;
/**
* Processes a request from the given contact.
* <p/>
* Sync protocol v0.
*/
void receiveRequest(Transaction txn, ContactId c, Request r)
void receiveRequestV0(Transaction txn, ContactId c, Request r)
throws DbException;
/**
......
package org.briarproject.bramble.api.db;
/**
* Thrown when a large message is requested from the database using a method
* that's only suitable for small messages.
*/
public class MessageTooLargeException extends DbException {
}
package org.briarproject.bramble.api.event;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* An event that can be consumed by a listener. All listeners receive the event
* but only one can consume it.
*/
public class ConsumableEvent extends Event {
private final AtomicBoolean consumed = new AtomicBoolean(false);
/**
* Tries to consume the event. Returns true if the caller successfully
* consumed the event or false if another caller had already consumed it.
*/
public boolean consume() {
return consumed.getAndSet(true);
}
}
package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import javax.annotation.concurrent.Immutable;
import org.briarproject.bramble.api.event.ConsumableEvent;
import org.briarproject.bramble.api.sync.Ack;
/**
* An event that is broadcast when a message is offered by a contact and needs
* to be requested.
* An event that is broadcast when an ack needs to be sent to a contact. The
* consumer should send the ack.
*/
@Immutable
@NotNullByDefault
public class MessageToRequestEvent extends Event {
public class AckToSendEvent extends ConsumableEvent {
private final ContactId contactId;
private final Ack ack;
public MessageToRequestEvent(ContactId contactId) {
public AckToSendEvent(ContactId contactId, Ack ack) {
this.contactId = contactId;
this.ack = ack;
}
public ContactId getContactId() {
return contactId;
}
public Ack getAck() {
return ack;
}
}
......@@ -7,8 +7,8 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when a message is received from, or offered by, a
* contact and needs to be acknowledged.
* An event that is broadcast when a message is received from a contact and
* needs to be acknowledged.
*/
@Immutable
@NotNullByDefault
......
package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.ConsumableEvent;
import org.briarproject.bramble.api.sync.Request;
/**
* An event that is broadcast when a request needs to be sent to a contact. The
* consumer should send the request.
*/
public class RequestToSendEvent extends ConsumableEvent {
private final ContactId contactId;
private final Request request;
public RequestToSendEvent(ContactId contactId, Request request) {
this.contactId = contactId;
this.request = request;
}
public ContactId getContactId() {
return contactId;
}
public Request getRequest() {
return request;
}
}
......@@ -116,25 +116,27 @@ class ClientHelperImpl implements ClientHelper {
}
@Override
public Message getMessage(MessageId m) throws DbException {
return db.transactionWithResult(true, txn -> getMessage(txn, m));
public Message getSmallMessage(MessageId m) throws DbException {
return db.transactionWithResult(true, txn -> getSmallMessage(txn, m));
}
@Override
public Message getMessage(Transaction txn, MessageId m) throws DbException {
return db.getMessage(txn, m);
public Message getSmallMessage(Transaction txn, MessageId m)
throws DbException {
return db.getSmallMessage(txn, m);
}
@Override
public BdfList getMessageAsList(MessageId m) throws DbException,
public BdfList getSmallMessageAsList(MessageId m) throws DbException,
FormatException {
return db.transactionWithResult(true, txn -> getMessageAsList(txn, m));
return db.transactionWithResult(true, txn ->
getSmallMessageAsList(txn, m));
}
@Override
public BdfList getMessageAsList(Transaction txn, MessageId m)
public BdfList getSmallMessageAsList(Transaction txn, MessageId m)
throws DbException, FormatException {
return toList(db.getMessage(txn, m).getBody());
return toList(db.getSmallMessage(txn, m).getBody());
}
@Override
......
......@@ -12,6 +12,7 @@ import org.briarproject.bramble.api.db.DataTooOldException;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.MessageDeletedException;
import org.briarproject.bramble.api.db.MessageTooLargeException;
import org.briarproject.bramble.api.db.Metadata;
import org.briarproject.bramble.api.db.MigrationListener;
import org.briarproject.bramble.api.identity.Author;
......@@ -125,11 +126,6 @@ interface Database<T> {
void addMessageDependency(T txn, Message dependent, MessageId dependency,
MessageState dependentState) throws DbException;
/**
* Records that a message has been offered by the given contact.
*/
void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException;
/**
* Stores a pending contact.
*/
......@@ -218,13 +214,6 @@ interface Database<T> {
boolean containsVisibleMessage(T txn, ContactId c, MessageId m)
throws DbException;
/**
* Returns the number of messages offered by the given contact.
* <p/>
* Read-only.
*/
int countOfferedMessages(T txn, ContactId c) throws DbException;
/**
* Deletes the message with the given ID. Unlike
* {@link #removeMessage(Object, MessageId)}, the message ID and any other
......@@ -332,13 +321,14 @@ interface Database<T> {
Collection<Identity> getIdentities(T txn) throws DbException;
/**
* Returns the message with the given ID.
* Returns the single-block message with the given ID.
* <p/>
* Read-only.
*
* @throws MessageDeletedException if the message has been deleted
* @throws MessageTooLargeException if the message has more than one block
*/
Message getMessage(T txn, MessageId m) throws DbException;
Message getSmallMessage(T txn, MessageId m) throws DbException;
/**
* Returns the IDs and states of all dependencies of the given message.
......@@ -439,40 +429,37 @@ interface Database<T> {
throws DbException;
/**
* Returns the IDs of some messages received from the given contact that
* need to be acknowledged, up to the given number of messages.
* Returns the IDs of some single-block messages that need to be
* acknowledged to the given contact, up to the given number of messages.
* <p/>
* Sync protocol v0.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToAck(T txn, ContactId c, int maxMessages)
throws DbException;
Collection<MessageId> getMessagesToAckV0(T txn, ContactId c,
int maxMessages) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be offered to the
* given contact, up to the given number of messages.
* Returns the IDs of some single-block messages that are eligible to be
* offered to the given contact, up to the given number of messages.
* <p/>
* Sync protocol v0.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToOffer(T txn, ContactId c,
Collection<MessageId> getMessagesToOfferV0(T txn, ContactId c,
int maxMessages, int maxLatency) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be requested from
* the given contact, up to the given number of messages.
* Returns the IDs of some single-block messages that are eligible to be
* sent to the given contact, up to the given number of messages.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToRequest(T txn, ContactId c,
int maxMessages) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given total length.
* Sync protocol v0.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength,
int maxLatency) throws DbException;
Collection<MessageId> getMessagesToSendV0(T txn, ContactId c,
int maxMessages, int maxLatency) throws DbException;
/**
* Returns the IDs of any messages that need to be validated.
......@@ -523,14 +510,16 @@ interface Database<T> {
Collection<PendingContact> getPendingContacts(T txn) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be sent to the
* given contact and have been requested by the contact, up to the given
* total length.
* Returns the IDs of some single-block messages that are eligible to be
* sent to the given contact and have been requested by the contact, up to
* the given number of messages.
* <p/>
* Sync protocol v0.
* <p/>
* Read-only.
*/
Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
Collection<MessageId> getRequestedMessagesToSendV0(T txn, ContactId c,
int maxMessages, int maxLatency) throws DbException;
/**
* Returns all settings in the given namespace.
......@@ -561,18 +550,22 @@ interface Database<T> {
throws DbException;
/**
* Marks the given messages as not needing to be acknowledged to the
* given contact.
* Marks the given single-block messages as not needing to be acknowledged
* to the given contact.
* <p/>
* Sync protocol v0.
*/
void lowerAckFlag(T txn, ContactId c, Collection<MessageId> acked)
void lowerAckFlagV0(T txn, ContactId c, Collection<MessageId> acked)
throws DbException;
/**
* Marks the given messages as not having been requested by the given
* contact.
* Marks the given single-block messages as not having been requested by
* the given contact.
* <p/>
* Sync protocol v0.
*/
void lowerRequestedFlag(T txn, ContactId c, Collection<MessageId> requested)
throws DbException;
void lowerRequestedFlagV0(T txn, ContactId c,
Collection<MessageId> requested) throws DbException;
/**
* Merges the given metadata with the existing metadata for the given
......@@ -595,19 +588,28 @@ interface Database<T> {
void mergeSettings(T txn, Settings s, String namespace) throws DbException;
/**
* Marks a message as needing to be acknowledged to the given contact.
* Marks a single-block message as needing to be acknowledged to the given
* contact.
* <p/>
* Sync protocol v0.
*/
void raiseAckFlag(T txn, ContactId c, MessageId m) throws DbException;
void raiseAckFlagV0(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks a message as having been requested by the given contact.
* Marks a single-block message as having been requested by the given
* contact.
* <p/>
* Sync protocol v0.
*/
void raiseRequestedFlag(T txn, ContactId c, MessageId m) throws DbException;
void raiseRequestedFlagV0(T txn, ContactId c, MessageId m)
throws DbException;
/**
* Marks a message as having been seen by the given contact.
* Marks a single-block message as having been seen by the given contact.
* <p/>
* Sync protocol v0.
*/
void raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException;
void raiseSeenFlagV0(T txn, ContactId c, MessageId m) throws DbException;
/**
* Removes a contact from the database.
......@@ -636,13 +638,6 @@ interface Database<T> {
*/
void removeMessage(T txn, MessageId m) throws DbException;
/**
* Removes the given offered messages that were offered by the given
* contact.
*/
void removeOfferedMessages(T txn, ContactId c,
Collection<MessageId> requested) throws DbException;
/**
* Removes a pending contact (and all associated state) from the database.
*/
......@@ -666,10 +661,12 @@ interface Database<T> {
throws DbException;
/**
* Resets the transmission count and expiry time of the given message with
* respect to the given contact.
* Resets the transmission count and expiry time of the given single-block
* message with respect to the given contact.
* <p/>
* Sync protocol v0.
*/
void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException;
void resetExpiryTimeV0(T txn, ContactId c, MessageId m) throws DbException;
/**
* Marks the given contact as verified.
......@@ -732,11 +729,13 @@ interface Database<T> {
/**
* Updates the transmission count, expiry time and estimated time of arrival
* of the given message with respect to the given contact, using the latency
* of the transport over which it was sent.
* of the given single-block message with respect to the given contact,
* using the latency of the transport over which it was sent.
* <p/>
* Sync protocol v0.
*/
void updateExpiryTimeAndEta(T txn, ContactId c, MessageId m, int maxLatency)
throws DbException;
void updateExpiryTimeAndEtaV0(T txn, ContactId c, MessageId m,
int maxLatency) throws DbException;
/**
* Stores the given transport keys, deleting any keys they have replaced.
......
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.Pair;
import org.briarproject.bramble.api.contact.Contact;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.PendingContact;
......@@ -62,7 +63,6 @@ import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
import org.briarproject.bramble.api.sync.event.MessagesAckedEvent;
import org.briarproject.bramble.api.sync.event.MessagesSentEvent;
import org.briarproject.bramble.api.sync.event.SyncVersionsUpdatedEvent;
......@@ -91,7 +91,6 @@ import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED;
import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN;
import static org.briarproject.bramble.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
import static org.briarproject.bramble.util.LogUtils.logDuration;
import static org.briarproject.bramble.util.LogUtils.logException;
import static org.briarproject.bramble.util.LogUtils.now;
......@@ -389,89 +388,66 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.deleteMessageMetadata(txn, m);
}
@Nullable
@Override
public Ack generateAck(Transaction transaction, ContactId c,
public Ack generateAckV0(Transaction transaction, ContactId c,
int maxMessages) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToAck(txn, c, maxMessages);
if (ids.isEmpty()) return null;
db.lowerAckFlag(txn, c, ids);
Collection<MessageId> ids = db.getMessagesToAckV0(txn, c, maxMessages);
if (!ids.isEmpty()) db.lowerAckFlagV0(txn, c, ids);
return new Ack(ids);
}
@Nullable
@Override
public Collection<Message> generateBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
public Collection<Message> generateBatchV0(Transaction transaction,
ContactId c, int maxMessages, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids =
db.getMessagesToSend(txn, c, maxLength, maxLatency);
db.getMessagesToSendV0(txn, c, maxMessages, maxLatency);
List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
messages.add(db.getMessage(txn, m));
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
messages.add(db.getSmallMessage(txn, m));
db.updateExpiryTimeAndEtaV0(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
if (!ids.isEmpty()) db.lowerRequestedFlagV0(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids));
return messages;
}
@Nullable
@Override
public Offer generateOffer(Transaction transaction, ContactId c,
public Offer generateOfferV0(Transaction transaction, ContactId c,
int maxMessages, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids =
db.getMessagesToOffer(txn, c, maxMessages, maxLatency);
if (ids.isEmpty()) return null;
db.getMessagesToOfferV0(txn, c, maxMessages, maxLatency);
for (MessageId m : ids)
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
db.updateExpiryTimeAndEtaV0(txn, c, m, maxLatency);
return new Offer(ids);
}
@Nullable
@Override
public Request generateRequest(Transaction transaction, ContactId c,
int maxMessages) throws DbException {
public Collection<Message> generateRequestedBatchV0(Transaction transaction,
ContactId c, int maxMessages, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToRequest(txn, c,
maxMessages);
if (ids.isEmpty()) return null;
db.removeOfferedMessages(txn, c, ids);
return new Request(ids);
}
@Nullable
@Override
public Collection<Message> generateRequestedBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids =
db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency);
Collection<MessageId> ids = db.getRequestedMessagesToSendV0(txn, c,
maxMessages, maxLatency);
List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
messages.add(db.getMessage(txn, m));
db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
messages.add(db.getSmallMessage(txn, m));
db.updateExpiryTimeAndEtaV0(txn, c, m, maxLatency);
}
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
if (!ids.isEmpty()) db.lowerRequestedFlagV0(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids));
return messages;
}
......@@ -559,12 +535,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Override
public Message getMessage(Transaction transaction, MessageId m)
public Message getSmallMessage(Transaction transaction, MessageId m)
throws DbException {
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
return db.getMessage(txn, m);
return db.getSmallMessage(txn, m);
}
@Override
......@@ -786,7 +762,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Override
public void receiveAck(Transaction transaction, ContactId c, Ack a)
public void receiveAckV0(Transaction transaction, ContactId c, Ack a)
throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
......@@ -795,7 +771,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
Collection<MessageId> acked = new ArrayList<>();
for (MessageId m : a.getMessageIds()) {
if (db.containsVisibleMessage(txn, c, m)) {
db.raiseSeenFlag(txn, c, m);
db.raiseSeenFlagV0(txn, c, m);
acked.add(m);
}
}
......@@ -805,16 +781,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Override
public void receiveMessage(Transaction transaction, ContactId c, Message m)
throws DbException {
public void receiveMessageV0(Transaction transaction, ContactId c,
Message m) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
if (db.getGroupVisibility(txn, c, m.getGroupId()) != INVISIBLE) {
if (db.containsMessage(txn, m.getId())) {
db.raiseSeenFlag(txn, c, m.getId());
db.raiseAckFlag(txn, c, m.getId());
db.raiseSeenFlagV0(txn, c, m.getId());
db.raiseAckFlagV0(txn, c, m.getId());
} else {
db.addMessage(txn, m, UNKNOWN, false, false, c);
transaction.attach(new MessageAddedEvent(m, c));
......@@ -824,32 +800,28 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
}
@Override
public void receiveOffer(Transaction transaction, ContactId c, Offer o)
throws DbException {
public Pair<Ack, Request> receiveOfferV0(Transaction transaction,
ContactId c, Offer o) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
boolean ack = false, request = false;
int count = db.countOfferedMessages(txn, c);
List<MessageId> ack = new ArrayList<>();
List<MessageId> request = new ArrayList<>();
for (MessageId m : o.getMessageIds()) {
if (db.containsVisibleMessage(txn, c, m)) {
db.raiseSeenFlag(txn, c, m);
db.raiseAckFlag(txn, c, m);
ack = true;
} else if (count < MAX_OFFERED_MESSAGES) {
db.addOfferedMessage(txn, c, m);
request = true;
count++;
db.raiseSeenFlagV0(txn, c, m);
ack.add(m);
} else {
request.add(m);
}
}
if (ack) transaction.attach(new MessageToAckEvent(c));
if (request) transaction.attach(new MessageToRequestEvent(c));
return new Pair<>(new Ack(ack), new Request(request));
}
@Override
public void receiveRequest(Transaction transaction, ContactId c, Request r)
throws DbException {
public void receiveRequestV0(Transaction transaction, ContactId c,
Request r) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
......@@ -857,8 +829,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
boolean requested = false;
for (MessageId m : r.getMessageIds()) {
if (db.containsVisibleMessage(txn, c, m)) {
db.raiseRequestedFlag(txn, c, m);
db.resetExpiryTime(txn, c, m);
db.raiseRequestedFlagV0(txn, c, m);
db.resetExpiryTimeV0(txn, c, m);
requested = true;
}
}
......
......@@ -6,7 +6,6 @@ import org.briarproject.bramble.api.db.TransactionManager;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventExecutor;
import org.briarproject.bramble.api.lifecycle.ShutdownManager;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import java.sql.Connection;
......@@ -22,9 +21,8 @@ public class DatabaseModule {
@Provides
@Singleton
Database<Connection> provideDatabase(DatabaseConfig config,
MessageFactory messageFactory, Clock clock) {
return new H2Database(config, messageFactory, clock);
Database<Connection> provideDatabase(DatabaseConfig config, Clock clock) {
return new H2Database(config, clock);
}
@Provides
......
......@@ -6,7 +6,6 @@ import org.briarproject.bramble.api.db.DbClosedException;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.MigrationListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.util.StringUtils;
......@@ -50,9 +49,8 @@ class H2Database extends JdbcDatabase {
private volatile SecretKey key = null;
@Inject
H2Database(DatabaseConfig config, MessageFactory messageFactory,
Clock clock) {
super(dbTypes, messageFactory, clock);
H2Database(DatabaseConfig config, Clock clock) {
super(dbTypes, clock);
this.config = config;
File dir = config.getDatabaseDirectory();
String path = new File(dir, "db").getAbsolutePath();
......
......@@ -6,7 +6,6 @@ import org.briarproject.bramble.api.db.DbClosedException;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.MigrationListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.util.StringUtils;
......@@ -49,9 +48,8 @@ class HyperSqlDatabase extends JdbcDatabase {
private volatile SecretKey key = null;
@Inject
HyperSqlDatabase(DatabaseConfig config, MessageFactory messageFactory,
Clock clock) {
super(dbTypes, messageFactory, clock);
HyperSqlDatabase(DatabaseConfig config, Clock clock) {
super(dbTypes, clock);
this.config = config;
File dir = config.getDatabaseDirectory();
String path = new File(dir, "db").getAbsolutePath();
......
......@@ -37,6 +37,7 @@ class Migration38_39 implements Migration<Connection> {
s.execute("ALTER TABLE incomingKeys"
+ " ALTER COLUMN contactId"
+ " SET NOT NULL");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -36,6 +36,7 @@ class Migration39_40 implements Migration<Connection> {
s.execute("ALTER TABLE statuses"
+ " ALTER COLUMN eta"
+ " SET NOT NULL");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -38,6 +38,7 @@ class Migration40_41 implements Migration<Connection> {
s = txn.createStatement();
s.execute("ALTER TABLE contacts"
+ dbTypes.replaceTypes(" ADD alias _STRING"));
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -89,6 +89,7 @@ class Migration41_42 implements Migration<Connection> {
+ " FOREIGN KEY (keySetId)"
+ " REFERENCES outgoingHandshakeKeys (keySetId)"
+ " ON DELETE CASCADE)"));
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -44,6 +44,7 @@ class Migration42_43 implements Migration<Connection> {
+ " ADD COLUMN handshakePublicKey _BINARY"));
s.execute("ALTER TABLE contacts"
+ " DROP COLUMN active");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -50,6 +50,7 @@ class Migration43_44 implements Migration<Connection> {
+ " ADD COLUMN rootKey _SECRET"));
s.execute("ALTER TABLE outgoingKeys"
+ " ADD COLUMN alice BOOLEAN");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -31,6 +31,7 @@ class Migration44_45 implements Migration<Connection> {
try {
s = txn.createStatement();
s.execute("ALTER TABLE pendingContacts DROP COLUMN state");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -32,6 +32,7 @@ class Migration45_46 implements Migration<Connection> {
s = txn.createStatement();
s.execute("ALTER TABLE messages"
+ " ADD COLUMN temporary BOOLEAN DEFAULT FALSE NOT NULL");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
......@@ -39,6 +39,7 @@ class Migration46_47 implements Migration<Connection> {
s.execute(dbTypes.replaceTypes("ALTER TABLE contacts"
+ " ADD COLUMN syncVersions"
+ " _BINARY DEFAULT '00' NOT NULL"));
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
......
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import static java.lang.System.arraycopy;
import static java.sql.Types.BINARY;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
class Migration47_48 implements Migration<Connection> {
private static final Logger LOG = getLogger(Migration47_48.class.getName());
private final DatabaseTypes dbTypes;
Migration47_48(DatabaseTypes dbTypes) {
this.dbTypes = dbTypes;
}
@Override
public int getStartVersion() {
return 47;
}
@Override
public int getEndVersion() {
return 48;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
ResultSet rs = null;
PreparedStatement ps = null;
try {
s = txn.createStatement();
s.execute("ALTER TABLE messages"
+ " ADD COLUMN deleted BOOLEAN DEFAULT FALSE NOT NULL");
s.execute("UPDATE messages SET deleted = (raw IS NULL)");
s.execute("ALTER TABLE messages"
+ " ALTER COLUMN length RENAME TO dataLength");
s.execute("UPDATE messages SET dataLength = dataLength - "
+ MESSAGE_HEADER_LENGTH);
s.execute("ALTER TABLE messages"
+ " ADD COLUMN blockCount INT DEFAULT 1 NOT NULL");
s.execute(dbTypes.replaceTypes("CREATE TABLE blocks"
+ " (blockId _HASH NOT NULL,"
+ " groupId _HASH NOT NULL,"
+ " timestamp BIGINT NOT NULL,"
+ " blockCount INT NOT NULL,"
+ " blockNumber INT NOT NULL,"
+ " messageId _HASH," // Null if not yet known
+ " backHash _HASH," // Null for single block
+ " prevBackHash _HASH," // Null for single or first block
+ " nextBlockId _HASH," // Null for single or last block
+ " dataLength INT NOT NULL," // Excluding header
+ " data BLOB," // Null if message has been deleted
+ " PRIMARY KEY (blockId),"
+ " FOREIGN KEY (groupId)"
+ " REFERENCES groups (groupId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (messageId)"
+ " REFERENCES messages (messageId)"
+ " ON DELETE CASCADE)"));
rs = s.executeQuery("SELECT messageId, groupId, timestamp,"
+ " dataLength, raw"
+ " FROM messages");
ps = txn.prepareStatement("INSERT INTO blocks"
+ " (blockId, groupId, timestamp, blockCount, blockNumber,"
+ " messageId, dataLength, data)"
+ " VALUES (?, ?, ?, 1, 0, ?, ?, ?)");
int migrated = 0;
while (rs.next()) {
byte[] messageId = rs.getBytes(1);
byte[] groupId = rs.getBytes(2);
long timestamp = rs.getLong(3);
int dataLength = rs.getInt(4);
byte[] raw = rs.getBytes(5);
// For existing messages, block ID equals message ID
ps.setBytes(1, messageId);
ps.setBytes(2, groupId);
ps.setLong(3, timestamp);
ps.setBytes(4, messageId);
ps.setInt(5, dataLength);
if (raw == null) {
ps.setNull(6, BINARY);
} else {
byte[] data = new byte[raw.length - MESSAGE_HEADER_LENGTH];
arraycopy(raw, MESSAGE_HEADER_LENGTH, data, 0, data.length);
ps.setBytes(6, data);
}
if (ps.executeUpdate() != 1) throw new DbStateException();
migrated++;
}
ps.close();
rs.close();
s.execute("ALTER TABLE messages DROP COLUMN raw");
if (LOG.isLoggable(INFO))
LOG.info("Migrated " + migrated + " messages");
s.execute(dbTypes.replaceTypes("CREATE TABLE blockStatuses"
+ " (blockId _HASH NOT NULL,"
+ " contactId INT NOT NULL,"
+ " groupId _HASH NOT NULL," // Denormalised
+ " timestamp BIGINT NOT NULL," // Denormalised
+ " blockCount INT NOT NULL," // Denormalised
+ " blockNumber INT NOT NULL," // Denormalised
+ " messageId _HASH," // Denormalised, null if not yet known
+ " groupShared BOOLEAN NOT NULL," // Denormalised
+ " messageShared BOOLEAN NOT NULL," // Denormalised
+ " deleted BOOLEAN NOT NULL," // Denormalised
+ " blocksToAck INT," // Non-null for first block in message
+ " blocksSeen INT," // Non-null for first block in message
+ " canSendOffer BOOLEAN NOT NULL,"
+ " sendAck BOOLEAN NOT NULL,"
+ " seen BOOLEAN NOT NULL,"
+ " requested BOOLEAN NOT NULL,"
+ " expiry BIGINT NOT NULL,"
+ " txCount INT NOT NULL,"
+ " eta BIGINT NOT NULL,"
+ " PRIMARY KEY (blockId, contactId),"
+ " FOREIGN KEY (blockId)"
+ " REFERENCES blocks (blockId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (groupId)"
+ " REFERENCES groups (groupId)"
+ " ON DELETE CASCADE,"
+ " FOREIGN KEY (messageId)"
+ " REFERENCES messages (messageId)"
+ " ON DELETE CASCADE)"));
rs = s.executeQuery("SELECT messageId, contactId, groupId,"
+ " timestamp, groupShared, messageShared, deleted, ack,"
+ " seen, requested, expiry, txCount, eta"
+ " FROM statuses");
ps = txn.prepareStatement("INSERT INTO blockStatuses"
+ " (blockId, contactId, groupId, timestamp, blockCount,"
+ " blockNumber, messageId, groupShared, messageShared,"
+ " deleted, blocksToAck, blocksSeen, canSendOffer,"
+ " sendAck, seen, requested, expiry, txCount, eta)"
+ " VALUES (?, ?, ?, ?, 1, 0, ?, ?, ?, ?, 1, ?, TRUE, ?,"
+ " ?, ?, ?, ?, ?)");
migrated = 0;
while (rs.next()) {
byte[] messageId = rs.getBytes(1);
int contactId = rs.getInt(2);
byte[] groupId = rs.getBytes(3);
long timestamp = rs.getLong(4);
boolean groupShared = rs.getBoolean(5);
boolean messageShared = rs.getBoolean(6);
boolean deleted = rs.getBoolean(7);
boolean ack = rs.getBoolean(8);
boolean seen = rs.getBoolean(9);
boolean requested = rs.getBoolean(10);
long expiry = rs.getLong(11);
int txCount = rs.getInt(12);
long eta = rs.getLong(13);
// For existing messages, block ID equals message ID
ps.setBytes(1, messageId);
ps.setInt(2, contactId);
ps.setBytes(3, groupId);
ps.setLong(4, timestamp);
ps.setBytes(5, messageId);
ps.setBoolean(6, groupShared);
ps.setBoolean(7, messageShared);
ps.setBoolean(8, deleted);
ps.setInt(9, seen ? 1 : 0);
ps.setBoolean(10, ack);
ps.setBoolean(11, seen);
ps.setBoolean(12, requested);
ps.setLong(13, expiry);
ps.setInt(14, txCount);
ps.setLong(15, eta);
if (ps.executeUpdate() != 1) throw new DbStateException();
migrated++;
}
ps.close();
rs.close();
s.execute("CREATE INDEX IF NOT EXISTS"
+ " blockStatusesByContactIdGroupId"
+ " ON blockStatuses (contactId, groupId)");
s.execute("CREATE INDEX IF NOT EXISTS"
+ " blockStatusesByContactIdTimestamp"
+ " ON blockStatuses (contactId, timestamp)");
s.execute("CREATE INDEX IF NOT EXISTS"
+ " blockStatusesByMessageIdContactId"
+ " ON blockStatuses (messageId, contactId)");
s.execute("DROP TABLE statuses");
s.close();
if (LOG.isLoggable(INFO))
LOG.info("Migrated " + migrated + " statuses");
} catch (SQLException e) {
tryToClose(ps, LOG, WARNING);
tryToClose(rs, LOG, WARNING);
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
}
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
class Migration48_49 implements Migration<Connection> {
private static final Logger LOG = getLogger(Migration48_49.class.getName());
@Override
public int getStartVersion() {
return 48;
}
@Override
public int getEndVersion() {
return 49;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
try {
s = txn.createStatement();
s.execute("DROP TABLE offers");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
}
\ No newline at end of file
......@@ -155,7 +155,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
Map<TransportId, LatestUpdate> latest = findLatestLocal(txn);
// Retrieve and parse the latest local properties
for (Entry<TransportId, LatestUpdate> e : latest.entrySet()) {
BdfList message = clientHelper.getMessageAsList(txn,
BdfList message = clientHelper.getSmallMessageAsList(txn,
e.getValue().messageId);
local.put(e.getKey(), parseProperties(message));
}
......@@ -176,7 +176,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
true);
if (latest != null) {
// Retrieve and parse the latest local properties
BdfList message = clientHelper.getMessageAsList(txn,
BdfList message = clientHelper.getSmallMessageAsList(txn,
latest.messageId);
p = parseProperties(message);
}
......@@ -207,7 +207,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
if (latest == null) return new TransportProperties();
// Retrieve and parse the latest remote properties
BdfList message =
clientHelper.getMessageAsList(txn, latest.messageId);
clientHelper.getSmallMessageAsList(txn, latest.messageId);
return parseProperties(message);
} catch (FormatException e) {
throw new DbException(e);
......@@ -235,7 +235,7 @@ class TransportPropertyManagerImpl implements TransportPropertyManager,
merged = p;
changed = true;
} else {
BdfList message = clientHelper.getMessageAsList(txn,
BdfList message = clientHelper.getSmallMessageAsList(txn,
latest.messageId);
TransportProperties old = parseProperties(message);
merged = new TransportProperties(old);
......
......@@ -18,11 +18,12 @@ import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions;
import org.briarproject.bramble.api.sync.event.AckToSendEvent;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
import org.briarproject.bramble.api.sync.event.RequestToSendEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.transport.StreamWriter;
......@@ -42,7 +43,6 @@ import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS;
import static org.briarproject.bramble.util.LogUtils.logException;
......@@ -60,8 +60,11 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private static final Logger LOG =
getLogger(DuplexOutgoingSession.class.getName());
private static final int MAX_MESSAGES_PER_BATCH = 10;
private static final ThrowingRunnable<IOException> CLOSE = () -> {
};
private static final ThrowingRunnable<IOException>
NEXT_SEND_TIME_DECREASED = () -> {
};
......@@ -79,8 +82,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final AtomicBoolean generateAckQueued = new AtomicBoolean(false);
private final AtomicBoolean generateBatchQueued = new AtomicBoolean(false);
private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false);
private final AtomicBoolean generateRequestQueued =
new AtomicBoolean(false);
private final AtomicLong nextSendTime = new AtomicLong(Long.MAX_VALUE);
private volatile boolean interrupted = false;
......@@ -112,7 +113,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
generateAck();
generateBatch();
generateOffer();
generateRequest();
long now = clock.currentTimeMillis();
long nextKeepalive = now + maxIdleTime;
boolean dataToFlush = true;
......@@ -184,11 +184,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateOffer());
}
private void generateRequest() {
if (generateRequestQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateRequest());
}
private void setNextSendTime(long time) {
long old = nextSendTime.getAndSet(time);
if (time < old) writerTasks.add(NEXT_SEND_TIME_DECREASED);
......@@ -217,12 +212,17 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof MessageToAckEvent) {
if (((MessageToAckEvent) e).getContactId().equals(contactId))
generateAck();
} else if (e instanceof MessageToRequestEvent) {
if (((MessageToRequestEvent) e).getContactId().equals(contactId))
generateRequest();
} else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt();
} else if (e instanceof AckToSendEvent) {
AckToSendEvent a = (AckToSendEvent) e;
if (a.getContactId().equals(contactId) && a.consume())
writerTasks.add(new WriteAck(a.getAck()));
} else if (e instanceof RequestToSendEvent) {
RequestToSendEvent r = (RequestToSendEvent) e;
if (r.getContactId().equals(contactId) && r.consume())
writerTasks.add(new WriteRequest(r.getRequest()));
}
}
......@@ -234,11 +234,12 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return;
if (!generateAckQueued.getAndSet(false)) throw new AssertionError();
try {
Ack a = db.transactionWithNullableResult(false, txn ->
db.generateAck(txn, contactId, MAX_MESSAGE_IDS));
Ack a = db.transactionWithResult(false, txn ->
db.generateAckV0(txn, contactId, MAX_MESSAGE_IDS));
boolean empty = a.getMessageIds().isEmpty();
if (LOG.isLoggable(INFO))
LOG.info("Generated ack: " + (a != null));
if (a != null) writerTasks.add(new WriteAck(a));
LOG.info("Generated ack: " + !empty);
if (!empty) writerTasks.add(new WriteAck(a));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
......@@ -273,18 +274,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (!generateBatchQueued.getAndSet(false))
throw new AssertionError();
try {
Collection<Message> b =
db.transactionWithNullableResult(false, txn -> {
Collection<Message> batch =
db.generateRequestedBatch(txn, contactId,
MAX_RECORD_PAYLOAD_BYTES,
maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId));
return batch;
});
Collection<Message> b = db.transactionWithResult(false, txn -> {
Collection<Message> batch = db.generateRequestedBatchV0(txn,
contactId, MAX_MESSAGES_PER_BATCH, maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId));
return batch;
});
boolean empty = b.isEmpty();
if (LOG.isLoggable(INFO))
LOG.info("Generated batch: " + (b != null));
if (b != null) writerTasks.add(new WriteBatch(b));
LOG.info("Generated batch: " + !empty);
if (!empty) writerTasks.add(new WriteBatch(b));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
......@@ -319,15 +318,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (!generateOfferQueued.getAndSet(false))
throw new AssertionError();
try {
Offer o = db.transactionWithNullableResult(false, txn -> {
Offer offer = db.generateOffer(txn, contactId,
Offer o = db.transactionWithResult(false, txn -> {
Offer offer = db.generateOfferV0(txn, contactId,
MAX_MESSAGE_IDS, maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId));
return offer;
});
boolean empty = o.getMessageIds().isEmpty();
if (LOG.isLoggable(INFO))
LOG.info("Generated offer: " + (o != null));
if (o != null) writerTasks.add(new WriteOffer(o));
LOG.info("Generated offer: " + !empty);
if (!empty) writerTasks.add(new WriteOffer(o));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
......@@ -353,27 +353,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
}
}
private class GenerateRequest implements Runnable {
@DatabaseExecutor
@Override
public void run() {
if (interrupted) return;
if (!generateRequestQueued.getAndSet(false))
throw new AssertionError();
try {
Request r = db.transactionWithNullableResult(false, txn ->
db.generateRequest(txn, contactId, MAX_MESSAGE_IDS));
if (LOG.isLoggable(INFO))
LOG.info("Generated request: " + (r != null));
if (r != null) writerTasks.add(new WriteRequest(r));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
}
}
}
private class WriteRequest implements ThrowingRunnable<IOException> {
private final Request request;
......@@ -388,7 +367,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return;
recordWriter.writeRequest(request); <