How to make a processor
4CAT is a modular tool. Its modules come in two varieties: data sources and processors. This article covers the latter.
Processors are bits of code that produce a dataset. Typically, their input is another dataset, so they can be used to analyse data; for example, a processor can take a CSV file containing posts as input, count how many posts occur per month, and produce another CSV file with the number of posts per month (one month per row) as output. Processors always produce the following things:
- A set of metadata for the Dataset the processor will produce. This is stored in 4CAT's PostgreSQL database. The database record is created when the processor's job is first queued, and is updated by the processor.
- A result file, which may have an arbitrary format. This file contains whatever the processor produces, e.g. a list of frequencies, an image wall or a zip archive containing word embedding models.
- A log file, with the same file name as the result file but with a '.log' extension. This documents any output from the processor while it was producing the result file.
4CAT has an API that can do most of the scaffolding around this for you, so processors can be quite lightweight and focus mostly on the analysis, while 4CAT's back-end takes care of the scheduling, determining where the output should go, et cetera.
This is a minimal, annotated example of a 4CAT processor:
"""
A minimal example 4CAT processor
"""
from backend.abstract.processor import BasicProcessor
class ExampleProcessor(BasicProcessor):
"""
Example Processor
"""
type = "example-processor" # job type ID
category = "Examples" # category
title = "A simple example" # title displayed in UI
description = "This doesn't do much" # description displayed in UI
extension = "csv" # extension of result file, used internally and in UI
def process(self):
"""
Saves a CSV file with one column ("value") and one row with a value ("Hello
world") and marks the dataset as finished.
"""
data = [{"value": "Hello world!"}]
self.write_csv_items_and_finish(data)
To add this processor to a 4CAT instance running in Docker:

- Create a new file in the `processors` directory, for example `processors/conversion/example.py`
- Copy the sample code from above to that new file
- Copy that file to BOTH the 4CAT backend and frontend Docker containers: from your 4CAT directory, run `docker cp processors/conversion/example.py 4cat_backend:/usr/src/app/processors/conversion/example.py` and then `docker cp processors/conversion/example.py 4cat_frontend:/usr/src/app/processors/conversion/example.py`
- Start or restart your Docker containers: from the Docker GUI, click the play button (or click stop, then start), or from the command line run `docker-compose stop` and then `docker-compose start`
- Navigate with your browser to any existing dataset (or create a new one)
- Scroll down to the new "Examples" category and run your new processor!
Note: strictly speaking, you can create this file directly in the Docker containers and do not need a clone of 4CAT outside of them, but it will likely be more convenient to use your text editor/IDE of choice and manage a git fork/branch.
Processor settings and metadata are stored as class properties. The following properties are available and can be set:
- `type` (string) - A unique identifier for this processor type. Used internally to queue jobs, store results, et cetera. Should be a string without any spaces in it.
- `category` (string) - A category for this processor, used in the 4CAT web interface to group processors thematically. Displayed as-is in the web interface.
- `title` (string) - Processor title, displayed as-is in the web interface.
- `description` (string) - Description, displayed in the web interface. Markdown in the description will be parsed.
- `extension` (string) - Extension for files produced by this processor, e.g. `csv` or `ndjson`.
- `options` (dictionary) - A dictionary of configurable input fields for this processor. See the page on input fields for more details, and the sketch after this list for a brief example.
- `interrupted` (bool) - `false` by default. This may be set to `true` by 4CAT's scheduler. Once it is `true`, the processor should return as soon as possible, even if processing is not complete yet. If processing has not finished yet, the result so far (if any) should be discarded and the dataset status updated to reflect that it has been interrupted. Since abort procedures are different per processor, this is the processor's responsibility. The simplest way of addressing this is raising a `ProcessorInterruptedException` (found in `common.lib.exceptions`); this will gracefully stop the processor and clean up so it may be attempted again later.
- `followups` (list) - Optional. A list of processor type IDs that can be listed in the interface as being possible to run on the result of this processor. This is purely informative to the user - actual compatibility is determined by the `is_compatible_with` method (see below).
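For illustration, here is a hedged sketch of how a processor might set the `options` and `followups` properties and read the user's choice back in `process()`. The `timeframe` option and its values are made up for this example, and the `UserInput` import path and constants are assumptions; the exact option format is documented on the input fields page.

```python
from backend.abstract.processor import BasicProcessor
from common.lib.user_input import UserInput  # assumed import path; see the input fields page


class ExampleProcessor(BasicProcessor):
    type = "example-processor"  # job type ID
    category = "Examples"
    title = "A simple example"
    description = "This doesn't do much"
    extension = "csv"

    # hypothetical option: let the user pick an interval to count per
    options = {
        "timeframe": {
            "type": UserInput.OPTION_CHOICE,
            "default": "month",
            "options": {"day": "Day", "month": "Month", "year": "Year"},
            "help": "Count items per"
        }
    }

    # purely informative hint about which processors could run on this result
    followups = ["example-processor"]

    def process(self):
        # the user's choice is available via the parameters dictionary
        timeframe = self.parameters.get("timeframe", "month")
        self.write_csv_items_and_finish([{"timeframe": timeframe}])
```

Within `process()`, the value chosen by the user is available through the `parameters` dictionary described further below.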
The full API available to processors is as follows. All of these are members of the processor object, i.e. they are accessed via `self.property` within the processor code. While other methods or classes may be available within processors, relying on them is discouraged and unsupported; when possible, use only the API documented here.
- `process() -> void`: This method should contain the processor's logic. Other methods may be defined, but this one will be called when an analysis with this processor is queued.
- `iterate_archive_contents(Path source_file) -> Generator`: This yields `Path` objects for each file in the given archive. Files are temporarily uncompressed so they can be worked with, and the temporary file is deleted afterwards.
- `write_csv_items_and_finish(list data) -> void`: Writes a list of dictionaries as a CSV file as the result of this processor, and finishes processing. Raises a `ProcessorInterruptedException` if the processor has been interrupted while writing.
- `write_archive_and_finish(list|Path files) -> void`: Compresses all files in list `files`, or all files in folder `files`, into a zip archive and saves that as the dataset result. Files are deleted after adding them, and if `files` is a folder the folder is deleted too. Useful when processor output cannot be conveniently saved into one file.
- `map_item(item) -> dict`: Optional. If defined, any item yielded by `DataSet.iterate_items()` (see below) for datasets produced by this processor will be passed through this method first, and the result of this method will be passed instead. This is especially useful for datasets that are stored as NDJSON, as items may be stored as nested objects with arbitrary depth in that format, and `map_item()` can be used to reduce them to 'flat' dictionaries before processing (see the sketch after this list).
- `is_compatible_with(DataSet) -> bool`: Optional. If defined, this method should be decorated as a `@classmethod` and take `cls` as its first argument. For every dataset, this method is then used to determine if the processor can run on that dataset. If the processor does not define this method, it will be available for any 'top-level' dataset (i.e. any dataset created via 'Create Dataset').
- `get_options(DataSet) -> dict`: Optional. If defined, this method should be decorated as a `@classmethod` and take `cls` as its first argument. The method should then return a processor options definition (see below) that is compatible with the given dataset. If this method is not defined, the (static) `options` attribute of the processor class is used instead.
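To make the optional `map_item()` and `is_compatible_with()` hooks more concrete, here is a rough sketch of a processor that only offers itself for CSV source datasets and flattens its nested NDJSON items. The argument name, the suffix check and the `id`/`author`/`body` fields are assumptions for illustration, not part of the documented API.

```python
from backend.abstract.processor import BasicProcessor


class NestedItemExample(BasicProcessor):
    type = "nested-item-example"  # hypothetical type ID
    category = "Examples"
    title = "Nested item example"
    description = "Illustrates is_compatible_with() and map_item()"
    extension = "ndjson"

    @classmethod
    def is_compatible_with(cls, dataset=None):
        # only offer this processor for datasets whose result file is a CSV
        # (assumption: the source dataset is passed as the argument)
        return dataset is not None and dataset.get_result_path().suffix == ".csv"

    @staticmethod
    def map_item(item):
        # reduce a nested NDJSON item to a 'flat' dictionary before it is
        # yielded by DataSet.iterate_items() (field names are hypothetical)
        return {
            "id": item.get("id", ""),
            "author": item.get("author", {}).get("name", ""),
            "body": item.get("body", "")
        }

    def process(self):
        # actual processing logic omitted; see the minimal example above
        pass
```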
In addition to these methods, the following attributes are available on the processor object:

- `dataset` (`DataSet`): The dataset produced by this processor.
  - `dataset.finish(int num_items_in_result) -> void`: Manually marks the dataset as finished. Either this method or `write_csv_items_and_finish()` should always be called at the end of `process()`. If `num_items_in_result` is 0 or less, no download link is displayed for the result in the web interface. If it is 0, "No results" is additionally displayed in the interface.
  - `dataset.get_result_path() -> pathlib.Path`: Returns a `Path` object referencing the location the processor result should be saved to.
  - `dataset.update_status(str status, bool is_final=False) -> void`: Updates the dataset status. This can be used to indicate processor progress to the user through the web interface. Note that this updates the database, which is relatively expensive, so you should not call it too often (for example, not every iteration of a loop, but only every 500 iterations). If `is_final` is set, subsequent calls to this method have no effect.
  - `dataset.update_progress(float progress) -> void`: Updates the dataset progress, which is a float between 0 and 1 and is used in the web interface to show the progress of the processor. The same caveats regarding calling frequency apply as for `dataset.update_status()`.
  - `dataset.iterate_items(BasicProcessor) -> Generator`: This yields one item from the dataset's data file per call and raises a `ProcessorInterruptedException` if the processor has been interrupted while iterating. Can be used on datasets that are CSV or NDJSON files. If possible, you should always use this method instead of interacting with the source file directly. If the `map_item()` method (see above) is defined for the processor that produced the dataset, the result of that method called with the item as an argument will be yielded instead.
  - `dataset.get_staging_area() -> pathlib.Path`: Returns a `Path` object referencing a newly created folder that can be used as a staging area; files can be stored here while the processor is active. After the processor finishes, files here may be deleted and cannot be relied on. You should nevertheless take care of this within the processor, and delete the path (e.g. with `shutil.rmtree()`) after it is no longer needed, or use the `processor.write_archive_and_finish()` method with the staging area Path as an argument, which will also delete it.
- `source_dataset`: Same structure as above, but this is the finished dataset whose result this processor is running on, i.e. the source data that is being processed.
- `source_file` (`pathlib.Path`): The file to be processed as input, i.e. the data file corresponding to the parent Dataset.
- `parameters` (`dict`): Options set by the user for this processor.
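Putting these pieces together, a `process()` implementation that iterates over the source dataset, reports progress now and then, and writes a CSV result could look roughly like the sketch below. The 'cat counting' analysis and the `body` column are made up for illustration; only the methods documented above are used.

```python
from backend.abstract.processor import BasicProcessor


class CatCounterExample(BasicProcessor):
    type = "cat-counter-example"  # hypothetical type ID
    category = "Examples"
    title = "Count 'cat' mentions"
    description = "Counts how many items mention 'cat' (illustration only)"
    extension = "csv"

    def process(self):
        num_matching = 0
        num_processed = 0

        # iterate_items() yields one item per call and raises a
        # ProcessorInterruptedException if the processor is interrupted
        for item in self.source_dataset.iterate_items(self):
            if "cat" in item.get("body", "").lower():
                num_matching += 1

            num_processed += 1
            # updating the status writes to the database, so do it sparingly
            if num_processed % 500 == 0:
                self.dataset.update_status("Processed %i items" % num_processed)

        # write the result file and mark the dataset as finished
        result = [{"matching_items": num_matching, "total_items": num_processed}]
        self.write_csv_items_and_finish(result)
```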