SAP的并行方式有很多种:
-
SPTA框架,参考debug可以看出这个核心也是异步bgRFC
-
异步RFC,使用CALL FUNCTION “XXXXXX” STARTING NEW TASK XXXX CALLING XXXX ON END OF TASK
-
BANK_PP_JOBCTRL框架
-
拆分成多个后台JOB执行
这里只说SPTA框架,既然是框架就一定已经实现的基础功能,我们只需要在框架中加入要实现的逻辑和配置一些参数就可以实现多线程并行处理。
优化实例
下面是一个项目中的优化实例:
背景:整车接口,包含几块主要逻辑,
Step1 创建物料主数据-MM01
Step2 修改特性值-MM02
Step3 关联SBOM-CS40
Step4 分配工艺路线-CA02
Step5 创建生产版本-C223
每一辆车的VMC号接口过来都要走这几步,原接口程序的逻辑就是循环串行执行。
比如一共100个VMC,逻辑运行过程就是:
12345
12345
…
12345
12345,
如此执行100次。
耗时非常长,导致在数据量大时,JOB要执行一整晚,会影响到其它JOB的执行。
优化过程
有几点考虑:
- step1 和step2分别是对物料的操作,不同的VMC之间不存在互锁,所以可以用并行处理
- step3关联的是同一个super BOM,只能顺序执行
- step4有函数可以实现批量分配,也就是一次执行可以处理所有数据
- step5不同VMC之间不存在互锁,也可以并行处理
所以优化后的逻辑是:
5. 将step1和step2放到一个form里进行并行处理。
6. 上一步全部数据处理完成后成功的数据进入Step3, 循环处理
7. 上一步全部数据处理完成后成功的数据进入Step4,一次执行全部数据
8. 上一步处理完成后成功的数据进行并行处理。
所以优化后整个过程是:
12 12 12 12 12 12 12 12 12 12(始终保持10个进程在处理前两步)
3
4
4
…
4
4
5 5 5 5 5 5 5 5 5 5(始终保持10个进程在处理step5)
大概是这个意思:
优化前后的数据对比(72台车,相同数据):
时间是原来的1/50。
主要代码:
**********************************************************************
*step 1 + 2 Parallel process
**********************************************************************
* 使用SPTA框架拆包进行多进程处理
* BEFORE_RFC: 拆包FORM
* IN_RFC: 拆包后执行此FORM,将逻辑打包到这个form里
* AFTER_RFC: IN_RFC执行完调用此FORM,用来收集执行结果
**********************************************************************
"参数准备
CALL 'C_SAPGPARAM' "#EC CI_CCALL
ID 'NAME' FIELD 'rdisp/myname'
ID 'VALUE' FIELD lv_sergroup.
SELECT SINGLE classname
INTO lv_rfcgroup
FROM rzllitab
WHERE grouptype = 'S'
AND applserver = lv_sergroup.
IF sy-subrc <> 0.
MESSAGE 'Server Group Read Error. Contact Administrator.' TYPE 'E'.
ENDIF.
lv_repid = sy-repid.
IF p_max > 0.
lv_maxtasks = p_max.
ELSE.
lv_maxtasks = 10.
ENDIF.
LOOP AT gt_data INTO DATA(ls_data) WHERE mtype <> 'E'.
DATA(lv_tabix) = sy-tabix.
APPEND ls_data TO lt_data.
DELETE gt_data INDEX lv_tabix.
ENDLOOP.
IF lt_data[] IS NOT INITIAL.
CALL FUNCTION 'SPTA_PARA_PROCESS_START_2'
EXPORTING
server_group = lv_rfcgroup " SERVER_GROUP created from RZ12
max_no_of_tasks = lv_maxtasks "最大进程数
before_rfc_callback_form = 'BEFORE_RFC'
in_rfc_callback_form = 'IN_RFC'
after_rfc_callback_form = 'AFTER_RFC'
callback_prog = lv_repid
* SHOW_STATUS = ' '
* RESOURCE_TIMEOUT = 600
* TASK_CALL_MODE = 1
CHANGING
user_param = lt_data
EXCEPTIONS
invalid_server_group = 1
no_resources_available = 2
OTHERS = 3.
IF sy-subrc <> 0.
MESSAGE 'Error. Contact Administrator.' TYPE 'E'.
ENDIF.
"将前两步处理的结果再汇总到GT_DATA
APPEND LINES OF gt_final TO gt_data.
ENDIF.
**********************************************************************
**step3: cs40 LINK SBOM
**********************************************************************
CLEAR lt_data.
CLEAR gt_final.
LOOP AT gt_data INTO ls_data WHERE mtype <> 'E' AND zzt = '2'.
lv_tabix = sy-tabix.
APPEND ls_data TO lt_data.
DELETE gt_data INDEX lv_tabix.
ENDLOOP.
IF lt_data IS NOT INITIAL.
PERFORM frm_cs40_batch TABLES lt_data.
APPEND LINES OF lt_data TO gt_data.
ENDIF.
**********************************************************************
**step4: CA02
**********************************************************************
LOOP AT gt_data INTO ls_data WHERE mtype <> 'E' AND zzt = '3'.
lv_tabix = sy-tabix.
CLEAR lv_subrc.
CLEAR lv_str.
*& 4. assign routing
" MOVE-CORRESPONDING ls_data TO ls_ca02.
MOVE-CORRESPONDING ls_data TO it_data.
PERFORM frm_assign_routing USING it_data CHANGING lv_subrc lv_str.
IF lv_subrc <> 0.
ls_data-icon = gc_red.
ls_data-zzt = 'E'."数据不完整
ls_data-mtype = /sapmp/if_char_1=>ce.
CONCATENATE ls_data-msg lv_str INTO ls_data-msg. "分配工艺路线失败:
MODIFY gt_data FROM ls_data INDEX lv_tabix.
CONTINUE.
ELSE.
ls_data-zzt = '4'."分配工艺路线成功
MODIFY gt_data FROM ls_data INDEX lv_tabix.
ENDIF.
ENDLOOP.
**********************************************************************
**step5: C223
**********************************************************************
* 使用SPTA框架拆包进行多进程处理
* BEFORE_RFC: 拆包FORM
* IN_RFC_C223: 拆包后执行此FORM,将逻辑打包到这个form里
* AFTER_RFC: IN_RFC执行完调用此FORM,用来收集执行结果
**********************************************************************
CLEAR lt_data.
CLEAR gt_final.
LOOP AT gt_data INTO ls_data WHERE mtype <> 'E' AND zzt = '4'.
lv_tabix = sy-tabix.
APPEND ls_data TO lt_data.
DELETE gt_data INDEX lv_tabix.
ENDLOOP.
IF lt_data[] IS NOT INITIAL.
CALL FUNCTION 'SPTA_PARA_PROCESS_START_2'
EXPORTING
server_group = lv_rfcgroup " SERVER_GROUP created from RZ12
max_no_of_tasks = lv_maxtasks "最大进程数
before_rfc_callback_form = 'BEFORE_RFC'
in_rfc_callback_form = 'IN_RFC_C223'
after_rfc_callback_form = 'AFTER_RFC'
callback_prog = lv_repid
* SHOW_STATUS = ' '
* RESOURCE_TIMEOUT = 600
* TASK_CALL_MODE = 1
CHANGING
user_param = lt_data
EXCEPTIONS
invalid_server_group = 1
no_resources_available = 2
OTHERS = 3.
IF sy-subrc <> 0.
MESSAGE 'Error. Contact Administrator.' TYPE 'E'.
ENDIF.
"将前两步处理的结果再汇总到GT_DATA
APPEND LINES OF gt_final TO gt_data.
ENDIF.
FORM before_rfc USING p_before_rfc_imp TYPE spta_t_before_rfc_imp
CHANGING
p_before_rfc_exp TYPE spta_t_before_rfc_exp
pt_rfcdata TYPE spta_t_indxtab
p_failed_objects TYPE spta_t_failed_objects
p_objects_in_process TYPE spta_t_objects_in_process
p_user_param .
DATA: ld_task_data TYPE t_rfcdata,
ld_obj_in_process LIKE LINE OF p_objects_in_process.
DATA:1_counter TYPE i,
1_no_of_items TYPE i,
lt_data TYPE STANDARD TABLE OF yvms_parallel_s,
lt_package TYPE yvms_parallel_t,
wa_data TYPE yvms_parallel_s.
*定义一个Package size,比如10,如果总数据量为65,屏幕输入max 进程为3,则会先开3个进程,处理前30条数据,每结束一个进行再新开一个进程
*处理下一个10条,始终保持开最多3个进程
CLEAR ld_obj_in_process.
lt_data = p_user_param.
*Create small work packets for parallel processing
1_no_of_items = 10.
"One work packet vill contain 10 records
LOOP AT lt_data INTO wa_data.
*Read and move specified number of records
IF ( 1_counter < 1_no_of_items AND wa_data IS NOT INITIAL ).
APPEND wa_data TO lt_package.
DELETE lt_data INDEX 1.
1_counter = 1_counter + 1.
ELSE.
EXIT.
ENDIF.
ENDLOOP.
p_user_param = lt_data.
IF lt_package IS INITIAL.
CLEAR p_before_rfc_exp-start_rfc.
EXIT.
ENDIF.
* Convert the input data into the INDX structure that is needed for the RFC
CALL FUNCTION 'SPTA_INDX_PACKAGE_ENCODE'
EXPORTING
data = lt_package
IMPORTING
indxtab = pt_rfcdata.
* Inform task manager that an RFC can be started from the
* data compiled
p_before_rfc_exp-start_rfc = 'X'.
ENDFORM.
FORM in_rfc
USING is_in_rfc_imp TYPE spta_t_in_rfc_imp
CHANGING es_in_rfc_exp TYPE spta_t_in_rfc_exp
it_rfcdata TYPE spta_t_indxtab.
DATA: lt_package TYPE yvms_parallel_t.
* decode the data from the INDX Structure into the process work list
CALL FUNCTION 'SPTA_INDX_PACKAGE_DECODE'
EXPORTING
indxtab = it_rfcdata
IMPORTING
data = lt_package.
* repack output data for AFTER_RFC form
PERFORM frm_package_process TABLES lt_package.
CALL FUNCTION 'SPTA_INDX_PACKAGE_ENCODE'
EXPORTING
data = lt_package
IMPORTING
indxtab = it_rfcdata.
COMMIT WORK.
ENDFORM.
FORM in_rfc_c223
USING is_in_rfc_imp TYPE spta_t_in_rfc_imp
CHANGING es_in_rfc_exp TYPE spta_t_in_rfc_exp
it_rfcdata TYPE spta_t_indxtab.
DATA: lt_package TYPE yvms_parallel_t.
* decode the data from the INDX Structure into the process work list
CALL FUNCTION 'SPTA_INDX_PACKAGE_DECODE'
EXPORTING
indxtab = it_rfcdata
IMPORTING
data = lt_package.
* repack output data for AFTER_RFC form
PERFORM frm_package_process_c223 TABLES lt_package.
CALL FUNCTION 'SPTA_INDX_PACKAGE_ENCODE'
EXPORTING
data = lt_package
IMPORTING
indxtab = it_rfcdata.
COMMIT WORK.
ENDFORM.
FORM after_rfc USING it_rfcdata TYPE spta_t_indxtab
if_rfcsubrc TYPE sy-subrc
if_rfcmsg TYPE spta_t_rfcmsg
it_objects_in_process TYPE spta_t_objects_in_process
is_after_rfc_imp TYPE spta_t_after_rfc_imp
CHANGING es_after_rfc_exp TYPE spta_t_after_rfc_exp
cs_user_param TYPE yvms_parallel_t.
DATA: lt_final TYPE yvms_parallel_t.
* uppack RFC output data and add RFC reulsts to global data
CALL FUNCTION 'SPTA_INDX_PACKAGE_DECODE'
EXPORTING
indxtab = it_rfcdata
IMPORTING
data = lt_final.
APPEND LINES OF lt_final TO gt_final.
ENDFORM.
有了框架,程序变得很简单也很清楚,我们只需要重点关注in_rfc_callback_form这个回调form ‘IN_RFC’和’IN_RFC_C223‘(因为我两次使用了并行,这里分开做了处理,其实写到一起也是可以的)