Skip to content

Commit

Permalink
another improvement of feed/group requests by allowing a larger numbe…
Browse files Browse the repository at this point in the history
…r of subs without impacting too negatively the performance.
  • Loading branch information
j-fbriere committed Feb 14, 2024
1 parent 0d7d9e2 commit 29ca7ac
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 101 deletions.
215 changes: 115 additions & 100 deletions lib/group/_feed.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import 'package:squawker/utils/crypto_util.dart';
import 'package:squawker/utils/iterables.dart';
import 'package:pref/pref.dart';
import 'package:provider/provider.dart';
import 'package:quiver/iterables.dart';
import 'package:synchronized/synchronized.dart';

class SubscriptionGroupFeed extends StatefulWidget {
Expand Down Expand Up @@ -148,15 +149,117 @@ class SubscriptionGroupFeedState extends State<SubscriptionGroupFeed> with Widge
}
}

Future<List<TweetChain>> _listParallelTweets(List<String> searchQueries) async {
var repository = await Repository.writable();
BasePrefService prefs = PrefService.of(context);
List<Future<List<TweetChain>>> futures = [];
RateFetchContext fetchContext = RateFetchContext(prefs.get(optionEnhancedFeeds) ? Twitter.graphqlSearchTimelineUriPath : Twitter.searchTweetsUriPath, searchQueries.length);
await fetchContext.init();
for (int i = 0; i < searchQueries.length; i++) {

futures.add(Future(() async {
List<TweetChain> tweets = <TweetChain>[];
String searchQuery = searchQueries[i];
String hash = await sha1Hash(searchQuery);
String? searchCursor;
String? cursorType;
bool requestToDo = false;

var storedChunks = await repository.query(tableFeedGroupChunk,
where: 'group_id = ? AND hash = ?', whereArgs: [widget.group.id, hash], orderBy: 'created_at DESC');
if (_data.isEmpty) {
requestToDo = true;
// Make sure we load any existing stored tweets from the chunk
var storedChunksTweets = storedChunks
.map((e) => jsonDecode(e['response'] as String))
.map((e) => List.from(e))
.expand((e) => e.map((c) => TweetChain.fromJson(c)))
.toList();

tweets.addAll(storedChunksTweets);

// Use the latest chunk's top cursor to load any new tweets since the last time we checked
var latestChunk = storedChunks.firstOrNull;
if (latestChunk != null) {
searchCursor = latestChunk['cursor_top'] as String;
cursorType = 'cursor_top';
} else {
// Otherwise we need to perform a fresh load from scratch for this chunk
searchCursor = null;
}
} else {
// We're currently at the end of our current feed, so get the oldest chunk's bottom cursor to load older tweets.
if (storedChunks.isNotEmpty) {
requestToDo = true;
searchCursor = storedChunks.last['cursor_bottom'] as String;
cursorType = 'cursor_bottom';
}
}

if (requestToDo) {
// Perform our search for the next page of results for this chunk, and add those tweets to our collection
TweetStatus result;
try {
if (prefs.get(optionEnhancedFeeds)) {
result = await Twitter.searchTweetsGraphql(searchQuery, widget.includeReplies, limit: 100,
cursor: searchCursor,
leanerFeeds: prefs.get(optionLeanerFeeds),
fetchContext: fetchContext);
}
else {
result = await Twitter.searchTweets(searchQuery, widget.includeReplies, limit: 100,
cursor: searchCursor,
cursorType: cursorType,
leanerFeeds: prefs.get(optionLeanerFeeds),
fetchContext: fetchContext);
}
}
catch (rsp) {
if (rsp is Exception) {
log.severe(rsp.toString());
}
_errorResponse = _errorResponse ?? (rsp is Exception ? ExceptionResponse(rsp) : rsp as Response);
return tweets;
}

if (result.chains.isNotEmpty) {
// avoid duplicates
for (var cElm in result.chains) {
if (tweets.firstWhereOrNull((tElm) => cElm.id == tElm.id) == null) {
tweets.add(cElm);
}
}

// Make sure we insert the set of cursors for this latest chunk, ready for the next time we paginate
await repository.insert(tableFeedGroupChunk, {
'group_id': widget.group.id,
'hash': hash,
'cursor_top': result.cursorTop,
'cursor_bottom': result.cursorBottom,
'response': jsonEncode(result.chains.map((e) => e.toJson()).toList())
});
}
}
else {
await fetchContext.fetchNoResponse();
}

return tweets;
}));
}

// Wait for all our searches to complete, then build our list of tweet conversations
List<List<TweetChain>> result = await Future.wait(futures);
return result.expand((e) => e).toList();
}

