需求:通过私有的api我可以不断收到byte[]形式的视频数据,现在我需要处理这些数据,最终推送出RTMP流。
实现:通过管道流(PipedOutputStream/PipedInputStream)将不断收到的byte[]视频数据转化为输入流,提供给JavaCV的FFmpegFrameGrabber解析,再通过FFmpegFrameRecorder将视频数据推送至指定的RTMP服务器(本例中的RTMP服务器使用mediamtx搭建)。
效果图
VLC播放
关键依赖
<dependency>
<groupId>org.bytedeco</groupId>
<artifactId>javacv-platform</artifactId>
<version>1.5.9</version>
</dependency>
完整的Demo代码
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.ffmpeg.avcodec.AVCodecParameters;
import org.bytedeco.ffmpeg.avformat.AVFormatContext;
import org.bytedeco.ffmpeg.avformat.AVStream;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.FFmpegFrameRecorder;
import org.bytedeco.javacv.FFmpegLogCallback;
import org.bytedeco.javacv.Frame;
import org.jfjy.ch2ji.ecctv.dh.api.ApiService;
import org.jfjy.ch2ji.ecctv.dh.callback.RealPlayCallback;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import static org.bytedeco.ffmpeg.global.avutil.AV_LOG_DEBUG;
import static org.bytedeco.ffmpeg.global.avutil.AV_LOG_INFO;
@Slf4j
public class GetBytes2PipedStreamAndPushRTMP {
private static final String SRS_PUSH_ADDRESS = "rtmp://127.0.0.1:1935/live/livestream";
static int BUFFER_CAPACITY = 1024 * 1024;
public static void main(String[] args) throws Exception {
FFmpegLogCallback.set();
FFmpegLogCallback.setLevel(AV_LOG_DEBUG);
ApiService apiService = new ApiService();
Long login = apiService.login("10.3.0.54", 8801, "admin", "xxxx");
PipedInputStream inputStream = new PipedInputStream(BUFFER_CAPACITY);
PipedOutputStream outputStream = new PipedOutputStream(inputStream);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Runnable() {
@Override
public void run() {
List<byte[]> bytesArray = new ArrayList<>();
apiService.startRealPlay(new RealPlayCallback<Long, Integer, byte[]>() {
@Override
public void apply(Long aLong, Integer integer, byte[] bytes) {
log.info("收到视频数据,类型:{},字节:{}", integer, bytes.length);
//之所以没在这里写入管道流,是因为每次回调都会创建新的线程,而管道流要求只能在一个线程中写入,否则会出错。
//所以这里把数据丢给了集合对象
synchronized (bytesArray) {
bytesArray.add(bytes);
}
}
}, 0, 0);
try {
while (true) {
synchronized (bytesArray) {
//将视频数据写入管道流
for (byte[] bytes : bytesArray) {
outputStream.write(bytes);
}
outputStream.flush();
bytesArray.clear();
}
Thread.sleep(100);
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
boolean isStartPush = false;
log.info("推送数据线程启动");
while (true) {
try {
//当管道输入流有数据后则开始推送,只需要调用一次
if (!isStartPush && inputStream.available() > 0) {
log.info("推送任务开始执行| available size : {}",inputStream.available());
grabAndPush(inputStream, SRS_PUSH_ADDRESS);
}
Thread.sleep(500);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
});
while (true) {
}
}
private static synchronized void grabAndPush(InputStream inputStream, String pushAddress) throws Exception {
avutil.av_log_set_level(AV_LOG_INFO);
FFmpegLogCallback.set();
FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(inputStream,0);
grabber.setFormat("dhav");
grabber.startUnsafe();
AVFormatContext avFormatContext = grabber.getFormatContext();
int streamNum = avFormatContext.nb_streams();
if (streamNum < 1) {
log.error("no media!");
return;
}
int frameRate = (int) grabber.getVideoFrameRate();
if (0 == frameRate) {
frameRate = 15;
}
log.info("frameRate[{}],duration[{}]秒,nb_streams[{}]",
frameRate,
avFormatContext.duration() / 1000000,
avFormatContext.nb_streams());
for (int i = 0; i < streamNum; i++) {
AVStream avStream = avFormatContext.streams(i);
AVCodecParameters avCodecParameters = avStream.codecpar();
log.info("stream index[{}],codec type[{}],codec ID[{}]", i, avCodecParameters.codec_type(), avCodecParameters.codec_id());
}
int frameWidth = grabber.getImageWidth();
int frameHeight = grabber.getImageHeight();
int audioChannels = grabber.getAudioChannels();
log.info("frameWidth[{}],frameHeight[{}],audioChannels[{}]",
frameWidth,
frameHeight,
audioChannels);
FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(pushAddress,
frameWidth,
frameHeight,
audioChannels);
recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
recorder.setInterleaved(true);
recorder.setFormat("flv");
recorder.setFrameRate(frameRate);
recorder.setGopSize(frameRate);
recorder.setAudioChannels(grabber.getAudioChannels());
recorder.start();
Frame frame;
log.info("start push");
int videoFrameNum = 0;
int audioFrameNum = 0;
int dataFrameNum = 0;
int interVal = 1000 / frameRate;
interVal /= 8;
while (null != (frame = grabber.grab())) {
if (null != frame.image) {
videoFrameNum++;
}
if (null != frame.samples) {
audioFrameNum++;
}
if (null != frame.data) {
dataFrameNum++;
}
recorder.record(frame);
Thread.sleep(interVal);
}
log.info("push complete,videoFrameNum[{}],audioFrameNum[{}],dataFrameNum[{}]",
videoFrameNum,
audioFrameNum,
dataFrameNum);
recorder.close();
grabber.close();
}
}