Skip to content
Snippets Groups Projects

Add method to watch connection state of contacts

Merged Nico requested to merge watch-contact-state into main
Files
2
@@ -15,6 +15,9 @@ from briar_wrapper.model import Model
class Contacts(Model):
API_ENDPOINT = "contacts/"
CONNECTION_EVENTS = ("ContactConnectedEvent", "ContactDisconnectedEvent")
_connections_callback = None
def add_pending(self, link, alias):
url = urljoin(BASE_HTTP_URL, self.API_ENDPOINT + "add/pending/")
@@ -32,6 +35,25 @@ class Contacts(Model):
request = _get(url, headers=self._headers).json()
return request['link']
def watch_connections(self, callback):
self._connections_callback = callback
signal_ids = list()
event_callback = self._handle_connections_callback
for event in self.CONNECTION_EVENTS:
signal_id = self._api.socket_listener.connect(event,
event_callback)
signal_ids.append(signal_id)
return signal_ids
def _handle_connections_callback(self, message):
contact_id = message["data"]["contactId"]
if message["name"] == "ContactConnectedEvent":
self._connections_callback(contact_id, True)
elif message["name"] == "ContactDisconnectedEvent":
self._connections_callback(contact_id, False)
else:
raise Exception(f"Wrong event in callback: {message['name']}")
def _sort_contact_list(contacts):
contacts.sort(key=itemgetter("lastChatActivity"),
reverse=True)
Loading