`timescale 1ns /1ps
//// Company: // Engineer: wengf// Create Date: // Design Name: // Module Name: big_small_cnt// Project Name: // Target Devices: // Tool Versions: // Description: // Dependencies: // Revision:// Revision 0.01 - File Created// Additional Comments:// 通用的大小计数器,输入想要开始的条件,直到条件终止//
module big_small_cnt(
clk ,
rst ,
clk_p ,
start ,//BIG_LENGTH,// 添加
big_cnts ,
small_cnts
);//
parameter BIG_LENGTH=16'd8;
parameter SMALL_LENGTH=8'd11;//
input clk,rst,clk_p,start;//input [15:0] BIG_LENGTH;//添加
output [15:0] big_cnts;
output [7:0] small_cnts;//
reg [7:0] small_cnts;
always @ (posedge clk or posedge rst)//小计数器
begin
if(rst)
begin
small_cnts<=8'hff;
end
elseif(clk_p)
begin
if(start)//起始位置
begin
small_cnts<=8'd0;
end
elseif((small_cnts==SMALL_LENGTH-1)&&(big_cnts<BIG_LENGTH-1))//所需长度减1,在这里面进行循环,当big_cnt大于该长度后,自动停止
begin
small_cnts<=8'd0;
end
elseif(big_cnts<=BIG_LENGTH-1)//所需长度减1,在这里面进行循环,当big_cnt大于该长度后,结束循环,保持循环达不到的最大值
begin
small_cnts<=small_cnts +1'b1;
end
else
begin
small_cnts<=small_cnts;
end
end
else
begin
small_cnts<=small_cnts;
end
end
//
reg [15:0] big_cnts;
always @ (posedge clk or posedge rst)
begin
if(rst)
begin
big_cnts<=16'hffff;
end
elseif((start)&&(clk_p))//起始位置
begin
big_cnts<=16'd0;
end
elseif((small_cnts==SMALL_LENGTH-1)&&(clk_p))所需长度减1,最大值受最小值控制,按照最小值走几个周期即可。
begin
big_cnts<=big_cnts +1'b1;
end
else
begin
big_cnts<=big_cnts;
end
end
//
endmodule
通用大小计数器,去除 clk_p
`timescale 1ns /1ps
//// Company: // Engineer: wengf// Create Date: // Design Name: // Module Name: big_small_cnt// Project Name: // Target Devices: // Tool Versions: // Description: // Dependencies: // Revision:// Revision 0.01 - File Created// Additional Comments:// 通用的大小计数器,输入想要开始的条件,直到条件终止//
module big_small_cnt(
clk ,
rst ,
start ,
forward_cnts ,
backward_cnts ,
big_cnts ,
small_cnts ,
small_pulse_start,
small_pulse_end,
pulse_end
);
parameter BIG_LENGTH=16'd8;
parameter SMALL_LENGTH=8'd8;
parameter COUNT = BIG_LENGTH * SMALL_LENGTH;
input clk,rst,start;
output [15:0] forward_cnts,backward_cnts;
output [15:0] big_cnts;
output [7:0] small_cnts;
output small_pulse_start;
output small_pulse_end;
output pulse_end;
reg [15:0] forward_cnts,backward_cnts;
reg [15:0] big_cnts;
reg [7:0] small_cnts;
reg small_pulse_start,small_pulse_end,pulse_end;
always @ (posedge clk or posedge rst)//小计数器
begin
if(rst)
begin
forward_cnts<=16'hffff;
end
elseif(start)//起始位置
begin
forward_cnts<=16'd0;
end
elseif(big_cnts<=BIG_LENGTH-1)//所需长度减1,在这里面进行循环,当big_cnt大于该长度后,结束循环,保持循环达不到的最大值
begin
forward_cnts<=forward_cnts +1'b1;
end
else
begin
forward_cnts<=forward_cnts;
end
end
always @ (posedge clk or posedge rst)//小计数器
begin
if(rst)
begin
backward_cnts <=16'hffff;
end
elseif(start)//起始位置
begin
backward_cnts <= COUNT-1'b1;
end
elseif(big_cnts<=BIG_LENGTH-1)//所需长度减1,在这里面进行循环,当big_cnt大于该长度后,结束循环,保持循环达不到的最大值
begin
backward_cnts<=backward_cnts-1'b1;
end
else
begin
backward_cnts<=backward_cnts;
end
end
always @ (posedge clk or posedge rst)//小计数器
begin
if(rst)
begin
small_cnts<=8'hff;
end
elseif(start)//起始位置
begin
small_cnts<=8'd0;
end
elseif((small_cnts==SMALL_LENGTH-1)&&(big_cnts<BIG_LENGTH-1))//所需长度减1,在这里面进行循环,当big_cnt大于该长度后,自动停止
begin
small_cnts<=8'd0;
end
elseif(big_cnts<=BIG_LENGTH-1)//所需长度减1,在这里面进行循环,当big_cnt大于该长度后,结束循环,保持循环达不到的最大值
begin
small_cnts<=small_cnts +1'b1;
end
else
begin
small_cnts<=small_cnts;
end
end
always @ (posedge clk or posedge rst)//小计数器
begin
if(rst)
begin
small_pulse_start<=1'b0;
end
elseif(start)//起始位置
begin
small_pulse_start<=1'b1;
end
elseif((small_cnts==SMALL_LENGTH-1)&&(big_cnts<BIG_LENGTH-1))//所需长度减1,在这里面进行循环,当big_cnt大于该长度后,自动停止
begin
small_pulse_start<=1'b1;
end
else
begin
small_pulse_start<=1'b0;
end
end
always @ (posedge clk or posedge rst)//小计数器
begin
if(rst)
begin
small_pulse_end <=1'b0;
end
elseif(start)//起始位置
begin
small_pulse_end <=1'b0;
end
elseif(small_cnts==SMALL_LENGTH-2)//所需长度减1,在这里面进行循环,当big_cnt大于该长度后,自动停止
begin
small_pulse_end <=8'd1;
end
else
begin
small_pulse_end <=1'b0;
end
end
always @ (posedge clk or posedge rst)
begin
if(rst)
begin
big_cnts<=16'hffff;
end
elseif(start)//起始位置
begin
big_cnts<=16'd0;
end
elseif(small_cnts==SMALL_LENGTH-1)
begin
big_cnts<=big_cnts +1'b1;
end
else
begin
big_cnts<=big_cnts;
end
end
// pulse_end
always @ (posedge clk or posedge rst)
begin
if(rst)
begin
pulse_end<=1'b0;
end
elseif(start)//起始位置
begin
pulse_end <=1'b0;
end
elseif((small_cnts==SMALL_LENGTH-1)&&(big_cnts==BIG_LENGTH-1))
begin
pulse_end <=1'b1;
end
else
begin
pulse_end <=1'b0;
end
end
endmodule
1.简介
1.1.producer介绍
生产者就是负责向kafka发送消息的应用程序。消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用后才能被真正的发往broker。 demo:
public class Kafk…
之前写的关于spark sql 操作delta lake表的,总觉得有点混乱,今天用Java结合真实的数据来进行一次数据的CRUD操作,所涉及的数据来源于Delta lake up and running配套的 GitGitHub - benniehaelen/delta-lake-up-and-running: Companion reposi…