Commit c88aef15 authored by 桂秋月

1

parent f65d4961
......@@ -66,6 +66,13 @@ mysql_info={
"port":"3306"
},
#mysql+pymysql://xyqb_recommender:vxVFCgWTKjYb0xfR@rm-2ze1l8mi94dkd255c.mysql.rds.aliyuncs.com:3306/xyqb_recommender_system?charset=utf8
"online":{
"host":"rm-2ze1l8mi94dkd255c.mysql.rds.aliyuncs.com",
"pwd":"vxVFCgWTKjYb0xfR",
"name":"xyqb_recommender",
"port":"3306"
},
"usergroup":{
"name":"tjzimuali_test",
"pwd":"bYut032QgkJ7ElMXwW",
......
......@@ -38,14 +38,14 @@ def mysql_universal(namespace):
mysql_info=mysqlInfo(namespace)
#print(mysql_info)
temp=create_engine(mysql_universal.format(**mysql_info))
print("mysql连接信息:",temp)
#print("mysql连接信息:",temp)
return temp
def execmysl(namespace,sql):
if 'select' not in sql.strip() and 'alter' not in sql.strip() and 'update' not in sql:
if 'select' not in sql.strip().lower() and 'alter' not in sql.strip().lower() and 'update' not in sql.lower():
raise Exception('Not a valid SQL statement for MySQL',sql)
try:
print(sql)
print('Executing SQL:',sql)
df=pd.read_sql(sql,con=mysql_universal(namespace))
return df
except Exception as e:
......@@ -77,4 +77,4 @@ def concatSql(sql,**kwargs):
if __name__ == '__main__':
#print(mysqlInfo('test1'))
print(execmysl('bd',"select 1 "))
\ No newline at end of file
print(execmysl('yxm',"select 1 "))
\ No newline at end of file
......@@ -96,8 +96,23 @@ if __name__ == '__main__':
gid_redis='device_recall_mapping:'+"BC609876-4ED3-4514-BCB3-9649D066641B"
user_redis='user_recall_mapping:'+"1d0cdee1-85af-4e5b-856e-4a2e453befb6"
sku_batch="personal_recall_product:1657881287309234854"
t=getRedisValue('89',gid_redis,oper='delete') #,oper='delete'
print(t)
user_batch="1d0cdee1-85af-4e5b-856e-4a2e453befb6"+"personal_recall_product"+"*"
# dapan_redis='cold_start_cid3_heat_intentions:202207201049'
# t=getRedisValue('89',dapan_redis,oper='delete1') #,oper='delete'
# print(t)
redis_key="product_similarity:2304543819777"
redis_key="product_correlation:253755595169288"
sku_list="163979637300224,163979645691904,163979628911616"
redis_db=redis_conn('89')
#redis_db.lpush(redis_key,*sku_list.split(','))
print(getRedisValue('89',redis_key,oper='delete1'))
## search_fm_offline_feature:{md5(search term)}_query_offline
## Internally: the search term is md5-hashed
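## A minimal sketch of how that key could be assembled, assuming the convention above
## (hex md5 digest of the raw query string); the sample query below is a made-up value.
import hashlib
sample_query = "iphone13"  # hypothetical search term
query_md5 = hashlib.md5(sample_query.encode("utf-8")).hexdigest()
search_feature_key = "search_fm_offline_feature:{}_query_offline".format(query_md5)
print(search_feature_key)
#print(getRedisValue('89', search_feature_key))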
......
......@@ -3,6 +3,7 @@ from tools import *
from tools.publicFun import *
from tools.httprequest import *
from tools.listOperation import *
from feature.custom_enum import custom_enum_info
from databaseConn.mysqlOperation import *
#from databaseConn.redisOperation import *
from feature.publicSql import *
......
import os.path
import posix
from feature import *
from feature.publicFun import devResult
def getUserids(uuid):
sql=finance_uuid_idno_userids_sql.format(uuid)
useris_df=execmysl('feature_aliyun',sql)
if useris_df.empty:
return custom_enum_info.custom_not_exist
return useris_df['user_id'].to_list()
def vcc_fixed_amount(uuid,userids,type='fixed'):
# Current VCC fixed credit limit
try:
#print("userids for uuid [{}]:".format(uuid),userids)
temp_user_id="("+str(userids[0])+")" if len(userids)==1 else tuple(userids)
vcc_fixed_amount_change_sql=vcc_fixed_amount_sql.format(temp_user_id).strip()
df=execmysl('feature_aliyun',vcc_fixed_amount_change_sql)
#print(df)
if df.empty:
return custom_enum_info.custom_not_exist
elif type=='fixed':
df=df.fillna(-1)
temp= sum(df['fixed_amount'].to_list())
#print('**'*100,temp)
return temp if temp>0 else custom_enum_info.custom_not_exist
elif type=='used':
df=df.fillna(-1)
temp= sum(df['used_amount'].to_list())
return temp
else:
print("type[]不支持".format(type))
return 1/0
except:
traceback.print_exc(3)
return custom_enum_info.custom_error
def vcc_temporary_amount(uuid,userids):
# Current VCC temporary credit limit
try:
temp_user_id="("+str(userids[0])+")" if len(userids)==1 else tuple(userids)
vcc_temporary_amount_change_sql=vcc_temporary_amount_sql.format(temp_user_id).strip()
df=execmysl('feature_aliyun',vcc_temporary_amount_change_sql)
if df.empty:
return custom_enum_info.custom_not_exist
df=df.fillna(-1)
temp= sum(df['amount'].to_list())
return temp if temp>0 else custom_enum_info.custom_not_exist
except:
traceback.print_exc(3)
return custom_enum_info.custom_error
def vcc_sum_amount(uuid,userids):
# Current VCC total credit limit
a1=vcc_fixed_amount(uuid,userids)
a1_temp=a1 if a1>0 else 0
a2=vcc_temporary_amount(uuid,userids)
a2_temp=a2 if a2>0 else 0
#print('ttt',a1_temp,a2_temp)
return a1_temp+a2_temp if a2_temp or a1_temp else custom_enum_info.custom_not_exist
def vcc_used_amount(uuid,userids):
# VCC used credit amount
a1=vcc_fixed_amount(uuid,userids,type='used')
return round(a1,2)
def cash_used_amount(uuid,userids):
# Current outstanding cash installment balance
try:
#userids=getUserids(uuid)
temp_user_id="("+str(userids[0])+")" if len(userids)==1 else tuple(userids)
cash_used_amount_change_sql=cash_used_amount_sql.format(temp_user_id).strip()
df=execmysl('feature_aliyun',cash_used_amount_change_sql)
if df.empty:
return custom_enum_info.custom_not_exist
df=df.fillna(-1)
temp= sum(df['required_repayment'].to_list())
return temp if temp>0 else custom_enum_info.custom_not_exist
except:
traceback.print_exc(3)
return custom_enum_info.custom_error
def vcc_credit_usage(uuid,userids):
# VCC credit usage ratio (used amount / total amount)
a1=vcc_used_amount(uuid,userids)
a1_temp=a1 if a1>0 else 0
a2=vcc_sum_amount(uuid,userids)
a2_temp=a2 if a2>0 else 0
try:
return round(a1_temp/a2_temp,2)
except:
traceback.print_exc(3)
return custom_enum_info.custom_not_exist
def simple_exec(uuid,codes):
#codes=['vcc_fixed_amount','vcc_temporary_amount','vcc_sum_amount','vcc_used_amount','cash_used_amount','vcc_credit_usage']
#codes=['vcc_sum_amount']
userids=getUserids(uuid)
#print(userids)
temp={}
temp['uuid']=uuid
for i in codes:
temp[i+'_test']=eval("{}(uuid,userids)".format(i))
#print('@@'*100,temp[i+'_test'])
return temp
def main(codes):
try:
result_dev=[]
result_test=[]
filename=os.path.join(feature_report,'uuids.xlsx')
uuids_df=readFile(filename)
uuids=uuids_df['uuid'].to_list()
for uuid in uuids:
print("执行的uuid,",uuid)
result1=devResult(uuid,codes,codetype='pre')
result_dev.append(result1)
result2=simple_exec(uuid,codes)
result_test.append(result2)
#return 0
dev_df=pd.DataFrame(result_dev)
test_df=pd.DataFrame(result_test)
#to_excel(filename,index=0,encoding = 'utf-8',float_format = str)
dev_df.to_excel(os.path.join(feature_report,'dev.xlsx'),index=0,encoding = 'utf-8')
test_df.to_excel(os.path.join(feature_report,'test.xlsx'),index=0,encoding = 'utf-8')
print("测试完成")
except:
traceback.print_exc(3)
def compare(codes):
dev_df=readFile("dev.xlsx")
test_df=readFile("test.xlsx")
df=dev_df.merge(test_df,on='uuid', how='inner')
print(df.head(2))
for code in codes:
df[code+"_result"]=df[code+'_dev']==df[code+"_test"]
df.to_excel(os.path.join(feature_report,'result.xlsx'),index=0,encoding = 'utf-8')
if __name__ == '__main__':
codes=['vcc_fixed_amount','vcc_temporary_amount','vcc_sum_amount','vcc_used_amount','cash_used_amount','vcc_credit_usage']
#compare(codes)
codes=['vcc_used_amount']
t=simple_exec('9d287da7-c5e8-4319-bd8f-b81abb8b294d',codes)
print(t)
#print(main(codes))
\ No newline at end of file
class custom_enum_info:
custom_error="暂无此人"  # sentinel returned when a lookup raises an exception ("no such person")
custom_not_exist=-9999999  # sentinel returned when no data exists for the user
\ No newline at end of file
import curlify
from feature import *
def devResult(uuid,codes,codetype='pre'):
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'rc_auth_key': 'rc_offline',
'rc_auth_secret': '9d9}bc24!e1z1x3`(x~4r29d$+45n3)\'zb696b$85e>_]p2&4f{,a3~8b3e_ldt^'
}
temp={'uuid':uuid}
temp_codes=','.join(codes)
body={
"codes":temp_codes
}
if codetype=='pre':
body["user_uuid"]=uuid
url=pre_loan_url
else:
url=mid_loan_url
body['tradeNo']=0
body['userUuid']=uuid
t=requests.post(url,data=body,headers=headers)
#print('Dev result ===',t.text,uuid)
#print(curlify.to_curl(t.request))
for code in codes:
#print(code)
temp[code+'_dev']=jsonpath.jsonpath(t.json(),'$..{}.value'.format(code))[0]
return temp
def getuuids():
sql="""
select distinct a.uuid from xyqb_user.user a
join xyqb_user.user_detail b on a.id=b.user_id
"""
df=execmysl('feature_aliyun',sql)
filename=os.path.join(feature_report,'uuids.xlsx')
df['uuid'].to_excel(filename,index=0,encoding = 'utf-8',float_format = str)
if __name__ == '__main__':
getuuids()
\ No newline at end of file
......@@ -6,3 +6,45 @@ select phone from finance_offline_feature.del_20220627
feature_user_sql="""
select phone_no,uuid from xyqb_user.user order by rand() limit {num}
"""
## Use the finance uuid to find the ID-card number, then fetch all finance user_ids tied to that ID-card number
finance_uuid_idno_userids_sql="""
select user_id from xyqb_user.user_detail where id_no in (
select b.id_no from xyqb_user.user a
join xyqb_user.user_detail b on a.id=b.user_id
where a.uuid='{}')
"""
### Current VCC fixed credit limit
vcc_fixed_amount_sql="""
select fixed_amount,used_amount from vcc_quota.user_quota_record
where user_id in {} and status in (1,2)
"""
## Current VCC temporary credit limit
vcc_temporary_amount_sql="""
SELECT user_id,amount from vcc_quota.user_quota_package where id= (SELECT max(id) FROM vcc_quota.user_quota_package
where user_id in {} and status in (1,2) GROUP BY user_id)
"""
### Current outstanding cash installment balance
cash_used_amount_sql="""
SELECT
repay.user_id,
sum( repay.required_repayment ) required_repayment
FROM
xyqb.repayment_plan repay
INNER JOIN xyqb.loan_application_manifest_history loan ON loan.id = repay.loan_application_manifest_history_id
WHERE
repay.repayment_status <> 3
AND loan.transaction_status IN ( 2, 5 )
and repay.user_id in {}
-- AND date(deadline)>=date(now())
GROUP BY
repay.user_id
"""
## Basic user info: look up the user id by uuid
base_user_info_sql="""
select id from xyqb_user.user where uuid='{}'
"""
\ No newline at end of file
......@@ -10,7 +10,7 @@ def online_intent_cid3(uuid='-999999',gid='-999999'):
df=execmysl('89',recommend_user_intent_change_sql)
if df.empty:
return []
return df['intent_cid3'].to_list()
return df['intent_cid3'].astype(str).to_list()
def offine_intent_cid3(uuid='-999999',gid='-999999'):
cold_start_user_history_cid3_intentions_change_sql=cold_start_user_history_cid3_intentions_sql.format(uuid,gid)
......@@ -31,15 +31,21 @@ def dapan():
def main(uuid='-999999',gid='-999999'):
online_cid3=online_intent_cid3(uuid,gid)
print("实时意图召回80011:",online_cid3)
offline_cid3=offine_intent_cid3(uuid,gid)
print("离线意图召回80012:",offline_cid3)
dapan_cid3=dapan()
print("大盘意图召回80013:",dapan_cid3)
cid3s=removeRepeat(online_cid3+offline_cid3+dapan_cid3)[:3]
skus_df=concatSql(skuinfo_sql,**{"cid3":cid3s})
skus_change_sql=concatSql(skuinfo_sql,**{"cid3":cid3s})
skus_df=execmysl('89',skus_change_sql)
#print('Final SKU data:',skus_df)
return skus_df['sku_no'].to_list()
if __name__ == '__main__':
uuid='6b4fe2b4-a851-40fe-b569-506bb93e6407'
uuid='1bb5268b-8cdb-4d98-8e24-37a39fc25235'
gid=''
#print(online_intent_cid3(uuid))
print(dapan())
print(main(uuid=uuid))
# -*- coding: utf-8 -*-
from recommend import *
def write_similarity(product_id1):
sql="""
select product_id1, product_id2, similarity_rank
from xyqb_recommender_system.mix_products_similarity
where similarity_rank <= 100 and product_id1='{}'
limit 1000
"""
#product_id1='2304543819777'  # debug override commented out so the passed-in product_id1 is used
sql=sql.format(product_id1)
df=execmysl('89',sql)
#df=df.groupby('product_id1').agg(lambda x:aa(x)).reset_index()#.apply(lambda x:x.sort_values('similarity_rank')).reset_index()
product_id2s=df['product_id2'].to_list()
return product_id2s
def write_correlation(product_id1):
sql="""
select product_id1, product_id2, correlation_rank
from xyqb_recommender_system.mix_products_correlation
where correlation_rank <= 100 and product_id1='{}' limit 10;
"""
sql=sql.format(product_id1)
redis_key="product_correlation:"+product_id1
df=execmysl('89',sql)
#df=df.groupby('product_id1').agg(lambda x:aa(x)).reset_index()#.apply(lambda x:x.sort_values('similarity_rank')).reset_index()
product_id2s=df['product_id2'].to_list()
return product_id2s
def main():
product_id1='306834'
redis_db=redis_conn('89')
product_id2s=write_similarity(product_id1)
if product_id2s:
redis_key="product_similarity:"+product_id1
if not getRedisValue('89',redis_key):
print(redis_key,"insert succ")
redis_db.lpush(redis_key,*product_id2s)
product_id2s=write_correlation(product_id1)
if product_id2s:
redis_key="product_correlation:"+product_id1
if not getRedisValue('89',redis_key):
print(redis_key,"insert succ")
redis_db.lpush(redis_key,*product_id2s)
if __name__ == '__main__':
main()
# redis_key="product_correlation:{sku}"
#
#print(temp)
\ No newline at end of file
from recommend import *
def offline_recall(uuid_gid):
temp=defaultdict(list)
kdsp_personalize_recall_etrec_change_sql=concatSql(kdsp_personalize_recall_etrec_sql,**{"user_id":uuid_gid})
etrec_df=execmysl('89',kdsp_personalize_recall_etrec_change_sql)
if etrec_df.empty:
temp['90011']=[]
else:
etrec_df['item_ids']=etrec_df['item_ids'].apply(lambda x:','.join([i.split('::')[0] for i in x.split(',')]))
temp['90011']=etrec_df['item_ids'].to_list()
kdsp_personalize_recall_swing_change_sql=concatSql(kdsp_personalize_recall_swing_sql,**{"user_id":uuid_gid})
swing_df=execmysl('89',kdsp_personalize_recall_swing_change_sql)
if swing_df.empty:
temp['90012']=[]
else:
swing_df['item_ids']=swing_df['item_ids'].apply(lambda x:','.join([i.split('::')[0] for i in x.split(',')]))
temp['90012']=swing_df['item_ids'].to_list()
print("temp['90011']",temp['90011'])
print("temp['90012']",temp['90012'])
return temp
if __name__ == '__main__':
uuid='0005a648-32f1-444a-9ce6-0b63fa010c0d'
print(offline_recall(uuid))
\ No newline at end of file
......@@ -126,7 +126,7 @@ select * from online_recommend.recommend_sku_query_match_info
#### User history intent table - product cold start
cold_start_user_history_cid3_intentions_sql="""
select * from offline_recommend.cold_start_user_history_cid3_intentions
where user_uuid='{}' or device_id='{}'
where uuid='{}' or gid='{}'
order by event_time desc limit 3
"""
......@@ -143,3 +143,13 @@ where user_uuid='{}' or device_id='{}'
cold_start_cid3_heat_intentions_timestamp_redis="""cold_start_cid3_heat_intentions"""
cold_start_cid3_heat_intentions_redis="""cold_start_cid3_heat_intentions:{}"""
#########################################################################################################
### etrec recall table; "etrec" is the name of the training model
kdsp_personalize_recall_etrec_sql="""
select item_ids from offline_recommend.kdsp_personalize_recall_etrec
"""
### swing recall table; "swing" is the name of the training model
kdsp_personalize_recall_swing_sql="""
select item_ids from offline_recommend.kdsp_personalize_recall_swing
"""
\ No newline at end of file
# -*- coding: utf-8 -*-
from recommend import *
tempresult=defaultdict(list)
a=[{"a":1,"b":11},{"a":1,"b":22},{"a":3,"b":33}]
df=pd.DataFrame(a)
print(df)
t=df.groupby(by=['a']).groups#.__repr__()
print(t,type(t))
#print(temp)
\ No newline at end of file
from databaseConn.mysqlOperation import *
from recommend.publicSql import *
from databaseConn import *
git_url="http://git.quantgroup.cn/"
lkb_git_url="http://git.quantgroup.cn"
tjzm_git_url="https://gitlab.tjzimu.com"
git_token='zubux2fMyyp8s8Cys3T6'
git_headers={
"PRIVATE-TOKEN":git_token
}
kafka_host="10.130.9.1:9092,10.130.9.0:9092,10.130.8.254:9092"
\ No newline at end of file
......@@ -8,7 +8,7 @@ jk = jenkins.Jenkins(base_url, username="qiuyue.gui", password="1234567890Yue@")
header = {"Host": "jenkins.quantgroups.cn"}
def buildJob(project, branch, env, jobname="java",prefix_job='ACK'):
def buildJob(project, branch, env,urltype ,jobname="java",prefix_job='ACK'):
'''
:param project:
:param branch:
......@@ -21,7 +21,7 @@ def buildJob(project, branch, env, jobname="java",prefix_job='ACK'):
#print("jobs:",jobs)
jobnames = []
joburls = []
projects_branch = get_project_branch(project)
projects_branch = get_project_branch(project,urltype)
#projects_branch={"kdsp":["master11"]}
temp_branchs=projects_branch.get(project)
if branch not in temp_branchs:
......@@ -43,7 +43,7 @@ def buildJob(project, branch, env, jobname="java",prefix_job='ACK'):
t = jk.build_job(jobname, params)
print("project({0}-{1}-{2}) build succ".format(project, branch, env), t)
def get_project_branch(project):
def get_project_branch(project,urltype):
"""
:param project:
:return: branch information for the given project
......@@ -51,8 +51,14 @@ def get_project_branch(project):
try:
temp_branchs=[]
num=1
if urltype=='tjzm':
git_url=tjzm_git_url
#project_url =tjzm_git_url+'/api/v4/projects?search='+project
else:
git_url=lkb_git_url
project_url =git_url+'/api/v4/projects?search='+project
temp=requests.get(project_url,headers=git_headers)
print(temp.text)
project_name=jsonpath.jsonpath(temp.json(),'$[*].name')
project_id=jsonpath.jsonpath(temp.json(),'$[*].id')
project_info=dict(zip(project_name,project_id))
......@@ -73,12 +79,12 @@ def get_project_branch(project):
traceback.print_exc(3)
def main(project,prefix_job='ACK'):
def main(project,type="",prefix_job='ACK'):
#="nearline-recommend-parent"
project_branch={
"kdsp":{
"branch":"feature-test1-test-202206081010",
"env":'bd',
"branch":"feature-mix-test-yxm-202207201023",
"env":'yxm',
"jobname":"java"},
"nearline-recommend-parent":{
......@@ -87,7 +93,7 @@ def main(project,prefix_job='ACK'):
"jobname":"java"
},
"online-recommend":{
"branch":"master",
"branch":"event_htk_20220714",
"env":'bd',
"jobname":"java"
},
......@@ -100,16 +106,34 @@ def main(project,prefix_job='ACK'):
"branch":"feat/memberDayS3",
"env":'yxm',
"jobname":"ui"
},
"op-api":{
"branch":"feat/no-captcha",
"env":'yxm',
"jobname":"node"
},
"xjfq-ui":{
"branch":"feature/max",
"env":'toc',
"jobname":"ui",
"urltype":"tjzm"
},
"xyqb":{
"branch":"feature-maxmember2",
"env":'toc',
"jobname":"java",
"urltype":"tjzm"
}
}
urltype=project_branch.get(project).get('urltype')
branch=project_branch.get(project).get('branch')
env=project_branch.get(project).get('env')
jobname=project_branch.get(project).get('jobname')
if not (branch and env and jobname):
raise Exception('Unknown project, cannot build')
buildJob(project, branch, env, jobname=jobname,prefix_job=prefix_job)
buildJob(project, branch, env,urltype=urltype, jobname=jobname,prefix_job=prefix_job)
if __name__ == '__main__':
project='new-op-optimized-ui'
project="kdsp"
main(project)
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
import time
import hmac
import hashlib
import urllib
import base64
import http.client
import requests
import random
import logging
from time import sleep
from rediscluster import RedisCluster
from datetime import datetime
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master("yarn") \
.appName("write_product_correlation_redis.py") \
.config("spark.sql.warehouse.dir", "some-value") \
.config('spark.executor.memory', '8g') \
.config("spark.dynamicAllocation.enabled",'true') \
.config("spark.dynamicAllocation.shuffleTracking.enabled",'true') \
.config("spark.shuffle.service.enabled",'true') \
.config("spark.dynamicAllocation.maxExecutors", 15) \
.config("spark.dynamicAllocation.minExecutors",8) \
.config("spark.executor.cores", "4") \
.enableHiveSupport() \
.getOrCreate()
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
# from sqlalchemy import create_engine
# online_xyqb_recommender_system = create_engine('mysql+pymysql://xyqb_recommender:vxVFCgWTKjYb0xfR@rm-2ze1l8mi94dkd255c.mysql.rds.aliyuncs.com:3306/xyqb_recommender_system?charset=utf8', echo=False)
#
# import warnings
# warnings.filterwarnings('ignore')
#
# write_db_start_time = datetime.now()
# print("关联度表结果开始入库,开始时间:{}".format(write_db_start_time))
# print('')
#
#
# ######################+++++++ Functions ++++++++###########################
# def send_ding(content):
# params = {
# "msgtype": "markdown",
# "markdown": {
# "title": content, # "kdsp排序总表写Redis finished",
# "text": "### " + content + "\n" + "\n @15620699720",
# },
# "at": {
# "atMobiles": [
# "15620699720"
# ],
# "isAtAll": False
# }
# }
# headers = {
# 'Content-Type': 'application/json; charset=utf-8',
# }
# # compute the DingTalk webhook signature
# timestamp = int(round(time.time() * 1000, 0))
# # timestamp = int((datetime.now() - datetime.strptime('1900-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')).total_seconds())
# secret = 'SEC6bae21828d84137a44520e8312072a5bd82fc8a6b74db787a19386527a3f58ba' # KDSP alert group
# # secret = 'SEC4005efa85f9bba37129a90a9a08287a96707c6dda25f47cd54164e066f6d5ca7' # trial group
#
# secret_enc = secret.encode('utf-8')
# string_to_sign = '{}\n{}'.format(timestamp, secret)
# string_to_sign_enc = string_to_sign.encode('utf-8')
# hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
# sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
#
# # KDSP alert group
# durl = 'https://oapi.dingtalk.com/robot/send?access_token=7279f171d1e89d910d2bfad3884fc8f19935e7994346fd6598b7294e88840a7e&timestamp={}&sign={}'.format(
# timestamp, sign)
# # trial group
# # durl = 'https://oapi.dingtalk.com/robot/send?access_token=348e0c162fc4d702190f4e26dd0178638c9741f7b384cd8fa9c2e936e0b33a3e&timestamp={}&sign={}'.format(
# # timestamp, sign)
#
# r = requests.post(url=durl, json=params, headers=headers)
# res = r.json()
# if res.get('errcode') == 0:
# return True
# else:
# return False
#
#
# def conredis_test(config):
# redis_nodes = config["REDIS_NODES"]
# redis_expiretime=config["REDIS_EXPIRETIME"] # expiry time of the redis keys, in seconds
# maxconnections=config["REDIS_MAX_CONNECTIONS"]
# redis_password=config["REDIS_PASSWD"]
# redisClient=None
# try:
# # redisClient = RedisCluster(startup_nodes = redis_nodes, max_connections=maxconnections)
# redisClient = RedisCluster(startup_nodes = redis_nodes, max_connections=maxconnections, password=redis_password)
# except Exception as e:
# # app.logger.error("redis cluster connect error,redis_nodes:"+str(redis_nodes))
# redisClient=None
# return redisClient
#
#
# def conredis_official(config):
# redis_nodes = config["REDIS_NODES"]
# redis_expiretime = config["REDIS_EXPIRETIME"]  # expiry time of the redis keys, in seconds
# maxconnections = config["REDIS_MAX_CONNECTIONS"]
# redis_password = config.get("REDIS_PASSWD", None)
# redisClient = None
# try:
# print("======")
# redisClient = RedisCluster(
# startup_nodes = redis_nodes,
# max_connections=maxconnections,
# skip_full_coverage_check = True
# )
# except Exception as e:
# redisClient = None
# return redisClient
#
#
# def correlation_product_2_redis(df: DataFrame, num_partitions=2, batch_size=50000, sleep_secs=0.2):
# """
# Write a Spark SQL DataFrame to Redis in distributed batches.
# ``df`` the Spark DataFrame to write.
# ``num_partitions`` int, number of partitions for the data, defaults to 2.
# ``batch_size`` int, number of rows each worker commits per batch, defaults to 50000.
# ``sleep_secs`` float, seconds each worker sleeps after committing a batch, defaults to 0.2.
# """
#
# # count = df.count()
# # if count == 0:
# # print('df count 0')
# # return
# col_names = df.columns
#
# def _save_2_redis(ite):
#
# config_official_ali = {
# 'REDIS_NODES': [
# {'host': 'r-2ze3ferg4oc0tuqeou.redis.rds.aliyuncs.com', 'port': 6379},
# ],
# 'REDIS_EXPIRETIME': 24 * 7 * 3600,
# 'REDIS_MAX_CONNECTIONS': 50,
# # 'REDIS_PASSWD': 'redis',
# }
# # redis_expiretime_official = config_official["REDIS_EXPIRETIME"]
# redisClient_official_ali = conredis_official(config_official_ali)
#
# pipe_official_ali = redisClient_official_ali.pipeline(transaction=False)
# # 7-day expiry (24 * 7 * 3600 seconds)
# ex_official_ali = config_official_ali['REDIS_EXPIRETIME']
#
#
# idx = 1
# for i, row in enumerate(ite, start=1):
# idx = i
# redis_key = 'product_correlation:' + str(row.product_id1)
# fv = {}
# for x in col_names:
# if x != 'product_id2_list':
# continue
# val = getattr(row, x, None)
# if val is not None:
# pipe_official_ali.ltrim(redis_key, -1, 0)
# for i in list(val):
# pipe_official_ali.rpush(redis_key, i)
# # fv[x] = val
# # pipe.hmset(redis_key, fv)
# pipe_official_ali.expire(redis_key, ex_official_ali)
# if i % batch_size == 0:
# pipe_official_ali.execute()
# # after a batch commit, sleep for the configured time to throttle the write rate
# sleep(sleep_secs)
# if idx % batch_size != 0:
# # flush the final partial batch (fewer than batch_size rows)
# pipe_official_ali.execute()
#
# # distributed batch write
# df.repartition(num_partitions).foreachPartition(_save_2_redis)
#
#
# ############################################ 01 Fetch correlation table data ############################################
# sql = '''
# select product_id1, product_id2, correlation_rank
# from mix_products_correlation
# where correlation_rank <= 100
# -- limit 100
# '''
# mix_prds_correlation = pd.read_sql(sql, con=online_xyqb_recommender_system)
# print('Number of products:', len(mix_prds_correlation))
# mix_prds_correlation_df = spark.createDataFrame(mix_prds_correlation)
#
#
# grouped = mix_prds_correlation_df.groupBy("product_id1").agg(collect_list(struct("correlation_rank", "product_id2")).alias("tmp"))
# grouped2 = grouped.select("product_id1", sort_array("tmp")["product_id2"].alias("product_id2_list"))
# correlation_product_2_redis(grouped2, num_partitions=200)
#
# print("关联度表存储redis完成 finished at :", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# # send_ding(str(f"关联度表存储redis完成 finished at :") + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
#
# # Near-line layer: write correlation table to Redis - Ali
from tools import *
from kafka import KafkaConsumer,KafkaProducer
def kafka_con():
pass
if __name__ == '__main__':
pass
\ No newline at end of file
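# The kafka_con stub above is left empty. Below is a minimal sketch of how a consumer
# could be wired up with the kafka-python client, assuming the kafka_host broker list
# defined in the package __init__ is importable here; the topic and group_id values
# are hypothetical placeholders.
def kafka_con_sketch(topic="recommend_event_test", group_id="recommend_qa"):
    # connect to the broker list and read messages from the latest offset
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_host.split(','),
        group_id=group_id,
        auto_offset_reset='latest',
        value_deserializer=lambda v: v.decode('utf-8'),
        consumer_timeout_ms=5000,  # stop iterating if no new message arrives within 5s
    )
    for msg in consumer:
        print(msg.topic, msg.partition, msg.offset, msg.value)
    consumer.close()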
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
import random
import logging
from time import sleep
from rediscluster import RedisCluster
from datetime import datetime
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master("yarn") \
.appName("write_product_similarity_redis.py") \
.config("spark.sql.warehouse.dir", "some-value") \
.config('spark.executor.memory', '8g') \
.config("spark.dynamicAllocation.enabled",'true') \
.config("spark.dynamicAllocation.shuffleTracking.enabled",'true') \
.config("spark.shuffle.service.enabled",'true') \
.config("spark.dynamicAllocation.maxExecutors", 15) \
.config("spark.dynamicAllocation.minExecutors",8) \
.config("spark.executor.cores", "4") \
.enableHiveSupport() \
.getOrCreate()
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
from sqlalchemy import create_engine
online_xyqb_recommender_system = create_engine('mysql+pymysql://xyqb_recommender:vxVFCgWTKjYb0xfR@rm-2ze1l8mi94dkd255c.mysql.rds.aliyuncs.com:3306/xyqb_recommender_system?charset=utf8', echo=False)
import warnings
warnings.filterwarnings('ignore')
write_db_start_time = datetime.now()
print("相似度表结果开始入库,开始时间:{}".format(write_db_start_time))
print('')
def conredis_test(config):
redis_nodes = config["REDIS_NODES"]
redis_expiretime=config["REDIS_EXPIRETIME"] # expiry time of the redis keys, in seconds
maxconnections=config["REDIS_MAX_CONNECTIONS"]
redis_password=config["REDIS_PASSWD"]
redisClient=None
try:
# redisClient = RedisCluster(startup_nodes = redis_nodes, max_connections=maxconnections)
redisClient = RedisCluster(startup_nodes = redis_nodes, max_connections=maxconnections, password=redis_password)
except Exception as e:
# app.logger.error("redis cluster connect error,redis_nodes:"+str(redis_nodes))
redisClient=None
return redisClient
def conredis_official(config):
redis_nodes = config["REDIS_NODES"]
redis_expiretime = config["REDIS_EXPIRETIME"]  # expiry time of the redis keys, in seconds
maxconnections = config["REDIS_MAX_CONNECTIONS"]
redis_password = config.get("REDIS_PASSWD", None)
redisClient = None
try:
print("======")
redisClient = RedisCluster(
startup_nodes = redis_nodes,
max_connections=maxconnections,
skip_full_coverage_check = True
)
except Exception as e:
redisClient = None
return redisClient
def similarity_product_2_redis(df: DataFrame, num_partitions=2, batch_size=50000, sleep_secs=0.2):
"""
Write a Spark SQL DataFrame to Redis in distributed batches.
``df`` the Spark DataFrame to write.
``num_partitions`` int, number of partitions for the data, defaults to 2.
``batch_size`` int, number of rows each worker commits per batch, defaults to 50000.
``sleep_secs`` float, seconds each worker sleeps after committing a batch, defaults to 0.2.
"""
# count = df.count()
# if count == 0:
# print('df count 0')
# return
col_names = df.columns
def _save_2_redis(ite):
config_official_ali = {
'REDIS_NODES': [
{'host': 'r-2ze3ferg4oc0tuqeou.redis.rds.aliyuncs.com', 'port': 6379},
],
'REDIS_EXPIRETIME': 24 * 7 * 3600,
'REDIS_MAX_CONNECTIONS': 50,
# 'REDIS_PASSWD': 'redis',
}
# redis_expiretime_official = config_official["REDIS_EXPIRETIME"]
redisClient_official_ali = conredis_official(config_official_ali)
pipe_official_ali = redisClient_official_ali.pipeline(transaction=False)
# 7-day expiry (24 * 7 * 3600 seconds)
ex_official_ali = config_official_ali['REDIS_EXPIRETIME']
idx = 1
for i, row in enumerate(ite, start=1):
idx = i
redis_key = 'product_similarity:' + str(row.product_id1)
fv = {}
for x in col_names:
if x != 'product_id2_list':
continue
val = getattr(row, x, None)
if val is not None:
pipe_official_ali.ltrim(redis_key, -1, 0)
# use a separate loop variable so the outer batch counter i is not overwritten
for v in list(val):
pipe_official_ali.rpush(redis_key, v)
# fv[x] = val
# pipe.hmset(redis_key, fv)
pipe_official_ali.expire(redis_key, ex_official_ali)
if i % batch_size == 0:
pipe_official_ali.execute()
# after a batch commit, sleep for the configured time to throttle the write rate
sleep(sleep_secs)
if idx % batch_size != 0:
# flush the final partial batch (fewer than batch_size rows)
pipe_official_ali.execute()
# distributed batch write
df.repartition(num_partitions).foreachPartition(_save_2_redis)
############################################ 01 Fetch similarity table data ############################################
sql = '''
select product_id1, product_id2, similarity_rank
from mix_products_similarity
where similarity_rank <= 100
-- and valid_code=1
-- limit 100
'''
mix_prds_sim = pd.read_sql(sql, con=online_xyqb_recommender_system)
print('Number of products:', len(mix_prds_sim))
mix_prds_sim_df = spark.createDataFrame(mix_prds_sim)
grouped = mix_prds_sim_df.groupBy("product_id1").agg(collect_list(struct("similarity_rank", "product_id2")).alias("tmp"))
grouped2 = grouped.select("product_id1", sort_array("tmp")["product_id2"].alias("product_id2_list"))
similarity_product_2_redis(grouped2, num_partitions=200)
# Near-line layer: write similarity table to Redis - Ali
\ No newline at end of file