diff --git a/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/Database.kt b/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/Database.kt index b8bbd6427bf2df9bddd4c7e88eb52311a116e3cd..c02284f46eafe00c74f48c6b0a7da63addfdf5ac 100644 --- a/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/Database.kt +++ b/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/Database.kt @@ -4,7 +4,7 @@ import org.briarproject.mailbox.core.api.Contact import org.briarproject.mailbox.core.settings.Settings import java.sql.Connection -interface Database { +interface Database : TransactionManager { /** * Opens the database and returns true if the database already existed. @@ -18,51 +18,32 @@ interface Database { @Throws(DbException::class) fun close() - /** - * Starts a new transaction and returns an object representing it. - */ - @Throws(DbException::class) - fun startTransaction(): Connection - /** * Aborts the given transaction - no changes made during the transaction * will be applied to the database. */ - fun abortTransaction(txn: Connection) + fun abortTransaction(connection: Connection) /** * Commits the given transaction - all changes made during the transaction * will be applied to the database. */ @Throws(DbException::class) - fun commitTransaction(txn: Connection) - - @Throws(DbException::class) - fun getSettings(txn: Connection, namespace: String?): Settings + fun commitTransaction(connection: Connection) @Throws(DbException::class) - fun mergeSettings(txn: Connection, s: Settings, namespace: String?) + fun getSettings(txn: Transaction, namespace: String): Settings @Throws(DbException::class) - fun addContact(txn: Connection, contact: Contact) + fun mergeSettings(txn: Transaction, s: Settings, namespace: String) @Throws(DbException::class) - fun getContact(txn: Connection, id: Int): Contact? + fun addContact(txn: Transaction, contact: Contact) @Throws(DbException::class) - fun removeContact(txn: Connection, id: Int) + fun getContact(txn: Transaction, id: Int): Contact? - /** - * Runs the given task within a transaction. - */ - @Throws(DbException::class) - fun transaction(readOnly: Boolean, task: (Connection) -> Unit) - - /** - * Runs the given task within a transaction and returns the result of the - * task. - */ @Throws(DbException::class) - fun <R> transactionWithResult(readOnly: Boolean, task: (Connection) -> R): R + fun removeContact(txn: Transaction, id: Int) } diff --git a/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/JdbcDatabase.kt b/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/JdbcDatabase.kt index 249058cb1c8023eada8839776e049283edc67b0d..4639361c4b694fdb096d5c3d3c7e4118bd8ad7b7 100644 --- a/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/JdbcDatabase.kt +++ b/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/JdbcDatabase.kt @@ -22,6 +22,7 @@ import java.util.Arrays import java.util.LinkedList import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.locks.ReentrantReadWriteLock import javax.annotation.concurrent.GuardedBy abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val clock: Clock) : @@ -64,6 +65,8 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc @Volatile private var wasDirtyOnInitialisation = false + private val lock = ReentrantReadWriteLock(true) + fun open(driverClass: String, reopen: Boolean, listener: MigrationListener?) { // Load the JDBC driver try { @@ -73,25 +76,25 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } // Open the database and create the tables and indexes if necessary val compact: Boolean - var txn = startTransaction() + var connection = startTransaction() try { compact = if (reopen) { - val s: Settings = getSettings(txn, DB_SETTINGS_NAMESPACE) + val s: Settings = getSettings(connection, DB_SETTINGS_NAMESPACE) wasDirtyOnInitialisation = isDirty(s) - migrateSchema(txn, s, listener) || isCompactionDue(s) + migrateSchema(connection, s, listener) || isCompactionDue(s) } else { wasDirtyOnInitialisation = false - createTables(txn) - initialiseSettings(txn) + createTables(connection) + initialiseSettings(connection) false } if (LOG.isInfoEnabled) { LOG.info("db dirty? $wasDirtyOnInitialisation") } - createIndexes(txn) - commitTransaction(txn) + createIndexes(connection) + commitTransaction(connection) } catch (e: DbException) { - abortTransaction(txn) + abortTransaction(connection) throw e } // Compact the database if necessary @@ -102,12 +105,12 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc logDuration(LOG, { "Compacting database" }, start) // Allow the next transaction to reopen the DB synchronized(connectionsLock) { closed = false } - txn = startTransaction() + connection = startTransaction() try { - storeLastCompacted(txn) - commitTransaction(txn) + storeLastCompacted(connection) + commitTransaction(connection) } catch (e: DbException) { - abortTransaction(txn) + abortTransaction(connection) throw e } } @@ -126,7 +129,11 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc * current code and cannot be migrated */ @Throws(DbException::class) - private fun migrateSchema(txn: Connection, s: Settings, listener: MigrationListener?): Boolean { + private fun migrateSchema( + connection: Connection, + s: Settings, + listener: MigrationListener?, + ): Boolean { var dataSchemaVersion = s.getInt(SCHEMA_VERSION_KEY, -1) if (dataSchemaVersion == -1) throw DbException() if (dataSchemaVersion == CODE_SCHEMA_VERSION) return false @@ -139,9 +146,9 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc if (LOG.isInfoEnabled) LOG.info("Migrating from schema $start to $end") listener?.onDatabaseMigration() // Apply the migration - m.migrate(txn) + m.migrate(connection) // Store the new schema version - storeSchemaVersion(txn, end) + storeSchemaVersion(connection, end) dataSchemaVersion = end } } @@ -162,20 +169,43 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc @Throws(DbException::class) protected abstract fun compactAndClose() - override fun startTransaction(): Connection { - var txn: Connection? + override fun startTransaction(readOnly: Boolean): Transaction { + // Don't allow reentrant locking + check(lock.readHoldCount <= 0) + check(lock.writeHoldCount <= 0) + val start = now() + if (readOnly) { + lock.readLock().lock() + logDuration(LOG, { "Waiting for read lock" }, start) + } else { + lock.writeLock().lock() + logDuration(LOG, { "Waiting for write lock" }, start) + } + return try { + Transaction(startTransaction(), readOnly) + } catch (e: DbException) { + if (readOnly) lock.readLock().unlock() else lock.writeLock().unlock() + throw e + } catch (e: RuntimeException) { + if (readOnly) lock.readLock().unlock() else lock.writeLock().unlock() + throw e + } + } + + private fun startTransaction(): Connection { + var connection: Connection? connectionsLock.lock() - txn = try { + connection = try { if (closed) throw DbClosedException() connections.poll() } finally { connectionsLock.unlock() } try { - if (txn == null) { + if (connection == null) { // Open a new connection - txn = createConnection() - txn.autoCommit = false + connection = createConnection() + connection.autoCommit = false connectionsLock.lock() try { openConnections++ @@ -186,15 +216,15 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } catch (e: SQLException) { throw DbException(e) } - return txn + return connection } - override fun abortTransaction(txn: Connection) { + override fun abortTransaction(connection: Connection) { try { - txn.rollback() + connection.rollback() connectionsLock.lock() try { - connections.add(txn) + connections.add(connection) connectionsChanged.signalAll() } finally { connectionsLock.unlock() @@ -202,7 +232,7 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } catch (e: SQLException) { // Try to close the connection logException(LOG, e) - tryToClose(txn, LOG) + tryToClose(connection, LOG) // Whatever happens, allow the database to close connectionsLock.lock() try { @@ -214,15 +244,15 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } } - override fun commitTransaction(txn: Connection) { + override fun commitTransaction(connection: Connection) { try { - txn.commit() + connection.commit() } catch (e: SQLException) { throw DbException(e) } connectionsLock.lock() try { - connections.add(txn) + connections.add(connection) connectionsChanged.signalAll() } finally { connectionsLock.unlock() @@ -260,10 +290,10 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } @Throws(DbException::class) - fun setDirty(txn: Connection, dirty: Boolean) { + fun setDirty(connection: Connection, dirty: Boolean) { val s = Settings() s.putBoolean(DIRTY_KEY, dirty) - mergeSettings(txn, s, DB_SETTINGS_NAMESPACE) + mergeSettings(connection, s, DB_SETTINGS_NAMESPACE) } private fun isCompactionDue(s: Settings): Boolean { @@ -274,32 +304,32 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } @Throws(DbException::class) - private fun storeSchemaVersion(txn: Connection, version: Int) { + private fun storeSchemaVersion(connection: Connection, version: Int) { val s = Settings() s.putInt(SCHEMA_VERSION_KEY, version) - mergeSettings(txn, s, DB_SETTINGS_NAMESPACE) + mergeSettings(connection, s, DB_SETTINGS_NAMESPACE) } @Throws(DbException::class) - private fun storeLastCompacted(txn: Connection) { + private fun storeLastCompacted(connection: Connection) { val s = Settings() s.putLong(LAST_COMPACTED_KEY, clock.currentTimeMillis()) - mergeSettings(txn, s, DB_SETTINGS_NAMESPACE) + mergeSettings(connection, s, DB_SETTINGS_NAMESPACE) } @Throws(DbException::class) - private fun initialiseSettings(txn: Connection) { + private fun initialiseSettings(connection: Connection) { val s = Settings() s.putInt(SCHEMA_VERSION_KEY, CODE_SCHEMA_VERSION) s.putLong(LAST_COMPACTED_KEY, clock.currentTimeMillis()) - mergeSettings(txn, s, DB_SETTINGS_NAMESPACE) + mergeSettings(connection, s, DB_SETTINGS_NAMESPACE) } @Throws(DbException::class) - private fun createTables(txn: Connection) { + private fun createTables(connection: Connection) { var s: Statement? = null try { - s = txn.createStatement() + s = connection.createStatement() s.executeUpdate(dbTypes.replaceTypes(CREATE_SETTINGS)) s.executeUpdate(dbTypes.replaceTypes(CREATE_CONTACTS)) s.close() @@ -310,10 +340,10 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } @Throws(DbException::class) - private fun createIndexes(txn: Connection) { + private fun createIndexes(connection: Connection) { var s: Statement? = null try { - s = txn.createStatement() + s = connection.createStatement() // s.executeUpdate(INDEX_SOMETABLE_BY_SOMECOLUMN) s.close() } catch (e: SQLException) { @@ -323,7 +353,13 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } @Throws(DbException::class) - override fun getSettings(txn: Connection, namespace: String?): Settings { + override fun getSettings(txn: Transaction, namespace: String): Settings { + val connection: Connection = txn.unbox() as Connection + return getSettings(connection, namespace) + } + + @Throws(DbException::class) + private fun getSettings(connection: Connection, namespace: String): Settings { var ps: PreparedStatement? = null var rs: ResultSet? = null return try { @@ -332,7 +368,7 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc WHERE namespace = ? """.trimIndent() - ps = txn.prepareStatement(sql) + ps = connection.prepareStatement(sql) ps.setString(1, namespace) rs = ps.executeQuery() val s = Settings() @@ -348,7 +384,13 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } @Throws(DbException::class) - override fun mergeSettings(txn: Connection, s: Settings, namespace: String?) { + override fun mergeSettings(txn: Transaction, s: Settings, namespace: String) { + val connection: Connection = txn.unbox() as Connection + mergeSettings(connection, s, namespace) + } + + @Throws(DbException::class) + fun mergeSettings(connection: Connection, s: Settings, namespace: String) { var ps: PreparedStatement? = null try { // Update any settings that already exist @@ -357,7 +399,7 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc WHERE namespace = ? AND settingKey = ? """.trimIndent() - ps = txn.prepareStatement(sql) + ps = connection.prepareStatement(sql) for ((key, value) in s) { ps.setString(1, value) ps.setString(2, namespace) @@ -376,7 +418,7 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc VALUES (?, ?, ?) """.trimIndent() - ps = txn.prepareStatement(sql) + ps = connection.prepareStatement(sql) var updateIndex = 0 var inserted = 0 for ((key, value) in s) { @@ -400,13 +442,14 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } @Throws(DbException::class) - override fun addContact(txn: Connection, contact: Contact) { + override fun addContact(txn: Transaction, contact: Contact) { + val connection: Connection = txn.unbox() as Connection var ps: PreparedStatement? = null try { val sql = """INSERT INTO contacts (contactId, token, inbox, outbox) VALUES (?, ?, ?, ?) """.trimIndent() - ps = txn.prepareStatement(sql) + ps = connection.prepareStatement(sql) ps.setInt(1, contact.id) ps.setString(2, contact.token) ps.setString(3, contact.inboxId) @@ -421,14 +464,15 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } @Throws(DbException::class) - override fun getContact(txn: Connection, id: Int): Contact? { + override fun getContact(txn: Transaction, id: Int): Contact? { + val connection: Connection = txn.unbox() as Connection var ps: PreparedStatement? = null var rs: ResultSet? = null try { val sql = """SELECT token, inbox, outbox FROM contacts WHERE contactId = ? """.trimIndent() - ps = txn.prepareStatement(sql) + ps = connection.prepareStatement(sql) ps.setInt(1, id) rs = ps.executeQuery() if (!rs.next()) return null @@ -446,11 +490,12 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } @Throws(DbException::class) - override fun removeContact(txn: Connection, id: Int) { + override fun removeContact(txn: Transaction, id: Int) { + val connection: Connection = txn.unbox() as Connection var ps: PreparedStatement? = null try { val sql = "DELETE FROM contacts WHERE contactId = ?" - ps = txn.prepareStatement(sql) + ps = connection.prepareStatement(sql) ps.setInt(1, id) val affected = ps.executeUpdate() if (affected != 1) throw DbStateException() @@ -461,34 +506,43 @@ abstract class JdbcDatabase(private val dbTypes: DatabaseTypes, private val cloc } } - override fun transaction(readOnly: Boolean, task: (Connection) -> Unit) { - val txn = startTransaction() - var success = false + @Throws(DbException::class) + override fun commitTransaction(txn: Transaction) { + val connection: Connection = txn.unbox() as Connection + check(!txn.isCommitted) + txn.setCommitted() + commitTransaction(connection) + } + + override fun endTransaction(txn: Transaction) { + try { + val connection: Connection = txn.unbox() as Connection + if (!txn.isCommitted) { + abortTransaction(connection) + } + } finally { + if (txn.isReadOnly) lock.readLock().unlock() else lock.writeLock().unlock() + } + } + + override fun transaction(readOnly: Boolean, task: (Transaction) -> Unit) { + val txn = startTransaction(readOnly) try { task(txn) - success = true + commitTransaction(txn) } finally { - if (success) { - commitTransaction(txn) - } else { - abortTransaction(txn) - } + endTransaction(txn) } } - override fun <R> transactionWithResult(readOnly: Boolean, task: (Connection) -> R): R { - val txn = startTransaction() - var success = false + override fun <R> transactionWithResult(readOnly: Boolean, task: (Transaction) -> R): R { + val txn = startTransaction(readOnly) try { val result = task(txn) - success = true + commitTransaction(txn) return result } finally { - if (success) { - commitTransaction(txn) - } else { - abortTransaction(txn) - } + endTransaction(txn) } } diff --git a/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/Transaction.kt b/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/Transaction.kt new file mode 100644 index 0000000000000000000000000000000000000000..92283126e53e29202a449149277333dc9cf05708 --- /dev/null +++ b/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/Transaction.kt @@ -0,0 +1,33 @@ +package org.briarproject.mailbox.core.db + +class Transaction( + private val txn: Any, + /** + * Returns true if the transaction can only be used for reading. + */ + val isReadOnly: Boolean, +) { + + /** + * Returns true if the transaction has been committed. + */ + var isCommitted = false + private set + + /** + * Returns the database transaction. The type of the returned object + * depends on the database implementation. + */ + fun unbox(): Any { + return txn + } + + /** + * Marks the transaction as committed. This method should only be called + * by the DatabaseComponent. It must not be called more than once. + */ + fun setCommitted() { + check(!isCommitted) + isCommitted = true + } +} diff --git a/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/TransactionManager.kt b/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/TransactionManager.kt new file mode 100644 index 0000000000000000000000000000000000000000..8df28a720248df553d05347794e23b81e1c9b219 --- /dev/null +++ b/mailbox-core/src/main/java/org/briarproject/mailbox/core/db/TransactionManager.kt @@ -0,0 +1,42 @@ +package org.briarproject.mailbox.core.db + +interface TransactionManager { + /** + * Starts a new transaction and returns an object representing it. + * + * + * This method acquires locks, so it must not be called while holding a + * lock. + * + * @param readOnly true if the transaction will only be used for reading. + */ + @Throws(DbException::class) + fun startTransaction(readOnly: Boolean): Transaction + + /** + * Commits a transaction to the database. + */ + @Throws(DbException::class) + fun commitTransaction(txn: Transaction) + + /** + * Ends a transaction. If the transaction has not been committed, + * it will be aborted. If the transaction has been committed, + * any events attached to the transaction are broadcast. + * The database lock will be released in either case. + */ + fun endTransaction(txn: Transaction) + + /** + * Runs the given task within a transaction. + */ + @Throws(DbException::class) + fun transaction(readOnly: Boolean, task: (Transaction) -> Unit) + + /** + * Runs the given task within a transaction and returns the result of the + * task. + */ + @Throws(DbException::class) + fun <R> transactionWithResult(readOnly: Boolean, task: (Transaction) -> R): R +} diff --git a/mailbox-core/src/main/java/org/briarproject/mailbox/core/settings/SettingsManager.kt b/mailbox-core/src/main/java/org/briarproject/mailbox/core/settings/SettingsManager.kt index 2ec721b8eebabc6658256f8366e2f9d08aa3efb5..691517279963a38a37f96fc82337859ae76e34b4 100644 --- a/mailbox-core/src/main/java/org/briarproject/mailbox/core/settings/SettingsManager.kt +++ b/mailbox-core/src/main/java/org/briarproject/mailbox/core/settings/SettingsManager.kt @@ -1,7 +1,7 @@ package org.briarproject.mailbox.core.settings import org.briarproject.mailbox.core.db.DbException -import java.sql.Connection +import org.briarproject.mailbox.core.db.Transaction interface SettingsManager { /** @@ -14,7 +14,7 @@ interface SettingsManager { * Returns all settings in the given namespace. */ @Throws(DbException::class) - fun getSettings(txn: Connection, namespace: String): Settings + fun getSettings(txn: Transaction, namespace: String): Settings /** * Merges the given settings with any existing settings in the given diff --git a/mailbox-core/src/main/java/org/briarproject/mailbox/core/settings/SettingsManagerImpl.kt b/mailbox-core/src/main/java/org/briarproject/mailbox/core/settings/SettingsManagerImpl.kt index 2c9801d0049e564bc297f3269c2a1dec608ffc0e..c1ccc66733d9d8efd8f9782d23b742db83d6caf2 100644 --- a/mailbox-core/src/main/java/org/briarproject/mailbox/core/settings/SettingsManagerImpl.kt +++ b/mailbox-core/src/main/java/org/briarproject/mailbox/core/settings/SettingsManagerImpl.kt @@ -2,7 +2,7 @@ package org.briarproject.mailbox.core.settings import org.briarproject.mailbox.core.db.Database import org.briarproject.mailbox.core.db.DbException -import java.sql.Connection +import org.briarproject.mailbox.core.db.Transaction import javax.annotation.concurrent.Immutable import javax.inject.Inject @@ -11,18 +11,18 @@ internal class SettingsManagerImpl @Inject constructor(private val db: Database) @Throws(DbException::class) override fun getSettings(namespace: String): Settings { - return db.transactionWithResult(true) { txn: Connection -> + return db.transactionWithResult(true) { txn -> db.getSettings(txn, namespace) } } @Throws(DbException::class) - override fun getSettings(txn: Connection, namespace: String): Settings { + override fun getSettings(txn: Transaction, namespace: String): Settings { return db.getSettings(txn, namespace) } @Throws(DbException::class) override fun mergeSettings(s: Settings, namespace: String) { - db.transaction(false) { txn: Connection -> db.mergeSettings(txn, s, namespace) } + db.transaction(false) { txn -> db.mergeSettings(txn, s, namespace) } } } diff --git a/mailbox-core/src/test/java/org/briarproject/mailbox/core/db/JdbcDatabaseTest.kt b/mailbox-core/src/test/java/org/briarproject/mailbox/core/db/JdbcDatabaseTest.kt index 279d77f645c61916f99cb55435af914994fd9031..9fe113ef34c253b321bdcb949069c603467750e9 100644 --- a/mailbox-core/src/test/java/org/briarproject/mailbox/core/db/JdbcDatabaseTest.kt +++ b/mailbox-core/src/test/java/org/briarproject/mailbox/core/db/JdbcDatabaseTest.kt @@ -37,7 +37,7 @@ abstract class JdbcDatabaseTest { open fun testPersistence() { // Store some records var db: Database = open(false) - var txn = db.startTransaction() + var txn = db.startTransaction(false) val contact1 = Contact( 1, @@ -60,8 +60,7 @@ abstract class JdbcDatabaseTest { // Check that the records are still there db = open(true) - txn = db.startTransaction() - + txn = db.startTransaction(false) val contact1Reloaded1 = db.getContact(txn, 1) val contact2Reloaded1 = db.getContact(txn, 2) assertEquals(contact1, contact1Reloaded1) @@ -75,7 +74,7 @@ abstract class JdbcDatabaseTest { // Check that the record is gone db = open(true) - txn = db.startTransaction() + txn = db.startTransaction(true) val contact1Reloaded2 = db.getContact(txn, 1) val contact2Reloaded2 = db.getContact(txn, 2) @@ -99,7 +98,7 @@ abstract class JdbcDatabaseTest { merged["baz"] = "qux" var db: Database = open(false) - var txn = db.startTransaction() + var txn = db.startTransaction(false) // store 'before' db.mergeSettings(txn, before, "namespace")