手把手系列!用 Milvus 和 Python 搭建电影推荐系统
推荐系统(推荐引擎)是根据用户行为和兴趣点等信息去预测并推送用户当前需要或感兴趣的物品(服务)的一类应用。常见推荐系统包括电影、书籍、音乐或新闻文章推荐系统等。
基于不同的算法或技术,推荐系统有很多种类型,如协同过滤算法(Collaborative Filtering)推荐系统、基于内容(Content-Based)的推荐系统、混合推荐系统和基于向量的推荐系统。其中,基于向量的推荐系统使用向量空间来寻找(即推荐)数据库中最相似的产品或内容。而存储向量数据最有效的方法便是使用 Milvus 这样全球领先的向量数据库。
本文将介绍如何使用 Milvus 和 Python 搭建电影推荐系统。在搭建过程中,我们会使用 SentenceTransformers 将文本信息转换为向量,并将这些向量存储在 Milvus 中。搭建完成后,用户便可输入描述并在推荐系统中搜索到相似的电影。如需本教程中的所有代码,参考:
-
GitHub上的 Milvus Bootcamp 仓库
01.设置环境
开始前,请先安装:
-
Python 3.x
-
Python Package Manager (PIP)
-
Jupyter Notebook
-
Docker
-
至少 32 GB RAM 的硬件系统或 Zilliz Cloud 账号
使用Python安装所需工具和软件
$ python -m pip install pymilvus pandas sentence_transformers kaggle
向量数据库 Milvus
本教程中我们将用 Milvus 存储由电影信息转化而来的 Embedding 向量。由于使用到的数据集较大,推荐大家创建 Zilliz Cloud 集群来存储向量数据库。但如果仍想安装本地实例,可下载 docker-compose 配置文件并运行文件。
$ wget https://github.com/milvus-io/milvus/releases/download/v2.3.0/milvus-standalone-docker-compose.yml -O docker-compose.yml $ docker-compose up -d
一切准备就绪后,便可以搭建电影推荐系统啦!
02.准备并预处理数据
首先,我们选择用 Kaggle上的电影数据集,该数据集包含 45000 部电影的元数据信息。大家可以直接下载数据集或通过 Python 使用 Kaggle API 下载数据集。如需通过 Python 下载,请先在 Kaggle.com 的 Profile 中下载 kaggle.json 文件。注意!一定要将此文件存储在 API 可以获取的路径中。
接着,设置环境变量以验证 Kaggle 身份。打开 Jupyter Notebook 输入以下代码:
%env KAGGLE_USERNAME=username %env KAGGLE_KEY=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX %env TOKENIZERS_PARALLELISM=true
用 Kaggle 的 Python 依赖下载 Kaggle 上的电影数据集:
import kaggle dependency import kaggle kaggle.api.authenticate() kaggle.api.dataset_download_files('rounakbanik/the-movies-dataset', path='dataset', unzip=True)
数据集下载完成后,使用 pandas 的 read_csv()
读取数据集数据:
import pandas import pandas as pd read csv data movies=pd.read_csv('dataset/movies_metadata.csv',low_memory=False) check shapeof data movies.shape
上图展示了读取到 45466 条电影元数据。每条电影数据中包含 24 列。使用以下命令查看所有列的信息:
check column names movies.columns
搭建电影推荐系统过程中,不需要使用到所有的列。通过以下代码筛选我们需要的列:
filter required columnstrimmed_movies = movies[["id", "title", "overview", "release_date", "genres"]] trimmed_movies.head(5)
此外,一些数据中缺少部分字段。删除这些缺少字段的数据:
unclean_movies_dict = trimmed_movies.to_dict('records') print('{} movies'.format(len(unclean_movies_dict))) movies_dict = []for movie in unclean_movies_dict:if movie["overview"] == movie["overview"] and movie["release_date"] == movie["release_date"] and movie["genres"] == movie["genres"] and movie["title"] == movie["title"]: movies_dict.append(movie)
03.搭建过程
连接 Milvus
处理完数据后,连接 Milvus 集群以导入数据。我们需要使用 URI 和 token 来连接 Milvus 集群,这些信息可以在 Zilliz Cloud 界面上找到。
使用以下命令通过 PyMilvus 连接 Milvus 服务器:
import milvus dependencyfrom pymilvus import * connect to milvusmilvus_uri="YOUR_URI"token="YOUR_API_TOKEN" connections.connect("default", uri=milvus_uri, token=token) print("Connected!")
将电影信息转化为 Embedding 向量
接下来,把电影数据集转化为 Embedding 向量。首先,创建 1 个 Collection 用于存储电影 ID 和电影信息向量。创建 Collection 时还可以添加索引,使后续搜索变得更高效:
COLLECTION_NAME = 'film_vectors' PARTITION_NAME = 'Movie'Here's our record schema""" "title": Film title, "overview": description, "release_date": film release date, "genres": film generes, "embedding": embedding """ id = FieldSchema(name='title', dtype=DataType.VARCHAR, max_length=500, is_primary=True) field = FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, dim=384) schema = CollectionSchema(fields=[id, field], description="movie recommender: film vectors", enable_dynamic_field=True) if utility.has_collection(COLLECTION_NAME): # drop the same collection created before collection = Collection(COLLECTION_NAME) collection.drop() collection = Collection(name=COLLECTION_NAME, schema=schema) print("Collection created.") index_params = {"index_type": "IVF_FLAT","metric_type": "L2","params": {"nlist": 128}, } collection.create_index(field_name="embedding", index_params=index_params) collection.load() print("Collection indexed!")
Collection创建完毕后,需要写一个函数来生成向量。电影向量中将包含电影简介、电影类型、发行日期等信息。用 SentenceTransformer 生成向量:
from sentence_transformers import SentenceTransformer import ast function to extract the text from genre columndef build_genres(data): genres = data['genres'] genre_list = "" entries= ast.literal_eval(genres) genres = ""for entry in entries: genre_list = genre_list + entry["name"] + ", " genres += genre_list genres = "".join(genres.rsplit(",", 1))return genres create an object of SentenceTransformer transformer = SentenceTransformer('all-MiniLM-L6-v2') function to generate embeddingsdef embed_movie(data): embed = "{} Released on {}. Genres are {}.".format(data["overview"], data["release_date"], build_genres(data)) embeddings = transformer.encode(embed)return embeddings
上述函数使用了 build_genres()
清除电影风格,并提取文本。随后创建了 SentenceTransformer 对象用于生成文本向量。最终用 encode()
方法生成电影向量。
将向量导入 Milvus
由于数据集过大,将数据逐条插入 Milvus 较为低效,且会导致网络流量增加。所以,我们推荐将数据批量导入 Milvus,1 次批量导入 5000 条数据。
Loop counter for batching and showing progressj = 0batch = [] for movie_dict in movies_dict: try: movie_dict["embedding"] = embed_movie(movie_dict)batch.append(movie_dict) j += 1 if j % 5 == 0: print("Embedded {} records".format(j)) collection.insert(batch) print("Batch insert completed")batch=[] except Exception as e: print("Error inserting record {}".format(e)) pprint(batch) break collection.insert(movie_dict) print("Final batch completed") print("Finished with {} embeddings".format(j))
注意:大家可以根据自己的偏好和需求调整批量上传的数据量。同时,有一部分电影可能会由于其 ID 无法转换为整数而导入失败,我们可以相应调整 Schema 或者检查数据格式来避免导入失败的问题。
04.使用 Milvus 搜索和推荐电影
借助 Milvus 的近实时向量搜索能力为用户推荐合适的电影,创建以下两个函数:
-
embed_search()
用 Transformer 将用户搜索文本(字符串)转化为 Embedding 向量,Transformer 和前面的相同。 -
search_for_movies()
用于进行向量相似性搜索。
load collection memory before search collection.load() Set search parameters topK = 5 SEARCH_PARAM = { "metric_type":"L2", "params":{"nprobe": 20}, } convertsearch string to embeddings def embed_search(search_string): search_embeddings = transformer.encode(search_string)return search_embeddings search similar embeddings for user's query def search_for_movies(search_string): user_vector = embed_search(search_string) return collection.search([user_vector],"embedding",param=SEARCH_PARAM, limit=topK, expr=None, output_fields=['title', 'overview'])
上述代码中,我们设置了以下参数:
-
Top-K:
topK = 5
, 规定了搜索将返回 5 个最相似的向量 -
相似度类型:
metric_type
设置为欧氏距离(Euclidean/L2) -
nprobe:设置为 20,规定了搜索 20 个数据聚类
最后,用 search_for_movies()
函数根据用户搜索推荐相关电影:
from pprint import pprint search_string = "A comedy from the 1990s set in a hospital. The main characters are in their 20s and are trying to stop a vampire." results = search_for_movies(search_string) check resultsfor hits in iter(results):for hit in hits:print(hit.entity.get('title'))print(hit.entity.get('overview'))print("-------------------------------")
上图显示,搜索到了 5 部相似电影。至此,我们已经成功用 Milvus 搭建了一个电影推荐系统。
本文最初发布于 The New Stack,已获得转载许可。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
代立冬:基于Apache Doris+SeaTunnel 实现多源实时数据仓库解决方案探索实践
大家好,我是白鲸开源的联合创始人代立冬,同时担任 Apache DolphinScheduler 的 PMC chair 和 SeaTunnel 的 PMC。作为 Apache Foundation 的成员和孵化器导师,我积极参与推动多个开源项目的发展,帮助它们通过孵化器成长为 Apache 的顶级项目。 今天的分享的主题其实还是从开源到商业,Apache SeaTunnel 本身就是做数据同步软件,也经历过 Apache 孵化器的孵化,目前已经毕业成为顶级项目,也会跟大家分享一下它的核心特性。包括为什么我们又重新造轮子,那 Doris 和 WhaleTunnel/SeaTunnel 这个多元实时数仓是怎么去解决一些社区用户的问题?另外也给大家分享一下我们社区的 roadmap 和最近的一些进展。 白鲸开源愿景与使命 白鲸开源其实是一家 DataOps 解决方案提供商,目前致力于数据调度和同步领域,简单理解就是大数据的降本增效。我们的目标是通过高效的数据处理解决方案来降低运营成本并提高效率。 当前的重点是改善数据调度和同步的流程,或者是从开发到生产上线整个流程的打通,从而实现从开发到生...
- 下一篇
再有人问你数据库连接池 Druid 的原理,这篇文章甩给他!
在 Spring Boot 项目中,数据库连接池已经成为标配,然而,我曾经遇到过不少连接池异常导致业务错误的事故。很多经验丰富的工程师也可能不小心在这方面出现问题。 在这篇文章中,我们将探讨数据库连接池,深入解析其实现机制,以便更好地理解和规避潜在的风险。 1 为什么需要连接池 假如没有连接池,我们操作数据库的流程如下: 应用程序使用数据库驱动建立和数据库的 TCP 连接 ; 用户进行身份验证 ; 身份验证通过,应用进行读写数据库操作 ; 操作结束后,关闭 TCP 连接 。 创建数据库连接是一个比较昂贵的操作,若同时有几百人甚至几千人在线,频繁地进行连接操作将占用更多的系统资源,但数据库支持的连接数是有限的,创建大量的连接可能会导致数据库僵死。 当我们有了连接池,应用程序启动时就预先建立多个数据库连接对象,然后将连接对象保存到连接池中。当客户请求到来时,从池中取出一个连接对象为客户服务。当请求完成时,客户程序调用关闭方法,将连接对象放回池中。 相比之下,连接池的优点显而易见: 1、资源重用: 因为数据库连接可以重用,避免了频繁创建,释放连接引起的大量性能开销,同时也增加了系统运行环境的...
相关文章
文章评论
共有0条评论来说两句吧...