基于数据驱动 U-Net 模型的大气污染物扩散快速预测,提升计算速度近6000倍
项目背景
当前,常见的大气污染预测模型大多是基于物理机理构建的,比如空气质量预测模型 Calpuff、AERMOD、CMAQ 等。然而,这些模型运算较为复杂,对于输入数据的要求非常高,运算耗时也比较长,适合用于常规固定区域的预报。当遇到突发污染事件时,就无法有效发挥作用。
针对以上问题,本项目以某城区 3km*3km 范围的固定模拟区域,根据污染物扩散模型,快速计算任意释放点源和任意风向的污染物扩散动图,并进行精度评估。仅利用城市局部污染物扩散云图作为输入,使用深度学习模型提取图像中污染物扩散的特征,纯数据驱动,无需建立物理模型,预测耗时短,适合作为突发污染扩散事件时的应急处置决策辅助。
项目需求
课题名称
基于数据驱动的污染物扩散深度学习模型案例
课题需求
外部单位提供数据集,总数据集详细描述:120 个动图数据(3 个风速*5 个释放源点位 * 8 个风向)。选取其中任意 1 个动图的数据,基于数据驱动类模型(模型不限制)提取数据特征,得到污染物扩散模型,可对污染物扩散进行预测。
- 项目地址
https://aistudio.baidu.com/aistudio/projectdetail/5663515
实现过程
数据集
我们选择了风速 15m/s,风向正北,Pos_0 作为污染源释放点的动图数据,数据来源于某城区 3km*3km 范围的固定区域内污染物扩散 CFD 模拟结果(南京欧帕提亚公司提供),共 745 秒 148 张污染物浓度云图,两张图片时间间隔 5 秒。
基于飞桨 2.4.0 的开发环境,在对动图解压之后,我们发现动图解压得到的 181 张静态图片中第 148 张之后的图片存在明显的图像抖动。我们采用了基于 Harris 角点检测的图像对齐算法进行处理,但是图像抖动没有得到完全消除。为了保证模型输入数据的质量,我们丢弃了第 148 张之后的静态图片。
图1 原始数据
U-Net 网络模型
网络模型如图 2 所示,其由 3 个 Encoder/Decoder、9 个卷积 Conv、9 个反卷积 Conv-T 组成,约 30 万个训练参数。之所以选择 U-Net,是因为该网络在图像分割和目标识别中应用广泛,污染物扩散模式学习可以看作是一种动态的目标识别任务,只不过目标的形态比较抽象;另一个原因是 U-Net 的代码实现较简单,短时间内可以完成网络的搭建。
图2 U-Net网络图
核心代码
import paddle import paddle.nn as nn import paddle.nn.functional as F from paddle.nn.utils import weight_norm # 创建基础卷积层 def create_layer(in_channels, out_channels, kernel_size, wn=True, bn=True, activation=nn.ReLU, convolution=nn.Conv2D): assert kernel_size % 2 == 1 layer = [ ] conv = convolution(in_channels, out_channels, kernel_size, padding=kernel_size // 2) if wn: conv = weight_norm(conv) layer.append(conv) if activation is not None: layer.append(activation()) if bn: layer.append(nn.BatchNorm2D(out_channels)) return nn.Sequential(*layer) # 创建Encoder中的单个块 def create_encoder_block(in_channels, out_channels, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2): encoder = [ ] for i in range(layers): _in = out_channels _out = out_channels if i == 0: _in = in_channels encoder.append(create_layer(_in, _out, kernel_size, wn, bn, activation, nn.Conv2D)) return nn.Sequential(*encoder) # 创建Decoder中的单个块 def create_decoder_block(in_channels, out_channels, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2, final_layer=False): decoder = [ ] for i in range(layers): _in = in_channels _out = in_channels _bn = bn _activation = activation if i == 0: _in = in_channels * 2 if i == layers - 1: _out = out_channels if final_layer: _bn = False _activation = None decoder.append(create_layer(_in, _out, kernel_size, wn, _bn, _activation, nn.Conv2DTranspose)) return nn.Sequential(*decoder) # 创建Encoder def create_encoder(in_channels, filters, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2): encoder = [ ] for i in range(len(filters)): if i == 0: encoder_layer = create_encoder_block(in_channels, filters[i], kernel_size, wn, bn, activation, layers) else: encoder_layer = create_encoder_block(filters[i - 1], filters[i], kernel_size, wn, bn, activation, layers) encoder = encoder + [encoder_layer] return nn.Sequential(*encoder) # 创建Decoder def create_decoder(out_channels, filters, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2): decoder = [] for i in range(len(filters)): if i == 0: decoder_layer = create_decoder_block(filters[i], out_channels, kernel_size, wn, bn, activation, layers, final_layer=True) else: decoder_layer = create_decoder_block(filters[i], filters[i - 1], kernel_size, wn, bn, activation, layers, final_layer=False) decoder = [decoder_layer] + decoder return nn.Sequential(*decoder) # 创建网络 class UNetEx(nn.Layer): def __init__(self, in_channels, out_channels, kernel_size=3, filters=[16, 32, 64], layers=3, weight_norm=True, batch_norm=True, activation=nn.ReLU, final_activation=None): super().__init__() assert len(filters) > 0 self.final_activation = final_activation self.encoder = create_encoder(in_channels, filters, kernel_size, weight_norm, batch_norm, activation, layers) decoders = [ ] # for i in range(out_channels): decoders.append(create_decoder(out_channels, filters, kernel_size, weight_norm, batch_norm, activation, layers)) self.decoders = nn.Sequential(*decoders) def encode(self, x): tensors = [ ] indices = [ ] sizes = [ ] for encoder in self.encoder: x = encoder(x) sizes.append(x.shape) tensors.append(x) x, ind = F.max_pool2d(x, 2, 2, return_mask=True) indices.append(ind) return x, tensors, indices, sizes def decode(self, _x, _tensors, _indices, _sizes): y = [ ] for _decoder in self.decoders: x = _x tensors = _tensors[:] indices = _indices[:] sizes = _sizes[:] for decoder in _decoder: tensor = tensors.pop() size = sizes.pop() ind = indices.pop() # 反池化操作,为上采样 x = F.max_unpool2d(x, ind, 2, 2, output_size=size) x = paddle.concat([tensor, x], axis=1) x = decoder(x) y.append(x) return paddle.concat(y, axis=1) def forward(self, x): x, tensors, indices, sizes = self.encode(x) x = self.decode(x, tensors, indices, sizes) if self.final_activation is not None: x = self.final_activation(x) return x
训练
训练时输入数据为上一时刻的污染物云图,输出为预测的下一时刻的污染物云图。当前的训练 batch-size 为 1,即只预测下一时刻的污染物扩散情况。训练时,每 10 个 epoch 保存一次模型,防止训练意外中断时模型参数丢失。
# 训练方法 def train(model, train_dataset, criterion, optimizer, device, num_epochs): loss_history = [ ] epoch_loss = 0 # 遍历批次 for epoch in range(num_epochs): optimizer.clear_grad() for batch_id in range(len(train_dataset)-1): inputs = train_dataset[batch_id] outputs_true = train_dataset[batch_id+1] inputs = T.ToTensor()(inputs) inputs = paddle.unsqueeze(inputs, 0) outputs_true = T.ToTensor()(outputs_true) outputs_true = paddle.unsqueeze(outputs_true, 0) # 训练 outputs = model(inputs) # 计算损失值 loss = criterion(outputs, outputs_true) if batch_id % 10 ==0: print('epoch:',epoch,'batch_id:',batch_id,'loss:',loss.numpy()) loss.backward() epoch_loss += loss.item() optimizer.step() epoch_loss /= len(train_dataset) loss_history.append(epoch_loss) print("Epoch [{}/{}], Loss: {:.8f}".format(epoch + 1, num_epochs, loss.numpy()[0])) # 保存模型 if epoch % 10 == 0: save_model(model, '/home/aistudio/pollution_model.pdparams') print("Training complete.") return loss_history
预测
预测时,输入测试数据某时刻的污染物扩散云图,预测下一时刻的污染物扩散情况。测试函数中 supervise 这个 flag 为后续连续预测多个时刻的数据预置了接口。目前 supervise 置为 true,当模型预备连续预测多个时刻数据时,测试时将 supervise 置为 false。
def test(): # 初始化结果列表 results = [ ] # 测试集合起始点 inputs = test_dataset[0] inputs = T.ToTensor()(inputs) inputs = paddle.unsqueeze(inputs, 0) # 是否supervise flag_supervise = True device = paddle.set_device('gpu' if paddle.is_compiled_with_cuda() else 'cpu') # 加载模型 model = UNetEx(3,3,3) load_model(model,'/home/aistudio/pollution_model.pdparams',device) for num in range(1,10): # 进行预测 outputs = model(inputs) outputs_np = outputs.numpy() outputs_np = np.squeeze(outputs_np, axis=0) # 去除第一个维度(batch_size) outputs_np = np.transpose(outputs_np, (1, 2, 0)) # 将通道维度调整为最后一个维度 outputs_np = (255 * np.clip(outputs_np, 0, 1)).astype('uint8') #outputs_np = outputs_np.transpose([1, 2, 0]) #outputs_np_uint8 = (outputs_np * 255).astype(np.uint8) # 将预测结果添加到结果列表中 results.append(outputs_np) if flag_supervise == False: # 将预测结果作为下一帧的输入 inputs = outputs else: # 使用真实数据预测 inputs = test_dataset[num+1] inputs = T.ToTensor()(inputs) inputs = paddle.unsqueeze(inputs, 0) return results results = test()
项目成果
图3 计算函数损失值
图4 对比 CFD 模拟参数对比
图5 残差值对比
如图 5 所示,浓度误差主要集中在污染源附近(如图红色框),主要数值分布在-0.02~0.02 之间。不同颜色分别代表不同浓度区间误差,蓝色表示的低浓度相对误差较小,绿色红色表示的中高浓度误差平均误差较高,绿色区域表征的中等浓度区域,偏大的误差影响的面积较大。
图6 数值对比
未来发展方向
预测能力方面
-
基于前一时刻的污染物浓度云图,预测后十个时刻、二十个时刻,四十个时刻的污染物浓度云图;
-
尝试用多时刻预测多时刻。
网络方面
-
尝试引入更先进的网络架构,如 transformer;
-
对于网络层数和每层网络的神经元个数,尝试进行敏感性分析和误差分析;
-
尝试引入更多种类的激活函数如 tanh,silu 等;
-
尝试对 learning rate、batch size 等超参数进行调整实验。
物理原理方面
-
尝试引入物理先验知识,对建筑、边界位置施加 loss 软约束;
-
尝试利用流体 NS 方程对模型进行修正。
模型方面
-
尝试引入更多参数作为输入:如污染源位置、污染源初始浓度等提高模型的适应能力;
-
增加模型参数量级,探索大模型对复杂多态问题的处理能力;
-
尝试和传统流体求解方法进行融合。
项目意义与心得
本项目尝试用 U-Net 网络通过污染物扩散云图来学习污染物扩散的模型参数,对污染物扩散进行快速预测,是数据驱动计算场景拓展的一次探索。从项目结果来看,模型计算速度相比 CFD 模拟提升明显,但是模型预测的效果还有待提升,未来将通过探索以上几个方向,不断优化模型预测效果。项目实现过程中,我们花费了大量的时间处理背景存在抖动的图像,直到后来发现有一部分数据集的质量要远远好于另一部分,我们选择放弃质量不好的数据,从而加快了项目的进展。
数据处理过程中有以下几个方面的心得。
第一,对项目的数据应该第一时间进行全局探索,了解数据的全貌,对数据质量进行评估;
第二,与其花费大量的时间处理质量不好的数据,不如先使用质量较好的数据,优先做对模型取得进展更加关键的事情;
第三,相对于改变模型的结构,提高输入数据的质量对模型的训练结果起到更加积极的作用。一些开源模型的效果无法复现的原因在于训练数据的不公开,即便大家都用到同样结构的网络,但是训练数据不同,模型取得的效果就大不相同。从这个角度看,模型参数是训练数据在网络上留下的压缩信息,训练数据存在的瑕疵很难通过优化网络来解决。
飞桨 AI for Science 共创计划为本项目提供了强大的技术支持,打造活跃的前瞻性的 AI for Science 开源社区,通过飞桨 AI for Science 共创计划,学习到了如何在飞桨平台上使用科学计算的 AI 方法去解决 CFD 模拟预测的问题,并且大幅度提高了数据驱动计算的速度。相信未来会有越来越多的项目通过 AI for Science 共创计划建立产学研闭环,推动科研创新与产业赋能。
相关地址
- 飞桨 AI for Science 共创计划
https://www.paddlepaddle.org.cn/science
- 飞桨 PPSIG-Science 小组
https://www.paddlepaddle.org.cn/specialgroupdetail?id=9

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
StampedLock:高并发场景下一种比读写锁更快的锁
摘要:在读多写少的环境中,有没有一种比ReadWriteLock更快的锁呢?有,那就是JDK1.8中新增的StampedLock! 本文分享自华为云社区《【高并发】高并发场景下一种比读写锁更快的锁》,作者: 冰 河。 什么是StampedLock? ReadWriteLock锁允许多个线程同时读取共享变量,但是在读取共享变量的时候,不允许另外的线程多共享变量进行写操作,更多的适合于读多写少的环境中。那么,在读多写少的环境中,有没有一种比ReadWriteLock更快的锁呢? 答案当然是有!那就是我们今天要介绍的主角——JDK1.8中新增的StampedLock!没错,就是它! StampedLock与ReadWriteLock相比,在读的过程中也允许后面的一个线程获取写锁对共享变量进行写操作,为了避免读取的数据不一致,使用StampedLock读取共享变量时,需要对共享变量进行是否有写入的检验操作,并且这种读是一种乐观读。 总之,StampedLock是一种在读取共享变量的过程中,允许后面的一个线程获取写锁对共享变量进行写操作,使用乐观读避免数据不一致的问题,并且在读多写少的高并发环境...
- 下一篇
用时序数据库 DolphinDB 实现地震波形的分析预警
1. 绪论 波形数据的存储与实时流处理是地震预警、地震速报、地震烈度速报、震源机制解等数字地震台网综合处理系统的前提,合理的存储方案与高效的实时流处理架构能极大地节约存储成本、降低响应延时、方便震源分析。 本篇教程会为有该方面需求的客户提供一个基于 DolphinDB 的地震波形数据存储及实时流处理的解决方案,助力用户降低存储成本、提升效率。 1.1 行业背景 国家地震台网、 区域地震台网和流动地震台网等组成的中国数字地震观测网络系统每天不间断地产生地震波行数据。该数据的原始形态是方便数据处理的,但为了进行归档和交换,必须进行统一的数据格式化,现使用的国际通用交换格式为 SEED。 国家测震台网数据备份中心实时接收存储的地震波形数据始于2007年8月,至今数据量大约有 100TB 左右。随着地震台站数量和强震数据的不断增多,可以预见今后的测震数据增速会越来越快。 现今大部分测震数据是以 SEED 文件格式或存储于盘阵内,或存储于光盘、移动硬盘等离线介质内,辅以数十张描述这些数据文件的关系型表,这些表遵从《中国数字测震台网数据规范》。有数据分析需求的研究人员通过网络等方式进行数据申请,数...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS6,7,8上安装Nginx,支持https2.0的开启