使用 MySQL 为 SpreadJS 协同服务器提供存储支持
在多人实时编辑的场景下,SpreadJS 协同服务器需要持久化存储 文档信息、操作日志、快照分片 以及 里程碑快照。 如果你的系统更偏向关系型数据库,那么 MySQL 就是一个很合适的选择。
本文将带你实现 SpreadJS 协同服务器的 MySQL 数据库适配器。
🗂️ 数据库建表设计
我们需要 4 张核心表:
- documents:存储文档基本信息(文档 ID、类型、版本号、快照版本号)
- operations:存储用户的操作记录,用于 OT 算法重放
- snapshot_fragments:存储快照的分片数据
- milestone_snapshot:存储里程碑快照,提升文档恢复速度
📌 建表 SQL 示例:
CREATE TABLE IF NOT EXISTS documents(
id VARCHAR(255) PRIMARY KEY,
type VARCHAR(255) NOT NULL,
version INT NOT NULL,
snapshot_version INT NOT NULL
);
CREATE TABLE IF NOT EXISTS operations(
doc_id VARCHAR(255) NOT NULL,
version INT NOT NULL,
operation TEXT NOT NULL,
PRIMARY KEY(doc_id, version),
FOREIGN KEY(doc_id) REFERENCES documents(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS snapshot_fragments(
doc_id VARCHAR(255) NOT NULL,
fragment_id VARCHAR(255) NOT NULL,
data TEXT NOT NULL,
PRIMARY KEY(doc_id, fragment_id),
FOREIGN KEY(doc_id) REFERENCES documents(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS milestone_snapshot(
doc_id VARCHAR(255) NOT NULL,
version INT NOT NULL,
snapshot TEXT NOT NULL,
PRIMARY KEY(doc_id, version),
FOREIGN KEY(doc_id) REFERENCES documents(id) ON DELETE CASCADE
);
⚠️ 列名并非固定,开发者可以根据需要调整。 如果要扩展更多业务数据,可以新建表,并通过 中间件 / 钩子函数 与协同服务集成。
🛠️ MySQL 适配器实现
适配器 MySQLDb
负责与数据库交互,实现以下功能:
- 文档信息存取
- 操作日志管理
- 快照分片存储与更新
- 事务保障一致性
核心逻辑:
getDocument
/getSnapshot
→ 查询文档信息、快照和分片getOps
→ 按版本范围获取操作日志commitOp
→ 事务执行操作写入 & 文档版本更新commitSnapshot
→ 事务更新快照版本 & 分片数据
示例:
export class MySQLDb extends Db {
constructor(pool) {
super();
this.pool = pool;
}
async getDocument(docId) {
console.log("Fetching document:", docId);
const connection = await this.pool.getConnection();
const [rows] = await connection.execute(
'SELECT * FROM documents WHERE id = ?',
[docId]
);
console.log("Fetched document rows:", rows);
connection.release();
if (0 === rows.length) return;
const row = rows[0];
return {
id: row.id,
type: row.type,
version: row.version,
snapshotVersion: row.snapshot_version
};
}
async getSnapshot(docId) {
console.log("Fetching snapshot for document:", docId);
const connection = await this.pool.getConnection();
const [rows] = await connection.execute(
'SELECT * FROM documents WHERE id = ?',
[docId]
);
connection.release();
console.log("Fetched snapshot rows:", rows);
if (0 === rows.length) return;
const row = rows[0];
const fragments = await this.getFragments(docId);
return {
id: row.id,
v: row.snapshot_version,
type: row.type,
fragments: fragments
};
}
async getFragments(docId) {
console.log("Fetching fragments for document:", docId);
const connection = await this.pool.getConnection();
const [rows] = await connection.execute(
'SELECT fragment_id, data FROM snapshot_fragments WHERE doc_id = ?',
[docId]
);
console.log("Fetched fragments rows:", rows);
connection.release();
const fragments = {};
for (const row of rows) {
fragments[row.fragment_id] = JSON.parse(row.data);
}
return fragments;
}
async getFragment(docId, fragmentId) {
console.log("Fetching fragment:", docId, fragmentId);
const connection = await this.pool.getConnection();
const [rows] = await connection.execute(
'SELECT data FROM snapshot_fragments WHERE doc_id = ? AND fragment_id = ?',
[docId, fragmentId]
);
connection.release();
console.log("Fetched fragment rows:", rows);
if (0 === rows.length) return;
return JSON.parse(rows[0].data);
}
async getOps(docId, from, to) {
console.log("Fetching operations for document:", docId, "from version:", from, "to version:", to);
const connection = await this.pool.getConnection();
const [rows] = await connection.execute(
'SELECT operation FROM operations WHERE doc_id = ? AND version >= ?' + (to ? ' AND version <= ?' : '') + " ORDER BY version",
to ? [docId, from, to] : [docId, from]
);
connection.release();
console.log("Fetched operations rows:", rows);
if (0 === rows.length) return [];
return rows.map(row => JSON.parse(row.operation));
}
async commitOp(docId, op, document) {
console.log("Committing operation:", docId, op, document);
const connection = await this.pool.getConnection();
try {
connection.beginTransaction();
// query version from document db
const [rows] = await connection.execute(
'SELECT version FROM documents WHERE id = ?',
[docId]
);
if (op.create) {
console.log("Creating new document:", docId, rows);
if (rows.length > 0) {
await connection.rollback();
return false;
}
await connection.execute(
'INSERT INTO documents (id, type, version, snapshot_version) VALUES (?, ?, ?, ?)',
[docId, document.type, document.version, document.snapshotVersion]
);
await connection.execute(
'INSERT INTO operations (doc_id, version, operation) VALUES (?, ?, ?)',
[docId, op.v, JSON.stringify(op)]
);
await connection.commit();
console.log("Operation create successfully.");
return true;
}
else if (op.del) {
console.log("Deleting document:", docId, rows);
if (rows.length === 0) {
await connection.rollback();
return false;
}
await connection.execute(
'DELETE FROM documents WHERE id = ?',
[docId]
);
await connection.commit();
console.log("Operation delete successfully.");
return true;
}
else {
console.log("Updating operation:", docId, op, rows);
if (rows.length === 0 || rows[0].version !== op.v) {
await connection.rollback();
return false;
}
await connection.execute(
'INSERT INTO operations (doc_id, version, operation) VALUES (?, ?, ?)',
[docId, op.v, JSON.stringify(op)]
);
await connection.execute(
'UPDATE documents SET version = ? WHERE id = ?',
[document.version, docId]
);
await connection.commit();
console.log("Operation update successfully.");
return true;
}
} catch (error) {
console.error('Error committing operation:', error);
await connection.rollback();
return false;
}
finally {
connection.release();
}
}
async commitSnapshot(docId, snapshot) {
console.log("Committing snapshot for document:", docId, snapshot);
const connection = await this.pool.getConnection();
try {
connection.beginTransaction();
// query snapshot_version from document db
const [rows] = await connection.execute(
'SELECT snapshot_version FROM documents WHERE id = ?',
[docId]
);
if (0 === rows.length) {
await connection.rollback();
return false;
}
const currentSnapshotVersion = rows[0].snapshot_version;
if (snapshot.fromVersion !== currentSnapshotVersion || snapshot.v <= currentSnapshotVersion) {
await connection.rollback();
return false;
}
await connection.execute(
'UPDATE documents SET snapshot_version = ? WHERE id = ?',
[snapshot.v, docId]
);
if (snapshot.fragmentsChanges.deleteSnapshot) {
await connection.execute(
'DELETE FROM snapshot_fragments WHERE doc_id = ?',
[docId]
);
} else {
const { createFragments, updateFragments, deleteFragments } = snapshot.fragmentsChanges;
console.log("Committing snapshot fragments changes:", createFragments, updateFragments, deleteFragments);
if (createFragments) {
for (const [fragmentId, data] of Object.entries(createFragments)) {
await connection.execute(
'INSERT INTO snapshot_fragments (doc_id, fragment_id, data) VALUES (?, ?, ?)',
[docId, fragmentId, JSON.stringify(data)]
);
}
}
if (updateFragments) {
for (const [fragmentId, data] of Object.entries(updateFragments)) {
await connection.execute(
'UPDATE snapshot_fragments SET data = ? WHERE doc_id = ? AND fragment_id = ?',
[JSON.stringify(data), docId, fragmentId]
);
}
}
if (deleteFragments) {
for (const fragmentId of deleteFragments) {
await connection.execute(
'DELETE FROM snapshot_fragments WHERE doc_id = ? AND fragment_id = ?',
[docId, fragmentId]
);
}
}
}
await connection.commit();
return true;
} catch (error) {
console.error('Error committing snapshot:', error);
await connection.rollback();
return false;
} finally {
connection.release();
}
}
async close() {
await this.pool.end();
}
}
📌 数据流转示意:
用户操作 --> commitOp --> operations表
--> documents表(version更新)
快照提交 --> commitSnapshot --> snapshot_fragments表
--> documents表(snapshot_version更新)
🏷️ 里程碑快照存储
与 MongoDB 版本类似,MySQL 适配器也支持 里程碑快照,用于快速恢复文档。
实现类 MySQLMilestoneDb
提供两个接口:
saveMilestoneSnapshot
→ 定期保存完整快照getMilestoneSnapshot
→ 查找 ≤指定版本 的最近快照
示例:
export class MySQLMilestoneDb {
constructor(pool, interval) {
this.pool = pool;
this.interval = interval || 100; // 默认间隔
}
async saveMilestoneSnapshot(snapshot) {
const connection = await this.pool.getConnection();
try {
await connection.beginTransaction();
await connection.execute(
'INSERT INTO milestone_snapshot (doc_id, version, snapshot) VALUES (?, ?, ?)',
[snapshot.id, snapshot.v, JSON.stringify(snapshot)]
);
await connection.commit();
return true;
} catch (error) {
await connection.rollback();
return false;
} finally {
connection.release();
}
}
async getMilestoneSnapshot(id, version) {
const connection = await this.pool.getConnection();
const [rows] = await connection.execute(
'SELECT * FROM milestone_snapshot WHERE doc_id = ? AND version <= ? ORDER BY version DESC LIMIT 1',
[id, version]
);
connection.release();
if (0 === rows.length) return;
return JSON.parse(rows[0].snapshot);
}
}
📌 恢复流程:
里程碑快照 (v=1000) + 增量操作 (1001-1020) = 最新文档
⚙️ 在 DocumentServices 中集成
最后,在协同服务器中配置 MySQL 适配器:
const mySqlAdapter = new MySQLDb(mySqlPool);
const MySQLMilestoneAdapter = new MySQLMilestoneDb(mySqlPool, 100);
const documentServices = new OT.DocumentServices({
db: mySqlAdapter,
milestoneDb: MySQLMilestoneAdapter
});
server.useFeature(OT.documentFeature(documentServices));
至此,SpreadJS 协同服务器即可使用 MySQL 作为后端存储。
✅ 总结
本文展示了如何基于 MySQL 为 SpreadJS 协同服务器实现数据库适配,主要内容包括:
- 建表设计 → documents / operations / snapshot_fragments / milestone_snapshot
- MySQL 适配器实现 → 支持事务,保证操作和快照一致性
- 里程碑快照 → 提升文档恢复效率
- 服务集成 → 在
DocumentServices
中配置适配器
通过 MySQL,你可以获得更强的数据一致性和事务控制能力,非常适合企业级协同编辑场景。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
SpreadJS 协同服务器 MongoDB 数据库适配支持
为了支持 SpreadJS 协同编辑场景,协同服务器需要持久化存储文档、操作、快照及里程碑数据。本文介绍了 MongoDB 数据库适配器的实现方法,包括集合初始化、适配器接口实现以及里程碑存储支持。 一、MongoDB 集合初始化 协同编辑服务需要以下集合(Collections)来存储数据: documents:存储文档基本信息(类型、版本号、快照版本号)。 operations:存储操作记录,用于实现基于 OT 的操作重放。 snapshot_fragments:存储快照的分片数据,支持大文档分段管理。 milestone_snapshot:存储里程碑快照,用于回溯和恢复。 初始化脚本示例如下: export async function InitCollections(client) {const collectionsToCreate = [ { name: 'documents', indexes: [{ key: { id: 1 }, unique: true }] }, { name: 'operations', indexes: [{ key: { doc_id: 1,...
-
下一篇
基于TinyMce富文本编辑器的客服自研知识库的技术探索和实践|得物技术
一、 背景 客服知识库是一个集中管理和存储与客服相关的信息和资源的系统,在自研知识库上线之前,得物采用的承接工具为第三方知识库系统。伴随着业务的发展,知识的维护体量、下游系统的使用面临的问题愈发明显,而当前的第三方采购系统,已经较难满足内部系统间高效协作的诉求,基于以上业务诉求,我们自研了一套客服知识库。 二、富文本编辑器的选型 以下是经过调研后列出的多款富文本编辑器综合对比情况: 2.1 编辑器的选择 自研知识库要求富文本编辑器具备表格的编辑能力,由于Quill不支持表格编辑能力(借助表格插件可以实现该能力,但经过实际验证,插件提供的表格编辑能力不够丰富,使用体验也较差),被首先被排除。 <!-- --> wangEditor体验过程中发现标题和列表(有序、无序)列表两个功能互斥,体验不太好,而这两个功能都是自研知识库刚需功能,也被排除。 <!-- --> Lexical 是facebook推出的一款编辑器,虽功能很丰富,但相较于CKEditor 和TinyMCE ,文档不够完善,社区活跃性较低,插件不成熟,故优先选择CKEditor 和TinyMCE。 CK...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Hadoop3单机部署,实现最简伪集群
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS关闭SELinux安全模块
- Dcoker安装(在线仓库),最新的服务器搭配容器使用