[P1] 项目初始态势
开始接手项目时,领导要求很简单,就是做一个本地服务,手机连接上服务,能控制本地系统内的各种设备,至于设备状态如何采集与控制,数据如何分析和存储这里略过,其通信机制类似于下图:
![]()
整个项目,领导只安排了只有我一个项目人员,当然能简单就简单。我将本地数据经过转换汇总,终端采用设备-〉信息点的层级关系显示,当然设备层具有自身属性,信息点层也具有自身属性,终端是瘦客户端,在链接上服务端后,由服务端推送层级结构和相关属性信息给客户端,进行初始化,后续就是实现实时上行推送和下行控制,上行和下行采用独立线程处理。
![]()
一个星期后,安装在项目现场,实现了本地WIFI的手机端远程监控,工程人员和使用人员都觉得OK.
又经历一段时间的功能追加,支持各种类型设备可配置接入,支持蓝牙、串口、网络采集数据,支持实时分析与告警,跨平台部署,自动化调度运维等一系列任务,想来项目应该结题。嗯,领导有了新想法。
[P2]项目变种
采用socket通信,手机端需要设置IP,很麻烦,领导希望工程人员、使用人员只要连上WIFI打开app就能实现远程监控。领导发话,没办法只能照办,为了偷懒,当然采用现成的通信机制改造,最终选型了Zero-ice实现,通信机制类似于:
![]()
我将的ice配置文件如下:
至于接口实现代码略过,给出部分服务和客户端的通信连接实现参考:
服务端:
void PCSServer::listen()
{
CacheDataObj *ptr_CacheDataObj = CacheDataObj::getInstance();
iceType = ptr_CacheDataObj->getIceType();
confFile = ptr_CacheDataObj->getIceConfFile();
try
{
Ice::InitializationData initData;
initData.properties = Ice::createProperties();
#ifdef ICE_STATIC_LIBS
Ice::registerIceSSL();
#endif
CLogger::createInstance()->Log(eTipMessage
,"start load ice config file %s", confFile.c_str());
initData.properties->load(confFile);
char ** argv;
char *p = NULL;
argv = &p;
int argc = 0;
if (iceType > 0)
{
communicator_ = Ice::initialize(argc, argv, initData);
}
else {
communicator_ = this->communicator();
}
if (NULL == communicator_) {
CLogger::createInstance()->Log(eConfigError
, "communicator initialize fail! [%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
}
}
catch (const Ice::Exception& ex)
{
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[1]:%s [%s %s %d]"
, ex.ice_id().c_str(), __FILE__, __FUNCTION__, __LINE__);
disListen();
}
catch (const std::string & msg)
{
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[2]:%s [%s %s %d]"
, msg.c_str(), __FILE__, __FUNCTION__, __LINE__);
disListen();
}
catch (const char * msg)
{
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[3]:%s [%s %s %d]"
, msg, __FILE__, __FUNCTION__, __LINE__);
disListen();
}
if (NULL != communicator_)
{
try {
if (iceType > 0)
{
Ice::ObjectAdapterPtr adapter_pcs = communicator_->createObjectAdapter("SyePcs");
Ice::Identity id_pcs = Ice::stringToIdentity("SyePcs");
PCS::ServerAchievePtr pcs_ = new ServerAchieveI(communicator_);
adapter_pcs->add(pcs_, id_pcs);
adapter_pcs->activate();
CLogger::createInstance()->Log(eTipMessage, "ice server listen!");
isListen = true;
communicator_->waitForShutdown();
}
else {
Ice::PropertiesPtr properties = communicator_->getProperties();
Ice::ObjectAdapterPtr adapter_pcs = communicator_->createObjectAdapter("SyePcs");
std::string id = communicator_->getProperties()->getPropertyWithDefault("Identity", "PCSIO");
Ice::Identity id_pcs = Ice::stringToIdentity(id);
CLogger::createInstance()->Log(eTipMessage
, "id_pcs.name=%s", id_pcs.name.c_str());
PCS::ServerAchievePtr pcs_ = new ServerAchieveI(communicator_);
adapter_pcs->add(pcs_, id_pcs);
adapter_pcs->activate();
CLogger::createInstance()->Log(eTipMessage, "ice server listen!");
isListen = true;
communicator_->waitForShutdown();
}
}
catch (const Ice::Exception& ex)
{
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[1]:%s [%s %s %d]"
, ex.ice_id().c_str(), __FILE__, __FUNCTION__, __LINE__);
disListen();
}
catch (const std::string & msg)
{
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[2]:%s [%s %s %d]"
, msg.c_str(), __FILE__, __FUNCTION__, __LINE__);
disListen();
}
catch (const char * msg)
{
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[3]:%s [%s %s %d]"
, msg, __FILE__, __FUNCTION__, __LINE__);
disListen();
}
catch (...)
{
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[4]:unkown error for adapter crate and activate! [%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
disListen();
}
}
else
{
CLogger::createInstance()->Log(eConfigError
, "communicator is NULL! [%s %s %d]"
, __FILE__, __FUNCTION__, __LINE__);
}
};
void PCSServer::disListen()
{
if (communicator_)
{
try
{
isListen = false;
communicator_->destroy();
}
catch (const Ice::Exception& ex)
{
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception:%s [%s %s %d]"
, ex.ice_id().c_str()
, __FILE__, __FUNCTION__, __LINE__);
}
}
};
客户端只给出java的,c++实现类似:
由于本地通信不是很饱和,我只将serverPcs设了一个Node,其简化的application.xml配置如下,serverPcs是按需由节点服务icegridnode启动的:
为了简化与测试,暂时将服务部署本机,后续只要将IP更改即可,并在Host文件将IP与域内自定义站点映射即可,下面以本机地址为例配置了中心服务、节点服务的配置文件,通过节点服务启动serverPcs。
主中心服务配置,一些描述也一并给出,而从中心服务和节点服务只给出配置项:
从中心服务配置:
节点服务的配置:
配置完成后我们将配置启动脚本,当然也可以通过命令逐个执行,为了便捷,配置数个脚本,以win系统为例:
(1)start_center_server.bat,用于启动主从中心服务
start /b /Min "registry01" icegridregistry --Ice.Config=config.master
start /b /Min "registry02" icegridregistry --Ice.Config=config.slave
(2)start_admin.bat,用于修改更新配置
::如果更改配置,需要重新映射服务,删除数据目录并重新生成或更新,需先启动中心服务,再调用配置服务更新
::start_center_server.bat
icegridadmin --Ice.Config=config.admin -e "application add 'application.xml'"
icegridadmin --Ice.Config=config.admin -e "application update 'application.xml'"
(3)start_server.bat,启动节点服务集,本案例只有一个
cd D:\SYE_MCS_PRO\pcs_project\PCS_server\x64\PcsSrv
start /b /Min "node01" icegridnode --Ice.Config=config.node
cd D:\SYE_MCS_PRO\pcs_project\sye_mcs\mgr
(4)stop_server.bat,关闭服务集使用
taskkill /f /im icegridregistry.exe
taskkill /f /im icegridnode.exe
taskkill /f /im PCS_server.exe
配置完成,先去配置一些服务需要的一些目录,
如
D:\SYE_MCS_PRO\pcs_project\sye_mcs\mgr\db\LMDB_master,
D:\SYE_MCS_PRO\pcs_project\sye_mcs\mgr\db\LMDB_slave,
D:\SYE_MCS_PRO\pcs_project\PCS_server\x64\PcsSrv\db\node_01,
D:\SYE_MCS_PRO\pcs_project\PCS_server\x64\PcsSrv\db\node_out_01,
我手动配置是为了防止尝试配置测试的就数据信息阻隔目录创建或权限受限,完成目录配置后
,先start_center_server.bat服务,然后启动start_admin.bat(配置有改动时)和start_server.bat。
当服务启动稳定后,serverPcs服务应该是没有启动的,只有客户端连接需求后才会被触发启动。
客户端的ice配置文件config0.client如下:
现在启动测试终端,通过从服务-〉节点服务-〉serverPcs,终端与serverPcs建立了长链接:
![]()
![]()
手机终端类似下图
![]()
![]()
服务部署到工程上使用了一段时间,领导开始又不满足了,嗯,想在家里也能远程监控现场设备,顺便来个短信告警、统计报表啥的。好吧,准备升级。
[P3]升级到互联网
嗯,这次在利用Zero-Ice比较熟络了,我设计了通信机制如下:
![]()
我将本地的serverPcs作为客户端,并在阿里云购买了 ECS、数据库等服务,并建立了一个控制中转的门户服务serverMcs来支持各地的serverPcs链接进来,另外建立一个serverDcs来支持互联网终端链接进来,同时将终端升级,将每一个serverPcs推送的数据看作一个区域,即显示格式为区域-〉设备-〉信息点的层级结构。
同样,需要为serverPcs<->serverMcs和app<->serverDcs建立Ice通信接口,其接口文件与本地的终端APP通信类似,
McsInterface.ice:
DcsInterface.ice:
其实现逻辑很简单,就是serverPcs只要重新连接上云端后,推送一次设备-〉信息点整个层级结构信息,然后根据业务需要在变化或定期上送信息点数值和接收下行控制,而对于终端来说,也是瘦客户端,其链接上云端后,先根据其订购和账号权限,获得相应的设备及信息点的初始化推送,在终端建立起区域->设备->信息点的层级显示,后续就是刷新服务端推送数据和下发控制。
通信接口函数的具体实现略过,通信链接实现参考本地(java)即可,下面给出c++客户端通信实现代码示例。
int MCSClientV::Run()
{
while (!exitflag)
{
if (!m_bConnect)
{
#ifdef WIN32
Sleep(1000);
#else
usleep(1000000);
#endif
if (connect())
{
newConnectEvent();
}
}
else {
changeUpEvent();
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
}
void MCSClientV::setPValue(const ::MCS::PValue &pval)
{
if (areaID != pval.areaID)
{
return;
}
unsigned long taskID = ptr_ServiceChain->getTaskIDFromDateTime();
PFrom _pfrom;
if (ptr_CacheDataObj->getFromInfo(static_cast<unsigned long long>(pval.devID)
, static_cast<unsigned int>(pval.pID), _pfrom))
{
float _val = pval.val;
ptr_CacheDataObj->getCValue(static_cast<unsigned long long>(pval.devID)
, static_cast<unsigned int>(pval.pID), _val);
WDS wd(_pfrom.ipLong, OnSet, _pfrom.pID, _pfrom.pType, _val, 0, "ICE_Control_MCS", taskID);
ptr_ReceiveData->addWDS(wd);
CLogger::createInstance()->Log(eControlMessage
, "TaskID[%lu] and down_node[1] setPValue from ICE MCS,time(%s)"
",devID(%ld),pID(%ld),val(%.3f)"
",down_control_map, ip[%s],pID(%d),pType(%d),val(%.3f)"
, taskID
, PFunc::getCurrentTime().c_str()
, pval.devID, pval.pID, pval.val
, _pfrom.ipStr.c_str()
, _pfrom.pID, static_cast<int>(_pfrom.pType), _val);
//
VerificationCache vit;
vit.execTime = PFunc::getCurrentTime("%04d%02d%02dT%02d%02d%02dZ");
vit.taskID = taskID;
vit.taskDesc = "ICE_Control_MCS";
vit.devID = static_cast<unsigned long>(pval.devID);
vit.devDesc = _pfrom.devDesc;
vit.pID = static_cast<unsigned long>(pval.pID);
vit.pDesc = _pfrom.pDesc;
vit.pType = static_cast<unsigned int>(_pfrom.pType);
vit.val = pval.val;
vit.limitTimeForCheck = static_cast<unsigned int>(time(NULL)) + 5;
vit.eway_ = _pfrom.eway;
ptr_VerificationForControlCache->addVerifyData(vit);
}
else {
PValueRet pret(pval.val);
ptr_CacheDataObj->setValue(static_cast<unsigned long long>(pval.devID)
, static_cast<unsigned int>(pval.pID), pret);
CLogger::createInstance()->Log(eControlMessage
, "TaskID[%lu] and down_node[1] setPValue from ICE MCS and down_node[0],time(%s)"
",devID(%ld),pID(%ld),val(%.3f)"
",ditect set val to virtual ponit control"
, taskID, PFunc::getCurrentTime().c_str()
, pval.devID, pval.pID, pret.val_actual);
}
}
bool MCSClientV::FileRefresh(::Ice::Long areaID, ::Ice::Int filetype, const ::std::string &filename)
{
return false;
}
::Ice::Int MCSClientV::nextFileData(::Ice::Long areaID, ::MCS::FileBinary &fdata)
{
return 0;
}
Ice::CommunicatorPtr MCSClientV::communicator()
{
fprintf(stderr, "MCSClientV::communicator()\n");
if (m_ic == NULL)
{
char ** argv;
char *p = NULL;
argv = &p;
int argc = 0;
Ice::InitializationData initData;
initData.properties = Ice::createProperties();
#ifdef ICE_STATIC_LIBS
Ice::registerIceSSL();
#endif
//fprintf(stderr, "load %s start\n", confFile.c_str());
CLogger::createInstance()->Log(eTipMessage
, "load %s start"
, confFile.c_str());
initData.properties->load(confFile);
m_ic = Ice::initialize(argc, argv, initData);
}
return m_ic;
};
bool MCSClientV::connect()
{
if (!m_bConnect)
{
try {
fprintf(stderr, "MCS::MCSServerPrx::checkedCast\n");
if (runType>0)
{
soneway = MCS::MCSServerPrx::checkedCast(
communicator()->propertyToProxy("SyeMcs.Proxy")->ice_twoway()->ice_secure(true));
}
else {
try
{
fprintf(stderr, "checkedCast MCSIO\n");
soneway = MCS::MCSServerPrx::checkedCast(
communicator()->stringToProxy("MCSIO")->ice_twoway()->ice_secure(false));
}
catch (const Ice::NotRegisteredException&)
{
fprintf(stderr, "checkedCast MCSLGrid/Query\n");
IceGrid::QueryPrx query = IceGrid::QueryPrx::checkedCast(communicator()->stringToProxy("MCSLGrid/Query"));
soneway = MCS::MCSServerPrx::checkedCast(
query->findObjectByType("::MCS::MCSIO")->ice_twoway()->ice_secure(false));
}
}
if (!soneway)
{
std::cerr <<"couldn't find a `SyeMcs.Proxy' object." << std::endl;
}
else {
std::cerr <<"find a `SyeMcs.Proxy' object." << std::endl;
//MCS::MCSServerPrx oneway = twoway->ice_oneway();
//MCS::MCSServerPrx batchOneway = twoway->ice_batchOneway();
//MCS::MCSServerPrx datagram = twoway->ice_datagram();
//MCS::MCSServerPrx batchDatagram = twoway->ice_batchDatagram();
Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("");
Ice::Identity ident;
ident.name = IceUtil::generateUUID();
m_strUUID = ident.name;
ident.category = "";
MCS::MCSClientPtr crtwoway = new MCSClientI(this);
adapter->add(crtwoway, ident);
adapter->activate();
soneway->ice_getConnection()->setAdapter(adapter);
::MCS::AreaIDs aids;
aids.push_back(areaID);
//flag client-type client-area-ids-map
soneway->AddClient(ident,(int)1, aids);
m_bConnect = true;
}
}
catch (const Ice::Exception& ex)
{
//fprintf(stderr, "%s\n", ex.ice_id().c_str());
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[1]:%s, [%s %s %d]"
, ex.ice_id().c_str()
, __FILE__, __FUNCTION__, __LINE__);
}
catch (const std::string & msg) {
//fprintf(stderr, "%s\n", msg.c_str());
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[2]:%s, [%s %s %d]"
, msg.c_str()
, __FILE__, __FUNCTION__, __LINE__);
}
catch (const char * msg) {
//fprintf(stderr, "%s\n", msg);
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception[3]:%s, [%s %s %d]"
, msg
, __FILE__, __FUNCTION__, __LINE__);
}
}
return m_bConnect;
};
void MCSClientV::disconnect()
{
if (m_bConnect)
{
m_bConnect = false;
}
if (NULL!=m_ic)
{
try
{
m_ic->destroy();
m_ic = NULL;
}
catch (const Ice::Exception& ex)
{
//fprintf(stderr, "%s\n", ex.ice_id().c_str());
CLogger::createInstance()->Log(eSoftError
, "Ice::Exception:%s, [%s %s %d]"
, ex.ice_id().c_str()
, __FILE__, __FUNCTION__, __LINE__);
}
}
}
void MCSClientV::newConnectEvent()
{
::MCS::Devs _devs;
if (ptr_CacheDataObj->getDevsToSrv(_devs))
{
try {
//::MCS::Areas areas;
::MCS::Area area;
area.areaID = areaID;
area.areaType = (MCS::AreaTypeICE)areaType;
area.name = areaName;
area.desc = areaDesc;
soneway->addArea(area);
//
soneway->addDev(areaID,_devs);
for (::MCS::Devs::const_iterator itdev = _devs.begin(); itdev != _devs.end(); ++itdev)
{
::MCS::PInfos _pinfos;
if (ptr_CacheDataObj->getPInfosToSrv(itdev->devID, _pinfos))
{
soneway->addPInfo(area.areaID,itdev->devID, _pinfos);
}
}
}
catch (...)
{
//printf("addDev or addPInfo Error:%d\n", static_cast<int>(time(NULL)));
CLogger::createInstance()->Log(eSoftError
, "addDev or addPInfo Error:%d, [%s %s %d]"
, static_cast<int>(time(NULL))
, __FILE__, __FUNCTION__, __LINE__);
disconnect();
#ifdef WIN32
Sleep(1000);
#else
usleep(1000000);
#endif
}
}
}
void MCSClientV::changeUpEvent()
{
WDC wdls;
//
if (ptr_ReceiveData->getFirstWDLS(wdls))
{
::MCS::DateTimeI _itime;
_itime.isec = wdls.evtTimeS;
_itime.imsec = wdls.evtTimeMS;
try
{
//std::cerr << " PValueChange:" << wdlc.devID << "," << wdlc.pID
// << "," << wdlc.val << "," << _itime.isec << "," << _itime.imsec << std::endl;
::MCS::PValue pval;
pval.areaID = areaID;
pval.devID = wdls.devID;
pval.pID = wdls.pID;
pval.itime = _itime;
pval.val = wdls.val;
soneway->PValueChange(pval);
}
catch (...)
{
CLogger::createInstance()->Log(eTipMessage
, "PValueChange Error:%d [%s,%s,%d]"
, static_cast<int>(time(NULL))
, __FILE__, __FUNCTION__, __LINE__);
disconnect();
#ifdef WIN32
Sleep(1000);
#else
usleep(1000000);
#endif
}
if (!ptr_ReceiveData->removeFirstWDLS())
{
CLogger::createInstance()->Log(eTipMessage
, "removeFirstWDLS Error[%s,%s,%d]"
, __FILE__, __FUNCTION__, __LINE__);
}
}
}
下面给出Zero-Ice云端服务的各个配置信息,其实很类似。
application.xml的简要配置:
云端的主从中心服务的配置,测试时以本地为例,
config.master:
从服务的config.slave:
serverMcs的配置config01.node(config02.node):
serverDcs的config01.node(config02.node类似):
配置完成后我们将配置启动脚本,有了本地配置的经验,我同样配置数个脚本,以win系统为例:
(1)start_center_server.bat,用于启动主从中心服务
start /b /MIN "registry11" icegridregistry --Ice.Config=config.master
start /b /MIN "registry12" icegridregistry --Ice.Config=config.slave
(2)start_admin.bat,用于修改更新配置
::如果更改配置,需要重新映射服务,删除数据目录并重新生成或更新,需先启动中心服务,再调用配置服务更新
::start_center_server.bat
icegridadmin --Ice.Config=config.admin -e "application add 'application.xml'"
icegridadmin --Ice.Config=config.admin -e "application update 'application.xml'"
(3)start_server.bat,启动节点服务集,本案例serverMcs和serverDcs有两个节点服务
cd D:\\SYE_MCS_PRO\\pcs_project\\MCS_server\\x64\\McsSrv
start /b /MIN "node11" icegridnode --Ice.Config=config01.node
start /b /MIN "node12" icegridnode --Ice.Config=config02.node
cd D:\\SYE_MCS_PRO\\pcs_project\\DCS_server\\x64\\DcsSrv
start /b /MIN "node21" icegridnode --Ice.Config=config01.node
start /b /MIN "node22" icegridnode --Ice.Config=config02.node
cd D:\\SYE_MCS_PRO\\pcs_project\\sye_mcs\\mgrs
(4)stop_server.bat,关闭服务集使用
taskkill /f /im icegridregistry.exe
taskkill /f /im icegridnode.exe
taskkill /f /im glacier2router.exe
taskkill /f /im MCS_server.exe
taskkill /f /im DCS_server.exe
配置完成,先去配置一些服务需要的一些目录,以本地win服务为例,
如
D:\SYE_MCS_PRO\pcs_project\sye_mcs\mgrs\db\LMDB_master,
D:\SYE_MCS_PRO\pcs_project\sye_mcs\mgrs\db\LMDB_slave,
D:\SYE_MCS_PRO\pcs_project\MCS_server\x64\McsSrv\db\node_21,
D:\SYE_MCS_PRO\pcs_project\MCS_server\x64\McsSrv\db\node_out_21,
D:\SYE_MCS_PRO\pcs_project\DCS_server\x64\DcsSrv\db\node_12,
D:\SYE_MCS_PRO\pcs_project\DCS_server\x64\DcsSrv\db\node_out_12
完成目录配置后
,先start_center_server.bat服务,然后启动start_admin.bat(配置有改动时)和start_server.bat。
当服务启动稳定后,serverPcs服务应该是没有启动的,只有客户端连接需求后才会被触发启动。
serverPcs作为客户端与serverMcs通信,终端app与serverDcs 通信,由于他们的注册主从服务一致,它们的ice配置文件config0.client如下:
现在启动测试样例,先启动本地服务集群的脚本(start_center_server.bat,start_admin.bat(配置有改动时),start_server.bat),再启动云端服务集群的脚本(start_center_server.bat,start_admin.bat(配置有改动时),start_server.bat),然后启动终端app测试,展示类似,只多了一级区域:
![]()
![]()
样例走通后,进行linux编译,然后部署在阿里云的ECS上,进行一些优化和调整,重新设计终端UI,撰写维护使用手册,开通给使用人员。嗯,现在每个被授权人可以通过外网通信实现对现场设备的实时监控。
可是项目还没有完结,领导又有新构想了,远程升级、日志备份云端和远程查阅、语音控制、视频发布等等,估计又要追加一堆微服务了。
嘿,没问题,就是能不能招人或找外包,或加点项目预算啥的。
项目开发还在路上,感觉坑越来越深了。