1. Overall Flow
FileChannel is built from two main pieces: a write-ahead log (WAL) persisted in the data directories and an in-memory queue, FlumeEventQueue. A put transaction appends Put and Commit records to the WAL and, on commit, publishes the event pointers into the FlumeEventQueue; a take transaction removes pointers from the queue and appends Take and Commit records, and only then are the events permanently gone from the channel.
2. Transaction
public interface Transaction {
// describes the state of a transaction
enum TransactionState { Started, Committed, RolledBack, Closed }
void begin();
void commit();
void rollback();
void close();
}
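Every interaction with a channel happens inside such a transaction: begin, then put or take, then commit, and rollback on failure. A minimal usage sketch of this API (channel and event are placeholders; error handling is reduced to the essentials):
Transaction tx = channel.getTransaction();
try {
  tx.begin();
  channel.put(event);          // or: Event e = channel.take();
  tx.commit();
} catch (Throwable t) {
  tx.rollback();               // undo the uncommitted puts/takes
  throw new ChannelException("Transaction failed", t);
} finally {
  tx.close();
}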
3. Put()
A put transaction occurs when a source writes data into a channel.
// after the source has collected events, it hands them to the channel processor
getChannelProcessor().processEventBatch(events);
public void processEventBatch(List<Event> events) {
Preconditions.checkNotNull(events, "Event list must not be null");
events = interceptorChain.intercept(events);
// required channels
Map<Channel, List<Event>> reqChannelQueue =
new LinkedHashMap<Channel, List<Event>>();
// Optional channels are those marked "optional" on the channel selector (e.g. a multiplexing
// selector); put failures on them are only logged. If none are configured, this map stays empty.
Map<Channel, List<Event>> optChannelQueue =
new LinkedHashMap<Channel, List<Event>>();
for (Event event : events) {
List<Channel> reqChannels = selector.getRequiredChannels(event);
for (Channel ch : reqChannels) {
List<Event> eventQueue = reqChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
reqChannelQueue.put(ch, eventQueue);
}
eventQueue.add(event);
}
List<Channel> optChannels = selector.getOptionalChannels(event);
for (Channel ch : optChannels) {
List<Event> eventQueue = optChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
optChannelQueue.put(ch, eventQueue);
}
eventQueue.add(event);
}
}
// Process required channels
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
List<Event> batch = reqChannelQueue.get(reqChannel);
for (Event event : batch) {
// write into the required channel by calling put()
reqChannel.put(event);
}
tx.commit();
} catch (Throwable t) {
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close();
}
}
}
// Process optional channels
for (Channel optChannel : optChannelQueue.keySet()) {
Transaction tx = optChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
List<Event> batch = optChannelQueue.get(optChannel);
for (Event event : batch) {
optChannel.put(event);
}
tx.commit();
} catch (Throwable t) {
tx.rollback();
LOG.error("Unable to put batch on optional channel: " + optChannel, t);
if (t instanceof Error) {
throw (Error) t;
}
} finally {
if (tx != null) {
tx.close();
}
}
}
}
//org.apache.flume.channel.BasicChannelSemantics.java
public void put(Event event) throws ChannelException {
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
// delegate to the per-thread transaction's put()
transaction.put(event);
}
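currentTransaction is a ThreadLocal, so getTransaction() hands each thread its own BasicTransactionSemantics instance; that is also why put()/take()/commit() later verify they run on the same thread that obtained the transaction. A simplified sketch of this bookkeeping (close to, but not exactly, the Flume source):
// Simplified sketch of the per-thread transaction handling in BasicChannelSemantics.
private final ThreadLocal<BasicTransactionSemantics> currentTransaction =
    new ThreadLocal<BasicTransactionSemantics>();

public Transaction getTransaction() {
  BasicTransactionSemantics transaction = currentTransaction.get();
  // reuse the thread's transaction unless it has already been closed
  if (transaction == null || transaction.getState().equals(
      BasicTransactionSemantics.State.CLOSED)) {
    transaction = createTransaction();   // e.g. a FileBackedTransaction for FileChannel
    currentTransaction.set(transaction);
  }
  return transaction;
}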
//org.apache.flume.channel.BasicTransactionSemantics.java
protected void put(Event event) {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"put() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"put() called when transaction is %s!", state);
Preconditions.checkArgument(event != null,
"put() called with null event!");
try {
doPut(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
}
// org.apache.flume.channel.file.FileChannel.FileBackedTransaction
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
if (putList.remainingCapacity() == 0) {
throw new ChannelException("Put queue for FileBackedTransaction " +
"of capacity " + putList.size() + " full, consider " +
"committing more frequently, increasing capacity or " +
"increasing thread count. " + channelNameDescriptor);
}
// this does not need to be in the critical section as it does not
// modify the structure of the log or queue.
if (!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
throw new ChannelFullException("The channel has reached it's capacity. "
+ "This might be the result of a sink on the channel having too "
+ "low of batch size, a downstream system running slower than "
+ "normal, or that the channel capacity is just too low. "
+ channelNameDescriptor);
}
boolean success = false;
// doTake() acquires the same shared lock; since it is a shared (read) lock, puts and takes can
// run concurrently with each other, but both are blocked while a checkpoint holds the exclusive lock
log.lockShared();
try {
// transactionID was assigned when the transaction object was created earlier
// write a Put record to the WAL, which persists the event to a data file on disk
FlumeEventPointer ptr = log.put(transactionID, event);
// add the event pointer to the in-memory putList
Preconditions.checkState(putList.offer(ptr), "putList offer failed "
+ channelNameDescriptor);
// also track the pointer in inflightPuts; this is provisional state because the transaction
// has not committed yet. Adding it straight to the FlumeEventQueue would expose the event to sinks prematurely.
queue.addWithoutCommit(ptr, transactionID);
success = true;
} catch (IOException e) {
channelCounter.incrementEventPutErrorCount();
throw new ChannelException("Put failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
if (!success) {
// release slot obtained in the case
// the put fails for any reason
queueRemaining.release();
}
}
}
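Two different limits are enforced in doPut(): putList is bounded by the transaction capacity, while queueRemaining is a semaphore sized to the whole channel capacity, and a put waits up to keep-alive seconds for a free permit before failing. A rough sketch of how the two relate (the sizes are illustrative assumptions, not necessarily your configuration):
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;

// Rough sketch of the two capacity levels that guard a put; sizes are illustrative.
int capacity = 1000000;             // whole channel, i.e. the FlumeEventQueue
int transactionCapacity = 10000;    // per transaction, i.e. putList/takeList

// One permit per event held by the channel: acquired in doPut(), released again
// when a take commits (queueRemaining.release(takes)) or a put rolls back.
Semaphore queueRemaining = new Semaphore(capacity, true);

// Per-transaction staging area for not-yet-committed puts.
LinkedBlockingDeque<FlumeEventPointer> putList =
    new LinkedBlockingDeque<FlumeEventPointer>(transactionCapacity);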
// org.apache.flume.channel.file.FlumeEventQueue.java
synchronized void addWithoutCommit(FlumeEventPointer e, long transactionID) {
inflightPuts.addEvent(transactionID, e.toLong());
}
public void addEvent(Long transactionID, Long pointer) {
// record the pointer under this transaction in inflightEvents
inflightEvents.put(transactionID, pointer);
inflightFileIDs.put(transactionID,
FlumeEventPointer.fromLong(pointer).getFileID());
syncRequired = true;
}
InflightPuts is the collection of events that are actually in flight. Flume uses it to track events belonging to transactions that have not committed yet; only when a transaction commits are all of that transaction's pointers removed from InflightPuts. InflightPuts is also closely tied to Flume's checkpoint mechanism.
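A FlumeEventPointer does not hold the event itself, only where it lives on disk: the ID of the WAL data file and the offset within that file, packed into one long (which is what e.toLong() and FlumeEventPointer.fromLong() convert between). Conceptually the encoding looks like this (a sketch of the idea, not the exact Flume source):
// Sketch of the pointer encoding: file ID in the high 32 bits,
// offset within that file in the low 32 bits.
long toLong(int fileID, int offset) {
  return ((long) fileID << 32) | (offset & 0xFFFFFFFFL);
}

int fileID(long pointer) {
  return (int) (pointer >>> 32);
}

int offset(long pointer) {
  return (int) pointer;
}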
First, let's look at how Flume persists the event to disk: it is written to a data file whose name starts with "log-". log.put(transactionID, event):
// org.apache.flume.channel.file.Log.java
FlumeEventPointer put(long transactionID, Event event)
throws IOException {
Preconditions.checkState(open, "Log is closed");
FlumeEvent flumeEvent = new FlumeEvent(
event.getHeaders(), event.getBody());
// wrap the event in a Put record; the WAL records four kinds of operations: Put, Take, Commit and Rollback
Put put = new Put(transactionID, WriteOrderOracle.next(), flumeEvent);
ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);
// pick the data file in one of the data directories, e.g. log-1
int logFileIndex = nextLogWriter(transactionID);
long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
long requiredSpace = minimumRequiredSpace + buffer.limit();
if (usableSpace <= requiredSpace) {
throw new IOException("Usable space exhausted, only " + usableSpace +
" bytes remaining, required " + requiredSpace + " bytes");
}
boolean error = true;
try {
try {
// write the Put record to the WAL data file, which persists the event
// logFileIndex identifies the data file, e.g. the log-1 file
FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
error = false;
return ptr;
} catch (LogFileRetryableIOException e) {
if (!open) {
throw e;
}
roll(logFileIndex, buffer);
FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
error = false;
return ptr;
}
} finally {
if (error && open) {
roll(logFileIndex);
}
}
}
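The Put object built above is one of the four record kinds the WAL knows about. A heavily condensed sketch of how those records relate (field layout simplified from the TransactionEventRecord hierarchy in the Flume source):
// Condensed sketch of the four WAL record kinds (fields simplified): every
// record carries the transaction ID plus a global write-order ID used during
// replay; only the Put record also carries the event payload itself.
abstract class TransactionEventRecord { long transactionID; long logWriteOrderID; }
class Put      extends TransactionEventRecord { FlumeEvent event; }
class Take     extends TransactionEventRecord { /* identifies the taken event's file and offset */ }
class Commit   extends TransactionEventRecord { }
class Rollback extends TransactionEventRecord { }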
Now look at the commit, tx.commit():
// org.apache.flume.channel.BasicTransactionSemantics.java
public void commit() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"commit() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"commit() called when transaction is %s!", state);
try {
doCommit();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
// update the transaction state
state = State.COMPLETED;
}
// org.apache.flume.channel.file.FileChannel.FileBackedTransaction
protected void doCommit() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
if (puts > 0) {
Preconditions.checkState(takes == 0, "nonzero puts and takes "
+ channelNameDescriptor);
log.lockShared();
try {
// write a Commit record to the WAL
log.commitPut(transactionID);
channelCounter.addToEventPutSuccessCount(puts);
synchronized (queue) {
while (!putList.isEmpty()) {
// move the event pointers from putList into the channel's FlumeEventQueue,
// making them visible to takes; the channel's capacity is the capacity of this queue
if (!queue.addTail(putList.removeFirst())) {
StringBuilder msg = new StringBuilder();
msg.append("Queue add failed, this shouldn't be able to ");
msg.append("happen. A portion of the transaction has been ");
msg.append("added to the queue but the remaining portion ");
msg.append("cannot be added. Those messages will be consumed ");
msg.append("despite this transaction failing. Please report.");
msg.append(channelNameDescriptor);
LOG.error(msg.toString());
Preconditions.checkState(false, msg.toString());
}
}
// remove the transaction ID and clear its entries from inflightPuts
queue.completeTransaction(transactionID);
}
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
}
} else if (takes > 0) {
// code omitted (the take-side commit is shown in the Take() section below)
// ......
}
// clear the per-transaction lists
putList.clear();
takeList.clear();
channelCounter.setChannelSize(queue.getSize());
}
// org.apache.flume.channel.file.FlumeEventQueue
synchronized void completeTransaction(long transactionID) {
// clear this transaction's entries from inflightPuts
if (!inflightPuts.completeTransaction(transactionID)) {
inflightTakes.completeTransaction(transactionID);
}
}
// org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper
// both inflightPuts and inflightTakes are instances of InflightEventWrapper
public boolean completeTransaction(Long transactionID) {
if (!inflightEvents.containsKey(transactionID)) {
return false;
}
// remove this transaction's ID and all of its pointers from inflightEvents and inflightFileIDs
inflightEvents.removeAll(transactionID);
inflightFileIDs.removeAll(transactionID);
syncRequired = true;
return true;
}
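inflightEvents and inflightFileIDs are keyed by transaction ID and can hold many values per key, which is why removeAll(transactionID) clears everything a transaction had in flight in one call. A tiny illustration of that multimap behaviour, assuming a Guava SetMultimap as the backing structure:
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;

// One transaction ID can map to many pointers; removeAll() drops them all at once.
SetMultimap<Long, Long> inflightEvents = HashMultimap.create();
inflightEvents.put(42L, 111L);   // first event of transaction 42
inflightEvents.put(42L, 222L);   // second event of the same transaction
inflightEvents.removeAll(42L);   // on commit, both pointers disappear together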
The event pointers from the in-memory putList have been moved into the channel's FlumeEventQueue and this transaction's entries in inflightPuts have been cleared, which means the put transaction has committed successfully.
4. Take()
A take transaction occurs when a sink pulls data from a channel.
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event = null;
try {
transaction.begin();
// take an event from the channel
event = channel.take();
......
transaction.commit();
} catch (Exception ex) {
transaction.rollback();
throw new EventDeliveryException("Failed to process transaction" , ex);
} finally {
transaction.close();
}
return result;
}
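In practice a sink normally takes a whole batch of events per transaction rather than a single one, so that one Commit record in the WAL covers many takes. A sketch of that batching pattern (batchSize, buffer and send() are illustrative placeholders, not Flume API):
// Sketch of a batched take: one transaction covers up to batchSize takes.
Transaction tx = channel.getTransaction();
try {
  tx.begin();
  List<Event> buffer = new ArrayList<Event>();
  for (int i = 0; i < batchSize; i++) {
    Event e = channel.take();
    if (e == null) {           // channel drained for now
      break;
    }
    buffer.add(e);
  }
  send(buffer);                // deliver downstream before committing
  tx.commit();                 // only now do the takes become permanent
} catch (Throwable t) {
  tx.rollback();               // taken events go back to the head of the queue
  throw new EventDeliveryException("Failed to deliver batch", t);
} finally {
  tx.close();
}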
// org.apache.flume.channel.BasicTransactionSemantics.java
protected Event take() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"take() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"take() called when transaction is %s!", state);
try {
return doTake();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
// org.apache.flume.channel.file.FileChannel.FileBackedTransaction
protected Event doTake() throws InterruptedException {
channelCounter.incrementEventTakeAttemptCount();
if (takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for FileBackedTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count. "
+ channelNameDescriptor);
}
log.lockShared();
/*
* 1. Take an event which is in the queue.
* 2. If getting that event does not throw NoopRecordException,
* then return it.
* 3. Else try to retrieve the next event from the queue
* 4. Repeat 2 and 3 until queue is empty or an event is returned.
*/
try {
while (true) {
// remove an event pointer from the head of the FlumeEventQueue
// and record it in inflightTakes
FlumeEventPointer ptr = queue.removeHead(transactionID);
if (ptr == null) {
return null;
} else {
try {
// first add to takeList so that if write to disk
// fails rollback actually does it's work
// first add the pointer to takeList
Preconditions.checkState(takeList.offer(ptr),
"takeList offer failed "
+ channelNameDescriptor);
// write a Take record to the WAL
log.take(transactionID, ptr); // write take to disk
// read the event from the persisted data file using the pointer
Event event = log.get(ptr);
return event;
} catch (IOException e) {
channelCounter.incrementEventTakeErrorCount();
throw new ChannelException("Take failed due to IO error "
+ channelNameDescriptor, e);
} catch (NoopRecordException e) {
LOG.warn("Corrupt record replaced by File Channel Integrity " +
"tool found. Will retrieve next event", e);
takeList.remove(ptr);
} catch (CorruptEventException ex) {
channelCounter.incrementEventTakeErrorCount();
if (fsyncPerTransaction) {
throw new ChannelException(ex);
}
LOG.warn("Corrupt record found. Event will be " +
"skipped, and next event will be read.", ex);
takeList.remove(ptr);
}
}
}
} finally {
log.unlockShared();
}
}
Next, queue.removeHead(transactionID):
synchronized FlumeEventPointer removeHead(long transactionID) {
if (backingStore.getSize() == 0) {
return null;
}
long value = remove(0, transactionID);
Preconditions.checkState(value != EMPTY, "Empty value "
+ channelNameDescriptor);
FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
backingStore.decrementFileID(ptr.getFileID());
return ptr;
}
protected synchronized long remove(int index, long transactionID) {
if (index < 0 || index > backingStore.getSize() - 1) {
throw new IndexOutOfBoundsException("index = " + index
+ ", queueSize " + backingStore.getSize() + " " + channelNameDescriptor);
}
copyCount++;
long start = System.currentTimeMillis();
long value = get(index);
if (queueSet != null) {
queueSet.remove(value);
}
//if txn id = 0, we are recovering from a crash.
if (transactionID != 0) {
// record the pointer being removed from the queue in inflightTakes under this transaction
inflightTakes.addEvent(transactionID, value);
}
if (index > backingStore.getSize() / 2) {
// Move tail part to left
for (int i = index; i < backingStore.getSize() - 1; i++) {
long rightValue = get(i + 1);
set(i, rightValue);
}
set(backingStore.getSize() - 1, EMPTY);
} else {
// Move head part to right
for (int i = index - 1; i >= 0; i--) {
long leftValue = get(i);
set(i + 1, leftValue);
}
set(0, EMPTY);
backingStore.setHead(backingStore.getHead() + 1);
if (backingStore.getHead() == backingStore.getCapacity()) {
backingStore.setHead(0);
}
}
backingStore.setSize(backingStore.getSize() - 1);
copyTime += System.currentTimeMillis() - start;
return value;
}
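The reason advancing the head "removes" the head element is that FlumeEventQueue is a circular array over the backing store: a logical index is translated into a physical slot relative to head, modulo the capacity. Roughly (a sketch of the indexing idea, not the exact Flume code):
// Sketch of the circular-array indexing behind get()/set(): logical index 0 is
// always the current head, and physical slots wrap around at capacity, so
// moving head forward discards the old head element without copying data.
int physicalIndex(int logicalIndex, int head, int capacity) {
  return (head + logicalIndex) % capacity;
}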
With the event in hand, look at how the transaction commits:
protected void doCommit() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
if (puts > 0) {
// code omitted (the put-side commit was shown above)
// ......
} else if (takes > 0) {
log.lockShared();
try {
log.commitTake(transactionID);
// remove the transaction ID and clear its entries from inflightTakes
queue.completeTransaction(transactionID);
channelCounter.addToEventTakeSuccessCount(takes);
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
}
queueRemaining.release(takes);
}
putList.clear();
takeList.clear();
channelCounter.setChannelSize(queue.getSize());
}
// org.apache.flume.channel.file.FlumeEventQueue
synchronized void completeTransaction(long transactionID) {
if (!inflightPuts.completeTransaction(transactionID)) {
// clear this transaction's entries from inflightTakes
inflightTakes.completeTransaction(transactionID);
}
}
Once the take transaction commits, the event pointers are gone from the channel's FlumeEventQueue for good, and both inflightTakes and takeList have been cleared.
Finally, transaction.rollback():
protected void doRollback() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
log.lockShared();
try {
if (takes > 0) {
Preconditions.checkState(puts == 0, "nonzero puts and takes "
+ channelNameDescriptor);
synchronized (queue) {
while (!takeList.isEmpty()) {
// put the pointers in takeList back at the head of the channel's FlumeEventQueue
Preconditions.checkState(queue.addHead(takeList.removeLast()),
"Queue add failed, this shouldn't be able to happen "
+ channelNameDescriptor);
}
}
}
putList.clear();
takeList.clear();
// remove the transaction ID and clear its entries from inflightTakes
queue.completeTransaction(transactionID);
channelCounter.setChannelSize(queue.getSize());
// write a Rollback record to the WAL
log.rollback(transactionID);
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
// since rollback is being called, puts will never make it on
// to the queue and we need to be sure to release the resources
queueRemaining.release(puts);
}
}
When a take transaction rolls back, the pointers in takeList are put back at the head of the channel's FlumeEventQueue and this transaction's entries in inflightTakes are cleared, so the events become visible to sinks again and will be re-delivered later.