Commits (4)
......@@ -2,7 +2,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
# License-Filename: LICENSE.md
from os.path import isfile
import os
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
from time import sleep
......@@ -25,7 +25,7 @@ class Api:
@staticmethod
def has_account():
return isfile(BRIAR_DB)
return os.path.isfile(BRIAR_DB)
def is_running(self):
return (self._process is not None) and (self._process.poll() is None)
......@@ -79,7 +79,7 @@ class Api:
def _login(self, password):
if not self.is_running():
raise Exception("Can't login; API not running")
self._process.communicate(("%s\n" % password).encode("utf-8"))
self._process.communicate((f"{password}\n").encode("utf-8"))
def _register(self, credentials):
if not self.is_running():
......
......@@ -16,21 +16,23 @@ class Contacts(Model):
API_ENDPOINT = "contacts/"
_on_contact_added_callback = None
def add_pending(self, link, alias):
url = urljoin(BASE_HTTP_URL, self.API_ENDPOINT + "add/" + "pending/")
url = urljoin(BASE_HTTP_URL, self.API_ENDPOINT + "add/pending/")
_post(url, headers=self._headers, json={"link": link, "alias": alias})
def get(self):
url = urljoin(BASE_HTTP_URL, self.API_ENDPOINT)
request = _get(url, headers=self._headers)
contacts = request.json()
contacts.sort(key=itemgetter("lastChatActivity"),
reverse=True)
contacts = Contacts._sort_contact_list(contacts)
return contacts
def get_link(self):
url = urljoin(BASE_HTTP_URL, self.API_ENDPOINT + "add/" + "link/")
url = urljoin(BASE_HTTP_URL, self.API_ENDPOINT + "add/link/")
request = _get(url, headers=self._headers).json()
return request['link']
def _sort_contact_list(contacts):
contacts.sort(key=itemgetter("lastChatActivity"),
reverse=True)
return contacts
......@@ -4,8 +4,7 @@
from urllib.parse import urljoin
from requests import get as _get
from requests import post as _post
import requests
from briar_wrapper.constants import BASE_HTTP_URL
from briar_wrapper.model import Model
......@@ -15,20 +14,17 @@ class PrivateChat(Model):
API_ENDPOINT = "messages/"
_contact_id = 0
_on_message_received_callback = None
def __init__(self, api, contact_id):
super().__init__(api)
self._contact_id = contact_id
def get(self):
url = urljoin(BASE_HTTP_URL,
self.API_ENDPOINT + "/%d" % self._contact_id)
request = _get(url, headers=self._headers)
self.API_ENDPOINT + str(self._contact_id))
request = requests.get(url, headers=self._headers)
return request.json()
def send(self, message):
url = urljoin(BASE_HTTP_URL,
self.API_ENDPOINT + "/%i" % self._contact_id)
_post(url, headers=self._headers, json={"text": message})
self.API_ENDPOINT + str(self._contact_id))
requests.post(url, headers=self._headers, json={"text": message})
......@@ -10,14 +10,32 @@ from briar_wrapper.models.contacts import Contacts
BASE_HTTP_URL = "http://localhost:7000/v1/contacts/"
TEST_LINK = "briar://wvui4uvhbfv4tzo6xwngknebsxrafainnhldyfj63x6ipp4q2vigy"
TEST_ALIAS = "Alice"
TEST_CONTACT_FIRST = {
"lastChatActivity": 1
}
TEST_CONTACT_SECOND = {
"lastChatActivity": 2
}
TEST_CONTACT_RESPONSE_SINGLE = [
TEST_CONTACT_FIRST
]
TEST_CONTACT_RESPONSE_TWO_UNORDERED = [
TEST_CONTACT_FIRST,
TEST_CONTACT_SECOND
]
TEST_CONTACT_RESPONSE_TWO_ORDERED = [
TEST_CONTACT_SECOND,
TEST_CONTACT_FIRST
]
TEST_LINK = "briar://wvui4uvhbfv4tzo6xwngknebsxrafainnhldyfj63x6ipp4q2vigy"
@requests_mock.Mocker(kw="requests_mock")
def test_add_pending(api, request_headers, requests_mock):
contacts = Contacts(api)
requests_mock.register_uri("POST",
BASE_HTTP_URL + "add/pending/",
request_headers=request_headers,
......@@ -25,15 +43,48 @@ def test_add_pending(api, request_headers, requests_mock):
contacts.add_pending(TEST_LINK, TEST_ALIAS)
def match_request_add_pending(request):
return {"alias": TEST_ALIAS, "link": TEST_LINK} == request.json()
@requests_mock.Mocker(kw='requests_mock')
def test_get(api, request_headers, requests_mock):
contacts = Contacts(api)
requests_mock.register_uri("GET", BASE_HTTP_URL,
request_headers=request_headers,
text=json.dumps(TEST_CONTACT_RESPONSE_SINGLE))
assert contacts.get() == TEST_CONTACT_RESPONSE_SINGLE
@requests_mock.Mocker(kw='requests_mock')
def test_get_empty(api, request_headers, requests_mock):
contacts = Contacts(api)
response = []
requests_mock.register_uri("GET", BASE_HTTP_URL,
request_headers=request_headers,
text=json.dumps([]))
assert contacts.get() == []
@requests_mock.Mocker(kw='requests_mock')
def test_get_unordered(api, request_headers, requests_mock):
contacts = Contacts(api)
requests_mock.register_uri("GET", BASE_HTTP_URL,
request_headers=request_headers,
text=json.dumps(response))
assert contacts.get() == response
text=json.dumps(
TEST_CONTACT_RESPONSE_TWO_UNORDERED)
)
assert contacts.get() == TEST_CONTACT_RESPONSE_TWO_ORDERED
@requests_mock.Mocker(kw='requests_mock')
def test_get_ordered(api, request_headers, requests_mock):
contacts = Contacts(api)
requests_mock.register_uri("GET", BASE_HTTP_URL,
request_headers=request_headers,
text=json.dumps(
TEST_CONTACT_RESPONSE_TWO_ORDERED)
)
assert contacts.get() == TEST_CONTACT_RESPONSE_TWO_ORDERED
@requests_mock.Mocker(kw='requests_mock')
......@@ -45,7 +96,3 @@ def test_get_link(api, request_headers, requests_mock):
request_headers=request_headers,
text=json.dumps(response))
assert contacts.get_link() == TEST_LINK
def match_request_add_pending(request):
return {"alias": TEST_ALIAS, "link": TEST_LINK} == request.json()
# Copyright (c) 2019 Nico Alt
# SPDX-License-Identifier: AGPL-3.0-only
# License-Filename: LICENSE.md
import pytest
import subprocess
from briar_wrapper.api import Api
from briar_wrapper.constants import BRIAR_DB
HEADLESS_JAR = 'briar-headless.jar'
PASSWORD = 'LjnM6/WPQ]V?@<=$'
CREDENTIALS = ('Alice', PASSWORD)
def test_has_account(mocker):
isfile_mock = mocker.patch('os.path.isfile')
isfile_mock.return_value = False
api = Api(HEADLESS_JAR)
assert api.has_account() is False
isfile_mock.assert_called_once_with(BRIAR_DB)
def test_is_running(mocker, process):
process.poll.return_value = None
api = Api(HEADLESS_JAR)
api._process = process
assert api.is_running() is True
def test_is_running_none():
api = Api(HEADLESS_JAR)
api._process = None
assert api.is_running() is False
def test_is_running_poll_none(mocker, process):
process.poll.return_value = 0
api = Api(HEADLESS_JAR)
api._process = process
assert api.is_running() is False
def test_login(callback, start_and_watch, thread):
api = Api(HEADLESS_JAR)
api.login(PASSWORD, callback)
start_and_watch.assert_called_once_with(callback)
thread.assert_called_once_with(target=api._login, args=(PASSWORD,),
daemon=True)
def test_login_already_running(callback, is_running, thread):
api = Api(HEADLESS_JAR)
with pytest.raises(Exception, match='API already running'):
api.login(PASSWORD, callback)
def test_login_not_running():
# TODO: Write test for failed login due to API not running
# Not easy to test because exception is thrown in Thread
pass
def test_login_communicate(callback, is_running, mocker,
process, start_and_watch):
api = Api(HEADLESS_JAR)
api._process = process
api.login(PASSWORD, callback)
process.communicate.assert_called_once_with(
(PASSWORD + "\n").encode("utf-8")
)
def test_register(callback, start_and_watch, thread):
api = Api(HEADLESS_JAR)
api.register(CREDENTIALS, callback)
start_and_watch.assert_called_once_with(callback)
thread.assert_called_once_with(target=api._register, args=(CREDENTIALS,),
daemon=True)
def test_register_already_running(callback, is_running, thread):
api = Api(HEADLESS_JAR)
with pytest.raises(Exception, match='API already running'):
api.register(CREDENTIALS, callback)
def test_register_invalid_credentials(callback):
api = Api(HEADLESS_JAR)
with pytest.raises(Exception, match="Can't process credentials"):
api.register(PASSWORD, callback)
def test_register_communicate(callback, is_running, mocker,
process, start_and_watch):
api = Api(HEADLESS_JAR)
api._process = process
api.register(CREDENTIALS, callback)
process.communicate.assert_called_once_with(
(CREDENTIALS[0] + '\n' +
CREDENTIALS[1] + '\n' +
CREDENTIALS[1] + '\n').encode("utf-8")
)
def test_stop(mocker, is_running, process):
api = Api(HEADLESS_JAR)
api._process = process
api.stop()
api._process.terminate.assert_called_once()
def test_stop_not_running():
api = Api(HEADLESS_JAR)
with pytest.raises(Exception, match='Nothing to stop'):
api.stop()
def test_start_and_watch():
# TODO: Various tests needed here, for both register and login
pass
@pytest.fixture
def callback(mocker):
return mocker.MagicMock()
@pytest.fixture
def is_running(mocker):
is_running_mock = mocker.patch(
'briar_wrapper.api.Api.is_running'
)
is_running_mock.return_value = True
return is_running_mock
@pytest.fixture
def process(mocker):
return mocker.MagicMock()
@pytest.fixture
def start_and_watch(mocker):
return mocker.patch('briar_wrapper.api.Api._start_and_watch')
@pytest.fixture
def thread(mocker):
return mocker.patch('briar_wrapper.api.Thread')
......@@ -2,25 +2,21 @@
# SPDX-License-Identifier: AGPL-3.0-only
# License-Filename: LICENSE.md
from random import choice
from string import ascii_letters, digits
from unittest import mock
import pytest
# pylint: disable=redefined-outer-name
@pytest.fixture
def api(auth_token):
api = mock.Mock()
def api(auth_token, mocker):
api = mocker.Mock()
api.auth_token = auth_token
return api
@pytest.fixture
def auth_token():
return ''.join(choice(ascii_letters + digits) for i in range(33))
return 'NMDArEDjYlagINGd77WkKdxqDUAkYCWwh'
@pytest.fixture
......