Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
M
meta_base_code
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
宋柯
meta_base_code
Commits
69e3d529
Commit
69e3d529
authored
Nov 23, 2020
by
litaolemo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update
parent
29af2e6c
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
593 additions
and
26 deletions
+593
-26
spark_temp.py
task/spark_temp.py
+556
-0
spark_test.py
task/spark_test.py
+37
-26
No files found.
task/spark_temp.py
0 → 100644
View file @
69e3d529
# -*- coding:UTF-8 -*-
# @Time : 2020/11/23 16:15
# @File : spark_temp.py
# @email : litao@igengmei.com
# @author : litao
# -*- coding:UTF-8 -*-
# @Time : 2020/9/16 17:41
# @File : new_user_has_protratit_rate.py
# @email : litao@igengmei.com
# @author : litao
import
hashlib
import
json
import
pymysql
import
xlwt
,
datetime
import
redis
# from pyhive import hive
from
maintenance.func_send_email_with_file
import
send_file_email
from
typing
import
Dict
,
List
from
elasticsearch_7
import
Elasticsearch
from
elasticsearch_7.helpers
import
scan
import
sys
import
time
from
pyspark
import
SparkConf
from
pyspark.sql
import
SparkSession
,
DataFrame
from
meta_base_code.utils.func_from_redis_get_portrait
import
*
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti
def
con_sql
(
sql
):
# 从数据库的表里获取数据
db
=
pymysql
.
connect
(
host
=
'172.16.40.158'
,
port
=
4000
,
user
=
'st_user'
,
passwd
=
'aqpuBLYzEV7tML5RPsN1pntUzFy'
,
db
=
'jerry_prod'
)
cursor
=
db
.
cursor
()
cursor
.
execute
(
sql
)
result
=
cursor
.
fetchall
()
db
.
close
()
return
result
startTime
=
time
.
time
()
sparkConf
=
SparkConf
()
sparkConf
.
set
(
"spark.sql.crossJoin.enabled"
,
True
)
sparkConf
.
set
(
"spark.debug.maxToStringFields"
,
"100"
)
sparkConf
.
set
(
"spark.tispark.plan.allow_index_double_read"
,
False
)
sparkConf
.
set
(
"spark.tispark.plan.allow_index_read"
,
True
)
sparkConf
.
set
(
"spark.hive.mapred.supports.subdirectories"
,
True
)
sparkConf
.
set
(
"spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive"
,
True
)
sparkConf
.
set
(
"spark.serializer"
,
"org.apache.spark.serializer.KryoSerializer"
)
sparkConf
.
set
(
"mapreduce.output.fileoutputformat.compress"
,
False
)
sparkConf
.
set
(
"mapreduce.map.output.compress"
,
False
)
sparkConf
.
set
(
"prod.gold.jdbcuri"
,
"jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true"
)
sparkConf
.
set
(
"prod.mimas.jdbcuri"
,
"jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true"
)
sparkConf
.
set
(
"prod.gaia.jdbcuri"
,
"jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true"
)
sparkConf
.
set
(
"prod.tidb.jdbcuri"
,
"jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true"
)
sparkConf
.
set
(
"prod.jerry.jdbcuri"
,
"jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true"
)
sparkConf
.
set
(
"prod.tispark.pd.addresses"
,
"172.16.40.158:2379"
)
sparkConf
.
set
(
"prod.tispark.pd.addresses"
,
"172.16.40.170:4000"
)
sparkConf
.
set
(
"prod.tidb.database"
,
"jerry_prod"
)
sparkConf
.
setAppName
(
"new_user_has_protratit_rate"
)
spark
=
(
SparkSession
.
builder
.
config
(
conf
=
sparkConf
)
.
config
(
"spark.sql.extensions"
,
"org.apache.spark.sql.TiExtensions"
)
.
config
(
"spark.tispark.pd.addresses"
,
"172.16.40.170:2379"
)
.
enableHiveSupport
()
.
getOrCreate
())
spark
.
sql
(
"ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar"
)
spark
.
sql
(
"ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar"
)
spark
.
sql
(
"CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'"
)
spark
.
sql
(
"CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'"
)
spark
.
sql
(
"CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'"
)
task_list
=
[]
task_days
=
3
device_id_list
=
[
"861239046925022"
,
"864326031909959"
,
"AE0CF229-CA42-4FB9-A556-922BB92D4DDA"
,
"androidid_b706db3c7857e0e1"
,
"23E28D73-4431-414F-A5F5-9C763E65AE79"
,
"E1558B57-6B34-4CE9-ACD5-A3FF480217FD"
,
"androidid_48215e9a90ba2fa4"
,
"androidid_ffef45f900806daa"
,
"androidid_19285fe044f291ca"
,
"B196E016-BE27-4794-8F48-CB077807C7EA"
,
"androidid_8f49c518e71596c3"
,
"3C3BBCCB-2329-46A4-BC52-60C691E0864F"
,
"A0000075BBCC80"
,
"androidid_39e90320915dbff8"
,
"864318033310070"
,
"869741043800126"
,
"androidid_a70352493bbf848e"
,
"864282030853674"
,
"SLYDU17B17000998"
,
"867085034663818"
,
"2BB819FD-846E-4E0D-ABBF-10D9B31B46DB"
,
"868082033810333"
,
"861918387179949"
,
"F68190C3-A4F7-4F77-B92B-D1675A91AA04"
,
"androidid_66f747ff8b5ad294"
,
"09F9B8DE-3217-4AA2-AD6E-B69E37639C98"
,
"865676035722991"
,
"869880027200059"
,
"androidid_d07e372946d51fde"
,
"androidid_f98f737876f770c5"
,
"860627047292130"
,
"androidid_146850b35f43e7fb"
,
"androidid_747f212484a5c406"
,
"862859043610613"
,
"866250033462825"
,
"867911044510576"
,
"androidid_cc678a7826dc4781"
,
"860525042509435"
,
"B4C1FC26-103D-4260-BE6F-3CCB26D73690"
,
"863600030765864"
,
"529A3012-876A-45E9-AE69-9A5E313814D7"
,
"androidid_a537629ff2a9a70d"
,
"5FA109A8-61EF-485C-A970-BAF039C9FCF1"
,
"5FA109A8-61EF-485C-A970-BAF039C9FCF1"
,
"865845043561218"
,
"866695044598571"
,
"862550048747274"
,
"androidid_d165cdfa482e327e"
,
"androidid_25c4c660defa79ab"
,
"androidid_f38650fad8e87fd4"
,
"androidid_044ec3bb5ba9bf76"
,
"867888052761913"
,
"881A7673-8D39-42FD-A607-D5D2FA6E31DF"
,
"860737033974292"
,
"863377043917341"
,
"C356D83E-76FB-4C07-B3D4-51ACC1D34468"
,
"DFFC3F67-F1EA-4391-A03F-A0A79C3B9D7D"
,
"F9F9CC6F-7645-4000-A886-23065B49F9E5"
,
"866708047217303"
,
"A9ECD3B2-5362-49C5-9FE7-A5385D96EBF3"
,
"androidid_e32e66c299c93df0"
,
"866092036748937"
,
"861918032272651"
,
"868900045007962"
,
"864427032734758"
,
"58020E20-E523-4FBA-A786-EE6BC77AA779"
,
"863187030097126"
,
"359583071756904"
,
"862406048703215"
,
"C18FE468-8B4B-4A5E-B62A-6CCB8672B40C"
,
"864051046912620"
,
"20477B01-7089-4152-8648-AB1D3390FF14"
,
"869908046215913"
,
"860887046704531"
,
"FA82A9EB-A099-402C-8587-4F7C84F56D98"
,
"androidid_12e386c58b4ec5e7"
,
"androidid_9d8c40e4adcb9ca8"
,
"865235044638282"
,
"867492049315758"
,
"androidid_c58b0a07fcc0fc25"
,
"androidid_c6b537e3434718c1"
,
"861448044386810"
,
"863732034751721"
,
"866229038673149"
,
"A6EC8F25-9AD1-4C6D-8A46-DADFDA30283A"
,
"34C77834-5A8D-46ED-926B-9B16A0532E1C"
,
"862550048112370"
,
"866251037046895"
,
"DE55AEBF-D600-4FEA-BAB7-2ABFC289A05A"
,
"26ADF574-5276-4AD1-A9C9-5B4ABAC69AE5"
,
"862502042854069"
,
"9A483865-679A-4BEC-AA3B-D28CCDA39CD7"
,
"867135044505614"
,
"androidid_11e162f7be789b57"
,
"66DA7B0A-A675-4FE5-9228-C06E9A492377"
,
"862776043351289"
,
"863268048958218"
,
"865914041032364"
,
"866712036210147"
,
"867768049331378"
,
"869996041176545"
,
"600EE3B0-E7E2-4DB6-B2B4-BE5DDB701413"
,
"864277030823750"
,
"52920143-DD14-41FF-9D57-A8DBB0CA2C05"
,
"867194045037440"
,
"862341039084418"
,
"868460049608232"
,
"A0000068BF2BC3"
,
"860910047115612"
,
"androidid_ee3a94ff1a9db02f"
,
"864080038274814"
,
"868737058523778"
,
"869923027428682"
,
"androidid_cccd788d81581028"
,
"41AB5124-DAE0-4710-8947-96BE3B8EFDF6"
,
"863445039676948"
,
"861293043817617"
,
"866306042855423"
,
"862565034725282"
,
"867216034576797"
,
"androidid_884dbbb62821503e"
,
"24FE39DD-779D-4345-BAE8-690E971A3E78"
,
"androidid_f4e779789dfb9e83"
,
"546C98BB-B71F-44A2-9E92-CFD5545749B7"
,
"864401030215730"
,
"861913031896709"
,
"C7CA290B-ED40-4B60-92F6-B95246C97E0E"
,
"androidid_ae7d7a68f73d0afe"
,
"androidid_bd26308f7ecf65a6"
,
"866986040057708"
,
"4811CBD0-68ED-4741-B9B0-AD5D630BDDD9"
,
"864322039839678"
,
"androidid_93ab6fee80819254"
,
"868062043608836"
,
"865740052303588"
,
"869146024674872"
,
"73071440-A108-4A0C-83DF-89EE0E2A8E36"
,
"863056039643034"
,
"865099048017194"
,
"868267048500835"
,
"androidid_82b46a2a16560146"
,
"99001025791854"
,
"47556C2B-F89B-4832-87E7-AF094C5B45C9"
,
"androidid_c2bff5389dd295ad"
,
"863139041519093"
,
"79139CAF-B720-4268-A4A1-E5FF46DA86D6"
,
"BD435C5F-6149-48B1-9CE6-6756FE200841"
,
"57E1EA59-4EBD-4C34-9027-8B6A301297AE"
,
"androidid_becec45595bb1601"
,
"864555038470520"
,
"209D69DE-4532-438C-A253-50CAA836B88D"
,
"863361037410976"
,
"B97CE764-D064-4953-B359-FA7270B1C1D1"
,
"866842039173925"
,
"DE5A0C24-332A-4675-8093-F6EA75674074"
,
"androidid_c91a73e2dac6624b"
,
"862217039881773"
,
"androidid_1d86e484558cbf0c"
,
"209284E4-6B4F-4121-AD9A-A5E9B5E9E4DB"
,
"863780047753754"
,
"androidid_bf2e13c72949d525"
,
"865632039665090"
,
"865845036346866"
,
"androidid_23cab5008504d229"
,
"androidid_f1736e1eb315783d"
,
"E1E45249-CA55-4F6E-94FE-73BDEB239790"
,
"androidid_01a897a5b138f19b"
,
"869633048267999"
,
"4F2DD0AE-83BE-4264-A30A-5C44A5684147"
,
"864550031866147"
,
"0F85B2D6-5463-47EB-8F5E-187CBFEF5433"
,
"3629090B-30AB-4C21-821F-4E098A31B4B1"
,
"865937031773929"
,
"866356030825326"
,
"868362034685672"
,
"F10446B7-7F71-4C02-9AFE-41E0B7FB27C6"
,
"866525048414154"
,
"866619049897937"
,
"868391046063156"
,
"A00000898F52DE"
,
"DF8F1D7D-B442-4785-97C2-A7AA4B8BACB9"
,
"03F7E97A-D1D6-4EF6-8611-15B23C2E4E00"
,
"androidid_b5ce7fc399b85cf7"
,
"860737038988347"
,
"860800039846912"
,
"866927031655662"
,
"CFD6D69A-8E25-4E38-8694-2BACF42A8C4A"
,
"8A7F3130-BE5D-4ABD-8DC9-538DA929341A"
,
"androidid_bdef3d1e5720c8f0"
,
"869353043880098"
,
"031B2BBC-9858-4BB6-BD49-1E01C7815E9A"
,
"868047041350309"
,
"863601032130453"
,
"868889040583194"
,
"3D6AE98F-3607-4F6B-B2FB-12B5DC3FD2DE"
,
"androidid_dff61b7437925592"
,
"393C2D87-0953-4E3B-A786-EF0572E64889"
,
"864162036168733"
,
"androidid_d6dfbeadfaceb1e8"
,
"22CE68E8-B8BA-4609-8019-936BFDA13480"
,
"861065033361149"
,
"864358047211377"
,
"8BA69C39-6BEC-415D-AAF1-4A54E8DC44DB"
,
"androidid_cc14e534512c7b9b"
,
"868535046882060"
,
"8888FE9B-FCD5-49D3-835A-6AFB58C31AAB"
,
"80B13FDA-F7C2-4698-B86C-21086B22503B"
,
"androidid_addbf04ade290a87"
,
"866676048735914"
,
"866933036565195"
,
"869234035002059"
,
"E64447E6-3170-44E6-A79D-47D338552263"
,
"1C6AC498-0ED2-45E5-A832-959FBBA289D7"
,
"861064043123953"
,
"866179039093799"
,
"867229033491835"
,
"androidid_3ef435d3d801d977"
,
"866306044282501"
,
"867535036711833"
,
"FE666AC4-5EC6-4E65-8E60-BE073A8150C4"
,
"860214047711656"
,
"D594995A-AB27-4480-BE56-87B4CFCD09CD"
,
"androidid_d6874d4bd27bb281"
,
"867490043304455"
,
"868375048629536"
,
"860281049832160"
,
"865033047501999"
,
"868097043087342"
,
"androidid_ed898afb1d40a3b8"
,
"868234041864287"
,
"869894049431998"
,
"866636031927803"
,
"FCB4AC46-98DD-4B5B-B4CB-6C90FAA23BFE"
,
"862951040774573"
,
"869282047830157"
,
"862591038527151"
,
"D5124711-0098-48BE-BD03-CA3A244A85CF"
,
"androidid_7e8ebfdfd1546006"
,
"7AC0DFC9-197F-4B72-863B-B3B6E223B852"
,
"864426049274055"
,
"867499031719731"
,
"EAA1F29E-3594-4801-B2B6-BF194B14D528"
,
"864128033909908"
,
"864347046808416"
,
"androidid_cbd7a3f01137e5f6"
,
"861916041959490"
,
"866039035923029"
,
"865281033287079"
,
"C9937271-0E3F-4423-A497-EB4E38E1CB7D"
,
"androidid_188e2f4fa6e285b3"
,
"860492058510551"
,
"867464039858311"
,
"C7962EA7-F7FC-49BC-B949-9764A43FA712"
,
"862986039412413"
,
"866503030429035"
,
"21D11FF5-B172-4A42-9DF3-58FACD16275A"
,
"863486049542498"
,
"BFEE5B42-30D7-4FC6-B9F7-178378C9B9A5"
,
"861843030274600"
,
"androidid_1c361b1840b40e49"
,
"864165045993355"
,
"2D2259F7-EC21-4BA8-9A7A-E0A244336094"
,
"866743049336207"
,
"androidid_3645e132deda533b"
,
"862975037151016"
,
"867673045179476"
,
"862083037615757"
,
"androidid_96f653663bb6abff"
,
"869120048129734"
,
"CBFC4A91-D9E9-4373-A36D-1C276542D899"
,
"45BF9F73-254B-4583-9494-681A7475E396"
,
"EA6A4923-9B0E-42B0-8EBF-CD45171C2A3F"
,
"androidid_6165eec309d4a51f"
,
"866320033541348"
,
"867836038528807"
,
"863780049463154"
,
"869647049686569"
,
"EDC752F9-EAE1-4C66-85BD-9E783749067B"
,
"860823043871874"
,
"867768042050132"
,
"863147035350458"
,
"863182032403653"
,
"androidid_ff6aac0e76fde4b1"
,
"861695037475899"
,
"02D76AE7-55EF-4A81-A366-FEEF6CFC5683"
,
"0768EE37-0532-4AE6-8E23-762530357937"
,
"869437041870670"
,
"3714CDB4-CB45-4C29-8E56-69FBD751E40C"
,
"864190047577642"
,
"93A4B3CA-5A58-424A-BFD5-E396E2D19FE7"
,
"C9E672B6-6DA9-4569-9A61-4613DB46A4D7"
,
"868650048032042"
,
"androidid_6a307ae2412b9c6c"
,
"865736034460967"
,
"9FA04B34-14E6-45B0-9454-3497709F7B80"
,
"867540037027247"
,
"869939038542717"
,
"A1000055921BDD"
,
"864209048571451"
,
"816EA3D7-652C-43F7-8189-D525B096A4EE"
,
"862434040179325"
,
"866413030453086"
,
"866319030413501"
,
"92DD6FAF-8018-40FB-8FC0-877AB2CC8056"
,
"androidid_86b295b102135a56"
,
"868712030411434"
,
"860803047071184"
,
"androidid_576e65ea7343a68a"
,
"5B24CB58-492F-429D-9CA7-63F9199B924B"
,
"764F8262-B7DC-41DE-85DF-7DECFF725429"
,
"A00000639E6695"
,
"androidid_4cd13f04627cfac8"
,
"860865048412089"
,
"B7205E05-03EF-46A9-843D-2D091ECD8107"
,
"889269DD-8F11-45CC-875E-7F6E9F0797E7"
,
"865365037969038"
,
"866722043599226"
,
"androidid_8ae7063ba4a357d4"
,
"863049034762230"
,
"867210049686196"
,
"F7EC59EA-16D1-4A55-9188-2FECE7769E55"
,
"androidid_60092285cbd6b421"
,
"048CCEBA-3502-446F-919C-6072DC0C43B1"
,
"39525886-7204-436E-A5A0-6E7B2E4EA91C"
,
"864290041542046"
,
"861189054147888"
,
"861868045336599"
,
"869232037018818"
,
"6CA84BCB-7547-4E26-B60C-0A8289B14701"
,
"A0000060111151"
,
"A0000060111151"
,
"androidid_2ed1ea8a0f43220a"
,
"869592030992974"
,
"55B20246-1660-4455-ABAB-264EE4FD22CF"
,
"76C70DE8-53A4-4364-B3F3-8C82D76715B4"
,
"860229040713848"
,
"androidid_a8c2a4ce4b7377a2"
,
"866722049279674"
,
"868198040730197"
,
"869622036651306"
,
"DF041C27-575F-4D61-9139-28E5B945BB00"
,
"androidid_917dfad57c3a4435"
,
"androidid_bbe2ec45bb86084e"
,
"866863047262682"
,
"868452037963315"
,
"androidid_47587d7f95617914"
,
"864175030737717"
,
"9D70D29C-1AA1-4F97-852F-7613031F1733"
,
"1D4B89E7-0122-44A0-9748-23FE2A825AA1"
,
"357107073264972"
,
"androidid_c0e7a6d1877d7a68"
,
"864955048416515"
,
"8D57E982-97A5-460D-A94E-BB54657B6C5A"
,
"AD07FD4F-922B-42CC-AC89-481F2B422AA0"
,
"androidid_b82f08b86c05b759"
,
"863054031070751"
,
"867139044480604"
,
"46C1F100-524B-4291-AF3B-E106B2228D03"
,
"F6FD7614-1C2E-4E66-A33F-7B2DAF1B643E"
,
"860477049577702"
,
"2527CB72-DD06-4B27-BF0B-65B17C13DECD"
,
"8C3C812B-9FCE-4826-B6F8-48B05001BEC5"
,
"F96DDC67-5B66-4DCD-8C5E-18026D4FDE27"
,
"861351041995759"
,
"866356042500875"
,
"A000009689D765"
,
"861519048538794"
,
"androidid_fd1674101ee54301"
,
]
for
t
in
range
(
2
,
task_days
):
day_num
=
0
-
t
now
=
(
datetime
.
datetime
.
now
()
+
datetime
.
timedelta
(
days
=
day_num
))
last_30_day_str
=
(
now
+
datetime
.
timedelta
(
days
=-
30
))
.
strftime
(
"
%
Y
%
m
%
d"
)
tomorrow_str
=
(
datetime
.
datetime
.
now
()
+
datetime
.
timedelta
(
days
=
day_num
+
1
))
.
strftime
(
"
%
Y
%
m
%
d"
)
today_str
=
now
.
strftime
(
"
%
Y
%
m
%
d"
)
today_str_format
=
now
.
strftime
(
"
%
Y-
%
m-
%
d"
)
yesterday_str
=
(
now
+
datetime
.
timedelta
(
days
=-
1
))
.
strftime
(
"
%
Y
%
m
%
d"
)
yesterday_str_format
=
(
now
+
datetime
.
timedelta
(
days
=-
1
))
.
strftime
(
"
%
Y-
%
m-
%
d"
)
one_week_age_str
=
(
now
+
datetime
.
timedelta
(
days
=-
7
))
.
strftime
(
"
%
Y
%
m
%
d"
)
new_urser_device_id_sql
=
r"""
select t2.device_id as device_id from
(select device_id from online.ml_device_day_active_status where partition_date = '{today_str}' and active_type in (1,2)) t2
LEFT join (
select first_device from online.ml_user_history_detail where partition_date = '{tomorrow_str}' and last_active_date = '{today_str}'
) on first_device = t2.device_id
LEFT JOIN
(
select distinct device_id
from ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
WHERE partition_day='{today_str}'
union all
select distinct device_id
from dim.dim_device_user_staff --去除内网用户
)spam_pv
on spam_pv.device_id=t2.device_id
LEFT JOIN
(
SELECT partition_date,device_id
FROM
(--找出user_id当天活跃的第一个设备id
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date='{today_str}'
)t1
JOIN
( --医生账号
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '{today_str}'
--马甲账号/模特用户
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '{today_str}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
--公司内网覆盖用户
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
--登陆过医生设备
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = '{today_str}'
) t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = '{today_str}'
AND is_login_doctor = '1'
) t2
ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id
)dev
on t2.device_id=dev.device_id
WHERE spam_pv.device_id IS NULL
and dev.device_id is null and first_device is not null
"""
.
format
(
today_str
=
today_str
,
yesterday_str_format
=
yesterday_str_format
,
today_str_format
=
today_str_format
,
tomorrow_str
=
tomorrow_str
)
print
(
new_urser_device_id_sql
)
new_urser_device_id_df
=
spark
.
sql
(
new_urser_device_id_sql
)
new_urser_device_id_df
.
createOrReplaceTempView
(
"device_id_view"
)
new_urser_device_id_df
.
show
(
1
)
sql_res
=
new_urser_device_id_df
.
collect
()
# print("-------------------------------")
# for count, res in enumerate(sql_res):
# # print(count, res)
# track = res.track
# if not track:
# continue
# track_list = track.split(",")
# for one_key_word in track_list:
# if one_key_word in res_dict:
# res_dict[one_key_word] += 1
# else:
# res_dict[one_key_word] = 1
# print(res_dict)
task/spark_test.py
View file @
69e3d529
...
@@ -29,8 +29,10 @@ from meta_base_code.utils.func_from_redis_get_portrait import *
...
@@ -29,8 +29,10 @@ from meta_base_code.utils.func_from_redis_get_portrait import *
def
con_sql
(
sql
):
def
con_sql
(
sql
):
# 从数据库的表里获取数据
# 从数据库的表里获取数据
db
=
pymysql
.
connect
(
host
=
'172.16.40.158'
,
port
=
4000
,
user
=
'st_user'
,
passwd
=
'aqpuBLYzEV7tML5RPsN1pntUzFy'
,
# db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db
=
'jerry_prod'
)
# db='jerry_prod')
db
=
pymysql
.
connect
(
host
=
'172.16.30.136'
,
port
=
3306
,
user
=
'doris'
,
passwd
=
'o5gbA27hXHHm'
,
db
=
'doris_prod'
)
cursor
=
db
.
cursor
()
cursor
=
db
.
cursor
()
cursor
.
execute
(
sql
)
cursor
.
execute
(
sql
)
result
=
cursor
.
fetchall
()
result
=
cursor
.
fetchall
()
...
@@ -76,6 +78,8 @@ spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDF
...
@@ -76,6 +78,8 @@ spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDF
task_list
=
[]
task_list
=
[]
task_days
=
3
task_days
=
3
for
t
in
range
(
2
,
task_days
):
for
t
in
range
(
2
,
task_days
):
day_num
=
0
-
t
day_num
=
0
-
t
now
=
(
datetime
.
datetime
.
now
()
+
datetime
.
timedelta
(
days
=
day_num
))
now
=
(
datetime
.
datetime
.
now
()
+
datetime
.
timedelta
(
days
=
day_num
))
...
@@ -182,15 +186,20 @@ WHERE spam_pv.device_id IS NULL
...
@@ -182,15 +186,20 @@ WHERE spam_pv.device_id IS NULL
for
count_user_count
,
res
in
enumerate
(
sql_res
):
for
count_user_count
,
res
in
enumerate
(
sql_res
):
# print(count, res)
# print(count, res)
portratit_res
=
get_user_portrait_tag3_from_redis
(
res
.
device_id
)
portratit_res
=
get_user_portrait_tag3_from_redis
(
res
.
device_id
)
sql
=
"""select cl_id, projects from kafka_tag3_log
where cl_id = '
%
s' and event_cn = 'kyc' """
# print(count_user_count, res, portratit_res)
# print(count_user_count, res, portratit_res)
sql_res_list
=
con_sql
(
sql
)
kyc_str_list
=
[]
if
sql_res
:
print
(
sql_res
,
type
(
sql_res
))
kyc_str_list
=
sql_res_list
[
0
][
1
]
.
split
(
","
)
temp_count
=
0
temp_count
=
0
for
demand
in
portratit_res
:
for
demand
in
portratit_res
:
if
portratit_res
[
demand
]:
if
portratit_res
[
demand
]:
try
:
try
:
for
tag
in
portratit_res
[
demand
][
0
:
3
]:
for
tag
in
portratit_res
[
demand
][
0
:
3
]:
if
tag
==
"吸脂瘦下颌缘"
and
demand
==
"projects"
:
print
(
res
.
device_id
)
if
tag
in
portrait_dict
[
demand
]:
if
tag
in
portrait_dict
[
demand
]:
portrait_dict
[
demand
][
tag
]
+=
1
portrait_dict
[
demand
][
tag
]
+=
1
else
:
else
:
...
@@ -203,33 +212,35 @@ WHERE spam_pv.device_id IS NULL
...
@@ -203,33 +212,35 @@ WHERE spam_pv.device_id IS NULL
if
not
temp_count
:
if
not
temp_count
:
count_not_has_portratit
+=
1
count_not_has_portratit
+=
1
no_portrait_device_id_list
.
append
(
res
.
device_id
)
no_portrait_device_id_list
.
append
(
res
.
device_id
)
for
key
in
kyc_str_list
:
portrait_dict
[
"projects"
][
key
]
-=
1
print
(
portrait_dict
)
print
(
portrait_dict
)
print
(
count_user_count
+
1
,
count_not_has_portratit
)
print
(
count_user_count
+
1
,
count_not_has_portratit
)
print
(
"-------------------------------"
)
print
(
"-------------------------------"
)
#
for protratit_type in portrait_dict["projects"]:
for
protratit_type
in
portrait_dict
[
"projects"
]:
#
partition_date = today_str
partition_date
=
today_str
#
pid = hashlib.md5((partition_date + protratit_type).encode("utf8")).hexdigest()
pid
=
hashlib
.
md5
((
partition_date
+
protratit_type
)
.
encode
(
"utf8"
))
.
hexdigest
()
#
action_count = portrait_dict["projects"][protratit_type]
action_count
=
portrait_dict
[
"projects"
][
protratit_type
]
#
#
instert_sql = """replace into new_user_project_count(
instert_sql
=
"""replace into new_user_project_count(
#
partition_day,pid,protratit_count,protratit_type) VALUES('{partition_day}','{pid}',{protratit_count},'{protratit_type}');""".format(
partition_day,pid,protratit_count,protratit_type) VALUES('{partition_day}','{pid}',{protratit_count},'{protratit_type}');"""
.
format
(
#
partition_day=today_str, pid=pid, protratit_count=action_count
partition_day
=
today_str
,
pid
=
pid
,
protratit_count
=
action_count
#
, protratit_type=protratit_type
,
protratit_type
=
protratit_type
#
)
)
#
print(instert_sql)
print
(
instert_sql
)
#
# cursor.execute("set names 'UTF8'")
# cursor.execute("set names 'UTF8'")
#
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db
=
pymysql
.
connect
(
host
=
'172.16.40.158'
,
port
=
4000
,
user
=
'st_user'
,
passwd
=
'aqpuBLYzEV7tML5RPsN1pntUzFy'
,
#
db='jerry_prod')
db
=
'jerry_prod'
)
#
cursor = db.cursor()
cursor
=
db
.
cursor
()
#
res = cursor.execute(instert_sql)
res
=
cursor
.
execute
(
instert_sql
)
#
db.commit()
db
.
commit
()
#
print(res)
print
(
res
)
#
cursor.executemany()
cursor
.
executemany
()
#
db.close()
db
.
close
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment