(业务管理)业务数据集成
功能概述
业务数据集成是指MDMServer和外围系统之间的数据交换时需要中间处理的模块应用层。
如图所示,外围系统主要通过三种模式和 MDMServer交互(这里不考虑 ETL):
直接的 webservice 访问–主要通过 http协议联机访问,壹般不通过业务集成层而
直接访问数据服务层
近实时的消息通知–主要通过 MQ消息访问,需要于业务集成层进行必要的处理
批量处理–主要通过文件进行数据传递,需要于业务集成层进行处理
业务集成服务层的主要内容包括:
近实时的消息通知
数据标准代码的转换
交易 xml的拆分
其他规则
批量处理
标准代码的转换
入库方式判断处理,判断是否是新增仍是更改
交易拆分
其他规则
. 联机数据集成(基于 MQ/XML)
联机数据集成主要处理近实时消息通知,处理模式如下图所示。
MQ组件是所有的客户信息更新消息的载体,MsgReceiver 组件负责接收 MQ的消息,通
过 MsgParser组件进行 xml 格式解析,MsgReceiver 转交到 IntegrateFW组件,IntegrateFW
组件是转发框架,负责业务集成层的基本控制,根据报文的类型调用必要的规则处理进行数
据转换或者信息拆分,然后交由具体逻辑处理实现模块 IntegrateImpl完成业务集成逻辑,
且最终通过 ServiceInvocation 模块调用 MDM的服务,完成交易。
数据服务层主要包括扩展服务和组合服务。
如上图,根据业务需求,壹般性的顺序如下详述:
1ejbCreate–MQ 中的信息通知 WAS容器,根据 MsgReceiver 组件的 MDB 情况,创建壹个
实例进入池态;
2onMessage–WAS 容器调用 MDB的 onMessage 方法,且把消息作为参数传递到该方法;
–onMessage方法中调用 IntegrateFW组件的 deliver方法,且传递消息;
–IntegrateFW 组件调用 MsgParser组件解析传入的消息体;
–MsgParser 组件根据消息体解析,生成壹个 IBObj对象;
–parse 方法返回生成的 IBObj对象;
–获取 IBObj 对象的服务类型;
–返回字符串结果;
–获取 IBObj 的请求者;
–返回;
–IntegrateFW根据服务对象和请求者,实例化壹个 IntegrateImpl
组件中集成服务实例;
–调用具体实例的业务逻辑,且把 IBObj 作为参数传递;
–调用标准代码转换规则,进行必要规则处理;
–返回规则处理后的 IBObj对象;
–调用客户识别规则,进行识别
–调用返回
–于具体的业务逻辑处理中,根据具体需要调用 MsgParser创
建需要的 IBObj对象;
–MsgParser 构建新的 IBObj实例;
–返回具体的 IBObj实例;
–处理具体的 IBObj 实例内容;
–处理内容返回;
–处理完所有的 IBObj实例内容后,调用方法转换为 xml 标准服
务格式;
–返回 xml字节流;
–调用 MDM服务;
–返回;
–完成具体业务集成逻辑实例的执行;
–完成整个业务集成逻辑的执行;
注:
到 之间可能有多次的 callRule和 invoke 调用;
到 的调用需要包括于事务处理中;
批量处理的业务集成层设计参见《批量型》章节。
. MsgReceiver组件
MsgReceiver 组 件 主 要 由 IntegrateMDB组 成 , IntegrateMDB 是 壹 个
MessageDriverBean,通过 onMessage(Message)方法接收 MQ的消息通知。
信息接收组件主要负责从 MQ接收消息,参数 Message使用 BytesMessage类型,消息格
式为 XML形式,其具体的接口定义参见《服务接口定义》中的关联通过 MQ接口部分。
接口主要包括:
个险–增加客户(addPerson/CSCMQ)
个险–保全修改客户(updatePerson/CSCMQ)
电商网上销售–增加客户(addPerson/TOLMQ)
电商网上销售–修改客户基本信息(updatePersonBase/TOLMQ)
电商网上销售–保全修改客户(updatePerson/TOLMQ)
注:之上括号中的服务名是指各个业务系统提交到 MQ中的消息服务名,而不是 MDM调
用的标准服务名;
组件调用 IntegrateFW 组件进行后续处理。
publicvoidonMessage(Messagemsg){
……
if(msginstanceofBytesMessage){
BytesMessagebm=(BytesMessage)msg;
longlength=();
byte[]bs=newbyte[length];
(bs);
BytesInputStreambis=newBytesInputStream(bs);
(bis);
}else{
……
}
……
}
. IntegrateFW组件
IntegrateFW组件是框架组件,负责业务集成层的控制调度。组件负责调用 MsgParser
组件,解析 xml报文,然后根据报文类型调用具体的控制处理逻辑,同时调用业务处理规则
进行数据和业务处理,最后形成处理后的报文提交 ServiceInvocation组件调用
MDMService,完成整个处理,同时如果于处理中发生异常,则交由 IntegException组件处
理异常结果。
IntegrateFW组件中对于多个 service调用作为同壹事务来管理,如果调用出错需要进
行回滚处理。
IntegrateFW组件暴露壹个接口 IntegrateLogic供 IntegrateImpl组件实现,其接口方
法如下。
/**
*执行具体的业务集成任务.用于 MQ 异步通知模式
*于 IntegrateImpl 组件实现该方法,处理具体某壹项接口的逻辑调用,壹般包括
*代码转换、服务拆分、服务变换、DSP判断等
*@paramis输入流,是通过 MQ 接收到的消息内容,xml 字节流
*@throwsItegrateException例外,如果发生例外需要到例外组件处理
*/
publicvoidexecute(IBobjobj)throwsItegrateException;
组件对外调用的 Façade接口调用是 IntegratFW类,使用静态方法。
/**
*传递到 IntegrateFW 组件执行后续任务.用于 MQ异步通知模式
*@paramis输入流,是通过 MQ 接收到的消息内容,xml 字节流
*/
publicstaticvoiddeliver(InputStreamis);
. IntegrateImpl组件
IntegrateImpl 组件是根据报文类型确定的具体的业务集成逻辑的实现。
壹期需求主要包括:
个险增加客户
个险修改客户基本信息
个险保全修改客户信息
网上销售增加客户
参见个险增加客户,只是其接口内容稍有差异
网上销售修改客户基本信息
参见个险修改客户基本信息,只是其接口内容稍有差异
网上销售保全修改客户信息
参见个险保全修改客户信息,只是其接口内容稍有差异
此组件需要处理地址、电话、电子邮件、客户标识等的格式变换,是把输入的对象格式
变为标准的 MDM扩展服务格式,包括:
从粗粒度服务格式转换为 updatePersonName服务格式
转换为changePartyAddress服务格式–服务中确定具体调用correctPartyAddress
仍是 addPartyAddress仍是不做处理;
转 换 为 changePartyContactMethod服 务 格 式 –服 务 中 定 具 体 调 用
updatePartyContactMethod仍是 addPartyContactMethod仍是不做处理;
从粗粒度服务格式转换为 changePartyIdentifer服务格式–服务中定具体调用
updatePartyIdentifer 仍是 addPartyIdentifer仍是不做处理;
具体接口参见《服务接口》和 MDM 开发文档。
服务逻辑参见关联于线服务组合服务设计;
. MsgParser组件
信息解析主要是根据 xml 报文解析其内容。
MsgParser组件根据不同的报文类型解析不同的内容。
MsgParser组件仍负责输出变更后的报文结果。
MsgParser是暴露于外的 Façade调用界面,通过俩个个方法接收外部调用。
/**
*解析 xml.
*@paramis输入流,是通过 MQ 接收到的消息内容,xml 字节流
*/
publicstaticIBObjparse(InputStreamis);
/**
*把 IBObj 根据类型输出为标准的 MDMservice报文.
*@paramobj输入的 IBObj
*@return返回壹个字节数组,组成 xml,使用标准的 iso-8859-1 格式
*/
publicstaticbyte[]toServiceXml(IBObjobj);
/**
*根据名称构建新的 IBObj对象.
*@paramname输入的 IBObj 名称
*@return返回 IBObj对象
*/
publicstaticIBObjconstructIBObj(Stringname);
IBObj是用于定义报文的 bean,类似 MDM中的 BObj。
IBObj主要定义 xml 中的 bobj 对象,IBObj内部能够嵌套。其暴露的接口如下。
/**
*设置 IBObj 的类型.是指对象类型,如 TCRMPersonBObj、
*TCRMAdminContEquivBObj、TCRMPartyAddressBObj、TCRMAddressBObj等。
*/
publicvoidsetType(Stringname);
publicStringgetType();
/**
*设置服务的类型.
*/
publicvoidsetTCRMTxType(Stringtype);
pubicStringgetTCRMTxType();
/**
*设置服务的对象类型.
*/
publicvoidsetTCRMTxObject(Stringobj);
publicStringgetTCRMTxObject();
/**
*设置 IBObj对象.
*/
publicvoidsetIBObj(IBObjobj);
/**
*根据名字获取 IBObj对象.只能获取下壹级对象.
*/
publicIBObj[]getIBObj(Stringtype);
publicStringgetAttribute(Stringtype);
/**
*设置属性.自动区分处理 Extension 属性.
*/
publicStringsetAttribute(Stringtype,Stringvalue);
/**
*设置服务头.
*/
publicvoidsetHeader(Stringheader);
publicStringgetHeader();
/**
*设置请求控制部分.
*/
publicvoidsetRequestControl(Stringrc);
publicStringgetRequestControl();
/**
*设置请求控制部分的请求名称.
*/
publicvoidsetRequestName(Stringrn);
publicStringgetRequestName();
/**
*设置请求控制部分的 LOB.
*/
publicvoidsetRequestLOB(Stringrlob);
publicStringgetRequestLOB();
. DSPRule组件
处理 DSP规则,具体参见 DSP 设计中于线可疑客户识别。
. CDRule组件
处理标准代码的转换,根据 requestName来判断来源,且把源系统标准代码转换为 MDM
标准代码。
CDRule组件由 CDPool从数据库中装载标准代码数据映射。映射关系是各个源系统指向
MDM。
文件是每个源系统接口中的需要转换的代码定义,其格式如下:
CSCMQ=GenderType|HighestEducationType|
e|…
TOLMQ=GenderType|…
格式以 Key=Value方式存放,key 为源系统提交的 requestName,而值是以’|’分割的
多个数据域,每个数据域均需要进行标准代码替换。如果该数据域是于接口的更底层,
以’.’作为路径分割,如 表示是于该对象中的
TCRMAdminContEquivBObj 对象下的 AdminSystemType 需要进行代码转换。
具体的实现类 CDRule 则根据输入 IBObj 对象和相应数据定义进行代码转换任务。
RuleFW是对外暴露的调用类,提供静态方法。
/**
*规则调用处理.
*@paramtype规则类型,根据此类型确定是调用哪个规则实现
*@paramobj输入输出对象,规则实现对其进行处理,且形成结果返回
*/
publicstaticvoidcallRule(Stirngtype,IBObjobj)throwsIntegrateException;
壹期需要进行的代码转换为:
个险
M 男 M 男
F 女 F 女
GenderType
U 未知 U 未知
0 身份证
00 其它
1 参字第
10 后字第
11 空文字第
12 北文字
13 护照
14 装字第
TCRMPartyIdentificationBOb
j
.IdentificationType
15 北文字第
1 本人
2 丈夫
3 妻子
4 父亲
5 母亲
RelationshipType
6 儿子
'01' '文盲'
'02' '小学'
'03' '初中'
'04' '高中'
'05' '大专'
'06' '本科'
'07' '研究生及之
上'
'08' '中专'
HighestEducationType
未知
0 未婚 0 未婚
1 已婚 1 已婚
2 离婚 2 离婚
MaritalStatusType
3 鳏寡 3 鳏寡
B 单位地址 2 单位地址
P 邮递地址 3 邮递地址
AddressUsageType
R 家庭地址 1 家庭地址
电商网上销售
Commented [liyw1]: 等待标准代码标对应完毕后补
充
银保
养老金
团险
. Utils组件
Utils组件是工具类组件,主要包括
服务调用组件,负责进行 MDM 服务的调用
通过 IIOP方式访问 MDM的 EJB(DWLServiceController)来处理 webservice格式的请求,
具体请参考MDMworkbench中的。
数据库访问
. IntegException组件
例外处理的组件,如果处理过程中有例外,则需要记录例外的原因和状态,且把该服务
请求 xml保存到数据库。
例外后需要保存的内容如下:
错误流水号 ERRLOGID BigInt PK,自增型
时间 LOG_DT Timestamp
错误返回消息 ERRMESSAGE Varchar(255
)
()
或者业务逻辑错误说明,
如找不到该客户等
错误提交消息 REQUESTXML Xml字段
错误堆栈同样需要记录到 log4j 日志中。
. 批量数据集成(基于批量/XML)
批量处理的主要内容类似业务数据集成章节的近实时处理部分,主要是 FileParser模
块管理各个业务系统上传的批量文件,且调用 MsgParser模块解析具体的数据内容,同样,
根据规则处理各个业务逻辑规则,然后形成具体的批量文件,且调用 MDM的 BatchProcessor
进行批量处理。
具体模块关系如下图所示。
批量处理的部分将会复用较多的近实时处理部分的业务组件,包括:
MsgParser 组件
IntegException组件
CDRule组件
1detectFile–守护线程,检查各个系统相应目录下的上穿文件是否 ready;
2Constructor–读入文件,形成文件输入流和输出流,且调用批量处理,根据源系统类
型实例化具体的业务逻辑处理实例,传递输入流;
3readOneService–从输入流读取壹个 service块;
4readOneService–返回读取的 service 块;
5parse–调用 MsgParser 把 servie块的 xml 格式解析成为壹个 IBObj;
6parse–返回 IBObj;
7callRule–调用代码转换 rule,转换客户证件类型、地址类型、联系类型为 MDM标准
代码;
8callRule–调用返回;
9writeTmp–把客户关键信息写入数据库,内容参见临时表定义;
10writeTmp–返回;
之上从 3到 10循环处理,直到文件输入流到尾端;
11resetFile–把文件输入流重新定位到开始;
12resetFile–返回重新定位后的文件输入流;
13callRule–调用批量客户识别的规则处理;
–根据规则处理结果更新临时表,确定客户增加、修改类型以及客户的
地址、联系、证件类型更改类型;
–处理临时表结束;
14callRule–调用规则返回;
15readOneService–从文件输入流读入壹个 servicexml块;
16readOneService–返回 service 块;
17parse–调用 MsgParser 解析为壹个 IBObj;
18parse–返回 IBObj;
19readTmp–读入壹条和 IBObj匹配的临时表记录;
20readTmp–读入返回数据;
21callRule–调用格式转换规则处理,根据临时表类型标志,转换 IBObj的内容为标准
服务的 IBObj;
22callRule–返回转换完毕的 IBObj;
23toServiceXml–把 IBObj转换为标准的 xml格式;
24toServiceXml–xml结果;
25writeXml2File–把 xml结果写入文件输出流;
26writeXml2File–返回;
之上 15到 26循环处理,直到文件处理完毕;
–关闭相应资源,调用 MDM的 BatchProcessor 进行批量处理,完成后处
理相应输入输出及中间文件;
. 批量处理接口
批量处理主要包括三个外部系统接口:
银保增加客户
养老金增加客户
团险增加客户
具体接口格式参见《服务接口》
外部系统输出符合接口规范的数据文件,且 FTP到规定的目录,具体参见《服务接口》
中“批量接口模式”。
. FileParser组件
FileParser组件是解析文件的处理,其主要处理客户的上传数据文件,同时新建输出结
果文件。
解析整体文件格式;包括输入输出,输出文件每个 servicexml只能是壹行;
FileParser调用 IntegrateFW 组件以处理不同的业务集成逻辑。
FileParser组件暴露壹个抽象类 IntegrateBatchLogic供 IntegrateImpl 组件实现,其
抽象方法如下。
/**
*执行具体的业务集成任务.用于批量模式
*于 IntegrateImpl 组件实现该方法,处理具体某壹项接口的逻辑调用,壹般包括
*代码转换、服务拆分、服务变换、DSP判断等
*@paramis输入流,是通过 MQ 接收到的消息内容,xml 字节流
*@throwsItegrateException例外,如果发生例外需要到例外组件处理
*/
publicabstractvoidexecuteBatch(InputStreamis,OutputStreamos)throwsItegrateE
xception;
于 IntegrateBatchLogic 中仍实现必要方法:
resetFile–重置文件指针到文件头;
callBatchSDP–调用批量客户识别规则;
callRule–调用代码转换规则;
getService–获取文件的下壹个 service块;
callMDMBatch–调用 MDM 的
. IntegrateImpl组件
具体的业务集成逻辑实现,三个不同源系统的增加客户业务集成逻辑相同,其具体内容
格式有所差异。
具体逻辑处理如下图。
. 批量客户识别匹配规则
批量客户识别匹配规则是指考虑执行效率而进行的客户识别匹配程序,其匹配规则和于
线完全相同,但处理方式变成批量,规则参考如下:
统壹客户管理平台进行客户识别的关键数据包括:客户名称+出生日期、证件类型+证件
编号以及客户性别。为了提高系统的客户识别能力和信息的准确度,加快业务处理的速度,
系统针对用于客户识别的关键数据进行了有效性规定,符合下列规定的信息才能够作为客户
识别的依据:
客户名称不允许为空
证件编号长度至少 8 位。
不满足上述有效性规则的客户信息会被加入统壹客户管理平台,但不会进行客户识别。
注:规则以于线匹配规则为准,请参考关联设计文档以确认规则壹致性;
客户匹配的主体是 MDM 数据库的 contact/person/identifier表;
另外进行匹配的表是临时表:tmp_contact
Sql参考:
--先根据客户号和源系统 id进行匹配
updatetmp_contacttset
(MATCH_TP
,MATCH_PARTYID
,MATCH_PARTYUPDT
,MATCH_PERSONUPDT
,PERSONNAMEID
,PERSONNAME_UPDT
)
=
(select
1
,_id
,_update_dt
,_update_dt
,_name_id
,_update_dt
from
contactc
,personp
,tmp_contactt
,contequive
,personnamepn
where
_id=_id
_tp=_SYS_TP_CD
_custno=_CLIENT_ID
_id=_id
_id=_id
_usage_tp_cd=1
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
)
whereexists
(select
1
from
contactc
,personp
,tmp_contactt
,contequive
,personnamepn
where
_id=_id
_tp=_SYS_TP_CD
_custno=_CLIENT_ID
_id=_id
_id=_id
_usage_tp_cd=1
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
)
;
--根据证件类型证件号码客户姓名客户生日进行匹配
updatetmp_contacttset
(MATCH_TP
,MATCH_PARTYID
,MATCH_PARTYUPDT
,MATCH_PERSONUPDT
,PERSONNAMEID
,PERSONNAME_UPDT
)
=
(select
1
,_id
,_update_dt
,_update_dt
,_name_id
,_update_dt
from
contactc
,personp
,tmp_contactt
,identifieri
,personnamepn
where
_id=_id
_id=_id
_tp_cd=_tp_cd
_num=_num
andlength(rtrim(ltrim(_num)))>7
_name=_name
_nameisnotnull
_name<>’’
and((_TP_CODEisnull)_T
P_CODE=_TP_CODE)
andchar(_dt,ISO)=substr(_dt,1,10)
_tp=2
_id=_id
_usage_tp_cd=1
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
)
whereexists
(_id
from
contactc
,personp
,tmp_contactt
,identifieri
,personnamepn
where
_id=_id
_id=_id
_tp_cd=_tp_cd
_num=_num
andlength(rtrim(ltrim(_num)))>7
_name=_name
_nameisnotnull
_name<>’’
and((_TP_CODEisnull)_TP_
CODE=_TP_CODE)
andchar(_dt,ISO)=substr(_dt,1,10)
_tp=2
_id=_id
_usage_tp_cd=1
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
)
;
--根据匹配结果查找需要修改的证件号码
updatetmp_contacttset
(IDENT_MATCH_TP
,IDENTIFIR_ID
,IDENTIFIR_UPDT
)=
(select
1
,IDENTIFIER_ID
,LAST_UPDATE_DT
from
tmp_contactt
,identifieri
where
_partyid=_id
_tp_cd=_tp_cd
_num<>_num
and(_dt>currenttimestamp)
)
whereexists
(select
1
from
tmp_contactt
,identifieri
where
_partyid=_id
_tp_cd=_tp_cd
_num<>_num
and(_dt>currenttimestamp)
);
--根据匹配结果查找需要修改的地址
updatetmp_partyaddressset
(ADDR_UP_TP
,PARTYADDRESSID
,PARTYADDRESSUPDT
,LOCATIONGROUPUPDT
)=
(select
2
,_group_id
,_update_dt
,_update_dt
from
locationgroupl
,adderssgroupa
,tmp_contactt
,tmp_partyaddresspa
where
_group_id=_group_id
_id=_partyid
_tp=_tp
_custno=_custno
_USAGE_TP_CD=_USAGE_TP_CD
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
)
whereexists
(select
1
from
locationgroupl
,adderssgroupa
,tmp_contactt
,tmp_partyaddresspa
where
_group_id=_group_id
_id=_partyid
_tp=_tp
_custno=_custno
_USAGE_TP_CD=_USAGE_TP_CD
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
);
--根据匹配结果查找需要修改的联系
updatetmp_partycontactmethodset
(CONT_METH_UP_TP
,PARTYCONTMETHID
,PARTYCONTMETHUPDT
,LOCATIONGROUPUPDT
)=
(select
2
,_group_id
,_update_dt
,_update_dt
from
locationgroupl
,contactmethodgroupa
,tmp_contactt
,tmp_partycontactmethodpa
where
_group_id=_group_id
_id=_partyid
_tp=_tp
_custno=_custno
_METH_TP_CD=_METH_TP_CD
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
)
whereexists
(select
1
from
locationgroupl
,adderssgroupa
,tmp_contactt
,tmp_partyaddresspa
where
_group_id=_group_id
_id=_partyid
_tp=_tp
_custno=_custno
_METH_TP_CD=_METH_TP_CD
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
);
. 临时表定义
tmp_Contact
源系统类型 LOB_TP BigInt PK
客户号 LOB_CUSTNO Varchar(50) PK
客户姓名 PERSON_NAME Varchar(50)
客户证件类型 ID_TP_CD BigInt
客户证件号码 REF_NUM Varchar(50)
客户生日 BIRTH_DT Date
客户性别 GENDER_TP_CODE BigInt
匹配类型 MATCH_TP Int 1–根据源系统客
户号匹配
2–根据证件类型
号码等匹配,
3–不匹配
缺省为 3
匹配 partyId MATCH_PARTYID Bigint
匹配 partyUpdateDT MATCH_PARTYUPDT Timestamp
匹配 personUpdateDT MATCH_PERSONUPDT Timestamp
客户姓名 ID PERSONNAMEID Bigint
客户姓名更新时间 PERSONNAME_UPDT Timestamp
证件更新标志 IDENT_MATCH_TP Int 1–更新证件
2–不更新
缺省为 2
证件 id IDENTIFIR_ID Bigint
证件更新时间 IDENTIFIR_UPDT Timestamp
需要考虑旧客户的证件号码更新;
tmp_PartyAddress
源系统类型 LOB_TP BigInt PK
客户号 LOB_CUSTNO Varchar(50) PK
地址类型 ADDR_USAGE_TP_CD BigInt PK
地址修改类型 ADDR_UP_TP Int 1–新增
2–更新
缺省为 1
客户地址 id PARTYADDRESSID Bigint
客户地址 updatedt PARTYADDRESSUPDT Timestamp
LocationGroupupdt LOCATIONGROUPUPDT Timestamp
tmp_PartyContactMethod
源系统类型 LOB_TP BigInt PK
客户号 LOB_CUSTNO Varchar(50) PK
联系类型 CONT_METH_TP_CD BigInt PK
联系修改类型 CONT_METH_UP_TP Int 1–新增
2–更新
缺省为 1
客户联系 id PARTYCONTMETHID Bigint
客户联系 updatedt PARTYCONTMETHUPDT Timestamp
LocationGroupupdt LOCATIONGROUPUPDT Timestamp
灰色区域是原始写入客户信息,白色区域是匹配后的更新内容;
. 批量型
. 客户匹配 1-遗漏未匹配客户处理
遗漏未匹配客户是指于上壹个批量期间由于批量处理时间窗口内实时、近实时增加客户
时和批量客户可能的合且情况,由于时间的差错导致形成 2 条不同记录的情况,同时这种情
况由于且发情况下不能完全锁定 DSP处理的串行化从而可能出现的情况。
遗漏未匹配客户处理是针对上述情况的客户合且处理,其条件为:
新增客户–created_dt为新增;
符 合 匹 配 条 件
–_name/_dt/_tp_cd/
ef_num/_TP_CODE相同;
Sql参考:
select
,_id
,_name
,_dt
,_TP_CODE
,_tp_cd
,_num
,_dt
from
contactc
,personp
,identifieri
,(select
,_name
,substr(_dt,1,10)asbirth_dt
,_tp_cd
,_num
,_TP_CODE
from
contactc
,personp
,identifieri
where
_id=_id
_id=_id
_name<>’’
andlength(rtrim(ltrim(_num)))>7
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
groupby
_name
,substr(_dt,1,10)
,_TP_CODE
,_tp_cd
,_num
havingcount(1)>1
)astmp
where
_id=_id
_id=_id
and((_TP_CODEisnull)
_TP_CODE=_TP_CODE)
_tp_cd=_tp_cd
_num=_num
_name=_anme
_nameisnotnull
_name<>’’
andsubstr(_dt,1,10)=_dt
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
and(_dt>currenttimestamp)
orderby
,_name
,_dt
,_tp_cd
,_num
_TP_CODE
_dt
;
and(_dt>currenttimestamp)合且规则为:
以 created_dt 时间最早者保留;
根据时间排序获取后续 party 对象;
按 照
updatePersonBase/updatePersonName/updatePartyContactMethod/correctParty
Address/updatePartyIdentificaton进行更新;
增加原来的客户关系和等价键等信息;
把后续的 partyInactive;
注:不考虑客户的保单地址和联系,因为此批量于保单 ETL之前
业务组件能够复用,GreenBatch 组件处理批量流程逻辑,如下图。
临时表定义
Ctl_increment
增量类型 INCREMENT_TP Varchar(50) PK
上次增量时间 INCREMENT_DT Timestamp
描述 DESCRIBE Varchar(255)
内容定义:
Evergreen增 量 控 制 :
insertintoctl_increment(increment_tp,increment_dt)values(‘EVERGREEN’,currentt
imestamp);