Commit 301d62af authored by 张彦钊

change test file

parent 6f3c334f
@@ -155,8 +155,8 @@ def feature_engineer():
print("unique_values length")
print(len(unique_values))
temp = list(range(2 + apps_number + level2_number + level3_number,
2 + apps_number + level2_number + level3_number + len(unique_values)))
temp = list(range(16 + apps_number + level2_number + level3_number,
16 + apps_number + level2_number + level3_number + len(unique_values)))
value_map = dict(zip(unique_values, temp))
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
......
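The hunk above shifts the starting offset of the generated feature indices from 2 to 16, apparently reserving 16 leading slots ahead of the apps/level2/level3 blocks. A minimal sketch of the mapping logic, with hypothetical counts and values:

# Sketch of the value_map construction above; the counts and values here
# are hypothetical, only the offset arithmetic mirrors the real code.
apps_number, level2_number, level3_number = 3, 4, 5
unique_values = ["a", "b", "c"]
start = 16 + apps_number + level2_number + level3_number   # 28
temp = list(range(start, start + len(unique_values)))      # [28, 29, 30]
value_map = dict(zip(unique_values, temp))                 # {'a': 28, 'b': 29, 'c': 30}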
# -*- coding: utf-8 -*-
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
import pymysql
from sqlalchemy import create_engine
def test():
@@ -28,26 +31,48 @@ def test():
    spark.sql(sql).show(6)
def some_function(x):
    # Use the imported libraries to do the work
    return np.sin(x) ** 2 + 2
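# Quick check of some_function (worked out by hand):
# some_function(0.0) == 2.0, since sin(0) ** 2 + 2 == 2.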
def con_sql(db, sql):
    # Run the query on an open pymysql connection and return all rows as a
    # DataFrame with integer column labels (0, 1, 2, ...); the connection
    # is closed afterwards.
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    df = pd.DataFrame(list(result))
    db.close()
    return df
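# Usage sketch for con_sql; since it closes the connection, each call needs a
# fresh one. Host and credentials below are placeholders, not real config:
#   conn = pymysql.connect(host='127.0.0.1', port=3306, user='u', passwd='p', db='test')
#   frame = con_sql(conn, "select 1, 2")   # DataFrame with columns 0 and 1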
if __name__ == '__main__':
    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.158:2379") \
        .set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g")
    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
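    # Note: "spark.sql.extensions" = TiExtensions registers TiSpark with the
    # session, and "spark.tispark.pd.addresses" points it at the TiDB cluster's
    # PD endpoint, so spark.sql can reach both Hive and TiDB tables.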
    # ti = pti.TiContext(spark)
    # ti.tidbMapDatabase("jerry_test")
    # spark.sparkContext.setLogLevel("WARN")
    # sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
    # spark.sql(sql).show(6)
sql = "select level2_id,concat('t',treatment_method)," \
"concat('min',price_min),concat('max',price_max)," \
"concat('tr',treatment_time),concat('m',maintain_time)," \
"concat('r,',recover_time) from jerry_test.train_Knowledge_network_data"
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
df = con_sql(db, sql)
df.rename(columns={0: "level2_id", 1: "treatment_method",2:"price_min",3:"price_max",4:"treatment_time",
5:"maintain_time",6:"recover_time"})
print(df.head(6))
    host = '172.16.40.158'
    port = 4000
    user = 'root'
    password = '3SYz54LS9#^9sBvC'
    db_name = 'jerry_test'
    charset = 'utf8'
    # Build the SQLAlchemy URL in one format string and pass the charset
    # through so the variable above is actually used.
    engine = create_engine("mysql+pymysql://%s:%s@%s:%s/%s?charset=%s"
                           % (user, password, host, port, db_name, charset))
    df.to_sql('knowledge', con=engine, if_exists='append', index=False, chunksize=8000)
    print("insert done")