Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a storage method for returning all current presence from all users #9650

Merged
merged 4 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9650.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a storage method for pulling all current user presence state from the database.
11 changes: 9 additions & 2 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,7 @@ def simple_select_list_paginate_txn(
retcols: Iterable[str],
filters: Optional[Dict[str, Any]] = None,
keyvalues: Optional[Dict[str, Any]] = None,
exclude_keyvalues: Optional[Dict[str, Any]] = None,
order_direction: str = "ASC",
) -> List[Dict[str, Any]]:
"""
Expand All @@ -1929,7 +1930,10 @@ def simple_select_list_paginate_txn(
apply a WHERE ? LIKE ? clause.
keyvalues:
column names and values to select the rows with, or None to not
apply a WHERE clause.
apply a WHERE key = value clause.
exclude_keyvalues:
column names and values to exclude rows with, or None to not
apply a WHERE key != value clause.
order_direction: Whether the results should be ordered "ASC" or "DESC".

Returns:
Expand All @@ -1938,7 +1942,7 @@ def simple_select_list_paginate_txn(
if order_direction not in ["ASC", "DESC"]:
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")

where_clause = "WHERE " if filters or keyvalues else ""
where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
arg_list = [] # type: List[Any]
if filters:
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
Expand All @@ -1947,6 +1951,9 @@ def simple_select_list_paginate_txn(
if keyvalues:
where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
arg_list += list(keyvalues.values())
if exclude_keyvalues:
where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues)
arg_list += list(exclude_keyvalues.values())

sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
", ".join(retcols),
Expand Down
61 changes: 60 additions & 1 deletion synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Tuple
from typing import Dict, List, Tuple

from synapse.api.presence import UserPresenceState
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
Expand Down Expand Up @@ -157,5 +157,64 @@ async def get_presence_for_users(self, user_ids):

return {row["user_id"]: UserPresenceState(**row) for row in rows}

async def get_presence_for_all_users(
self,
include_offline: bool = True,
) -> Dict[str, UserPresenceState]:
"""Retrieve the current presence state for all users.

Note that the presence_stream table is culled frequently, so it should only
contain the latest presence state for each user.

Args:
include_offline: Whether to include offline presence states

Returns:
A dict of user IDs to their current UserPresenceState.
"""
users_to_state = {}

exclude_keyvalues = {}
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
if not include_offline:
# Exclude offline presence state
exclude_keyvalues = {"state": "offline"}

# This may be a very heavy database query.
# We paginate in order to not block a database connection.
limit = 100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100 seems kind of low, I think we usually batch by 1000?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only other usage of this function I see uses a limit of 100:

def _get_statistics_for_subject_txn(
self, txn, stats_type, stats_id, start, size=100
):
"""
Transaction-bound version of L{get_statistics_for_subject}.
"""
table, id_col = TYPE_TO_TABLE[stats_type]
selected_columns = list(
ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
)
slice_list = self.db_pool.simple_select_list_paginate_txn(
txn,
table + "_historical",
"end_ts",
start,
size,
retcols=selected_columns + ["bucket_size", "end_ts"],
keyvalues={id_col: stats_id},
order_direction="DESC",
)
return slice_list

But intuitively 1000 is probably also fine, given the overhead of spinning up another query?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. We can tweak it if we need to.

offset = 0
while True:
rows = await self.db_pool.runInteraction(
"get_presence_for_all_users",
self.db_pool.simple_select_list_paginate_txn,
"presence_stream",
orderby="stream_id",
start=offset,
limit=limit,
keyvalues={},
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
exclude_keyvalues=exclude_keyvalues,
retcols=(
"user_id",
"state",
"last_active_ts",
"last_federation_update_ts",
"last_user_sync_ts",
"status_msg",
"currently_active",
),
order_direction="ASC",
)

for row in rows:
users_to_state[row["user_id"]] = UserPresenceState(**row)

# We've ran out of updates to query
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
if len(rows) < limit:
break

offset += limit

return users_to_state

def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()