网络部署上线过程记录
网络部署上线过程记录
以推荐相似单品项目为例:reco-similar-product
项目目录
Dockerfile
Jenkinsfile
README.md
config
deploy.sh
deployment.yaml
index
offline
recsys
requirements.txt
service
stat
可以看到,目录中有许多文件,这里简单介绍下各个目录的作用:
-
Dockerfile:Docker部署脚本; -
Jenkinsfile:Jenkinsfile 是一个文本文件,它包含了 Jenkins 流水线的定义并被检入源代码控制仓库; -
README.md: 项目介绍; - config:存放配置文件的位置;
-
deploy.sh:在Docker上部署的脚本; -
deployment.yaml:k8-s配置文件; - index:存放index数据的位置,这里用来做ctr平滑;
- offline:离线部分的代码;
- recsys:存放一些通用调用模块
-
requirements.txt:项目所依赖的包; - service:服务器部分的代码;
- stat:相关统计脚本
详细介绍
Dockerfile
FROM tools/python:3.6.3
ADD . /usr/local/reco_similar_product_service
WORKDIR /usr/local/reco_similar_product_service
EXPOSE 5001
# build image
RUN mkdir -p /data/reco_similar_product_service/log
RUN bash deploy.sh build
# launch image
CMD bash deploy.sh launch
-
from tools/python:3.6.3: 从此处获取python镜像包 -
ADD . /usr/local/reco_similar_product_service:ADD指令的功能是将主机构建环境(上下文)目录中的文件和目录、以及一个URL标记的文件 拷贝到镜像中。把文件放在该文件中 -
WORKDIR /usr/local/reco_similar_product_service:指定工作目录 -
EXPOSE 5000:暴露端口号,声明端口 -
RUN mkdir -p /data/algoreco_simproduct_service/log:设置日志文件地址 ,与后面的log日志地方保持一致 -
RUN bash deploy.sh build:调用deploy.sh部署 -
CMD bash deploy.sh launch:启动镜像
Jenkinsfile
node('jenkins-jnlp') {
stage("Clone") {
checkout scm
}
stage("Tag") {
script {
tag = sh(returnStdout: true, script: 'git rev-parse --short HEAD').trim()
}
}
stage("Push Docker Image") {
sh "sed -i 's@TAG@${tag}@g' deployment.yaml"
withDockerRegistry(credentialsId: '', url: '') {
sh "docker build -t reco-similar-product:${tag} ."
sh "docker push reco-similar-product:${tag}"
}
}
stage("Deployment") {
sh "kubectl apply -f deployment.yaml"
}
}
上述是 Jenkins 流水线的几个过程,克隆特定修订版本(Clone),获取tag, 创建镜像(build),将本地的镜像上传到镜像仓库(push),对文件或标准输入流更改资源应用配置(Deployment)
-
jenkins-jnlp: Jenkins的Agent大概分两种。 一是基于SSH的,需要把Master的SSH公钥配置到所有的Agent宿主机上去。 二是基于JNLP的,走HTTP协议,每个Agent需要配置一个独特的密码。 - stage('Clone'): stage 指令在 stages 部分进行,应该包含一个 实际上, 流水巷所做的所有实际工作都将封装进一个或多个 stage 指令中。
-
checkout scm: checkout 步骤将会从源代码控制中检出代码;scm 是一个特殊的变量, 它指示 checkout 步骤克隆触发流水线运行的特定修订版本; -
docker build和docker push:创建镜像和上传镜像 -
kubectl apply -f deployment.yaml: 对文件或标准输入流更改资源应用配置
deploy.sh
#!/bin/bash
# Dependencies in $PATH (in base docker image):
# - python 3.6.3
# - pip for this python
build()
{
# prepare env if there's no docker
# ENV_DIR="venv"
# pip install virtualenv
# virtualenv --no-site-packages $ENV_DIR
# source $ENV_DIR/bin/activate
pip install -r requirements.txt \
-i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
python setup.py build_ext --inplace
}
launch()
{
# start web container
cd service/
bash run.sh start
}
if [ x"$1" == "xbuild" ]; then
echo "build"
build
fi
if [ x"$1" == "xlaunch" ]; then
echo "launch"
launch
-
build: 安装相应的依赖包; -
launch: 启动服务;
deployment.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
labels:
k8s-app: reco-similar-product-dev
name: reco-similar-product-dev
namespace: dev
spec:
minReadySeconds: 5
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 25%
maxUnavailable: 25%
replicas: 1
selector:
matchLabels:
k8s-app: reco-similar-product-dev
template:
metadata:
labels:
k8s-app: reco-similar-product-dev
spec:
serviceAccountName: dev
nodeSelector:
environment: dev
imagePullSecrets:
- name: dev
containers:
- name: reco-similar-product-dev
image: develop/reco-similar-product:TAG
env:
- name: RECO_SIM_TEST
value: "1"
ports:
- containerPort: 5000
protocol: TCP
volumeMounts:
- name: reco-similar-product-dev-log
subPath: reco-similar-product-dev-log
mountPath: /data/reco_similar_product_service/log
securityContext:
fsGroup: 1000
volumes:
- name: reco-similar-product-dev-log
persistentVolumeClaim:
claimName: dev-logs
---
kind: Service
apiVersion: v1
metadata:
labels:
k8s-app: reco-similar-product-dev
name: reco-similar-product-dev
namespace: dev
spec:
ports:
- port: 5000
targetPort: 5000
selector:
k8s-app: reco-similar-product-dev
- dev: 表示测试环境
- port: 端口号
- app: 服务名
- mountPath:日志文件路径
config
该目录主要是存放一些数据库,路径配置等配置文件
recsys
放置一些通用模块,比如基础模块:数据库连接等模块, 服务端的核心处理逻辑代码也可以放在这里
offline
离线部分代码,主要是服务功能的主逻辑部分代码。对于推荐相似单品而言,就是要去生成相应的数据,写入到表里。代码的作用是,处理数据,离线计算处理得到一份结果,将结果写入到数据库中。
#!/usr/bin/env python
# fileUsing: reco some pid from sale's pids
# retrieve pids by cv feature
# filt by type, alive
import os
import sys
import oss2
import json
import yiss
import numpy as np
from operator import itemgetter
from datetime import datetime
MODULE_REAL_DIR = os.path.dirname(os.path.realpath(__file__))
DATA_PATH = MODULE_REAL_DIR + '/../data'
CONFIG_PATH = '../../../config/'
class SearchItem():
def __init__(self, feat=None, pid=None, p_type=None, score=None):
'''
create item struct
every pid contain:
feat: cv feature
pid: product id
p_type: type
score: similar score
'''
self.feat = feat
self.pid = pid
self.p_type = p_type
self.score = score
class ImgSearch(SearchItem):
def __init__(self):
'''
retrieve mode:
ANN: Approximate Nearest Neighbor
'''
# prepare cv feature dataset
self.product_feature = {}
feature_path = DATA_PATH + '/product_cv_feature.json'
with open(feature_path, 'r') as f:
for line in f:
pid, _, feat = line.rstrip('\n').split('\t')
self.product_feature[int(pid)] = json.loads(feat) # change key type into int
# -> init build index and save index <-
self.index = yiss.VectorIndex(
data_file=feature_path, series='IDMap, Flat', suff_len=1, search_width=1)
self.index.init(save_file=MODULE_REAL_DIR + '/../data/vector.index')
# init ImageUtils
self.ImgUtil = yiss.ImageUtils(CONFIG_PATH + '/config.json')
# load tags: type, alive, color
tag_path = DATA_PATH + "/pid_tags"
self.pid_type = {}
self.pid_alive = {}
with open(tag_path, 'r') as f:
for line in f:
pid, type_top_id, is_alive = line.rstrip('\n').split('\t')
self.pid_type[pid] = type_top_id
self.pid_alive[pid] = is_alive
# init retrieve length
self.RETRIEVE_LEN = 1000
# init table length
self.TABLE_LEN = 64
def retrieve(self, input_item):
"""
search similar feature pids by faiss
"""
similar_items = []
if input_item.feat == []:
return similar_items # no feat, could not compare
feat = np.array([input_item.feat.tolist()])
feat = feat.astype(np.float32) # change feat into the faiss needed type
ds, ids = self.index.knn_search(feat, self.RETRIEVE_LEN)
for pid, distance in zip(ids, ds):
if pid != str(input_item.pid): # ignore itself
similar_items.append({"pid": pid, "score": round(distance, 4)})
return sorted(similar_items, key=lambda e: e.__getitem__('score'))
def build_item(self, pid):
feat = np.array(self.product_feature[pid]) if pid in self.product_feature else []
pid = str(pid) # pid type and color dict key type is <string>
pid_type = self.pid_type[pid] if pid in self.pid_type else self.ImgUtil.get_type_id(pid)
return SearchItem(feat, int(pid), pid_type)
def filter_pid(self, input_item, retrieve_items):
"""
filter by type, alive
"""
match_result = []
not_match_result = []
finally_match_result = []
input_pid = input_item.pid
input_type = input_item.p_type
for item in retrieve_items:
pid, score = str(item["pid"]), item["score"] # change pid type from int into string
p_type = self.pid_type[pid] if pid in self.pid_type else\
self.ImgUtil.get_type_id(pid)
p_alive = self.pid_alive[pid] if pid in self.pid_alive else "0"
item = {"pid": int(pid), "score": score, "path": "sVSM"}
if p_alive == "1":
if input_type == p_type:
match_result.append(item)
if len(match_result) >= self.TABLE_LEN:
return match_result
finally_result = match_result
return finally_result[:self.TABLE_LEN]
def search(self, pid):
"""
main function, input pid and search similar pids
"""
pid = int(pid)
# build input item
pid_item = self.build_item(pid)
# print("pid item:", pid_item.pid, pid_item.p_type, pid_item.feat)
if len(pid_item.feat) == 0: # feat is null, ignore this pid
return []
# retrieve
retrieve_item = self.retrieve(pid_item)
# filter by some condition
match_result = self.filter_pid(pid_item, retrieve_item)
return match_result
if __name__ == "__main__":
img_search = ImgSearch()
pid = sys.argv[1]
match_result = img_search.search(pid)
if len(match_result) > 0:
match_result_pid = [int(item["pid"]) for item in match_result]
print(match_result_pid)
这里先用Ann依照特征相似度进行召回,然后根据类别进行过滤,最后将结果保存下来。获得结果后上传到数据库中。
service
服务端目录
-
gconfig.py: gunicorn配置文件 -
recoservice.py: 使用flask提供线上服务接口 -
run.sh: 启动或关停服务 -
ping.sh:
gconfig.py :
import multiprocessing
bind = '0.0.0.0:5001' # 绑定ip和端口号
daemon = False
debug = False
worker_class = 'sync'
workers = multiprocessing.cpu_count() # 进程数
pidfile = 'project.pid'
log_path = "/data/algoreco_simproduct_service/log/" # 日志文件 保证与之前的位置一样
timeout = 300
graceful_timeout = 300
# http://docs.gunicorn.org/en/stable/settings.html
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" "%({xff}i)s"'
logconfig_dict = {
'version': 1,
'disable_existing_loggers': False,
'loggers': {
"gunicorn.error": {
"level": "INFO",
"handlers": ["error_file"],
"propagate": 0,
"qualname": "gunicorn.error"
},
"gunicorn.access": {
"level": "INFO",
"handlers": ["access_file"],
"propagate": 0,
"qualname": "gunicorn.access"
}
},
'handlers': {
"error_file": {
"class": "cloghandler.ConcurrentRotatingFileHandler",
"maxBytes": 1024 * 1024 * 1024,
"backupCount": 5,
"formatter": "generic",
"filename": log_path + "gunicorn_error.log",
},
"access_file": {
"level": "INFO",
"handlers": ["access_file"],
"propagate": 0,
"qualname": "gunicorn.access"
}
},
'handlers': {
"error_file": {
"class": "cloghandler.ConcurrentRotatingFileHandler",
"maxBytes": 1024 * 1024 * 1024,
"backupCount": 5,
"formatter": "generic",
"filename": log_path + "gunicorn_error.log",
},
"access_file": {
"class": "cloghandler.ConcurrentRotatingFileHandler",
"maxBytes": 1024 * 1024 * 1024,
"backupCount": 5,
"formatter": "generic",
"filename": log_path + "gunicorn_access.log",
}
},
'formatters': {
"generic": {
"format": "%(asctime)s [%(process)d] %(levelname)s"
+ " [%(filename)s:%(lineno)s] %(message)s",
"datefmt": "[%Y-%m-%d %H:%M:%S]",
"class": "logging.Formatter"
},
"access": {
"format": "[%(asmidnighttime)s] [%(process)d] %(levelname)s"
+ " [%(filename)s:%(lineno)s] %(message)s",
"class": "logging.Formatter"
}
}
}
-
worker_class: 工作进程类型, 有sync(默认), eventlet, gevent, or tornado, gthread, gaiohttp; -
workers: 工作进程的数量,通常推荐的worker数量是:(2 x $num_cores) + 1 -
log-level LEVEL: 输出error log的颗粒度,有效的LEVEL有: debug, info, warning, error, critical
recoservice.py :
import logging
import json
from flask import Flask, Response, request
from recsys.sale.reco import RecoSaleSim
from recsys.base.exception import ClientException
from recsys.base.recologging import HeartBeatFilter
app = Flask(__name__)
class RecoSimSvr():
def __init__(self):
self.SUCCESS_CODE = 200
self.CLIENT_ERROR_CODE = 400
self.SERVER_ERROR_CODE = 500
# init recsys module
self.rss = RecoSaleSim()
app.logger.info('Init done.')
def get_recos(self):
error_results = {}
results = {}
try:
pid = request.args.get('productId', type=int)
if not pid:
raise ClientException('Param missing: productId')
uid = request.args.get('uid', type=int)
if not uid:
raise ClientException('Param missing: uid')
device_id = request.args.get('deviceId', type=str)
if not device_id:
raise ClientException('Param missing: deviceId')
reco_len = request.args.get('maxLength', type=int, default=16)
reco_items = self.rss.get_reco_items(pid, reco_len)
# Output response
results['data'] = reco_items
results['code'] = self.SUCCESS_CODE
results['msg'] = 'Success'
return Response(json.dumps(results), status=200, mimetype='application/json')
except ClientException as err:
app.logger.error(str(err), exc_info=True)
error_results['code'] = self.CLIENT_ERROR_CODE
error_results['msg'] = str(err)
return Response(json.dumps(error_results), status=400, mimetype='application/json')
except Exception as err:
app.logger.error(str(err), exc_info=True)
error_results['code'] = self.SERVER_ERROR_CODE
error_results['msg'] = 'Program error!'
return Response(json.dumps(error_results), status=500, mimetype='application/json')
def check_alive(self):
results = {}
results['code'] = 200
results['msg'] = 'I\'m Sale Similar Reco'
return Response(json.dumps(results), status=200, mimetype='application/json')
# setup logger
gunicorn_access_logger = logging.getLogger('gunicorn.access')
gunicorn_access_logger.addFilter(HeartBeatFilter())
app.logger.handlers = gunicorn_access_logger.handlers
app.logger.setLevel(gunicorn_access_logger.level)
# init main class
reco_sim_svr = RecoSimSvr()
# add url rule
app.add_url_rule('/simproduct/', view_func=reco_sim_svr.check_alive, methods=['GET'])
app.add_url_rule('/simproduct/getSaleSimRecos', view_func=reco_sim_svr.get_recos, methods=['GET'])
app.logger.info('All Init Done.')
run.sh:
#!/bin/bash
boot_dir=../
start()
{
gunicorn --chdir $boot_dir --config gconfig.py service.recoservice:app
}
stop()
{
pid_file=${boot_dir}/project.pid
if [ ! -f $pid_file ];then
echo "Cannot find pid file project.pid. Maybe the service is not running."
return 255
fi
pid=`cat $pid_file`
kill -15 $pid
}
if [ $1"x" == "startx" ];then
start
fi
if [ $1"x" == "stopx" ];then
stop
fi
if [ $1"x" == "restartx" ];then
stop
echo "wait for stopping..."
sleep 5
start
fi
ping.sh:
#!/usr/bin/env bash
if [ $# -lt 1 ]; then
echo "Usage: bash $0 {online|mock|test|single|local}"
exit -1
fi
SENV=$1
BASE_DOMAIN=xxxx.com
if [ $SENV"x" == "testx" ]; then
DOMAIN="test"$BASE_DOMAIN
elif [ $SENV"x" == "onlinex" ]; then
DOMAIN=$BASE_DOMAIN
elif [ $SENV"x" == "singlex" ]; then
DOMAIN=x.x.x.x
PORT=5001
elif [ $SENV"x" == "localx" ]; then
DOMAIN=0.0.0.0
PORT=5001
elif [ $SENV"x" == "mockx" ]; then
DOMAIN="mock"$BASE_DOMAIN
fi
device='TEST85EA5969-4398-4A78-8816-348E189'
run_once()
{
curl -X GET "http://$DOMAIN:$PORT/simproduct/"; echo
curl -X GET "http://$DOMAIN:$PORT/simproduct/getSaleSimRecos?uid=1&deviceId=$device&productId=51953&maxLength=16"; echo
}
basic_test()
{
# test if success
echo "test service..."
run_once
}
basic_test
stat
统计脚本放的相关代码

![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gcNptYH1-1575459559896)(/Users/duanzhicheng/Library/Application Support/typora-user-images/image-20191204152952921.png)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gcNptYH1-1575459559896)(/Users/duanzhicheng/Library/Application Support/typora-user-images/image-20191204152952921.png)]](https://img-blog.csdnimg.cn/20191204194021382.png)

