Commit 0de7aa72 authored by 李延哲's avatar 李延哲

代码提交

parent 2f205213
......@@ -4,10 +4,13 @@ import json
from django.conf import settings
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent
from engine.redis_base import _PoolMinx
from engine.lazy_data_sync import LazyDataHandle
from settings.mapping import table_index
MYSQL_SETTINGS = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': '12345678'}
kafka_setting = {}
last_record_time = _PoolMinx.get_last_record()
def listen_data():
......@@ -17,12 +20,12 @@ def listen_data():
blocking=True,
only_schemas=['demo'],
only_tables=["runoob_tbl"],
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
# producer = KafkaProducer(kafka_setting, topic_setting)
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
skip_to_timestamp=last_record_time)
for binlogevent in stream:
for row in binlogevent.rows:
event = {"schema": binlogevent.schema, "table": binlogevent.table}
event = {"schema": binlogevent.schema, "table": binlogevent.table, 'time': binlogevent.timestamp}
if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["values"] = dict(row["values"].items())
......@@ -36,8 +39,14 @@ def listen_data():
event["action"] = "insert"
event["values"] = dict(row["values"].items())
event = dict(event.items())
print(json.dumps(event))
if int(event.get("time") or 0) <= last_record_time:
#比较最后一条记录时间过滤
continue
#Todo同步到数据库
print(json.dumps(event))
table_index[event.get("table")].handle_data(event)
# LazyDataHandle.handle_data()
sys.stdout.flush()
stream.close()
\ No newline at end of file
import json
from elasticsearch import Elasticsearch, helpers
from django.conf import settings
ES_HOSTS = ['127.0.0.1:9200']
class ESPerform(object):
cli_obj = None
cli_info_list = settings.ES_HOSTS
cli_obj = Elasticsearch('127.0.0.1:9200')
cli_info_list = ES_HOSTS #settings.ES_HOSTS
@classmethod
def get_cli(cls, cli_info=None):
......@@ -14,39 +16,19 @@ class ESPerform(object):
'sniff_on_connection_fail': False,
}
es_cli_info = cli_info if cli_info else cls.cli_info_list
cls.cli_obj = Elasticsearch(hosts=es_cli_info, **init_args)
cls.cli_obj = Elasticsearch('127.0.0.1:9200')
return cls.cli_obj
except:
return None
@classmethod
def create_data(cls, index="", body=None):
    """Index one document into Elasticsearch.

    Args:
        index: target ES index name.
        body: document source dict; defaults to an empty document.
    """
    # Fix: the original used a mutable default argument (body={}), which is
    # shared across calls; use None as the sentinel instead.
    cls.cli_obj.create(index=index, body=body if body is not None else {})
@classmethod
def del_data(cls, index="", sql_id=""):
    """Delete the first document in *index* whose ``sql_id`` term matches.

    A blank ``sql_id`` is a no-op; when the search returns no hits nothing
    is deleted.
    """
    if not sql_id:
        return
    term_query = {'query': {'term': {'sql_id': sql_id}}}
    response = cls.cli_obj.search(index=index, body=term_query)
    # Drill into the search response defensively: hits / hits[0] may be absent.
    hit_list = (response.get('hits') or {}).get('hits') or []
    if not hit_list or not hit_list[0]:
        return
    first_hit = hit_list[0]
    cls.cli_obj.delete(index=index, id=first_hit.get("_id"))
def lazy_batch(cls, body, index, doc_type):
"""批量处理 包括增删改"""
@classmethod
def update_data(cls, index="", sql_id="", body={}):
if not sql_id:
return
data = []
for item in body:
item = json.loads(item.decode("utf-8").replace("\'", "\""))
data.append(item)
query = {'query': {'term': {'sql_id': sql_id}}}
result = cls.cli_obj.search(index=index, body=query)
if result.get('hits') and result['hits'].get('hits') and result['hits']['hits'][0]:
result = result['hits']['hits'][0]
if not result.get("_id"):
return
cls.cli_obj.update(index="my_index", id=result.get("_id"), body={"doc": body})
@classmethod
def lazy_create(cls, body):
helpers.bulk(cls.cli_obj, body)
\ No newline at end of file
cls.cli_obj.bulk(index=index, doc_type=doc_type, body=data)
\ No newline at end of file
from engine.lazy_data_sync import LazyDataHandle
from engine.redis_base import LazyRecord
class LocalDemo(LazyDataHandle):
    """Binlog-event handler for the demo ``runoob_tbl`` table.

    Each hook converts a MySQL row event into an Elasticsearch bulk action
    (action metadata line + optional source line) and queues it through
    :class:`LazyRecord`, the Redis-backed lazy batcher.
    """

    @staticmethod
    def _enqueue(table_name, actions):
        # One LazyRecord per table: the redis key, ES index and doc type all
        # derive from the table name.
        redis_cli = LazyRecord(model_name=table_name)
        redis_cli.handle(data=actions, index=table_name, doc_type=table_name)

    @classmethod
    def insert_data(cls, data):
        """Queue an ES ``create`` action for an inserted row."""
        table_name = data.get("table")
        if not table_name:
            return
        # Fix: guard against a missing 'values' payload — the original called
        # .get() on None and raised AttributeError.
        values = data.get('values') or {}
        cls._enqueue(table_name, [
            {"create": {
                "_index": table_name,
                "_type": table_name,
                "_id": values.get("runoob_id"),
            }},
            {"title": values.get("runoob_title") or ""},
        ])

    @classmethod
    def update_data(cls, data):
        """Queue an ES partial ``update`` for a modified row (post-image)."""
        table_name = data.get("table")
        if not table_name:
            return
        # Update events carry the post-image under 'after_values'.
        values = data.get('after_values') or {}
        cls._enqueue(table_name, [
            {"update": {
                "_id": values.get("runoob_id"),
                "_index": table_name,
                "_type": table_name,
            }},
            {"doc": {"title": values.get("runoob_title") or ""}},
        ])

    @classmethod
    def del_data(cls, data):
        """Queue an ES ``delete`` action for a removed row."""
        table_name = data.get("table")
        if not table_name:
            return
        values = data.get('values') or {}
        # Delete actions have no source document, only the metadata line.
        cls._enqueue(table_name, [
            {"delete": {
                "_index": table_name,
                "_id": values.get("runoob_id"),
                "_type": table_name,
            }},
        ])
