交易平台钱包管理服务原理与实现代码
交易所钱包服务是加密货币交易所系统中的重要组成部分,它负责与各种不同的区块链的交互,实现用户地址生成、充值与提现等功能。本文以对接以太坊区块链的钱包服务为例,介绍交易所系统平台中钱包管理服务的设计与实现。
交易所系统中钱包服务是一个非常重要的组件,它的主要功能包括:
- 生成以太坊充值地址
- 当监听地址发生新交易时获取通知
- 广播签名交易
- 处理ERC20代币的充值
- 在区块链中部署新的合约并操作合约方法
如果希望快速掌握区块链应用的开发,推荐汇智网的区块链应用开发系列教程, 内容涵盖比特币、以太坊、eos、超级账本fabric和tendermint等多种区块链,以及 java、go、nodejs、python、php、dart等多种开发语言。
1、开发与运行环境概述
在我们继续之前,首先要满足以下环境要求:
- Docker: Docker已经成为新应用开发的必备工具,它使得应用的构建、分享与部署都极其简单。
- Docker Compose:我们使用Docker Compose来管理所有的服务,以便轻松地进行扩展。
其他的需求都由Docker镜像来满足,我们不需要安装其他任何东西了,只需要写一个简单的Docker Compos配置文档 —— docker-compose.yml:
version: '3' services: ganache: image: trufflesuite/ganache-cli command: -m redis: image: redis:alpine ports: - "6379:6379" command: redis-server --appendonly yes volumes: - redis:/data zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_CREATE_TOPICS: "command:1:1,address.created:1:1,transaction:1:1,errors:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: redis:
只要运行docker-compose up -d
就可以轻松地启动服务,这个命令会自动从Docker中心下载必要的镜像,然后启动。下面让我们看看都有哪些服务。
1.1 Ganache-cli
如果没有接入以太坊区块链的节点,我们的钱包服务就不会有什么用。在开发期我们不需要下载整个以太坊区块链,因此只要使用Ganache仿真器即可。使用Ganache的好处是开发效率高,因为出块极快。不过在生产环境中就需要使用像Geth这样的节点软件来接入以太坊主网了。
1.2 Redis
我们需要数据库来保存我们创建的地址,并且监听这些地址相关的交易。Redis是一个很出色的内存键/值数据库,非常适合我们的应用场景。
在这个教程中,我们将使用Redis数据库来保存我们为地址生成的私钥,但是在生产服务器上应当使用更安全的硬件设施来保护这些私钥。
1.3 Kafka/Zookeeper
Apache Kafka在交易所架构中扮演着核心的角色,它负责接收所有服务的消息并分发给订阅这些消息的节点。
对于以太坊钱包服务而言,我们将使用以下这些主题进行通信:
- command
- address.created
- transaction
- errors
Apache Kafka服务器可以独立地进行扩展,为我们的服务提供了一个分布式的消息处理集群。
2、开发语言选择
就我个人而言,是非常喜欢Elixir的,因为可以用它写出极其可靠的分布式应用,而且代码也很容易理解和维护。但是考虑到以太坊的生态,Elixir就没有什么优势了。
对于以太坊开发而言最好的选择还是使用Node.js/Javascript。因为有很多你可以直接就用的组件。因此我们的以太坊钱包服务最终决定使用Node.js开发。
3、初始开发环境搭建
首先运行npm init
命令来创建默认的node包:
~/exchange-hubwiz/eth-wallet$ npm init
然后我们可以添加一些钱包服务要用到的node依赖包,执行如下命令:
~/exhcange-hubwiz/eth-wallet$ npm install --save web3 redis kafka-node ethereumjs-tx bluebird
前三个依赖包的作用容易理解:
- web3:通过websocket连接到Ganache或其他以太坊节点
- redis:连接到Redis服务器以便保存或提取数据
- kafka-node:接入Zookeeper,获取Kafka访问端结点,生产或消费Kafka消息
最后的两个依赖包有助于让我们的代码更容易理解,并且可以利用async/await的异步编程模式的优势。
接下来,我们将利用这些node包连接Redis、以太坊和Kafka服务器。
4、连接服务器
4.1 连接Redis服务器
连接Redis非常简单,创建一个redis.js文件,然后编写如下代码:
// load configuration const config = require('../../config') const redis = require('redis') const bluebird = require('bluebird') // promisify the redis client using bluebird bluebird.promisifyAll(redis.RedisClient.prototype); bluebird.promisifyAll(redis.Multi.prototype); // create a new redis client and connect to the redis instance const client = redis.createClient(config.redis_port, config.redis_host); // if an error occurs, print it to the console client.on('error', function (err) { console.error("[REDIS] Error encountered", err) }) module.exports = client;
4.2 连接以太坊节点
如果你认为连接Redis很简单了,那么使用web3连接以太坊节点简单的会让你吃惊。
创建一个ethereum.js,然后编写如下代码:
const config = require('../../config') const Web3 = require('web3') module.exports = new Web3(config.uri)
4.3 连接Kafka服务器
Kafka,需要从队列中提取消息进行消费,或者生产消息存入队列。因此我们也需要继续相关的配置。
创建一个新的文件query.js,然后编写如下的代码:
const kafka = require('kafka-node') const config = require('../../config') // configure how the consumers should connect to the broker/servers // each consumer creates his own connecto to a broker const default_options = { host: config.kafka_zookeeper_uri, autoCommit: true, fromOffset: 'earliest', } module.exports.consumer = (group_id = "ethereum_wallet_manager_consumer", topics = [], opts = {}) => { const options = Object.assign({ groupId: group_id }, default_options, opts) const consumer = new kafka.ConsumerGroup(options, topics) return consumer } // configure how the producer connects to the Apache Kafka broker // initiate the connection to the kafka client const client = new kafka.Client(config.kafka_zookeeper_uri, config.kafka_client_id) module.exports.client = client const producer = new kafka.Producer(client) // add a listener to the ready event async function on_ready(cb) { producer.on('ready', cb) } // define a method to send multiple messages to the given topic // this will return a promise that will resolve with the response from Kafka // messages are converted to JSON strings before they are added in the queue async function send(topic, messages) { return new Promise((resolve, reject) => { // convert objects to JSON strings messages = messages.map(JSON.stringify) // add the messages to the given topic producer.send([{ topic, messages}], function (err, data) { if (err) return reject(err) resolve(data) }) }) } // expose only these methods to the rest of the application and abstract away // the implementation of the producer to easily change it later module.exports.on_ready = on_ready module.exports.send = send
5、打造以太坊钱包服务
现在我们开始进入以太坊钱包服务的核心特性开发阶段。
5.1 创建新的以太坊账户
交易所和支付网关需要为客户生成新地址,以便用户可以向服务充值,或者为产品付费。生成一个没有用过的以太坊地址是任何虚拟货币服务的基本需求,因此让我们看看如何实现。
首先,创建一个commands.js,在其中我们订阅队列中的消息。主要包括以下几个步骤:
- 连接到command主题,监听新的create_account命令
- 当收到新的create_account命令时,创建新的密钥对并存入密码库
- 生成account_created消息并发送到队列的account_created主题
代码如下:
const web3 = require("./ethereum") const redis = require('./redis') const queue = require('./queue') /** * Listen to new commands from the queue */ async function listen_to_commands() { const queue_consumer = queue.consumer('eth.wallet.manager.commands', ['command']) // process messages queue_consumer.on('message', async function (topic_message) { try { const message = JSON.parse(topic_message.value) // create the new address with some reply metadata to match the response to the request const resp = await create_address(message.meta) // if successful then post the response to the queue if (resp) { await queue_producer.send('address.created', [resp]) } } catch (err) { // in case something goes wrong catch the error and send it back in the 'errors' topic console.error(topic_message, err) queue_producer.send('errors', [{type: 'command', request: topic_message, error_code: err.code, error_message: err.message, error_stack: err.stack}]) } }) return queue_consumer } /** * Create a new ethereum address and return the address */ async function create_account(meta = {}) { // generate the address const account = await web3.eth.accounts.create() // disable checksum when storing the address const address = account.address.toLowerCase() // save the public address in Redis without any transactions received yet await redis.setAsync(`eth:address:public:${address}`, JSON.stringify({})) // Store the private key in a vault. // For demo purposes we use the same Redis instance, but this should be changed in production await redis.setAsync(`eth:address:private:${address}`, account.privateKey) return Object.assign({}, meta, {address: account.address}) } module.exports.listen_to_commands = listen_to_commands
5.2 处理新交易
我们的钱包还没写完,当我们创建的地址收到用户充值时应当得到通知才对。为此,以太坊的web3客户端提供了newBlockHeaders订阅机制。此外,如果我们的服务偶然宕机,那么服务就会错过在宕机期间生产的区块,因此我们还需要检查钱包是否已经同步到了网络的最新区块。
创建 sync_blocks.js文件,编写如下代码:
const web3 = require('./ethereum') /** * Sync blocks and start listening for new blocks * @param {Number} current_block_number - The last block processed * @param {Object} opts - A list of options with callbacks for events */ async function sync_blocks(current_block_number, opts) { // first sync the wallet to the latest block let latest_block_number = await web3.eth.getBlockNumber() let synced_block_number = await sync_to_block(current_block_number, latest_block_number, opts) // subscribe to new blocks web3.eth.subscribe('newBlockHeaders', (error, result) => error && console.log(error)) .on("data", async function(blockHeader) { return await process_block(blockHeader.number, opts) }) return synced_block_number } // Load all data about the given block and call the callbacks if defined async function process_block(block_hash_or_id, opts) { // load block information by id or hash const block = await web3.eth.getBlock(block_hash_or_id, true) // call the onTransactions callback if defined opts.onTransactions ? opts.onTransactions(block.transactions) : null; // call the onBlock callback if defined opts.onBlock ? opts.onBlock(block_hash_or_id) : null; return block } // Traverse all unprocessed blocks between the current index and the lastest block number async function sync_to_block(index, latest, opts) { if (index >= latest) { return index; } await process_block(index + 1, opts) return await sync_to_block(index + 1, latest, opts) } module.exports = sync_blocks
在上面的代码中,我们从钱包服务之前处理的最新区块开始,一直同步到区块链的当前最新区块。一旦我们同步到最新区块,就开始订阅新区块事件。对于每一个区块,我们都执行如下的回调函数以处理区块头以及区块中的交易列表:
- onTransactions
- onBlock
通常包含如下的处理步骤:
- 监听新区块,获取区块中的全部交易
- 过滤掉与钱包地址无关的交易
- 将每个相关的交易都发往队列
- 将地址上的资金归集到安全的存储
- 更新已处理的区块编号
最终的代码如下:
const web3 = require("web3") const redis = require('./redis') const queue = require('./queue') const sync_blocks = require('./sync_blocks') /** * Start syncing blocks and listen for new transactions on the blockchain */ async function start_syncing_blocks() { // start from the last block number processed or 0 (you can use the current block before deploying for the first time) let last_block_number = await redis.getAsync('eth:last-block') last_block_number = last_block_number || 0 // start syncing blocks sync_blocks(last_block_number, { // for every new block update the latest block value in redis onBlock: update_block_head, // for new transactions check each transaction and see if it's new onTransactions: async (transactions) => { for (let i in transactions) { await process_transaction(transactions[i]) } } }) } // save the lastest block on redis async function update_block_head(head) { return await redis.setAsync('eth:last-block', head) } // process a new transaction async function process_transaction(transaction) { const address = transaction.to.toLowerCase() const amount_in_ether = web3.utils.fromWei(transaction.value) // check if the receiving address has been generated by our wallet const watched_address = await redis.existsAsync(`eth:address:public:${address}`) if (watched_address !== 1) { return false } // then check if it's a new transaction that should be taken into account const transaction_exists = await redis.existsAsync(`eth:address:public:${address}`) if (transaction_exists === 1) { return false } // update the list of transactions for that address const data = await redis.getAsync(`eth:address:public:${address}`) let addr_data = JSON.parse(data) addr_data[transaction.hash] = { value: amount_in_ether } await redis.setAsync(`eth:address:public:${address}`, JSON.stringify(addr_data)) await redis.setAsync(`eth:transaction:${transaction.hash}`, transaction) // move funds to the cold wallet address // const cold_txid = await move_to_cold_storage(address, amount_in_ether) // send notification to the kafka server await queue_producer.send('transaction', [{ txid: transaction.hash, value: amount_in_ether, to: transaction.to, from: transaction.from, //cold_txid: cold_txid, }]) return true } module.exports = start_syncing_blocks
6、总结
我们已经完成了交易所以太坊钱包服务的设计与实现,这个服务还可以从以下几个方面加以改进:
- 增加错误处理
- 增加命令类型
- 交易签名与交易广播
- 部署合约
你可以试着自己来设计实现上述这些特性!
原文链接:交易所钱包管理服务设计与实现 — 汇智网
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
ASP.NET Core on K8S深入学习(3-1)Deployment
上一篇《部署过程解析与安装Dashboard》中我们了解K8S的部署过程,这一篇我们来了解一下K8S为我们提供的几种应用运行方式:Deployment、DaemonSet与Job,它们是Kubernetes最重要的核心功能提供者。考虑到篇幅和更新速度,我将其分为两篇文章,本篇会主要介绍Deployment,主要参考自CloudMan《每天5分钟玩转Kubernetes》,也推荐大家购买阅读。 一、创建资源的两种方式 K8S支持两种创建资源的方式,分别是 使用kubectl命令直接创建 与 通过配置文件+kubectl apply创建,下面以上一篇中的ASP.NET Core示例来分别介绍下这两种方式。 1.1 Kubectl命令直接创建 第一种是通过kubectl命令直接创建: kubectl run k8s-demo-deployment --image=edisonsaonian/k8s-demo:latest --replicas=2 --namespace=aspnetcore 这样我们就部署了一个具有2个副本的k8s-demo(一个ASP.NET Core API示例)。 1...
- 下一篇
深入浅出 Kubernetes:浅谈 Deployment 和 ReplicaSet
深入浅出 Kubernetes:浅谈 Deployment 和 ReplicaSet 一 背景 Deployment 和 ReplicaSet 是 Kubernetes 中两个比较重要的对象,本文简单地讨论了他们之间的一些区别与联系。 二 Deployment apiVersion: apps/v1 kind: Deployment metadata: name: deployment-gysl spec: replicas: 2 selector: matchLabels: app-1: nginx app-2: busybox template: metadata: labels: app-1: nginx app-2: busybox spec: containers: - name: app-1 image: nginx:1.16.0 imagePullPolicy: Always ports: - containerPort: 80 - containerPort: 8080 - name: app-2 image: busybox imagePullPolicy: Neve...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,CentOS7官方镜像安装Oracle11G
- MySQL8.0.19开启GTID主从同步CentOS8
- Docker安装Oracle12C,快速搭建Oracle学习环境