How to make a data source
- Overview
- Installation code explained
- Search workers explained
- Web tool interface options
- Walkthrough Example
4CAT is a modular tool. Its modules come in two varieties: data sources and processors. This article covers the former.
Data sources are a collection of workers, processors and interface elements that extend 4CAT to allow scraping, processing and/or retrieving data for a given platform (such as Instagram, Reddit or Telegram). 4CAT has APIs that can do most of the scaffolding around this for you, so a data source can be quite lightweight and mostly focus on retrieving the actual data, while 4CAT's back-end takes care of the scheduling, determining where the output should go, et cetera.
Data sources are defined as an arbitrarily-named folder in the `datasources` folder in the 4CAT root. It is recommended to use the data source ID (see below) as the data source folder name. However, since Python files included in the folder will be loaded as modules by 4CAT, folder names must also be valid Python module names. Concretely, this means (among other things) that data source folder names cannot start with a number (hence the `fourchan` data source).
WARNING: Data sources can, in multiple ways, define arbitrary code that will be run by either the 4CAT server or client-side browsers. Be careful when running a data source supplied by someone else.
A data source will at least contain the following (a typical folder layout is sketched after these lists):

- An `__init__.py` containing data source metadata and initialisation code
- A search worker, which can collect data according to provided parameters and format it as a CSV or NDJSON file that 4CAT can work with.
It may contain additional components:
- Any processors that are specific to datasets created by this data source
- Views for the web app that allow more advanced behaviour of the web tool interface
- Database or Sphinx index definitions
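Put together, a data source folder might be laid out roughly as in the sketch below. The folder and file names other than `__init__.py` (here `mydatasource` and `search_mydatasource.py`) are hypothetical and up to you, and the `webtool` folder is optional:

```
datasources/
    mydatasource/
        __init__.py
        search_mydatasource.py
        webtool/
            views.py
```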
The instructions below describe how to format and create these components (work in progress!).
The data source root should contain a file `__init__.py`, which in turn defines the following:
```python
DATASOURCE = "datasource-identifier"
```
This constant defines the data source ID. This is used by 4CAT internally to figure out what data source a dataset belongs to and so on.
```python
def init_datasource(database, logger, queue, name):
    pass
```
This function is called when 4CAT starts, if the data source is enabled, and should set up anything the data source needs to function (e.g. queueing any recurring workers). A default implementation of this function can be used instead (and when defining your own, it is advised to still call it as part of your own implementation):
```python
from backend.lib.helpers import init_datasource
```
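If your data source needs additional set-up (for example, queueing a recurring worker), you can define your own `init_datasource` and call the default one from it, as advised above. A rough sketch, where the data source ID is hypothetical and the import path mirrors the snippet above (it may differ between 4CAT versions):

```python
from backend.lib.helpers import init_datasource as base_init_datasource

DATASOURCE = "example"  # hypothetical data source ID


def init_datasource(database, logger, queue, name):
    # Run the default initialisation first...
    base_init_datasource(database, logger, queue, name)
    # ...then add any data-source-specific set-up here, e.g. queueing a
    # recurring worker via `queue`.
```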
The search worker is run when a dataset is created by someone, and collects the data for that dataset (i.e. the posts from the platform matching the given dataset parameters), writing it to the dataset result file. It is contained in an arbitrarily named Python file in the data source root (we recommend `search_[datasource].py`). The file should define a class that extends `backend.abstract.search.Search`. This class should define the following attributes and methods (a minimal skeleton is sketched after the list):
- `str type`: Identifier used by the scheduler to know what code to run for jobs for this data source. Should be `[datasource-id]-search`, `datasource-id` being equal to the ID defined in `__init__.py`.
- `str extension`: Optional. The extension (format) of the output data file. If omitted, `csv` is assumed; the other format currently supported is `ndjson`. Using any other extension will result in a `NotImplementedError` being raised.
- `int max_workers`: Optional, default 1. The number of search workers that may run in parallel for this data source. Usually, you want to keep this at 1, unless you are confident your server can handle multiple parallel workers of this type.
- `dict options`: Optional, default empty. Defines parameters that can be configured when querying this data source. These can be defined via a dictionary here, or via the `get_options()` method: see Input fields for data sources and processors for more information.
- `validate_query(dict query, Request request, User user) -> dict`: Called statically by the web tool whenever a new dataset is created by someone. `query` contains the form fields as set in the web interface; this method should return a sanitised version of that query, containing only fields and values relevant to this search worker. On invalid input, a `common.lib.exceptions.QueryParametersException` should be raised, which will prompt the one creating the dataset to change their input and resubmit. You can also raise a `common.lib.exceptions.QueryNeedsFurtherInputException(config)`, where `config` is a definition of further form fields that need to be completed, which will be shown in the interface while asking the user to submit again. The form fields can be defined in the same format as the 'normal' search parameter options (see `get_options()`).
- `get_items(self, dict query) -> generator`: Yields items matching the query parameters. These are the 'search results' that will comprise the dataset.
- `import_from_file(self, str path) -> generator`: Similar to `get_items()`, but takes a file path as parameter and yields items from that path as items to save in the dataset. Support for this is currently limited, but it will serve as the basis for a generic 'import' feature for 4CAT in the future. A generic version of this method is part of the abstract class, but it will usually require overriding to fit the nuances of the data source.
- `after_search(self, list items) -> list`: Optional. If defined, this will be called after all posts have been retrieved with the methods listed above and, if appropriate, any sampling or such. This method should yield items, like `get_items()`. You can use it to e.g. perform additional item filtering or processing, should your data source require it.
- `get_options(cls, parent_dataset, user) -> dict`: Optional. If defined, this will be called to determine the options displayed in the 4CAT web interface when querying the data source, analogous to the `options` class property (this method overrides that property, if present). See Input fields for data sources and processors for more information.
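Putting the required pieces together, a search worker skeleton might look something like the sketch below. The class name and data source ID are hypothetical, and the exact base class import may differ between 4CAT versions (see the walkthrough further down for a concrete, working example):

```python
from backend.abstract.search import Search
from common.lib.exceptions import QueryParametersException


class SearchExample(Search):
    """
    Minimal search worker skeleton for a hypothetical 'example' data source
    """
    type = "example-search"  # the DATASOURCE ID plus "-search"
    extension = "ndjson"  # or "csv" (the default)
    max_workers = 1

    options = {}  # alternatively, define get_options()

    @staticmethod
    def validate_query(query, request, user):
        # Return a sanitised version of the submitted form fields; raise
        # QueryParametersException to ask the user to correct their input
        if not query.get("query"):
            raise QueryParametersException("Please provide a query.")
        return {"query": query.get("query")}

    def get_items(self, query):
        # Yield one dictionary per collected item; together these become
        # the dataset's result file
        yield {"id": 1, "body": "example item matching %s" % query.get("query")}
```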
You can also descend from `SearchWithScope`, which has built-in support for a couple of more advanced modes of querying data. This is particularly useful if data is available locally and doesn't require round-trips to some remote server. Specifically, `SearchWithScope` has functionality to do "full-thread" querying (i.e. retrieving all posts in a thread that contains a particular number of matching posts). To this end, it requires the definition of the following additional methods (a sketch follows the list):
- `get_search_mode(self, dict query) -> str`: Returns `simple` or `complex`. If `simple`, `get_items_simple()` is used to retrieve posts. Else, `get_items_complex()` is used. This can be used to define 'fast lane' search methods for simpler queries if shortcuts can be taken. Of course, you can also always make it return either of the options if that is not relevant to your data source.
- `get_items_simple(self, dict query) -> generator`: Get posts via the 'simple' path.
- `get_items_complex(self, dict query) -> generator`: Get posts via the 'complex' path.
- `fetch_posts(self, list post_ids) -> list`: Should be used by `get_items_*()` to retrieve the actual item data. Takes a list of `post_ids` (as determined by the `get_items_*` method) and retrieves data for those post IDs, e.g. via an API or a local database.
- `fetch_threads(self, list thread_ids) -> list`: Retrieves all posts for the given `thread_ids`.
- `get_thread_lengths(self, list thread_ids, int min_length) -> dict`: Should return a dictionary with thread IDs as keys and the amount of posts per thread as values, for all threads with at least `min_length` posts.
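As a rough sketch, a scope-aware worker could look like the snippet below. Everything specific here is hypothetical: the class name, the `scope` parameter value, and the `find_matching_*`/`lookup_*` helpers are placeholders for whatever lookups your local database or API needs; the import also assumes `SearchWithScope` lives alongside `Search`:

```python
from backend.abstract.search import SearchWithScope


class SearchLocalExample(SearchWithScope):
    """
    Sketch of a scope-aware search worker for a hypothetical local data source
    """
    type = "localexample-search"
    extension = "csv"

    def get_search_mode(self, query):
        # Take the 'fast lane' unless the query asks for full threads
        return "complex" if query.get("scope") == "full-threads" else "simple"

    def get_items_simple(self, query):
        # e.g. a single local database query that directly yields matching posts
        matching_ids = self.find_matching_post_ids(query)  # hypothetical helper
        return self.fetch_posts(matching_ids)

    def get_items_complex(self, query):
        # e.g. find matching posts first, then expand to the threads they are in
        thread_ids = self.find_matching_thread_ids(query)  # hypothetical helper
        return self.fetch_threads(thread_ids)

    def fetch_posts(self, post_ids):
        # Look up the given post IDs, e.g. in a local database or via an API
        return [self.lookup_post(post_id) for post_id in post_ids]  # hypothetical helper

    def fetch_threads(self, thread_ids):
        # Return every post belonging to the given threads
        return [post for thread_id in thread_ids for post in self.lookup_thread(thread_id)]  # hypothetical helper

    def get_thread_lengths(self, thread_ids, min_length):
        # Map thread ID to the number of posts in it, for sufficiently long threads only
        lengths = {thread_id: len(self.lookup_thread(thread_id)) for thread_id in thread_ids}
        return {thread_id: n for thread_id, n in lengths.items() if n >= min_length}
```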
Additionally, because search workers are (after a fashion) architecturally equivalent to processors, they have access to all the attributes a processor has access to, e.g. `dataset`. See the page for processors for more information on these. In particular, the `map_item` method can be useful to define for data sources that return complex (e.g. multi-dimensional) data.
People can use 4CAT to create new datasets with your data source. To this end, the data source should define an interface through which dataset parameters may be set, via the `options` property or `get_options()` method (see above). Data sources can additionally contain a folder `webtool` with the following files:
- `views.py`: Optional. This can define additional views for the 4CAT Flask app. Any function defined in this file will be available as a view via `/api/datasource-call/[datasource-id]/[function name]/`. Functions should have the signature `function(request, user, **kwargs)`: `request` and `user` are objects supplied by Flask, `**kwargs` is all HTTP GET parameters as keyword arguments. The function should return an object (remember that in Python everything is an object), which will be serialised as JSON as the view output. A brief sketch of such a file follows below.
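For illustration, a `webtool/views.py` could look something like this sketch. The function name, data source ID and return value are made up; the route it becomes available under follows the pattern described above:

```python
"""
Example views for a hypothetical 'example' data source
"""

def latest_queries(request, user, **kwargs):
    # Available as /api/datasource-call/example/latest_queries/
    # `request` and `user` are supplied by Flask; HTTP GET parameters arrive
    # as keyword arguments in `kwargs`.
    amount = int(kwargs.get("amount", 10))

    # Whatever is returned here is serialised to JSON as the view output
    return {"amount": amount, "items": []}
```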
Let’s create a simple datasource that searches Wikipedia articles and collects the results. According to the Wikimedia API documentation, we need to create an account and generate an API key (one for personal use is fine).
- We will create a new folder in 4CAT’s datasources folder called “wikipedia”.
- Inside this folder, we create a file called `__init__.py` and one called `search_wikipedia.py`.
```
datasources/
    wikipedia/
        __init__.py
        search_wikipedia.py
```
A simple `__init__.py` file is all we need for 4CAT to recognize our new datasource.
"""
Initialize Wikipedia data source
"""
# An init_datasource function is expected to be available to initialize this
# data source. A default function that does this is available from the
# backend helpers library.
from common.lib.helpers import init_datasource
# Internal identifier for this data source
DATASOURCE = "wikipedia" # this will be used internally
NAME = "Wikipedia" # this will be displayed via the UI
The `search_wikipedia.py` file will contain a new class derived from our generic `Search` class. Some basic information is needed:
- `type` acts as the job identifier in our database; search worker types must end in `-search`, otherwise they may not be identified correctly
- `title` will be displayed in the frontend UI for users to distinguish this datasource
- `extension` defaults to "csv", but this API returns JSON objects, so we will store them as "ndjson" (which stores one JSON object per line in a file)
- `is_local` is either `True` or `False` and denotes whether the data is already stored locally (e.g., in a database); that is not the case here, as the data is retrieved from an API
- `is_static` is either `True` or `False` and denotes whether the data is static, i.e. no longer updated (this only applies to local datasources)
- `references` is not required, but, if provided, can give context to other users as to how the data is collected and what it represents
"""
Wikipedia article search
"""
from backend.lib.search import Search
class SearchWikipedia(Search):
"""
Get Wikipedia articles via Wikimedia's API
"""
type = "wikipedia-search" # job ID
title = "Wikipedia Search"
extension = "ndjson"
is_local = False # Whether this datasource is locally scraped
is_static = False # Whether this datasource is still updated
references = [
"[Wikimedia API](https://api.wikimedia.org/wiki/Searching_for_Wikipedia_articles_using_Python)",
]
Next we will add the required methods to our class. Only three methods are required: `get_options`, `validate_query`, and `get_items`, but a fourth method, `map_item`, is needed since we are handling JSON data.
First, `get_options` defines the parameters a user inputs in 4CAT’s frontend UI to create a new dataset. We will simply ask for the API key and a query, but there are many more possible option types, which you can read about here.
Add this import to the top of your search worker file:

```python
from common.lib.user_input import UserInput
```
And this method to the `SearchWikipedia` class:
```python
    @classmethod
    def get_options(cls, parent_dataset=None, user=None):
        """
        Options for the user to provide in order to run the search

        :param parent_dataset:  Should always be None
        :param user:  User to provide options for
        :return dict:  Data source options
        """
        options = {
            "intro-1": {
                "type": UserInput.OPTION_INFO,
                "help": "Search Wikipedia articles and retrieve their metadata. You will need a Wikimedia API key, which can be obtained by creating a user account and requesting a personal API key. They provide [instructions here.](https://api.wikimedia.org/wiki/Getting_started_with_Wikimedia_APIs)"
            },
            "api_key": {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True,  # Ensures 4CAT knows to keep this value secret and not store it in the 4CAT database
                "cache": True,  # Allows the value to be cached by the user's browser to use again
                "help": "API Key"  # "help" text is displayed for the user to view in the UI
            },
            "query": {
                "type": UserInput.OPTION_TEXT,
                "help": "Search query"
            },
        }

        return options
```
Next, the `validate_query` method is required. It takes the option inputs provided by the user and can be used to ensure they meet certain requirements. It then provides the dictionary of options to the `get_items` method. For our example, we will just ensure they are not blank.
Add the query exception to the top of our search worker file:

```python
from common.lib.exceptions import QueryParametersException
```
And add the method to the `SearchWikipedia` class:
```python
    @staticmethod
    def validate_query(query, request, user):
        """
        Validate the options input needed to query the Wikipedia data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param User user:  User object of user who has submitted the query
        :return dict:  Safe query parameters
        """
        # Please provide an API key
        if not query.get("api_key", None):
            raise QueryParametersException("Please provide an API key.")

        # Please provide some query
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        # Great, query is fine to return
        return query
```
Next, we will do the meat of the work in the `get_items` method. This takes the parameters provided and collects the results from the API, yielding them one by one (or all at once, if desired).
Add a couple more imports:

```python
from datetime import datetime, timezone

import requests

from common.lib.exceptions import ProcessorInterruptedException
```

And this method to our `SearchWikipedia` class:
```python
    def get_items(self, query):
        """
        Use the Wikimedia API to collect articles

        :param query:
        :return:
        """
        api_key = query.get("api_key")  # corresponds to the "api_key" option in `get_options`
        if api_key is None:
            # Because API keys are not stored (per the `sensitive` parameter in `get_options`), if this dataset was
            # interrupted, it cannot be resumed
            self.dataset.update_status(
                "Wikipedia query failed or was interrupted; please create a new query in order to provide your API key again.",
                is_final=True)
            return []  # No items to return

        # Get the query
        language_code = 'en'  # Wikipedia needs a language code; we could add an option for this
        search_query = query.get("query")  # corresponds to the "query" option in `get_options`
        number_of_results = 10  # Another great option to add

        headers = {
            'Authorization': api_key,
            'User-Agent': '4CAT (4cat.nl)'  # Be nice to open source resources and let them know who is using their API
        }

        base_url = 'https://api.wikimedia.org/core/v1/wikipedia/'
        endpoint = '/search/page'
        url = base_url + language_code + endpoint
        parameters = {'q': search_query, 'limit': number_of_results}

        self.dataset.update_status("Querying Wikimedia API for {}".format(search_query))  # update_status adds a message to the dataset's status log and to the UI
        response = requests.get(url, headers=headers, params=parameters)
        collection_time = datetime.now(tz=timezone.utc).timestamp()  # Add a collection timestamp for metadata

        if response.status_code != 200:
            self.dataset.update_status(
                "Wikimedia API query failed with status code {} and reason {}.".format(response.status_code, response.reason),
                is_final=True)
            return []

        total_results = len(response.json()['pages'])

        # Get the data
        for i, article_result in enumerate(response.json()['pages']):
            if self.interrupted:
                # In this example, it is not necessary to check for interruptions, but if we were following up with more
                # queries or collecting the actual articles, we would need to check 4CAT for interruptions
                raise ProcessorInterruptedException("Interrupted while fetching articles from the Wikimedia API")

            # It is good practice to add some metadata to items, such as the collection time
            article_result['4CAT_metadata'] = {"collected_at": collection_time, "query": search_query, 'language_code': language_code}
            yield article_result
            self.dataset.update_progress((i + 1) / total_results)  # update_progress updates the progress bar in the UI
```
Finally, we need to map the resultant JSON so 4CAT understands which fields are relevant to the user. For a CSV file, 4CAT will use the column headings as field names, but with JSON this is not always possible. Our Wikimedia articles are formatted like so:
```python
{'id': 26903,
 'key': 'Solar_System',
 'title': 'Solar System',
 'excerpt': 'The <span class="searchmatch">Solar</span> <span class="searchmatch">System</span> is the gravitationally bound <span class="searchmatch">system</span> of the Sun and the objects that orbit it. The largest of these objects are the eight planets, which',
 'matched_title': None,
 'description': 'The Sun and objects orbiting it',
 'thumbnail': {'mimetype': 'image/jpeg',
               'width': 60,
               'height': 34,
               'duration': None,
               'url': '//upload.wikimedia.org/wikipedia/commons/thumb/1/19/Solar_System_true_color.jpg/60px-Solar_System_true_color.jpg'}}
```
We will use `map_item` to format the items as desired. The original NDJSON file will be saved, but analyses that require users to select columns or that need specific fields will use this `map_item` method.
Additional imports:

```python
from common.lib.item_mapping import MappedItem
```
And the `map_item` method to add to `SearchWikipedia`:
```python
    @staticmethod
    def map_item(item):
        """
        Map a nested Wikipedia object to a flat dictionary

        :param item:  Wikipedia object as originally returned by the Wikimedia API
        :return dict:  Dictionary in the format expected by 4CAT
        """
        # We return a MappedItem object as that allows us some flexibility to handle issues in the data (such as
        # missing fields) and notify users if needed
        fourcat_metadata = item.get('4CAT_metadata', {})
        collected_at = datetime.fromtimestamp(fourcat_metadata.get('collected_at'))
        language = fourcat_metadata.get('language_code')

        display_title = item['title']
        article_url = 'https://' + language + '.wikipedia.org/wiki/' + item['key']

        return MappedItem({
            # Some fields are required by 4CAT
            "id": item["id"],
            "timestamp": collected_at.strftime("%Y-%m-%d %H:%M:%S"),
            "body": item.get("excerpt", ""),  # The "body" field is often used by default in 4CAT as the main text of the item
            "author": "",  # We don't have an author for Wikipedia articles, but it is a required field
            "thread_id": "",  # We don't have a thread ID for Wikipedia articles, but it is a required field

            # Additional data
            "link": article_url,
            "subject": display_title,  # "subject" is a commonly used field in 4CAT
            "description": item.get("description", ""),
            "image_url": "https:" + item.get("thumbnail", {}).get("url", "") if item.get("thumbnail") else "",  # "image_url" is a commonly used field in 4CAT and should be a valid link for 4CAT to download later

            # Metadata if desired; can be useful if we collected multiple queries and/or languages
            "language_code": language,
            "query": fourcat_metadata.get('query', ""),
            "unix_timestamp": int(collected_at.timestamp()),
        })
```
There we have it! Now you can restart 4CAT (both the backend and the frontend, as each needs to register the new datasource) and activate your datasource via the Control Panel:
- Control Panel -> Settings -> Data sources
- Find your new datasource, enable it, and save!
- Go to Create Dataset and try it out.
- You receive a 500 error message when trying to view the datasource: check your 4CAT frontend logs to see the error message. Most likely it has to do with `map_item` not properly handling the results.
🐈🐈🐈🐈