diff --git a/eda/smart_rank/dist_update_user_portrait_service.py b/eda/smart_rank/dist_update_user_portrait_service.py index dfb16833c4e510cb397bf020349d3150c5e5b4a3..640ec964da900852ccabfddff48257357449b419 100644 --- a/eda/smart_rank/dist_update_user_portrait_service.py +++ b/eda/smart_rank/dist_update_user_portrait_service.py @@ -17,187 +17,7 @@ import numpy as np import pandas as pd from pyspark.sql.functions import lit from pyspark.sql.functions import concat_ws - - -def send_email(app,id,e): - # 第三方 SMTP æœåŠ¡ - mail_host = 'smtp.exmail.qq.com' # 设置æœåС噍 - mail_user = "gaoyazhe@igengmei.com" # 用户å - mail_pass = "VCrKTui99a7ALhiK" # å£ä»¤ - - sender = 'gaoyazhe@igengmei.com' - receivers = ['gaoyazhe@igengmei.com'] # 接收邮件,å¯è®¾ç½®ä¸ºä½ çš„QQ邮箱或者其他邮箱 - e = str(e) - msg = MIMEMultipart() - part = MIMEText('app_id:'+id+':fail', 'plain', 'utf-8') - msg.attach(part) - msg['From'] = formataddr(["gaoyazhe", sender]) - # 括å·é‡Œçš„å¯¹åº”æ”¶ä»¶äººé‚®ç®±æ˜µç§°ã€æ”¶ä»¶äººé‚®ç®±è´¦å· - msg['To'] = ";".join(receivers) - # message['Cc'] = ";".join(cc_reciver) - - msg['Subject'] = 'spark streaming:app_name:'+app - with open('error.txt','w') as f: - f.write(e) - f.close() - part = MIMEApplication(open('error.txt', 'r').read()) - part.add_header('Content-Disposition', 'attachment', filename="error.txt") - msg.attach(part) - - try: - smtpObj = smtplib.SMTP_SSL(mail_host, 465) - smtpObj.login(mail_user, mail_pass) - smtpObj.sendmail(sender, receivers, msg.as_string()) - except smtplib.SMTPException: - print('error') - - -def get_data_by_mysql(host, port, user, passwd, db, sql): - try: - db = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, cursorclass=pymysql.cursors.DictCursor) - cursor = db.cursor() - cursor.execute(sql) - results = cursor.fetchall() - db.close() - return results - except Exception as e: - print(e) - - -def get_all_search_word_and_synonym_tags(): - """ - :return:dict {"search_word1":[tag_list1],"search_word2":[tag_list2]...} - """ - try: - sql = "select a.keyword , c.id from api_wordrel a " \ - "left join api_wordrelsynonym b on a.id = b.wordrel_id " \ - "left join api_tag c on b.word=c.name " \ - "where a.category in (1,13,10,11,12) and c.tag_type+0<'4'+0 and c.is_online=1" - mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql) - result_dict = dict() - for data in mysql_results: - if data['keyword'] not in result_dict: - result_dict[data['keyword']] = [data['id']] - else: - result_dict[data['keyword']].append(data['id']) - return result_dict - except Exception as e: - print(e) - - -def get_all_synonym_tags(): - """ - :return:dict {"search_word1":[tag_list1],"search_word2":[tag_list2]...} - """ - try: - sql = "select a.word, b.id from api_wordrelsynonym a left join api_tag b " \ - "on a.word=b.name where b.tag_type+0<'4'+0 and b.is_online=1" - mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql) - result_dict = dict() - for data in mysql_results: - if data['word'] not in result_dict: - result_dict[data['word']] = [data['id']] - else: - result_dict[data['word']].append(data['id']) - return result_dict - except Exception as e: - print(e) - - -def get_all_word_tags(): - try: - search_word_and_synonym_tags = get_all_search_word_and_synonym_tags() - synonym_tags = get_all_synonym_tags() - if search_word_and_synonym_tags and synonym_tags: - return {**synonym_tags, **search_word_and_synonym_tags} - except Exception as e: - print(e) - - -def get_all_tag_tag_type(): - """ - :return:dict {tag_id1:tag_type1,tag_id2:tag_type2...} - """ - try: - sql = "select id,tag_type from api_tag where tag_type+0<'4'+0 and is_online=1" - mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql) - result_dict = dict() - for data in mysql_results: - result_dict[data['id']] = data['tag_type'] - return result_dict - except Exception as e: - print(e) - - -def get_all_3tag_2tag(): - try: - sql = "select a.child_id,a.parent_id from api_tagrelation a" \ - " left join api_tag b on a.parent_id=b.id " \ - "where a.child_id in (select id from api_tag where tag_type='3' and is_online=1) " \ - "and b.tag_type='2'" - mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql) - result_dict = dict() - for data in mysql_results: - if data['child_id'] not in result_dict: - result_dict[data['child_id']] = [data['parent_id']] - else: - result_dict[data['child_id']].append(data['parent_id']) - return result_dict - except Exception as e: - print(e) - - -def get_tag2_from_tag3(tag3, all_3tag_2tag, user_log_df_tag2_list): - try: - tag2s = [] - if tag3 in all_3tag_2tag: - tag2s = all_3tag_2tag[tag3] - for tag2 in tag2s: - if tag2 in user_log_df_tag2_list: - return tag2 - return tag3 - except Exception as e: - print(e) - - -def compute_henqiang(x): - score = 15-x*((15-0.5)/180) - if score>0.5: - return score - else: - return 0.5 -def compute_jiaoqiang(x): - score = 12-x*(12/180) - if score>0.5: - return score - else: - return 0.5 -def compute_ruoyixiang(x): - score = 5-x*((5-0.5)/180) - if score>0.5: - return score - else: - return 0.5 -def compute_validate(x): - score = 10-x*((10-0.5)/180) - if score>0.5: - return score - else: - return 0.5 -def compute_ai_scan(x): - score = 2 - x * ((2 - 0.5) / 180) - if score>0.5: - return score - else: - return 0.5 -def get_action_tag_count(df, action_time): - try: - if not df[df['time'] == action_time].empty: - return len(df[df['time'] == action_time]) - else: - return 1 - except Exception as e: - print(e) +from eda.smart_rank.utils import * def get_user_service_portrait(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2tag, size=10): diff --git a/eda/smart_rank/evaluation_metrics.py b/eda/smart_rank/evaluation_metrics.py index f52496763731bccc499aad05e2d0299cadf87b62..e3e1e129f1cc5ffe1d60016155bf1c6dd5735b1a 100644 --- a/eda/smart_rank/evaluation_metrics.py +++ b/eda/smart_rank/evaluation_metrics.py @@ -5,7 +5,7 @@ import time import json import numpy as np import pandas as pd -from eda.smart_rank.dist_update_user_portrait_service import * +from eda.smart_rank.utils import * def get_user_service_portrait_not_alipay(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2tag, size=10): diff --git a/eda/smart_rank/utils.py b/eda/smart_rank/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..4aa2fda1c3e346df2e9ac0536774bfa47f79f09f --- /dev/null +++ b/eda/smart_rank/utils.py @@ -0,0 +1,206 @@ +import pymysql +import redis +import datetime +import time +import json +import numpy as np +import pandas as pd + + +def send_email(app,id,e): + # 第三方 SMTP æœåŠ¡ + mail_host = 'smtp.exmail.qq.com' # 设置æœåС噍 + mail_user = "gaoyazhe@igengmei.com" # 用户å + mail_pass = "VCrKTui99a7ALhiK" # å£ä»¤ + + sender = 'gaoyazhe@igengmei.com' + receivers = ['gaoyazhe@igengmei.com'] # 接收邮件,å¯è®¾ç½®ä¸ºä½ çš„QQ邮箱或者其他邮箱 + e = str(e) + msg = MIMEMultipart() + part = MIMEText('app_id:'+id+':fail', 'plain', 'utf-8') + msg.attach(part) + msg['From'] = formataddr(["gaoyazhe", sender]) + # 括å·é‡Œçš„å¯¹åº”æ”¶ä»¶äººé‚®ç®±æ˜µç§°ã€æ”¶ä»¶äººé‚®ç®±è´¦å· + msg['To'] = ";".join(receivers) + # message['Cc'] = ";".join(cc_reciver) + + msg['Subject'] = 'spark streaming:app_name:'+app + with open('error.txt','w') as f: + f.write(e) + f.close() + part = MIMEApplication(open('error.txt', 'r').read()) + part.add_header('Content-Disposition', 'attachment', filename="error.txt") + msg.attach(part) + + try: + smtpObj = smtplib.SMTP_SSL(mail_host, 465) + smtpObj.login(mail_user, mail_pass) + smtpObj.sendmail(sender, receivers, msg.as_string()) + except smtplib.SMTPException: + print('error') + + +def get_data_by_mysql(host, port, user, passwd, db, sql): + try: + db = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, cursorclass=pymysql.cursors.DictCursor) + cursor = db.cursor() + cursor.execute(sql) + results = cursor.fetchall() + db.close() + return results + except Exception as e: + print(e) + + +def get_all_search_word_synonym_tags(): + """ + :return:dict {"search_word1":[tag_list1],"search_word2":[tag_list2]...} + """ + try: + sql = "select a.keyword , c.id from api_wordrel a " \ + "left join api_wordrelsynonym b on a.id = b.wordrel_id " \ + "left join api_tag c on b.word=c.name " \ + "where a.category in (1,13,10,11,12) and c.tag_type+0<'4'+0 and c.is_online=1" + mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql) + result_dict = dict() + for data in mysql_results: + if data['keyword'] not in result_dict: + result_dict[data['keyword']] = [data['id']] + else: + result_dict[data['keyword']].append(data['id']) + return result_dict + except Exception as e: + print(e) + + +def get_all_synonym_tags(): + """ + :return:dict {"search_word1":[tag_list1],"search_word2":[tag_list2]...} + """ + try: + sql = "select a.word, b.id from api_wordrelsynonym a left join api_tag b " \ + "on a.word=b.name where b.tag_type+0<'4'+0 and b.is_online=1" + mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql) + result_dict = dict() + for data in mysql_results: + if data['word'] not in result_dict: + result_dict[data['word']] = [data['id']] + else: + result_dict[data['word']].append(data['id']) + return result_dict + except Exception as e: + print(e) + + +def get_all_api_tags(): + """ + :return:dict {"search_word1":[tag_list1],"search_word2":[tag_list2]...} + """ + try: + sql = "select name, id from api_tag where tag_type+0<'4'+0 and is_online=1" + mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql) + result_dict = dict() + for data in mysql_results: + if data['name'] not in result_dict: + result_dict[data['name']] = [data['id']] + else: + result_dict[data['name']].append(data['id']) + return result_dict + except Exception as e: + print(e) + + +def get_all_word_tags(): + try: + search_word_synonym_tags = get_all_search_word_synonym_tags() + synonym_tags = get_all_synonym_tags() + api_tags = get_all_api_tags() + return {**search_word_synonym_tags, **synonym_tags, **api_tags} + except Exception as e: + print(e) + + +def get_all_tag_tag_type(): + """ + :return:dict {tag_id1:tag_type1,tag_id2:tag_type2...} + """ + try: + sql = "select id,tag_type from api_tag where tag_type+0<'4'+0 and is_online=1" + mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql) + result_dict = dict() + for data in mysql_results: + result_dict[data['id']] = data['tag_type'] + return result_dict + except Exception as e: + print(e) + + +def get_all_3tag_2tag(): + try: + sql = "select a.child_id,a.parent_id from api_tagrelation a" \ + " left join api_tag b on a.parent_id=b.id " \ + "where a.child_id in (select id from api_tag where tag_type='3' and is_online=1) " \ + "and b.tag_type='2'" + mysql_results = get_data_by_mysql('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql) + result_dict = dict() + for data in mysql_results: + if data['child_id'] not in result_dict: + result_dict[data['child_id']] = [data['parent_id']] + else: + result_dict[data['child_id']].append(data['parent_id']) + return result_dict + except Exception as e: + print(e) + + +def get_tag2_from_tag3(tag3, all_3tag_2tag, user_log_df_tag2_list): + try: + tag2s = [] + if tag3 in all_3tag_2tag: + tag2s = all_3tag_2tag[tag3] + for tag2 in tag2s: + if tag2 in user_log_df_tag2_list: + return tag2 + return tag3 + except Exception as e: + print(e) + + +def compute_henqiang(x): + score = 15-x*((15-0.5)/180) + if score>0.5: + return score + else: + return 0.5 +def compute_jiaoqiang(x): + score = 12-x*(12/180) + if score>0.5: + return score + else: + return 0.5 +def compute_ruoyixiang(x): + score = 5-x*((5-0.5)/180) + if score>0.5: + return score + else: + return 0.5 +def compute_validate(x): + score = 10-x*((10-0.5)/180) + if score>0.5: + return score + else: + return 0.5 +def compute_ai_scan(x): + score = 2 - x * ((2 - 0.5) / 180) + if score>0.5: + return score + else: + return 0.5 +def get_action_tag_count(df, action_time): + try: + if not df[df['time'] == action_time].empty: + return len(df[df['time'] == action_time]) + else: + return 1 + except Exception as e: + print(e) \ No newline at end of file