基于火山引擎云搜索的混合搜索实战
-
首先是不同类型查询的评分并不在同一个可比较的维度,因此不能直接进行简单的算术计算。
-
其次是在分布式检索系统中,评分通常在分片级别,需要对所有分片的评分进行全局归一化处理。
-
查询阶段:使用混合查询子句进行 Keyword Search 和 Semantic Search。
-
评分归一化和合并阶段,该阶段在查询阶段之后。
-
由于每种查询类型都会提供不同范围的评分,该阶段对每一个查询子句的评分结果执行归一化操作,支持的归一化方法有 min_max、l2、rrf。
-
对归一化后的评分进行组合,组合方法有 arithmetic_mean、geometric_mean、harmonic_mean。
-
-
根据组合后的评分对文档重新排序并返回给用户。
实现思路
-
全文检索引擎
-
向量检索引擎
-
用于向量 Embedding 的机器学习模型
-
将文本、音频、视频等数据转化成向量的数据管道
-
融合排序
-
配置创建相关对象
-
Ingestion Pipeline:支持自动调用模型把图片转换向量并存到索引中
-
Search Pipeline:支持把文本查询语句自动转换成向量以便进行相似度计算
-
k-NN索引:存放向量的索引
-
-
将图像数据集数据写入 OpenSearch 实例,OpenSearch 会自动调用机器学习模型将文本转为 Embedding 向量。
-
Client 端发起混合搜索查询时,OpenSearch 调用机器学习模型将传入的查询转为 Embedding 向量。
-
OpenSearch 执行混合搜索请求处理,组合 Keyword Seach 和 Semantic Seach 的评分,返回搜索结果。
方案实战
环境准备
-
登录火山引擎云搜索服务(https://console.volcengine.com/es),创建实例集群,版本选择 OpenSearch 2.9.0。
-
待实例创建完毕,启用 AI 节点。
-
在模型选择上,可以创建自己的模型,也可以选择公共模型。这里我们选择 公共模型,完成配置后,点击 立即启动。
数据集准备
操作步骤
安装 Python 依赖
pip install -U elasticsearch7==7.10.1
pip install -U pandas
pip install -U jupyter
pip install -U requests
pip install -U s3fs
pip install -U alive_progress
pip install -U pillow
pip install -U ipython
连接到 OpenSearch
# Prepare opensearch info
from elasticsearch7 import Elasticsearch as CloudSearch
from ssl import create_default_context
# opensearch info
opensearch_domain = '{{ OPENSEARCH_DOMAIN }}'
opensearch_port = '9200'
opensearch_user = 'admin'
opensearch_pwd = '{{ OPENSEARCH_PWD }}'
# remote config for model server
model_remote_config = {
"method": "POST",
"url": "{{ REMOTE_MODEL_URL }}",
"params": {},
"headers": {
"Content-Type": "application/json"
},
"advance_request_body": {
"model": "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
}
}
# dimension for knn vector
knn_dimension = 384
# load cer and create ssl context
ssl_context = create_default_context(cafile='./ca.cer')
# create CloudSearch client
cloud_search_cli = CloudSearch([opensearch_domain, opensearch_port],
ssl_context=ssl_context,
scheme="https",
http_auth=(opensearch_user, opensearch_pwd)
)
# index name
index_name = 'index-test'
# pipeline id
pipeline_id = 'remote_text_embedding_test'
# search pipeline id
search_pipeline_id = 'rrf_search_pipeline_test'
-
填入 OpenSearch 链接地址和用户名密码信息。 model_remote_config 是远程机器学习模型连接配置,可在模型 调用信息查看,将 调用信息中的 remote_config 配置全部复制到 model_remote_config。
-
在 实例信息-> 服务访问部分,下载证书到当前目录。
-
给定索引名称、Pipeline ID 和 Search Pipeline ID。
创建 Ingest Pipeline
# Create ingest pipeline
pipeline_body = {
"description": "text embedding pipeline for remote inference",
"processors": [{
"remote_text_embedding": {
"remote_config": model_remote_config,
"field_map": {
"caption": "caption_embedding"
}
}
}]
}
# create request
resp = cloud_search_cli.ingest.put_pipeline(id=pipeline_id, body=pipeline_body)
print(resp)
创建 Search Pipeline
-
归一化方法:
min_max,l2,rrf -
加权求和方法:
arithmetic_mean,geometric_mean,harmonic_mean
# Create search pipeline
import requests
search_pipeline_body = {
"description": "post processor for hybrid search",
"request_processors": [{
"remote_embedding": {
"remote_config": model_remote_config
}
}],
"phase_results_processors": [ # normalization and combination
{
"normalization-processor": {
"normalization": {
"technique": "rrf", # the normalization technique in the processor is set to rrf
"parameters": {
"rank_constant": 60 # param
}
},
"combination": {
"technique": "arithmetic_mean", # the combination technique is set to arithmetic mean
"parameters": {
"weights": [
0.4,
0.6
]
}
}
}
}
]
}
headers = {
'Content-Type': 'application/json',
}
# create request
resp = requests.put(
url="https://" + opensearch_domain + ':' + opensearch_port + '/_search/pipeline/' + search_pipeline_id,
auth=(opensearch_user, opensearch_pwd),
json=search_pipeline_body,
headers=headers,
verify='./ca.cer')
print(resp.text)
创建 k-NN 索引
-
将事先创建好的 Ingest Pipeline 配置到 index.default_pipeline 字段中;
-
同时,配置 properties,将 caption_embedding 设置为 knn_vector,这里使用 faiss 中的 hnsw。
# Create k-NN index
# create index and set settings, mappings, and properties as needed.
index_body = {
"settings": {
"index.knn": True,
"number_of_shards": 1,
"number_of_replicas": 0,
"default_pipeline": pipeline_id # ingest pipeline
},
"mappings": {
"properties": {
"image_url": {
"type": "text"
},
"caption_embedding": {
"type": "knn_vector",
"dimension": knn_dimension,
"method": {
"engine": "faiss",
"space_type": "l2",
"name": "hnsw",
"parameters": {}
}
},
"caption": {
"type": "text"
}
}
}
}
# create index
resp = cloud_search_cli.indices.create(index=index_name, body=index_body)
print(resp)
加载数据集
# Prepare dataset
import pandas as pd
import string
appended_data = []
for character in string.digits[0:] + string.ascii_lowercase:
if character == '1':
break
try:
meta = pd.read_json("s3://amazon-berkeley-objects/listings/metadata/listings_" + character + ".json.gz",
lines=True)
except FileNotFoundError:
continue
appended_data.append(meta)
appended_data_frame = pd.concat(appended_data)
appended_data_frame.shape
meta = appended_data_frame
def func_(x):
us_texts = [item["value"] for item in x if item["language_tag"] == "en_US"]
return us_texts[0] if us_texts else None
meta = meta.assign(item_name_in_en_us=meta.item_name.apply(func_))
meta = meta[~meta.item_name_in_en_us.isna()][["item_id", "item_name_in_en_us", "main_image_id"]]
print(f"#products with US English title: {len(meta)}")
meta.head()
image_meta = pd.read_csv("s3://amazon-berkeley-objects/images/metadata/images.csv.gz")
dataset = meta.merge(image_meta, left_on="main_image_id", right_on="image_id")
dataset.head()
上传数据集
# Upload dataset
import json
from alive_progress import alive_bar
cnt = 0
batch = 0
action = json.dumps({"index": {"_index": index_name}})
body_ = ''
with alive_bar(len(dataset), force_tty=True) as bar:
for index, row in (dataset.iterrows()):
if row['path'] == '87/874f86c4.jpg':
continue
payload = {}
payload['image_url'] = "https://amazon-berkeley-objects.s3.amazonaws.com/images/small/" + row['path']
payload['caption'] = row['item_name_in_en_us']
body_ = body_ + action + "\n" + json.dumps(payload) + "\n"
cnt = cnt + 1
if cnt == 100:
resp = cloud_search_cli.bulk(
request_timeout=1000,
index=index_name,
body=body_)
cnt = 0
batch = batch + 1
body_ = ''
bar()
print("Total Bulk batches completed: " + str(batch))
混合搜索查询
match 查询,一个是 remote_neural 查询。查询时将事先创建好的 Search Pipeline 指定为查询参数,Search Pipeline 会将传入的文本转为向量,存储到 caption_embedding 字段,用于后续查询。
# Search with search pipeline
from urllib import request
from PIL import Image
import IPython.display as display
def search(text, size):
resp = cloud_search_cli.search(
index=index_name,
body={
"_source": ["image_url", "caption"],
"query": {
"hybrid": {
"queries": [
{
"match": {
"caption": {
"query": text
}
}
},
{
"remote_neural": {
"caption_embedding": {
"query_text": text,
"k": size
}
}
}
]
}
}
},
params={"search_pipeline": search_pipeline_id},
)
return resp
k = 10
ret = search('shoes', k)
for item in ret['hits']['hits']:
display.display(Image.open(request.urlopen(item['_source']['image_url'])))
print(item['_source']['caption'])
混合搜索展示
以上就是以图像搜索应用为例,介绍如何借助火山引擎云搜索服务的解决方案快速开发一个混合搜索应用的实战过程,欢迎大家登陆火山引擎控制台操作!
火山引擎云搜索服务兼容 Elasticsearch、Kibana 等软件及常用开源插件,提供结构化、非结构化文本的多条件检索、统计、报表,可以实现一键部署、弹性扩缩、简化运维,快速构建日志分析、信息检索分析等业务能力。
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
openGemini v1.2.0版本正式发布,IoT 场景性能大幅提升!
本文分享自华为云社区《openGemini v1.2.0版本正式发布,IoT 场景性能大幅提升!》,作者:华为云开源。 在openGemini v1.2.0版本中,我们为您带来了一系列令人振奋的内核优化,将您的体验提升到新的高度,这包括 针对IoT场景的性能优化,查询效率有极大的提升。 针对数据存储的优化,进一步节约磁盘空间,降低数据存储成本。 针对部分功能的优化,比如show tag keys,stream等,使得功能更加丰富。 新增了一部分内核的监控指标,进一步清楚了解内核的运行状态、行为和性能,帮助分析、定位和优化数据库性能。 此外,我们还进行了一系列的读写流程上的改进和修复,以提供更稳定和可靠的体验。 内核优化 IoT典型场景性能优化 本次版本针对IoT场景的写入和典型查询场景进行了优化,通过TSBS工具的测试结果可以看出,openGemini全面超越InfluxDB,最高提升了50倍。相比其他时序数据库,openGemini也是具有领先优势。 具体性能数据参考: https://docs.opengemini.org/zh/guide/introduction/perform...
-
下一篇
Hugging Face 与 Wiz Research 合作提高人工智能安全性
我们很高兴地宣布,我们正在与 Wiz 合作,目标是提高我们平台和整个 AI/ML 生态系统的安全性。 Wiz 研究人员 与 Hugging Face 就我们平台的安全性进行合作并分享了他们的发现。Wiz 是一家云安全公司,帮助客户以安全的方式构建和维护软件。随着这项研究的发布,我们将借此机会重点介绍一些相关的 Hugging Face 安全改进。 Wiz 与 Hugging Face 合作的更多信息 https://www.wiz.io/blog/wiz-and-hugging-face-address-risks-to-ai-infrastruct Hugging Face 最近集成了 Wiz 进行漏洞管理,这是一个持续主动的流程,可确保我们的平台免受安全漏洞的影响。此外,我们还使用 Wiz 进行云安全态势管理 (CSPM),它使我们能够安全地配置云环境并进行监控以确保其安全。 我们最喜欢的 Wiz 功能之一是从存储到计算再到网络的漏洞的整体视图。我们运行多个 Kubernetes (k8s) 集群,并拥有跨多个区域和云提供商的资源,因此在单个位置拥有包含每个漏洞的完整上下文图的中央...
相关文章
文章评论
共有0条评论来说两句吧...

微信收款码
支付宝收款码