Commit

Fix tests
Derek Nelson committed Dec 5, 2016
1 parent 75a915f commit 75926e5
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions test/test_pipeline_kafka.py
@@ -9,11 +9,11 @@ def test_basic(pipeline, kafka, clean_db):
Produce and consume several topics into several streams and verify that
all resulting data is correct
"""
- pipeline.create_stream('stream', x='integer')
- pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream GROUP BY x')
+ pipeline.create_stream('stream0', x='integer')
+ pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream0 GROUP BY x')

kafka.create_topic('test_basic')
- pipeline.consume_begin('test_basic', 'stream')
+ pipeline.consume_begin('test_basic', 'stream0')

producer = kafka.get_producer('test_basic')
for n in range(1000):
@@ -33,11 +33,11 @@ def test_consumers(pipeline, kafka, clean_db):
"""
Verify that offsets are properly maintained when storing them locally across consumer restarts
"""
- pipeline.create_stream('stream', x='integer')
- pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream GROUP BY x')
+ pipeline.create_stream('stream0', x='integer')
+ pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream0 GROUP BY x')

kafka.create_topic('test_consumers', partitions=4)
- pipeline.consume_begin('test_consumers', 'stream', parallelism=4)
+ pipeline.consume_begin('test_consumers', 'stream0', parallelism=4)

producer = kafka.get_producer('test_consumers')
for n in range(1000):
@@ -73,7 +73,7 @@ def messages_consumed0():
for n in range(1000):
producer.produce(str(n))

- pipeline.consume_begin('test_consumers', 'stream', parallelism=4)
+ pipeline.consume_begin('test_consumers', 'stream0', parallelism=4)
time.sleep(2)

# Verify count
@@ -147,8 +147,8 @@ def test_consume_stream_partitioned_safety(pipeline, kafka, clean_db):
Produce without a key or non-existent stream key and make sure
things work properly
'''
- pipeline.create_stream('stream', x='integer')
- pipeline.create_cv('cv', 'SELECT count(*) FROM stream')
+ pipeline.create_stream('stream0', x='integer')
+ pipeline.create_cv('cv', 'SELECT count(*) FROM stream0')

kafka.create_topic('stream_partitioned_topic_safe')
pipeline.consume_begin_stream_partitioned('stream_partitioned_topic_safe')
@@ -158,7 +158,7 @@ def test_consume_stream_partitioned_safety(pipeline, kafka, clean_db):
for n in range(100):
producer.produce(str(n), partition_key='')
producer.produce(str(n), partition_key='invalid')
- producer.produce(str(n), partition_key='stream')
+ producer.produce(str(n), partition_key='stream0')

def messages_partitioned():
rows = pipeline.execute('SELECT count FROM cv')
@@ -349,15 +349,15 @@ def test_produce(pipeline, kafka, clean_db):
"""
Tests pipeline_kafka.emit_tuple and pipeline_kafka.produce_message
"""
- pipeline.create_stream('stream', payload='json')
- pipeline.create_cv('cv', 'SELECT payload FROM stream')
+ pipeline.create_stream('stream0', payload='json')
+ pipeline.create_cv('cv', 'SELECT payload FROM stream0')
pipeline.create_table('t', x='integer', y='integer')
pipeline.execute("""CREATE TRIGGER tg AFTER INSERT ON t
FOR EACH ROW EXECUTE PROCEDURE pipeline_kafka.emit_tuple('topic')
""")

kafka.create_topic('topic', partitions=4)
- pipeline.consume_begin('topic', 'stream')
+ pipeline.consume_begin('topic', 'stream0')

for i in range(100):
pipeline.insert('t', ('x', 'y'), [(i, 2 * i)])
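Taken together, the hunks rename the stream these tests read from, replacing 'stream' with 'stream0' everywhere it is created, consumed into, queried, and used as a partition key. Below is a minimal sketch (not a verbatim copy of the file) of the resulting flow in test_basic, reusing the pipeline and kafka fixtures and the helper calls visible in the diff context; the final count check is only summarized, since the real assertion lines fall outside the visible hunks.

import time

def test_basic(pipeline, kafka, clean_db):
    # Create the renamed stream and a continuous view aggregating it
    pipeline.create_stream('stream0', x='integer')
    pipeline.create_cv('basic', 'SELECT x, COUNT(*) FROM stream0 GROUP BY x')

    # Consume the Kafka topic into 'stream0' rather than 'stream'
    kafka.create_topic('test_basic')
    pipeline.consume_begin('test_basic', 'stream0')

    # Produce messages and give the consumer a moment to catch up
    producer = kafka.get_producer('test_basic')
    for n in range(1000):
        producer.produce(str(n))
    time.sleep(2)

    # Verify count: each distinct x value should have its own group in the view
    # (illustrative; assumes execute() returns an iterable of result rows)
    rows = list(pipeline.execute('SELECT count FROM basic'))
    assert len(rows) == 1000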
