From 75926e570f16b37fccb70537db33e202f4ab20cf Mon Sep 17 00:00:00 2001
From: Derek Nelson <=>
Date: Sun, 4 Dec 2016 20:26:58 -0800
Subject: [PATCH] Fix tests

---
 test/test_pipeline_kafka.py | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/test/test_pipeline_kafka.py b/test/test_pipeline_kafka.py
index 21b506dd..ee53d53f 100644
--- a/test/test_pipeline_kafka.py
+++ b/test/test_pipeline_kafka.py
@@ -9,11 +9,11 @@ def test_basic(pipeline, kafka, clean_db):
     Produce and consume several topics into several streams and verify that
     all resulting data is correct
     """
-    pipeline.create_stream('stream', x='integer')
-    pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream GROUP BY x')
+    pipeline.create_stream('stream0', x='integer')
+    pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream0 GROUP BY x')
 
     kafka.create_topic('test_basic')
-    pipeline.consume_begin('test_basic', 'stream')
+    pipeline.consume_begin('test_basic', 'stream0')
 
     producer = kafka.get_producer('test_basic')
     for n in range(1000):
@@ -33,11 +33,11 @@ def test_consumers(pipeline, kafka, clean_db):
     """
     Verify that offsets are properly maintained when storing them locally
     across consumer restarts
     """
-    pipeline.create_stream('stream', x='integer')
-    pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream GROUP BY x')
+    pipeline.create_stream('stream0', x='integer')
+    pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream0 GROUP BY x')
     kafka.create_topic('test_consumers', partitions=4)
-    pipeline.consume_begin('test_consumers', 'stream', parallelism=4)
+    pipeline.consume_begin('test_consumers', 'stream0', parallelism=4)
 
     producer = kafka.get_producer('test_consumers')
     for n in range(1000):
@@ -73,7 +73,7 @@ def messages_consumed0():
     for n in range(1000):
         producer.produce(str(n))
 
-    pipeline.consume_begin('test_consumers', 'stream', parallelism=4)
+    pipeline.consume_begin('test_consumers', 'stream0', parallelism=4)
     time.sleep(2)
 
     # Verify count
@@ -147,8 +147,8 @@ def test_consume_stream_partitioned_safety(pipeline, kafka, clean_db):
     Produce without a key or non-existent stream key and make sure thing work
     properly
     '''
-    pipeline.create_stream('stream', x='integer')
-    pipeline.create_cv('cv', 'SELECT count(*) FROM stream')
+    pipeline.create_stream('stream0', x='integer')
+    pipeline.create_cv('cv', 'SELECT count(*) FROM stream0')
 
     kafka.create_topic('stream_partitioned_topic_safe')
     pipeline.consume_begin_stream_partitioned('stream_partitioned_topic_safe')
@@ -158,7 +158,7 @@
     for n in range(100):
         producer.produce(str(n), partition_key='')
         producer.produce(str(n), partition_key='invalid')
-        producer.produce(str(n), partition_key='stream')
+        producer.produce(str(n), partition_key='stream0')
 
     def messages_partitioned():
         rows = pipeline.execute('SELECT count FROM cv')
@@ -349,15 +349,15 @@ def test_produce(pipeline, kafka, clean_db):
     """
     Tests pipeline_kafka.emit_tuple and pipeline_kafka.produce_message
     """
-    pipeline.create_stream('stream', payload='json')
-    pipeline.create_cv('cv', 'SELECT payload FROM stream')
+    pipeline.create_stream('stream0', payload='json')
+    pipeline.create_cv('cv', 'SELECT payload FROM stream0')
 
     pipeline.create_table('t', x='integer', y='integer')
     pipeline.execute("""CREATE TRIGGER tg AFTER INSERT ON t
     FOR EACH ROW EXECUTE PROCEDURE pipeline_kafka.emit_tuple('topic')
     """)
     kafka.create_topic('topic', partitions=4)
-    pipeline.consume_begin('topic', 'stream')
+    pipeline.consume_begin('topic', 'stream0')
 
     for i in range(100):
         pipeline.insert('t', ('x', 'y'), [(i, 2 * i)])
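
Note (not part of the patch): the hunks above show each test only in fragments, so the
produce/verify round trip they all follow is easy to miss. The sketch below assembles
that pattern in one place. It is an illustrative reconstruction from the lines visible
in this patch; the `pipeline`/`kafka`/`clean_db` fixtures and the helper signatures
(`create_stream`, `create_cv`, `consume_begin`, `get_producer`) are inferred from the
hunks, not taken from pipeline_kafka's documentation.

    # Illustrative sketch only -- mirrors the renamed tests above.
    def test_round_trip(pipeline, kafka, clean_db):
        # 'stream0' is the new name; 'stream' was used before this patch
        pipeline.create_stream('stream0', x='integer')
        pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream0 GROUP BY x')

        # Bind a Kafka topic to the stream and start consuming from it
        kafka.create_topic('test_round_trip')
        pipeline.consume_begin('test_round_trip', 'stream0')

        # Produce messages that should end up counted in the continuous view
        producer = kafka.get_producer('test_round_trip')
        for n in range(1000):
            producer.produce(str(n))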