Merge pull request #1115 from airbnb/support_parquet_format
[core] Support to store data in Parquet format and create Athena tables by Terraform
chunyong-lin authored Mar 6, 2020
2 parents 81365dc + f95c078 commit d57a2ab
Showing 43 changed files with 944 additions and 290 deletions.
1 change: 0 additions & 1 deletion conf/global.json
@@ -21,7 +21,6 @@
"use_prefix": true,
"buffer_interval": 900,
"buffer_size": 128,
"compression_format": "GZIP",
"enabled": false,
"enabled_logs": {}
},
3 changes: 2 additions & 1 deletion conf/lambda.json
@@ -52,6 +52,7 @@
},
"athena_partition_refresh_config": {
"concurrency_limit": 10,
"file_format": null,
"log_level": "info"
},
"classifier_config": {},
@@ -79,7 +80,7 @@
"timeout": 120
},
"rules_engine_config": {
"concurrency_limit": 200,
"concurrency_limit": 10,
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
Binary file removed docs/images/alerts-query.png
Binary file not shown.
Binary file added docs/images/athena-alerts-search.png
Binary file added docs/images/athena-data-search.png
Binary file removed docs/images/athena-refresh-arch.png
Binary file not shown.
Binary file removed docs/images/athena-usage-1.png
Binary file not shown.
Binary file removed docs/images/athena-usage-2.png
Binary file not shown.
Binary file removed docs/images/athena-usage-3.png
Binary file not shown.
Binary file removed docs/images/athena-usage-4.png
Binary file not shown.
Binary file added docs/images/historical-search.png
10 changes: 5 additions & 5 deletions docs/source/config-global.rst
@@ -97,6 +97,8 @@ was triggered, the source of the log, the date/time the alert was triggered, the
which the log came, and a variety of other fields.


.. _alerts_firehose_configuration:

Configuration
-------------
The following ``alerts_firehose`` configuration settings can be defined within the ``infrastructure``
@@ -110,8 +112,7 @@ section of ``global.json``:
"bucket_name": "<prefix>-streamalerts",
"buffer_size": 64,
"buffer_interval": 300,
"cloudwatch_log_retention": 14,
"compression_format": "GZIP"
"cloudwatch_log_retention": 14
}
}
}
@@ -127,7 +128,6 @@ Options
before delivering it to S3
``buffer_interval`` No ``300`` (seconds) Buffer incoming data for the specified period of time, in
seconds, before delivering it to S3
``compression_format`` No ``GZIP`` The compression algorithm to use on data stored in S3
``cloudwatch_log_retention`` No ``14`` (days) Days for which to retain error logs that are sent to CloudWatch
in relation to this Kinesis Firehose Delivery Stream
============================= ============ ========================== ===============
@@ -206,6 +206,8 @@ Options
=============== ============ =========== ===============


.. _firehose_configuration:

Firehose (Historical Data Retention)
====================================
StreamAlert also supports sending all logs to S3 for historical retention and searching based on
@@ -228,7 +230,6 @@ section of ``global.json``:
"bucket_name": "<prefix>-streamalert-data",
"buffer_size": 64,
"buffer_interval": 300,
"compression_format": "GZIP",
"enabled_logs": {
"osquery": {
"enable_alarm": true
@@ -258,7 +259,6 @@ Options
``bucket_name`` No ``<prefix>-streamalert-data`` Bucket name to override the default name
``buffer_size`` No ``64`` (MB) Buffer incoming data to the specified size, in megabytes, before delivering it to S3
``buffer_interval`` No ``300`` (seconds) Buffer incoming data for the specified period of time, in seconds, before delivering it to S3
``compression_format`` No ``GZIP`` The compression algorithm to use on data stored in S3
``enabled_logs`` No ``{}`` Which classified log types to send to Kinesis Firehose from the Classifier
function, along with specific settings per log type
======================= ============ ============================== ===============
4 changes: 2 additions & 2 deletions docs/source/getting-started.rst
@@ -264,10 +264,10 @@ If not, look for any errors in the CloudWatch Logs for the StreamAlert Lambda fu
`Amazon Athena <https://console.aws.amazon.com/athena>`_. Select your StreamAlert database in the
dropdown on the left and preview the ``alerts`` table:

.. figure:: ../images/alerts-query.png
.. figure:: ../images/athena-alerts-search.png
:alt: Query Alerts Table in Athena
:align: center
:target: _images/alerts-query.png
:target: _images/athena-alerts-search.png

(Here, my name prefix is ``testv2``.) If no records are returned, look for errors
in the ``athena_partition_refresh`` function or try invoking it directly.
219 changes: 71 additions & 148 deletions docs/source/historical-search.rst
@@ -2,104 +2,96 @@
Historical Search
#################

The historical data retention and search feature in StreamAlert is backed by Amazon Athena and S3.
Amazon Athena is a serverless query service used to analyze large volumes of data stored in S3.
The StreamAlert historical search feature is backed by Amazon S3 and `Athena <https://aws.amazon.com/athena/>`_. By default, StreamAlert sends all alerts to S3, where they are searchable via an Athena table. Users can optionally enable historical search for incoming log data as well.

Data in Athena is searchable via ANSI SQL and powered by Presto.
As of StreamAlert v3.1.0, a new field, ``file_format``, has been added to ``athena_partition_refresh_config`` in ``conf/lambda.json``, defaulting to ``null``. This field configures how the data processed by the Classifier is stored in the S3 bucket: either ``parquet`` or ``json``. Prior to v3.1.0, all data was stored in JSON, a format whose Athena search performance degrades greatly as partition sizes grow. To address this, we've introduced support for ``parquet``, which provides better Athena search performance and cost savings.

StreamAlert uses Amazon Athena for historical searching of:
.. note::

* Generated alerts from StreamAlert, enabled within StreamAlert out of the box
* All incoming log data sent to StreamAlert, configurable after StreamAlert initialization
* When upgrading StreamAlert to v3.1.0, the default ``file_format`` value must be changed to either ``parquet`` or ``json``; otherwise StreamAlert will raise a ``MisconfigurationError`` exception when running ``python manage.py build``. (A sketch of the upgrade flow follows this note.)
* For existing deployments, ``file_format`` can be set to ``json`` and nothing will change. However, if ``file_format`` is changed to ``parquet``, all Athena tables need to be recreated to read the ``parquet`` format. The existing JSON data will no longer be searchable unless separate tables are built to process the data in JSON format. (All data remains in the S3 bucket; there is no data loss.)
* For new StreamAlert deployments, it is recommended to set ``file_format`` to ``parquet`` to take advantage of better Athena search performance and lower data-scanning costs.
* In a future release, the default value of ``file_format`` will change to ``parquet``, so it is recommended to switch now.
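
For example, a hedged sketch of the upgrade flow after choosing Parquet (the commands are the same ones documented later on this page):

.. code-block:: bash

   # Set "file_format": "parquet" in conf/lambda.json first, then:
   python manage.py build                         # recreates the Athena tables via Terraform
   python manage.py deploy --function classifier  # classifier begins writing Parquet to S3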

This works by:
************
Architecture
************

* Creating a ``streamalert`` Athena database
* Creating Athena tables to read S3 data
* Using a Lambda function to periodically refresh Athena to make the data searchable
.. image:: ../images/historical-search.png
:align: left

The pipeline is:

* StreamAlert creates an Athena database, an alerts Kinesis Firehose delivery stream, and the ``alerts`` table during the initial deployment
* Optionally, Firehose delivery streams and Athena tables can be created for log data as well
* S3 events are sent to SQS, which invokes the ``athena_partition_refresh`` Lambda function to add new partitions whenever new alerts or data are saved to the S3 bucket via Firehose
* New alerts and data are then searchable via the Athena console or SDK

****************
General Concepts
****************
* `Amazon Athena details <https://aws.amazon.com/athena/details/>`_
* `Amazon Athena tables <http://docs.aws.amazon.com/athena/latest/ug/creating-tables.html>`_
* `Amazon Athena FAQs <https://aws.amazon.com/athena/faqs/>`_
* `Amazon Athena pricing <https://aws.amazon.com/athena/pricing/>`_
.. _alerts_search:

*************
Alerts Search
*************

***************
Getting Started
***************
Searching of alerts is enabled within StreamAlert out of the box, and can be further extended to search all incoming log data.
* Review the alerts Firehose configuration (see :ref:`alerts_firehose_configuration`). The Athena database and the Athena ``alerts`` table are created automatically when you first deploy StreamAlert.
* If ``file_format`` is set to ``parquet``, you can run the ``MSCK REPAIR TABLE alerts`` command in Athena to load all available partitions, after which alerts become searchable (see the snippet following this list). Note, however, that ``MSCK REPAIR`` cannot load new partitions automatically.
* StreamAlert provides a Lambda function, ``athena_partition_refresh``, which automatically loads new partitions into the Athena tables once data arrives in the S3 buckets. Update ``athena_partition_refresh_config`` in ``conf/lambda.json`` if necessary; see :ref:`configure_athena_partition_refresh_lambda` for more settings.
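
For reference, the one-time repair command mentioned above can be run from the Athena console (assuming the default ``alerts`` table name):

.. code-block:: sql

   -- One-time backfill: load all partitions already present in S3.
   -- New partitions still require the athena_partition_refresh function.
   MSCK REPAIR TABLE alerts;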

To create tables for searching data sent to StreamAlert, run:
.. code-block:: bash
.. code-block:: json
python manage.py athena create-table \
--bucket <prefix>-streamalert-data \
--table-name <log_name>
The log name above reflects an enabled log type in your StreamAlert deployment. These are also top level keys in the various files under the ``schemas`` directory.
{
"athena_partition_refresh_config": {
"concurrency_limit": 10,
"file_format": "parquet",
"log_level": "info"
}
}
For example, if you have 'cloudwatch' in your sources, you would want to create tables for all possible subtypes. This includes ``cloudwatch:control_message``, ``cloudwatch:events``, and ``cloudwatch:flow_logs``. The ``:`` character is not an acceptable character in table names due to a Hive limitation, but your arguments can be either ``cloudwatch:events`` **or** ``cloudwatch_events``. Both will be handled properly by StreamAlert.
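
As an illustration of the naming rule above, both of the following hypothetical invocations produce the same ``cloudwatch_events`` table (the bucket name follows the earlier example):

.. code-block:: bash

   # The ':' form is converted to '_' internally to satisfy Hive naming rules
   python manage.py athena create-table \
     --bucket <prefix>-streamalert-data \
     --table-name cloudwatch:events

   # Equivalent to the above
   python manage.py athena create-table \
     --bucket <prefix>-streamalert-data \
     --table-name cloudwatch_events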
* Deploy the ``athena_partition_refresh`` Lambda function

Repeat this process for all relevant data tables in your deployment.
.. code-block:: bash
python manage.py deploy --function athena
Deploying
=========
Once the options above are set, deploy the infrastructure with the following commands:
* Search alerts in the `Athena Console <https://console.aws.amazon.com/athena>`_

.. code-block:: bash
* Choose your ``Database`` from the dropdown on the left; the database name is ``PREFIX_streamalert``
* Write a SQL query in the ``Query Editor`` on the right

python manage.py build
python manage.py deploy --function classifier
.. image:: ../images/athena-alerts-search.png

***********
Data Search
***********

.. _athena-architecture:
Optionally, incoming data can also be stored in the S3 bucket and made available for search in Athena tables.

*******************
Athena Architecture
*******************
The Athena Partition Refresh function exists to periodically refresh Athena tables, enabling the searchability of alerts and log data.
* Enable Firehose in ``conf/global.json``; see :ref:`firehose_configuration`
* Build the Firehose and Athena tables

The default refresh interval is 10 minutes but can be configured by the user.
.. code-block:: bash
python manage.py build
Concepts
========
The Athena Partition Refresh function utilizes:
* Deploy the classifier so that it sends data to the S3 bucket via Firehose

* `Amazon S3 Event Notifications <http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html>`_
* `Amazon SQS <https://aws.amazon.com/sqs/details/>`_
* `AWS Lambda Invocations by Schedule <http://docs.aws.amazon.com/lambda/latest/dg/tutorial-scheduled-events-schedule-expressions.html>`_
* `Amazon Athena Repair Table <https://docs.aws.amazon.com/athena/latest/ug/msck-repair-table.html>`_
.. code-block:: bash
python manage.py deploy --function classifier
Diagram
-------
.. figure:: ../images/athena-refresh-arch.png
:alt: StreamAlert Athena Refresh Partition Diagram
:align: center
:target: _images/athena-refresh-arch.png
* Search data in the `Athena Console <https://console.aws.amazon.com/athena>`_

* Choose your ``Database`` from the dropdown on the left; the database name is ``PREFIX_streamalert``
* Write a SQL query in the ``Query Editor`` on the right

Internals
---------
Each time the Athena Partition Refresh Lambda function is invoked, it does the following:
.. image:: ../images/athena-data-search.png

* Polls the SQS queue for the latest S3 event notifications (up to 100)
* S3 event notifications contain context around any new object written to a data bucket (as configured below)
* A deduplicated set of unique S3 bucket IDs is extracted from the notifications
* Queries Athena to verify the ``streamalert`` database exists
* Refreshes the Athena tables for data in the relevant S3 buckets, as specified below in the list of ``buckets``
* Deletes messages off the queue once partitions are created
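
Conceptually, each refresh amounts to Athena DDL along these lines (a hedged sketch; the table name, ``dt`` value, and S3 location are illustrative rather than the function's literal queries):

.. code-block:: sql

   -- Hypothetical example of registering one new hourly partition
   ALTER TABLE alerts ADD IF NOT EXISTS
   PARTITION (dt = '2020-02-28-00')
   LOCATION 's3://<prefix>-streamalerts/dt=2020-02-28-00/';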

.. _configure_athena_partition_refresh_lambda:

*************************
Configure Lambda Settings
=========================
*************************

Open ``conf/lambda.json``, and fill in the following options:

=================================== ======== ==================== ===========
@@ -110,8 +102,8 @@ Key Required Default Descriptio
``log_level`` No ``info`` The log level for the Lambda function; can be either ``info`` or ``debug``. Debug helps with diagnosing errors when polling SQS or sending Athena queries.
``memory`` No ``128`` The amount of memory (in MB) allocated to the Lambda function
``timeout`` No ``60`` The maximum duration of the Lambda function (in seconds)
``schedule_expression`` No ``rate(10 minutes)`` The rate at which the Athena Partition Refresh Lambda function is invoked, in the form of a `CloudWatch schedule expression <http://amzn.to/2u5t0hS>`_.
``buckets`` Yes ``{}`` Key value pairs of S3 buckets and associated Athena table names. By default, the alerts bucket will exist in each deployment.
``file_format`` Yes ``null`` The format in which alerts and data are stored in the S3 bucket via Firehose; can be either ``parquet`` (preferred) or ``json``
``buckets`` No ``{}`` Key value pairs of S3 buckets and associated Athena table names. By default, the alerts bucket will exist in each deployment.
=================================== ======== ==================== ===========

**Example:**
@@ -125,95 +117,26 @@
"buckets": {
"alternative_bucket": "data"
},
"...": "...",
"file_format": "parquet",
"timeout": 60
}
}
Deployment
==========
If any of the settings above are changed from the initialized defaults, the Lambda function will need to be deployed in order for them to take effect:

.. code-block:: bash
python manage.py deploy --function athena
Going forward, if the deploy flag ``--function all`` is used, it will redeploy this function along with the ``rule`` function and ``alert`` function.


Monitoring
----------
To ensure the function is operating as expected, monitor the following SQS metrics for ``<prefix>_streamalert_athena_s3_notifications``:

* ``NumberOfMessagesReceived``
* ``NumberOfMessagesSent``
* ``NumberOfMessagesDeleted``

All three of these metrics should have very close values.

If ``NumberOfMessagesSent`` is much higher than the other two metrics, the invocation rate should be increased by shortening the ``schedule_expression`` interval in the configuration.

For high throughput production environments, an interval of 1 to 2 minutes is recommended.
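
One way to spot-check these three metrics is with the AWS CLI (a sketch; the one-hour window below is illustrative):

.. code-block:: bash

   # Compare hourly sums of the three SQS metrics for the notifications queue
   for metric in NumberOfMessagesReceived NumberOfMessagesSent NumberOfMessagesDeleted; do
     aws cloudwatch get-metric-statistics \
       --namespace AWS/SQS \
       --metric-name "$metric" \
       --dimensions Name=QueueName,Value=<prefix>_streamalert_athena_s3_notifications \
       --start-time 2020-03-06T00:00:00Z \
       --end-time 2020-03-06T01:00:00Z \
       --period 3600 \
       --statistics Sum
   done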


.. _athena_user_guide:

*****************
Athena User Guide
Athena References
*****************

Concepts
========
* `SQL <https://www.w3schools.com/sql/sql_intro.asp>`_
* `Athena Partitions <http://docs.aws.amazon.com/athena/latest/ug/partitions.html>`_


Querying Data
=============
All alerts generated by StreamAlert will be sent to an ``alerts`` S3 bucket via Firehose. These will then be searchable within Athena.

To get started with querying of this data, navigate to the AWS Console, click Services, and type 'Athena'.

When the service loads, switch the ``DATABASE`` option in the dropdown to ``streamalert``:

.. figure:: ../images/athena-usage-1.png
:alt: StreamAlert Athena Database Selection
:align: center
:target: _images/athena-usage-1.png

To view the schema of the ``alerts`` table, click the eye icon:

.. figure:: ../images/athena-usage-2.png
:alt: StreamAlert Athena Alerts Schema
:align: center
:target: _images/athena-usage-2.png

To make a query, type a SQL statement in the Query Editor, and click Run Query:

.. figure:: ../images/athena-usage-3.png
:alt: StreamAlert Athena Run Query
:align: center
:target: _images/athena-usage-3.png

The query shown above will show the most recent 10 alerts.


Tips
====
Data is partitioned in the following format ``YYYY-MM-DD-hh-mm``.

An example is ``2017-08-01-22-00``.

To increase query performance, filter data within a specific partition or range of partitions.
* `Introduction to SQL <https://www.w3schools.com/sql/sql_intro.asp>`_
* `Amazon Athena Getting Started <https://docs.aws.amazon.com/athena/latest/ug/getting-started.html>`_
* `Presto Documentation <https://prestodb.io/docs/0.172/index.html#>`_

With StreamAlert tables, the date partition is the ``dt`` column.
.. tip::

As an example, the query below counts all alerts during a given minute:
* Alerts and data are partitioned by ``dt`` in the format ``YYYY-MM-DD-hh``
* To improve query performance, filter data within a specific partition or range of partitions

.. figure:: ../images/athena-usage-4.png
:alt: StreamAlert Athena Run Query with Partition
:align: center
:target: _images/athena-usage-4.png
.. code-block:: sql
For additional guidance on using SQL, visit the link under Concepts.
SELECT * FROM "PREFIX_streamalert"."alerts"
WHERE dt BETWEEN '2020-02-28-00' AND '2020-02-29-00'
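
The removed ``athena-usage-4.png`` figure above illustrated counting alerts within a single partition; a hedged equivalent under the current hourly ``dt`` format (the partition value is illustrative):

.. code-block:: sql

   -- Count alerts within one hourly partition
   SELECT COUNT(*) AS alert_count
   FROM "PREFIX_streamalert"."alerts"
   WHERE dt = '2020-02-28-00';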
2 changes: 1 addition & 1 deletion docs/source/rules.rst
@@ -264,7 +264,7 @@ The following table provides an overview of each rule option, with more details
:outputs:

The ``outputs`` keyword argument defines the alert destination if the return value of a rule is ``True``.
Alerts are always sent to an :ref:`Athena table <athena_user_guide>` which is easy to query.
Alerts are always sent to an :ref:`Athena alerts table <alerts_search>` which is easy to query.
Any number of additional `outputs <outputs.html>`_ can be specified.

:req_subkeys: