feature_engineering.py 20.5 KB
Newer Older
张彦钊's avatar
张彦钊 committed
1
# -*- coding: utf-8 -*-
张彦钊's avatar
张彦钊 committed
2
import pymysql
张彦钊's avatar
张彦钊 committed
3 4 5
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
张彦钊's avatar
张彦钊 committed
6
import datetime
张彦钊's avatar
张彦钊 committed
7
import pandas as pd
张彦钊's avatar
张彦钊 committed
8
import time
张彦钊's avatar
张彦钊 committed
9
from pyspark import StorageLevel
张彦钊's avatar
张彦钊 committed
10 11


张彦钊's avatar
张彦钊 committed
12 13 14 15 16 17 18 19 20
def app_list_func(x, l):
    """Encode a comma-separated token string as a list of integer indices.

    Args:
        x: value holding comma-separated tokens; coerced with ``str()`` so
           non-string inputs (e.g. None from a null column) are tolerated.
        l: dict mapping token -> integer index.

    Returns:
        list: for each token, its index from ``l``, or 0 when unknown.
    """
    # dict.get with a default replaces the unidiomatic `if i in l.keys()` test
    # (one lookup instead of two) and the manual append loop.
    return [l.get(token, 0) for token in str(x).split(",")]
张彦钊's avatar
张彦钊 committed
21

张彦钊's avatar
张彦钊 committed
22

张彦钊's avatar
张彦钊 committed
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
def get_list(db, sql, n):
    """Fetch comma-separated id strings and build a token -> index map.

    Executes ``sql`` on ``db``, splits every distinct first-column value on
    commas, de-duplicates the tokens, and numbers them consecutively
    starting at ``n``.  Closes the connection before returning.

    Returns:
        (count, mapping): number of distinct tokens and the token->index dict.
    """
    cursor = db.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    # Distinct raw column values first, then flatten their comma-separated
    # tokens into one de-duplicated vocabulary.
    token_set = set()
    for raw in {row[0] for row in rows}:
        token_set.update(str(raw).split(","))
    tokens = list(token_set)
    count = len(tokens)
    mapping = {token: index for index, token in enumerate(tokens, start=n)}
    db.close()
    return count, mapping

张彦钊's avatar
张彦钊 committed
38

张彦钊's avatar
张彦钊 committed
39
def get_map():
    """Build id -> index maps for app lists and level2/level3 tag ids.

    Queries TiDB for each vocabulary and numbers the tokens in consecutive,
    non-overlapping ranges starting at 16 (app list first, then level2,
    then level3).  Elapsed minutes per query are printed.

    Returns:
        (apps_number, app_list_map, leve2_number, leve2_map,
         leve3_number, leve3_map)
    """
    def new_conn():
        # get_list() closes its connection, so every query needs a fresh one.
        return pymysql.connect(host='172.16.40.170', port=4000, user='root',
                               passwd='3SYz54LS9#^9sBvC', db='jerry_test')

    started = time.time()
    apps_number, app_list_map = get_list(new_conn(), "select app_list from device_app_list", 16)
    print("applist")
    print((time.time() - started) / 60)

    started = time.time()
    leve2_number, leve2_map = get_list(new_conn(), "select level2_ids from diary_feat", 16 + apps_number)
    print("leve2")
    print((time.time() - started) / 60)

    started = time.time()
    leve3_number, leve3_map = get_list(new_conn(), "select level3_ids from diary_feat",
                                       16 + leve2_number + apps_number)
    print((time.time() - started) / 60)

    return apps_number, app_list_map, leve2_number, leve2_map, leve3_number, leve3_map


def get_unique(db, sql):
    """Return the distinct first-column values produced by ``sql``.

    Closes the connection, then logs the query text and the result size.
    """
    cursor = db.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    values = list({row[0] for row in rows})
    db.close()
    print(sql)
    print(len(values))
    return values

张彦钊's avatar
张彦钊 committed
70 71 72 73 74 75 76
def con_sql(db, sql):
    """Run ``sql`` on ``db`` and return all rows as a pandas DataFrame.

    Closes the connection before returning; columns get pandas' default
    integer labels.
    """
    cursor = db.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    frame = pd.DataFrame(list(rows))
    db.close()
    return frame
张彦钊's avatar
张彦钊 committed
77

张彦钊's avatar
张彦钊 committed
78

79
def get_pre_number():
    """Print the row count of the esmm_pre_data prediction table."""
    db = pymysql.connect(host='172.16.40.170', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    cursor = db.cursor()
    cursor.execute("select count(*) from esmm_pre_data")
    count = cursor.fetchone()[0]
    print("预测集数量:")
    print(count)
    db.close()


张彦钊's avatar
张彦钊 committed
90
def feature_engineer():
    """Build the ESMM train/validation tfrecord datasets from TiDB tables.

    Relies on module-level globals created in ``__main__``: ``spark`` (a
    SparkSession with the ``jerry_test``/``eagle`` databases mapped via
    TiSpark) and ``path`` (the HDFS output directory).

    Steps:
      1. ``get_map()``: id->index maps for app lists and level2/level3 tags.
      2. Collect the distinct values of every categorical column into
         ``unique_values`` and number them to build ``value_map``.
      3. Join ``esmm_train_data_dwell`` with the feature tables, encode each
         row into index lists, and write the train ("tr/") and validation
         ("va/", rows on the latest stat_date) tfrecord files.

    Returns:
        (validate_date, value_map, app_list_map, leve2_map, leve3_map) so
        get_predict() can encode the prediction set with the same maps.
    """
    apps_number, app_list_map, level2_number, leve2_map, level3_number, leve3_map = get_map()
    # Fixed slots 16-27 encode the column-name fallback strings that
    # df.na.fill(...) below substitutes for NULLs in these columns, so the
    # fallbacks still map to a stable index inside app_list_func().
    # NOTE(review): app-list tokens from get_map() are also numbered starting
    # at 16, so these slots overlap that range — presumably legacy; confirm.
    app_list_map["app_list"] = 16
    leve3_map["level3_ids"] = 17
    leve3_map["search_tag3"] = 18
    leve2_map["level2_ids"] = 19
    leve2_map["tag1"] = 20
    leve2_map["tag2"] = 21
    leve2_map["tag3"] = 22
    leve2_map["tag4"] = 23
    leve2_map["tag5"] = 24
    leve2_map["tag6"] = 25
    leve2_map["tag7"] = 26
    leve2_map["search_tag2"] = 27

    # Gather the distinct vocabulary of every categorical feature; each
    # get_unique() call closes its connection, hence a fresh connect per query.
    unique_values = []
    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct stat_date from esmm_train_data_dwell"
    unique_values.extend(get_unique(db,sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct ucity_id from esmm_train_data_dwell"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct ccity_name from esmm_train_data_dwell"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct time from cid_time_cut"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct device_type from user_feature"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct manufacturer from user_feature"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct channel from user_feature"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct top from cid_type_top"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct price_min from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct treatment_method from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct price_max from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct treatment_time from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct maintain_time from knowledge"
    unique_values.extend(get_unique(db, sql))

    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct recover_time from knowledge"
    unique_values.extend(get_unique(db, sql))

    # unique_values.append("video")

    # The latest day in the training table becomes the validation split;
    # the training window starts 180 days earlier.
    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select max(stat_date) from esmm_train_data_dwell"
    validate_date = con_sql(db, sql)[0].values.tolist()[0]
    print("validate_date:" + validate_date)
    temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
    start = (temp - datetime.timedelta(days=180)).strftime("%Y-%m-%d")
    print(start)

    # hospital_id values come from the eagle schema, so this connection omits
    # the default db and qualifies table names instead.
    db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
    sql = "select distinct doctor.hospital_id from jerry_test.esmm_train_data_dwell e " \
          "left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
          "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
          "where e.stat_date >= '{}'".format(start)
    unique_values.extend(get_unique(db, sql))
    # The feature names themselves are added to the vocabulary because
    # df.na.fill() below replaces NULLs with the column's own name string.
    features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
                "channel", "top", "time", "stat_date", "hospital_id",
                "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
                "app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
                "search_tag2", "search_tag3"]
    unique_values.extend(features)
    print("unique_values length")
    print(len(unique_values))
    print("特征维度:")
    print(apps_number + level2_number + level3_number + len(unique_values))

    # Categorical codes start at 29, past the per-position defaults 1-15 used
    # in value_map.get() below and the fixed fallback slots 16-27 above.
    temp = list(range(29 + apps_number + level2_number + level3_number,
                      29 + apps_number + level2_number + level3_number + len(unique_values)))
    value_map = dict(zip(unique_values, temp))

    # One wide left-join pulling every feature table onto the training rows.
    sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
          "u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
          "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
          "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
          "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \
          "e.device_id,e.cid_id " \
          "from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \
          "left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
          "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
          "left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
          "left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
          "left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
          "left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
          "left join jerry_test.question_tag question on e.device_id = question.device_id " \
          "left join jerry_test.search_tag search on e.device_id = search.device_id " \
          "left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \
          "left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
          "left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
          "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
          "left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
          "left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
          "left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
          "where e.stat_date >= '{}'".format(start)

    df = spark.sql(sql)

    df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
                             "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids",
                             "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7"])

    # Fill each NULL with the column's own name; those names were appended to
    # unique_values / the tag maps above so they still encode to valid ids.
    df = df.na.fill(dict(zip(features, features)))

    # The lambda below indexes x[] by this exact select() order:
    # 0 stat_date, 1 y, 2 z, 3 app_list, 4 level2_ids, 5 level3_ids,
    # 6-12 tag1..tag7, 13 ucity_id, 14-19 ccity_name..time, 20 hospital_id,
    # 21-26 knowledge columns, 27 search_tag2, 28 search_tag3,
    # 29 cid_id, 30 device_id.  Output tuple: (stat_date, y, z, 11 multi-hot
    # index lists, the 15-element "ids" vector, search_tag2/3 lists,
    # city=x[13], cid_id, device_id).
    rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids",
                    "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
                    "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
                    "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
                    "maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id")\
        .rdd.repartition(200).map(
        lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
                   app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
                   app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
                   app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
                   [value_map.get(x[0], 1), value_map.get(x[13], 2), value_map.get(x[14], 3), value_map.get(x[15], 4),
                    value_map.get(x[16], 5), value_map.get(x[17], 6), value_map.get(x[18], 7), value_map.get(x[19], 8),
                    value_map.get(x[20], 9), value_map.get(x[21], 10),
                    value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13),
                    value_map.get(x[25], 14), value_map.get(x[26], 15)],
                   app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[29],x[30]
                   ))


    # Cached because the rdd is consumed three times (train write, count,
    # validation write).
    rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)

    # TODO: after going live, remove the train filter below — the most recent
    # day's data should also be included in the training set.

    # Drop x[0] (stat_date); keep everything else for the train tfrecords.
    train = rdd.map(
        lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
                   x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
    f = time.time()
    spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
                                      "tag1_list", "tag2_list", "tag3_list", "tag4_list",
                                      "tag5_list", "tag6_list", "tag7_list", "ids",
                                      "search_tag2_list","search_tag3_list","city","cid_id","uid") \
        .repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
    h = time.time()
    print("train tfrecord done")
    print((h - f) / 60)

    print("训练集样本总量:")
    print(rdd.count())

    get_pre_number()

    # Validation split: only the rows on the latest stat_date.
    test = rdd.filter(lambda x: x[0] == validate_date).map(
        lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
                   x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))

    spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
                                     "tag1_list", "tag2_list", "tag3_list", "tag4_list",
                                     "tag5_list", "tag6_list", "tag7_list", "ids",
                                     "search_tag2_list","search_tag3_list","city","cid_id","uid") \
        .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")

    print("va tfrecord done")

    rdd.unpersist()

    return validate_date, value_map, app_list_map, leve2_map, leve3_map