/// Search for our next "page" of tweets.
///
/// Here, each page is actually a set of mappings, where the ID of each set is the hash of all the user IDs in that
/// set. We store this along with the top and bottom pagination cursors, which we use to perform pagination for all
/// sets at the same time, allowing us to create a feed made up of individual search queries.
Future _listTweets() async {
Future<void> _listTweets() async {
try {
List<Future<List<TweetChain>>> futures = [];

var repository = await Repository.writable();

BasePrefService prefs = PrefService.of(context);
Expand All @@ -177,104 +280,16 @@ class SubscriptionGroupFeedState extends State<SubscriptionGroupFeed> with Widge
}

_errorResponse = null;
RateFetchContext fetchContext = RateFetchContext(prefs.get(optionEnhancedFeeds) ? Twitter.graphqlSearchTimelineUriPath : Twitter.searchTweetsUriPath, widget.searchQueries.length);
await fetchContext.init();
for (var searchQuery in widget.searchQueries) {
String hash = await sha1Hash(searchQuery);

futures.add(Future(() async {
var tweets = <TweetChain>[];

String? searchCursor;
String? cursorType;
bool requestToDo = false;

var storedChunks = await repository.query(tableFeedGroupChunk,
where: 'group_id = ? AND hash = ?', whereArgs: [widget.group.id, hash], orderBy: 'created_at DESC');
if (_data.isEmpty) {
requestToDo = true;
// Make sure we load any existing stored tweets from the chunk
var storedChunksTweets = storedChunks
.map((e) => jsonDecode(e['response'] as String))
.map((e) => List.from(e))
.expand((e) => e.map((c) => TweetChain.fromJson(c)))
.toList();

tweets.addAll(storedChunksTweets);

// Use the latest chunk's top cursor to load any new tweets since the last time we checked
var latestChunk = storedChunks.firstOrNull;
if (latestChunk != null) {
searchCursor = latestChunk['cursor_top'] as String;
cursorType = 'cursor_top';
} else {
// Otherwise we need to perform a fresh load from scratch for this chunk
searchCursor = null;
}
} else {
// We're currently at the end of our current feed, so get the oldest chunk's bottom cursor to load older tweets.
if (storedChunks.isNotEmpty) {
requestToDo = true;
searchCursor = storedChunks.last['cursor_bottom'] as String;
cursorType = 'cursor_bottom';
}
}

if (requestToDo) {
// Perform our search for the next page of results for this chunk, and add those tweets to our collection
TweetStatus result;
try {
if (prefs.get(optionEnhancedFeeds)) {
result = await Twitter.searchTweetsGraphql(searchQuery, widget.includeReplies, limit: 100,
cursor: searchCursor,
leanerFeeds: prefs.get(optionLeanerFeeds),
fetchContext: fetchContext);
}
else {
result = await Twitter.searchTweets(searchQuery, widget.includeReplies, limit: 100,
cursor: searchCursor,
cursorType: cursorType,
leanerFeeds: prefs.get(optionLeanerFeeds),
fetchContext: fetchContext);
}
}
catch (rsp) {
if (rsp is Exception) {
log.severe(rsp.toString());
}
_errorResponse = _errorResponse ?? (rsp is Exception ? ExceptionResponse(rsp) : rsp as Response);
return tweets;
}

if (result.chains.isNotEmpty) {
// avoid duplicates
for (var cElm in result.chains) {
if (tweets.firstWhereOrNull((tElm) => cElm.id == tElm.id) == null) {
tweets.add(cElm);
}
}

// Make sure we insert the set of cursors for this latest chunk, ready for the next time we paginate
await repository.insert(tableFeedGroupChunk, {
'group_id': widget.group.id,
'hash': hash,
'cursor_top': result.cursorTop,
'cursor_bottom': result.cursorBottom,
'response': jsonEncode(result.chains.map((e) => e.toJson()).toList())
});
}
}
else {
await fetchContext.fetchNoResponse();
}

return tweets;
}));
List<TweetChain> threads = [];
// no more than 5 parallel executions
List<List<String>> partSearchQueries = partition(widget.searchQueries, 5).toList();
for (int i = 0; _errorResponse == null && i < partSearchQueries.length; i++) {
List<String> searchQueries = partSearchQueries[i];
List<TweetChain> parallelThreads = await _listParallelTweets(searchQueries);
threads.addAll(parallelThreads);
}

// Wait for all our searches to complete, then build our list of tweet conversations
var result = (await Future.wait(futures));
var threads = result.expand((element) => element).sorted((a, b) {
threads.sort((a, b) {
var aCreatedAt = a.tweets[0].createdAt;
var bCreatedAt = b.tweets[0].createdAt;

Expand All @@ -283,7 +298,7 @@ class SubscriptionGroupFeedState extends State<SubscriptionGroupFeed> with Widge
}

return bCreatedAt.compareTo(aCreatedAt);
}).toList();
});

if (!mounted) {
return;
Expand Down
1 change: 0 additions & 1 deletion lib/group/group_screen.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'package:flutter/material.dart';
import 'package:flutter_triple/flutter_triple.dart';
import 'package:provider/provider.dart';
import 'package:quiver/iterables.dart';
import 'package:scrollable_positioned_list/scrollable_positioned_list.dart';
import 'package:squawker/database/entities.dart';
import 'package:squawker/generated/l10n.dart';
Expand Down

0 comments on commit 29ca7ac

Please sign in to comment.