migrate_redis.py 5.74 KB
import redis
from django.conf import settings
from django.core.management import BaseCommand


class Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument('-t', '--test', action='store_true',
                            help="Run test for redis migrate, a redis server should run on localhost:6379")
        parser.add_argument('-m', '--migrate', action='store_true', help="Redis data migrate")

    def handle(self, *args, **options):
        """
        redis migrate.
        :param args:
        :param options:
        :return:
        """
        if options['test']:
            self.migrate_test_v2(settings.REDIS_MIGRATE)

        elif options['migrate']:
            for rd in settings.REDIS_MIGRATE:
                for prefix in rd['prefix']:
                    self.migrate_v2(rd['source'], rd['target'], prefix=prefix)

    @staticmethod
    def migrate_template(**kwargs):
        """
        See redis MIGRATE command.
        """
        return "MIGRATE {dst_host} {dst_port} {key} {dst_db} {timeout} COPY KEYS {keys}".format(**kwargs)

    def migrate_v3(self, source, target, prefix=""):
        """
        适用于: redis >= 3.0.8
        :param source:
        :param target:
        :param prefix:
        :return:
        """
        self.stdout.write(self.style.SUCCESS(
            "Start to migrate from %s to %s, prefix: %s" % (
                "redis://" + source['host'] + ":" + str(source['port']) + "/" + str(source['db']),
                "redis://" + target['host'] + ":" + str(target['port']) + "/" + str(target['db']),
                prefix
            )
        ))
        rd = redis.StrictRedis(**source)
        target_rd = redis.StrictRedis(**target)
        for key in target_rd.keys(prefix + "*"):
            target_rd.delete(key.decode())

        binary_keys = rd.keys(prefix + "*")
        keys = ' '.join(key.decode() for key in binary_keys)

        command = self.migrate_template(
            dst_host=target['host'], dst_port=target['port'], dst_db=target['db'], key='', keys=keys, timeout=5000,
        )
        r = rd.execute_command(command)

        if r.decode() == 'OK':
            self.stdout.write(self.style.SUCCESS("Migrate success."))
        else:
            self.stdout.write(self.style.ERROR("Migrate failed: %s" % r))

    def migrate_v2(self, source, target, prefix=""):
        """
        适用于:redis >= 2.6
        :param source:
        :param target:
        :param prefix:
        :return:
        """
        self.stdout.write(self.style.SUCCESS(
            "Start to migrate from %s to %s, prefix: %s" % (
                "redis://" + source['host'] + ":" + str(source['port']) + "/" + str(source['db']),
                "redis://" + target['host'] + ":" + str(target['port']) + "/" + str(target['db']),
                prefix
            )
        ))
        src_rd = redis.StrictRedis(**source)
        dst_rd = redis.StrictRedis(**target)
        total = len(src_rd.keys(prefix + "*"))
        for key in src_rd.keys(prefix + "*"):
            try:
                """ redis> SET """
                v = src_rd.get(key)
                dst_rd.set(key, v)
            except redis.exceptions.ResponseError:
                """ redis> HSET """
                try:
                    v = src_rd.hgetall(key)
                    dst_rd.hmset(key, v)
                except redis.exceptions.ResponseError:
                    """redis> ZADD """
                    zv = src_rd.zrange(key, 0, -1, withscores=True)
                    v = {key.decode(): value for key, value in zv}
                    dst_rd.zadd(key, **v)

        self.stdout.write(self.style.SUCCESS("Migrate success: %s key." % total))

    def migrate_test_v3(self, redis_migrate):

        prefix = 'key_for_migrate_testing:'

        for conf in redis_migrate:

            rd1, rd2 = conf['source'], conf['target']

            rd = redis.StrictRedis(**rd1)
            for i in range(100):
                rd.set(prefix + str(i), i)
            assert len(rd.keys("key_for_migrate_testing:*")) == 100

            self.migrate_v3(rd1, rd2, prefix=prefix)
            assert len(rd.keys(prefix + "*")) == 100

            for k in rd.keys(prefix + "*"):
                rd.delete(k.decode())
            assert len(rd.keys(prefix + "*")) == 0

            rd = redis.StrictRedis(**rd2)
            assert len(rd.keys(prefix + "*")) == 100

            for k in rd.keys(prefix + "*"):
                rd.delete(k.decode())
            assert len(rd.keys(prefix + "*")) == 0

    def migrate_test_v2(self, redis_migrate):

        prefix = 'key_for_migrate_testing:'

        for conf in redis_migrate:
            rd1, rd2 = conf['source'], conf['target']
            src_rd = redis.StrictRedis(**rd1)
            dst_rd = redis.StrictRedis(**rd2)

            for key in src_rd.keys(prefix+"*"):
                src_rd.delete(key)
            for key in dst_rd.keys(prefix+"*"):
                dst_rd.delete(key)

            src_rd.set(prefix + "SET", 1)
            src_rd.hmset(prefix + "HMSET", {'a': 1, 'b': 1})
            src_rd.zadd(prefix + "ZADD", **{'a': 1, 'b': 2})

            assert len(src_rd.keys(prefix+"*")) == 3

            self.migrate_v2(rd1, rd2, prefix=prefix)

            assert len(dst_rd.keys(prefix+"*")) == 3
            assert dst_rd.get(prefix + "SET") == src_rd.get(prefix + "SET")
            assert dst_rd.hgetall(prefix + "HMSET") == src_rd.hgetall(prefix + "HMSET")
            assert dst_rd.zrange(prefix + "ZADD", 0, -1, withscores=True) == src_rd.zrange(prefix + "ZADD", 0, -1, withscores=True)

            for key in ("SET", "HMSET", "ZADD"):
                src_rd.delete(prefix + key)
                dst_rd.delete(prefix + key)

            assert len(src_rd.keys(prefix + "*")) == 0
            assert len(dst_rd.keys(prefix + "*")) == 0