Skip to content

Commit

Permalink
Merge pull request #64 from shuhaowu/ruby-integration-test
Browse files Browse the repository at this point in the history
Integration tests by compiling and calling Ghostferry from Ruby
  • Loading branch information
shuhaowu authored Jan 21, 2019
2 parents 3037220 + b8fb91a commit 07831b1
Show file tree
Hide file tree
Showing 35 changed files with 1,044 additions and 1 deletion.
7 changes: 7 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
source "https://rubygems.org"

gem "minitest"
gem "minitest-hooks"
gem "mysql2"

gem "rake"
20 changes: 20 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
GEM
remote: https://rubygems.org/
specs:
minitest (5.11.3)
minitest-hooks (1.5.0)
minitest (> 5.3)
mysql2 (0.5.2)
rake (12.3.2)

PLATFORMS
ruby

DEPENDENCIES
minitest
minitest-hooks
mysql2
rake

BUNDLED WITH
1.16.1
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ $(GOBIN):

test:
@go version
go test ./test ./copydb/test ./sharding/test -p 1 -v
go test ./test/go ./copydb/test ./sharding/test -p 1 -v
bundle install && bundle exec rake test

clean:
rm -rf build
Expand Down
9 changes: 9 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
require 'rake/testtask'

task :default => [:test]

Rake::TestTask.new do |t|
t.test_files = FileList['test/**/*test.rb']
t.verbose = true
t.libs << ["test", "test/helpers", "test/lib"]
end
2 changes: 2 additions & 0 deletions dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ up:
- homebrew:
- glide
- mysql
- ruby: 2.5.1
- bundler
- go:
version: 1.10.3
- custom:
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
132 changes: 132 additions & 0 deletions test/helpers/data_writer_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
require "logger"
require "thread"

require "db_helper"
require "ghostferry_helper"
require "mysql2"

module DataWriterHelper
def start_datawriter_with_ghostferry(dw, gf, &on_write)
gf.on_status(GhostferryHelper::Ghostferry::Status::READY) do
dw.start(&on_write)
end
end

def stop_datawriter_during_cutover(dw, gf)
gf.on_status(GhostferryHelper::Ghostferry::Status::ROW_COPY_COMPLETED) do
# At the start of the cutover phase, we have to set the database to
# read-only. This is done by stopping the datawriter.
dw.stop_and_join
end
end

class DataWriter
# A threaded data writer that just hammers the database with write
# queries as much as possible.
#
# This is used essentially for random testing.
def initialize(db_config,
tables: [DbHelper::DEFAULT_FULL_TABLE_NAME],
insert_probability: 0.33,
update_probability: 0.33,
delete_probability: 0.34,
number_of_writers: 1,
logger: nil
)
@db_config = db_config
@tables = tables

@number_of_writers = number_of_writers
@insert_probability = [0, insert_probability]
@update_probability = [@insert_probability[1], @insert_probability[1] + update_probability]
@delete_probability = [@update_probability[1], @update_probability[1] + delete_probability]

@threads = []
@started = false
@stop_requested = false

@logger = logger
if @logger.nil?
@logger = Logger.new(STDOUT)
@logger.level = Logger::DEBUG
end
end

def start(&on_write)
raise "Cannot start DataWriter multiple times. Use a new instance instead " if @started
@started = true
@number_of_writers.times do |i|
@threads << Thread.new do
@logger.info("starting data writer thread #{i}")

connection = Mysql2::Client.new(@db_config)
until @stop_requested do
write_data(connection, &on_write)
end

@logger.info("stopped data writer thread #{i}")
end
end
end

def stop_and_join
@stop_requested = true
join
end

def join
@threads.each do |t|
t.join
end
end

def write_data(connection, &on_write)
r = rand

if r >= @insert_probability[0] && r < @insert_probability[1]
id = insert_data(connection)
op = "INSERT"
elsif r >= @update_probability[0] && r < @update_probability[1]
id = update_data(connection)
op = "UPDATE"
elsif r >= @delete_probability[0] && r < @delete_probability[1]
id = delete_data(connection)
op = "DELETE"
end

@logger.debug("writing data: #{op} #{id}")
on_write.call(op, id) unless on_write.nil?
end

def insert_data(connection)
table = @tables.sample
insert_statement = connection.prepare("INSERT INTO #{table} (id, data) VALUES (?, ?)")
insert_statement.execute(nil, DbHelper.rand_data)
connection.last_id
end

def update_data(connection)
table = @tables.sample
id = random_real_id(connection, table)
update_statement = connection.prepare("UPDATE #{table} SET data = ? WHERE id >= ? LIMIT 1")
update_statement.execute(DbHelper.rand_data, id)
id
end

def delete_data(connection)
table = @tables.sample
id = random_real_id(connection, table)
delete_statement = connection.prepare("DELETE FROM #{table} WHERE id >= ? LIMIT 1")
delete_statement.execute(id)
id
end

def random_real_id(connection, table)
# This query is slow for large datasets.
# For testing purposes, this should be okay.
result = connection.query("SELECT id FROM #{table} ORDER BY RAND() LIMIT 1")
raise "No rows in the database?" if result.first.nil?
result.first["id"]
end
end
end
150 changes: 150 additions & 0 deletions test/helpers/db_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
require "logger"
require "mysql2"

module DbHelper
ALPHANUMERICS = ("0".."9").to_a + ("a".."z").to_a + ("A".."Z").to_a
DB_PORTS = {source: 29291, target: 29292}

DEFAULT_DB = "gftest"
DEFAULT_TABLE = "test_table_1"

def self.full_table_name(db, table)
"`#{db}`.`#{table}`"
end

def self.rand_data(length: 32)
ALPHANUMERICS.sample(length).join("") + "👻⛴️"
end

DEFAULT_FULL_TABLE_NAME = full_table_name(DEFAULT_DB, DEFAULT_TABLE)

def full_table_name(db, table)
DbHelper.full_table_name(db, table)
end

def rand_data(length: 32)
DbHelper.rand_data(length: length)
end

def default_db_config(port:)
{
host: "127.0.0.1",
port: port,
username: "root",
password: "",
encoding: "utf8mb4",
collation: "utf8mb4_unicode_ci",
}
end

def transaction(connection)
raise ArgumentError, "must pass a block" if !block_given?

begin
connection.query("BEGIN")
yield
rescue
connection.query("ROLLBACK")
raise
else
connection.query("COMMIT")
end
end

def initialize_db_connections
@connections = {}
DB_PORTS.each do |name, port|
@connections[name] = Mysql2::Client.new(default_db_config(port: port))
end
end

def source_db
@connections[:source]
end

def target_db
@connections[:target]
end

def source_db_config
default_db_config(port: DB_PORTS[:source])
end

def target_db_config
default_db_config(port: DB_PORTS[:target])
end

# Database Seeding Methods
##########################
# Each test case can choose what kind of database it wants to setup by
# calling one of these methods.

def reset_data
@connections.each do |_, connection|
connection.query("DROP DATABASE IF EXISTS `#{DEFAULT_DB}`")
end
end

def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_TABLE, number_of_rows: 1111)
dbtable = full_table_name(database_name, table_name)

connection.query("CREATE DATABASE IF NOT EXISTS #{database_name}")
connection.query("CREATE TABLE IF NOT EXISTS #{dbtable} (id bigint(20) not null auto_increment, data TEXT, primary key(id))")

transaction(connection) do
insert_statement = connection.prepare("INSERT INTO #{dbtable} (id, data) VALUES (?, ?)")

number_of_rows.times do
insert_statement.execute(nil, rand_data)
end
end
end

def seed_simple_database_with_single_table
# Setup the source database with data.
max_id = 1111
seed_random_data(source_db, number_of_rows: max_id)

# Create some holes in the data.
delete_statement = source_db.prepare("DELETE FROM #{full_table_name(DEFAULT_DB, DEFAULT_TABLE)} WHERE id = ?")
140.times do
delete_statement.execute(Random.rand(max_id) + 1)
end

# Setup the target database with no data but the correct schema.
seed_random_data(target_db, number_of_rows: 0)
end

# Get some overall metrics like CHECKSUM, row count, sample row from tables.
# Generally used for test validation.
def source_and_target_table_metrics(tables: [DEFAULT_FULL_TABLE_NAME])
source_metrics = {}
target_metrics = {}

tables.each do |table|
source_metrics[table] = table_metric(source_db, table)
target_metrics[table] = table_metric(target_db, table, sample_id: source_metrics[table][:sample_row]["id"])
end

[source_metrics, target_metrics]
end

def table_metric(conn, table, sample_id: nil)
metrics = {}
result = conn.query("CHECKSUM TABLE #{table}")
metrics[:checksum] = result.first["Checksum"]

result = conn.query("SELECT COUNT(*) AS cnt FROM #{table}")
metrics[:row_count] = result.first["cnt"]

if sample_id.nil?
result = conn.query("SELECT * FROM #{table} ORDER BY RAND() LIMIT 1")
metrics[:sample_row] = result.first
else
result = conn.query("SELECT * FROM #{table} WHERE id = #{sample_id} LIMIT 1")
metrics[:sample_row] = result.first
end

metrics
end
end
Loading

0 comments on commit 07831b1

Please sign in to comment.