diff --git a/CHANGES b/CHANGES index b4841ddb2b..2abf62766f 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,5 @@ - + * Fix start_id type for XAUTOCLAIM + * Remove verbose logging from cluster.py * Add retry mechanism to async version of Connection * Compare commands case-insensitively in the asyncio command parser * Allow negative `retries` for `Retry` class to retry forever diff --git a/docs/examples.rst b/docs/examples.rst index 722fae2d03..3fed8b4195 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -12,3 +12,5 @@ Examples examples/set_and_get_examples examples/search_vector_similarity_examples examples/pipeline_examples + examples/timeseries_examples + examples/redis-stream-example.ipynb diff --git a/docs/examples/redis-stream-example.ipynb b/docs/examples/redis-stream-example.ipynb new file mode 100644 index 0000000000..9303b527ca --- /dev/null +++ b/docs/examples/redis-stream-example.ipynb @@ -0,0 +1,754 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Redis Stream Examples" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## basic config" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "redis_host = \"redis\"\n", + "stream_key = \"skey\"\n", + "stream2_key = \"s2key\"\n", + "group1 = \"grp1\"\n", + "group2 = \"grp2\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## connection" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import redis\n", + "from time import time\n", + "from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError, ResponseError\n", + "\n", + "r = redis.Redis( redis_host )\n", + "r.ping()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## xadd and xread" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### add some data to the stream" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "stream length: 10\n" + ] + } + ], + "source": [ + "for i in range(0,10):\n", + " r.xadd( stream_key, { 'ts': time(), 'v': i } )\n", + "print( f\"stream length: {r.xlen( stream_key )}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### read some data from the stream" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[[b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'}), (b'1657571033117-0', {b'ts': b'1657571033.1176307', b'v': b'1'})]]]\n" + ] + } + ], + "source": [ + "## read 2 entries from stream_key\n", + "l = r.xread( count=2, streams={stream_key:0} )\n", + "print(l)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### extract data from the returned structure" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got data from stream: b'skey'\n", + "id: b'1657571033115-0' value: b'0'\n", + "id: b'1657571033117-0' value: b'1'\n" + ] + } + ], + "source": [ + "first_stream = l[0]\n", + "print( f\"got data from stream: {first_stream[0]}\")\n", + "fs_data = 
first_stream[1]\n", + "for id, value in fs_data:\n", + " print( f\"id: {id} value: {value[b'v']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### read more data from the stream\n", + "if we call the `xread` with the same arguments we will get the same data" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "id: b'1657571033115-0' value: b'0'\n", + "id: b'1657571033117-0' value: b'1'\n" + ] + } + ], + "source": [ + "l = r.xread( count=2, streams={stream_key:0} )\n", + "for id, value in l[0][1]:\n", + " print( f\"id: {id} value: {value[b'v']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "to get new data we need to change the key passed to the call" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "id: b'1657571033118-0' value: b'2'\n", + "id: b'1657571033119-0' value: b'3'\n" + ] + } + ], + "source": [ + "last_id_returned = l[0][1][-1][0]\n", + "l = r.xread( count=2, streams={stream_key: last_id_returned} )\n", + "for id, value in l[0][1]:\n", + " print( f\"id: {id} value: {value[b'v']}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "id: b'1657571033119-1' value: b'4'\n", + "id: b'1657571033121-0' value: b'5'\n" + ] + } + ], + "source": [ + "last_id_returned = l[0][1][-1][0]\n", + "l = r.xread( count=2, streams={stream_key: last_id_returned} )\n", + "for id, value in l[0][1]:\n", + " print( f\"id: {id} value: {value[b'v']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "to get only newer entries" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "stream length: 10\n", + "after 5s block, got an empty list [], no *new* messages on the stream\n", + "stream length: 10\n" + ] + } + ], + "source": [ + "print( f\"stream length: {r.xlen( stream_key )}\")\n", + "# wait for 5s for new messages\n", + "l = r.xread( count=1, block=5000, streams={stream_key: '$'} )\n", + "print( f\"after 5s block, got an empty list {l}, no *new* messages on the stream\")\n", + "print( f\"stream length: {r.xlen( stream_key )}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 2nd stream\n", + "Add some messages to a 2nd stream" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "stream length: 10\n" + ] + } + ], + "source": [ + "for i in range(1000,1010):\n", + " r.xadd( stream2_key, { 'v': i } )\n", + "print( f\"stream length: {r.xlen( stream2_key )}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "get messages from the 2 streams" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got from b'skey' the entry [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'})]\n", + "got from b's2key' the entry [(b'1657571042111-0', {b'v': b'1000'})]\n" + ] + } + ], + "source": [ + "l = r.xread( count=1, streams={stream_key:0,stream2_key:0} )\n", + "for k,d in l:\n", + " print(f\"got from {k} the entry {d}\")" + ] + }, + { + 
"cell_type": "markdown", + "metadata": {}, + "source": [ + "# stream groups\n", + "With the groups is possible track, for many consumers, and at the Redis side, which message have been already consumed.\n", + "## add some data to streams\n", + "Creating 2 streams with 10 messages each." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "stream 'skey' length: 20\n", + "stream 's2key' length: 20\n" + ] + } + ], + "source": [ + "def add_some_data_to_stream( sname, key_range ):\n", + " for i in key_range:\n", + " r.xadd( sname, { 'ts': time(), 'v': i } )\n", + " print( f\"stream '{sname}' length: {r.xlen( stream_key )}\")\n", + "\n", + "add_some_data_to_stream( stream_key, range(0,10) )\n", + "add_some_data_to_stream( stream2_key, range(1000,1010) )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## use a group to read from the stream\n", + "* create a group `grp1` with the stream `skey`, and\n", + "* create a group `grp2` with the streams `skey` and `s2key`\n", + "\n", + "Use the `xinfo_group` to verify the result of the group creation." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "skey -> group name: b'grp1' with 0 consumers and b'0-0' as last read id\n", + "skey -> group name: b'grp2' with 0 consumers and b'0-0' as last read id\n", + "s2key -> group name: b'grp2' with 0 consumers and b'0-0' as last read id\n" + ] + } + ], + "source": [ + "## create the group\n", + "def create_group( skey, gname ):\n", + " try:\n", + " r.xgroup_create( name=skey, groupname=gname, id=0 )\n", + " except ResponseError as e:\n", + " print(f\"raised: {e}\")\n", + "\n", + "# group1 read the stream 'skey'\n", + "create_group( stream_key, group1 )\n", + "# group2 read the streams 'skey' and 's2key'\n", + "create_group( stream_key, group2 )\n", + "create_group( stream2_key, group2 )\n", + "\n", + "def group_info( skey ):\n", + " res = r.xinfo_groups( name=skey )\n", + " for i in res:\n", + " print( f\"{skey} -> group name: {i['name']} with {i['consumers']} consumers and {i['last-delivered-id']}\"\n", + " + f\" as last read id\")\n", + " \n", + "group_info( stream_key )\n", + "group_info( stream2_key )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## group read\n", + "The `xreadgroup` method permit to read from a stream group." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "def print_xreadgroup_reply( reply, group = None, run = None):\n", + " for d_stream in reply:\n", + " for element in d_stream[1]:\n", + " print( f\"got element {element[0]}\"\n", + " + f\" from stream {d_stream[0]}\" )\n", + " if run is not None:\n", + " run( d_stream[0], group, element[0] )" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033115-0' from stream b'skey'\n", + "got element b'1657571033117-0' from stream b'skey'\n" + ] + } + ], + "source": [ + "# read some messages on group1 with consumer 'c'\n", + "d = r.xreadgroup( groupname=group1, consumername='c', block=10,\n", + " count=2, streams={stream_key:'>'})\n", + "print_xreadgroup_reply( d )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "A **2nd consumer** for the same stream group will get only the messages that have not been delivered yet." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033118-0' from stream b'skey'\n", + "got element b'1657571033119-0' from stream b'skey'\n" + ] + } + ], + "source": [ + "# read some messages on group1 with consumer 'c2'\n", + "d = r.xreadgroup( groupname=group1, consumername='c2', block=10,\n", + " count=2, streams={stream_key:'>'})\n", + "print_xreadgroup_reply( d )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "But a **2nd stream group** can read the already delivered messages again.\n", + "\n", + "Note that the 2nd stream group also includes the 2nd stream.\n", + "The stream can be identified in the reply (1st element of the reply list)." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033115-0' from stream b'skey'\n", + "got element b'1657571033117-0' from stream b'skey'\n", + "got element b'1657571042111-0' from stream b's2key'\n", + "got element b'1657571042113-0' from stream b's2key'\n" + ] + } + ], + "source": [ + "d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,\n", + " count=2, streams={stream_key:'>',stream2_key:'>'})\n", + "print_xreadgroup_reply( d2 )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To check for pending messages (delivered messages without acknowledgment) we can use `xpending`." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "4 pending messages on 'skey' for group 'grp1'\n", + "2 pending messages on 'skey' for group 'grp2'\n", + "2 pending messages on 's2key' for group 'grp2'\n" + ] + } + ], + "source": [ + "# check pending status (read messages without an ack)\n", + "def print_pending_info( key_group ):\n", + " for s,k in key_group:\n", + " pr = r.xpending( name=s, groupname=k )\n", + " print( f\"{pr.get('pending')} pending messages on '{s}' for group '{k}'\" )\n", + " \n", + "print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )" + ] + },
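+ { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## claim pending messages\n", + "A pending message of a stalled consumer can be claimed by another consumer with `xautoclaim` (Redis 6.2+). This is a minimal sketch: the consumer name `c3` and the 60 s idle threshold are only illustrative, and `start_id` must be a stream ID string such as `0-0`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# claim entries of group1 that have been pending for at least 60 s,\n", + "# scanning the stream from its very beginning\n", + "r.xautoclaim( stream_key, group1, 'c3', min_idle_time=60000, start_id='0-0' )" + ] + },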
+ { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ack\n", + "Acknowledge some messages with `xack`." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033118-0' from stream b'skey'\n", + "got element b'1657571033119-0' from stream b'skey'\n" + ] + } + ], + "source": [ + "# acknowledge the messages for group1\n", + "toack = lambda k,g,e: r.xack( k,g, e )\n", + "print_xreadgroup_reply( d, group=group1, run=toack )" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2 pending messages on 'skey' for group 'grp1'\n", + "2 pending messages on 'skey' for group 'grp2'\n", + "2 pending messages on 's2key' for group 'grp2'\n" + ] + } + ], + "source": [ + "# check pending again\n", + "print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "ack all messages on `group1`." + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571033119-1' from stream b'skey'\n", + "got element b'1657571033121-0' from stream b'skey'\n", + "got element b'1657571033121-1' from stream b'skey'\n", + "got element b'1657571033121-2' from stream b'skey'\n", + "got element b'1657571033122-0' from stream b'skey'\n", + "got element b'1657571033122-1' from stream b'skey'\n", + "got element b'1657571049557-0' from stream b'skey'\n", + "got element b'1657571049557-1' from stream b'skey'\n", + "got element b'1657571049558-0' from stream b'skey'\n", + "got element b'1657571049559-0' from stream b'skey'\n", + "got element b'1657571049559-1' from stream b'skey'\n", + "got element b'1657571049559-2' from stream b'skey'\n", + "got element b'1657571049560-0' from stream b'skey'\n", + "got element b'1657571049562-0' from stream b'skey'\n", + "got element b'1657571049563-0' from stream b'skey'\n", + "got element b'1657571049563-1' from stream b'skey'\n", + "2 pending messages on 'skey' for group 'grp1'\n" + ] + } + ], + "source": [ + "d = r.xreadgroup( groupname=group1, consumername='c', block=10,\n", + " count=100, streams={stream_key:'>'})\n", + "print_xreadgroup_reply( d, group=group1, run=toack)\n", + "print_pending_info( ((stream_key,group1),) )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "But the stream length will stay the same even after the `xack` of all messages on `group1`." + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "20" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "r.xlen(stream_key)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## delete all\n", + "To remove the messages we need to delete them explicitly with `xdel`."
+ ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [], + "source": [ + "s1 = r.xread( streams={stream_key:0} )\n", + "for streams in s1:\n", + " stream_name, messages = streams\n", + " # delete all ids from the message list\n", + " [ r.xdel( stream_name, i[0] ) for i in messages ]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "stream length" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "0" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "r.xlen(stream_key)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "But after the `xdel` the 2nd group can no longer read the unprocessed messages from `skey`; only the entries of `s2key` are left." + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "got element b'1657571042113-1' from stream b's2key'\n", + "got element b'1657571042114-0' from stream b's2key'\n" + ] + } + ], + "source": [ + "d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,\n", + " count=2, streams={stream_key:'>',stream2_key:'>'})\n", + "print_xreadgroup_reply( d2 )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/docs/examples/timeseries_examples.ipynb b/docs/examples/timeseries_examples.ipynb new file mode 100644 index 0000000000..fefc0c8f37 --- /dev/null +++ b/docs/examples/timeseries_examples.ipynb @@ -0,0 +1,631 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Timeseries\n", + "\n", + "`redis-py` supports [RedisTimeSeries](https://github.com/RedisTimeSeries/RedisTimeSeries/), a time-series database module for Redis.\n", + "\n", + "This example shows how to handle timeseries data with `redis-py`."
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Health check" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import redis\n", + "\n", + "r = redis.Redis(decode_responses=True)\n", + "ts = r.ts()\n", + "\n", + "r.ping()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Simple example" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create a timeseries" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.create(\"ts_key\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Add samples to the timeseries\n", + "\n", + "We can either set the timestamp with a UNIX timestamp in milliseconds or use `*` to set the timestamp based on the server's clock." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1657272304448" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.add(\"ts_key\", 1657265437756, 1)\n", + "ts.add(\"ts_key\", \"1657265437757\", 2)\n", + "ts.add(\"ts_key\", \"*\", 3)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Get the last sample" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(1657272304448, 3.0)" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.get(\"ts_key\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Get samples between two timestamps\n", + "\n", + "The minimum and maximum possible timestamps can be expressed with `-` and `+`, respectively."
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[(1657265437756, 1.0), (1657265437757, 2.0), (1657272304448, 3.0)]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.range(\"ts_key\", \"-\", \"+\")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[(1657265437756, 1.0), (1657265437757, 2.0)]" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.range(\"ts_key\", 1657265437756, 1657265437757)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Delete samples between two timestamps" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Before deletion: [(1657265437756, 1.0), (1657265437757, 2.0), (1657272304448, 3.0)]\n", + "After deletion: [(1657272304448, 3.0)]\n" + ] + } + ], + "source": [ + "print(\"Before deletion: \", ts.range(\"ts_key\", \"-\", \"+\"))\n", + "ts.delete(\"ts_key\", 1657265437756, 1657265437757)\n", + "print(\"After deletion: \", ts.range(\"ts_key\", \"-\", \"+\"))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Multiple timeseries with labels" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.create(\"ts_key1\")\n", + "ts.create(\"ts_key2\", labels={\"label1\": 1, \"label2\": 2})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Add samples to multiple timeseries" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[1657272306147, 1657272306147]" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.madd([(\"ts_key1\", \"*\", 1), (\"ts_key2\", \"*\", 2)])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Add samples with labels" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1657272306457" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.add(\"ts_key2\", \"*\", 2, labels={\"label1\": 1, \"label2\": 2})\n", + "ts.add(\"ts_key2\", \"*\", 2, labels={\"label1\": 3, \"label2\": 4})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Get the last sample matching a specific label\n", + "\n", + "Get the last sample that matches \"label1=1\"; see the [Redis documentation](https://redis.io/commands/ts.mget/) for the possible filter values."
+ ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[{'ts_key2': [{}, 1657272306457, 2.0]}]" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.mget([\"label1=1\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Get also the label-value pairs of the sample:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[{'ts_key2': [{'label1': '1', 'label2': '2'}, 1657272306457, 2.0]}]" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.mget([\"label1=1\"], with_labels=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Retention period\n", + "\n", + "You can specify a retention period when creating a timeseries object or when adding a sample to it. Once the retention period has elapsed, the sample is removed from the timeseries." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "retention_time = 1000\n", + "ts.create(\"ts_key_ret\", retention_msecs=retention_time)" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Base timeseries: [(1657272307670, 1.0)]\n", + "Timeseries after 1000 milliseconds: [(1657272307670, 1.0)]\n" + ] + } + ], + "source": [ + "import time\n", + "# this will be deleted in 1000 milliseconds\n", + "ts.add(\"ts_key_ret\", \"*\", 1, retention_msecs=retention_time)\n", + "print(\"Base timeseries: \", ts.range(\"ts_key_ret\", \"-\", \"+\"))\n", + "# sleeping for 1000 milliseconds (1 second)\n", + "time.sleep(1)\n", + "print(\"Timeseries after 1000 milliseconds: \", ts.range(\"ts_key_ret\", \"-\", \"+\"))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The two lists are the same; this is because the oldest samples are only deleted when a new sample is added." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1657272308849" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.add(\"ts_key_ret\", \"*\", 10)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[(1657272308849, 10.0)]" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.range(\"ts_key_ret\", \"-\", \"+\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here the first sample has been deleted." + ] + },
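+ { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Aggregated queries\n", + "Samples can also be aggregated on the server side when querying a range. This is a minimal sketch, assuming the `aggregation_type` and `bucket_size_msec` parameters of the `redis-py` range command; the 100 ms bucket size is only illustrative." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# one averaged value per 100 ms bucket\n", + "ts.range(\"ts_key\", \"-\", \"+\", aggregation_type=\"avg\", bucket_size_msec=100)" + ] + },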
+ { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Specify duplicate policies\n", + "\n", + "By default, the policy for duplicate timestamps is set to \"BLOCK\", so we cannot create two samples with the same timestamp:" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "TSDB: Error at upsert, update is not supported when DUPLICATE_POLICY is set to BLOCK mode\n" + ] + } + ], + "source": [ + "ts.add(\"ts_key\", 123456789, 1)\n", + "try:\n", + " ts.add(\"ts_key\", 123456789, 2)\n", + "except Exception as err:\n", + " print(err)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can change this default behaviour with the `duplicate_policy` parameter, for instance:" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[(123456789, 2.0), (1657272304448, 3.0)]" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# using policy \"LAST\", we keep the last added sample\n", + "ts.add(\"ts_key\", 123456789, 2, duplicate_policy=\"LAST\")\n", + "ts.range(\"ts_key\", \"-\", \"+\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "For more information about duplicate policies, see the [Redis documentation](https://redis.io/commands/ts.add/)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Using Redis TSDB to keep track of a value" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1657272310241" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.add(\"ts_key_incr\", \"*\", 0)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Increment the value:" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "for _ in range(10):\n", + " ts.incrby(\"ts_key_incr\", 1)\n", + " # sleeping a bit so the timestamps are not duplicated\n", + " time.sleep(0.01)" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[(1657272310241, 0.0),\n", + " (1657272310533, 1.0),\n", + " (1657272310545, 2.0),\n", + " (1657272310556, 3.0),\n", + " (1657272310567, 4.0),\n", + " (1657272310578, 5.0),\n", + " (1657272310589, 6.0),\n", + " (1657272310600, 7.0),\n", + " (1657272310611, 8.0),\n", + " (1657272310622, 9.0),\n", + " (1657272310632, 10.0)]" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts.range(\"ts_key_incr\", \"-\", \"+\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.9.2 64-bit", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.2" + }, + "orig_nbformat": 4, + "vscode": { + "interpreter": { + "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/redis/cluster.py b/redis/cluster.py index 8293dbb3cd..b05cf307db 100644 --- a/redis/cluster.py +++ 
b/redis/cluster.py @@ -1,5 +1,4 @@ import copy -import logging import random import socket import sys @@ -15,7 +14,6 @@ from redis.exceptions import ( AskError, AuthenticationError, - BusyLoadingError, ClusterCrossSlotError, ClusterDownError, ClusterError, @@ -39,8 +37,6 @@ str_if_bytes, ) -log = logging.getLogger(__name__) - def get_node_name(host: str, port: Union[str, int]) -> str: return f"{host}:{port}" @@ -535,7 +531,6 @@ def __init__( " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," " ClusterNode('localhost', 6378)])" ) - log.debug(f"startup_nodes : {startup_nodes}") # Update the connection arguments # Whenever a new connection is established, RedisCluster's on_connect # method should be run @@ -666,13 +661,8 @@ def set_default_node(self, node): :return True if the default node was set, else False """ if node is None or self.get_node(node_name=node.name) is None: - log.info( - "The requested node does not exist in the cluster, so " - "the default node was not changed." - ) return False self.nodes_manager.default_node = node - log.info(f"Changed the default cluster node to {node}") return True def monitor(self, target_node=None): @@ -816,8 +806,6 @@ def _determine_nodes(self, *args, **kwargs): else: # get the nodes group for this command if it was predefined command_flag = self.command_flags.get(command) - if command_flag: - log.debug(f"Target node/s for {command}: {command_flag}") if command_flag == self.__class__.RANDOM: # return a random node return [self.get_random_node()] @@ -841,7 +829,6 @@ def _determine_nodes(self, *args, **kwargs): node = self.nodes_manager.get_node_from_slot( slot, self.read_from_replicas and command in READ_COMMANDS ) - log.debug(f"Target for {args}: slot {slot}") return [node] def _should_reinitialized(self): @@ -1019,7 +1006,7 @@ def execute_command(self, *args, **kwargs): res[node.name] = self._execute_command(node, *args, **kwargs) # Return the processed result return self._process_result(args[0], res, **kwargs) - except BaseException as e: + except Exception as e: if type(e) in self.__class__.ERRORS_ALLOW_RETRY: # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. @@ -1059,10 +1046,6 @@ def _execute_command(self, target_node, *args, **kwargs): ) moved = False - log.debug( - f"Executing command {command} on target node: " - f"{target_node.server_type} {target_node.name}" - ) redis_node = self.get_redis_connection(target_node) connection = get_connection(redis_node, *args, **kwargs) if asking: @@ -1077,12 +1060,9 @@ def _execute_command(self, target_node, *args, **kwargs): response, **kwargs ) return response - - except (RedisClusterException, BusyLoadingError, AuthenticationError) as e: - log.exception(type(e)) + except AuthenticationError: raise except (ConnectionError, TimeoutError) as e: - log.exception(type(e)) # ConnectionError can also be raised if we couldn't get a # connection from the pool before timing out, so check that # this is an actual connection before attempting to disconnect. @@ -1101,7 +1081,7 @@ def _execute_command(self, target_node, *args, **kwargs): # and try again with the new setup target_node.redis_connection = None self.nodes_manager.initialize() - raise + raise e except MovedError as e: # First, we will try to patch the slots/nodes cache with the # redirected node output and try again. If MovedError exceeds @@ -1111,7 +1091,6 @@ def _execute_command(self, target_node, *args, **kwargs): # the same client object is shared between multiple threads. 
To # reduce the frequency you can set this variable in the # RedisCluster constructor. - log.exception("MovedError") self.reinitialize_counter += 1 if self._should_reinitialized(): self.nodes_manager.initialize() @@ -1121,29 +1100,21 @@ def _execute_command(self, target_node, *args, **kwargs): self.nodes_manager.update_moved_exception(e) moved = True except TryAgainError: - log.exception("TryAgainError") - if ttl < self.RedisClusterRequestTTL / 2: time.sleep(0.05) except AskError as e: - log.exception("AskError") - redirect_addr = get_node_name(host=e.host, port=e.port) asking = True except ClusterDownError as e: - log.exception("ClusterDownError") # ClusterDownError can occur during a failover and to get # self-healed, we will try to reinitialize the cluster layout # and retry executing the command time.sleep(0.25) self.nodes_manager.initialize() raise e - except ResponseError as e: - message = e.__str__() - log.exception(f"ResponseError: {message}") - raise e - except BaseException as e: - log.exception("BaseException") + except ResponseError: + raise + except Exception as e: if connection: connection.disconnect() raise e @@ -1280,11 +1251,6 @@ def get_node(self, host=None, port=None, node_name=None): elif node_name: return self.nodes_cache.get(node_name) else: - log.error( - "get_node requires one of the following: " - "1. node name " - "2. host and port" - ) return None def update_moved_exception(self, exception): @@ -1432,7 +1398,6 @@ def initialize(self): :startup_nodes: Responsible for discovering other nodes in the cluster """ - log.debug("Initializing the nodes' topology of the cluster") self.reset() tmp_nodes_cache = {} tmp_slots = {} @@ -1460,17 +1425,9 @@ def initialize(self): ) cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) startup_nodes_reachable = True - except (ConnectionError, TimeoutError) as e: - msg = e.__str__ - log.exception( - "An exception occurred while trying to" - " initialize the cluster using the seed node" - f" {startup_node.name}:\n{msg}" - ) + except (ConnectionError, TimeoutError): continue except ResponseError as e: - log.exception('ReseponseError sending "cluster slots" to redis server') - # Isn't a cluster connection, so it won't parse these # exceptions automatically message = e.__str__() @@ -2042,12 +1999,6 @@ def _send_cluster_commands( # If a lot of commands have failed, we'll be setting the # flag to rebuild the slots table from scratch. # So MOVED errors should correct themselves fairly quickly. - log.exception( - f"An exception occurred during pipeline execution. 
" - f"args: {attempt[-1].args}, " - f"error: {type(attempt[-1].result).__name__} " - f"{str(attempt[-1].result)}" - ) self.reinitialize_counter += 1 if self._should_reinitialized(): self.nodes_manager.initialize() diff --git a/redis/commands/core.py b/redis/commands/core.py index 027d3dbc7c..455c3f46cb 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -3420,7 +3420,7 @@ def xautoclaim( groupname: GroupT, consumername: ConsumerT, min_idle_time: int, - start_id: int = 0, + start_id: StreamIdT = "0-0", count: Union[int, None] = None, justid: bool = False, ) -> ResponseT: diff --git a/redis/commands/graph/query_result.py b/redis/commands/graph/query_result.py index 644ac5a3db..3ffa664791 100644 --- a/redis/commands/graph/query_result.py +++ b/redis/commands/graph/query_result.py @@ -9,10 +9,12 @@ from .path import Path LABELS_ADDED = "Labels added" +LABELS_REMOVED = "Labels removed" NODES_CREATED = "Nodes created" NODES_DELETED = "Nodes deleted" RELATIONSHIPS_DELETED = "Relationships deleted" PROPERTIES_SET = "Properties set" +PROPERTIES_REMOVED = "Properties removed" RELATIONSHIPS_CREATED = "Relationships created" INDICES_CREATED = "Indices created" INDICES_DELETED = "Indices deleted" @@ -21,8 +23,10 @@ STATS = [ LABELS_ADDED, + LABELS_REMOVED, NODES_CREATED, PROPERTIES_SET, + PROPERTIES_REMOVED, RELATIONSHIPS_CREATED, NODES_DELETED, RELATIONSHIPS_DELETED, @@ -323,40 +327,60 @@ def _get_stat(self, stat): @property def labels_added(self): + """Returns the number of labels added in the query""" return self._get_stat(LABELS_ADDED) + @property + def labels_removed(self): + """Returns the number of labels removed in the query""" + return self._get_stat(LABELS_REMOVED) + @property def nodes_created(self): + """Returns the number of nodes created in the query""" return self._get_stat(NODES_CREATED) @property def nodes_deleted(self): + """Returns the number of nodes deleted in the query""" return self._get_stat(NODES_DELETED) @property def properties_set(self): + """Returns the number of properties set in the query""" return self._get_stat(PROPERTIES_SET) + @property + def properties_removed(self): + """Returns the number of properties removed in the query""" + return self._get_stat(PROPERTIES_REMOVED) + @property def relationships_created(self): + """Returns the number of relationships created in the query""" return self._get_stat(RELATIONSHIPS_CREATED) @property def relationships_deleted(self): + """Returns the number of relationships deleted in the query""" return self._get_stat(RELATIONSHIPS_DELETED) @property def indices_created(self): + """Returns the number of indices created in the query""" return self._get_stat(INDICES_CREATED) @property def indices_deleted(self): + """Returns the number of indices deleted in the query""" return self._get_stat(INDICES_DELETED) @property def cached_execution(self): + """Returns whether or not the query execution plan was cached""" return self._get_stat(CACHED_EXECUTION) == 1 @property def run_time_ms(self): + """Returns the server execution time of the query""" return self._get_stat(INTERNAL_EXECUTION_TIME) diff --git a/tests/test_asyncio/conftest.py b/tests/test_asyncio/conftest.py index 8166588d8e..04fbf62cf7 100644 --- a/tests/test_asyncio/conftest.py +++ b/tests/test_asyncio/conftest.py @@ -1,14 +1,9 @@ -import asyncio +import functools import random import sys from typing import Union from urllib.parse import urlparse -if sys.version_info[0:2] == (3, 6): - import pytest as pytest_asyncio -else: - import pytest_asyncio - import pytest 
from packaging.version import Version @@ -26,6 +21,13 @@ from .compat import mock +if sys.version_info[0:2] == (3, 6): + import pytest as pytest_asyncio + + pytestmark = pytest.mark.asyncio +else: + import pytest_asyncio + async def _get_info(redis_url): client = redis.Redis.from_url(redis_url) @@ -69,11 +71,13 @@ async def _get_info(redis_url): "pool-hiredis", ], ) -def create_redis(request, event_loop: asyncio.BaseEventLoop): +async def create_redis(request): """Wrapper around redis.create_redis.""" single_connection, parser_cls = request.param - async def f( + teardown_clients = [] + + async def client_factory( url: str = request.config.getoption("--redis-url"), cls=redis.Redis, flushdb=True, @@ -95,56 +99,50 @@ async def f( client = client.client() await client.initialize() - def teardown(): - async def ateardown(): - if not cluster_mode: - if "username" in kwargs: - return - if flushdb: - try: - await client.flushdb() - except redis.ConnectionError: - # handle cases where a test disconnected a client - # just manually retry the flushdb - await client.flushdb() - await client.close() - await client.connection_pool.disconnect() - else: - if flushdb: - try: - await client.flushdb(target_nodes="primaries") - except redis.ConnectionError: - # handle cases where a test disconnected a client - # just manually retry the flushdb - await client.flushdb(target_nodes="primaries") - await client.close() - - if event_loop.is_running(): - event_loop.create_task(ateardown()) + async def teardown(): + if not cluster_mode: + if flushdb and "username" not in kwargs: + try: + await client.flushdb() + except redis.ConnectionError: + # handle cases where a test disconnected a client + # just manually retry the flushdb + await client.flushdb() + await client.close() + await client.connection_pool.disconnect() else: - event_loop.run_until_complete(ateardown()) - - request.addfinalizer(teardown) - + if flushdb: + try: + await client.flushdb(target_nodes="primaries") + except redis.ConnectionError: + # handle cases where a test disconnected a client + # just manually retry the flushdb + await client.flushdb(target_nodes="primaries") + await client.close() + + teardown_clients.append(teardown) return client - return f + yield client_factory + + for teardown in teardown_clients: + await teardown() @pytest_asyncio.fixture() -async def r(request, create_redis): - yield await create_redis() +async def r(create_redis): + return await create_redis() @pytest_asyncio.fixture() async def r2(create_redis): """A second client for tests that need multiple""" - yield await create_redis() + return await create_redis() @pytest_asyncio.fixture() async def modclient(request, create_redis): - yield await create_redis( + return await create_redis( url=request.config.getoption("--redismod-url"), decode_responses=True ) @@ -222,7 +220,7 @@ async def mock_cluster_resp_slaves(create_redis, **kwargs): def master_host(request): url = request.config.getoption("--redis-url") parts = urlparse(url) - yield parts.hostname + return parts.hostname async def wait_for_command( @@ -246,3 +244,41 @@ async def wait_for_command( return monitor_response if key in monitor_response["command"]: return None + + +# python 3.6 doesn't have the asynccontextmanager decorator. Provide it here. 
+class AsyncContextManager: + def __init__(self, async_generator): + self.gen = async_generator + + async def __aenter__(self): + try: + return await self.gen.__anext__() + except StopAsyncIteration as err: + raise RuntimeError("Pickles") from err + + async def __aexit__(self, exc_type, exc_inst, tb): + if exc_type: + await self.gen.athrow(exc_type, exc_inst, tb) + return True + try: + await self.gen.__anext__() + except StopAsyncIteration: + return + raise RuntimeError("More pickles") + + +if sys.version_info[0:2] == (3, 6): + + def asynccontextmanager(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + return AsyncContextManager(func(*args, **kwargs)) + + return wrapper + +else: + from contextlib import asynccontextmanager as _asynccontextmanager + + def asynccontextmanager(func): + return _asynccontextmanager(func) diff --git a/tests/test_asyncio/test_bloom.py b/tests/test_asyncio/test_bloom.py index feb98cc41e..2bf4e030e6 100644 --- a/tests/test_asyncio/test_bloom.py +++ b/tests/test_asyncio/test_bloom.py @@ -1,10 +1,13 @@ +import sys + import pytest import redis.asyncio as redis from redis.exceptions import ModuleError, RedisError from redis.utils import HIREDIS_AVAILABLE -pytestmark = pytest.mark.asyncio +if sys.version_info[0:2] == (3, 6): + pytestmark = pytest.mark.asyncio def intlist(obj): @@ -91,7 +94,7 @@ async def do_verify(): res += rv == x assert res < 5 - do_verify() + await do_verify() cmds = [] if HIREDIS_AVAILABLE: with pytest.raises(ModuleError): @@ -120,7 +123,7 @@ async def do_verify(): cur_info = await modclient.bf().execute_command("bf.debug", "myBloom") assert prev_info == cur_info - do_verify() + await do_verify() await modclient.bf().client.delete("myBloom") await modclient.bf().create("myBloom", "0.0001", "10000000") diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index d33e2e83b7..1365e4daff 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -13,6 +13,8 @@ if sys.version_info[0:2] == (3, 6): import pytest as pytest_asyncio + + pytestmark = pytest.mark.asyncio else: import pytest_asyncio @@ -42,7 +44,8 @@ skip_unless_arch_bits, ) -pytestmark = [pytest.mark.asyncio, pytest.mark.onlycluster] +pytestmark = pytest.mark.onlycluster + default_host = "127.0.0.1" default_port = 7000 diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index e128ac40b8..913f05b3fe 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -12,6 +12,8 @@ if sys.version_info[0:2] == (3, 6): import pytest as pytest_asyncio + + pytestmark = pytest.mark.asyncio else: import pytest_asyncio @@ -27,11 +29,24 @@ REDIS_6_VERSION = "5.9.0" -pytestmark = pytest.mark.asyncio +@pytest_asyncio.fixture() +async def r_teardown(r: redis.Redis): + """ + A special fixture which removes the provided names from the database after use + """ + usernames = [] + + def factory(username): + usernames.append(username) + return r + + yield factory + for username in usernames: + await r.acl_deluser(username) @pytest_asyncio.fixture() -async def slowlog(r: redis.Redis, event_loop): +async def slowlog(r: redis.Redis): current_config = await r.config_get() old_slower_than_value = current_config["slowlog-log-slower-than"] old_max_legnth_value = current_config["slowlog-max-len"] @@ -94,17 +109,9 @@ async def test_acl_cat_with_category(self, r: redis.Redis): assert "get" in commands @skip_if_server_version_lt(REDIS_6_VERSION) - async def test_acl_deluser(self, r: 
redis.Redis, request, event_loop): + async def test_acl_deluser(self, r_teardown): username = "redis-py-user" - - def teardown(): - coro = r.acl_deluser(username) - if event_loop.is_running(): - event_loop.create_task(coro) - else: - event_loop.run_until_complete(coro) - - request.addfinalizer(teardown) + r = r_teardown(username) assert await r.acl_deluser(username) == 0 assert await r.acl_setuser(username, enabled=False, reset=True) @@ -117,18 +124,9 @@ async def test_acl_genpass(self, r: redis.Redis): @skip_if_server_version_lt(REDIS_6_VERSION) @skip_if_server_version_gte("7.0.0") - async def test_acl_getuser_setuser(self, r: redis.Redis, request, event_loop): + async def test_acl_getuser_setuser(self, r_teardown): username = "redis-py-user" - - def teardown(): - coro = r.acl_deluser(username) - if event_loop.is_running(): - event_loop.create_task(coro) - else: - event_loop.run_until_complete(coro) - - request.addfinalizer(teardown) - + r = r_teardown(username) # test enabled=False assert await r.acl_setuser(username, enabled=False, reset=True) assert await r.acl_getuser(username) == { @@ -233,17 +231,9 @@ def teardown(): @skip_if_server_version_lt(REDIS_6_VERSION) @skip_if_server_version_gte("7.0.0") - async def test_acl_list(self, r: redis.Redis, request, event_loop): + async def test_acl_list(self, r_teardown): username = "redis-py-user" - - def teardown(): - coro = r.acl_deluser(username) - if event_loop.is_running(): - event_loop.create_task(coro) - else: - event_loop.run_until_complete(coro) - - request.addfinalizer(teardown) + r = r_teardown(username) assert await r.acl_setuser(username, enabled=False, reset=True) users = await r.acl_list() @@ -251,17 +241,9 @@ def teardown(): @skip_if_server_version_lt(REDIS_6_VERSION) @pytest.mark.onlynoncluster - async def test_acl_log(self, r: redis.Redis, request, event_loop, create_redis): + async def test_acl_log(self, r_teardown, create_redis): username = "redis-py-user" - - def teardown(): - coro = r.acl_deluser(username) - if event_loop.is_running(): - event_loop.create_task(coro) - else: - event_loop.run_until_complete(coro) - - request.addfinalizer(teardown) + r = r_teardown(username) await r.acl_setuser( username, enabled=True, @@ -294,55 +276,25 @@ def teardown(): assert await r.acl_log_reset() @skip_if_server_version_lt(REDIS_6_VERSION) - async def test_acl_setuser_categories_without_prefix_fails( - self, r: redis.Redis, request, event_loop - ): + async def test_acl_setuser_categories_without_prefix_fails(self, r_teardown): username = "redis-py-user" - - def teardown(): - coro = r.acl_deluser(username) - if event_loop.is_running(): - event_loop.create_task(coro) - else: - event_loop.run_until_complete(coro) - - request.addfinalizer(teardown) + r = r_teardown(username) with pytest.raises(exceptions.DataError): await r.acl_setuser(username, categories=["list"]) @skip_if_server_version_lt(REDIS_6_VERSION) - async def test_acl_setuser_commands_without_prefix_fails( - self, r: redis.Redis, request, event_loop - ): + async def test_acl_setuser_commands_without_prefix_fails(self, r_teardown): username = "redis-py-user" - - def teardown(): - coro = r.acl_deluser(username) - if event_loop.is_running(): - event_loop.create_task(coro) - else: - event_loop.run_until_complete(coro) - - request.addfinalizer(teardown) + r = r_teardown(username) with pytest.raises(exceptions.DataError): await r.acl_setuser(username, commands=["get"]) @skip_if_server_version_lt(REDIS_6_VERSION) - async def test_acl_setuser_add_passwords_and_nopass_fails( - self, r: 
redis.Redis, request, event_loop - ): + async def test_acl_setuser_add_passwords_and_nopass_fails(self, r_teardown): username = "redis-py-user" - - def teardown(): - coro = r.acl_deluser(username) - if event_loop.is_running(): - event_loop.create_task(coro) - else: - event_loop.run_until_complete(coro) - - request.addfinalizer(teardown) + r = r_teardown(username) with pytest.raises(exceptions.DataError): await r.acl_setuser(username, passwords="+mypass", nopass=True) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 78a3efd2a0..8030f7e628 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -1,5 +1,6 @@ import asyncio import socket +import sys import types from unittest.mock import patch @@ -18,7 +19,8 @@ from .compat import mock -pytestmark = pytest.mark.asyncio +if sys.version_info[0:2] == (3, 6): + pytestmark = pytest.mark.asyncio @pytest.mark.onlynoncluster diff --git a/tests/test_asyncio/test_connection_pool.py b/tests/test_asyncio/test_connection_pool.py index 6c56558d59..c8eb918e28 100644 --- a/tests/test_asyncio/test_connection_pool.py +++ b/tests/test_asyncio/test_connection_pool.py @@ -7,6 +7,8 @@ if sys.version_info[0:2] == (3, 6): import pytest as pytest_asyncio + + pytestmark = pytest.mark.asyncio else: import pytest_asyncio @@ -15,10 +17,9 @@ from tests.conftest import skip_if_redis_enterprise, skip_if_server_version_lt from .compat import mock +from .conftest import asynccontextmanager from .test_pubsub import wait_for_message -pytestmark = pytest.mark.asyncio - @pytest.mark.onlynoncluster class TestRedisAutoReleaseConnectionPool: @@ -114,7 +115,8 @@ async def can_read(self, timeout: float = 0): class TestConnectionPool: - def get_pool( + @asynccontextmanager + async def get_pool( self, connection_kwargs=None, max_connections=None, @@ -126,71 +128,77 @@ def get_pool( max_connections=max_connections, **connection_kwargs, ) - return pool + try: + yield pool + finally: + await pool.disconnect(inuse_connections=True) async def test_connection_creation(self): connection_kwargs = {"foo": "bar", "biz": "baz"} - pool = self.get_pool( + async with self.get_pool( connection_kwargs=connection_kwargs, connection_class=DummyConnection - ) - connection = await pool.get_connection("_") - assert isinstance(connection, DummyConnection) - assert connection.kwargs == connection_kwargs + ) as pool: + connection = await pool.get_connection("_") + assert isinstance(connection, DummyConnection) + assert connection.kwargs == connection_kwargs async def test_multiple_connections(self, master_host): connection_kwargs = {"host": master_host} - pool = self.get_pool(connection_kwargs=connection_kwargs) - c1 = await pool.get_connection("_") - c2 = await pool.get_connection("_") - assert c1 != c2 + async with self.get_pool(connection_kwargs=connection_kwargs) as pool: + c1 = await pool.get_connection("_") + c2 = await pool.get_connection("_") + assert c1 != c2 async def test_max_connections(self, master_host): connection_kwargs = {"host": master_host} - pool = self.get_pool(max_connections=2, connection_kwargs=connection_kwargs) - await pool.get_connection("_") - await pool.get_connection("_") - with pytest.raises(redis.ConnectionError): + async with self.get_pool( + max_connections=2, connection_kwargs=connection_kwargs + ) as pool: + await pool.get_connection("_") await pool.get_connection("_") + with pytest.raises(redis.ConnectionError): + await pool.get_connection("_") async def 
test_reuse_previously_released_connection(self, master_host): connection_kwargs = {"host": master_host} - pool = self.get_pool(connection_kwargs=connection_kwargs) - c1 = await pool.get_connection("_") - await pool.release(c1) - c2 = await pool.get_connection("_") - assert c1 == c2 + async with self.get_pool(connection_kwargs=connection_kwargs) as pool: + c1 = await pool.get_connection("_") + await pool.release(c1) + c2 = await pool.get_connection("_") + assert c1 == c2 - def test_repr_contains_db_info_tcp(self): + async def test_repr_contains_db_info_tcp(self): connection_kwargs = { "host": "localhost", "port": 6379, "db": 1, "client_name": "test-client", } - pool = self.get_pool( + async with self.get_pool( connection_kwargs=connection_kwargs, connection_class=redis.Connection - ) - expected = ( - "ConnectionPool>" - ) - assert repr(pool) == expected + ) as pool: + expected = ( + "ConnectionPool>" + ) + assert repr(pool) == expected - def test_repr_contains_db_info_unix(self): + async def test_repr_contains_db_info_unix(self): connection_kwargs = {"path": "/abc", "db": 1, "client_name": "test-client"} - pool = self.get_pool( + async with self.get_pool( connection_kwargs=connection_kwargs, connection_class=redis.UnixDomainSocketConnection, - ) - expected = ( - "ConnectionPool>" - ) - assert repr(pool) == expected + ) as pool: + expected = ( + "ConnectionPool>" + ) + assert repr(pool) == expected class TestBlockingConnectionPool: - def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20): + @asynccontextmanager + async def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20): connection_kwargs = connection_kwargs or {} pool = redis.BlockingConnectionPool( connection_class=DummyConnection, @@ -198,7 +206,10 @@ def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20): timeout=timeout, **connection_kwargs, ) - return pool + try: + yield pool + finally: + await pool.disconnect(inuse_connections=True) async def test_connection_creation(self, master_host): connection_kwargs = { @@ -207,10 +218,10 @@ async def test_connection_creation(self, master_host): "host": master_host[0], "port": master_host[1], } - pool = self.get_pool(connection_kwargs=connection_kwargs) - connection = await pool.get_connection("_") - assert isinstance(connection, DummyConnection) - assert connection.kwargs == connection_kwargs + async with self.get_pool(connection_kwargs=connection_kwargs) as pool: + connection = await pool.get_connection("_") + assert isinstance(connection, DummyConnection) + assert connection.kwargs == connection_kwargs async def test_disconnect(self, master_host): """A regression test for #1047""" @@ -220,30 +231,31 @@ async def test_disconnect(self, master_host): "host": master_host[0], "port": master_host[1], } - pool = self.get_pool(connection_kwargs=connection_kwargs) - await pool.get_connection("_") - await pool.disconnect() + async with self.get_pool(connection_kwargs=connection_kwargs) as pool: + await pool.get_connection("_") + await pool.disconnect() async def test_multiple_connections(self, master_host): connection_kwargs = {"host": master_host[0], "port": master_host[1]} - pool = self.get_pool(connection_kwargs=connection_kwargs) - c1 = await pool.get_connection("_") - c2 = await pool.get_connection("_") - assert c1 != c2 + async with self.get_pool(connection_kwargs=connection_kwargs) as pool: + c1 = await pool.get_connection("_") + c2 = await pool.get_connection("_") + assert c1 != c2 async def 
async def test_connection_pool_blocks_until_timeout(self, master_host): """When out of connections, block for timeout seconds, then raise""" connection_kwargs = {"host": master_host} - pool = self.get_pool( + async with self.get_pool( max_connections=1, timeout=0.1, connection_kwargs=connection_kwargs - ) - await pool.get_connection("_") + ) as pool: + c1 = await pool.get_connection("_") - start = asyncio.get_event_loop().time() - with pytest.raises(redis.ConnectionError): - await pool.get_connection("_") - # we should have waited at least 0.1 seconds - assert asyncio.get_event_loop().time() - start >= 0.1 + start = asyncio.get_event_loop().time() + with pytest.raises(redis.ConnectionError): + await pool.get_connection("_") + # we should have waited at least 0.1 seconds + assert asyncio.get_event_loop().time() - start >= 0.1 + await c1.disconnect() async def test_connection_pool_blocks_until_conn_available(self, master_host): """ @@ -251,26 +263,26 @@ async def test_connection_pool_blocks_until_conn_available(self, master_host): to the pool """ connection_kwargs = {"host": master_host[0], "port": master_host[1]} - pool = self.get_pool( + async with self.get_pool( max_connections=1, timeout=2, connection_kwargs=connection_kwargs - ) - c1 = await pool.get_connection("_") + ) as pool: + c1 = await pool.get_connection("_") - async def target(): - await asyncio.sleep(0.1) - await pool.release(c1) + async def target(): + await asyncio.sleep(0.1) + await pool.release(c1) - start = asyncio.get_event_loop().time() - await asyncio.gather(target(), pool.get_connection("_")) - assert asyncio.get_event_loop().time() - start >= 0.1 + start = asyncio.get_event_loop().time() + await asyncio.gather(target(), pool.get_connection("_")) + assert asyncio.get_event_loop().time() - start >= 0.1 async def test_reuse_previously_released_connection(self, master_host): connection_kwargs = {"host": master_host} - pool = self.get_pool(connection_kwargs=connection_kwargs) - c1 = await pool.get_connection("_") - await pool.release(c1) - c2 = await pool.get_connection("_") - assert c1 == c2 + async with self.get_pool(connection_kwargs=connection_kwargs) as pool: + c1 = await pool.get_connection("_") + await pool.release(c1) + c2 = await pool.get_connection("_") + assert c1 == c2 def test_repr_contains_db_info_tcp(self): pool = redis.ConnectionPool( @@ -689,6 +701,8 @@ async def test_arbitrary_command_advances_next_health_check(self, r): if r.connection: await r.get("foo") next_health_check = r.connection.next_health_check + # ensure that the event loop's `time()` advances a bit + await asyncio.sleep(0.001) await r.get("foo") assert next_health_check < r.connection.next_health_check diff --git a/tests/test_asyncio/test_encoding.py b/tests/test_asyncio/test_encoding.py index 133ea3783c..5db7187c84 100644 --- a/tests/test_asyncio/test_encoding.py +++ b/tests/test_asyncio/test_encoding.py @@ -4,14 +4,14 @@ if sys.version_info[0:2] == (3, 6): import pytest as pytest_asyncio + + pytestmark = pytest.mark.asyncio else: import pytest_asyncio import redis.asyncio as redis from redis.exceptions import DataError -pytestmark = pytest.mark.asyncio - @pytest.mark.onlynoncluster class TestEncoding: diff --git a/tests/test_asyncio/test_json.py b/tests/test_asyncio/test_json.py index a045dd7c1a..416a9f4a21 100644 --- a/tests/test_asyncio/test_json.py +++ b/tests/test_asyncio/test_json.py @@ -5,8 +5,6 @@ from redis.commands.json.path import Path from tests.conftest import skip_ifmodversion_lt -pytestmark = pytest.mark.asyncio - @pytest.mark.redismod
async def test_json_setbinarykey(modclient: redis.Redis): diff --git a/tests/test_asyncio/test_lock.py b/tests/test_asyncio/test_lock.py index 8ceb3bc958..86a8d62f71 100644 --- a/tests/test_asyncio/test_lock.py +++ b/tests/test_asyncio/test_lock.py @@ -5,14 +5,14 @@ if sys.version_info[0:2] == (3, 6): import pytest as pytest_asyncio + + pytestmark = pytest.mark.asyncio else: import pytest_asyncio from redis.asyncio.lock import Lock from redis.exceptions import LockError, LockNotOwnedError -pytestmark = pytest.mark.asyncio - @pytest.mark.onlynoncluster class TestLock: diff --git a/tests/test_asyncio/test_monitor.py b/tests/test_asyncio/test_monitor.py index 783ba262b0..9185bcd2ee 100644 --- a/tests/test_asyncio/test_monitor.py +++ b/tests/test_asyncio/test_monitor.py @@ -1,10 +1,13 @@ +import sys + import pytest from tests.conftest import skip_if_redis_enterprise, skip_ifnot_redis_enterprise from .conftest import wait_for_command -pytestmark = pytest.mark.asyncio +if sys.version_info[0:2] == (3, 6): + pytestmark = pytest.mark.asyncio @pytest.mark.onlynoncluster diff --git a/tests/test_asyncio/test_pipeline.py b/tests/test_asyncio/test_pipeline.py index dfeb66464c..33391d019d 100644 --- a/tests/test_asyncio/test_pipeline.py +++ b/tests/test_asyncio/test_pipeline.py @@ -1,3 +1,5 @@ +import sys + import pytest import redis @@ -5,7 +7,8 @@ from .conftest import wait_for_command -pytestmark = pytest.mark.asyncio +if sys.version_info[0:2] == (3, 6): + pytestmark = pytest.mark.asyncio class TestPipeline: diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py index 6c76bf334e..d6a817a61b 100644 --- a/tests/test_asyncio/test_pubsub.py +++ b/tests/test_asyncio/test_pubsub.py @@ -8,6 +8,8 @@ if sys.version_info[0:2] == (3, 6): import pytest as pytest_asyncio + + pytestmark = pytest.mark.asyncio(forbid_global_loop=True) else: import pytest_asyncio @@ -18,8 +20,6 @@ from .compat import mock -pytestmark = pytest.mark.asyncio(forbid_global_loop=True) - def with_timeout(t): def wrapper(corofunc): @@ -80,6 +80,13 @@ def make_subscribe_test_data(pubsub, type): assert False, f"invalid subscribe type: {type}" +@pytest_asyncio.fixture() +async def pubsub(r: redis.Redis): + p = r.pubsub() + yield p + await p.close() + + @pytest.mark.onlynoncluster class TestPubSubSubscribeUnsubscribe: async def _test_subscribe_unsubscribe( @@ -101,12 +108,12 @@ async def _test_subscribe_unsubscribe( i = len(keys) - 1 - i assert await wait_for_message(p) == make_message(unsub_type, key, i) - async def test_channel_subscribe_unsubscribe(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "channel") + async def test_channel_subscribe_unsubscribe(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "channel") await self._test_subscribe_unsubscribe(**kwargs) - async def test_pattern_subscribe_unsubscribe(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "pattern") + async def test_pattern_subscribe_unsubscribe(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "pattern") await self._test_subscribe_unsubscribe(**kwargs) @pytest.mark.onlynoncluster @@ -144,12 +151,12 @@ async def _test_resubscribe_on_reconnection( for channel in unique_channels: assert channel in keys - async def test_resubscribe_to_channels_on_reconnection(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "channel") + async def test_resubscribe_to_channels_on_reconnection(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "channel") await 
self._test_resubscribe_on_reconnection(**kwargs) - async def test_resubscribe_to_patterns_on_reconnection(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "pattern") + async def test_resubscribe_to_patterns_on_reconnection(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "pattern") await self._test_resubscribe_on_reconnection(**kwargs) async def _test_subscribed_property( @@ -199,13 +206,13 @@ async def _test_subscribed_property( # now we're finally unsubscribed assert p.subscribed is False - async def test_subscribe_property_with_channels(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "channel") + async def test_subscribe_property_with_channels(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "channel") await self._test_subscribed_property(**kwargs) @pytest.mark.onlynoncluster - async def test_subscribe_property_with_patterns(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "pattern") + async def test_subscribe_property_with_patterns(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "pattern") await self._test_subscribed_property(**kwargs) async def test_ignore_all_subscribe_messages(self, r: redis.Redis): @@ -224,9 +231,10 @@ async def test_ignore_all_subscribe_messages(self, r: redis.Redis): assert p.subscribed is True assert await wait_for_message(p) is None assert p.subscribed is False + await p.close() - async def test_ignore_individual_subscribe_messages(self, r: redis.Redis): - p = r.pubsub() + async def test_ignore_individual_subscribe_messages(self, pubsub): + p = pubsub checks = ( (p.subscribe, "foo"), @@ -243,13 +251,13 @@ async def test_ignore_individual_subscribe_messages(self, r: redis.Redis): assert message is None assert p.subscribed is False - async def test_sub_unsub_resub_channels(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "channel") + async def test_sub_unsub_resub_channels(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "channel") await self._test_sub_unsub_resub(**kwargs) @pytest.mark.onlynoncluster - async def test_sub_unsub_resub_patterns(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "pattern") + async def test_sub_unsub_resub_patterns(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "pattern") await self._test_sub_unsub_resub(**kwargs) async def _test_sub_unsub_resub( @@ -266,12 +274,12 @@ async def _test_sub_unsub_resub( assert await wait_for_message(p) == make_message(sub_type, key, 1) assert p.subscribed is True - async def test_sub_unsub_all_resub_channels(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "channel") + async def test_sub_unsub_all_resub_channels(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "channel") await self._test_sub_unsub_all_resub(**kwargs) - async def test_sub_unsub_all_resub_patterns(self, r: redis.Redis): - kwargs = make_subscribe_test_data(r.pubsub(), "pattern") + async def test_sub_unsub_all_resub_patterns(self, pubsub): + kwargs = make_subscribe_test_data(pubsub, "pattern") await self._test_sub_unsub_all_resub(**kwargs) async def _test_sub_unsub_all_resub( @@ -300,8 +308,8 @@ def message_handler(self, message): async def async_message_handler(self, message): self.async_message = message - async def test_published_message_to_channel(self, r: redis.Redis): - p = r.pubsub() + async def test_published_message_to_channel(self, r: redis.Redis, pubsub): + p = pubsub await p.subscribe("foo") assert await wait_for_message(p) == 
make_message("subscribe", "foo", 1) assert await r.publish("foo", "test message") == 1 @@ -310,8 +318,8 @@ async def test_published_message_to_channel(self, r: redis.Redis): assert isinstance(message, dict) assert message == make_message("message", "foo", "test message") - async def test_published_message_to_pattern(self, r: redis.Redis): - p = r.pubsub() + async def test_published_message_to_pattern(self, r: redis.Redis, pubsub): + p = pubsub await p.subscribe("foo") await p.psubscribe("f*") assert await wait_for_message(p) == make_message("subscribe", "foo", 1) @@ -340,6 +348,7 @@ async def test_channel_message_handler(self, r: redis.Redis): assert await r.publish("foo", "test message") == 1 assert await wait_for_message(p) is None assert self.message == make_message("message", "foo", "test message") + await p.close() async def test_channel_async_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) @@ -348,6 +357,7 @@ async def test_channel_async_message_handler(self, r): assert await r.publish("foo", "test message") == 1 assert await wait_for_message(p) is None assert self.async_message == make_message("message", "foo", "test message") + await p.close() async def test_channel_sync_async_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) @@ -359,6 +369,7 @@ async def test_channel_sync_async_message_handler(self, r): assert await wait_for_message(p) is None assert self.message == make_message("message", "foo", "test message") assert self.async_message == make_message("message", "bar", "test message 2") + await p.close() @pytest.mark.onlynoncluster async def test_pattern_message_handler(self, r: redis.Redis): @@ -370,6 +381,7 @@ async def test_pattern_message_handler(self, r: redis.Redis): assert self.message == make_message( "pmessage", "foo", "test message", pattern="f*" ) + await p.close() async def test_unicode_channel_message_handler(self, r: redis.Redis): p = r.pubsub(ignore_subscribe_messages=True) @@ -380,6 +392,7 @@ async def test_unicode_channel_message_handler(self, r: redis.Redis): assert await r.publish(channel, "test message") == 1 assert await wait_for_message(p) is None assert self.message == make_message("message", channel, "test message") + await p.close() @pytest.mark.onlynoncluster # see: https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html @@ -395,9 +408,10 @@ async def test_unicode_pattern_message_handler(self, r: redis.Redis): assert self.message == make_message( "pmessage", channel, "test message", pattern=pattern ) + await p.close() - async def test_get_message_without_subscribe(self, r: redis.Redis): - p = r.pubsub() + async def test_get_message_without_subscribe(self, r: redis.Redis, pubsub): + p = pubsub with pytest.raises(RuntimeError) as info: await p.get_message() expect = ( @@ -427,8 +441,8 @@ def message_handler(self, message): async def r(self, create_redis): return await create_redis(decode_responses=True) - async def test_channel_subscribe_unsubscribe(self, r: redis.Redis): - p = r.pubsub() + async def test_channel_subscribe_unsubscribe(self, pubsub): + p = pubsub await p.subscribe(self.channel) assert await wait_for_message(p) == self.make_message( "subscribe", self.channel, 1 @@ -439,8 +453,8 @@ async def test_channel_subscribe_unsubscribe(self, r: redis.Redis): "unsubscribe", self.channel, 0 ) - async def test_pattern_subscribe_unsubscribe(self, r: redis.Redis): - p = r.pubsub() + async def test_pattern_subscribe_unsubscribe(self, pubsub): + p = pubsub await p.psubscribe(self.pattern) assert await 
wait_for_message(p) == self.make_message( "psubscribe", self.pattern, 1 @@ -451,8 +465,8 @@ async def test_pattern_subscribe_unsubscribe(self, r: redis.Redis): "punsubscribe", self.pattern, 0 ) - async def test_channel_publish(self, r: redis.Redis): - p = r.pubsub() + async def test_channel_publish(self, r: redis.Redis, pubsub): + p = pubsub await p.subscribe(self.channel) assert await wait_for_message(p) == self.make_message( "subscribe", self.channel, 1 @@ -463,8 +477,8 @@ async def test_channel_publish(self, r: redis.Redis): ) @pytest.mark.onlynoncluster - async def test_pattern_publish(self, r: redis.Redis): - p = r.pubsub() + async def test_pattern_publish(self, r: redis.Redis, pubsub): + p = pubsub await p.psubscribe(self.pattern) assert await wait_for_message(p) == self.make_message( "psubscribe", self.pattern, 1 @@ -490,6 +504,7 @@ async def test_channel_message_handler(self, r: redis.Redis): await r.publish(self.channel, new_data) assert await wait_for_message(p) is None assert self.message == self.make_message("message", self.channel, new_data) + await p.close() async def test_pattern_message_handler(self, r: redis.Redis): p = r.pubsub(ignore_subscribe_messages=True) @@ -511,6 +526,7 @@ async def test_pattern_message_handler(self, r: redis.Redis): assert self.message == self.make_message( "pmessage", self.channel, new_data, pattern=self.pattern ) + await p.close() async def test_context_manager(self, r: redis.Redis): async with r.pubsub() as pubsub: @@ -520,6 +536,7 @@ async def test_context_manager(self, r: redis.Redis): assert pubsub.connection is None assert pubsub.channels == {} assert pubsub.patterns == {} + await pubsub.close() @pytest.mark.onlynoncluster @@ -535,8 +552,8 @@ async def test_channel_subscribe(self, r: redis.Redis): class TestPubSubSubcommands: @pytest.mark.onlynoncluster @skip_if_server_version_lt("2.8.0") - async def test_pubsub_channels(self, r: redis.Redis): - p = r.pubsub() + async def test_pubsub_channels(self, r: redis.Redis, pubsub): + p = pubsub await p.subscribe("foo", "bar", "baz", "quux") for i in range(4): assert (await wait_for_message(p))["type"] == "subscribe" @@ -560,6 +577,9 @@ async def test_pubsub_numsub(self, r: redis.Redis): channels = [(b"foo", 1), (b"bar", 2), (b"baz", 3)] assert await r.pubsub_numsub("foo", "bar", "baz") == channels + await p1.close() + await p2.close() + await p3.close() @skip_if_server_version_lt("2.8.0") async def test_pubsub_numpat(self, r: redis.Redis): @@ -568,6 +588,7 @@ async def test_pubsub_numpat(self, r: redis.Redis): for i in range(3): assert (await wait_for_message(p))["type"] == "psubscribe" assert await r.pubsub_numpat() == 3 + await p.close() @pytest.mark.onlynoncluster @@ -580,6 +601,7 @@ async def test_send_pubsub_ping(self, r: redis.Redis): assert await wait_for_message(p) == make_message( type="pong", channel=None, data="", pattern=None ) + await p.close() @skip_if_server_version_lt("3.0.0") async def test_send_pubsub_ping_message(self, r: redis.Redis): @@ -589,13 +611,16 @@ async def test_send_pubsub_ping_message(self, r: redis.Redis): assert await wait_for_message(p) == make_message( type="pong", channel=None, data="hello world", pattern=None ) + await p.close() @pytest.mark.onlynoncluster class TestPubSubConnectionKilled: @skip_if_server_version_lt("3.0.0") - async def test_connection_error_raised_when_connection_dies(self, r: redis.Redis): - p = r.pubsub() + async def test_connection_error_raised_when_connection_dies( + self, r: redis.Redis, pubsub + ): + p = pubsub await p.subscribe("foo") 
assert await wait_for_message(p) == make_message("subscribe", "foo", 1) for client in await r.client_list(): @@ -607,8 +632,8 @@ async def test_connection_error_raised_when_connection_dies(self, r: redis.Redis @pytest.mark.onlynoncluster class TestPubSubTimeouts: - async def test_get_message_with_timeout_returns_none(self, r: redis.Redis): - p = r.pubsub() + async def test_get_message_with_timeout_returns_none(self, pubsub): + p = pubsub await p.subscribe("foo") assert await wait_for_message(p) == make_message("subscribe", "foo", 1) assert await p.get_message(timeout=0.01) is None @@ -616,15 +641,13 @@ async def test_get_message_with_timeout_returns_none(self, r: redis.Redis): @pytest.mark.onlynoncluster class TestPubSubReconnect: - # @pytest.mark.xfail @with_timeout(2) - async def test_reconnect_listen(self, r: redis.Redis): + async def test_reconnect_listen(self, r: redis.Redis, pubsub): """ Test that a loop processing PubSub messages can survive a disconnect, by issuing a connect() call. """ messages = asyncio.Queue() - pubsub = r.pubsub() interrupt = False async def loop(): @@ -698,12 +721,12 @@ async def _subscribe(self, p, *args, **kwargs): ): return - async def test_callbacks(self, r: redis.Redis): + async def test_callbacks(self, r: redis.Redis, pubsub): def callback(message): messages.put_nowait(message) messages = asyncio.Queue() - p = r.pubsub() + p = pubsub await self._subscribe(p, foo=callback) task = asyncio.get_event_loop().create_task(p.run()) await r.publish("foo", "bar") @@ -720,13 +743,13 @@ def callback(message): "type": "message", } - async def test_exception_handler(self, r: redis.Redis): + async def test_exception_handler(self, r: redis.Redis, pubsub): def exception_handler_callback(e, pubsub) -> None: assert pubsub == p exceptions.put_nowait(e) exceptions = asyncio.Queue() - p = r.pubsub() + p = pubsub await self._subscribe(p, foo=lambda x: None) with mock.patch.object(p, "get_message", side_effect=Exception("error")): task = asyncio.get_event_loop().create_task( @@ -740,26 +763,25 @@ def exception_handler_callback(e, pubsub) -> None: pass assert str(e) == "error"
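The `test_callbacks` and `test_exception_handler` changes above drive `PubSub.run()` with per-channel callbacks and an exception handler. As a hedged sketch of how that looks in application code (assumes a Redis server on localhost; the channel name and handlers are illustrative, not from the suite):

    import asyncio
    import redis.asyncio as redis

    async def main():
        r = redis.Redis()          # assumes a local Redis server
        p = r.pubsub()

        def on_message(message):   # sync callback; async callbacks also work
            print("got:", message["data"])

        def on_error(e, pubsub):   # invoked instead of crashing the run() loop
            print("pubsub error:", e)

        await p.subscribe(foo=on_message)  # channel "foo" -> callback
        task = asyncio.create_task(p.run(exception_handler=on_error))
        await r.publish("foo", "bar")
        await asyncio.sleep(0.1)           # give the loop time to deliver
        task.cancel()                      # run() loops until cancelled
        await p.close()

    asyncio.run(main())
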
- async def test_late_subscribe(self, r: redis.Redis): + async def test_late_subscribe(self, r: redis.Redis, pubsub): def callback(message): messages.put_nowait(message) messages = asyncio.Queue() - p = r.pubsub() + p = pubsub task = asyncio.get_event_loop().create_task(p.run()) # wait until loop gets settled. Add a subscription await asyncio.sleep(0.1) await p.subscribe(foo=callback) # wait for the subscribe to finish. Cannot use _subscribe() because # p.run() is already accepting messages - await asyncio.sleep(0.1) - await r.publish("foo", "bar") - message = None - try: - async with async_timeout.timeout(0.1): - message = await messages.get() - except asyncio.TimeoutError: - pass + while True: + n = await r.publish("foo", "bar") + if n == 1: + break + await asyncio.sleep(0.1) + async with async_timeout.timeout(0.1): + message = await messages.get() task.cancel() # we expect a cancelled error, not the Runtime error # ("did you forget to call subscribe()") diff --git a/tests/test_asyncio/test_scripting.py b/tests/test_asyncio/test_scripting.py index 764525fb4a..406ab208e2 100644 --- a/tests/test_asyncio/test_scripting.py +++ b/tests/test_asyncio/test_scripting.py @@ -4,6 +4,8 @@ if sys.version_info[0:2] == (3, 6): import pytest as pytest_asyncio + + pytestmark = pytest.mark.asyncio else: import pytest_asyncio diff --git a/tests/test_asyncio/test_search.py b/tests/test_asyncio/test_search.py index 5aaa56f159..bc3a212ac9 100644 --- a/tests/test_asyncio/test_search.py +++ b/tests/test_asyncio/test_search.py @@ -1,6 +1,7 @@ import bz2 import csv import os +import sys import time from io import TextIOWrapper @@ -18,7 +19,8 @@ from redis.commands.search.suggestion import Suggestion from tests.conftest import skip_ifmodversion_lt -pytestmark = pytest.mark.asyncio +if sys.version_info[0:2] == (3, 6): + pytestmark = pytest.mark.asyncio WILL_PLAY_TEXT = os.path.abspath( diff --git a/tests/test_asyncio/test_sentinel.py b/tests/test_asyncio/test_sentinel.py index 4130e67400..e77e07f98e 100644 --- a/tests/test_asyncio/test_sentinel.py +++ b/tests/test_asyncio/test_sentinel.py @@ -5,6 +5,8 @@ if sys.version_info[0:2] == (3, 6): import pytest as pytest_asyncio + + pytestmark = pytest.mark.asyncio else: import pytest_asyncio @@ -17,8 +19,6 @@ SlaveNotFoundError, ) -pytestmark = pytest.mark.asyncio - @pytest_asyncio.fixture(scope="module") def master_ip(master_host): diff --git a/tests/test_asyncio/test_timeseries.py b/tests/test_asyncio/test_timeseries.py index ac2807fe1d..0e57c4f049 100644 --- a/tests/test_asyncio/test_timeseries.py +++ b/tests/test_asyncio/test_timeseries.py @@ -1,3 +1,4 @@ +import sys import time from time import sleep @@ -6,7 +7,8 @@ import redis.asyncio as redis from tests.conftest import skip_ifmodversion_lt -pytestmark = pytest.mark.asyncio +if sys.version_info[0:2] == (3, 6): + pytestmark = pytest.mark.asyncio @pytest.mark.redismod diff --git a/tests/test_graph.py b/tests/test_graph.py index 76f8794c18..526308c672 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -1,7 +1,24 @@ +from unittest.mock import patch + import pytest from redis.commands.graph import Edge, Node, Path from redis.commands.graph.execution_plan import Operation +from redis.commands.graph.query_result import ( + CACHED_EXECUTION, + INDICES_CREATED, + INDICES_DELETED, + INTERNAL_EXECUTION_TIME, + LABELS_ADDED, + LABELS_REMOVED, + NODES_CREATED, + NODES_DELETED, + PROPERTIES_REMOVED, + PROPERTIES_SET, + RELATIONSHIPS_CREATED, + RELATIONSHIPS_DELETED, + QueryResult, +) from redis.exceptions import ResponseError from tests.conftest import skip_if_redis_enterprise @@ -575,3 +592,33 @@ def test_explain(client): assert result.structured_plan == expected redis_graph.delete() + + +@pytest.mark.redismod +def test_resultset_statistics(client): + with patch.object(target=QueryResult, attribute="_get_stat") as mock_get_stats: + result = client.graph().query("RETURN 1") + result.labels_added +
mock_get_stats.assert_called_with(LABELS_ADDED) + result.labels_removed + mock_get_stats.assert_called_with(LABELS_REMOVED) + result.nodes_created + mock_get_stats.assert_called_with(NODES_CREATED) + result.nodes_deleted + mock_get_stats.assert_called_with(NODES_DELETED) + result.properties_set + mock_get_stats.assert_called_with(PROPERTIES_SET) + result.properties_removed + mock_get_stats.assert_called_with(PROPERTIES_REMOVED) + result.relationships_created + mock_get_stats.assert_called_with(RELATIONSHIPS_CREATED) + result.relationships_deleted + mock_get_stats.assert_called_with(RELATIONSHIPS_DELETED) + result.indices_created + mock_get_stats.assert_called_with(INDICES_CREATED) + result.indices_deleted + mock_get_stats.assert_called_with(INDICES_DELETED) + result.cached_execution + mock_get_stats.assert_called_with(CACHED_EXECUTION) + result.run_time_ms + mock_get_stats.assert_called_with(INTERNAL_EXECUTION_TIME) diff --git a/tests/test_json.py b/tests/test_json.py index 1cc448c5f9..0965a93d88 100644 --- a/tests/test_json.py +++ b/tests/test_json.py @@ -1411,7 +1411,8 @@ def test_set_path(client): with open(jsonfile, "w+") as fp: fp.write(json.dumps({"hello": "world"})) - open(nojsonfile, "a+").write("hello") + with open(nojsonfile, "a+") as fp: + fp.write("hello") result = {jsonfile: True, nojsonfile: False} assert client.json().set_path(Path.root_path(), root) == result diff --git a/tests/test_ssl.py b/tests/test_ssl.py index d029b80dcb..ed38a3166b 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -68,8 +68,8 @@ def test_validating_self_signed_certificate(self, request): assert r.ping() def test_validating_self_signed_string_certificate(self, request): - f = open(self.SERVER_CERT) - cert_data = f.read() + with open(self.SERVER_CERT) as f: + cert_data = f.read() ssl_url = request.config.option.redis_ssl_url p = urlparse(ssl_url)[1].split(":") r = redis.Redis( diff --git a/tox.ini b/tox.ini index 0ceb008cf6..d1aeb02ade 100644 --- a/tox.ini +++ b/tox.ini @@ -9,6 +9,7 @@ markers = asyncio: marker for async tests replica: replica tests experimental: run only experimental tests +asyncio_mode = auto [tox] minversion = 3.2.0
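The `asyncio_mode = auto` line added to the pytest section of tox.ini is what allows the module-level `pytestmark = pytest.mark.asyncio` assignments to be deleted throughout: in auto mode (available since pytest-asyncio 0.17), bare `async def` tests are collected and run with no marker. The conditional Python 3.6 branches keep the explicit marker because that environment does not get auto mode. A minimal illustration (the test below is hypothetical, not part of the suite):

    # With `asyncio_mode = auto` configured, this runs with no marker at all.
    import asyncio

    async def test_sleep_yields_control():  # hypothetical example test
        await asyncio.sleep(0)

    # Without auto mode (e.g. the Python 3.6 fallback above), the same module
    # would need:
    #     import pytest
    #     pytestmark = pytest.mark.asyncio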