张彦钊's avatar
张彦钊 committed
281

张彦钊's avatar
i  
张彦钊 committed
282

张彦钊's avatar
张彦钊 committed
283
def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
    """Encode the ESMM prediction set and write native/nearby tfrecords.

    Relies on module-level globals ``spark`` and ``path`` set in ``__main__``.

    Args:
        date: validation date; used as the stat_date index for every row.
        value_map: categorical value -> index map from feature_engineer().
        app_list_map / leve2_map / leve3_map: token -> index maps from
            feature_engineer(), applied identically here.

    Rows are split on ``label`` (x[0]): 0 -> "native/", 1 -> "nearby/".
    """
    # Same wide left-join as feature_engineer(), but over esmm_pre_data and
    # without a date filter; identifiers (device_id/cid_id) are selected
    # early so predictions can be joined back to users/items.
    sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
          "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
          "dl.app_list,e.hospital_id,feat.level3_ids," \
          "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
          "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
          "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
          "from jerry_test.esmm_pre_data e " \
          "left join jerry_test.user_feature u on e.device_id = u.device_id " \
          "left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
          "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
          "left join jerry_test.device_app_list dl on e.device_id = dl.device_id " \
          "left join jerry_test.diary_feat feat on e.cid_id = feat.diary_id " \
          "left join jerry_test.wiki_tag wiki on e.device_id = wiki.device_id " \
          "left join jerry_test.question_tag question on e.device_id = question.device_id " \
          "left join jerry_test.search_tag search on e.device_id = search.device_id " \
          "left join jerry_test.budan_tag budan on e.device_id = budan.device_id " \
          "left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
          "left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
          "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
          "left join jerry_test.knowledge k on feat.level2 = k.level2_id " \
          "left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date"

    # Unlike feature_engineer(), no "stat_date" here — the date argument
    # stands in for it when building the ids vector below.
    features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
                "channel", "top", "time", "hospital_id",
                "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time",
                "app_list", "level3_ids", "level2_ids", "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
                "search_tag2", "search_tag3"]

    df = spark.sql(sql)
    df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"])

    # Fill NULLs with the column's own name string; feature_engineer() put
    # those names into the maps, so they still encode to valid indices.
    df = df.na.fill(dict(zip(features, features)))
    f = time.time()
    # Lambda indexes x[] by this select() order: 0 label, 1 y, 2 z,
    # 3 ucity_id, 4 device_id, 5 cid_id, 6 app_list, 7 level2_ids,
    # 8 level3_ids, 9-15 tag1..tag7, 16 ucity_id (again, for the ids vector),
    # 17-22 ccity_name..time, 23 hospital_id, 24-29 knowledge columns,
    # 30 search_tag2, 31 search_tag3.
    rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
                    "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
                    "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
                    "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
                    "maintain_time", "recover_time", "search_tag2", "search_tag3") \
        .rdd.repartition(200).map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4], x[5],
                                             app_list_func(x[6], app_list_map), app_list_func(x[7], leve2_map),
                                             app_list_func(x[8], leve3_map), app_list_func(x[9], leve2_map),
                                             app_list_func(x[10], leve2_map), app_list_func(x[11], leve2_map),
                                             app_list_func(x[12], leve2_map), app_list_func(x[13], leve2_map),
                                             app_list_func(x[14], leve2_map), app_list_func(x[15], leve2_map),
                                             [value_map.get(date, 1), value_map.get(x[16], 2),
                                              value_map.get(x[17], 3), value_map.get(x[18], 4),
                                              value_map.get(x[19], 5), value_map.get(x[20], 6),
                                              value_map.get(x[21], 7), value_map.get(x[22], 8),
                                              value_map.get(x[23], 9), value_map.get(x[24], 10),
                                              value_map.get(x[25], 11), value_map.get(x[26], 12),
                                              value_map.get(x[27], 13), value_map.get(x[28], 14),
                                              value_map.get(x[29], 15)],
                                             app_list_func(x[30], leve2_map),app_list_func(x[31], leve3_map)))

    # Cached: the rdd is counted and filtered twice below.
    # NOTE(review): this rdd is never unpersist()ed, unlike in
    # feature_engineer() — confirm whether that is intentional.
    rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
    print("预测集样本大小:")
    print(rdd.count())
    if rdd.filter(lambda x: x[0] == 0).count() > 0:
        print("预测集native有数据")
        spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
                              .map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
                                              x[12], x[13], x[14], x[15], x[16], x[17], x[18], x[3], x[4], x[5]))) \
            .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list",
                  "tag4_list","tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list",
                  "search_tag3_list", "city", "uid","cid_id") \
            .repartition(1).write.format("tfrecords").save(path=path + "native/", mode="overwrite")

        print("native tfrecord done")
        h = time.time()
        print((h - f) / 60)
    else:
        print("预测集native为空")

    if rdd.filter(lambda x: x[0] == 1).count() > 0:
        print("预测集nearby有数据")
        spark.createDataFrame(rdd.filter(lambda x: x[0] == 1)
                              .map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
                                              x[12], x[13], x[14], x[15], x[16], x[17], x[18], x[3], x[4], x[5]))) \
            .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list",
                  "tag4_list","tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list",
                  "search_tag3_list", "city", "uid", "cid_id")\
            .repartition(1).write.format("tfrecords").save(path=path + "nearby/", mode="overwrite")
        print("nearby tfrecord done")
    else:
        print("预测集nearby为空")
