forked from thepaul/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 2
/
delete_insert_test.py
81 lines (65 loc) · 2.9 KB
/
delete_insert_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from dtest import Tester, debug
import uuid
import cql
import os
import threading
import random
class DeleteInsertTest(Tester):
    """
    Examines scenarios around deleting data and adding data back with the same key
    """

    def __init__(self, *args, **kwargs):
        """Set up the test case and pre-generate the rows shared by all helpers.

        All positional and keyword arguments are forwarded to ``Tester``.
        """
        Tester.__init__(self, *args, **kwargs)

        # Generate 1000 rows in memory so we can re-use the same ones over again.
        # Each row is a (uuid string, integer value, group name) tuple.
        self.groups = ['group1', 'group2', 'group3', 'group4']
        self.rows = [
            (str(uuid.uuid1()), x, random.choice(self.groups))
            for x in range(1000)
        ]

    def create_ddl(self, cursor, rf=None):
        """Create the keyspace, the ``test`` table and its index on ``group``.

        cursor -- cursor used to execute the DDL statements.
        rf     -- replication setting handed to ``self.create_ks``; when omitted,
                  ``{'dc1': 2, 'dc2': 2}`` is used.
        """
        # Build the default per call so a shared dict can never be mutated
        # across invocations.
        if rf is None:
            rf = {'dc1': 2, 'dc2': 2}

        self.create_ks(cursor, 'delete_insert_search_test', rf)
        cursor.execute('CREATE TABLE test (id uuid PRIMARY KEY, val1 text, group text)')
        cursor.execute('CREATE INDEX group_idx ON test (group)')

    def delete_all_rows(self, cursor):
        """Delete every pre-generated row, one DELETE statement per row."""
        for row_id, _val, _group in self.rows:
            # The original formatted an undefined name here (NameError);
            # the row's own id is what must be deleted.
            cursor.execute("DELETE FROM test WHERE id=%s" % row_id)

    def delete_group_rows(self, cursor, group):
        """Delete rows from a given group and return them"""
        rows = [r for r in self.rows if r[2] == group]
        if not rows:
            # Nothing to delete; an empty "id in ()" list would not be valid CQL.
            return rows

        ids = [r[0] for r in rows]
        cursor.execute('DELETE FROM test WHERE id in (%s)' % ', '.join(ids))
        return rows

    def insert_all_rows(self, cursor):
        """Insert every pre-generated row."""
        self.insert_some_rows(cursor, self.rows)

    def insert_some_rows(self, cursor, rows):
        """Insert the given rows, one INSERT statement per row.

        rows -- iterable of (id, val1, group) 3-tuples, as stored in ``self.rows``.
        """
        for row in rows:
            cursor.execute("INSERT INTO test (id, val1, group) VALUES (%s, '%s', '%s')" % row)

    def delete_insert_search_test(self):
        """Delete one group, re-insert it, then check an indexed search sees it all.

        Twenty queries are run in parallel, each over a connection to a randomly
        chosen node; the test fails if any of them errors or returns a row count
        different from the number of re-inserted rows.
        """
        cluster = self.cluster
        # NOTE(review): presumably two datacenters with two nodes each, matching
        # the 'dc1'/'dc2' replication default in create_ddl -- confirm.
        cluster.populate([2, 2]).start()
        node1 = cluster.nodelist()[0]

        cursor = self.cql_connection(node1).cursor()
        cursor.consistency_level = 'LOCAL_QUORUM'

        self.create_ddl(cursor)

        # Create 1000 rows:
        self.insert_all_rows(cursor)

        # Delete all of group2:
        deleted = self.delete_group_rows(cursor, 'group2')

        # Put that group back:
        self.insert_some_rows(cursor, rows=deleted)

        # Verify that all of group2 is back, 20 times, in parallel
        # querying across randomly chosen nodes:
        class ThreadedQuery(threading.Thread):
            """Runs the group2 search once and records any failure on itself."""

            def __init__(self, connection, expected_count):
                threading.Thread.__init__(self)
                self.connection = connection
                self.expected_count = expected_count
                # Set by run() when the query raises or returns the wrong count.
                self.error = None

            def run(self):
                # An exception escaping run() only terminates this thread and is
                # never seen by the test, so capture it for the main thread.
                try:
                    cursor = self.connection.cursor()
                    cursor.consistency_level = 'LOCAL_QUORUM'
                    cursor.execute("SELECT * FROM delete_insert_search_test.test WHERE group = 'group2'")
                    if cursor.rowcount != self.expected_count:
                        raise AssertionError(
                            "expected %d rows for group2, got %s"
                            % (self.expected_count, cursor.rowcount))
                except Exception as e:
                    self.error = e

        threads = []
        for _ in range(20):
            conn = self.cql_connection(random.choice(cluster.nodelist()))
            threads.append(ThreadedQuery(conn, len(deleted)))

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        # Surface worker failures in the main thread so the test really fails.
        errors = [t.error for t in threads if t.error is not None]
        if errors:
            raise AssertionError(
                "%d of %d parallel queries failed: %s"
                % (len(errors), len(threads), '; '.join(repr(e) for e in errors)))