Commit 93ae2bf9 authored by 赵威's avatar 赵威

Merge branch 'offic' into 'master'

Offic

See merge request !76
parents f00abab7 abb30700
...@@ -9,7 +9,7 @@ from pyspark import SparkConf ...@@ -9,7 +9,7 @@ from pyspark import SparkConf
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from tool import (get_doris_prod, get_redis_client, get_tag3_user_log, get_tag3_user_order_log, get_user_portrait_tag3_with_score, from tool import (get_doris_prod, get_redis_client, get_tag3_user_log, get_tag3_user_order_log, get_user_portrait_tag3_with_score,
send_email, write_user_portrait) send_msg_to_dingtalk, write_user_portrait)
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}] # [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}]
...@@ -327,6 +327,7 @@ def update_tag3_user_portrait(cl_id): ...@@ -327,6 +327,7 @@ def update_tag3_user_portrait(cl_id):
def consume_kafka(): def consume_kafka():
start = datetime.datetime.now()
sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))" sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
db, cursor = get_doris_prod() db, cursor = get_doris_prod()
cursor.execute(sql) cursor.execute(sql)
...@@ -360,22 +361,23 @@ def consume_kafka(): ...@@ -360,22 +361,23 @@ def consume_kafka():
# result2.foreach(print) # result2.foreach(print)
# result2.collect() # result2.collect()
end = datetime.datetime.now()
msg_dict = {"start": str(start), "end": str(end), "device_number": len(device_ids_lst)}
msg_res = ""
for (k, v) in msg_dict.items():
msg_res += str(k)
msg_res += ": "
msg_res += str(v)
msg_res += "\n"
send_msg_to_dingtalk(msg_res)
spark.stop() spark.stop()
except Exception as e: except Exception as e:
send_email("tag3_update_user_portrait_offline", "tag3_update_user_portrait_offline", e) send_msg_to_dingtalk("tag3_update_user_portrait_offline\n" + str(e))
if __name__ == "__main__": if __name__ == "__main__":
start = datetime.datetime.now() start = datetime.datetime.now()
# update_tag3_user_portrait("862460044588666")
# update_tag3_user_portrait("androidid_a25a1129c0b38f7b")
# cl_id = "864350041167473"
# df = get_tag3_user_log(cl_id)
# df[["projects", "business_tags"]]
# update_tag3_user_portrait(cl_id)
# cl_id = "867617044159377" # cl_id = "867617044159377"
# print(update_tag3_user_portrait(cl_id)) # print(update_tag3_user_portrait(cl_id))
......
...@@ -2,11 +2,15 @@ ...@@ -2,11 +2,15 @@
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
import base64
import datetime import datetime
import hashlib
import hmac
import json import json
import smtplib import smtplib
import time import time
import traceback import traceback
import urllib
from email.mime.application import MIMEApplication from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
...@@ -16,6 +20,31 @@ import numpy as np ...@@ -16,6 +20,31 @@ import numpy as np
import pandas as pd import pandas as pd
import pymysql import pymysql
import redis import redis
import requests
def send_msg_to_dingtalk(msg, mobiles=[]):
try:
secret = "SECca234e669d42b8ee9d6aa73817457785be7556ee3b4c5249c31d2c3422732511"
timestamp = str(round(time.time() * 1000))
secret_enc = secret.encode("utf-8")
string_to_sign = "{}\n{}".format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode("utf-8")
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
headers = {"Content-Type": "application/json"}
data = {"msgtype": "text", "text": {"content": msg}}
if mobiles:
data = {"msgtype": "text", "text": {"content": msg}, "at": {"atMobiles": mobiles, "isAtAll": False}}
json_data = json.dumps(data)
url = "https://oapi.dingtalk.com/robot/send?access_token=5812943d740c3072c7cee3f32226e05caa5d3b7887592c4c11ae8004a6e2c6a4&sign={}&timestamp={}".format(
sign, timestamp)
response = requests.post(url=url, data=json_data, headers=headers)
return str(response.status_code) + " " + str(response.content.decode("utf-8"))
except Exception as e:
print(e)
return str(e)
def send_email(app, id, e): def send_email(app, id, e):
...@@ -444,9 +473,7 @@ def get_tag3_user_log(cl_id): ...@@ -444,9 +473,7 @@ def get_tag3_user_log(cl_id):
# select log_time, score_type, business_tags, event_cn from kafka_tag3_log where cl_id = '867617044159377' and event_cn in ('支付订单', '验证订单'); # select log_time, score_type, business_tags, event_cn from kafka_tag3_log where cl_id = '867617044159377' and event_cn in ('支付订单', '验证订单');
def get_tag3_user_order_log(cl_id): def get_tag3_user_order_log(cl_id):
columns = [ columns = ["log_time", "score_type", "business_tags", "event_cn"]
"log_time", "score_type", "business_tags", "event_cn"
]
try: try:
sql = """select log_time, score_type, business_tags, event_cn sql = """select log_time, score_type, business_tags, event_cn
from kafka_tag3_log from kafka_tag3_log
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment