import datetime
import json
import random
import time
from kafka import KafkaProducer
'''
生产者demo
向branch-event主题中循环写入10条json数据
注意事项:要写入json数据需加上value_serializer参数,如下代码
'''
producer = KafkaProducer(
value_serializer= lambda v: json. dumps( v) . encode( 'utf-8' ) ,
security_protocol= 'SASL_PLAINTEXT' ,
sasl_mechanism= 'PLAIN' ,
sasl_plain_username= 'kafkadmin' ,
sasl_plain_password= 'xxxxxxxx' ,
bootstrap_servers= [ '10.10.xx.xx:9092' ]
)
def gen ( i) :
""" 生成当前日期和时间戳 """
time_stamp = str ( round ( time. time( ) * 1000 ) - 0 )
time_stamp_format = datetime. datetime. now( ) . strftime( '%Y-%m-%d %H:%M:%S.%f' ) [ : - 3 ]
current_date = datetime. datetime. now( ) . strftime( '%Y.%m.%d' )
dst_ip = str ( random. randint( 1 , 254 ) ) + '.' + \
str ( random. randint( 1 , 254 ) ) + '.' + \
str ( random. randint( 1 , 254 ) ) + '.' + \
str ( random. randint( 1 , 254 ) )
src_ip = str ( random. randint( 1 , 254 ) ) + '.' + \
str ( random. randint( 1 , 254 ) ) + '.' + \
str ( random. randint( 1 , 254 ) ) + '.' + \
str ( random. randint( 1 , 254 ) )
model_name = f"zhang- { current_date} .cc2 { i} "
""" 原始日志 """
s = {
"dst_device_ip" : "777.77.77.77" ,
"src_device_ip" : "777.77.77.77" ,
"src_device_dept" : " " ,
"eqpt_asset_type" : "/IDS/Network/WAF" ,
"app_protocol" : f"abc { i} " ,
"alarm_times" : "9" ,
"start_time" : f" { time_stamp_format} " ,
"src_account" : model_name + "." + str ( i) ,
"answer_address" : "光谷创新港" ,
"alarm_direction" : "xxgcn" ,
"additional_name" : "www.jw.com" ,
"http_url_externalurl" : f"http://www. { model_name} .com" ,
"http_url_externalurl_domain" : f"www. { model_name} .com" ,
"response_action" : { "alertRestrainAccordingCols" : "" ,
"sinkCols" : "group_array(src_device_ip) as src_ip,group_array(src_device_uuid) as src_device_uuid,group_array(dst_device_ip) as dst_ip,group_array(dst_device_uuid) as dst_device_uuid,group_array(src_device_ip_country) as src_country,group_array(src_device_ip_province) as src_province,group_array(src_device_ip_city) as src_city,group_array(src_port) as src_port,group_array(dst_device_ip_country) as dst_country,group_array(dst_device_ip_province) as dst_province,group_array(dst_device_ip_city) as dst_city,group_array(dst_port) as dst_port,group_array(http_url_externalurl_domain) as dst_domain,group_array(http_url_externalurl) as dst_url,group_array(protocol) as agreement,uuid as uuids,first(A.start_time) as strategy_alert_first_time,first(A.start_time) as strategy_alert_last_time" ,
"sinkStaticInfo" : "[{\"strategy_alert_name\":\"名单过滤-实时告警6.9\"},{\"strategy_att_ck\":\"侦察-搜集主机信息\"},{\"strategy_alert_desc\":\"\"},{\"strategy_risk_score\":5},{\"strategy_alert_category\":\"告警分类\"},{\"strategy_alert_summary\":\"\"}]" ,
"sinkType" : "each" } ,
"response_code" : "standby2-jwwwwwwwwwwwww" ,
"response_data" : f"TotoLink 多款路由器downloadFlilecgi命执行漏洞(CVE-2022-25075--CVE-2022-25083)_ { i} " ,
"file_hash" : "36426c221bfa23180805d78c8421b653" ,
"branch_code" : "xxgcn" ,
"external_alarm_attack" : "漏洞 恶意域名 XRed" ,
"external_alarm_attack_type" : "bbaccb" ,
"attack_ip" : "211.211.211.211" ,
"log_type" : "uum" ,
"result_action" : "用户静态密码错误" ,
"eqpt_vendor" : f"idss_ { i} " ,
"src_port" : f"111 { i} " ,
"dst_port" : f"53" ,
"src_network_domain" : None ,
"dst_network_domain" : None ,
"object_type" : "公共服务" ,
"src_device_vendor" : "联通" ,
"dst_device_type" : "服务器" ,
"src_person" : "zhangxingheng" ,
"dst_person" : "xuqq" ,
"dst_person_name" : "zhangxingheng" ,
"dst_person_status" : "在职" ,
"dst_person_ctpositionname" : "测试" ,
"dst_person_types" : "企业员工" ,
"dst_person_org_name" : "技术部" ,
}
print ( '打印插入数据:' , s)
producer. send( 'zhang_orglog' , s)
if __name__ == '__main__' :
for i in range ( 2 ) :
gen( i)
time. sleep( 1 )
producer. close( )