首页 文章 精选 留言 我的

精选列表

搜索[快速入门],共10000篇文章
优秀的个人博客,低调大师

IBeacon(i 比肯)物联网最快速的连接器

Ibeacon一项低耗能蓝牙技术技术,工作原理类似之前的蓝牙技术,由iBeacon发射信号,IOS设备定位接受,反馈信号。根据这项简单的定位技术可以做出许多的相应技术应用。[ 兼容设备编辑 支持蓝牙4.0的IOS设备(iPhone4s及以上,iPad第三代及以上,iPad mini第一代及以上,iPod Touch第五代)。 支持OS X Mavericks 10.9操作系统和蓝牙4.0的苹果计算机。 安卓4.3及以上(如三星 Galaxy S3/S4/S4 Mini, 三星 Galaxy Note 2/3, HTC One, Google/LG Nexus 7 2013 version/Nexus 4/Nexus 5, HTC Butterfly, OnePlus One) 支持Lumia Cyan及以上更新服务Windows Phone设备(报告显示,不包含Windows Phone 8.1)

优秀的个人博客,低调大师

ELK日志服务器的快速搭建并收集nginx日志

今天给大家带来的是开源实时日志分析 ELK , ELK 由 ElasticSearch 、 Logstash 和 Kiabana 三个开源工具组成。官方网站:https://www.elastic.co 其中的3个软件是: Elasticsearch 是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制, restful 风格接口,多数据源,自动搜索负载等。 Logstash 是一个完全开源的工具,他可以对你的日志进行收集、分析,并将其存储供以后使用(如,搜索)。 kibana 也是一个开源和免费的工具,他 Kibana 可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志 系统 系统需要安装的软件 ip 描述 centos6.4 Elasticsearch/test5 192.168.48.133 搜索存储日志 centos6.4 Elasticsearch/test4 192.168.48.131 搜索存储日志 centos6.4 Logstash/nginx/test1 192.168.48.129 用来收集日志给上面 centos6.4 kibana,nginx/test2 192.168.48.130 用来后端的展示 软件下载:链接:链接:http://share.weiyun.com/3b6c27e33cbf4d0acc0ccfa14877bd05 (密码:IWLe) 架构原理图: 一、先安装elasticsearch集群,并测试通过再进行其他软件安装。 在test5,test4上安装分别安装elasticsearch-2.3.3.rpm 前提要安装java1.8 步骤如下: yumremovejava-1.7.0-openjdk rpm-ivhjdk-8u91-linux-x64.rpm yumlocalinstallelasticsearch-2.3.3.rpm 配置elasticsearch 在目录/etc/elasticsearch目录下面 lasticsearch.yml elasticsearch.yml.bak logging.yml scripts 编辑lasticsearch.yml 修改如下配置 cluster.name: myelk #设置集群的名称,在一个集群里面都是这个名称,必须相同 node.name: test5 #设置每一个节点的名,每个节点的名称必须不一样。 path.data: /path/to/data #指定数据的存放位置,线上的机器这个要放到单一的大分区里面。 path.logs: /path/to/logs #日志的目录 bootstrap.mlockall: true #启动最优内存配置,启动就分配了足够的内存,性能会好很多,测试我就不启动了。 network.host: 0.0.0.0 #监听的ip地址,这个表示所有的地址。 http.port: 9200 #监听的端口号 discovery.zen.ping.unicast.hosts: ["192.168.48.133", "192.168.48.131"] #知道集群的ip有那些,没有集群就会出现就一台工作 建立目录 mkdir-pv/pach/to/{data,logs} chownelasticsearch.elasticsearch/path-R 启动服务器 service elasticsearch start 并查看监控端口启动 访问9200端口查看服务 两台的配置都一样就是上面的IP和note名称要配置不一样就行 安装插件 head和kopf 之后访问ip:9200/_plugin/head 和ip:9200/_plugin/kopf (插件可以图形查看elasticsearch的状态和删除创建索引) /usr/share/elasticsearch/bin/plugininstalllmenezes/elasticsearch-kopf /usr/share/elasticsearch/bin/plugininstallmobz/elasticsearch-head 二、安装nginx和logstash软件 在test1上安装好nginx服务 就是收集它的日志呢 yum-yinstallzlibzlib-developensslopenssl--develpcrepcre-devel ./configure--prefix=/usr/local/nginx--with-pcre--with-openssl=--with-zlib= make&&makeinstall 日志在/usr/local/nginx/logs/access.log 然后在test1上安装logstash-2.3.3-1.noarch.rpm yumremovejava-1.7.0-openjdk rpm-ivhjdk-8u91-linux-x64.rpm rpm-ivhlogstash-2.3.3-1.noarch.rpm /etc/init.d/logstashstart#启动服务 /opt/logstash/bin/logstash-e"input{stdin{}}output{stdout{codec=>"rubydebug"}}"#检测环境执行这个命令检测环境正常否,启动完成后直接输入东西就会出现 之后输入/opt/logstash/bin/logstash -e 'input {stdin{}} output{ elasticsearch { hosts => ["192.168.48.131:9200"] index => "test"}}' 就是输入东西到48.131的elasticsearch上 会在/path/to/data/myelk/nodes/0/indices 生成你名称test索引文件目录 可以多输入几个到48.131的目录看看有没有文件有就证明正常。 之后在/etc/logstash/conf.d 建立以.conf结尾的配置文件,我收集nginx就叫nginx.conf了内容如下; ########################################################################################### input { file { type => "accesslog" path => "/usr/local/nginx/logs/access.log" #日志的位置 start_position => "beginning" #日志收集文件,默认end } } output { if [type] == "accesslog" { elasticsearch { hosts => ["192.168.0.87"] ###elasticearch的地址 index => "nginx-access-%{+YYYY.MM.dd}" #生成的索引和刚才的test一样会在那里生成后面的是日期变量。 } } } ########################################################################################## 一定要仔细,之后运行/etc/init.d/logstash configtest检测配置是否正常。 查看进程是否启动 之后在elasticearch查看有没有索引生成。多访问下nginx服务 如果没有就修改这个文件 vi /etc/init.d/logstash ###################################################################################################### LS_USER=root ###把这里换成root或者把访问的日志加个权限可以让logstash可以读取它 重启服务就会生成索引了 LS_GROUP=root LS_HOME=/var/lib/logstash LS_HEAP_SIZE="1g" LS_LOG_DIR=/var/log/logstash LS_LOG_FILE="${LS_LOG_DIR}/$name.log" LS_CONF_DIR=/etc/logstash/conf.d LS_OPEN_FILES=16384 LS_NICE=19 KILL_ON_STOP_TIMEOUT=${KILL_ON_STOP_TIMEOUT-0} #default value is zero to this variable but could be updated by user request LS_OPTS="" ####################################################################################################### 看logstash的日志有下面的信息就成功了 三、安装kibana软件 上面的都安装完成后在test2上面安装kibana rpm-ivhkibana-4.5.1-1.x86_64.rpm 编辑配置文件在这里/opt/kibana/config/kibana.yml 就修改下面几项就行 ####################################################################################################### server.port: 5601 端口 server.host: "0.0.0.0" 监听 elasticsearch.url: "http://192.168.48.131:9200"elasticsearch地址 ###################################################################################################### /etc/init.d/kibana start 启动服务 访问kibana http://ip:5601 添加展示的索引,就是在上面定义的 nginx-access-2016.07.03 四、其他的一些配置。 kibana是直接访问的比较不安全,我们需要用nginx访问代理,并设置权限用户名和密码访问 先在kibana服务器上安装nginx 不介绍了 在nginx里面配置 ################################################################################# server { listen 80; server_name localhost; auth_basic "Restricted Access"; auth_basic_user_file /usr/local/nginx/conf/htpasswd.users; #密码和用户 location / { proxy_pass http://localhost:5601; #代理kibana的5601之后就可以直接80访问了 proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header REMOTE-HOST $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } } #################################################################################### 创建密码和用户文件:htpasswd.users 需要安装httpd-tool包先安装它 htpasswd -bc /usr/local/nginx/conf/htpasswd.users admin paswdadmin #前面是用户后面是密码 ################################################################################## 之后通过访问需要密码和用户并且是80端口了 到这里就完成了,谢谢你的阅读。

优秀的个人博客,低调大师

容器服务之快速搭建使用阿里云rds的wordpress网站

登录容器服务控制台,创建好集群,如果用户已有ECS机器,可以创建0节点的集群,然后将已有机器加入集群。 登录容器服务控制台,选择侧边栏的应用,在下拉框选择相应的集群,选择创建应用 填写应用名称,本例为wordpress-rds, 选择使用编排模板创建 在编辑框中输入以下编排模板请注意将WORDPRESS_DB_USER修改为您数据库用户名,将WORDPRESS_DB_PASSWORD修改为您数据库的密码,将WORDPRESS_DB_NAME修改为您数据库的名称,同时将db服务下面的host修改为您要连接的数据库的域名,将ports修改为您要连接的数据库的端口。 web: image: wordpress ports: - '80' restart: always li

优秀的个人博客,低调大师

全面解析 SeaTunnel API 源码:从入门到精通数据集成

引言 随着大数据技术的发展,数据集成和数据流处理需求日益增长。Apache SeaTunnel 作为一款开源的数据集成框架,不仅支持多种数据源和目标,还提供了灵活的 API 来满足各种复杂的业务需求。 本文将深入解析 Apache SeaTunnel 的 API,帮助开发者更好地理解其使用场景和实现方式。 从接口定义来看SeaTunnel 从官网的这个图中, 可以看到在SeaTunnel中, 定义了以下几种类型: 数据源 API(Source API):用于定义数据的输入源。 数据转换 API(Transform API):用于处理和转换数据。 数据目标 API(Sink API):用于定义数据的输出目标。 三种类型/算子 所以我想先从接口的定义上来看下Apache SeaTunnel的设计理念. SeaTunnelSource SeaTunnelSource是数据读取的接口定义, 在这个接口中, 定义了如何从某个数据源中抽取数据. public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends Serializable> extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelJobAware { /** * 返回当前Source的类型,是[有界批数据]还是[无界流数据] */ Boundedness getBoundedness(); /** * 此方法后面将弃用 * 后面将使用Catalog来表示数据,可以添加更多的元数据来描述数据 */ default SeaTunnelDataType<T> getProducedType() { return (SeaTunnelDataType) getProducedCatalogTables().get(0).getSeaTunnelRowType(); } /** * 当前SeaTunnel是支持多表读取的, 所以这里会返回一个list类型的结构 * 每个catalog则是对读取的表的元数据信息 */ default List<CatalogTable> getProducedCatalogTables() { throw new UnsupportedOperationException( "getProducedCatalogTables method has not been implemented."); } /** * 创建 Reader,Reader是真正去读取数据的类 */ SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception; /** * 这两个方法是创建/恢复 SplitEnumerator */ SourceSplitEnumerator<SplitT, StateT> createEnumerator( SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception; SourceSplitEnumerator<SplitT, StateT> restoreEnumerator( SourceSplitEnumerator.Context<SplitT> enumeratorContext, StateT checkpointState) throws Exception; /** * 这两个方法一般不会修改, 如果需要对Enumerator自定义序列化方式可以重写 */ default Serializer<SplitT> getSplitSerializer() { return new DefaultSerializer<>(); } default Serializer<StateT> getEnumeratorStateSerializer() { return new DefaultSerializer<>(); } } 从这个接口中可以看到有两个主要的方法 createReader createEnumerator createEnumerator方法创建的SourceSplitEnumerator作用是对要抽取的数据进行任务拆分. createReader方法创建的SourceReader则会依据这些拆分的任务进行实际的任务读取 首先来看下SourceSplitEnumerator的代码 SourceSplitEnumerator public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends AutoCloseable, CheckpointListener { void open(); void run() throws Exception; @Override void close() throws IOException; void addSplitsBack(List<SplitT> splits, int subtaskId); int currentUnassignedSplitSize(); void handleSplitRequest(int subtaskId); void registerReader(int subtaskId); StateT snapshotState(long checkpointId) throws Exception; default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {} interface Context<SplitT extends SourceSplit> { int currentParallelism(); Set<Integer> registeredReaders(); void assignSplit(int subtaskId, List<SplitT> splits); default void assignSplit(int subtaskId, SplitT split) { assignSplit(subtaskId, Collections.singletonList(split)); } void signalNoMoreSplits(int subtask); void sendEventToSourceReader(int subtaskId, SourceEvent event); MetricsContext getMetricsContext(); EventListener getEventListener(); } } SourceSplitEnumerator接口中定义了一些方法以及一个内部类Context 先看下自身的一些方法. 可以看到有3个跟生命周期相关的方法,open(), run()和close()。这几个方法就需要连接器自己去根据相关的实现来做一下资源的创建或关闭动作. registerReader(int subtaskId)方法 reader主动向split enumerator进行注册 handleSplitRequest(int subtaskId) reader主动向split enumerator进行请求,获取自己将要执行的抽取任务. (但看了代码实现,这种方式比较少,大部分还是enumerator主动向reader推送任务) addSplitsBack(List<SplitT> splits, int subtaskId) 则是当某个reader出现异常后,需要将它运行的任务重新分配,此时需要将它运行的任务重新添加回队列中,后面进行重新分配到其他节点进行容错。 而在Context接口定义中有两个关键方法 assignSplit(int subtaskId, List<SplitT> splits) split enumerator主动向某个reader推送任务 signalNoMoreSplits(int subtaskId) split enumerator告诉某个reader,它后面将不会再有其他任务被分配。 SourceReader public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable, CheckpointListener { void open() throws Exception; @Override void close() throws IOException; void pollNext(Collector<T> output) throws Exception; List<SplitT> snapshotState(long checkpointId) throws Exception; void addSplits(List<SplitT> splits); void handleNoMoreSplits(); default void handleSourceEvent(SourceEvent sourceEvent) {} interface Context { int getIndexOfSubtask(); Boundedness getBoundedness(); void signalNoMoreElement(); void sendSplitRequest(); void sendSourceEventToEnumerator(SourceEvent sourceEvent); MetricsContext getMetricsContext(); EventListener getEventListener(); } } 在Reader接口定义中, 也有一个内部类Context 首先来看下我们来看下几个主要方法: pollNext(Collector<T> output) 抽取数据的主要方法,在这个方法中每个连接器都会实现从自己相应的数据源中抽取数据,转换成seatunnel的内部数据结构SeaTunnelRow,然后再添加到Collector中 addSplits(List<SplitT> splits) reader接收split enumerator分配给自己的任务后的相关处理 snapshotState(long checkpointId) 这个方法是做checkpoint时会被调用, 需要reader记录一些状态, 从而可以进行后续的容错 用一张图来总结一下 在这里的一个Split表示对数据源数据读取拆分的一个任务,可以是一个Hive表的一个分区,可以是一个Kafka的分区,也可以是JDBC查询语句的拆分,总之核心思想是将一个数据的读取拆分成多个互相不影响的读取任务,从而可以交给不同的reader实例去执行,从而加快查询速度。 举个例子:在批处理下可以将数据拆分为N份,使得数据抽取可以并行执行,达到提升速度的目的。 而对于流式处理, 有两种方式, 一个是将数据拆分为有限的无界数据流(例如Kafka根据分区进行拆分, 一个分区一个任务,每个任务都是无界的数据流)。 另外一种方式是生成无限的有界数据流(同样以Kafka为例, 每次抽取某个分区中的一部分数据, 无限次生成任务定义)。 至于一个数据读取任务最终会被切分成多少个Split以及如何实现切分,则是每个连接器的自己实现,每个连接器可以根据实际读取的分区或者参数来决定。 接下来看下Transform的相关代码! SeaTunnelTransform public interface SeaTunnelTransform<T> extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware { default void open() {} default void setTypeInfo(SeaTunnelDataType<T> inputDataType) { throw new UnsupportedOperationException("setTypeInfo method is not supported"); } // 获取Transform处理之后的数据结构 CatalogTable getProducedCatalogTable(); // 从这里可以看出,SeaTunnel里面的Transform是仅支持map操作的 // 对于Join这种多个数据源的操作是不支持的 T map(T row); default void close() {} } 在transform中就一个关键的方法T map(T row),就是对原有的一条数据进行map处理,得到一条新数据。新数据的结构则是与getProducedCatalogTable()一致。 这个代码是基于2.3.6版本, 目前社区也正在做Transform的多表读取,写入功能 目前仅支持map这种一对一的算子, 我看社区也在讨论是否支持flatMap这种一对多的算子, 从而可以在同步过程中进行一些数据展开的操作. 再来看下Sink的代码! SeaTunnelSink public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelJobAware { @Deprecated // 这两个方法也都被标记为废弃, 后续都将使用Catalog来进行表示 default void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { throw new UnsupportedOperationException("setTypeInfo method is not supported"); } @Deprecated default SeaTunnelDataType<IN> getConsumedType() { throw new UnsupportedOperationException("getConsumedType method is not supported"); } /** * 创建/恢复 SinkWriter,Writer是真正执行数据写入的类 */ SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context) throws IOException; default SinkWriter<IN, CommitInfoT, StateT> restoreWriter( SinkWriter.Context context, List<StateT> states) throws IOException { return createWriter(context); } default Optional<Serializer<StateT>> getWriterStateSerializer() { return Optional.empty(); } default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException { return Optional.empty(); } default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() { return Optional.empty(); } default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>> createAggregatedCommitter() throws IOException { return Optional.empty(); } default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() { return Optional.empty(); } } 在Sink中,有几个关键的方法: createWriter(SinkWriter.Context context) 创建Writer实例,与Source类似,数据的实际写入是由Writer来写入。 createCommitter() 可选,在需要二阶段提交时,创建一个SinkCommitter,由SinkCommitter来完成二阶段提交, 此方式也不再推荐, 推荐使用createAggregatedCommitter()来进行二阶段提交。 createAggregatedCommitter() 可选,与SinkCommitter类似,都是在提交阶段进行二阶段提交使用。 不同的点在于SinkAggregatedCommitter是单一实例去执行,不会存在多实例,将所有的提交任务集中到一个地方执行。所以如果连接器需要二阶段提交, 推荐使用createAggregatedCommitter()来创建 SinkWriter public interface SinkWriter<T, CommitInfoT, StateT> { void write(T element) throws IOException; default void applySchemaChange(SchemaChangeEvent event) throws IOException {} Optional<CommitInfoT> prepareCommit() throws IOException; default List<StateT> snapshotState(long checkpointId) throws IOException { return Collections.emptyList(); } void abortPrepare(); void close() throws IOException; interface Context extends Serializable { int getIndexOfSubtask(); default int getNumberOfParallelSubtasks() { return 1; } MetricsContext getMetricsContext(); EventListener getEventListener(); } } 可以看到SinkWriter的结构与SourceReader结构有些类似, 来看下一些关键方法: write(T element) 当接收到一条上游数据时, 写入到目标数据库的实现; applySchemaChange(SchemaChangeEvent event) 当上游数据的表结构变动后, 下游如何进行相应的实现, 例如增删字段, 修改字段名称. 但这个跟具体的实现有关; prepareCommit() 当需要二阶段提交时, 生成此次需要提交的信息, 该信息将交给SinkCommitter/SinkAggregatedCommitter来进行二阶段提交. 这个方法的调用, 是在做checkpoint时会被调用, 也就是每次checkpoint时才会提交刚刚产生的信息到目标端连接器; snapshotState() 当做checkpoint时, 存储writer的一些状态, 从而可以进行后续的容错; 用图小结 当需要对某个数据源进行读取时,会先由SourceSplitEnumerator来进行任务的切分,再由SourceReader来执行拆分的数据读取任务,读取是需要将原始数据转换为SeaTunnel内部的SeaTunnelRow,然后传递给下游,交由Transform进行数据转换,转换完成后交由SinkWriter实现将SeaTunnelRow的数据写入到相应的连接器中,在写入过程中如果需要二阶段提交,则需要额外实现SinkCommitter的相关类。 Collector 在上面的几个章节, 已经将source, transform, sink的功能描述了一下. 但source,transform, sink之间的数据传递是如何实现的呢, 如果一个source有多个下游, 消息是如何全部分发给多个下游的呢? 这个地方就是由Collector接口来定义的. 从数据流转的方向上, 仅会有source到下游或者transform到下游传递数据, 所以可以看到有两个Collector的接口定义, 分别在source和transform包下。 先来看下source中Collector的定义: public interface Collector<T> { void collect(T record); default void markSchemaChangeBeforeCheckpoint() {} default void collect(SchemaChangeEvent event) {} default void markSchemaChangeAfterCheckpoint() {} Object getCheckpointLock(); default boolean isEmptyThisPollNext() { return false; } default void resetEmptyThisPollNext() {} } 而transform中的Collector定义就相对简单了 public interface Collector<T> { void collect(T record); void close(); } Collector将多个算子进行解耦, 每个算子仅需关心如何处理数据, 而无需关心结果数据需要发送给谁. 对上面的图再更新一下大致是这样(多加了一个Transform来显示多下游的场景) 在分布式系统中, 每个任务都使用单独的线程来运行, 那么多个线程之间的数据传递就存在两种情况, 一个是同进程之间的传递, 另外就是跨进程之间的传递也就是Shuffle. 而SeaTunnel的定位是一个数据传输工具, 数据产生之后, 并不需要对数据进行计算, 数据的传输可以仅在进程之间传递. 不需要做shuffle. 从而提升性能以及减少由shuffle带来的其他问题. 所以在SeaTunnel中, Collector的实现就是实现一个进程内跨线程的数据管道. 同样, 由于这样的设计, 之前在issue区看到有人想实现多个source并行读取, 单个sink写入的功能也就无法实现了. 如果需要想实现单点写入的功能, 那么就需要连接器实现SinkAggregatedCommitter来进行单点的二阶段提交(至于具体是否是单节点写入, 也需要看连接器的具体实现). Factory 在新的设计中,所有连接器都是通过Factory工厂类来进行创建的。 TableSourceFactory TableTransformFactory TableSinkFactory 他们均实现了Factory接口,而他们本身也是接口,需要各个连接器去具体实现。 里面有这么几个方法 public interface Factory { String factoryIdentifier(); OptionRule optionRule(); } public interface TableSourceFactory extends Factory { default <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) { throw new UnsupportedOperationException( "The Factory has not been implemented and the deprecated Plugin will be used."); } Class<? extends SeaTunnelSource> getSourceClass(); } public interface TableTransformFactory extends Factory { default <T> TableTransform<T> createTransform(TableTransformFactoryContext context) { throw new UnsupportedOperationException( "The Factory has not been implemented and the deprecated Plugin will be used."); } } public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Factory { default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink( TableSinkFactoryContext context) { throw new UnsupportedOperationException( "The Factory has not been implemented and the deprecated Plugin will be used."); } @Deprecated default List<String> excludeTablePlaceholderReplaceKeys() { return Collections.emptyList(); } } 在这几个接口定义中, 有两个公共方法 factoryIdentifier() 进行连接器标识, 每个连接器应该唯一 optionRule() 声明该连接器所需要的参数,哪些是必填, 哪些是选填, 哪些一起填会存在冲突等等, 在创建连接器时会先对配置参数来进行校验. 当参数验证通过后, 会调用相应的方法来创建相应的连接器SeaTunnelSource/SeaTunnelTransform/SeaTunnelSink 总结 Apache SeaTunnel 的 API 提供了强大的数据集成能力,能够灵活应对不同的业务场景。通过这几个关键API的定义, 我们可以看到, Apache SeaTunnel是将数据同步进行了高度抽象, 并且可以灵活的对各个连接器进行组装, 各个连接器之间是没有依赖的. 希望开发者能够更加深入地理解 SeaTunnel API,并在实际项目中高效地利用这些 API 进行数据处理和集成。 本文由 白鲸开源科技 提供发布支持! 询问AI

优秀的个人博客,低调大师

LLM 工程师入门:生成式AI的简易指南

编者按: 大模型发展了近两年,Baihai IDP 也分享了近百篇LLM各环节的技术洞察,有前沿探讨、有落地实践、有应用经验。但回头来看,我们似乎从来没有认真、从0开始探讨过LLM的基本原理。 最近,一些企业客户和伙伴来询问,是否有LLM的从0到1的科普贴。他们说: "虽然在很多场景中,LLM都已经渗透入我们的工作生活,但对其内部的运作机制,仍有很多谜团待解决。 在应用落地时,LLMs 这种"黑箱式"的运作模式,不仅使我们难以完全信任这些模型的输出结果,也阻碍了我们对其进一步研究和优化的步伐。如果我们无法理解 LLMs 的工作原理,就很难评估它们的局限性,进而制定出有针对性的解决方案。" 因此,我们把这篇LLM基础原理文章推荐给大家。 本文为希望深入了解生成式AI的开发者、技术爱好者、AI落地的领导者和研究者们编写,以通俗易懂的语言,系统地剖析了大语言模型的内部结构和训练流程,从 token、next token predictions,到马尔可夫链、神经网络等核心概念,循序渐进地揭示了 LLM 是如何生成文本的。 作者 | Miguel Grinberg 编译 | 岳扬 毫无疑问,随着大语言模型[1](LLMs)的新闻不断出现在我们的日常生活,生成式人工智能[2](GenAI)已经成为了我们无法忽视的存在。或许你早已体验过 ChatGPT[3] ,甚至把它当作日常生活的小助理了。 面对这场 GenAI 变革,许多人心中都有一个疑问:这些模型表面上的智能(intelligence)究竟源自何处?本文将试图用浅显易懂的语言,不涉及复杂数学公式,来揭秘生成式文本模型的工作原理,让你认识到它们并非魔法,而是计算机算法的产物。 01 What Does An LLM Do? 首先,我要澄清人们对大语言模型工作原理的一个重大误解。人们通常认为,这些模型能够回答我们的问题或与我们进行对话,但实际上,它们所能做的是基于我们提供的文本输入,预测下一个单词(更准确地说,是下一个 token)。 现在,让我们一步步揭开 LLMs 神秘面纱背后的"真面目",从 token 开始探索。 1.1 Tokens token 是大语言模型(LLM)处理文本时的基本单元。虽然我们可以简单地将 token 视为单个单词,但 LLM 的目的是以最高效的方式对文本进行编码。因此在很多情况下,token 可能是比单个单词短或长的字符序列。 标点符号和空格同样以 token 的形式存在,它们可以单独表示为一个 token,也可以与其他字符组合。 LLM 所使用的所有 token 构成了它的词汇表(vocabulary),这个词汇表能够用来表达所有可能的文本内容。大语言模型通常采用 BPE(Byte Pair Encoding)[4]算法来根据输入数据集创建 token 词汇表。以 GPT-2 语言模型[5](开源模型,可供深入研究)为例,其词汇表拥有 50,257 个 token。 每个 token 在 LLM 的词汇表中都有一个独一无二的标识符(通常是一个数字编号)。LLM 通过分词器将常规文本字符串转换为一系列 token 编号。 如果您对 Python 有所了解,并且想要尝试对 token 进行操作,可以安装 OpenAI 提供的 tiktoken 软件包: 然后请在 Python 命令行中尝试以下操作: 在本实验中,我们可以观察到,GPT-2 语言模型的词汇表中,token 464 对应的是单词"The",而 token 2068 则对应" quick",这个 token 包括了单词之前的空格。在该模型中,句号由 token 13 表示。 由于 token 是通过算法来决定的,因此我们可能会遇到一些奇特的情况,比如 GPT-2 会将单词"the"的以下三种形式编码为不同的 token: BPE 算法并不会总是将整个单词直接转化为一个 token。事实上,那些使用频率较低的单词不会被单独表示为一个 token,而是需要通过多个 token 的组合来进行编码。下文这个例子,展示了 GPT-2 模型是如何用一个由两个 token 组成的序列来编码某个单词的: 1.2 Next Token Predictions 如上文所述,语言模型会根据给定文本预测之后可能出现的 token。如果用 Python 伪代码来展示这个过程,下文就演示了如何使用这些模型来预测下一个 token: 这个函数将用户输入的提示词转换成的 token 列表作为模型输入。这里假设每个单词都是一个单独的 token。为了简单起见,此处使用了每个 token 的文字形式,但实际上,每个 token 都是以数字的形式传递给模型的。 该函数返回的是一种特有的数据结构,它为词汇表中的每个 token 分配了一个在输入文本后紧接着出现的概率。如果使用的是 GPT-2 模型,那么返回的将是一个包含 50,257 个浮点数的列表(list),列表中每个数字代表相应 token 紧接着文本内容出现的概率。 在上述案例中,可以设想训练效果良好的语言模型会为"jumps"这个 token 分配一个较高的概率,以接续短语"The quick brown fox"[6]。同样地,如果模型训练得当,那么像"potato"这样的随机单词接在这个短语后面的概率就会低很多,几乎接近于0。 为了做出合理的预测,语言模型需要经过一个训练过程。在训练期间,模型会学习大量文本内容。训练结束后,模型就能利用它从训练文本中构建的特有数据结构,来计算给定 token 序列的下一个 token 的概率。 这与你的预期是否有所不同?我希望现在这个概念看起来不再那么神秘了。 1.3 生成长文本序列 由于模型只能预测下一个出现的 token,因此要想让它生成完整的句子,就必须在 for 循环中多次运行模型。每一次循环迭代,都会根据返回的概率列表选择一个新的token。这个新 token 会被加入到下一次循环迭代中模型的输入序列中,如此循环往复,一直持续到生成足够的文本为止。 下面是一个更完整的 Python 伪代码示例,演示了这个过程: generate_text() 函数需要用户提供提示词内容,比如可以是一个问题。 tokenize() 这个辅助函数负责将用户的提示词转换成一系列 token,这个过程会用到 tiktoken 或类似的库。在 for 循环中,get_token_predictions() 函数负责调用AI模型,获取下一个 token 的概率列表,与上文案例中描述的过程相同。 select_next_token() 函数的作用是根据模型给出的下一个 token 的概率列表,从候选 token 中挑选出最合适的 token 来放入输入序列。这个函数可以采取最简单的方法,即选择概率最高的 token ,这在机器学习中被称为"greedy selection"。然而,为了增加生成文本的多样性,该函数通常会采用更高级的策略,使用一个随机数生成器来选择 token,即使是在随机选择 token 的情况下,也会优先选择那些概率较高的 token。通过这种方式,即便是给出相同的输入提示词,模型也能生成不同的文本响应。 为了进一步增加 token 选择过程的灵活性,可以通过调整超参数来改变 LLMs 返回的概率分布。 这些超参数为传递给文本生成函数的参数,能够帮助用户控制 token 选择过程的"greediness"(译者注:模型在选择下一个 token 时所表现出的倾向性,是倾向于选择概率最高的token(即最可能的token),还是允许一些不太可能的token(即概率较低的token)被选中。)。如果你以前经常使用 LLMs,那么就很可能对 "temperature" 超参数比较熟悉。当 temperature 值较高时,token 的概率分布会被 flattened out(译者注:模型会考虑更多的token,包括那些概率较低的token,使得概率分布更加均匀。),这样做的结果是,之前不太可能被选中的 token 现在有更大的机会被选中,从而使生成的文本看起来更具创造性和新颖性。除了 temperature 之外,还有两个超参数 top_p 和 top_k,它们分别用来控制在选择过程中高概率 token 被选中的数量。 通过调整这些超参数,可以进一步影响文本生成的风格和多样性。 一旦模型选定了下一个token,循环就会继续迭代。此时,模型将接收到一个新的输入序列,这个输入序列的末尾添加了新 token(译者注:上一次迭代选择的 token)。num_tokens 参数决定了循环的迭代次数,也就是生成文本的长度。生成的文本可能会(并且经常)在句子中间结束,因为大语言模型(LLM)并没有句子或段落的概念,它只是逐个处理 token。为了防止生成的文本内容在句子中间就结束了,我们可以将 num_tokens 参数视为一个最大数量值,而不是一个确切的 token 数量。在这种情况下,我们可以在模型生成一个句号 token 时结束循环。 如果你已经阅读到此处,并且理解了前文的所有内容,那么恭喜你,你现在对 LLMs 的工作原理已经有了较为深入的理解。各位读者是否对更多技术细节感兴趣?在下一节中,我将介绍更多技术细节,同时尽量避免提及关于这项技术的复杂数学知识。 02 Model Training 不幸的是,要想不涉及数学知识就讨论模型的训练过程,实在是件不容易的事。接下来,我将首先向大家展示一种非常简单的训练方法。 我们的目标是预测 token 后面可能出现的其他 tokens,因此,训练模型的简单方法就是从训练数据集中提取所有连续的 tokens 对,然后用这些数据来构建一个概率表(table of probabilities)。 让我们用一个简短的词汇表(vocabulary)和数据集(dataset)来做这件事。假设模型的词汇表有以下五个 token : 为了使这个例子简明扼要,我们不把空格和标点符号算作 token。 我们使用以下三个句子组成的训练数据集: I like apples I like bananas you like bananas 我们可以制作一个 5x5 的表格,每个单元格记录的是该行 token 后面跟着该列 token 的频次。表格如下: 该数据集中,"I like" 出现了两次,"you like" 一次,"like apples" 一次,而 "like bananas" 则是两次。 现在我们知道了训练数据集中每对 tokens 的出现频率,就可以推算出它们相互跟随的概率。做法是将表格中每一行的数字转换成概率。比如,表格里"like"这一行,我们看到它后面跟着 "apples" 一次,跟着 "bananas" 两次。这意味着在"like"之后出现 "apples" 的概率是33.3%,而出现 "bananas" 的概率则是66.7%。 下面是计算出所有概率的完整表格。那些空白单元格代表的概率自然就是0%。 对于 "I"、"you" 和 "like" 这些行的概率计算比较轻松、直接,但 "apples" 和 "bananas" 这两行就有点难办了,因为数据集中并没有 "apples" 和 "bananas" 后面跟着其他 token 的情况。这就好比在我们的训练数据中出现了"缺口"。为了确保模型在遇到这种未训练的情况时也能有所输出,我决定将 "apples" 和 "bananas" 后续 token 的概率均匀分配给其他四个 tokens。这种做法可能会导致一些不太自然的输出结果,但至少模型在处理这两个 token 时不会卡壳。 训练数据中的这种"缺口"问题不容小觑。在真正的 LLMs 中,由于训练数据集规模非常庞大,我们不太可能遇到像本文这个简单例子中这么明显的缺口。但是,由于训练数据的覆盖面不足,那些较小的、较难发现的缺口确实是存在的,并且相当常见。 LLMs 在这些训练不足的区域所做的 tokens 预测质量可能会不高,而且这些问题往往不易被察觉。这也是 LLMs 有时会出现"幻觉[7]"(即生成的文本虽然朗朗上口,但可能包含与事实不符的内容或前后矛盾之处。)的原因之一。 借助上面的概率表,你现在可以构思一下 get_token_predictions() 函数的实现方法。以下是用 Python 伪代码表示的一种实现方式: 是不是比想象中还要简单些呢?这个函数可以接收来自用户提示词的 tokens 序列。它提取出这个序列中的最后一个 token ,并找到概率表中对应的那一行。 假如你用 ['you', 'like'] 作为 input tokens 调用这个函数,它会给出 "like" 对应的那一行,这使得 "apples" 有 33.3% 的可能性放入句子的下一部分,而 "bananas" 则有 66.7% 的可能性。根据这些概率值可以得知,上面提到的 select_next_token() 函数在三次中有一次会挑选 "apples"。 当 "apples" 被选为 "you like" 的后续词时,我们得到了句子 "you like apples"。这个句子在训练数据集中并不存在,但它却是完全合理的。希望这个小例子能让您开始意识到,这些模型是如何通过重新利用训练中学到的 patterns(译者注:在数据集中识别和学习的重复出现的 tokens 序列组合。) 和拼接不同的信息片段,来形成看似原创的想法或概念的。 2.1 上下文窗口(The Context Window) 在上一部分,我用来训练那个微型语言模型的方法,称为马尔可夫链[8](译者注:Markov chain,一种数学系统,用于描述一系列可能的事件,其中每个事件的发生概率只依赖于前一个事件。)。 这种技术存在的问题是,它只根据一个 token(输入序列中的最后一个 token)来预测接下来的内容。在此之前的文本在决定如何延续文本内容时并不起作用,因此我们可以认为这种方法的上下文窗口仅限于一个 token,这是非常小的。由于上下文窗口太小,模型会不断"忘记"自己的思路,从一个词跳到下一个词,显得杂乱无章。 为了提高模型的预测能力,我们可以构建一个更大的概率表。如果想使用两个 token 的上下文窗口,就需要在概率表中添加代表所有两个 token 组合的新行。以前文的例子来说,五个 token 将会在概率表增加 25 行两个 token 组合的新行,再加上原有的 5 个单 token 行。这次除了考虑两个 token 的组合外,还需要考虑三个 token 的组合,因此必须再次训练模型。在 get_token_predictions() 函数的每次循环迭代中,如果条件允许,将使用输入序列的最后两个 tokens 来在一个更大的概率表中找到与这两个 tokens 相对应的行。 但是,两个 tokens 的上下文窗口仍然不够。为了让生成的文本不仅在结构上能够自洽,还能在一定程度上具备一些意义,需要一个更大的上下文窗口。如果没有足够大的上下文窗口,新生成的 token 就无法与之前的 tokens 所表达的概念(concepts)或想法(ideas)建立联系。那么我们应该如何是好?将上下文窗口扩展到 3 个 tokens 会在概率表中增加 125 行新内容,但质量可能仍然不尽人意。我们又需要多大的上下文窗口呢? OpenAI 的开源模型 GPT-2 使用的上下文窗口其大小为 1024 个 tokens。如果我们要用马尔可夫链(Markov chains)实现这样一个大小的上下文窗口,概率表的每一行都需要代表一个长度在 1 到 1024 个 tokens 之间的序列。以前文例子中使用的 5 个 tokens 的词汇表为例,1024 个 token 长度的序列共有 5 种可能的 token 组合。我在 Python 中进行了计算: >>> pow(5, 1024) 55626846462680034577255817933310101605480399511558295763833185422180110870347954896357078975312775514101683493275895275128810854038836502721400309634442970528269449838300058261990253686064590901798039126173562593355209381270166265416453973718012279499214790991212515897719252957621869994522193843748736289511290126272884996414561770466127838448395124802899527144151299810833802858809753719892490239782222290074816037776586657834841586939662825734294051183140794537141608771803070715941051121170285190347786926570042246331102750604036185540464179153763503857127117918822547579033069472418242684328083352174724579376695971173152319349449321466491373527284227385153411689217559966957882267024615430273115634918212890625 这个行数多得吓人!这还只是整个概率表的一小部分,因为我们还需要长度从 1023 个 tokens 到 1 个 token 的所有序列,以确保在模型输入中没有足够 token 时,较短的 token 序列也能被处理。虽然马尔可夫链很有趣,但它们也确实存在很大的可扩展性问题。 而且,1024 个 tokens 的上下文窗口现在也已经不算特别大了。在 GPT-3 中,上下文窗口增加到了 2048 个 tokens,然后在 GPT-3.5 中增加到了 4096 个 token。GPT-4 最开始拥有 8192 个 tokens 的上下文窗口,后来增加到了 32K,再后来又增加到了 128K(没错,128,000 个 tokens !)。现在,具有 1M 或更大上下文窗口的模型也开始出现了,更大的上下文窗口允许模型更好地理解和利用之前输入的文本信息,从而在生成新的文本时保持更高的连贯性和准确性。 总之,马尔可夫链让我们以正确的方式思考文本生成问题,但它们也存在一些重大问题,使得我们无法将其视为一个可行的解决方案。 2.2 从马尔可夫链到神经网络 显然,我们必须放弃使用概率表的想法,因为一个合理上下文窗口的概率表需要的 RAM 大得惊人。我们可以采取的替代方案是用函数(function)来代替表格(table),这个函数是通过算法生成的,而不是存储在一个大表格中。这正是神经网络擅长的领域。 神经网络是一种特殊的函数,它接受 inputs ,对其进行计算,并返回 output 。对于语言模型来说,input 代表提示词转换的 tokens,output 是对下一个 token 的预测概率列表。 我说过神经网络是一种"特殊"的函数,是因为除了函数的逻辑之外,它们对 input 进行的计算还受到一系列外部定义参数的控制。一开始,神经网络的参数是未知的,因此,函数产生的输出是完全无用的。神经网络的训练过程包括寻找参数,找到使函数在对训练数据集进行评估时表现最佳的参数,其假设是,如果函数在训练数据上表现良好,那么它也会在其他数据上表现良好。 在训练过程中,神经网络的参数会通过反向传播算法[9],以极小的增量进行迭代调整。这个过程涉及大量的数学计算,因此在这里不会详细展开。每次调整后,神经网络的预测能力都会略有提升。更新参数后,神经网络会再次评估其在训练数据集上的表现,并根据评估结果进行下一轮的调整。这个过程会一直持续下去,直到神经网络在训练数据集上对下一个 token 的预测达到令人满意的效果。 下面帮助各位读者了解一下神经网络的规模,GPT-2 模型大约有 15 亿个参数,而 GPT-3 将参数数量增加到 1750 亿,GPT-4 据说有大约 1.76 万亿个参数。使用目前这一代硬件来训练这样大规模的神经网络需要相当长的时间,通常是几周甚至几个月。 有趣的是,由于参数数量众多,而且都是通过长时间的迭代过程计算出来的,没有人类干预,因此很难理解模型是如何工作的。训练好的大语言模型就像一个黑盒子,非常难以调试,因为模型的大部分"思维"都隐藏在参数中。即使是那些训练它们的人,也很难解释其内部的工作原理。 2.3 模型层、Transformers 和注意力机制 你可能非常想知道,神经网络函数内部究竟进行了哪些神奇的运算,使得它们在这些经过精心调优的参数的帮助下,能够接收 input tokens 列表,并以某种方式"猜"出下一个 token 的合理概率。 神经网络就像是一套复杂的操作链条⛓,链条的每个链环都被称为一个"模型层"。第一层接收 inputs (译者注:神经网络接收的初始数据。),对其进行某种形式的转换。经过转换的 input 接着进入下一层,再次进行转换。这一过程一直持续到数据最终到达最后一层,并在这里进行最后一次转换,从而产生最终的模型输出。 机器学习专家们设计了各种类型的模型层,它们能够对输入数据执行数学转换。同时,他们还找到了模型层的不同组织方式和分组方法,以达到预期的效果。有些模型层是通用的,适用于多种类型的输入数据;而有些模型层则是专门为处理特定类型的数据而设计的,比如图像数据或大语言模型中的经过分词后的文本。 在大语言模型的文本生成场景中,目前最流行的神经网络架构是 Transformer[10]。使用这种架构设计的 LLM(大语言模型)被称为 GPT,即 Generative Pre-Trained Transformers[11]。 Transformer 模型的独特之处在于它们执行的一种称为 Attention[12] 的模型层计算方式,这使得它们能够从上下文窗口中的 tokens 中推断出它们之间的关系(relationships)和模式(patterns),随后将这些关系和模式反映在下一个 token 的预测概率中。 Attention 机制最初被用于语言翻译场景,是一种找出输入序列中哪些 tokens 对理解其意义最为重要的方法。这种机制使现代翻译器能够通过关注(或集中"注意力"于)重要的单词或tokens,从根本上"理解"一个句子。 03 Do LLMs Have Intelligence? 您可能已经开始思考,大语言模型(LLMs)在生成文本时是否展现出某种智能了? 我个人并不认为 LLMs 具备推理或创造原创思想的能力,但这并不意味着它们一无是处。由于 LLMs 对上下文窗口中的 tokens 进行了巧妙的计算,所以 LLMs 能够识别出用户提示词中存在的 patterns,并将它们与训练期间学到的类似 patterns 相匹配。 它们生成的文本大部分都是由零碎的训练数据组成的,但它们将单词(实际上是 tokens)拼接在一起的方式非常复杂,在很多情况下,它们生成的模型输出结果既有原创感,又非常有用。 鉴于 LLM 容易产生幻觉,我不会信任未经人工验证的情况下,任何直接将 LLMs 的 output 发送给用户的工作流程。 在未来的几个月或几年里,是否会有更大的 LLM 出现?它们能够实现真正的智能吗?由于 GPT 架构的诸多限制,我觉得这是不太可能实现的,但谁知道呢,也许随着未来创新成果的不断涌现,我们会实现这一目标。 04 The End 感谢您与我一起坚持到最后!我希望本文已经能够激发你的学习兴趣,让你决定继续学习大模型相关知识,并最终面对那些令人畏惧的数学知识,如果您想详细了解每一个细节的话,就无法回避这些数学知识。在这种情况下,我强烈推荐 Andrej Karpathy 的《Neural Networks: Zero to Hero》[13]系列视频。 Thanks for reading! Hope you have enjoyed and learned new things from this blog! About the authors Hi, my name is Miguel. I'm a software engineer, but also tinker with photography and filmmaking when I have time. I was born in Buenos Aires, Argentina, but I lived most of my adult life in Portland, Oregon, USA, the place that comes to mind when I think of home. As of 2018 I'm living in Ireland. END 本期互动内容 🍻 ❓通过阅读本文,你能够理解为什么 LLMs 会产生"幻觉"或者不可靠的输出吗?你认为造成这些问题的原因是什么? 🔗文中链接🔗 [1]https://en.wikipedia.org/wiki/Large_language_model [2]https://en.wikipedia.org/wiki/Generative_artificial_intelligence [3]https://chat.openai.com/ [4]https://en.wikipedia.org/wiki/Byte_pair_encoding [5]https://github.com/openai/gpt-2 [6]https://en.wikipedia.org/wiki/The_quick_brown_fox_jumps_over_the_lazy_dog [7]https://en.wikipedia.org/wiki/Hallucination_(artificial_intelligence) [8]https://en.wikipedia.org/wiki/Markov_chain [9]https://en.wikipedia.org/wiki/Backpropagation [10]https://en.wikipedia.org/wiki/Transformer_(deep_learning_architecture) [11]https://en.wikipedia.org/wiki/Generative_pre-trained_transformer [12]https://en.wikipedia.org/wiki/Attention_(machine_learning) [13]https://karpathy.ai/zero-to-hero.html 原文链接: https://blog.miguelgrinberg.com/post/how-llms-work-explained-without-math

优秀的个人博客,低调大师

Q-learning 入门:以 Frozen Lake 游戏环境为例

编者按:近年来,强化学习在游戏和机器人控制等领域取得了较大的进步。如何设计一种强化学习算法,使机器人或Agent能够在复杂环境中学习最优策略(Optimal Policy)并作出最优的决策,这成为一个重要课题。 我们今天为大家带来的这篇文章,作者指出可以通过设计并训练Q-learning算法来解决强化学习中的决策问题。 作者首先以Frozen Lake游戏为例导入问题。然后详细介绍Q-learning的设计思路,包括构建Q-table、定义value更新公式、设置reward机制、添加epsilon-greedy探索策略等方法。最后作者通过代码示例详细展示了如何从零开始实现Q-learning算法,并取得不错的实验效果。 本文内容详实,示例代码易于理解,对于读者学习和应用强化学习算法具有一定的参考价值。 作者 | Maxime Labonne 编译|岳扬 🚢🚢🚢欢迎小伙伴们加入AI技术软件及技术交流群,追踪前沿热点,共探技术难题~ 本文的目标是教会人工智能如何使用强化学习算法解决❄️Frozen Lake游戏环境。我们将从头开始,尝试自己重新创建Q-learning算法。我们不仅要了解它是如何工作的,更重要的是,懂得为什么要这样设计。 我们希望通过本文让读者能够掌握Q-learning算法,并能够将其应用于其他实际问题。这是一个很有趣的迷你项目,能够帮助我们更好地理解强化学习的工作原理,并希望能够激发读者产生更多有创意的产品灵感。 我们首先需要安装Frozen Lake游戏环境,并导入以下必要的库:用于模拟游戏环境的gym、用于生成随机数的random和用于数学运算的numpy。 !pip install-q gym !pip install-q matplotlib importgym importrandom importnumpyasnp 01 ❄️ Frozen Lake 现在,让我们来谈一谈在本教程中要用算法解决的游戏。Frozen Lake是一个由方块组成的简单游戏环境,AI必须从起始方块移动到目标方块。 方块可以代表安全的冰面✅,也可以代表洞❌,一旦掉进洞中就会永远被困住。 AI或agent可以执行4种动作:向左移动◀️,向下移动🔽,向右移动▶️,或向上移动🔼。 agent必须学会避开洞,以最少的动作次数到达目标方块。 默认情况下, 游戏环境的配置始终保持不变。 在游戏环境的代码中,每个方块都用一个字母表示,如下所示: S F F F(S:starting point,safe) F H F H(F:frozen surface,safe) F F F H(H:hole,stuck forever) H F F G(G:goal,safe) 我们可以尝试手动解决上述例子,以帮助我们理解这个游戏。看看下面这一系列动作是否是一种正确的解法:向右移动→向右移动→向右移动→向下移动→向下移动→向下移动。agent从方块S开始,所以我们向右移动还是在冰面上✅,然后再次向右移动✅,再次向右移动✅,然后再向下移动就会掉入洞❌中。 其实,要找到几种正确的解法并不难:向右移动→向右移动→向下移动→向下移动→向下移动→向右移动就是一种简单的解法。但是我们也可以制定一连串的动作,绕着某一个洞转10圈,然后再到达目标方块。这个agent的动作路径是有效的,但它不符合游戏的要求:agent需要以最少的操作次数达到目标方块。在这个例子中,完成该局游戏的最少动作次数是6次。我们需要记住这个结论,以便检查agent是否真正掌握了Frozen Lake游戏。 让我们借助gym库来初始化游戏环境。这个游戏有两个版本:一个是冰面湿滑的版本,在这个版本中,游戏指令有一定的概率被agent忽略;另一个是不湿滑的版本,在这个版本中,游戏指令不会被忽略。本文首先使用不湿滑的版本,因为这个版本更容易理解。 environment=gym.make("FrozenLake-v1",is_slippery=False) environment.reset() environment.render() 🟥FFF FHFH FFFH HFFG 我们可以看到,所创建的这局游戏环境与前文示例的游戏配置完全一致。agent的位置用一个红色矩形表示。可以使用一个简单的脚本和if...else条件判断来解决这局游戏,这种方法有助于将我们的人工智能与更简单的方法进行对比。然而,我们想尝试一种更有趣的解决方案:强化学习。 02 🏁 Q-table 在Frozen Lake游戏中,有16个方块,这意味着agent可以处于16个不同的位置,这些位置也称为状态(states),也就是说有16个可能的状态。对于每个状态,都有4种可能的动作可供选择:向左移动◀️,向下移动🔽,向右移动▶️和向上移动🔼。学习如何玩Frozen Lake游戏就像学习在每个状态下应该选择哪个动作。为了确定在给定状态下哪个动作是最优的,我们需要为动作分配一个质量值。 有16个状态和4种动作,因此需要计算出16 x 4=64个质量值。 一种很好的表示方法是使用一个称为Q-table的表格,其中行表示状态s,列表示动作a。在这个Q-table中,每个单元格都会包含一个值Q(s,a),表示状态s中动作a的质量值(如果是当前状态的最佳动作,质量值则为1,如果是当前状态的最差动作,质量值则为0)。 当agent处于某个特定的状态s时,它只需查看这个表格,看看哪个动作具有最高的质量值。选择具有最高质量值的动作是合理的选择,但是我们稍后会看到,我们可以设计出更好的解决方案... S◀️LEFT🔽DOWN▶️RIGHT🔼UP 0Q(0,◀️)Q(0,🔽)Q(0,▶️)Q(0,🔼) 1Q(1,◀️)Q(1,🔽)Q(1,▶️)Q(1,🔼) 2Q(2,◀️)Q(2,🔽)Q(2,▶️)Q(2,🔼) ... ... ... ... ... 14Q(14,◀️)Q(14,🔽)Q(14,▶️)Q(14,🔼) GQ(15,◀️)Q(15,🔽)Q(15,▶️)Q(15,🔼) Q-table的一个简单示例。其中每个单元格都包含给定状态s(行)下行动a(列)的值Q(a,s)。 先创建一个 Q-table,然后在表中所有单元格都填入0,因为暂时还不知道每个状态下每个操作的值。 #Our table has the following dimensions: #(rows x columns)=(states x actions)=(16 x 4) qtable=np.zeros((16, 4)) #Alternatively,the gym library can also directly g #give us the number of states and actions using #"env.observation_space.n"and"env.action_space.n" nb_states=environment.observation_space.n#=16 nb_actions=environment.action_space.n#=4 qtable=np.zeros((nb_states,nb_actions)) #Let's see how it looks print('Q-table=') print(qtable) Q-table= [[0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.]] 太好了!现在按照预期,我们已经拥有了一个16行(16个状态)和4列(4种动作)的Q-table。接下来我们应该怎么做:Q-table的每个值都为0,因此还没有任何信息可供使用。假设agent随机选择了一种动作:向左移动◀️,向下移动🔽,向右移动▶️或向上移动🔼。 我们可以使用random库的choice方法来随机选择一个动作。 random.choice(["LEFT", "DOWN", "RIGHT", "UP"]) 'LEFT' 不过,当前agent处于初始方块S,意味着只有两种动作可行:向右移动▶️和向下移动🔽。agent也可以选择向上移动🔼和向左移动◀️,但实际上agent并不能移动:因为它的状态不会改变。因此,我们不对可选择的动作进行任何限制,agent会自然而然地理解其中一些动作没有实际效果。 我们可以继续使用random.choice(),但gym库已经具备一种随机选择动作的方法。可能可以为我们省去一些麻烦,所以让我们试试吧。 environment.action_space.sample() 0 哎呀......得到的随机动作是数字。我们可以去阅读gym的文档[1],但遗憾的是,文档内容很少。不过不用担心,我们可以在GitHub上查看源代码[2],来了解这些数字的含义。这其实非常简单明了: ◀️LEFT= 0 🔽DOWN= 1 ▶️RIGHT= 2 🔼UP= 3 好的,既然现在已经了解 gym如何将数字与方向联系起来,让我们开始尝试用gym将agent向右移动▶️。这次,我们可以使用step(action)方法来实现。可以尝试直接使用与选择的方向(右)相对应的数字2,然后检查agent是否移动。 environment.step(2) environment.render() (Right) S🟥FF FHFH FFFH HFFG 好极了!代码成功运行了。红色方块从初始位置S向右移动了。我们与环境交互所需的全部信息如下: 如何使用action_space.sample()随机选择一种动作; 如何使用step(action)执行这种动作,并让agent移动到指定的方向。 为了完整了解具体环境情况,我们还可以添加以下内容: 如何使用render()方法显示当前地图,来查看当前游戏环境情况; 当agent掉入洞或到达目标G时,如何使用reset()方法重新开始游戏。 现在我们了解了如何与gym环境进行交互,开始回到本文要讲述的Q-learning算法。在强化学习中,当agent完成预定的目标时,游戏环境会给agent提供奖励。 在Frozen Lake 游戏中,只有当agent到达状态G时才会获得奖励(请参阅源代码)。我们无法控制这种奖励,它是由环境设定的:当agent到达G时,奖励值为1,没有到达G时,为0。 让我们在每次执行动作时打印出奖励值。奖励内容由step(action)方法给出: action=environment.action_space.sample() #2.Implement this action and move the agent in the desired direction new_state,reward,done,info=environment.step(action) #Display the results(reward and map) environment.render() print(f'Reward={reward}') (Left) 🟥FFF FHFH FFFH HFFG Reward= 0.0 奖励值确实是0...😱哇,我想我们陷入困境了,因为整个游戏中只有一个状态能给我们带来正面的奖励。如果我们只有在游戏结束时才能验证,那么我们怎么能在一开始就选择正确的方向呢?如果我们希望奖励值为1,就需要足够幸运,在偶然的机会下找到正确的操作顺序。不幸的是,这正是它的工作原理......在agent随机到达目标G之前,Q-table 中一直都是0。 如果能够在中间过程获得较小的奖励来引导agent朝着目标G的路径前进,问题将变得简单得多。然而,这实际上是强化学习的主要问题之一:这种现象被称为稀疏奖励(sparse rewards),使得agent在只有经过一系列长时间的动作之后才能获得奖励的问题上非常难以训练。 有一些技术可以缓解这个问题,但我们将在以后讨论这些技术。 03 🤖 Q-learning 让我们回到要讨论的问题上来。现在我们需要足够幸运才能意外地找到目标G。但一旦找到了,如何将信息反向传播到初始状态呢?Q-learning算法为这个问题提供了一个巧妙的解决方案。需要更新 state-action pairs(Q-table中的每个单元格)的值,更新时要考虑(1)到达下一个状态的奖励值,以及(2)下一个状态的最大可能值。 我们知道,当agent移动到目标方块G时会获得奖励(数值为1)。正如我们刚才所说的,通过奖励,与G相邻状态(我们称其为G-1)的值会增加。好了,剧情结束:agent赢了,重新开始游戏。当下一次agent到达G-1旁边的状态时,它将使用与到达G-1相关的操作,来增加这个状态的值(我们称之为G-2)。agent下一次处于G-2旁边的状态时,也会做同样的事情。如此反复,直到到达初始状态S。 让我们试着找出一个相关的公式,将数值从状态G反向传播到状态S。请记住:该数值表示特定状态下动作的质量(如果是该状态下的最糟糕动作,则数值为0;如果是该状态下的最佳动作,则数值为1)。需要更新状态sₜ(当agent处于初始状态S时,sₜ=0)中动作aₜ(例如,如果动作是向左,则aₜ=0)的值。这个值只是Q-table 中的一个单元格,对应于行号sₜ和列号aₜ的值,即Q(sₜ,aₜ)。 如前所述,我们需要使用(1)下一个状态的奖励值(表示为rₜ)和(2)下一个状态的最大可能值(表示为maxₐQ(sₜ₊₁,a))来更新它。因此,更新公式应如下所示: 新的值是当前值加上奖励值再加上下一个状态中最高的值。我们可以手动验证该公式是否正确:假设agent第一次处于目标G旁边的状态G-1,我们可以用以下方法更新G-1状态中获胜操作所对应的值: 初始时,Q(G-1,aₜ)=0,maxₐQ(G,a)=0,因为Q-table 为空,而rₜ=1,因为在游戏环境中只获得了唯一的奖励,然后得到Q{new}(G-1,aₜ)=1。下一次,当agent处于与此状态(G-2)相邻的状态时,我们也使用该公式进行更新,并得到相同的结果:Q{new}(G-2,aₜ)=1。最后,我们从G向S在Q-table中进行反向传播。这样确实可以工作,但结果是二元的:要么是错误的state-action pair,要么是最佳的state-action pair。 我们希望能有更多的细微差别...... 实际上,我们已经接近找到真正的 Q-learning算法更新公式,但我们还需要添加两个参数: α是💡学习率(介于0和1之间),表示原始的Q(sₜ,aₜ)值的变化程度。 如果α=0,值永远不会改变,但如果α=1,值会变化得非常快。在上文的示例中,学习率没有被限制,因此α=1。但在实际情况中,这样太快了:奖励值和下一个状态中的最大值很快就会超过当前值。我们需要找到一个平衡点。 γ是📉折扣因子(介于0和1之间),它决定了agent对未来奖励与当前奖励的关心程度(俗话说,“一鸟在手胜过双鸟在林”)。 如果γ=0,agent只关注即时奖励,但如果γ=1,任何潜在的未来奖励与及时奖励具有相同的价值。在❄️Frozen Lake中,由于游戏仅在结尾才会产生奖励,我们需要使用较高的折扣因子。 在真正的Q-learning算法中,新值的计算公式如下: 好呀,让我们先试试这个新公式,然后再来使用它。再次假设 agent第一次处于目标G旁边的状态。可以使用该公式更新state-action pair,以便在游戏中取得胜利:Q{new}(G-1,aₜ)=0+α-(1+γ-0-0)。我们可以任意赋值给α和γ来计算结果。当α=0.5和γ=0.9时,将得到Q{new}(G-1,aₜ)=0+0.5-(1+0.9-0-0)=0.5。agent第二次处于这种状态时,会得到Q{new}(G-1,aₜ)=0.5+0.5-(1+0.9-0-0.5)=0.75,然后是0.875、0.9375、0.96875等等。 使用代码训练agent意味着: 如果当前状态所有动作的数值都为零,则随机选择一个动作(使用action_space.sample())。否则,使用np.argmax()函数选择当前状态中数值最高的动作。 使用step(action)函数执行所选择的动作,即朝着期望的方向移动。 使用新状态的信息和step(action)给出的奖励值,用所选择的动作更新原始状态的数值。 不断重复这三个步骤,直到agent坠入冰窟中或到达目标方块G。 当发生这种情况后,使用reset()函数重置游戏环境,并开始一个新的游戏回合,直到达到1,000个游戏回合。此外,我们可以绘制每次agent运行的游戏结果(如果未达到目标则为失败,否则为成功),以观察进展。 importmatplotlib.pyplotasplt plt.rcParams['figure.dpi'] = 300 plt.rcParams.update({'font.size': 17}) #We re-initialize the Q-table qtable=np.zeros((environment.observation_space.n,environment.action_space.n)) #Hyperparameters episodes= 1000 #Total number of episodes alpha= 0.5 #Learning rate gamma= 0.9 #Discount factor #List of outcomes to plot outcomes= [] print('Q-table before training:') print(qtable) #Training for_in range(episodes): state=environment.reset() done= False #By default,we consider our outcome to be a failure outcomes.append("Failure") #Until the agent gets stuck in a hole or reaches the goal,keep training it while notdone: #Choose the action with the highest value in the current state ifnp.max(qtable[state]) > 0: action=np.argmax(qtable[state]) #If there's no best action(only zeros),take a random one else: action=environment.action_space.sample() #Implement this action and move the agent in the desired direction new_state,reward,done,info=environment.step(action) #Update Q(s,a) qtable[state,action] =qtable[state,action] +\ alpha* (reward+gamma*np.max(qtable[new_state]) -qtable[state,action]) #Update our current state state=new_state #If we have a reward,it means that our outcome is a success ifreward: outcomes[-1] = "Success" print() print('===========================================') print('Q-table after training:') print(qtable) #Plot outcomes plt.figure(figsize=(12, 5)) plt.xlabel("Run number") plt.ylabel("Outcome") ax=plt.gca() ax.set_facecolor('#efeeea') plt.bar(range(len(outcomes)),outcomes,color="#0A047A",width=1.0) plt.show() Q-table before training: [[0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.]] =========================================== Q-table after training: [[0. 0. 0.59049 0. ] [0. 0. 0.6561 0. ] [0. 0.729 0. 0. ] [0. 0. 0. 0. ] [0. 0.02050313 0. 0. ] [0. 0. 0. 0. ] [0. 0.81 0. 0. ] [0. 0. 0. 0. ] [0. 0. 0.17085938 0. ] [0. 0. 0.49359375 0. ] [0. 0.9 0. 0. ] [0. 0. 0. 0. ] [0. 0. 0. 0. ] [0. 0. 0. 0. ] [0. 0. 1. 0. ] [0. 0. 0. 0. ]] 这个agent已经训练好了!图中的每条蓝色条纹都代表agent在一局游戏中获胜,因此我们可以看到在训练刚开始时,agent很难到达目标方块,但是当agent连续几次到达目标方块后,就能够稳定地取得游戏胜利了。🥳经过训练后的Q-table也非常有用:这些值表示agent学习到的到达目标方块的最佳动作序列。 现在,通过让agent完成100回合游戏来评估其表现。目前可以认为agent的训练已经结束,所以不需要再更新Q-table了。为了了解agent在❄️Frozen Lake游戏环境中的表现,可以计算它成功到达目标方块的次数百分比(成功率)。 episodes= 100 nb_success= 0 #Evaluation for_in range(100): state=environment.reset() done= False #Until the agent gets stuck or reaches the goal,keep training it while notdone: #Choose the action with the highest value in the current state ifnp.max(qtable[state]) > 0: action=np.argmax(qtable[state]) #If there's no best action(only zeros),take a random one else: action=environment.action_space.sample() #Implement this action and move the agent in the desired direction new_state,reward,done,info=environment.step(action) #Update our current state state=new_state #When we get a reward,it means we solved the game nb_success+=reward #Let's check our success rate! print (f"Success rate={nb_success/episodes*100}%") Success rate=100.0% 这个agent不仅已经训练好了,而且它的成功率达到了100%。干得漂亮!不湿滑版本的Frozen Lake游戏已经被我们解决了! 通过执行下面的代码,我们可以看到agent在地图上移动,并打印它所采取的动作序列,以检查是否是最佳动作序列。 fromIPython.displayimportclear_output importtime state=environment.reset() done= False sequence= [] while notdone: #Choose the action with the highest value in the current state ifnp.max(qtable[state]) > 0: action=np.argmax(qtable[state]) #If there's no best action(only zeros),take a random one else: action=environment.action_space.sample() #Add the action to the sequence sequence.append(action) #Implement this action and move the agent in the desired direction new_state,reward,done,info=environment.step(action) #Update our current state state=new_state #Update the render clear_output(wait=True) environment.render() time.sleep(1) print(f"Sequence={sequence}") (Right) SFFF FHFH FFFH HFF🟥 Sequence= [2, 2, 1, 1, 1, 2] agent可以学习到多种正确的动作序列,例如[2,2,1,1,1,2]、[1,1,2,2,1,2]等等。从上面的输出结果可以看出,agent学习到的动作序列中只有6个动作,就是我们在文章开头计算出的最短动作序列长度:这意味着agent学会了以最优的方式解决❄️Frozen Lake游戏。输出结果[2,2,1,1,1,2]对应的动作序列是RIGHT→RIGHT→DOWN→DOWN→DOWN→RIGHT,正是我们在文章一开始预测的序列。📣 04 📐 ε-贪心算法 在上文使用的方法中,agent总是选择具有最高数值的动作。因此,一旦state-action pair开始具有非零值,agent就会始终选择这个动作。其他动作将永远不会被选择,也不会更新这些动作的值...但是,如果其中一种动作比agent经常选择的动作更优呢?难道我们不应该鼓励agent不时尝试新的方法,看看是否能有所改进吗? 换句话说,我们希望允许agent进行以下这些操作之一: 选择具有最高值的动作(利用已经学到的知识来做出最优的决策); 选择一种随机动作,尝试找到更好的动作(探索是否有更优的动作)。 这两种操作之间的权衡非常重要:如果agent只专注于第一种操作,它就无法尝试新的解决方案,从而无法再学习。另一方面,如果agent只能够采取随机动作,那么它的训练就会毫无意义,因为没有使用Q-table。因此,我们希望随着时间的推移改变这个参数:在训练开始时,希望尽可能多地探索游戏环境。但是随着agent已经了解了每种可能的state-action pairs,探索就会变得越来越无趣。这个参数表示agent在动作选择时的随机程度。 这种技术通常被称为ε-贪心算法,其中ε是一种参数。这是一种简单但极其有效的方法,可以找到帮助我们设定一个折中的方案。每当agent需要选择一个动作来进入下一个方块时,它就有概率ε选择一种随机的动作,有1-ε的概率选择具有最高数值的动作。 我们可以通过固定的数值(即线性衰减)或基于当前ε的值(即指数衰减)来在每次游戏结束时减小ε的值。 先选择线性衰减的方案。在此之前,让我们来看看任意参数下的曲线是什么样子的。从ε=1开始,进入完全探索模式,并在每个游戏回合后将该值减少0.001。 现在我们对该方案有了一个较为清晰的理解,可以开始真正实施该方案,并观察它如何改变agent的行为。 qtable=np.zeros((environment.observation_space.n,environment.action_space.n)) #Hyperparameters episodes= 1000 #Total number of episodes alpha= 0.5 #Learning rate gamma= 0.9 #Discount factor epsilon= 1.0 #Amount of randomness in the action selection epsilon_decay= 0.001 #Fixed amount to decrease #List of outcomes to plot outcomes= [] print('Q-table before training:') print(qtable) #Training for_in range(episodes): state=environment.reset() done= False #By default,we consider our outcome to be a failure outcomes.append("Failure") #Until the agent gets stuck in a hole or reaches the goal,keep training it while notdone: #Generate a random number between 0 and 1 rnd=np.random.random() #If random number<epsilon,take a random action ifrnd<epsilon: action=environment.action_space.sample() #Else,take the action with the highest value in the current state else: action=np.argmax(qtable[state]) #Implement this action and move the agent in the desired direction new_state,reward,done,info=environment.step(action) #Update Q(s,a) qtable[state,action] =qtable[state,action] +\ alpha* (reward+gamma*np.max(qtable[new_state]) -qtable[state,action]) #Update our current state state=new_state #If we have a reward,it means that our outcome is a success ifreward: outcomes[-1] = "Success" #Update epsilon epsilon= max(epsilon-epsilon_decay, 0) print() print('===========================================') print('Q-table after training:') print(qtable) #Plot outcomes plt.figure(figsize=(12, 5)) plt.xlabel("Run number") plt.ylabel("Outcome") ax=plt.gca() ax.set_facecolor('#efeeea') plt.bar(range(len(outcomes)),outcomes,color="#0A047A",width=1.0) plt.show() Q-table before training: [[0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.]] =========================================== Q-table after training: [[0.531441 0.59049 0.59049 0.531441 ] [0.531441 0. 0.6561 0.56396466] [0.58333574 0.729 0.56935151 0.65055117] [0.65308668 0. 0.33420534 0.25491326] [0.59049 0.6561 0. 0.531441 ] [0. 0. 0. 0. ] [0. 0.81 0. 0.65519631] [0. 0. 0. 0. ] [0.6561 0. 0.729 0.59049 ] [0.6561 0.81 0.81 0. ] [0.72899868 0.9 0. 0.72711067] [0. 0. 0. 0. ] [0. 0. 0. 0. ] [0. 0.81 0.9 0.729 ] [0.81 0.9 1. 0.81 ] [0. 0. 0. 0. ]] 现在,agent需要花费更多时间训练才能持续赢得游戏!而且,Q-table中的非零值也比之前多得多,这说明agent已经学会了多种行动序列来到达目标方块。这也是可以理解的,因为这个新agent不得不探索新的state-action pairs,而非一味地使用非零值对应的行为。 让我们看看它是否能像前一个agent一样成功地到达目标方块。在评估模式下,我们不再需要agent进入探索模式,因为agent已经经过训练了。 episodes= 100 nb_success= 0 #Evaluation for_in range(100): state=environment.reset() done= False #Until the agent gets stuck or reaches the goal,keep training it while notdone: #Choose the action with the highest value in the current state action=np.argmax(qtable[state]) #Implement this action and move the agent in the desired direction new_state,reward,done,info=environment.step(action) #Update our current state state=new_state #When we get a reward,it means we solved the game nb_success+=reward #Let's check our success rate! print (f"Success rate={nb_success/episodes*100}%") Success rate=100.0% 哇,又是100%的成功率!并没有降低模型的性能。😌这种方法在这个例子中的好处可能不太明显,但我们的模型不再是一成不变的,而是更加灵活。它学会了从S方块到G方块的不同路径(动作序列),而不是像之前的方法,只学习到一条路径。 agent进行更多的探索可能会降低性能,但对于训练能够适应新环境的agent来说这是必要的。 05 ❄️ 挑战:地面湿滑版本的Frozen Lake 我们并没有解决整个Frozen Lake游戏环境的问题:只是在地面非湿滑版本的游戏环境中训练了一个agent,在初始化时参数设置为is_slippery=False。在地面湿滑版本的游戏环境中,agent选择的动作只有33%的成功几率。如果失败,将随机选择其它三种动作中的一种。这个特性增加了训练的随机性,对于agent来说,学习解决这个问题更加困难。看看之前的代码在这个新的游戏环境中的表现如何... environment=gym.make("FrozenLake-v1",is_slippery=True) environment.reset() #We re-initialize the Q-table qtable=np.zeros((environment.observation_space.n,environment.action_space.n)) #Hyperparameters episodes= 1000 #Total number of episodes alpha= 0.5 #Learning rate gamma= 0.9 #Discount factor epsilon= 1.0 #Amount of randomness in the action selection epsilon_decay= 0.001 #Fixed amount to decrease #List of outcomes to plot outcomes= [] print('Q-table before training:') print(qtable) #Training for_in range(episodes): state=environment.reset() done= False #By default,we consider our outcome to be a failure outcomes.append("Failure") #Until the agent gets stuck in a hole or reaches the goal,keep training it while notdone: #Generate a random number between 0 and 1 rnd=np.random.random() #If random number<epsilon,take a random action ifrnd<epsilon: action=environment.action_space.sample() #Else,take the action with the highest value in the current state else: action=np.argmax(qtable[state]) #Implement this action and move the agent in the desired direction new_state,reward,done,info=environment.step(action) #Update Q(s,a) qtable[state,action] =qtable[state,action] +\ alpha* (reward+gamma*np.max(qtable[new_state]) -qtable[state,action]) #Update our current state state=new_state #If we have a reward,it means that our outcome is a success ifreward: outcomes[-1] = "Success" #Update epsilon epsilon= max(epsilon-epsilon_decay, 0) print() print('===========================================') print('Q-table after training:') print(qtable) #Plot outcomes plt.figure(figsize=(12, 5)) plt.xlabel("Run number") plt.ylabel("Outcome") ax=plt.gca() ax.set_facecolor('#efeeea') plt.bar(range(len(outcomes)),outcomes,color="#0A047A",width=1.0) plt.show() episodes= 100 nb_success= 0 #Evaluation for_in range(100): state=environment.reset() done= False #Until the agent gets stuck or reaches the goal,keep training it while notdone: #Choose the action with the highest value in the current state action=np.argmax(qtable[state]) #Implement this action and move the agent in the desired direction new_state,reward,done,info=environment.step(action) #Update our current state state=new_state #When we get a reward,it means we solved the game nb_success+=reward #Let's check our success rate! print (f"Success rate={nb_success/episodes*100}%") Q-table before training: [[0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.]] =========================================== Q-table after training: [[0.06208723 0.02559574 0.02022059 0.01985828] [0.01397208 0.01425862 0.01305446 0.03333396] [0.01318348 0.01294602 0.01356014 0.01461235] [0.01117016 0.00752795 0.00870601 0.01278227] [0.08696239 0.01894036 0.01542694 0.02307306] [0. 0. 0. 0. ] [0.09027682 0.00490451 0.00793372 0.00448314] [0. 0. 0. 0. ] [0.03488138 0.03987256 0.05172554 0.10780482] [0.12444437 0.12321815 0.06462294 0.07084008] [0.13216145 0.09460133 0.09949734 0.08022573] [0. 0. 0. 0. ] [0. 0. 0. 0. ] [0.1606242 0.18174032 0.16636549 0.11444442] [0.4216631 0.42345944 0.40825367 0.74082329] [0. 0. 0. 0. ]] Success rate=17.0% 呀!这种方法不太好。但是,你能通过调整前文讨论过的不同参数来改善模型性能吗?我鼓励你接受这个小挑战,自己尝试亲自实践强化学习,验证一下自己对本文的理解。 为什么不对ε-贪婪算法也进行指数衰减呢?在本文介绍这个小案例的过程中,你可能会意识到稍微修改超参数就会完全改变运行结果。 这是强化学习的另一个“怪处”:它的超参数非常敏感,如果你想调整它们,理解它们的含义是很重要的。尝试、测试新的技术组合总是有好处的,可以帮助我们建立算法直觉,提高效率。祝你好运,玩得愉快! 06 Conclusion Q-learning是一种简单而强大的算法,是强化学习的核心。在本文中: 我们可以学习如何与gym环境进行交互、选择动作和移动agent; 本文介绍了Q-table的概念,其中行代表状态,列代表动作,每个单元格代表给定状态下动作的数值; 为了解决稀疏奖励问题,本文通过实验重新定义了Q-learning算法的更新公式; 本文实现了完整的模型训练和评估过程,并以100%的成功率解决了Frozen Lake游戏环境; 实现了著名的ε-贪婪算法,以在(1)探索未知的state-action pairs和(2)利用最成功的state-action pairs两种方案之间进行权衡。 Frozen Lake是一个非常简单的游戏环境,但其他环境的状态和行为可能非常多,以至于无法在内存中存下Q-table。尤其是在事件不是离散而是连续的环境中(如《超级马里奥兄弟》或Minecraft) ,情况更加复杂。当面临这些挑战时,常用的解决方案是训练深度神经网络来模拟Q-table。 这种方法会增加一些复杂度,因为神经网络不是太稳定。 🚢🚢🚢欢迎小伙伴们加入AI技术软件及技术交流群,追踪前沿热点,共探技术难题~ END 参考资料 [1]https://gym.openai.com/docs/ [2]https://github.com/openai/gym/blob/master/gym/envs/toy_text/frozen_lake.py#L10 原文链接: https://towardsdatascience.com/q-learning-for-beginners-2837b777741

优秀的个人博客,低调大师

深入跨域 - 从初识到入门 | 京东物流技术团队

前言 跨域这两个字就像一块狗皮膏药一样黏在每一个前端开发者身上,无论你在工作上或者面试中无可避免会遇到这个问题。如果在网上搜索跨域问题,会出现许许多多方案,这些方案有好有坏,但是对于阐述跨域的原理和在什么情况下需要用什么方案,缺少系统性的说明。大家在工作中可能因为大佬们已经配置好了,不会产生跨域,但是作为一个前端的开发人员,面对跨域的问题,还是需要从原理上去理解跨域的原因,在不同的情况中,我们该如何去处理。 1 业务场景 1.1 介绍 WMS6.0是一款专门为仓储业务打造的合作开发平台,前台BP可以独立开发或者定制现有的流程,接入到WMS6.0中,实现自定义业务,使前台BP只需要关注自己的业务,不用专注其他功能,提升前台BP的开发效率。。 作为一个合作平台,WMS6.0 PC端支持独立页面扩展和页面内部功能扩展,支持前台BP可以进行独立部署,实现最大程度的解耦。接入方案如下: 独立页面扩展,以完全独立业务模块的方式接入。针对部分合作方需要自己完全独立开发页面的情况,WMS6.0提供了微前端的框架进行接入。 页面内部功能扩展,以预留插槽的方式接入。如图1中标注部分所示,整体页面被划分为多个区域,其中包含了通用的数据模块 + bp接入模块。当合作方有个性化的数据统计需求时,可以进行独立开发,然后接入现有公用页面中。 在bp接入平台的过程中,我们遇到了各种各样的问题,如前后端如何联调、如何在不冲突的情况下自定义全局属性、如何部署上线等等,下面我们主要就前后端联调中遇到的跨域问题进行讨论。 在使用上述预留插槽的接入方式时,为了通用模块与接入模块之间的数据同步等方便进行,WMS6.0中并没有使用老式的iframe,而是采用了vue注册的方式,实现在同一个页面中加载。因此合作方在独立模块中发起的服务端请求,其来源其实仍是当前通用页面。 而WMS6.0并不能确保所有的合作方服务端均在同一个域名下,由此也就产生了各种交互问题。 1.2 wms6.0请求链路 我们先来看一下WMS6.0现有的通用网络请求整体链路。 当用户触发了网络请求,会通过基站或者仓库的路由发出,然后通过网络到达物流网关,物流网关把请求转发到Nginx,Nginx会把请求分发到具体的服务器上进行数据处理。 下面我们就抽取一个WMS6.0通过物流网关访问的请求,作为实例来看一下。 通过response Headers(相应头)我们可以看到,公司现有的物流网关会对指定域名的页面进行CORS跨域处理。通过Access-Control-Allow-Origin: http://a..com,我们可以知道物流网关可以接受来自指定域 http://a..com 的跨域资源请求,不会产生跨域报错。 但是咱们部分bp合作方的接口并不是通过物流网关的,这就需要我们自己对此类接口进行跨域处理了。假如没有进行跨域处理,那么就会报下面的错了。 1.3 跨域的产生 Access to XMLHttpRequest at '' from origin '' has been blocked by CORS policy Response to preflight request doesn't pass access control check No 'Access-Control-Allow-Origin' header is present on the requested resource. 报错解析: 从源“本地路径”访问 “目标路径(请求链接)”的文本传输请求已被CORS策略阻塞:对预检请求的响应未通过访问控制检查。请求的资源上不存在'Access- control - allow - origin '报头。 错误原因: 本地路径和目标路径不在同一个域名下引起的跨域问题。 同时需要注意的是,就算两个域名是同一个二级域名、不同三级域名的时候,例如 a.baidu.com 和 b.baidu.com ,也是属于不同域的,仍会出现这个问题。 那么到底什么是跨域,跨域既然影响了我们的开发工作,那又为什么要有对跨域的限制呢?下面让我们来了解一下跨域的历史产生原因和作用吧。 2 跨域 2.1 演变史 以下内容为个人猜测,仅供参考,勿喷 🤞 第一阶段 互联网始于1969年的美国。在互联网的最早期,美军在ARPA(阿帕网,美国guofang部研究计划署)制定的协定下,首先用于军事连接。 随后主要都是美国高校连入的网络,如美国西南部的加利福尼亚大学洛杉矶分校、斯坦福大学研究学院、UCSB(加利福尼亚大学)和犹他州大学的四台主要的计算机。服务器上存放的都是公开资料。 这个时候网站更像是一个公共图书馆,账户密码都没有,更没存放着什么机密资料 第二阶段 后来,有人觉得可以在上面放一些私人资料,私人信息。于是为了安全,便有了账户和密码。可是如果每次访问都需要输入账户和密码,是一件很烦的事情。 所以浏览器实现了cookie,用来存储用户登陆的账户和密码。当用户访问了曾经已经登陆过的网站,浏览器将会自动在请求中加入账户和密码,而账户和密码通常是通过 Request Header(请求头) 中的cookie或指定的头信息进行通信的。 而直接存储账户和密码太过于危险,如果被攻破,损失相当大。所以浏览器都不直接存储账户和密码,而是存储登陆令牌。 第三阶段 - 现代浏览器同源策略 但是存储登陆令牌也有一个问题,如果你登陆了某个流氓网站,同时这个流氓网站在它的JS里访问了你已经登录的其他网站,那么就能够拿到你已经登录的其他网站里面的一些重要数据。 所以浏览器为了安全是不能够让这个流氓网站访问你已经登录的其他网站的。由此产生了浏览器的同源策略:哪里来的,就只能访问哪里的数据。 综上,我们就可以基本了解对跨域的定义了,如下: 2.2 定义 跨域是指向一个与当前页面所在域不同的目标地址发送请求的过程,这样之所以会产生跨域报错是因为浏览器的同源策略限制。看起来同源策略影响了我们开发的顺畅性。实则不然,同源策略存在的必要性之一是为了隔离攻击。 MDN上对同源策略的解释为: 同源策略限制了从同一个源加载的文档或脚本如何与来自另一个源的资源进行交互。这是一个用于隔离潜在恶意文件的重要安全机制。 下面就拿同源策略隔离的主要攻击之一CSRF为例讲述下同源策略存在的必要性: 2.3 举例说明重要性-跨站请求伪造(CSRF) CSRF,cross-site request forgery,又称跨站请求伪造,指非法网站挟持用户cookie在已登陆网站上实施非法操作的攻击,这是基于部分页面使用cookie在网站免登和用户信息留存。 正常网站免登的请求流程如下: 我们进入一个网站,发送登陆请求给后端 后端接受登陆请求,判断登陆信息是否准确 判断信息准确后,后端会发送response给浏览器 浏览器接受response返给用户,并将response header中的set-cookie进行保存。或者cookie通过报文返回,进而使用脚本进行缓存 用户关闭当前网站窗口后再次打开时,浏览器会自动将cookie加入request header实现免登 受攻击场景: bank.com网站是一家银行,在用户登录以后,bank.com网站在用户的当前终端上设置了一个Cookie,这其中包含了一些隐私信息(比如存款金额)。 如果这个时候,七大姑在社交app上给你发了一篇养生文章链接,其实这个网页是个diaoyu网站evil.com,访问链接后就把你重定向到一个嵌入了 iframe 的攻击网站。 而这个时候如果没有跨域限制,这个iframe会自动加载银行网站的留存信息,读取到bank.com网站的Cookie,那么用户的信息就会泄露,更可怕的是,Cookie往往是用来保存用户的登录状态,如果用户没有退出登录,其他的网站就可以冒充用户,为所欲为,控制 iframe 的 DOM,通过一系列骚操作把你卡里的钱转走。 没有同源策略: 有同源策略: 而同源策略,也就是跨域限制的出现,限制了cookie的命名区域,使攻击者无法直接获取cookie的内容本身。 下面就让我们一起来了解一下什么是同源策略。 3 url的组成 在了解同源策略之前,我们需要先对一个url的各个组成部分进行初步了解: 协议部分:该URL的协议部分为“http:”,这代表网页使用的是什么通信协议。在Internet中可以使用多种协议,如HTTP、HTTPS、FTP等等。本例中使用的是HTTP协议。在"HTTP"后面的“//”为分隔符。 域名部分:该URL的域名部分为“www.a.com”。一个URL中,也可以使用IP地址作为域名使用。 端口部分:跟在域名后面的是端口,域名和端口之间使用“:”作为分隔符。端口不是一个URL必须的部分,如果省略端口部分,将采用默认端口,http为80,https为443,FTP为21。 虚拟目录部分:从域名后的第一个“/”开始到最后一个“/”为止,是虚拟目录部分。虚拟目录也不是一个URL必须的部分。本例中的虚拟目录是“/news/”。 文件名部分:从域名后的最后一个“/”开始到“?”为止,是文件名部分。如果没有“?”,则是从域名后的最后一个“/”开始到“#”为止,是文件部分。如果没有“?”和“#”,那么从域名后的最后一个“/”开始到结束,都是文件名部分。本例中的文件名是“index.html”。文件名部分也不是一个URL必须的部分。 参数部分:从“?”开始到“#”为止之间的部分为参数部分,又称搜索部分、查询字符串。本例中的参数部分为“boardID=5&ID=24618&page=1”。查询字符串中允许有多个参数,参数与参数之间用“&”作为分隔符。 锚部分:从“#”开始到最后,都是锚部分。本例中的锚部分是“name”。锚部分也不是一个URL必须的部分。 以上,我们已经大致了解了一个url的基本组成。 4 同源策略(SOP - same origin policy) 它是由 Netscape(美国网景公司) 提出的一个重要的安全策略,现在所有支持 JavaScript 的浏览器都会使用这个策略。 4.1 作用 同源策略作为浏览器安全的基石,它是浏览器最核心也最基本的安全功能,如果缺少了同源策略,则浏览器的正常功能可能都会受到影响,如个人信息将不再具有安全性。可以说 Web 是构建在同源策略基础之上的,浏览器只是针对同源策略的一种实现。 它的核心就在于它认为自任何站点加载的内容都是不安全的。当被浏览器半信半疑的脚本运行在沙箱时,它们应该只被允许访问来自同一站点的资源,而不是那些来自其它站点可能怀有恶意的资源。 因此,出于安全原因,对于跨源HTTP请求,浏览器禁止发起请求,或者允许发起请求,服务端也能收到请求并正常返回结果,但是浏览器会对返回结果进行拦截。 例如,XMLHttpRequest 和Fetch API 遵循同源策略,这意味着使用这些API的Web应用程序只能从加载应用程序的同一个域请求HTTP资源,除非服务器同意访问。譬如服务器对预检请求的响应 Header 中有 Access-Control-Allow-Origin: *,那么跨域请求即可正确访问。 简单来说,同源策略就是浏览器的一个安全限制,它阻止了不同【域】之间进行的数据交互。 那么是如何定义一个请求是否满足同源要求的呢? 4.2 同源的判断标准 协议相同 域名相同 端口相同 4.3 跨域示例 URL 说明 是否允许通信 http://www.a.com/a.jshttp://www.a.com/b.js 同一域名,不同路径 允许 http://www.a.com:8080/a.jshttp://www.a.com/a.js 同一域名,不同端口 不允许 http://www.a.com/a.jshttps://www.a.com/a.js 同一域名,不同协议 不允许 http://www.a.comhttp://www.b.com 域名不同 不允许 http://www.a.com/a.jshttp://script.a.com/a.js 主域相同,子域不同 不允许 4.4 同源策略的限制内容 禁止跨域操作DOM,也就是无法接触非同源网页的 DOM。 禁止跨域资源请求,也就是无法向非同源地址发送 AJAX 请求(可以发送,但浏览器会拒绝接受响应)。 禁止跨域读取 Cookie、LocalStorage,也就是无法读取非同源网页的 Cookie、LocalStorage。 4.5 允许跨域的情况 另外,我们知道通过 JavaScript 脚本可以拿到其他窗口的window对象。如果是非同源的网页,目前允许一个窗口可以接触其他网页的window对象的九个属性和四个方法。 window.closed - 只读,判断当前窗口是否关闭 window.frames - 只读,获取窗口中所有命名的框架 window.length - 只读,获取当前窗口中frames的数量(包括iframes) window.location - 可读写。非同源的情况下,只允许调用location.replace()方法和写入location.href属性 window.opener - 只读,获取对创建该窗口的window对象的引用 window.parent - 只读,父窗口 window.self - 只读,对自己的引用,window.window == window.self window.top - 只读,获取最顶层窗口对象的引用 window.window - 只读,对自己的引用,window.window == window window.blur() - 失焦 window.close() - 关闭当前窗口 window.focus() - 聚焦当前窗口 window.postMessage() - 跨域通信API 4.6 允许跨域加载资源的标签 <img src=XXX> <link href=XXX> <script src=XXX> <iframe src=XXX> ps: 超链接<a>标签、<form>标签中的action行为也可以进行跨域 5 跨域解决方案一览图 6 后续 综上,我们完成了对跨域的初识,后面我们将对跨域的解决方案进行探讨,从上述的九种跨域解决方案进行一一描述,敬请期待。 作者:京东物流 李菲菲 来源:京东云开发者社区 自猿其说Tech 转载请注明来源

优秀的个人博客,低调大师

【建议收藏】超详细的Canal入门,看这篇就够了!!!

概述 canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。 背景 早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析 基于日志增量订阅&消费支持的业务: 数据库镜像 数据库实时备份 多级索引 (卖家和买家各自分库索引) search build 业务cache刷新 价格变化等重要业务消息 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x 工作原理 Mysql的BinLog 它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。 binlog 有三种模式:STATEMENT、ROW、MIXED STATEMENT 记录的是执行的sql语句 ROW 记录的是真实的行数据记录 MIXED 记录的是1+2,优先按照1的模式记录 举例说明 举例来说,下面的sql COPYupdate user set age=20 对应STATEMENT模式只有一条记录,对应ROW模式则有可能有成千上万条记录(取决数据库中的记录数)。 MySQL主备复制原理 Slave 上面的IO线程连接上 Master,并请求从指定日志文件的指定位置(或者从最开始的日志)之后的日志内容; Master 接收到来自 Slave 的 IO 线程的请求后,通过负责复制的 IO 线程根据请求信息读取指定日志指定位置之后的日志信息,返回给 Slave 端的 IO 线程。返回信息中除了日志所包含的信息之外,还包括本次返回的信息在 Master 端的 Binary Log 文件的名称以及在 Binary Log 中的位置; Slave 的 IO 线程接收到信息后,将接收到的日志内容依次写入到 Slave 端的Relay Log文件(mysql-relay-bin.xxxxxx)的最末端,并将读取到的Master端的bin-log的文件名和位置记录到master- info文件中,以便在下一次读取的时候能够清楚的高速Master“我需要从某个bin-log的哪个位置开始往后的日志内容,请发给我” Slave 的 SQL 线程检测到 Relay Log 中新增加了内容后,会马上解析该 Log 文件中的内容成为在 Master 端真实执行时候的那些可执行的 Query 语句,并在自身执行这些 Query。这样,实际上就是在 Master 端和 Slave 端执行了同样的 Query,所以两端的数据是完全一样的。 当然这个过程本质上还是存在一定的延迟的。 mysql的binlog文件长这个样子。 COPYmysql-bin.003831 mysql-bin.003840 mysql-bin.003849 mysql-bin.003858 启用Binlog注意以下几点: Master主库一般会有多台Slave订阅,且Master主库要支持业务系统实时变更操作,服务器资源会有瓶颈; 需要同步的数据表一定要有主键; canal能够同步数据的原理 理解了mysql的主从同步的机制再来看canal就比较清晰了,canal主要是听过伪装成mysql从server来向主server拉取数据。 canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议 mysql master收到dump请求,开始推送binary log给slave(也就是canal) canal解析binary log对象(原始为byte流) Canal架构 canal的设计理念 canal的组件化设计非常好,有点类似于tomcat的设计。使用组合设计,依赖倒置,面向接口的设计。 canal的组件 canal server 这个代表了我们部署的一个canal 应用 canal instance 这个代表了一个canal server中的多个 mysql instance ,从这一点说明一个canal server可以搜集多个库的数据,在canal中叫 destionation。 每个canal instance 有多个组件构成。在conf/spring/default-instance.xml中配置了这些组件。他其实是使用了spring的容器来进行这些组件管理的。 instance 包含的组件 这里是一个cannalInstance工作所包含的大组件。截取自 conf/spring/default-instance.xml COPY<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring"> <property name="destination" value="${canal.instance.destination}" /> <property name="eventParser"> <ref local="eventParser" /> </property> <property name="eventSink"> <ref local="eventSink" /> </property> <property name="eventStore"> <ref local="eventStore" /> </property> <property name="metaManager"> <ref local="metaManager" /> </property> <property name="alarmHandler"> <ref local="alarmHandler" /> </property> </bean> EventParser设计 eventParser 最基本的组件,类似于mysql从库的dump线程,负责从master中获取bin_log 整个parser过程大致可分为几步: Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点) Connection建立链接,发送BINLOG_DUMP指令 // 0. write command number // 1. write 4 bytes bin-log position to start at // 2. write 2 bytes bin-log flags // 3. write 4 bytes server id of the slave // 4. write bin-log file name Mysql开始推送Binaly Log 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息 // 补充字段名字,字段类型,主键信息,unsigned类型处理 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功 存储成功后,定时记录Binaly Log位置 EventSink设计 eventSink 数据的归集,使用设置的filter对bin log进行过滤,工作的过程如下。 说明: 数据过滤:支持通配符的过滤模式,表名,字段内容等 数据路由/分发:解决1:n (1个parser对应多个store的模式) 数据归并:解决n:1 (多个parser对应1个store) 数据加工:在进入store之前进行额外的处理,比如join 数据1:n业务 为了合理的利用数据库资源, 一般常见的业务都是按照schema进行隔离,然后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过cobar/tddl来解决数据源路由问题。 所以,一般一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注 数据n:1业务 同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个store进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。 所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局id进行排序归并. EventStore设计 eventStore 用来存储filter过滤后的数据,canal目前的数据只在这里存储,工作流程如下 目前仅实现了Memory内存模式,后续计划增加本地file存储,mixed混合模式 借鉴了Disruptor的RingBuffer的实现思路 定义了3个cursor Put : Sink模块进行数据存储的最后一次写入位置 Get : 数据订阅获取的最后一次提取位置 Ack : 数据消费成功的最后一次消费位置 借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看: 实现说明: Put/Get/Ack cursor用于递增,采用long型存储 buffer的get操作,通过取余或者与操作。(与操作: cusor & (size - 1) , size需要为2的指数,效率比较高) metaManager metaManager 用来存储一些原数据,比如消费到的游标,当前活动的server等信息 alarmHandler alarmHandler 报警,这个一般情况下就是错误日志,理论上应该是可以定制成邮件等形式,但是目前不支持 各个组件目前支持的类型 canal采用了spring bean container的方式来组装一个canal instance ,目的是为了能够更加灵活。 canal通过这些组件的选取可以达到不同使用场景的效果,比如单机的话,一般使用file来存储metadata就行了,HA的话一般使用zookeeper来存储metadata。 eventParser eventParser 目前只有三种 MysqlEventParser 用于解析mysql的日志 GroupEventParser 多个eventParser的集合,理论上是对应了分表的情况,可以通过这个合并到一起 RdsLocalBinlogEventParser 基于rds的binlog 的复制 eventSink eventSink 目前只有EntryEventSink 就是基于mysql的binlog数据对象的处理操作 eventStore eventStore 目前只有一种 MemoryEventStoreWithBuffer,内部使用了一个ringbuffer 也就是说canal解析的数据都是存在内存中的,并没有到zookeeper当中。 metaManager metaManager 这个比较多,其实根据元数据存放的位置可以分为三大类,memory,file,zookeeper Canal-HA机制 canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA类似。 canal的ha分为两部分,canal server和canal client分别有对应的ha实现 canal server: 为了减少对mysql dump的请求,不同server上的instance(不同server上的相同instance)要求同一时间只能有一个处于running,其他的处于standby状态(standby是instance的状态)。 canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。 server ha的架构图如下 大致步骤: canal server要启动某个canal instance时都先向zookeeper_进行一次尝试启动判断_(实现:创建EPHEMERAL节点,谁创建成功就允许谁启动) 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态。 一旦zookeeper发现canal server A创建的instance节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance。 canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。 Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制. canal的工作过程 dump日志 启动时去MySQL 进行dump操作的binlog 位置确定 工作的过程。在启动一个canal instance 的时候,首先启动一个eventParser 线程来进行数据的dump 当他去master拉取binlog的时候需要binlog的位置,这个位置的确定是按照如下的顺序来确定的(这个地方讲述的是HA模式哈)。 在启动的时候判断是否使用zookeeper,如果是zookeeper,看能否拿到 cursor (也就是binlog的信息),如果能够拿到,把这个信息存到内存中(MemoryLogPositionManager),然后拿这个信息去mysql中dump binlog 通过1拿不到的话(一般是zookeeper当中每一,比如第一次搭建的时候,或者因为某些原因zk中的数据被删除了),就去配置文件配置当中的去拿,把这个信息存到内存中(MemoryLogPositionManager),然后拿这个信息去mysql中dump binlog 通过2依然没有拿到的话,就去mysql 中执行一个sql show master status 这个语句会显示当前mysql binlog最后位置的信息,也就是刚写入的binlog所在的位置信息。把这个信息存到内存中(MemoryLogPositionManager),然后拿这个信息去mysql中dump binlog。 后面的eventParser的操作就会以内存中(MemoryLogPositionManager)存储的binlog位置去master进行dump操作了。 mysql的show master status 操作 COPYmysql> show master status\G *************************** 1. row *************************** File: mysql-bin.000028 Position: 635762367 Binlog_Do_DB: Binlog_Ignore_DB: Executed_Gtid_Set: 18db0532-6a08-11e8-a13e-52540042a113:1-2784514, 318556ef-4e47-11e6-81b6-52540097a9a8:1-30002, ac5a3780-63ad-11e8-a9ac-52540042a113:1-5, be44d87c-4f25-11e6-a0a8-525400de9ffd:1-156349782 1 row in set (0.00 sec 归集(sink)和存储(store) 数据在dump回来之后进行的归集(sink)和存储(store) sink操作是可以支撑将多个eventParser的数据进行过滤filter filter使用的是instance.properties中配置的filter,当然这个filter也可以由canal的client端在进行subscribe的时候进行设置。如果在client端进行了设置,那么服务端配置文件instance.properties的配置都会失效 sink 之后将过滤后的数据存储到eventStore当中去。 目前eventStore的实现只有一个MemoryEventStoreWithBuffer,也就是基于内存的ringbuffer,使用这个store有一个特点,这个ringbuffer是基于内存的,大小是有限制的(bufferSize = 16 * 1024 也就是16M),所以,当canal的客户端消费比较慢的时候,ringbuffer中存满了就会阻塞sink操作,那么正读取mysql binlog的eventParser线程也会受阻。 这种设计其实也是有道理的。 因为canal的操作是pull 模型,不是producer push的模型,所以他没必要存储太多数据,这样就可以避免了数据存储和持久化管理的一些问题。使数据管理的复杂度大大降低。 上面这些整个是canal的parser 线程的工作流程,主要对应的就是将数据从mysql搞下来,做一些基本的归集和过滤,然后存储到内存中。 binlog的消费者 canal从mysql订阅了binlog以后主要还是想要给消费者使用。那么binlog是在什么时候被消费呢。这就是另一条主线了。就像咱们做一个toC的系统,管理系统是必须的,用户使用的app或者web又是一套,eventParser 线程就像是管理系统,往里面录入基础数据。canal的client就像是app端一样,是这些数据的消费方。 binlog的主要消费者就是canal的client端。使用的协议是基于tcp的google.protobuf,当然tcp的模式是io多路复用,也就是nio。当我们的client发起请求之后,canal的server端就会从eventStore中将数据传输给客户端。根据客户端的ack机制,将binlog的元数据信息定期同步到zookeeper当中。 canal的目录结构 配置父目录: 在下面可以看到 COPYcanal ├── bin │ ├── canal.pid │ ├── startup.bat │ ├── startup.sh │ └── stop.sh └── conf ├── canal.properties ├── gamer ---目录 ├── ww_social ---目录 ├── wother ---目录 ├── nihao ---目录 ├── liveim ---目录 ├── logback.xml ├── spring ---目录 ├── ym ---目录 └── xrm_ppp ---目录 这里是全部展开的目录 COPYcanal ├── bin │ ├── canal.pid │ ├── startup.bat │ ├── startup.sh │ └── stop.sh └── conf ├── canal.properties ├── game_center │ └── instance.properties ├── ww_social │ ├── h2.mv.db │ ├── h2.trace.db │ └── instance.properties ├── wwother │ ├── h2.mv.db │ └── instance.properties ├── nihao │ ├── h2.mv.db │ ├── h2.trace.db │ └── instance.properties ├── movie │ ├── h2.mv.db │ └── instance.properties ├── logback.xml ├── spring │ ├── default-instance.xml │ ├── file-instance.xml │ ├── group-instance.xml │ ├── local-instance.xml │ ├── memory-instance.xml │ └── tsdb │ ├── h2-tsdb.xml │ ├── mysql-tsdb.xml │ ├── sql │ └── sql-map └── ym └── instance.properties Canal应用场景 同步缓存redis/全文搜索ES canal一个常见应用场景是同步缓存/全文搜索,当数据库变更后通过binlog进行缓存/ES的增量更新。当缓存/ES更新出现问题时,应该回退binlog到过去某个位置进行重新同步,并提供全量刷新缓存/ES的方法,如下图所示。 下发任务 另一种常见应用场景是下发任务,当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入MQ/kafka进行任务下发,比如商品数据变更后需要通知商品详情页、列表页、搜索页等先关系统。这种方式可以保证数据下发的精确性,通过MQ发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发MQ的代码,从而实现了下发归集,如下图所示。 数据异构 在大型网站架构中,DB都会采用分库分表来解决容量和性能问题,但分库分表之后带来的新问题。比如不同维度的查询或者聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决此问题。 所谓的数据异构,那就是将需要join查询的多表按照某一个维度又聚合在一个DB中。让你去查询。canal就是实现数据异构的手段之一。 本文由传智教育博学谷狂野架构师教研团队发布。 如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力。 转载请注明出处!

优秀的个人博客,低调大师

每日一博 | 一文带你入门图机器学习

本文主要涉及图机器学习的基础知识。 我们首先学习什么是图,为什么使用图,以及如何最佳地表示图。然后,我们简要介绍大家如何在图数据上学习,从神经网络以前的方法 (同时我们会探索图特征) 到现在广为人知的图神经网络 (Graph Neural Network,GNN)。最后,我们将一窥图数据上的 Transformers 世界。 什么是图? 本质上来讲,图描述了由关系互相链接起来的实体。 现实中有很多图的例子,包括社交网络 (如推特,长毛象,以及任何链接论文和作者的引用网络) 、分子、知识图谱 (如 UML 图,百科全书,以及那些页面之间有超链接的网站) 、被表示成句法树的句子、3D 网格等等。因此,可以毫不夸张地讲,图无处不在。 图 (或网络) 中的实体称为 节点 (或顶点) ,它们之间的连接称为 边 (或链接) 。举个例子,在社交网络中,节点是用户,而边是他 (她) 们之间的连接关系;在分子中,节点是原子,而边是它们之间的分子键。 可以存在不止一种类型的节点或边的图称为 异构图 (heterogeneous graph) (例子:引用网络的节点有论文和作者两种类型,含有多种关系类型的 XML 图的边是多类型的) 。异构图不能仅由其拓扑结构来表征,它需要额外的信息。本文主要讨论同构图 (homogeneous graph) 。 图还可以是 有向 (directed) 的 (如一个关注网络中,A 关注了 B,但 B 可以不关注 A) 或者是 无向 (undirected) 的 (如一个分子中,原子间的关系是双向的) 。边可以连接不同的节点,也可以自己连接自己 (自连边,self-edges) ,但不是所有的节点都必须有连接。 如果你想使用自己的数据,首先你必须考虑如何最佳地刻画它 (同构 / 异构,有向 / 无向等) 。 图有什么用途? 我们一起看看在图上我们可以做哪些任务吧。 在 图层面,主要的任务有: 图生成,可在药物发现任务中用于生成新的可能的药物分子, 图演化 (给定一个图,预测它会如何随时间演化) ,可在物理学中用于预测系统的演化, 图层面预测 (基于图的分类或回归任务) ,如预测分子毒性。 在 节点层面,通常用于预测节点属性。举个例子,Alphafold 使用节点属性预测方法,在给定分子总体图的条件下预测原子的 3D 坐标,并由此预测分子在 3D 空间中如何折叠,这是个比较难的生物化学问题。 在 边层面,我们可以做边属性预测或缺失边预测。边属性预测可用于在给定药物对 (pair) 的条件下预测药物的不良副作用。缺失边预测被用于在推荐系统中预测图中的两个节点是否相关。 另一种可能的工作是在 子图层面 的,可用于社区检测或子图属性预测。社交网络用社区检测确定人们之间如何连接。我们可以在行程系统 (如 Google Maps) 中发现子图属性预测的身影,它被用于预测到达时间。 完成这些任务有两种方式。 当你想要预测特定图的演化时,你工作在 直推 (transductive) 模式,直推模式中所有的训练、验证和推理都是基于同一张图。如果这是你的设置,要多加小心!在同一张图上创建训练 / 评估 / 测试集可不容易。 然而,很多任务其实是工作在不同的图上的 (不同的训练 / 评估 / 测试集划分) ,我们称之为 归纳 (inductive) 模式。 如何表示图? 常用的表示图以用于后续处理和操作的方法有 2 种: 表示成所有边的集合 (很有可能也会加上所有节点的集合用以补充) 。 或表示成所有节点间的邻接矩阵。邻接矩阵是一个 $node_size \times node_size$ 大小的方阵,它指明图上哪些节点间是直接相连的 (若 $n_i$ 和 $n_j$ 相连则 $A_{ij} = 1$,否则为 0) 。 注意:多数图的边连接并不稠密,因此它们的邻接矩阵是稀疏的,这个会让计算变得困难。 虽然这些表示看上去很熟悉,但可别被骗了! 图与机器学习中使用的典型对象大不相同,因为它们的拓扑结构比序列 (如文本或音频) 或有序网格 (如图像和视频) 复杂得多:即使它们可以被表示成链表或者矩阵,但它们并不能被当作有序对象来处理。 这究竟意味着什么呢?如果你有一个句子,你交换了这个句子的词序,你就创造了一个新句子。如果你有一张图像,然后你重排了这个图像的列,你就创造了一张新图像。 但图并不会如此。如果你重排了图的边列表或者邻接矩阵的列,图还是同一个图 (一个更正式的叫法是置换不变性 (permutation invariance) ) 。 基于机器学习的图表示 使用机器学习处理图的一般流程是:首先为你感兴趣的对象 (根据你的任务,可以是节点、边或是全图) 生成一个有意义的表示,然后使用它们训练一个目标任务的预测器。与其他模态数据一样,我们想要对这些对象的数学表示施加一些约束,使得相似的对象在数学上是相近的。然而,这种相似性在图机器学习上很难严格定义,举个例子,具有相同标签的两个节点和具有相同邻居的两个节点哪两个更相似? 注意:在随后的部分,我们将聚焦于如何生成节点的表示。一旦你有了节点层面的表示,就有可能获得边或图层面的信息。你可以通过把边所连接的两个节点的表示串联起来或者做一个点积来得到边层面的信息。至于图层面的信息,可以通过对图上所有节点的表示串联起来的张量做一个全局池化 (平均,求和等) 来获得。当然,这么做会平滑掉或丢失掉整图上的一些信息,使用迭代的分层池化可能更合理,或者增加一个连接到图上所有其他节点的虚拟节点,然后使用它的表示作为整图的表示。 神经网络以前的方法 只使用手工设计特征 在神经网络出现之前,图以及图中的感兴趣项可以被表示成特征的组合,这些特征组合是针对特定任务的。尽管现在存在 更复杂的特征生成方法,这些特征仍然被用于数据增强和 半监督学习。这时,你主要的工作是根据目标任务,找到最佳的用于后续网络训练的特征。 节点层面特征 可以提供关于其重要性 (该节点对于图有多重要?) 以及 / 或结构性 (节点周围的图的形状如何?) 信息,两者可以结合。 节点 中心性 (centrality) 度量图中节点的重要性。它可以递归计算,即不断对每个节点的邻节点的中心性求和直到收敛,也可以通过计算节点间的最短距离来获得,等等。节点的 度 (degree) 度量节点的直接邻居的数量。聚类系数 (clustering coefficient) 度量一个节点的邻节点之间相互连接的程度。图元度向量 (Graphlets degree vectors,GDV) 计算给定根节点的不同图元的数目,这里图元是指给定数目的连通节点可创建的所有迷你图 (如:3 个连通节点可以生成一个有两条边的线,或者一个 3 条边的三角形) 。 边层面特征带来了关于节点间连通性的更多细节信息,有效地补充了图的表示,有:两节点间的 最短距离 (shortest distance),它们的公共邻居 (common neighbours),以及它们的 卡兹指数 (Katz index) (表示两节点间从所有长度小于某个值的路径的数目,它可以由邻接矩阵直接算得) 。 图层面特征 包含了关于图相似性和规格的高层信息。总 图元数 尽管计算上很昂贵,但提供了关于子图形状的信息。核方法 通过不同的 “节点袋 (bag of nodes) ” (类似于词袋 (bag of words) ) 方法度量图之间的相似性。 基于游走的方法 基于游走的方法 使用在随机游走时从节点j访问节点i的可能性来定义相似矩阵;这些方法结合了局部和全局的信息。举个例子,Node2Vec模拟图中节点间的随机游走,把这些游走路径建模成跳字 (skip-gram) ,这与我们处理句子中的词很相似,然后计算嵌入。基于随机游走的方法也可被用于加速 Page Rank方法,帮助计算每个节点的重要性得分 (举个例子:如果重要性得分是基于每个节点与其他节点的连通度的话,我们可以用随机游走访问到每个节点的频率来模拟这个连通度) 。 然而,这些方法也有限制:它们不能得到新的节点的嵌入向量,不能很好地捕获节点间的结构相似性,也使用不了新加入的特征。 图神经网络 神经网络可泛化至未见数据。我们在上文已经提到了一些图表示的约束,那么一个好的神经网络应该有哪些特性呢? 它应该: 满足置换不变性: 等式:,这里 f 是神经网络,P 是置换函数,G 是图。 解释:置换后的图和原图经过同样的神经网络后,其表示应该是相同的。 满足置换等价性 公式:,同样 f 是神经网络,P 是置换函数,G 是图。 解释:先置换图再传给神经网络和对神经网络的输出图表示进行置换是等价的。 典型的神经网络,如循环神经网络 (RNN) 或卷积神经网络 (CNN) 并不是置换不变的。因此,图神经网络 (Graph Neural Network, GNN) 作为新的架构被引入来解决这一问题 (最初是作为状态机使用) 。 一个 GNN 由连续的层组成。一个 GNN 层通过 消息传递 (message passing) 过程把一个节点表示成其邻节点及其自身表示的组合 (聚合 (aggregation)) ,然后通常我们还会使用一个激活函数去增加一些非线性。 与其他模型相比:CNN 可以看作一个邻域 (即滑动窗口) 大小和顺序固定的 GNN,也就是说 CNN 不是置换等价的。一个没有位置嵌入 (positional embedding) 的 Transformer 模型可以被看作一个工作在全连接的输入图上的 GNN。 聚合与消息传递 多种方式可用于聚合邻节点的消息,举例来讲,有求和,取平均等。一些值得关注的工作有: 图卷积网络 对目标节点的所有邻节点的归一化表示取平均来做聚合 (大多数 GNN 其实是 GCN) ; 图注意力网络 会学习如何根据邻节点的重要性不同来加权聚合邻节点 (与 transformer 模型想法相似) ; GraphSAGE 先在不同的跳数上进行邻节点采样,然后基于采样的子图分多步用最大池化 (max pooling) 方法聚合信息; 图同构网络 先计算对邻节点的表示求和,然后再送入一个 MLP 来计算最终的聚合信息。 选择聚合方法:一些聚合技术 (尤其是均值池化和最大池化) 在遇到在邻节点上仅有些微差别的相似节点的情况下可能会失败 (举个例子:采用均值池化,一个节点有 4 个邻节点,分别表示为 1,1,-1,-1,取均值后变成 0;而另一个节点有 3 个邻节点,分别表示为 - 1,0,1,取均值后也是 0。两者就无法区分了。) 。 GNN 的形状和过平滑问题 每加一个新层,节点表示中就会包含越来越多的节点信息。 一个节点,在第一层,只会聚合它的直接邻节点的信息。到第二层,它们仍然只聚合直接邻节点信息,但这次,他们的直接邻节点的表示已经包含了它们各自的邻节点信息 (从第一层获得) 。经过 n 层后,所有节点的表示变成了它们距离为 n 的所有邻节点的聚合。如果全图的直径小于 n 的话,就是聚合了全图的信息! 如果你的网络层数过多,就有每个节点都聚合了全图所有节点信息的风险 (并且所有节点的表示都收敛至相同的值) ,这被称为过平滑问题 (the oversmoothing problem)。 这可以通过如下方式来解决: 在设计 GNN 的层数时,要首先分析图的直径和形状,层数不能过大,以确保每个节点不聚合全图的信息 增加层的复杂性 增加非消息传递层来处理消息 (如简单的 MLP 层) 增加跳跃连接 (skip-connections) 过平滑问题是图机器学习的重要研究领域,因为它阻止了 GNN 的变大,而在其他模态数据上 Transformers 之类的模型已经证明了把模型变大是有很好的效果的。 图 Transformers 没有位置嵌入 (positional encoding) 层的 Transformer 模型是置换不变的,再加上 Transformer 模型已被证明扩展性很好,因此最近大家开始看如何改造 Transformer 使之适应图数据 (综述) 。多数方法聚焦于如何最佳表示图,如找到最好的特征、最好的表示位置信息的方法以及如何改变注意力以适应这一新的数据。 这里我们收集了一些有意思的工作,截至本文写作时为止,这些工作在现有的最难的测试基准之一 斯坦福开放图测试基准 (Open Graph Benchmark, OGB) 上取得了最高水平或接近最高水平的结果: Graph Transformer for Graph-to-Sequence Learning (Cai and Lam, 2020) 介绍了一个图编码器,它把节点表示为它本身的嵌入和位置嵌入的级联,节点间关系表示为它们间的最短路径,然后用一个关系增强的自注意力机制把两者结合起来。 Rethinking Graph Transformers with Spectral Attention (Kreuzer et al, 2021) 介绍了谱注意力网络 (Spectral Attention Networks, SANs) 。它把节点特征和学习到的位置编码 (从拉普拉斯特征值和特征向量中计算得到) 结合起来,把这些作为注意力的键 (keys) 和查询 (queries) ,然后把边特征作为注意力的值 (values) 。 GRPE: Relative Positional Encoding for Graph Transformer (Park et al, 2021) 介绍了图相对位置编码 Transformer。它先在图层面的位置编码中结合节点信息,在边层面的位置编码中也结合节点信息,然后在注意力机制中进一步把两者结合起来。 Global Self-Attention as a Replacement for Graph Convolution (Hussain et al, 2021) 介绍了边增强 Transformer。该架构分别对节点和边进行嵌入,并通过一个修改过的注意力机制聚合它们。 Do Transformers Really Perform Badly for Graph Representation (Ying et al, 2021) 介绍了微软的 Graphormer, 该模型在面世时赢得了 OGB 第一名。这个架构使用节点特征作为注意力的查询 / 键 / 值 (Q/K/V) ,然后在注意力机制中把这些表示与中心性,空间和边编码信息通过求和的方式结合起来。 最新的工作是 Pure Transformers are Powerful Graph Learners (Kim et al, 2022),它引入了 TokenGT。这一方法把输入图表示为一个节点和边嵌入的序列 (并用正交节点标识 (orthonormal node identifiers) 和可训练的类型标识 (type identifiers) 增强它) ,而不使用位置嵌入,最后把这个序列输入给 Tranformer 模型。超级简单,但很聪明! 稍有不同的是,Recipe for a General, Powerful, Scalable Graph Transformer (Rampášek et al, 2022) 引入的不是某个模型,而是一个框架,称为 GraphGPS。它允许把消息传递网络和线性 (长程的) transformer 模型结合起来轻松地创建一个混合网络。这个框架还包含了不少工具,用于计算位置编码和结构编码 (节点、图、边层面的) 、特征增强、随机游走等等。 在图数据上使用 transformer 模型还是一个非常初生的领域,但是它看上去很有前途,因为它可以减轻 GNN 的一些限制,如扩展到更大 / 更稠密的图,抑或是增加模型尺寸而不必担心过平滑问题。 更进阶的资源 如果你想钻研得更深入,可以看看这些课程: 学院课程形式 斯坦福大学图机器学习 麦吉尔大学图表示学习 视频形式 几何深度学习课程 相关书籍 图表示学习*,汉密尔顿著 不错的处理图数据的库有 PyGeometric (用于图机器学习) 以及 NetworkX (用于更通用的图操作)。 如果你需要质量好的测试基准,你可以试试看: OGB, 开放图测试基准 (the Open Graph Benchmark) :一个可用于不同的任务和数据规模的参考图测试基准数据集。 Benchmarking GNNs: 用于测试图机器学习网络和他们的表现力的库以及数据集。相关论文特地从统计角度研究了哪些数据集是相关的,它们可被用于评估图的哪些特性,以及哪些图不应该再被用作测试基准。 长程图测试基准 (Long Range Graph Benchmark): 最新的 (2022 年 10 月份) 测试基准,主要关注长程的图信息。 Taxonomy of Benchmarks in Graph Representation Learning: 发表于 2022 年 Learning on Graphs 会议,分析并对现有的测试基准数据集进行了排序。 如果想要更多的数据集,可以看看: Paper with code 图任务排行榜: 公开数据集和测试基准的排行榜,请注意,不是所有本排行榜上的测试基准都仍然适宜。 TU 数据集: 公开可用的数据集的合辑,现在以类别和特征排序。大多数数据集可以用 PyG 加载,而且其中一些已经被集成进 PyG 的 Datsets。 SNAP 数据集 (Stanford Large Network Dataset Collection): MoleculeNet 数据集 关系数据集仓库 外部图像来源 缩略图中的 Emoji 表情来自于 Openmoji (CC-BY-SA 4.0),图元的图片来自于 Biological network comparison using graphlet degree distribution (Pržulj, 2007)。 英文原文: https://huggingface.co/blog/intro-graphml 译者: Matrix Yao (姚伟峰)

优秀的个人博客,低调大师

解读重要功能特性:新手入门 Apache SeaTunnel CDC

引言 点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/incubator-seatunnel 为什么说 CDC 是SeaTunnel平台中的一个重要功能特性?今天这篇文章跟大家分享一下 CDC 是什么?目前市面上的 CDC 工具现有的痛点有哪些?SeaTunnel面对这些痛点设计的架构目标是什么?另外包括社区的展望和目前在做的一些事情。 总体来说,市面上已经有这么多 CDC 工具了,我们为什么还要重复去造一个轮子? 带着这个疑问,我先给大家简要介绍下 CDC 是什么! CDC 的全称是 Change Data Capture,它就是一个数据变更捕获。变更数据捕获 (CDC) 使用 Server 代理来记录应用于表的插入、更新和删除活动。 这样,就可以按易于使用的关系格式提供这些更改的详细信息。 将为修改的行捕获列信息以及将更改应用于目标环境所需的元数据,并将其存储在镜像所跟踪源表的列结构的更改表中。 CDC的使用场景 异构数据库之间的数据同步或备份 / 建立数据分析计算平台 在 MySQL,PostgreSQL,MongoDB 等等数据库之间互相同步数据,或者把这些数据库的数据同步到 Elasticsearch 里以供全文搜索,当然也可以基于 CDC 对数据库进行备份。而数据分析系统可以通过订阅感兴趣的数据表的变更,来获取所需要的分析数据进行处理,不需要把分析流程嵌入到已有系统中,以实现解耦。 微服务之间共享数据状态 在微服务大行其道的今日,微服务之间信息共享一直比较复杂,CDC 也是一种可能的解决方案,微服务可以通过 CDC 来获取其他微服务数据库的变更,从而获取数据的状态更新,执行自己相应的逻辑。 更新缓存 / CQRS 的 Query 视图更新 通常缓存更新都比较难搞,可以通过 CDC 来获取数据库的数据更新事件,从而控制对缓存的刷新或失效。 而 CQRS 是什么又是一个很大的话题,简单来讲,你可以把 CQRS 理解为一种高配版的读写分离的设计模式。举个例子,我们前面讲了可以利用 CDC 将 MySQL 的数据同步到 Elasticsearch 中以供搜索,在这样的架构里,所有的查询都用 ES 来查,但在想修改数据时,并不直接修改 ES 里的数据,而是修改上游的 MySQL 数据,使之产生数据更新事件,事件被消费者消费来更新 ES 中的数据,这就基本上是一种 CQRS 模式。而在其他 CQRS 的系统中,也可以利用类似的方式来更新查询视图。 现有CDC组件 开源组件 Canal Debezium Flink CDC 支持数据库 仅支持MySQL 支持MySQL、Postgre SQL、Oracle 等 支持MySQL、Postgre SQL、Oracle 等 同步历史数据 不支持 单并行锁表 多并行无锁 输出端 Kafka、RocketMQ Kafka Flink Connector Canal 数据库它仅支持MySQL,不支持同步历史数据,只能同步增量数据,输出端除了支持 canal client/adapter(适配工作量很大),还支持了的Kafka 和 RocketMQ。 Debezium 支持的数据库比较多,不仅支持MySQL,PG,Oracle,还支持其它 Mongo DB 等数据库,同时支持同步历史数据,不过历史数据读取方式是:一个快照读整个表,如果你表很大,就会像sqoop一样读特别久。如果中途失败了,需要从头开始读,这样会出现一些问题。而且输出端上支持的就更加少,仅仅支持通过 Kafka 输出。 Flink CDC Flink CDC 和前两个定位上就不一样。它实际就是 Flink 生态的 connector,就是连接器组。目前也支持比较多的数据库,像 MySQL PG,Oracle, Mongo 这些数据库都是支持的。 相对于前面的开源组件,它持一个多边形无锁的算法。当然它也是参考到 Netflix DBLog 的无锁算法。因为它是基于 Flink 生态的,所以它输出端就比较多。只要是 Flink 生态有的connector,支持Upsert的Connector都是可以使用的。当然它也会存在很多问题,这个问题就是后面我会提到的。 现有组件存在的痛点 单表配置 如果用过Flink CDC 的朋友就会发现,我们需要对每一个表进行配置。比如我们想同步 10 张表,就要写 10 个 source 的SQL, 10 个 sink 的 SQL,如果你要进行 transform,就还要写 transform 的 SQL 。 这个情况下,小数量的表手写还可以应付,如果数量大可能就出现类型映射错误的问题,或者参数配置错误的问题,就会产生很高的运维成本(配置麻烦)。而 Apache SeaTunnel 定位就是一个简单易用的数据集成平台,我们期望解决这个问题。 不支持 Schema Evolution 支不支持 schema 的变更。实际上像Flink CDC 和 Debezium,两者支持 DDL 事件发送,但是不支持发送到Sink,让 Sink 做同步变更。或者 Fink CDC能拿到事件,但是无法发送到引擎中,因为引擎不能基于 DDL 事件去变更 transform 的 Type information ,Sink 没办法跟着 DDL 事件进行变更。 持有链接过多 如果有 100 张表,因为 Flink CDC 只支持一个 source 去同步一张表,每一张表都会使用一个链接,当表多的时候,使用的链接就特别多,就会对源头的 JDBC 数据库造成了很大的连接压力,并且会持有特别多的Binlog,也会像 worker 这种,也还会造成重复的日志解析。 SeaTunnel CDC架构目标 SeaTunnel CDC是基于市面上现有的 CDC 组件的优缺点,以及相关痛点问题做的架构设计。 支持基础的CDC 支持无锁并行快照历史数据 支持日志心跳检测和动态加表 支持分库分表和多结构表读取 支持Schema evolution 支持增量日志的读取,还至少要能够支持无锁并行快照历史数据的能力。 我们期望能够减少用户的运维成本,能够动态的加表,比如有时候想同步整个库,后面新增了一张表,你不需要手动去维护,可以不用再去改Job配置,也不用停止Job再重启一遍,这样就会减少很多麻烦。 支持分库分表和多结构表的读取,其实这也是我们最开始提到的每个表单独配置的问题。并且还支持 Schema evolution, DDL 的传输,还有在引擎中能支持 schema evolution 的变更,能够变更到 Transform 和 Sink 上面去。 CDC 基本流程 CDC基础流程包含: 快照阶段:用于读取表的历史数据 最小Split粒度:表的主键范围数据 增量阶段:用于读取表的增量日志更改数据 最小Split粒度:以表为单位 快照阶段 枚举器生成一个表的多个 SnapshotSplit,并将它们分配给 reader。 // pseudo-code. public class SnapshotSplit implements SourceSplit { private final String splitId; private final TableId tableId; private final SeaTunnelRowType splitKeyType; private final Object splitStart; private final Object splitEnd; } 当 SnapshotSplit 读取完成时,读取器将拆分的高水位线报告给枚举器。当所有 SnapshotSplit 都报告高水位线时,枚举器开始增量阶段。 // pseudo-code. public class CompletedSnapshotSplitReportEvent implements SourceEvent { private final String splitId; private final Offset highWatermark; } 快照阶段 - SnapshotSplit 读取流程 有4个步骤: 日志低水位线:读取快照数据前获取当前日志偏移量。 读取 SnapshotSplit 数据:读取属于split 的数据范围,这里分为两种情况 案例1:步骤1&2不能原子化(MySQL) 因为我们不能加表锁,也不能加基于低水位线的区间锁,所以第 1 步和第 2 步不是孤立的。 exactly-once:使用内存表保存历史数据 & 过滤日志数据从低水位线到高水位线 At-least-once:直接输出数据并使用低水位线而不是高水位线 案例 2:步骤 1 和 2 可以原子化(Oracle) 可以使用 for scn 来保证两步的原子化 Exactly-Once:直接输出数据并使用低水位线而不用去获取高水位线 加载高水位线数据: 步骤 2 中案例 1 & Exactly-Once:读取快照数据后获取当前日志偏移量。 其他:使用低水位线代替高水位线 如果高水位线>低水位线,读取范围日志数据 快照阶段—MySQL Snapshot Read & Exactly-once 因为我们无法确定查询语句在高低水位之间执行的位置,为了保证数据的 exactly-once,我们需要使用内存表来临时保存数据。 日志低水位线:读取快照数据前获取当前日志偏移量。 读取 SnapshotSplit 数据:读取属于 split 的范围数据,写入内存表。 日志高水位线:读取快照数据后获取当前日志偏移量。 读取范围日志数据:读取日志数据并写入内存表 输出内存表的数据,释放内存使用量。 增量阶段 当所有快照拆分报告水位时,开始增量阶段。 结合所有快照拆分和水位信息,获得 LogSplits。 我们希望最小化日志连接的数量: 增量阶段默认只有一个 reader 工作,用户也可以根据需求去配置选项指定数量(不能超过 reader 数量) 一个 reader 最多获得一个连接 // pseudo-code. public class LogSplit implements SourceSplit { private final String splitId; /** * All the tables that this log split needs to capture. */ private final List<TableId> tableIds; /** * Minimum watermark for SnapshotSplits for all tables in this LogSplit */ private final Offset startingOffset; /** * Obtained by configuration, may not end */ private final Offset endingOffset; /** * SnapshotSplit information for all tables in this LogSplit. * </br> Used to support Exactly-Once. */ private final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos; /** * Maximum watermark in SnapshotSplits per table. * </br> Used to delete information in completedSnapshotSplitInfos, reducing state size. * </br> Used to support Exactly-Once. */ private final Map<TableId, Offset> tableWatermarks; } // pseudo-code. public class CompletedSnapshotSplitInfo implements Serializable { private final String splitId; private final TableId tableId; private final SeaTunnelRowType splitKeyType; private final Object splitStart; private final Object splitEnd; private final Offset watermark; } Exactly-Once: 阶段 1:在水印数据之前使用 completedSnapshotSplitInfos 过滤器。 阶段2:表不再需要过滤,在 completedSnapshotSplitInfos 中删除属于该表的数据,因为后面的数据需要处理。 At-Least-Once:无需过滤数据,且 completedSnapshotSplitInfos 不需要任何数据 动态发现新表 场景 1:发现新表时,枚举器处于快照阶段,直接分配新的 split。 场景 2:发现新表时,枚举器处于增量阶段。 在增量阶段动态发现新表。 暂停 LogSplit reader。 Reader 暂停运行。 Reader 报告当前日志偏移量。 将 SnapshotSplit 分配给阅读器。 Reader 执行快照阶段读取。 Reader 报告所有 SnapshotSplit 水位。 为 Reader 分配一个新的 LogSplit。 Reader 再次开始增量读取并向枚举器确认。 多结构表同步 多结构表是为了解决连接器实例过多,配置过于复杂的问题。比如你只需要去配表的一个正则,或者配多个表名,不需要对每一个表去做配置。 优点:占用数据库连接少,减少数据库压力 缺点:在 SeaTunnel Engine 中,多个表会在一个管道中,容错的粒度会变大。 这个特性允许Source支持读取多个结构表,再使用侧流输出与单表流保持一致。Sink 如果也去支持多表,可能涉及改动比较多。所以第一阶段的目标只是让 Source 去支持多结构表,这里配置的逻辑可能会和原来的不一样,会通过 catalog 去读每一个 config 里面到底配了哪些表,再把表塞到 Source Connector 中,这里多表结构的 API 已经在 SeaTunnel 的 API 之中,但是还没有做相关的适配。 SeaTunnel CDC现状 目前开发完成的是 CDC 的基础能力,能够支持增量阶段和快照阶段, MySQL 也已经支持了,支持实时和离线。 MySQL 实时已经测试完成了,离线的测试还没有完成。 Schema 因为要涉及到Transfrom 和Sink 的变更,目前还没有支持的。动态发现新表还没有支持,多结构表目前已经预留了一些接口出来,但是适配的工作量比较大,可能等到 2023 年 Q1 季度可能会做这个事情。 Apache SeaTunnel 展望 作为一个Apache 孵化项目,Apache SeaTunnel 社区迅速发展,在接下来的社区规划中,主要有四个方向: 扩大与完善 Connector & Catalog 生态 支持更多 Connector & Catalog,如TiDB、Doris、Stripe等,并完善现有的连接器,提高其可用性与性能等; 支持CDC连接器,用于支持实时增量同步场景; 对连接器感兴趣的同学可以关注该Umbrella:https://github.com/apache/incubator-seatunnel/issues/1946 支持引擎的更多版本 如Spark 3.x, Flink 1.14.x等 对支持Spark 3.3 感兴趣的同学可以关注该PR:https://github.com/apache/incubator-seatunnel/pull/2574 支持更多数据集成场景 (SeaTunnel Engine) 用于解决整库同步、表结构变更同步、任务失败影响粒度大等现有引擎不能解决的痛点; 对engine感兴趣的同学可以关注该Umbrella:https://github.com/apache/incubator-seatunnel/issues/2272 更简单易用(SeaTunnel Web) 提供Web界面以DAG/SQL等方式使操作更简单,更加直观的展示Catalog、Connector、Job等; 接入调度平台,使任务管理更简单; 对Web 感兴趣的同学可以关注我们的Web子项目:https://github.com/apache/incubator-seatunnel-web Apache SeaTunnel Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线 & 实时)同步和转化的数据集成平台 仓库地址: https://github.com/apache/incubator-seatunnel 网址:https://seatunnel.apache.org/ Proposal:https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro Apache SeaTunnel (Incubating) 下载地址:https://seatunnel.apache.org/download 衷心欢迎更多人加入! 我们相信,在 「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「** 多样性与共识决策」** 等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步! 我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源! 提交问题和建议:https://github.com/apache/incubator-seatunnel/issues 贡献代码:https://github.com/apache/incubator-seatunnel/pulls 订阅社区开发邮件列表 : dev-subscribe@seatunnel.apache.org ** 开发邮件列表:**dev@seatunnel.apache.org 加入 Slack:https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ 关注 Twitter: https://twitter.com/ASFSeaTunnel

优秀的个人博客,低调大师

每日一博 | 极简 Java 工作流概念入门

关于 Flowable 松哥已经更新了好几篇文章了,不过考虑到有的小伙伴可能还从来没接触过流程引擎,因此有一些基础的内容我再来和小伙伴们梳理一下。 1. 为什么需要工作流 松哥将之前的文章转发到朋友圈后,有小伙伴评论说一直不理解为什么需要工作流,今天我们就先来说说这个话题。 假设我有一个请假需求,流程如下: 请假可以提交给我的上司,上司可以选择批准或者拒绝,无论批准还是拒绝,都会给我一个通知。 这个流程比较简单,我们很容易想到解决方案,不用工作流也能解决,有一个专门的请假表,当 A 要请假的时候,就往请假表中添加一条记录,这条记录的内容包含了请假的天数、原因、请假的审批人 B 以及一个名为 status 的字段,这个 status 字段表示这个请假申请目前的状态(待审批、已批准还是已拒绝),然后 B 登录系统之后,在请假表中查询到了 A 的请假信息,然后选择批准,此时将 status 字段的值改一下就行了。 这个流程很简单,相信小伙伴们都能想到。 然而,这是一个非常简单的流程,对于这样的流程,一般来说也确实没有必要使用工作流,但是现实中,我们涉及到的工作流往往都是非常复杂的,我举个例子,就说报销审批吧,这个可能很多小伙伴都经历过。 小伙伴们看到,这个流程相对来说还是比较复杂的,此时你再用一个 status 字段去描述,就很难说的请到底是怎么回事了。每一步审批,都有可能批准也有可能拒绝,拒绝并不意味着流程结束,员工修改报销资料之后,还可以继续提交。此时如果还用 status 去描述,那么 status 将有 N 多个值去表示不同的情况,这个维护起来非常不便。 这就复杂了吗?非也非也,我们再来看一个生产笔记本电脑的例子,假设公司研发了一款新型笔记本电脑,整个研发到生产的流程可能是这样: 相比上面两个,这个就更复杂一些了,不仅有串行任务还有并行任务,如何去设计这样一个系统?单纯的通过状态字段去描述显然已经不够用了,此时我们就得考虑一种通用的、更易维护的方案来实现这样的系统了,这种通用的、易维护的方案,也就是工作流。 2. 三大工作流 一个比较早的工作流是 jBPM,这是一个由 Java 实现的企业级流程引擎,是 JBoss 公司开发的产品之一。 jBPM 的创建者是 Tom Baeyens,这个大佬后来离开了 JBoss,并加入到 Alfresco,并推出了基于 jBPM4 的开源工作流系统 Activiti,而 jBPM 则在后续的代码中完全放弃了 jBPM4 的代码。从这个过程中也能看出来,jBPM 在发展过程中,由于意见相左,后来变成了两个 jBPM 和 Activiti。 然而戏剧的是,Activiti5 没搞多久,从 Activiti 中又分出来一个 Camunda,Activiti 继续发展,又从中分出来一个 Flowable。。。 由于开发 jBPM、Activiti、Camunda 以及 Flowable 的人多多少少有一些关联性,让人不得不猜测意见相左拉一票人出来单干是他们的企业文化。 所以现在市面上主流的流程引擎就一共有三个: Activiti Flowable Camunda 这三个各有特点: Activiti 目前是侧重云,他目前的设计会向 Spring Cloud、Docker 这些去靠拢。 Flowable 核心思想还是在做一个功能丰富的流程引擎工具,除了最最基础的工作流,他还提供了很多其他的扩展点,我们可以基于 Flowable 实现出许多我们想要的功能(当然这也是小伙伴们觉得 Flowable 使用复杂的原因之一)。 Camunda 相对于前两个而言比较轻量级,Camunda 有一个比较有特色的功能就是他提供了一个小巧的编辑器,基于 bpmn.io 来实现的(松哥之前已经发文讲过了)。如果你的项目需求是做一个轻巧的、灵活的、定制性强的编辑器,工作流是嵌入式的,那么可以选择 Camunda。 如果仔细比较起这三个的差异,能列一个长长的表格,这个网上也有不少人都总结过了,松哥这里也就不啰嗦了。 3. 流程图 既然有三个不同的工作流,那么三个不同的工作流画出来的流程图是否都各不相同呢? 不是的。 工作流程图这块其实有一个统一的标准,那就是 BPMN。BPMN 全称是 Business Process Model and Notation,中文译作业务流程模型和标记法,这个中文太绕口了,还是简称 BPMN 吧。 这是一套图形化表示法,用图形来表示业务流程模型。BPMN 最初由业务流程管理倡议组织(BPMI, Business Process Management Initiative)开发,BPMI 于 2005 年与对象管理组织(OMG, Object Management Group)合并,并于 2011 年 1 月 OMG 发布 2.0 版本,同时改为现在的名称。 一句话,就是流程图这块有一个特别古老的规范,那就是 BPMN,而我们前面所说的无论是 Activiti、Flowable 还是 Camunda,都是支持这个规范的,所以呢,无论你使用哪一个流程引擎,都可以使用同一套流程图。 那么这个规范究竟都说了些什么事情呢? 我们以上面生产笔记本的流程图为例,来和小伙伴们做一个简单介绍: 从上图中可以看到,一个流程图中主要包含四方面的内容: 事件 连线 任务 网关 我们一个一个来说。 事件 首先在一个流程图中应该有开始事件和结束事件,也就是上图大家看到的两个圆圈。另外还有一些中间事件、边界事件等。举个中间定时事件的例子,比如用户下单之后,可以有一个中间定时事件,延迟 5 分钟发货。 连线 连线就是将事件、任务、网关等连在一起的线条,一般情况下就是普通连线,有的时候连线会有一些条件,例如松哥之前文章和大家分享的请假,如果经理同意请假申请,就走哪一个线条,如果经理不同意请假申请,就走哪一个线条。对应上图的笔记本生产,如果经理审批通过,就载入图纸准备生产,如果经理审批不通过,就重新设计。 任务 任务这块其实有很多分类。 如果细分大致上可以分为如下几种: 接收任务 在上面的流程图中,等待准备工作完成这一项就是一个接收任务。这个任务里并不需要额外做什么事情,流程到这一步就自动停下来了,需要人工去点一下,推动流程继续向下执行。 发送任务 这个一般用来把消息发送给外部参与者。 服务任务 这个一般由系统自动完成,其实说白了就是我们的一个自定义类,可以在一个自定义类里边完成想要做的事情。 脚本任务 一个自动化活动。当流程执行到脚本任务时,自动执行相应的脚本。 业务规则任务 BPMN2.0 新引入用来对接业务规则引擎,业务规则任务用于同步执行一个或多个规则。 用户任务 用于为那些需要由人工参与者完成的工作建模。 虽然细分类别很多,但是仔细看,其实这几种又可以归为两大类: 用户任务:表示人工要介入做的事情。比如同意与否,或者输入一些参数,要让人工完成任务,就需要一个表单系统,让人工输入数据,或者显示数据给人看,这也是为什么用户任务和表单系统结合在一起的原因,用户任务需要用户向引擎提交一个完成任务的动作,否则流程会暂停在这里等待。 服务任务:表示机器自动做的事情。调用服务的任务,这个服务可以是一个 Spring JavaBean,也可以是一个远程 REST 服务,流程会自动执行服务任务。 活动 活动可以算是一种特殊的任务。活动可以调用另外一个流程使之作为当前流程的子流程去运行。活动也可以分为用户活动、脚本活动等等。从显示上来说,活动比任务边框深一些。仅此而已。 网关 网关要是细分起来,也有很多不同类型的网关。 互斥网关 这种网关也叫排他性网关,我们之前请假流程中的那个网关,就是互斥网关。这种网关有且仅有一个有效出口。 相容网关 这种网关会有多个出口,只要条件满足,都会执行。 事件网关 事件网关是通过中间事件驱动,它在等待的事件发生后才会触发决策。基于事件的网关允许基于事件作出决策。 并行网关 并行网关一般是成对出现的,上面生产笔记本的那个流程中,生产屏幕、键盘等并行操作,就是通过并行网关来实现的。 好啦,这就是关于流程引擎的一些基本概念,捋顺了这些基本概念,在回过头看我们前面几篇关于流程引擎的文章,应该会有一些不一样的理解: Spring Boot 整合流程引擎 Flowable,so easy! SpringBoot+Vue+Flowable,模拟一个请假审批流程! 49张图带领小伙伴们体验一把 Flowable-UI Spring Security + Vue + Flowable 怎么玩?

优秀的个人博客,低调大师

【高手问答 281 期汇总】 —— SaaS攻略:从入门到进阶

SaaS(software as a service),直译为软件即服务。从本质上而言,是一种软件交付和销售方式,即订阅许可。该商业模式决定了SaaS在销售达成时,并不产生所有的收益,而是通过后期不断实现收入。即SaaS将原先的一次性买卖变成了软件的分期租赁,或者说叫订阅。 我们以「开在火车站的餐馆」和「开在写字楼下餐馆」为例。若传统的B2B软件是开在火车站的餐馆,那SaaS则更像开在写字楼下餐馆。 OSCHINA本期高手问答( 4月20日- 4月26日) 我们请来了@胡文语老师和大家一起探讨关于SaaS相关的问题。 可讨论的问题包括但不限于: 1.SaaS的产品设计 2.需求评估 3.客户教育 或者其它相关问题,也欢迎大家积极提问! 嘉宾简介 胡文语,《SaaS攻略》作者,浙江快服集团云客服事业部产品设计总监,专注ToB SaaS产品设计。 问答汇总 问:你好,很开心能看到saas相关的书籍,做saas好多年了,大B小B,B2B, B2C, 跨越了多个行业。 SAAS项目由于普遍业务复杂度比较高,非常容易出现客户反馈的各种问题,很多并不是问题而是系统设计如此,但是因为复杂度太高,没有人能一眼看出是哪的问题还是设计如此,需要消耗大量的人力去调查,然后为客户解释。 这应该就是您说的客户教育的一部分? 如何避免或减少这个现象的发生呢? 还是由于业务复杂度的问题,产品,开发,测试可能都换了一茬又一茬,这时候新的需求很尴尬,要么经过研发调查因为各种原因产品的需求达不到,只能修改路线,要么需要很大的人力去实现,这个过程产研都需要大量投入,最后这样的结果大家都很沮丧。如何避免这种情况,让需求落地? 答:很高兴收到你的提问。 SaaS复杂性的问题,并不是绝对的。有简单的SaaS业务,例如一款压缩软件;也有相对复杂的SaaS业务,例如常见的CRM(客户管理)。 当然人为的因素,确实会造成系统的复杂,可能源于: ①历史遗留问题,不断累加,不做修建,留下大量技术债务 ②团队能力,对业务的抽象和模块化不够 ③对客户的选择和需求判断,短期的利益大于了长久利益。 问题1,需要重新梳理,一座老寨 搬迁修复,先拆再修后组 问题2,这个需要更优秀、有魄力的人 问题3,看业务团队的掌舵人 问: 老师您好目前正就职于saas服务公司,观察公司一直在为抢占市场,开发新的需求和功能,但是基本上没有环节如果留着现有客户,有些功能用起来客户意见比较大,但是也只是敷衍式的应对,公司的战略有问题吗?哈哈不知道问的对不对 答:这个问题不好给你肯定的答案,不同阶段不同取舍。但从你为询问中,我想你已经有了自己的判断。 SaaS是订阅制,客户留存和获客一样重要,只获客不考虑留存,看上去热火朝天,实际是个大窟窿。 若大量客户在还没有收回成本的情况下就流失了,是不是每多获得一个客户就等于多干了一件亏本买卖。此外,产品的教育成本高、问题多,也会导致大量的人员维护,必然导致利润率低。 问:您好,请教下如何看待 saas 模式下客户的定制化需求,如何解决。 答:SaaS 从长期来看,健康的发展过程可以是一个从定制化到标准化再定制化再到标准化的螺旋式上升过程。所以,将定制通用化是其关键的解法。 此外,从客户的目的出发,而非表面的需求,我们更能找到那把将定制化转向通用化的钥匙。 问:通过演示解决用户需求后,如何教育客户非你不可? 答:客户有自己的打算,非你不可大多是理想化。 能做的是增加客户选择我们的可能性。 有什么角度呢? ①懂客户:是谁,有哪些角色,关心的优先级是什么,什么在阻碍他们达成目标 ②展示实力:产品有哪些特性和好处,提供的解决方案是什么,能带来什么影响 ③区别:相对竞争对手具有的独特优势 希望能给你提供一些视角 问:saas平台部署起来比较方便,但是如果用户的数据量特别大,规模十分庞大,特别占用平台的资源怎么办? 目前saas平台遇到某个企业的大照片,大视频流量冲击,不好运维? 答:那要重新考虑定价模式的了。SaaS的定价主要有3个维度:用户(坐席)、功能/服务,使用量。因此你可以从使用量进行套餐的划分,像云盘、短信服务都是这种操作方式。 问:sass平台,需要面对很多客户,对于客户提出的各种 用户体验不好的提的需求,应该怎么解决?例如,需要查询已删除的商品列表和已删除的记录 ​​​​​​​答:这个问题很常见,一方面在于对产品边界的把控,一方面是对需求的真伪过滤和抽象 京东购买链接:https://item.jd.com/10042953285214.html 高手问答栏目查看:主题 高手问答 - OSCHINA - 中文开源技术交流社区

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

WebStorm

WebStorm

WebStorm 是jetbrains公司旗下一款JavaScript 开发工具。目前已经被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。与IntelliJ IDEA同源,继承了IntelliJ IDEA强大的JS部分的功能。

用户登录
用户注册