\ No newline at end of file
......@@ -8,11 +8,30 @@ from engine.es_base import ESPerform
class LazyDataHandle(object):
redis_cli = _PoolMinx()
actions = ['insert', 'update', 'delete']
@classmethod
def insert(cls, l_key, data):
#插入数据
def insert_data(cls, data):
pass
result = cls.redis_cli.client.get(name=l_key) # 拿到全部数据
ESPerform.lazy_create(body=result)
return
\ No newline at end of file
@classmethod
# Delete-event hook: no-op in the base class; table-specific subclasses
# (e.g. LocalDemo in this commit) provide the real implementation.
def del_data(cls, data):
    pass
@classmethod
# Update-event hook: no-op in the base class; overridden per table.
def update_data(cls, data):
    pass
@classmethod
def handle_data(cls, data):
    """Route a binlog event dict to the hook matching its ``action`` field.

    Events whose action is not listed in ``cls.actions`` are ignored
    silently; recognized actions delegate to the matching ``*_data`` hook.
    """
    action = data.get("action")
    if action not in cls.actions:
        return
    # Table-driven dispatch instead of an if-chain.
    dispatch = {
        "insert": cls.insert_data,
        "update": cls.update_data,
        "delete": cls.del_data,
    }
    handler = dispatch.get(action)
    if handler is not None:
        return handler(data)
......@@ -3,6 +3,7 @@
import redis
import json
from django.conf import settings
from engine.es_base import ESPerform
class _PoolMinx(object):
......@@ -17,6 +18,15 @@ class _PoolMinx(object):
self.client.delete(key)
return
@classmethod
# Persist the timestamp of the last processed binlog event so the stream
# can resume from it (see skip_to_timestamp in the listener) after restart.
def update_last_record(cls, time_number):
    cls.client.set("last_update_time", time_number)
@classmethod
# Read the checkpoint back; returns 0 when it has never been written.
def get_last_record(cls):
    last_time = cls.client.get("last_update_time")
    return int(last_time or 0)
class LazyRecord(_PoolMinx):
def __init__(self, model_name):
......@@ -24,13 +34,23 @@ class LazyRecord(_PoolMinx):
self._index = model_name
self.model_name = self.prefix + model_name
def insert(self, data):
def handle(self, data, index="", doc_type=""):
"""插入一条数据"""
if not data:
return
data = json.dumps(data)
self.client.rpush(self.model_name, json.dumps(data))
value_list = []
for item in data:
value_list.append(item)
value_list = tuple(value_list)
self.client.rpush(self.model_name, *value_list)
data_lenth = self.get_len()
if data_lenth >= 1:
in_data = self.pop_data()
ESPerform.lazy_batch(in_data, index, doc_type)
return
def get_data(self):
......@@ -57,4 +77,4 @@ class LazyRecord(_PoolMinx):
def push_data(self, data):
    """Push items back onto the queue (error-recovery path — TODO)."""
    # NOTE(review): lpush inserts one item at a time at the head, so the
    # items end up in reversed order relative to `data` — confirm that is
    # intended for the re-queue path.
    for item in data:
        self.client.lpush(self.model_name, item)
\ No newline at end of file
self.client.lpush(self.model_name, item)
# Maps each MySQL table name to the handler class that syncs it into ES.
from engine.heras_transfer import LocalDemo
# The binlog listener looks up event["table"] here and calls
# handler.handle_data(event); tables without an entry raise KeyError.
table_index = {
"runoob_tbl": LocalDemo
}
\ No newline at end of file
......@@ -121,4 +121,30 @@ STATIC_URL = '/static/'
from .settings_local import *
from engine.data_monitor import listen_data
# Connection settings for the MySQL server whose binlog is streamed.
# NOTE(review): credentials are hard-coded in source control — move them to
# settings_local / environment variables.
MYSQL_SETTINGS = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '12345678'
}
# Elasticsearch cluster endpoints.
ES_HOSTS = ['127.0.0.1']
# Redis databases: db 0 for general use, db 1 for the ES lazy-sync queue.
REDIS = {
'default': {'host': '127.0.0.1', 'port': 6379, 'db': 0},
'es_lazydata': {'host': '127.0.0.1', 'port': 6379, 'db': 1}
}
# Batch-size threshold for the lazy sync queue.
LazyLen = 100
SENTRY_CELERY_ENDPOINT = ""
# Celery broker on redis db 2.
BROKER_URL = 'redis://127.0.0.1:6379/2'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
CELERY_TASK_SERIALIZER = "json"
# NOTE(review): listen_data() opens a blocking binlog stream loop — calling
# it at settings import time will block Django startup entirely; confirm
# this is intended (a management command or worker process is the usual home).
listen_data()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment