import os
import subprocess

from django.db.backends.base.creation import TEST_DATABASE_PREFIX

# Connection parameters used for the test database server. Every value can be
# overridden through the matching TEST_DB_* environment variable.
LOCAL = {
    'HOST': os.getenv('TEST_DB_HOST', 'localhost'),
    'PORT': os.getenv('TEST_DB_PORT', 3306),
    'USER': os.getenv('TEST_DB_USER', 'root'),
    'PASSWORD': os.getenv('TEST_DB_PASSWORD', "")
}


def _client_argv(program, settings, with_host=True):
    """
    build the argument vector (program + connection flags) for a MySQL
    command-line tool.

    A flag is only emitted when its setting is present and non-empty, so an
    empty password never produces a bare ``-p`` (which would make the client
    prompt interactively) and an empty host/port never shifts the arguments
    that follow it.

    :param program: executable name, e.g. ``mysql`` or ``mysqldump``
    :param settings: mapping that may hold HOST, PORT, USER and PASSWORD
    :param with_host: when False the ``-h`` flag is never emitted
    :return: list of strings suitable for ``subprocess.Popen``
    """
    argv = [program]
    for flag, key in (('-h', 'HOST'), ('-P', 'PORT'),
                      ('-u', 'USER'), ('-p', 'PASSWORD')):
        if key == 'HOST' and not with_host:
            continue
        value = settings.get(key)
        if value:
            # The value is attached to the flag (``-pSECRET``): the MySQL
            # clients treat ``-p SECRET`` as "prompt for a password" followed
            # by a positional argument.
            argv.append('%s%s' % (flag, value))
    return argv


def _quote_identifier(name):
    """
    wrap a database/table name in backticks so that it is parsed as a single
    SQL identifier even when it is a reserved word or contains punctuation.

    :param name: raw identifier
    :return: backtick-quoted identifier with embedded backticks doubled
    """
    return '`%s`' % name.replace('`', '``')


def truncate_database(connection):
    """
    truncate every table of the database described by
    ``connection.settings_dict``, with foreign key checks disabled.

    The call blocks until all tables have been processed. Exit statuses of the
    ``mysql`` client are not inspected, so a table that fails to truncate does
    not stop the remaining ones.

    :param connection: object exposing ``settings_dict`` with NAME, USER,
        PORT and PASSWORD
    :return: None
    :raises OSError: if the ``mysql`` client cannot be started
    """
    db = connection.settings_dict
    # NOTE(review): no -h flag is passed here, so the client connects to its
    # default host rather than db['HOST'] -- confirm this is intended.
    mysql = _client_argv('mysql', db, with_host=False)

    # -N: no column header, -s: plain one-name-per-line output.
    lister = subprocess.Popen(
        mysql + ['-Nse', 'show tables', db['NAME']],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    listing = lister.communicate()[0]

    for line in listing.splitlines():
        table = line.strip()
        if not table:
            continue
        statement = 'SET FOREIGN_KEY_CHECKS=0; truncate table %s' % (
            _quote_identifier(table),
        )
        # One client per table: FOREIGN_KEY_CHECKS is a session setting, and
        # separate sessions keep one failing table from aborting the rest.
        worker = subprocess.Popen(
            mysql + ['-e', statement, db['NAME']],
            stdout=subprocess.PIPE,
        )
        # Drain the pipe and wait, so the function does not return while the
        # truncation is still in progress.
        worker.communicate()


def database_exists(connection):
    """
    check whether the test database (``TEST_DATABASE_PREFIX`` + the
    connection's database name) is listed by ``mysqlshow`` on the server
    described by ``LOCAL``.

    :param connection: object exposing ``settings_dict['NAME']``
    :return: True if a database with exactly that name is listed, else False
    :raises OSError: if ``mysqlshow`` cannot be started
    """
    test_db_name = TEST_DATABASE_PREFIX + connection.settings_dict['NAME']

    ps = subprocess.Popen(
        _client_argv('mysqlshow', LOCAL),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Text mode: the output is compared against a str below.
        universal_newlines=True,
    )
    output = ps.communicate()[0]

    # Compare line by line: the captured output also carries stderr, so a
    # client warning must not prevent a match. Stripping the table borders
    # and requiring equality avoids matching names that merely contain the
    # test database name.
    for line in output.splitlines():
        if line.strip().strip('|').strip() == test_db_name:
            return True
    return False


def copy_database_structure(connection):
    """
    copy database structure to test_db.

    If the test database already exists nothing is rebuilt. Otherwise the
    database is created on the ``LOCAL`` server and the schema of the source
    database (``mysqldump -d``, i.e. no row data) is piped into it. Exit
    statuses of the client programs are not inspected.

    :param connection: object exposing ``settings_dict`` with HOST, PORT,
        USER, PASSWORD and NAME of the source database
    :return: copy of ``LOCAL`` extended with the test database ``NAME``
    :raises OSError: if a MySQL client program cannot be started
    """
    src = connection.settings_dict
    dst = LOCAL.copy()
    dst['NAME'] = TEST_DATABASE_PREFIX + src['NAME']

    if database_exists(connection):
        # if test database exist, skip rebuild database.
        return dst

    mysql = _client_argv('mysql', dst)
    create_statement = 'CREATE DATABASE IF NOT EXISTS %s' % (
        _quote_identifier(dst['NAME']),
    )
    subprocess.call(mysql + ['-e', create_statement])

    # Equivalent of ``mysqldump ... | mysql ...`` without going through a
    # shell, so special characters in credentials or names are passed as-is.
    dump = subprocess.Popen(
        _client_argv('mysqldump', src)
        + ['--set-gtid-purged=OFF', '-d', src['NAME']],
        stdout=subprocess.PIPE,
    )
    load = subprocess.Popen(
        mysql + ['-D%s' % dst['NAME']],
        stdin=dump.stdout,
    )
    # Drop our copy of the read end so mysqldump is signalled if mysql exits
    # before consuming the whole dump.
    dump.stdout.close()
    load.wait()
    dump.wait()
    return dst