Commit 22de0c8b authored by Apple's avatar Apple

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

parents f1c59911 ebdc3e98
...@@ -111,27 +111,27 @@ def feature_engineer(): ...@@ -111,27 +111,27 @@ def feature_engineer():
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_min from train_Knowledge_network_data" sql = "select distinct price_min from knowledge"
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_method from train_Knowledge_network_data" sql = "select distinct treatment_method from knowledge"
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_max from train_Knowledge_network_data" sql = "select distinct price_max from knowledge"
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_time from train_Knowledge_network_data" sql = "select distinct treatment_time from knowledge"
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct maintain_time from train_Knowledge_network_data" sql = "select distinct maintain_time from knowledge"
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct recover_time from train_Knowledge_network_data" sql = "select distinct recover_time from knowledge"
unique_values.extend(get_unique(db, sql)) unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
...@@ -154,9 +154,11 @@ def feature_engineer(): ...@@ -154,9 +154,11 @@ def feature_engineer():
unique_values.extend(features) unique_values.extend(features)
print("unique_values length") print("unique_values length")
print(len(unique_values)) print(len(unique_values))
print("特征维度:")
print(apps_number + level2_number + level3_number + len(unique_values))
temp = list(range(2 + apps_number + level2_number + level3_number, temp = list(range(16 + apps_number + level2_number + level3_number,
2 + apps_number + level2_number + level3_number + len(unique_values))) 16 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp)) value_map = dict(zip(unique_values, temp))
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \ sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
...@@ -169,7 +171,7 @@ def feature_engineer(): ...@@ -169,7 +171,7 @@ def feature_engineer():
"left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \ "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
"left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \ "left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
"left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \ "left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
"left join jerry_test.train_Knowledge_network_data k on feat.level2 = k.level2_id " \ "left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
"left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \ "left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
"left join jerry_test.question_tag question on e.device_id = question.device_id " \ "left join jerry_test.question_tag question on e.device_id = question.device_id " \
"left join jerry_test.search_tag search on e.device_id = search.device_id " \ "left join jerry_test.search_tag search on e.device_id = search.device_id " \
...@@ -254,7 +256,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map): ...@@ -254,7 +256,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
"left join jerry_test.order_tag ot on e.device_id = ot.device_id " \ "left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
"left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \ "left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
"left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \ "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
"left join jerry_test.train_Knowledge_network_data k on feat.level2 = k.level2_id" "left join jerry_test.knowledge k on feat.level2 = k.level2_id"
features = ["ucity_id", "ccity_name", "device_type", "manufacturer", features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "hospital_id", "channel", "top", "time", "hospital_id",
......
This diff is collapsed.
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from pyspark.context import SparkContext # from pyspark.context import SparkContext
from pyspark.conf import SparkConf # from pyspark.conf import SparkConf
import pytispark.pytispark as pti # import pytispark.pytispark as pti
from pyspark.sql import SparkSession # from pyspark.sql import SparkSession
import numpy as np import pandas as pd
import pymysql
from sqlalchemy import create_engine
def test(): def test():
...@@ -28,26 +31,48 @@ def test(): ...@@ -28,26 +31,48 @@ def test():
spark.sql(sql).show(6) spark.sql(sql).show(6)
def some_function(x):
# Use the libraries to do work
return np.sin(x)**2 + 2
def con_sql(db,sql):
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
db.close()
return df
if __name__ == '__main__': if __name__ == '__main__':
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \ # sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \ # .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \ # .set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \ # .set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \ # .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \ # .set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g") # .set("spark.driver.maxResultSize", "8g")
#
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate() # spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
# ti = pti.TiContext(spark) # ti = pti.TiContext(spark)
# ti.tidbMapDatabase("jerry_test") # ti.tidbMapDatabase("jerry_test")
# spark.sparkContext.setLogLevel("WARN") # spark.sparkContext.setLogLevel("WARN")
# sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60" # sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
# spark.sql(sql).show(6) # spark.sql(sql).show(6)
sql = "select level2_id,concat('t',treatment_method)," \
"concat('min',price_min),concat('max',price_max)," \
"concat('tr',treatment_time),concat('m',maintain_time)," \
"concat('r',recover_time) from jerry_test.train_Knowledge_network_data"
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
df = con_sql(db, sql)
df = df.rename(columns={0: "level2_id", 1: "treatment_method",2:"price_min",3:"price_max",4:"treatment_time",
5:"maintain_time",6:"recover_time"})
print(df.head(6))
host = '172.16.40.158'
port = 4000
user = 'root'
password = '3SYz54LS9#^9sBvC'
db = 'jerry_test'
charset = 'utf8'
engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df.to_sql('knowledge', con=engine, if_exists='append', index=False, chunksize=8000)
print("insert done")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment