Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions bookkeeper-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,6 @@
<artifactId>bookkeeper-proto</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-slogger-slf4j</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-slogger-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-tools-framework</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;

import io.github.merlimat.slog.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.File;
Expand All @@ -34,7 +35,6 @@
import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
import org.apache.bookkeeper.common.util.nativeio.NativeIO;
import org.apache.bookkeeper.slogger.Slogger;
import org.apache.bookkeeper.stats.OpStatsLogger;

/**
Expand All @@ -43,7 +43,7 @@
public abstract class DirectCompactionEntryLog implements CompactionEntryLog {
protected final int srcLogId;
protected final int dstLogId;
protected final Slogger slog;
protected final Logger log;

protected final File compactingFile;
protected final File compactedFile;
Expand All @@ -57,10 +57,10 @@ static CompactionEntryLog newLog(int srcLogId,
BufferPool writeBuffers,
NativeIO nativeIO,
ByteBufAllocator allocator,
Slogger slog) throws IOException {
Logger log) throws IOException {
return new WritingDirectCompactionEntryLog(
srcLogId, dstLogId, ledgerDir, maxFileSize,
writeExecutor, writeBuffers, nativeIO, allocator, slog);
writeExecutor, writeBuffers, nativeIO, allocator, log);
}

static CompactionEntryLog recoverLog(int srcLogId,
Expand All @@ -71,46 +71,52 @@ static CompactionEntryLog recoverLog(int srcLogId,
NativeIO nativeIO,
ByteBufAllocator allocator,
OpStatsLogger readBlockStats,
Slogger slog) {
Logger log) {
return new RecoveredDirectCompactionEntryLog(srcLogId, dstLogId, ledgerDir, readBufferSize,
maxSaneEntrySize, nativeIO, allocator, readBlockStats, slog);
maxSaneEntrySize, nativeIO, allocator, readBlockStats, log);
}

private DirectCompactionEntryLog(int srcLogId,
int dstLogId,
File ledgerDir,
Slogger slog) {
Logger log) {
compactingFile = compactingFile(ledgerDir, dstLogId);
compactedFile = compactedFile(ledgerDir, dstLogId, srcLogId);
completeFile = DirectEntryLogger.logFile(ledgerDir, dstLogId);

this.srcLogId = srcLogId;
this.dstLogId = dstLogId;

this.slog = slog.kv("dstLogId", dstLogId).kv("srcLogId", srcLogId).ctx(DirectCompactionEntryLog.class);
this.log = Logger.get(DirectCompactionEntryLog.class).with()
.ctx(log)
.attr("dstLogId", dstLogId)
.attr("srcLogId", srcLogId)
.build();
}

@Override
public void abort() {
try {
Files.deleteIfExists(compactingFile.toPath());
} catch (IOException ioe) {
slog.kv("compactingFile", compactingFile).warn(Events.COMPACTION_ABORT_EXCEPTION, ioe);
log.warn().exception(ioe).attr("compactingFile", compactingFile)
.log("Compaction aborted");
}

try {
Files.deleteIfExists(compactedFile.toPath());
} catch (IOException ioe) {
slog.kv("compactedFile", compactedFile).warn(Events.COMPACTION_ABORT_EXCEPTION, ioe);
log.warn().exception(ioe).attr("compactedFile", compactedFile)
.log("Compaction aborted");
}
}


@Override
public void makeAvailable() throws IOException {
idempotentLink(compactedFile, completeFile);
slog.kv("compactedFile", compactedFile).kv("completeFile", completeFile)
.info(Events.COMPACTION_MAKE_AVAILABLE);
log.info().attr("compactedFile", compactedFile).attr("completeFile", completeFile)
.log("Making compacted log available");
}

private static void idempotentLink(File src, File dst) throws IOException {
Expand All @@ -132,15 +138,17 @@ public void finalizeAndCleanup() {
try {
Files.deleteIfExists(compactingFile.toPath());
} catch (IOException ioe) {
slog.kv("compactingFile", compactingFile).warn(Events.COMPACTION_DELETE_FAILURE, ioe);
log.warn().exception(ioe).attr("compactingFile", compactingFile)
.log("Failed to delete compaction artifact");
}

try {
Files.deleteIfExists(compactedFile.toPath());
} catch (IOException ioe) {
slog.kv("compactedFile", compactedFile).warn(Events.COMPACTION_DELETE_FAILURE, ioe);
log.warn().exception(ioe).attr("compactedFile", compactedFile)
.log("Failed to delete compaction artifact");
}
slog.info(Events.COMPACTION_COMPLETE);
log.info("Compaction complete");
}

@Override
Expand Down Expand Up @@ -168,15 +176,15 @@ private static class RecoveredDirectCompactionEntryLog extends DirectCompactionE
NativeIO nativeIO,
ByteBufAllocator allocator,
OpStatsLogger readBlockStats,
Slogger slog) {
super(srcLogId, dstLogId, ledgerDir, slog);
Logger log) {
super(srcLogId, dstLogId, ledgerDir, log);
this.allocator = allocator;
this.nativeIO = nativeIO;
this.readBufferSize = readBufferSize;
this.maxSaneEntrySize = maxSaneEntrySize;
this.readBlockStats = readBlockStats;

this.slog.info(Events.COMPACTION_LOG_RECOVERED);
this.log.info("Recovered partially-flushed compaction log");
}

private IllegalStateException illegalOpException() {
Expand Down Expand Up @@ -223,16 +231,16 @@ private static class WritingDirectCompactionEntryLog extends DirectCompactionEnt
BufferPool writeBuffers,
NativeIO nativeIO,
ByteBufAllocator allocator,
Slogger slog) throws IOException {
super(srcLogId, dstLogId, ledgerDir, slog);
Logger log) throws IOException {
super(srcLogId, dstLogId, ledgerDir, log);

this.writer = new WriterWithMetadata(
new DirectWriter(dstLogId, compactingFile.toString(), maxFileSize,
writeExecutor, writeBuffers, nativeIO, slog),
writeExecutor, writeBuffers, nativeIO, log),
new EntryLogMetadata(dstLogId),
allocator);

this.slog.info(Events.COMPACTION_LOG_CREATED);
this.log.info("Created compaction log");
}

@Override
Expand All @@ -251,13 +259,13 @@ public void markCompacted() throws IOException {

idempotentLink(compactingFile, compactedFile);
if (!compactingFile.delete()) {
slog.kv("compactingFile", compactingFile)
.kv("compactedFile", compactedFile)
.info(Events.COMPACTION_DELETE_FAILURE);
log.info().attr("compactingFile", compactingFile)
.attr("compactedFile", compactedFile)
.log("Failed to delete compaction artifact");
} else {
slog.kv("compactingFile", compactingFile)
.kv("compactedFile", compactedFile)
.info(Events.COMPACTION_MARK_COMPACTED);
log.info().attr("compactingFile", compactingFile)
.attr("compactedFile", compactedFile)
.log("Marked compaction log as compacted");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import io.github.merlimat.slog.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -56,15 +57,14 @@
import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.common.util.nativeio.NativeIO;
import org.apache.bookkeeper.slogger.Slogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.LedgerDirUtil;

/**
* DirectEntryLogger.
*/
public class DirectEntryLogger implements EntryLogger {
private final Slogger slog;
private final Logger log;
private final File ledgerDir;
private final EntryLogIds ids;
private final ExecutorService writeExecutor;
Expand Down Expand Up @@ -99,7 +99,7 @@ public DirectEntryLogger(File ledgerDir,
int readBufferSize,
int numReadThreads,
int maxFdCacheTimeSeconds,
Slogger slogParent,
Logger logParent,
StatsLogger stats) throws IOException {
this.ledgerDir = ledgerDir;
this.flushExecutor = flushExecutor;
Expand All @@ -112,7 +112,10 @@ public DirectEntryLogger(File ledgerDir,
this.maxSaneEntrySize = maxSaneEntrySize;
this.readBufferSize = Buffer.nextAlignment(readBufferSize);
this.ids = ids;
this.slog = slogParent.kv("directory", ledgerDir).ctx(DirectEntryLogger.class);
this.log = Logger.get(DirectEntryLogger.class).with()
.ctx(logParent)
.attr("directory", ledgerDir)
.build();

this.stats = new DirectEntryLoggerStats(stats);

Expand All @@ -129,36 +132,38 @@ public DirectEntryLogger(File ledgerDir,
// then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers
// from the cache
if (perThreadBufferSize < readBufferSize) {
slog.kv("reason", "perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)")
.kv("totalReadBufferSize", totalReadBufferSize)
.kv("totalNumReadThreads", numReadThreads)
.kv("readBufferSize", readBufferSize)
.kv("perThreadBufferSize", perThreadBufferSize)
.error(Events.ENTRYLOGGER_MISCONFIGURED);
log.error()
.attr("reason",
"perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)")
.attr("totalReadBufferSize", totalReadBufferSize)
.attr("totalNumReadThreads", numReadThreads)
.attr("readBufferSize", readBufferSize)
.attr("perThreadBufferSize", perThreadBufferSize)
.log("Entry logger misconfigured");
}

long maxCachedReadersPerThread = perThreadBufferSize / readBufferSize;
long maxCachedReaders = maxCachedReadersPerThread * numReadThreads;

this.slog
.kv("maxFileSize", maxFileSize)
.kv("maxSaneEntrySize", maxSaneEntrySize)
.kv("totalWriteBufferSize", totalWriteBufferSize)
.kv("singleWriteBufferSize", singleWriteBufferSize)
.kv("totalReadBufferSize", totalReadBufferSize)
.kv("readBufferSize", readBufferSize)
.kv("perThreadBufferSize", perThreadBufferSize)
.kv("maxCachedReadersPerThread", maxCachedReadersPerThread)
.kv("maxCachedReaders", maxCachedReaders)
.info(Events.ENTRYLOGGER_CREATED);
log.info()
.attr("maxFileSize", maxFileSize)
.attr("maxSaneEntrySize", maxSaneEntrySize)
.attr("totalWriteBufferSize", totalWriteBufferSize)
.attr("singleWriteBufferSize", singleWriteBufferSize)
.attr("totalReadBufferSize", totalReadBufferSize)
.attr("readBufferSize", readBufferSize)
.attr("perThreadBufferSize", perThreadBufferSize)
.attr("maxCachedReadersPerThread", maxCachedReadersPerThread)
.attr("maxCachedReaders", maxCachedReaders)
.log("Entry logger created");

this.caches = ThreadLocal.withInitial(() -> {
RemovalListener<Integer, LogReader> rl = (notification) -> {
try {
notification.getValue().close();
this.stats.getCloseReaderCounter().inc();
} catch (IOException ioe) {
slog.kv("logID", notification.getKey()).error(Events.READER_CLOSE_ERROR);
log.error().attr("logID", notification.getKey()).log("Failed to close entry log reader");
}
};
Cache<Integer, LogReader> cache = CacheBuilder.newBuilder()
Expand Down Expand Up @@ -190,7 +195,7 @@ public long addEntry(long ledgerId, ByteBuf buf) throws IOException {
curWriter = new WriterWithMetadata(newDirectWriter(newId),
new EntryLogMetadata(newId),
allocator);
slog.kv("newLogId", newId).info(Events.LOG_ROLL);
log.info().attr("newLogId", newId).log("Rolling to new log file");
}

offset = curWriter.addEntry(ledgerId, buf);
Expand Down Expand Up @@ -378,7 +383,8 @@ public boolean removeEntryLog(long entryLogId) {
checkArgument(entryLogId < Integer.MAX_VALUE, "Entry log id must be an int [%d]", entryLogId);
File file = logFile(ledgerDir, (int) entryLogId);
boolean result = file.delete();
slog.kv("file", file).kv("logId", entryLogId).kv("result", result).info(Events.LOG_DELETED);
log.info().attr("file", file).attr("logId", entryLogId).attr("result", result)
.log("Log file deleted");
return result;
}

Expand All @@ -402,8 +408,8 @@ public EntryLogMetadata getEntryLogMetadata(long entryLogId, AbstractLogCompacto
try {
return readEntryLogIndex(entryLogId);
} catch (IOException e) {
slog.kv("entryLogId", entryLogId).kv("reason", e.getMessage())
.info(Events.READ_METADATA_FALLBACK);
log.info().attr("entryLogId", entryLogId).attr("reason", e.getMessage())
.log("Falling back to scanning log for metadata");
return scanEntryLogMetadata(entryLogId, throttler);
}
}
Expand Down Expand Up @@ -450,7 +456,7 @@ LogReader newDirectReader(int logId) throws IOException {
private LogWriter newDirectWriter(int newId) throws IOException {
unflushedLogs.add(newId);
LogWriter writer = new DirectWriter(newId, logFilename(ledgerDir, newId), maxFileSize,
writeExecutor, writeBuffers, nativeIO, slog);
writeExecutor, writeBuffers, nativeIO, log);
ByteBuf buf = allocator.buffer(Buffer.ALIGNMENT);
try {
Header.writeEmptyHeader(buf);
Expand All @@ -475,7 +481,7 @@ public CompactionEntryLog newCompactionLog(long srcLogId) throws IOException {
int dstLogId = ids.nextId();
return DirectCompactionEntryLog.newLog((int) srcLogId, dstLogId, ledgerDir,
maxFileSize, writeExecutor, writeBuffers,
nativeIO, allocator, slog);
nativeIO, allocator, log);
}

@Override
Expand All @@ -490,7 +496,7 @@ public Collection<CompactionEntryLog> incompleteCompactionLogs() {
try {
Files.deleteIfExists(f.toPath());
} catch (IOException ioe) {
slog.kv("file", f).warn(Events.COMPACTION_DELETE_FAILURE);
log.warn().attr("file", f).log("Failed to delete compaction artifact");
}
}

Expand All @@ -503,7 +509,7 @@ public Collection<CompactionEntryLog> incompleteCompactionLogs() {
readBufferSize, maxSaneEntrySize,
nativeIO, allocator,
stats.getReadBlockStats(),
slog));
log));
}
}
}
Expand Down
Loading
Loading