张彦钊's avatar
张彦钊 committed
369

张彦钊's avatar
张彦钊 committed
370

张彦钊's avatar
张彦钊 committed
371
if __name__ == '__main__':
    # Spark session configured for TiSpark reads from TiDB (PD at
    # 172.16.40.170:2379), with recursive Hive subdirectory input and
    # lzf shuffle compression.
    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf")\
        .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec","snappy")

    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
    # Map the TiDB databases into Spark SQL so feature_engineer()/
    # get_predict() can query e.g. jerry_test.* and eagle.* directly.
    ti = pti.TiContext(spark)
    ti.tidbMapDatabase("jerry_test")
    ti.tidbMapDatabase("eagle")
    spark.sparkContext.setLogLevel("WARN")
    # Globals consumed by feature_engineer() and get_predict().
    path = "hdfs:///strategy/esmm/"
    # NOTE(review): local_path is unused in this file's visible code —
    # possibly consumed elsewhere; confirm before removing.
    local_path = "/home/gmuser/esmm/"

    # Build train/validation tfrecords first, then reuse the returned
    # encoders to build the prediction tfrecords consistently.
    validate_date, value_map, app_list_map, leve2_map, leve3_map = feature_engineer()
    get_predict(validate_date, value_map, app_list_map, leve2_map, leve3_map)

    spark.stop()