Commit 2f205213 authored by 李延哲's avatar 李延哲

数据同步搭建

parent 1dabb2e9
......@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="jdk" jdkName="Python 3.6 (poseidon)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
import os
import raven
from celery import Celery
from django.conf import settings
from raven.contrib.celery import register_signal, register_logger_signal
# Point Django at the project settings module before Celery configures itself.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.settings')
class _Celery(Celery):
    """Celery subclass that attaches Sentry (raven) reporting on configure."""

    def on_configure(self):
        """Hook called by Celery once configuration is loaded.

        Does nothing unless settings.SENTRY_CELERY_ENDPOINT is set.
        """
        dsn = settings.SENTRY_CELERY_ENDPOINT
        if not dsn:
            # Sentry not configured for this deployment; skip wiring.
            return
        sentry_client = raven.Client(dsn)
        # Drop duplicate log records before they reach Sentry.
        register_logger_signal(sentry_client)
        # Forward Celery task failures to Sentry.
        register_signal(sentry_client)
# Single shared Celery application for the project.
app = _Celery('saturn_tasks')
# Route tasks to a dedicated queue rather than celery's default.
app.conf.task_default_queue = 'saturn_tasks'
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# Discover task modules from every installed Django app.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
import sys
import json
from django.conf import settings
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent
# Local fallback connection info.
# NOTE(review): listen_data() reads settings.MYSQL_SETTINGS, not this
# module-level dict — confirm whether this constant is actually used.
MYSQL_SETTINGS = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': '12345678'}
# Placeholder for the (not yet enabled) Kafka producer configuration.
kafka_setting = {}
def listen_data():
    """Tail the MySQL binlog and print each row change as one JSON event.

    Blocks forever (blocking=True): every insert/update/delete on
    demo.runoob_tbl is written to stdout as a JSON document with keys
    schema/table/action plus the row values.
    """
    stream = BinLogStreamReader(
        connection_settings=settings.MYSQL_SETTINGS,
        server_id=3,
        blocking=True,
        only_schemas=['demo'],
        only_tables=["runoob_tbl"],
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
    try:
        for binlogevent in stream:
            for row in binlogevent.rows:
                event = {"schema": binlogevent.schema, "table": binlogevent.table}
                if isinstance(binlogevent, DeleteRowsEvent):
                    event["action"] = "delete"
                    # dict() copies so later mutation cannot touch the row.
                    # (Dropped the original's no-op `dict(event.items())`.)
                    event["values"] = dict(row["values"])
                elif isinstance(binlogevent, UpdateRowsEvent):
                    event["action"] = "update"
                    event["before_values"] = dict(row["before_values"])
                    event["after_values"] = dict(row["after_values"])
                elif isinstance(binlogevent, WriteRowsEvent):
                    event["action"] = "insert"
                    event["values"] = dict(row["values"])
                # NOTE(review): row values may contain datetime/Decimal, which
                # json.dumps cannot serialize by default — confirm upstream.
                print(json.dumps(event))
                sys.stdout.flush()
    finally:
        # The original close() sat after the blocking loop and was
        # unreachable; ensure the connection is released on any exit.
        stream.close()
\ No newline at end of file
from elasticsearch import Elasticsearch, helpers
from django.conf import settings
class ESPerform(object):
    """Classmethod helpers around a shared Elasticsearch client.

    The client is cached on the class as ``cli_obj``; call ``get_cli()``
    once at startup before using the other helpers.
    """

    # Cached Elasticsearch client (None until get_cli() succeeds).
    cli_obj = None
    # Default host list, taken from Django settings.
    cli_info_list = settings.ES_HOSTS

    @classmethod
    def get_cli(cls, cli_info=None):
        """Build and cache the Elasticsearch client.

        :param cli_info: optional host list; falls back to settings.ES_HOSTS.
        :return: the client, or None when construction fails.
        """
        try:
            init_args = {
                'sniff_on_start': False,
                'sniff_on_connection_fail': False,
            }
            es_cli_info = cli_info if cli_info else cls.cli_info_list
            cls.cli_obj = Elasticsearch(hosts=es_cli_info, **init_args)
            return cls.cli_obj
        except Exception:
            # Best-effort: callers treat None as "client unavailable".
            # (Was a bare `except:`, which also swallowed SystemExit.)
            return None

    @classmethod
    def create_data(cls, index="", body=None):
        """Index one document.

        ``body`` defaulted to a shared mutable dict; use None sentinel.
        """
        cls.cli_obj.create(index=index, body=body if body is not None else {})

    @classmethod
    def del_data(cls, index="", sql_id=""):
        """Delete the first document whose 'sql_id' term matches."""
        if not sql_id:
            return
        query = {'query': {'term': {'sql_id': sql_id}}}
        result = cls.cli_obj.search(index=index, body=query)
        hits = result.get('hits', {}).get('hits') or []
        if hits:
            cls.cli_obj.delete(index=index, id=hits[0].get("_id"))

    @classmethod
    def update_data(cls, index="", sql_id="", body=None):
        """Partially update the first document whose 'sql_id' term matches.

        Bug fix: the update call previously targeted the hard-coded index
        "my_index" instead of the ``index`` argument.
        """
        if not sql_id:
            return
        body = body if body is not None else {}
        query = {'query': {'term': {'sql_id': sql_id}}}
        result = cls.cli_obj.search(index=index, body=query)
        hits = result.get('hits', {}).get('hits') or []
        if not hits:
            return
        doc_id = hits[0].get("_id")
        if not doc_id:
            return
        cls.cli_obj.update(index=index, id=doc_id, body={"doc": body})

    @classmethod
    def lazy_create(cls, body):
        """Bulk-index an iterable of actions via helpers.bulk."""
        helpers.bulk(cls.cli_obj, body)
\ No newline at end of file
# -*- coding: utf-8 -*-
import redis
import json
from django.conf import settings
from engine.redis_base import _PoolMinx
from engine.es_base import ESPerform
class LazyDataHandle(object):
    """Flush buffered lazy-write data from redis into Elasticsearch."""

    # Shared redis pool/client, created once at import time.
    redis_cli = _PoolMinx()

    @classmethod
    def insert(cls, l_key, data):
        # Fetch the stored payload for this key.
        # NOTE(review): LazyRecord stores entries with RPUSH (a redis list),
        # but GET only works on string keys — confirm the intended key type.
        # NOTE(review): the `data` argument is unused — confirm whether it
        # should be written before flushing.
        result = cls.redis_cli.client.get(name=l_key)  # fetch all buffered data
        ESPerform.lazy_create(body=result)
        return
\ No newline at end of file
# -*- coding: utf-8 -*-
import redis
import json
from django.conf import settings
class _PoolMinx(object):
    """Mixin exposing a shared redis connection for the lazy-data buffers."""

    pool = redis.ConnectionPool(**settings.REDIS['es_lazydata'])
    client = redis.Redis(connection_pool=pool)

    def get_lazykey(self):
        """Return every buffered key matching 'lazydata__*', decoded to str."""
        raw_keys = self.client.keys("lazydata__*")
        decoded = []
        for raw in raw_keys:
            decoded.append(raw.decode('utf-8'))
        return decoded

    def del_key_data(self, key):
        """Delete a single redis key."""
        self.client.delete(key)
        return
class LazyRecord(_PoolMinx):
    """Per-model redis list buffering documents before bulk ES writes."""

    def __init__(self, model_name):
        # All buffer keys share this prefix so get_lazykey() can find them.
        self.prefix = "lazydata__"
        self._index = model_name
        self.model_name = self.prefix + model_name

    def insert(self, data):
        """Append one record to the buffer, JSON-encoded exactly once.

        Bug fix: the original called json.dumps twice (dumping an already
        dumped string), so consumers received a quoted JSON string.
        """
        if not data:
            return
        self.client.rpush(self.model_name, json.dumps(data))
        return

    def get_data(self):
        """Return the whole buffer (list of raw redis values)."""
        return self.client.lrange(name=self.model_name, start=0, end=-1)

    def get_len(self):
        """Return the number of buffered records."""
        return self.client.llen(name=self.model_name)

    def clear_data(self):
        """Drop the whole buffer."""
        self.client.delete(self.model_name)
        return

    def pop_data(self, pop_count=1):
        """Pop up to ``pop_count`` records from the head of the buffer.

        Bug fix: LRANGE's end index is inclusive, so the original returned
        pop_count + 1 items for a requested pop_count.
        """
        result = self.client.lrange(name=self.model_name, start=0,
                                    end=pop_count - 1)
        for _ in result:
            self.client.lpop(name=self.model_name)
        return result

    def push_data(self, data):
        """Re-queue records at the head, preserving their original order.

        Bug fix: LPUSH in forward order reversed the batch; pushing in
        reverse restores head order after a failed flush.
        """
        for item in reversed(data):
            self.client.lpush(self.model_name, item)
\ No newline at end of file
import requests
import json
from celery import shared_task
from django.conf import settings
from engine.redis_base import LazyRecord, _PoolMinx
from engine.es_base import ESPerform
@shared_task
def data_sync():
    """Drain each redis lazy-data buffer into Elasticsearch.

    On any indexing failure the popped batch is pushed back onto its
    buffer so nothing is lost; it will be retried on the next run.
    """
    redis_pool = _PoolMinx()
    prefix = "lazydata__"
    for lazy_key in redis_pool.get_lazykey():
        # Bug fix: get_lazykey() returns full keys ("lazydata__<model>"),
        # while LazyRecord.__init__ prepends the prefix itself; strip it
        # here so the key is not doubled to "lazydata__lazydata__<model>".
        model_name = lazy_key[len(prefix):] if lazy_key.startswith(prefix) else lazy_key
        lazy_cli = LazyRecord(model_name=model_name)
        lazy_data = lazy_cli.pop_data(pop_count=settings.LazyLen)
        try:
            ESPerform.lazy_create(lazy_data)
        except Exception:
            # Requeue instead of dropping the batch (was a bare `except:`).
            lazy_cli.push_data(lazy_data)
......@@ -118,3 +118,7 @@ USE_TZ = True
# https://docs.djangoproject.com/en/1.9/howto/static-files/
STATIC_URL = '/static/'

from .settings_local import *

# Bug fix: the original imported and *called* listen_data() right here,
# which blocks forever (BinLogStreamReader with blocking=True) and would
# hang every Django process at settings import.  Run the listener from a
# dedicated management command or worker process instead.
# from engine.data_monitor import listen_data
# listen_data()

# MySQL connection used by the binlog listener (engine.data_monitor).
MYSQL_SETTINGS = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '12345678'
}

# Elasticsearch hosts for engine.es_base.ESPerform.
ES_HOSTS = ['127.0.0.1']

# Redis databases; 'es_lazydata' buffers documents awaiting bulk ES writes.
REDIS = {
    'default': {'host': '127.0.0.1', 'port': 6379, 'db': 0},
    'es_lazydata': {'host': '127.0.0.1', 'port': 6379, 'db': 1}
}

# Max records drained per buffer per data_sync run.
LazyLen = 100

# Empty DSN disables Sentry reporting in Celery (see _Celery.on_configure).
SENTRY_CELERY_ENDPOINT = ""

# Celery broker; BROKER_URL kept for older Celery versions.
BROKER_URL = 'redis://127.0.0.1:6379/2'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
CELERY_TASK_SERIALIZER = "json"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment