宋柯 / meta_base_code / Commits

Commit 72c037c7 authored Sep 11, 2020 by litaolemo

    update

Parent: b796b6de

Showing 4 changed files with 301 additions and 95 deletions (+301 / -95)
daily_once_task.sh                       +0    -1
task/portary_article_distribution.py     +14   -94
utils/func_from_es_get_article.py        +286  -0
utils/func_from_redis_get_portrait.py    +1    -0
daily_once_task.sh  View file @ 72c037c7

source /srv/envs/esmm/bin/activate

/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy \
    --driver-memory 16g --executor-memory 1g --executor-cores 2 --num-executors 100 \
    --conf spark.default.parallelism=100 \
    --conf spark.storage.memoryFraction=0.5 \
    --conf spark.shuffle.memoryFraction=0.3 \
    --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar \
    /srv/apps/meta_base_code/task/search_strategy_d.py

/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy \
    --driver-memory 16g --executor-memory 1g --executor-cores 4 --num-executors 120 \
    --conf spark.default.parallelism=100 \
    --conf spark.storage.memoryFraction=0.5 \
    --conf spark.shuffle.memoryFraction=0.3 \
    --conf spark.locality.wait=0 \
    --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar \
    /srv/apps/meta_base_code/task/core_indicators_monitoring.py

/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy \
    --driver-memory 16g --executor-memory 2g --executor-cores 4 --num-executors 70 \
    --conf spark.default.parallelism=100 \
    --conf spark.storage.memoryFraction=0.5 \
    --conf spark.shuffle.memoryFraction=0.3 \
    --conf spark.locality.wait=0 \
    --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar \
    /srv/apps/meta_base_code/task/core_indicators_monitoring.py

/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy \
    --driver-memory 16g --executor-memory 1g --executor-cores 4 --num-executors 70 \
    --conf spark.default.parallelism=100 \
    --conf spark.storage.memoryFraction=0.5 \
    --conf spark.shuffle.memoryFraction=0.3 \
    --conf spark.locality.wait=0 \
    --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar \
    /srv/apps/meta_base_code/task/advertisement_strategy_d.py
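All four jobs share the same spark.* tuning flags. For reference, a minimal pyspark sketch of those settings applied in code rather than on the command line (this mirrors what the task scripts below do themselves; the sketch is illustrative, not part of the commit):

# Sketch only: the --conf flags above, set programmatically before the session is created.
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.default.parallelism", "100")
conf.set("spark.storage.memoryFraction", "0.5")
conf.set("spark.shuffle.memoryFraction", "0.3")
conf.set("spark.locality.wait", "0")

spark = SparkSession.builder.config(conf=conf).getOrCreate()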
...
task/portary_article_distribution.py  View file @ 72c037c7
...
@@ -11,6 +11,8 @@ import json
import pymysql
import xlwt, datetime
import redis

from meta_base_code.utils.func_from_redis_get_portrait import user_portrait_scan_info
from meta_base_code.utils.func_from_es_get_article import get_es_article_num
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
...
@@ -38,101 +40,19 @@ def con_sql(sql):
    return result


startTime = time.time()

sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri", "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri", "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri", "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri", "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri", "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")  # note: overwrites the value set on the previous line
sparkConf.set("prod.tidb.database", "jerry_prod")
sparkConf.setAppName("search_tractate_ctr")

spark = (SparkSession.builder.config(conf=sparkConf)
         .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
         .config("spark.tispark.pd.addresses", "172.16.40.170:2379")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
def user_portrait_scan_info():
    return_dict = {}
    try:
        round = 0
        all_count = 0
        empty_count = 0
        just_projects_count = 0
        keys = "doris:user_portrait:tag3:device_id:*"
        cur, results = redis_client2.scan(0, keys, 3000)
        while cur != 0:
            round += 1
            print("round: " + str(round))
            cur, results = redis_client2.scan(cur, keys, 3000)
            for key in results:
                device_id = key.split(":")[-1]
                all_count += 1
                # print(key)
                # if user_portrait_is_empty(device_id):
                #     print(device_id)
                #     empty_count += 1
                # if user_portrait_just_projects(device_id):
                #     print(device_id)
                #     just_projects_count += 1
                #     user_portrait_get_empty_candidates(device_id)
                try:
                    res_dic = get_user_portrait_tag3_from_redis(device_id)
                    # print(res_dic)
                    for data_type in res_dic:
                        for tag in res_dic[data_type]:
                            if tag == "眼窝":  # "eye socket" tag
                                print(return_dict.get(tag))
                            if return_dict.get(tag):
                                return_dict[tag] = (data_type, return_dict[tag][1] + 1)
                            else:
                                return_dict[tag] = (data_type, 1)
                except:
                    continue
        # for data_list in res_dic:
        #     for data in data_list:
        return return_dict
    except Exception as e:
        print(e)
        return {}
task_list = []
task_days = 3

for t in range(1, task_days):
    day_num = 0 - t
    now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
    last_30_day_str = (now + datetime.timedelta(days=-30)).strftime("%Y%m%d")
    today_str = now.strftime("%Y%m%d")
    yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
    one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")

    user_portrait_scan = user_portrait_scan_info()
    for res in user_portrait_scan:
        second_demands = []
        projects = []
        if res.get("second_demands"):
            second_demands = res.get("second_demands")
            count_res = get_es_article_num({"second_demands": second_demands}, allow_tag=["second_demands"])
            print(count_res)
        if res.get("projects"):
            projects = res.get("projects")
            count_res = get_es_article_num({"tags_v3": projects}, allow_tag=["tags_v3"])
            print(count_res)
        # for active_type in res_dict:
        #     for device_os_type in res_dict[active_type]:
...
utils/func_from_es_get_article.py  0 → 100644  View file @ 72c037c7
# -*- coding:UTF-8 -*-
# @Time : 2020/9/11 13:51
# @File : func_from_es_get_article.py
# @email : litao@igengmei.com
# @author : litao

from elasticsearch import Elasticsearch

es = Elasticsearch([
    {'host': '172.16.31.17', 'port': 9200},
    {'host': '172.16.31.11', 'port': 9200},
])
def get_device_num_from_es(word):
    results = es.search(
        index='gm-dbmw-device',
        doc_type='doc',
        timeout='10s',
        size=0,
        body={
            "query": {
                "bool": {
                    "should": [
                        {"nested": {"path": "first_demands", "query": {"bool": {"must": [
                            {"terms": {"first_demands.name": [word]}}]}}}},
                        {"nested": {"path": "second_demands", "query": {"bool": {"must": [
                            {"terms": {"second_demands.name": [word]}}]}}}},
                        {"nested": {"path": "first_solutions", "query": {"bool": {"must": [
                            {"terms": {"first_solutions.name": [word]}}]}}}},
                        {"nested": {"path": "second_solutions", "query": {"bool": {"must": [
                            {"terms": {"second_solutions.name": [word]}}]}}}},
                        {"nested": {"path": "first_positions", "query": {"bool": {"must": [
                            {"terms": {"first_positions.name": [word]}}]}}}},
                        {"nested": {"path": "second_positions", "query": {"bool": {"must": [
                            {"terms": {"second_positions.name": [word]}}]}}}},
                        {"nested": {"path": "projects", "query": {"bool": {"must": [
                            {"terms": {"projects.name": [word]}}]}}}},
                    ],
                    "minimum_should_match": 1
                }
            }
        }
    )
    tractate_content_num = results["hits"]["total"]
    return tractate_content_num
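The seven should clauses above differ only in the nested path they target. A hypothetical helper (not in the commit, shown only to make the query shape easier to see) could build the same body programmatically:

# Hypothetical, for illustration: builds the same bool/should query
# from the list of nested tag paths used above.
def build_nested_should_query(word):
    paths = ["first_demands", "second_demands", "first_solutions",
             "second_solutions", "first_positions", "second_positions", "projects"]
    should = [{"nested": {"path": p,
                          "query": {"bool": {"must": [{"terms": {p + ".name": [word]}}]}}}}
              for p in paths]
    return {"query": {"bool": {"should": should, "minimum_should_match": 1}}}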
def get_es_article_num(tag_dict,
                       allow_tag=["first_demands", "second_demands", "first_solutions",
                                  "second_solutions", "positions", "second_positions", "tags_v3"]):
    # {tag_name: (answer_content_num, tractate_content_num, diary_content_num, total_num)}
    article_dict = {
        "first_demands": [],
        "second_demands": [],
        "first_solutions": [],
        "second_solutions": [],
        "positions": [],
        "second_positions": [],
        "tags_v3": [],
    }
    # iterate over a copy of the keys; popping while iterating the live
    # view raises RuntimeError in Python 3
    for key in list(article_dict.keys()):
        if key not in allow_tag:
            article_dict.pop(key)
    for tag_type in tag_dict:
        for tag_name in tag_dict[tag_type]:
            # answer
            body = {
                "query": {
                    "bool": {
                        "minimum_should_match": 1,
                        "should": [],
                        "must": [
                            {"term": {"is_online": True}},
                            {"terms": {"content_level": [6, 5, 4, 3.5, 3]}},
                            {"range": {"content_length": {"gte": 30}}},
                        ],
                    }
                },
            }
            body["query"]["bool"]["must"].append({"term": {tag_type: tag_name}})
            results = es.search(index='gm-dbmw-answer-read', doc_type='answer',
                                timeout='10s', size=0, body=body)
            answer_content_num = results["hits"]["total"]

            # tractate
            body = {
                "query": {
                    "bool": {
                        "minimum_should_match": 1,
                        "should": [],
                        "must": [
                            {"term": {"is_online": True}},
                            {"terms": {"content_level": [6, 5, 4, 3.5, 3]}},
                        ],
                    }
                }
            }
            body["query"]["bool"]["must"].append({"term": {tag_type: tag_name}})
            # body was missing from this search call in the original, which
            # would count every document in the index
            results = es.search(index='gm-dbmw-tractate-read', doc_type='tractate',
                                timeout='10s', size=0, body=body)
            tractate_content_num = results["hits"]["total"]

            # diary
            body = {
                "query": {
                    "bool": {
                        "minimum_should_match": 1,
                        "should": [],
                        "must": [
                            {"term": {"is_online": True}},
                            {"term": {"has_cover": True}},
                            {"term": {"is_sink": False}},
                            {"term": {"has_after_cover": True}},
                            {"term": {"has_before_cover": True}},
                            {"range": {"content_level": {"gte": "3"}}},
                            {"term": {"content_simi_bol_show": 0}},
                        ],
                    }
                },
            }
            body["query"]["bool"]["must"].append({"term": {tag_type: tag_name}})
            results = es.search(index='gm-dbmw-diary-read', doc_type='diary',
                                timeout='10s', size=0, body=body)
            diary_content_num = results["hits"]["total"]

            total_num = answer_content_num + tractate_content_num + diary_content_num
            data_dic = {tag_name: (answer_content_num, tractate_content_num, diary_content_num, total_num)}
            print(data_dic)
            article_dict[tag_type].append(data_dic)

    return article_dict
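A minimal usage sketch, mirroring the call sites in task/portary_article_distribution.py (the tag value is only an example; the return shape follows the comment at the top of the function):

# Example call; "眼窝" ("eye socket") is one of the portrait tags seen above.
counts = get_es_article_num({"tags_v3": ["眼窝"]}, allow_tag=["tags_v3"])
# counts == {"tags_v3": [{"眼窝": (answer_num, tractate_num, diary_num, total_num)}]}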
utils/func_from_redis_get_portrait.py  View file @ 72c037c7
...
@@ -46,6 +46,7 @@ def get_user_portrait_tag3_from_redis(device_id, limit_score=0):
        }
    return {}


def user_portrait_scan_info():
    return_dict = {}
    try:
...
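One caveat on the SCAN loop in user_portrait_scan_info as committed: the batch returned by the initial redis_client2.scan(0, keys, 3000) call is overwritten before it is processed, so the first page of keys is skipped, and a first scan that happens to return cursor 0 processes nothing. redis-py's scan_iter avoids hand-rolled cursor handling entirely; a minimal sketch, assuming redis_client2 is the module's existing StrictRedis client:

# Sketch only: cursor-free iteration over the portrait keys via scan_iter.
def iter_portrait_device_ids():
    for key in redis_client2.scan_iter(match="doris:user_portrait:tag3:device_id:*", count=3000):
        if isinstance(key, bytes):  # keys are bytes unless decode_responses=True
            key = key.decode("utf-8")
        yield key.split(":")[-1]  # device_id is the last colon-separated field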