
dbrunner-py3

A POC for finding differences over time in SQL results and raising those changes to segment.com.

The scenario: our data warehouses enable us to combine data across all aspects of our business and generate powerful insights, such as finding our low-usage, high-value, up-for-renewal customers. These reports are great, but to take action we need this data in other tools, such as email automation. This POC enables us to run these queries, detect differences in the results, and pass the changes into Segment, which then passes them on to all the relevant tools.

Configurations

Configuration for each job is stored in DynamoDB in the following format:

{
  "id": "1",
  "query": "SELECT userid as id, firstname, lastname, total_quantity\nFROM   (SELECT buyerid, sum(qtysold) total_quantity\nFROM  sales\nGROUP BY buyerid\nORDER BY total_quantity desc limit 10) Q, users\nWHERE Q.buyerid = userid\nORDER BY Q.total_quantity desc",
  "warehouse": {
    "dbname": "",
    "host": "",
    "password": "",
    "port": "",
    "user": ""
  },
  "workplace": {
    "dbname": "",
    "host": "",
    "password": "",
    "port": "",
    "user": ""
  }
}
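
For illustration, a stored config could be fetched with boto3 along these lines; the table name dbrunner-configs is a placeholder, since the actual DynamoDB table name isn't documented here:

import boto3

# Placeholder table name -- the real table name isn't documented in this README.
table = boto3.resource("dynamodb", region_name="us-west-2").Table("dbrunner-configs")

# Fetch a job config by its id (assumed to be the partition key).
item = table.get_item(Key={"id": "1"}).get("Item")
print(item["query"])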

The query will be executed against the warehouse and the results stored in the workplace database. Use the id to run the job.

The configurations are managed via the following REST API:

GET /api/v1.0/configs

GET /api/v1.0/configs/{id}

POST /api/v1.0/configs

PUT /api/v1.0/configs/{id}

Live test: https://j7394qf30g.execute-api.us-west-2.amazonaws.com/prod (requires an API key)
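
As a minimal sketch, the API can be exercised with Python's requests library; the x-api-key header name is an assumption based on the API Gateway default:

import requests

BASE = "https://j7394qf30g.execute-api.us-west-2.amazonaws.com/prod"  # live-test URL above
HEADERS = {"x-api-key": "YOUR_API_KEY"}  # assumed API Gateway key header

# List all stored configurations.
print(requests.get(f"{BASE}/api/v1.0/configs", headers=HEADERS).json())

# Fetch configuration 1.
print(requests.get(f"{BASE}/api/v1.0/configs/1", headers=HEADERS).json())

# Create a new configuration using the JSON format shown above.
new_config = {
    "id": "2",
    "query": "SELECT 1",
    "warehouse": {"dbname": "", "host": "", "password": "", "port": "", "user": ""},
    "workplace": {"dbname": "", "host": "", "password": "", "port": "", "user": ""},
}
requests.post(f"{BASE}/api/v1.0/configs", json=new_config, headers=HEADERS)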

How to run

POST to the following endpoint to run a stored configuration. Configuration id 1 is in a working state.

POST: /run/{config_id}
Content-Type: application/json
Required payload: {}

The endpoint will synchronously do the following:

  • execute the query against the warehouse

  • store the result in a new table in PostgreSQL

  • compare the contents to the previous snapshot and find the updated records (see the sketch after this list)

  • if given a writekey, send an identify call to Segment for each updated record

      Payload: { "writekey": "123" }
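
The comparison step isn't spelled out above; as a sketch, changed rows can be found in PostgreSQL with an EXCEPT query, assuming the current and previous result sets live in tables named snapshot_new and snapshot_prev (placeholder names, not the job's actual tables):

import psycopg2

# Connect with the workplace credentials from the job config (placeholder values).
conn = psycopg2.connect(dbname="workplace", host="localhost", port=5432,
                        user="user", password="secret")

# Rows present in the new snapshot but absent from the previous one are
# new or updated records (deletions would need the reverse EXCEPT).
CHANGED_ROWS = """
    SELECT * FROM snapshot_new
    EXCEPT
    SELECT * FROM snapshot_prev
"""

with conn, conn.cursor() as cur:
    cur.execute(CHANGED_ROWS)
    for row in cur.fetchall():
        print(row)  # each of these would become an identify call to Segment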
    

Usually there will not be any changes, since the warehouse contains stale sample data. However, you can easily force a single record to update:

Payload: { "force_change": "true" }
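
For example, a run that forces a change and passes a write key might look like this with requests; the base URL, the x-api-key header, and the assumption that /run sits on the same stage as the configs API are all placeholders:

import requests

BASE = "https://j7394qf30g.execute-api.us-west-2.amazonaws.com/prod"  # live-test URL
HEADERS = {"x-api-key": "YOUR_API_KEY"}  # assumed API Gateway key header

# Run config 1, forcing one record to change and sending an identify call
# to Segment for the updated record.
payload = {"force_change": "true", "writekey": "123"}
resp = requests.post(f"{BASE}/run/1", json=payload, headers=HEADERS)
resp.raise_for_status()
print(resp.json())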

See the integration test for more details. You can also make your own updates to the Redshift warehouse, or connect to your own. (uh-oh, schema issues ahead?)

Unit & integration tests

The query and compare logic is implemented in and executed by PostgreSQL. These unit tests require Postgres to be installed. Run them with:

python3 -m nose -v
