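[Hands-On] Upgrading Apache DolphinScheduler from 2.0 to 3.0
Upgrade Background
Our project needs the data quality module, which provides solid data-quality monitoring for the data warehouse, so the existing 2.0 deployment had to be upgraded to 3.0 or later. We tested two versions, 3.0.1 and 3.1.1, running the same data through operations such as task scheduling and pausing, and finally chose 3.0.1.
Reasons:
1. With 3.1.1, starting over a hundred SQL tasks at the same time made SQL tasks report errors, and a large number of tasks failed to run. According to community members, this is caused by a bug in DS itself. The same behavior also appears in 3.0.1, but less often.
2. Versions after DS 3.0.1 updated the ZooKeeper dependency; the client library we checked was version 3.8. We do not plan to upgrade ZooKeeper in production, so we chose 3.0.1.
In our tests this version was fairly stable and feature-complete, and it meets our needs.
This upgrade has been verified as feasible and is already live in production. We found reasonable solutions to the problems we ran into, so I wrote this article for your reference.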
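Upgrade Plan
Chosen Approach
We migrate tasks by syncing database tables. In the early phase, 3.0 and 2.0 run side by side; once the tasks have been verified, 2.0 is shut down step by step.
Reason: the upgrade script provided on the official site does not run as-is and has many problems. After our modifications the upgraded database schema was fine, but the runtime data was corrupted, which caused many problems. For safety and stability, we do not use the official approach directly.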
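Upgrade Preparation
1. First, back up the existing database. This step is very important.
Back up the original DS database: mysqldump -h ip -P 3306 -u user -p database_name > /opt/new_dolphinscheduler.sql
Restore into the new database: mysql -u user -p database_name < backup_file.sql
Note: -p with no value prompts for the password. Do not put a space between -p and the password, otherwise the password is read as the database name.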
2. Change the table structure of the backed-up tables
#### This script is adapted from the official upgrade script, with extra columns added
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));

-- uc_dolphin_T_t_ds_alert_R_sign
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_alert_R_sign;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_alert_R_sign()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
        WHERE TABLE_NAME='t_ds_alert'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='sign')
    THEN
        ALTER TABLE `t_ds_alert` ADD COLUMN `sign` char(40) NOT NULL DEFAULT '' COMMENT 'sign=sha1(content)' after `id`;
        ALTER TABLE `t_ds_alert` ADD INDEX `idx_sign` (`sign`) USING BTREE;
    END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_alert_R_sign;
DROP PROCEDURE uc_dolphin_T_t_ds_alert_R_sign;

-- add unique key to t_ds_relation_project_user
drop PROCEDURE if EXISTS add_t_ds_relation_project_user_uk_uniq_uid_pid;
delimiter d//
CREATE PROCEDURE add_t_ds_relation_project_user_uk_uniq_uid_pid()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_relation_project_user'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='uniq_uid_pid')
    THEN
        ALTER TABLE t_ds_relation_project_user ADD UNIQUE KEY uniq_uid_pid(user_id, project_id);
    END IF;
END;
d//
delimiter ;
# CALL add_t_ds_relation_project_user_uk_uniq_uid_pid;
DROP PROCEDURE add_t_ds_relation_project_user_uk_uniq_uid_pid;

-- drop t_ds_relation_project_user key user_id_index
drop PROCEDURE if EXISTS drop_t_ds_relation_project_user_key_user_id_index;
delimiter d//
CREATE PROCEDURE drop_t_ds_relation_project_user_key_user_id_index()
BEGIN
    IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_relation_project_user'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='user_id_index')
    THEN
        ALTER TABLE `t_ds_relation_project_user` DROP KEY `user_id_index`;
    END IF;
END;
d//
delimiter ;
CALL drop_t_ds_relation_project_user_key_user_id_index;
DROP PROCEDURE drop_t_ds_relation_project_user_key_user_id_index;

-- add unique key to t_ds_project
drop PROCEDURE if EXISTS add_t_ds_project_uk_unique_name;
delimiter d//
CREATE PROCEDURE add_t_ds_project_uk_unique_name()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_project'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='unique_name')
    THEN
        ALTER TABLE t_ds_project ADD UNIQUE KEY unique_name(name);
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_project_uk_unique_name;
DROP PROCEDURE add_t_ds_project_uk_unique_name;

drop PROCEDURE if EXISTS add_t_ds_project_uk_unique_code;
delimiter d//
CREATE PROCEDURE add_t_ds_project_uk_unique_code()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_project'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='unique_code')
    THEN
        ALTER TABLE t_ds_project ADD UNIQUE KEY unique_code(code);
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_project_uk_unique_code;
DROP PROCEDURE add_t_ds_project_uk_unique_code;

-- add unique key to t_ds_queue
drop PROCEDURE if EXISTS add_t_ds_queue_uk_unique_queue_name;
delimiter d//
CREATE PROCEDURE add_t_ds_queue_uk_unique_queue_name()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_queue'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='unique_queue_name')
    THEN
        ALTER TABLE t_ds_queue ADD UNIQUE KEY unique_queue_name(queue_name);
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_queue_uk_unique_queue_name;
DROP PROCEDURE add_t_ds_queue_uk_unique_queue_name;

-- add unique key to t_ds_udfs
drop PROCEDURE if EXISTS add_t_ds_udfs_uk_unique_func_name;
delimiter d//
CREATE PROCEDURE add_t_ds_udfs_uk_unique_func_name()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_udfs'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='unique_func_name')
    THEN
        ALTER TABLE t_ds_udfs ADD UNIQUE KEY unique_func_name(func_name);
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_udfs_uk_unique_func_name;
DROP PROCEDURE add_t_ds_udfs_uk_unique_func_name;

-- add unique key to t_ds_tenant
drop PROCEDURE if EXISTS add_t_ds_tenant_uk_unique_tenant_code;
delimiter d//
CREATE PROCEDURE add_t_ds_tenant_uk_unique_tenant_code()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_tenant'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='unique_tenant_code')
    THEN
        ALTER TABLE t_ds_tenant ADD UNIQUE KEY unique_tenant_code(tenant_code);
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_tenant_uk_unique_tenant_code;
DROP PROCEDURE add_t_ds_tenant_uk_unique_tenant_code;

-- ALTER TABLE `t_ds_task_instance` ADD INDEX `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE;
drop PROCEDURE if EXISTS add_t_ds_task_instance_uk_idx_code_version;
delimiter d//
CREATE PROCEDURE add_t_ds_task_instance_uk_idx_code_version()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_task_instance'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='idx_code_version')
    THEN
        ALTER TABLE `t_ds_task_instance` ADD INDEX `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_instance_uk_idx_code_version;
DROP PROCEDURE add_t_ds_task_instance_uk_idx_code_version;

-- ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_params` longtext COMMENT 'job custom parameters' AFTER `app_link`;
drop PROCEDURE if EXISTS modify_t_ds_task_instance_col_task_params;
delimiter d//
CREATE PROCEDURE modify_t_ds_task_instance_col_task_params()
BEGIN
    IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_task_instance'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME ='task_params')
    THEN
        ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_params` longtext COMMENT 'job custom parameters' AFTER `app_link`;
    END IF;
END;
d//
delimiter ;
CALL modify_t_ds_task_instance_col_task_params;
DROP PROCEDURE modify_t_ds_task_instance_col_task_params;

-- ALTER TABLE `t_ds_task_instance` ADD COLUMN `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id';
drop PROCEDURE if EXISTS add_t_ds_task_instance_col_task_group_id;
delimiter d//
CREATE PROCEDURE add_t_ds_task_instance_col_task_group_id()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_task_instance'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME ='task_group_id')
    THEN
        ALTER TABLE `t_ds_task_instance` ADD COLUMN `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' after `var_pool`;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_instance_col_task_group_id;
DROP PROCEDURE add_t_ds_task_instance_col_task_group_id;

-- ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_code` (`project_code`, `process_definition_code`) USING BTREE;
drop PROCEDURE if EXISTS add_t_ds_process_task_relation_key_idx_code;
delimiter d//
CREATE PROCEDURE add_t_ds_process_task_relation_key_idx_code()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_process_task_relation'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='idx_code')
    THEN
        ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_code` (`project_code`, `process_definition_code`) USING BTREE;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_process_task_relation_key_idx_code;
DROP PROCEDURE add_t_ds_process_task_relation_key_idx_code;

-- ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_pre_task_code_version` (`pre_task_code`,`pre_task_version`);
drop PROCEDURE if EXISTS add_t_ds_process_task_relation_key_idx_pre_task_code_version;
delimiter d//
CREATE PROCEDURE add_t_ds_process_task_relation_key_idx_pre_task_code_version()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_process_task_relation'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='idx_pre_task_code_version')
    THEN
        ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_pre_task_code_version` (`pre_task_code`,`pre_task_version`);
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_process_task_relation_key_idx_pre_task_code_version;
DROP PROCEDURE add_t_ds_process_task_relation_key_idx_pre_task_code_version;

-- ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_post_task_code_version` (`post_task_code`,`post_task_version`);
drop PROCEDURE if EXISTS add_t_ds_process_task_relation_key_idx_post_task_code_version;
delimiter d//
CREATE PROCEDURE add_t_ds_process_task_relation_key_idx_post_task_code_version()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_process_task_relation'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='idx_post_task_code_version')
    THEN
        ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_post_task_code_version` (`post_task_code`,`post_task_version`);
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_process_task_relation_key_idx_post_task_code_version;
DROP PROCEDURE add_t_ds_process_task_relation_key_idx_post_task_code_version;

-- ALTER TABLE `t_ds_process_task_relation_log` ADD KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`) USING BTREE;
drop PROCEDURE if EXISTS add_t_ds_process_task_relation_key_idx_process_code_version;
delimiter d//
CREATE PROCEDURE add_t_ds_process_task_relation_key_idx_process_code_version()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_process_task_relation_log'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='idx_process_code_version')
    THEN
        ALTER TABLE `t_ds_process_task_relation_log` ADD KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`) USING BTREE;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_process_task_relation_key_idx_process_code_version;
DROP PROCEDURE add_t_ds_process_task_relation_key_idx_process_code_version;

-- ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_project_code` (`project_code`) USING BTREE;
drop PROCEDURE if EXISTS add_t_ds_task_definition_log_key_idx_process_code_version;
delimiter d//
CREATE PROCEDURE add_t_ds_task_definition_log_key_idx_process_code_version()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_task_definition_log'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='idx_project_code')
    THEN
        ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_project_code` (`project_code`) USING BTREE;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_definition_log_key_idx_process_code_version;
DROP PROCEDURE add_t_ds_task_definition_log_key_idx_process_code_version;

-- ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE;
drop PROCEDURE if EXISTS add_t_ds_task_definition_log_key_idx_code_version;
delimiter d//
CREATE PROCEDURE add_t_ds_task_definition_log_key_idx_code_version()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_task_definition_log'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='idx_code_version')
    THEN
        ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_definition_log_key_idx_code_version;
DROP PROCEDURE add_t_ds_task_definition_log_key_idx_code_version;

-- alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
drop PROCEDURE if EXISTS add_t_ds_task_definition_log_col_task_group_id;
delimiter d//
CREATE PROCEDURE add_t_ds_task_definition_log_col_task_group_id()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_task_definition_log'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='task_group_id')
    THEN
        alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_definition_log_col_task_group_id;
DROP PROCEDURE add_t_ds_task_definition_log_col_task_group_id;

-- alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
drop PROCEDURE if EXISTS add_t_ds_task_definition_col_task_group_id;
delimiter d//
CREATE PROCEDURE add_t_ds_task_definition_col_task_group_id()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_task_definition'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='task_group_id')
    THEN
        alter table t_ds_task_definition add `task_group_id` int DEFAULT NULL COMMENT 'task group id';
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_definition_col_task_group_id;
DROP PROCEDURE add_t_ds_task_definition_col_task_group_id;

-- alter table t_ds_task_definition_log add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`;
drop PROCEDURE if EXISTS add_t_ds_task_definition_log_col_task_group_priority;
delimiter d//
CREATE PROCEDURE add_t_ds_task_definition_log_col_task_group_priority()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_task_definition_log'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='task_group_priority')
    THEN
        alter table t_ds_task_definition_log add `task_group_priority` tinyint DEFAULT '0' COMMENT 'task group priority' AFTER `task_group_id`;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_definition_log_col_task_group_priority;
DROP PROCEDURE add_t_ds_task_definition_log_col_task_group_priority;

-- alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT '0' COMMENT 'task group id' AFTER `task_group_id`;
drop PROCEDURE if EXISTS add_t_ds_task_definition_col_task_group_priority;
delimiter d//
CREATE PROCEDURE add_t_ds_task_definition_col_task_group_priority()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_task_definition'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='task_group_priority')
    THEN
        alter table t_ds_task_definition add `task_group_priority` tinyint DEFAULT '0' COMMENT 'task group priority' AFTER `task_group_id`;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_definition_col_task_group_priority;
DROP PROCEDURE add_t_ds_task_definition_col_task_group_priority;

-- ALTER TABLE `t_ds_user` ADD COLUMN `time_zone` varchar(32) DEFAULT NULL COMMENT 'time zone';
drop PROCEDURE if EXISTS add_t_ds_user_col_time_zone;
delimiter d//
CREATE PROCEDURE add_t_ds_user_col_time_zone()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_user'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='time_zone')
    THEN
        ALTER TABLE `t_ds_user` ADD COLUMN `time_zone` varchar(32) DEFAULT NULL COMMENT 'time zone';
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_user_col_time_zone;
DROP PROCEDURE add_t_ds_user_col_time_zone;

-- ALTER TABLE `t_ds_alert` ADD COLUMN `warning_type` tinyint(4) DEFAULT '2' COMMENT '1 process is successfully, 2 process/task is failed';
drop PROCEDURE if EXISTS add_t_ds_alert_col_warning_type;
delimiter d//
CREATE PROCEDURE add_t_ds_alert_col_warning_type()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_alert'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='warning_type')
    THEN
        ALTER TABLE `t_ds_alert` ADD COLUMN `warning_type` tinyint(4) DEFAULT '2' COMMENT '1 process is successfully, 2 process/task is failed';
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_alert_col_warning_type;
DROP PROCEDURE add_t_ds_alert_col_warning_type;

-- ALTER TABLE `t_ds_alert` ADD INDEX `idx_status` (`alert_status`) USING BTREE;
drop PROCEDURE if EXISTS add_t_ds_alert_idx_idx_status;
delimiter d//
CREATE PROCEDURE add_t_ds_alert_idx_idx_status()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME='t_ds_alert'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND INDEX_NAME='idx_status')
    THEN
        ALTER TABLE `t_ds_alert` ADD INDEX `idx_status` (`alert_status`) USING BTREE;
    END IF;
END;
d//
delimiter ;
CALL add_t_ds_alert_idx_idx_status;
DROP PROCEDURE add_t_ds_alert_idx_idx_status;

-- ALTER TABLE `t_ds_alert` ADD COLUMN `project_code` bigint DEFAULT NULL COMMENT 'project_code';
-- ALTER TABLE `t_ds_alert` ADD COLUMN `process_definition_code` bigint DEFAULT NULL COMMENT 'process_definition_code';
-- ALTER TABLE `t_ds_alert` ADD COLUMN `process_instance_id` int DEFAULT NULL COMMENT 'process_instance_id';
-- ALTER TABLE `t_ds_alert` ADD COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type';
drop PROCEDURE if EXISTS add_t_ds_alert_col_project_code;
delimiter d//
CREATE PROCEDURE add_t_ds_alert_col_project_code()
BEGIN
    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_alert'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='project_code')
    THEN
        ALTER TABLE `t_ds_alert` ADD COLUMN `project_code` bigint DEFAULT NULL COMMENT 'project_code';
        ALTER TABLE `t_ds_alert` ADD COLUMN `process_definition_code` bigint DEFAULT NULL COMMENT 'process_definition_code';
        ALTER TABLE `t_ds_alert` ADD COLUMN `process_instance_id` int DEFAULT NULL COMMENT 'process_instance_id';
        ALTER TABLE `t_ds_alert` ADD COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type';
    END IF;
END;
d//
delimiter ;
# CALL add_t_ds_alert_col_project_code;
DROP PROCEDURE add_t_ds_alert_col_project_code;

-- t_ds_task_instance
drop PROCEDURE if EXISTS alter_t_ds_task_instance_col_log_path;
delimiter d//
CREATE PROCEDURE alter_t_ds_task_instance_col_log_path()
BEGIN
    IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME='t_ds_task_instance'
        AND TABLE_SCHEMA=(SELECT DATABASE())
        AND COLUMN_NAME='log_path')
    THEN
        ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `log_path` longtext DEFAULT NULL COMMENT 'task log path';
    END IF;
END;
d//
delimiter ;
CALL alter_t_ds_task_instance_col_log_path;
DROP PROCEDURE alter_t_ds_task_instance_col_log_path;

--
-- Table structure for table `t_ds_dq_comparison_type`
--
DROP TABLE IF EXISTS `t_ds_dq_comparison_type`;
CREATE TABLE `t_ds_dq_comparison_type` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `type` varchar(100) NOT NULL,
    `execute_sql` text DEFAULT NULL,
    `output_table` varchar(100) DEFAULT NULL,
    `name` varchar(100) DEFAULT NULL,
    `create_time` datetime DEFAULT NULL,
    `update_time` datetime DEFAULT NULL,
    `is_inner_source` tinyint(1) DEFAULT '0',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--
-- Table structure for table t_ds_dq_execute_result
--
DROP TABLE IF EXISTS `t_ds_dq_execute_result`;
CREATE TABLE `t_ds_dq_execute_result` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `process_definition_id` int(11) DEFAULT NULL,
    `process_instance_id` int(11) DEFAULT NULL,
    `task_instance_id` int(11) DEFAULT NULL,
    `rule_type` int(11) DEFAULT NULL,
    `rule_name` varchar(255) DEFAULT NULL,
    `statistics_value` double DEFAULT NULL,
    `comparison_value` double DEFAULT NULL,
    `check_type` int(11) DEFAULT NULL,
    `threshold` double DEFAULT NULL,
    `operator` int(11) DEFAULT NULL,
    `failure_strategy` int(11) DEFAULT NULL,
    `state` int(11) DEFAULT NULL,
    `user_id` int(11) DEFAULT NULL,
    `comparison_type` int(11) DEFAULT NULL,
    `error_output_path` text DEFAULT NULL,
    `create_time` datetime DEFAULT NULL,
    `update_time` datetime DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--
-- Table structure for table t_ds_dq_rule
--
DROP TABLE IF EXISTS `t_ds_dq_rule`;
CREATE TABLE `t_ds_dq_rule` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `name` varchar(100) DEFAULT NULL,
    `type` int(11) DEFAULT NULL,
    `user_id` int(11) DEFAULT NULL,
    `create_time` datetime DEFAULT NULL,
    `update_time` datetime DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--
-- Table structure for table t_ds_dq_rule_execute_sql
--
DROP TABLE IF EXISTS `t_ds_dq_rule_execute_sql`;
CREATE TABLE `t_ds_dq_rule_execute_sql` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `index` int(11) DEFAULT NULL,
    `sql` text DEFAULT NULL,
    `table_alias` varchar(255) DEFAULT NULL,
    `type` int(11) DEFAULT NULL,
    `is_error_output_sql` tinyint(1) DEFAULT '0',
    `create_time` datetime DEFAULT NULL,
    `update_time` datetime DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--
-- Table structure for table t_ds_dq_rule_input_entry
--
DROP TABLE IF EXISTS `t_ds_dq_rule_input_entry`;
CREATE TABLE `t_ds_dq_rule_input_entry` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `field` varchar(255) DEFAULT NULL,
    `type` varchar(255) DEFAULT NULL,
    `title` varchar(255) DEFAULT NULL,
    `value` varchar(255) DEFAULT NULL,
    `options` text DEFAULT NULL,
    `placeholder` varchar(255) DEFAULT NULL,
    `option_source_type` int(11) DEFAULT NULL,
    `value_type` int(11) DEFAULT NULL,
    `input_type` int(11) DEFAULT NULL,
    `is_show` tinyint(1) DEFAULT '1',
    `can_edit` tinyint(1) DEFAULT '1',
    `is_emit` tinyint(1) DEFAULT '0',
    `is_validate` tinyint(1) DEFAULT '1',
    `create_time` datetime DEFAULT NULL,
    `update_time` datetime DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--
-- Table structure for table t_ds_dq_task_statistics_value
--
DROP TABLE IF EXISTS `t_ds_dq_task_statistics_value`;
CREATE TABLE `t_ds_dq_task_statistics_value` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `process_definition_id` int(11) DEFAULT NULL,
    `task_instance_id` int(11) DEFAULT NULL,
    `rule_id` int(11) NOT NULL,
    `unique_code` varchar(255) NULL,
    `statistics_name` varchar(255) NULL,
    `statistics_value` double NULL,
    `data_time` datetime DEFAULT NULL,
    `create_time` datetime DEFAULT NULL,
    `update_time` datetime DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--
-- Table structure for table t_ds_relation_rule_execute_sql
--
DROP TABLE IF EXISTS `t_ds_relation_rule_execute_sql`;
CREATE TABLE `t_ds_relation_rule_execute_sql` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `rule_id` int(11) DEFAULT NULL,
    `execute_sql_id` int(11) DEFAULT NULL,
    `create_time` datetime NULL,
    `update_time` datetime NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--
-- Table structure for table t_ds_relation_rule_input_entry
--
DROP TABLE IF EXISTS `t_ds_relation_rule_input_entry`;
CREATE TABLE `t_ds_relation_rule_input_entry` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `rule_id` int(11) DEFAULT NULL,
    `rule_input_entry_id` int(11) DEFAULT NULL,
    `values_map` text DEFAULT NULL,
    `index` int(11) DEFAULT NULL,
    `create_time` datetime DEFAULT NULL,
    `update_time` datetime DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for t_ds_k8s
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_k8s`;
CREATE TABLE `t_ds_k8s` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `k8s_name` varchar(100) DEFAULT NULL,
    `k8s_config` text DEFAULT NULL,
    `create_time` datetime DEFAULT NULL COMMENT 'create time',
    `update_time` datetime DEFAULT NULL COMMENT 'update time',
    PRIMARY KEY (`id`)
) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;

-- ----------------------------
-- Table structure for t_ds_k8s_namespace
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_k8s_namespace`;
CREATE TABLE `t_ds_k8s_namespace` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `limits_memory` int(11) DEFAULT NULL,
    `namespace` varchar(100) DEFAULT NULL,
    `online_job_num` int(11) DEFAULT NULL,
    `user_id` int(11) DEFAULT NULL,
    `pod_replicas` int(11) DEFAULT NULL,
    `pod_request_cpu` decimal(14,3) DEFAULT NULL,
    `pod_request_memory` int(11) DEFAULT NULL,
    `limits_cpu` decimal(14,3) DEFAULT NULL,
    `k8s` varchar(100) DEFAULT NULL,
    `create_time` datetime DEFAULT NULL COMMENT 'create time',
    `update_time` datetime DEFAULT NULL COMMENT 'update time',
    PRIMARY KEY (`id`),
    UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;

-- ----------------------------
-- Table structure for t_ds_relation_namespace_user
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_relation_namespace_user`;
CREATE TABLE `t_ds_relation_namespace_user` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
    `user_id` int(11) NOT NULL COMMENT 'user id',
    `namespace_id` int(11) DEFAULT NULL COMMENT 'namespace id',
    `perm` int(11) DEFAULT '1' COMMENT 'limits of authority',
    `create_time` datetime DEFAULT NULL COMMENT 'create time',
    `update_time` datetime DEFAULT NULL COMMENT 'update time',
    PRIMARY KEY (`id`),
    UNIQUE KEY `namespace_user_unique` (`user_id`,`namespace_id`)
) ENGINE=InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;

-- ----------------------------
-- Table structure for t_ds_alert_send_status
-- ----------------------------
DROP TABLE IF EXISTS t_ds_alert_send_status;
CREATE TABLE t_ds_alert_send_status (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `alert_id` int(11) NOT NULL,
    `alert_plugin_instance_id` int(11) NOT NULL,
    `send_status` tinyint(4) DEFAULT '0',
    `log` text,
    `create_time` datetime DEFAULT NULL COMMENT 'create time',
    PRIMARY KEY (`id`),
    UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for t_ds_audit_log
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_audit_log`;
CREATE TABLE `t_ds_audit_log` (
    `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT'key',
    `user_id` int(11) NOT NULL COMMENT 'user id',
    `resource_type` int(11) NOT NULL COMMENT 'resource type',
    `operation` int(11) NOT NULL COMMENT 'operation',
    `time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
    `resource_id` int(11) NULL DEFAULT NULL COMMENT 'resource id',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for t_ds_task_group
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_task_group`;
CREATE TABLE `t_ds_task_group` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT'key',
    `name` varchar(100) DEFAULT NULL COMMENT 'task_group name',
    `description` varchar(200) DEFAULT NULL,
    `group_size` int (11) NOT NULL COMMENT'group size',
    `use_size` int (11) DEFAULT '0' COMMENT 'used size',
    `user_id` int(11) DEFAULT NULL COMMENT 'creator id',
    `project_code` bigint(20) DEFAULT 0 COMMENT 'project code',
    `status` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
    `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY(`id`)
) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;

-- ----------------------------
-- Table structure for t_ds_task_group_queue
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_task_group_queue`;
CREATE TABLE `t_ds_task_group_queue` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT'key',
    `task_id` int(11) DEFAULT NULL COMMENT 'taskintanceid',
    `task_name` varchar(100) DEFAULT NULL COMMENT 'TaskInstance name',
    `group_id` int(11) DEFAULT NULL COMMENT 'taskGroup id',
    `process_id` int(11) DEFAULT NULL COMMENT 'processInstace id',
    `priority` int(8) DEFAULT '0' COMMENT 'priority',
    `status` tinyint(4) DEFAULT '-1' COMMENT '-1: waiting 1: running 2: finished',
    `force_start` tinyint(4) DEFAULT '0' COMMENT 'is force start 0 NO ,1 YES',
    `in_queue` tinyint(4) DEFAULT '0' COMMENT 'ready to get the queue by other task finish 0 NO ,1 YES',
    `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY( `id` )
) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;

ALTER TABLE t_ds_process_instance ADD `next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId';
ALTER TABLE t_ds_process_instance ADD `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time';
ALTER TABLE t_ds_process_definition ADD `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,1:serial wait,2:serial discard,3:serial priority';
ALTER TABLE t_ds_process_definition_log ADD `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,1:serial wait,2:serial discard,3:serial priority';
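Most statements in the script above are wrapped in a stored procedure that checks INFORMATION_SCHEMA first, so they can be re-run safely. The last four ALTER TABLE ... ADD statements are not, and running them a second time against a database that already has those columns would fail. As a rough sketch (not part of the original script), the Python helper below generates the same guard pattern for those four columns. The function name guarded_add_column and the generated procedure names are made up for illustration; the table names, column names, and column definitions are copied from the script.

```python
# Sketch: generate "add column only if missing" wrappers in the same style
# as the upgrade script. Helper and procedure names are hypothetical.

EXECUTION_TYPE_DEF = (
    "tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,"
    "1:serial wait,2:serial discard,3:serial priority'"
)

# (table, column, definition) for the four unguarded ALTER statements.
UNGUARDED_COLUMNS = [
    ("t_ds_process_instance", "next_process_instance_id",
     "int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId'"),
    ("t_ds_process_instance", "restart_time",
     "datetime DEFAULT NULL COMMENT 'process instance restart time'"),
    ("t_ds_process_definition", "execution_type", EXECUTION_TYPE_DEF),
    ("t_ds_process_definition_log", "execution_type", EXECUTION_TYPE_DEF),
]


def guarded_add_column(table: str, column: str, definition: str) -> str:
    """Return mysql-client script text that adds `column` only if it is missing."""
    proc = f"add_{table}_col_{column}"
    lines = [
        f"drop PROCEDURE if EXISTS {proc};",
        "delimiter d//",
        f"CREATE PROCEDURE {proc}()",
        "BEGIN",
        "    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS",
        f"        WHERE TABLE_NAME='{table}'",
        "        AND TABLE_SCHEMA=(SELECT DATABASE())",
        f"        AND COLUMN_NAME='{column}')",
        "    THEN",
        f"        ALTER TABLE `{table}` ADD COLUMN `{column}` {definition};",
        "    END IF;",
        "END;",
        "d//",
        "delimiter ;",
        f"CALL {proc};",
        f"DROP PROCEDURE {proc};",
    ]
    return "\n".join(lines)


def build_script() -> str:
    """Join the guarded blocks for all four columns into one script."""
    return "\n\n".join(guarded_add_column(*c) for c in UNGUARDED_COLUMNS)


if __name__ == "__main__":
    print(build_script())
```

The output is meant to be pasted into the mysql command-line client, since delimiter is a client command rather than server-side SQL.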
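Installing 3.0.1
Follow the official documentation for installation and create a new database; this is not covered here.
Importing Data into the Database
Important Notes
When importing with this script:
- All schedules are set to offline. release_state in the schedule table t_ds_schedules is set to 0 for every row to avoid conflicts with the original scheduler. After testing, bring the schedules online manually.
- Worker group information must be configured again; keep it consistent with the original.
- Alert instances must be added again and then re-bound in alert group management.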
#项目表 truncate table t_ds_project; insert into t_ds_project select id, name, code, description, user_id, flag, create_time, update_time from dolphin201.t_ds_project; #任务过程定义####唯一dag truncate table t_ds_process_definition; insert into t_ds_process_definition select id, code, name, version, description, project_code, release_state, user_id, global_params, flag, locations, warning_group_id, timeout, tenant_id, execution_type, create_time, update_time from dolphin201.t_ds_process_definition; #任务过程定义日志 truncate table t_ds_process_definition_log; insert into t_ds_process_definition_log select t1.id, t1.code, t1.name, t1.version, t1.description, t1.project_code, t1.release_state, t1.user_id, t1.global_params, t1.flag, t1.locations, t1.warning_group_id, t1.timeout, t1.tenant_id, t1.execution_type, t1.operator, t1.operate_time, t1.create_time, t1.update_time from dolphin201.t_ds_process_definition_log t1 inner join (select code, project_code, max(version) version from dolphin201.t_ds_process_definition_log group by project_code, code) t2 on t1.code = t2.code and t1.project_code = t2.project_code and t1.version = t2.version ; ###进行任务相关性表 truncate table t_ds_process_task_relation; insert into t_ds_process_task_relation select id, name, project_code, process_definition_code, process_definition_version, pre_task_code, pre_task_version, post_task_code, post_task_version, condition_type, condition_params, create_time, update_time from dolphin201.t_ds_process_task_relation; ###进行任务相关性表日志 truncate table t_ds_process_task_relation_log; insert into t_ds_process_task_relation_log select t3.id, t3.name, t3.project_code, t3.process_definition_code, t3.process_definition_version, t3.pre_task_code, t3.pre_task_version, t3.post_task_code, t3.post_task_version, t3.condition_type, t3.condition_params, t3.operator, t3.operate_time, t3.create_time, t3.update_time # from t_ds_process_task_relation; from dolphin201.t_ds_process_task_relation_log t3 inner join (select project_code, 
        process_definition_code, post_task_code,
        max(process_definition_version) process_definition_version,
        max(post_task_version) AS post_task_version
    from dolphin201.t_ds_process_task_relation_log
    group by project_code, process_definition_code, post_task_code) t4
on t3.project_code = t4.project_code
and t3.process_definition_code = t4.process_definition_code
and t3.post_task_code = t4.post_task_code
and t3.process_definition_version = t4.process_definition_version
and t3.post_task_version = t4.post_task_version
;

### Task definitions
truncate table t_ds_task_definition;
insert into t_ds_task_definition
select id, code, name, version, description, project_code, user_id, task_type, task_params,
       flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
       timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids,
       task_group_id, task_group_priority, create_time, update_time
from dolphin201.t_ds_task_definition;

### Task definition log
truncate table t_ds_task_definition_log;
insert into t_ds_task_definition_log
select t5.id, t5.code, t5.name, t5.version, t5.description, t5.project_code, t5.user_id,
       t5.task_type, t5.task_params, t5.flag, t5.task_priority, t5.worker_group,
       t5.environment_code, t5.fail_retry_times, t5.fail_retry_interval, t5.timeout_flag,
       t5.timeout_notify_strategy, t5.timeout, t5.delay_time, t5.resource_ids, t5.operator,
       t5.task_group_id, t5.task_group_priority, t5.operate_time, t5.create_time, t5.update_time
from dolphin201.t_ds_task_definition_log t5
;

### Schedules
truncate table t_ds_schedules;
insert into t_ds_schedules
select id, process_definition_code, start_time, end_time, timezone_id, crontab,
       failure_strategy, user_id, '0' AS release_state, warning_type, warning_group_id,
       process_instance_priority, worker_group, environment_code, create_time, update_time
from dolphin201.t_ds_schedules;

### Alert groups
truncate table t_ds_alertgroup;
insert into t_ds_alertgroup
select id, alert_instance_ids, create_user_id, group_name, description, create_time, update_time
from dolphin201.t_ds_alertgroup;

#
# ### Alert plugin instances: the data in this table conflicts, so it is not imported; configure it manually afterwards
# truncate table t_ds_alert_plugin_instance;
# insert into t_ds_alert_plugin_instance
# select id, plugin_define_id, plugin_instance_params, create_time, update_time, instance_name
# from dolphin201.t_ds_alert_plugin_instance;

### Users
truncate table t_ds_user;
insert into t_ds_user
select id, user_name, user_password, user_type, email, phone, tenant_id,
       create_time, update_time, queue, state, time_zone
from dolphin201.t_ds_user;

### Tenants
truncate table t_ds_tenant;
insert into t_ds_tenant
select id, tenant_code, description, queue_id, create_time, update_time
from dolphin201.t_ds_tenant;

#### Resource center
truncate table t_ds_resources;
insert into t_ds_resources
select id, alias, file_name, description, user_id, type, size,
       create_time, update_time, pid, full_name, is_directory
from dolphin201.t_ds_resources;

#### Data sources
truncate table t_ds_datasource;
insert into t_ds_datasource
select id, name, note, type, user_id, connection_params, create_time, update_time
from dolphin201.t_ds_datasource;

### Environments
truncate table t_ds_environment;
insert into t_ds_environment
select id, code, name, config, description, operator, create_time, update_time
from dolphin201.t_ds_environment;
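Once the sync statements have run, a quick row count comparison can help confirm that nothing was dropped. The query below is only a sketch and not part of the original procedure. It assumes you are connected to the new 3.0.1 database, that `dolphin201` is the restored 2.0 schema as in the script, and it only covers tables that the script copies without filtering, so the source and target counts are expected to be equal.

```sql
-- Sketch only: compare row counts between the 2.0 source schema (dolphin201)
-- and the current (3.0.1) database for tables copied one-to-one.
SELECT 't_ds_task_definition' AS table_name,
       (SELECT COUNT(*) FROM dolphin201.t_ds_task_definition) AS source_rows,
       (SELECT COUNT(*) FROM t_ds_task_definition) AS target_rows
UNION ALL
SELECT 't_ds_task_definition_log',
       (SELECT COUNT(*) FROM dolphin201.t_ds_task_definition_log),
       (SELECT COUNT(*) FROM t_ds_task_definition_log)
UNION ALL
SELECT 't_ds_schedules',
       (SELECT COUNT(*) FROM dolphin201.t_ds_schedules),
       (SELECT COUNT(*) FROM t_ds_schedules)
UNION ALL
SELECT 't_ds_user',
       (SELECT COUNT(*) FROM dolphin201.t_ds_user),
       (SELECT COUNT(*) FROM t_ds_user)
UNION ALL
SELECT 't_ds_tenant',
       (SELECT COUNT(*) FROM dolphin201.t_ds_tenant),
       (SELECT COUNT(*) FROM t_ds_tenant)
UNION ALL
SELECT 't_ds_resources',
       (SELECT COUNT(*) FROM dolphin201.t_ds_resources),
       (SELECT COUNT(*) FROM t_ds_resources)
UNION ALL
SELECT 't_ds_datasource',
       (SELECT COUNT(*) FROM dolphin201.t_ds_datasource),
       (SELECT COUNT(*) FROM t_ds_datasource)
UNION ALL
SELECT 't_ds_environment',
       (SELECT COUNT(*) FROM dolphin201.t_ds_environment),
       (SELECT COUNT(*) FROM t_ds_environment);
```

Any row where `source_rows` and `target_rows` differ points at a table worth re-checking before the 2.0 cluster is stopped.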
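Problems encountered and solutions
Tasks cannot be opened
This error occurs if you use the official upgrade script as-is; the details can be seen directly in the logs. It is caused by missing columns, and the DDL statements above already include the statements that add them.
A small number of tasks cannot be saved
The main cause is a primary key conflict.
Simplest workaround: copy the task, then make the changes in the new copy and save it.
Tasks that had been authorized to users must be re-authorized.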
Publication of this article was supported by 白鲸开源科技.
