feat(inkless): add persistent storage for infinispan #353


Merged
merged 4 commits on Jul 16, 2025
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -17,6 +17,8 @@

package kafka.log

import io.aiven.inkless.cache.InfinispanCache

import java.lang.{Long => JLong}
import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
@@ -461,6 +463,8 @@ class LogManager(logDirs: Seq[File],
// Ignore remote-log-index-cache directory as that is index cache maintained by tiered storage subsystem
// but not any topic-partition dir.
!logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
// Ignore inkless-cache directory as that is a cache maintained by the inkless subsystem
!logDir.getName.equals(InfinispanCache.DIR_NAME) &&
UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
numTotalLogs += logsToLoad.length
numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -60,6 +60,7 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

import java.nio.file.Path
import java.time.Duration
import java.util
import java.util.Optional
@@ -354,6 +355,7 @@ class BrokerServer(
inklessMetadataView,
controlPlane,
brokerTopicStats,
Path.of(config.logDirs().get(0)),
() => logManager.currentDefaultConfig
)
}
InfinispanCache.java
@@ -17,6 +17,7 @@
*/
package io.aiven.inkless.cache;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -32,12 +33,15 @@
import org.infinispan.stats.Stats;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;

import io.aiven.inkless.generated.CacheKey;
import io.aiven.inkless.generated.FileExtent;

public class InfinispanCache implements ObjectCache {
public static final String DIR_NAME = "inkless-cache";

// Length of time the object is "leased" to the caller if not already present in the map
private static final int CACHE_WRITE_LOCK_TIMEOUT_MS = 10000;
@@ -51,43 +55,90 @@ public class InfinispanCache implements ObjectCache {

private InfinispanCacheMetrics metrics;

public static InfinispanCache build(Time time, String clusterId, String rack, long maxCacheSize) {
final InfinispanCache infinispanCache = new InfinispanCache(time, clusterId, rack, maxCacheSize);
public static InfinispanCache build(
Time time,
String clusterId,
String rack,
long maxCacheSize,
Path basePath,
boolean isPersistent,
long lifespanSeconds,
int maxIdleSeconds
) {
final InfinispanCache infinispanCache = new InfinispanCache(time, clusterId, rack, maxCacheSize, basePath, isPersistent, lifespanSeconds, maxIdleSeconds);
infinispanCache.initializeMetrics();
return infinispanCache;
}

private InfinispanCache(Time time, String clusterId, String rack, long maxCacheSize) {
public InfinispanCache(
Time time,
String clusterId,
String rack,
long maxCacheSize,
Path basePath,
boolean isPersistent,
long lifespanSeconds,
int maxIdleSeconds
) {
this.time = time;
this.maxCacheSize = maxCacheSize;
GlobalConfigurationBuilder globalConfig = GlobalConfigurationBuilder.defaultClusteredBuilder();
final String clusterName = clusterName(clusterId, rack);
globalConfig.transport()
.clusterName(clusterName(clusterId, rack))
.clusterName(clusterName)
.addProperty("configurationFile", "jgroups-udp.xml"); // Set bind port to 0
globalConfig.serialization()
.addContextInitializers()
.marshaller(new KafkaMarshaller())
.allowList().addClasses(CacheKey.class, FileExtent.class);
cacheManager = new DefaultCacheManager(globalConfig.build());
.addContextInitializers()
.marshaller(new KafkaMarshaller())
.allowList().addClasses(CacheKey.class, FileExtent.class);
this.cacheManager = new DefaultCacheManager(globalConfig.build());
ConfigurationBuilder config = new ConfigurationBuilder();
config.statistics().enable();
config.clustering()
.cacheMode(CacheMode.DIST_SYNC)
.memory()
.cacheMode(CacheMode.DIST_SYNC);
config.memory()
.storage(StorageType.HEAP)
.maxCount(maxCacheSize)
.whenFull(EvictionStrategy.REMOVE);
cache = cacheManager.administration()
.withFlags(CacheContainerAdmin.AdminFlag.VOLATILE)
.getOrCreateCache("fileExtents", config.build());
backoff = new ExponentialBackoff(1, CACHE_WRITE_BACKOFF_EXP_BASE, CACHE_WRITE_BACKOFF_EXP_BASE, CACHE_WRITE_BACKOFF_JITTER);
// There is no explicit way to define how much space the cache can use on disk;
// there are only two proxies: lifespan (fixed time to keep an entry)
// and maxIdle (how long to keep it without being accessed).
// Lifespan is the only one that can guarantee that the cache will not grow indefinitely.
// To estimate the maximum disk usage, use the maximum buffer size and the number of uploading threads,
// e.g. 6MB buffer size * 10 threads = 60MB maximum disk usage per second.
// 5 minutes lifespan = 60MB/s * 300 seconds = 18GB maximum disk usage.
// Lifespan is enforced, but maxIdle can be disabled (it is by default).
config.expiration()
// maximum time an entry can live in the cache (fixed)
.lifespan(lifespanSeconds, TimeUnit.SECONDS)
// maximum time an entry is idle (not accessed) before it is expired
// entries expire based on either lifespan or maxIdle, whichever happens first
// when maxIdle is disabled, only lifespan is used
.maxIdle(maxIdleSeconds, TimeUnit.SECONDS)
// how often the cache checks for expired entries
.wakeUpInterval(5, TimeUnit.SECONDS);
if (isPersistent) {
// Prepare the cache directory within a known location. Index and data directories are created within this directory.
final Path cacheBasePath = cachePersistenceDir(basePath);
config.persistence()
.passivation(true)
.addSoftIndexFileStore()
.shared(false)
.purgeOnStartup(true)
.dataLocation(cacheBasePath.resolve("data").toAbsolutePath().toString())
.indexLocation(cacheBasePath.resolve("index").toAbsolutePath().toString());
}
this.cache = cacheManager.administration()
.withFlags(CacheContainerAdmin.AdminFlag.VOLATILE)
.getOrCreateCache("fileExtents", config.build());
this.backoff = new ExponentialBackoff(1, CACHE_WRITE_BACKOFF_EXP_BASE, CACHE_WRITE_BACKOFF_EXP_BASE, CACHE_WRITE_BACKOFF_JITTER);
}

private void initializeMetrics() {
this.metrics = new InfinispanCacheMetrics(this);
}

private static String clusterName(String clusterId, String rack) {
static String clusterName(String clusterId, String rack) {
// To avoid cross-rack traffic, include rack in the cluster name
// Clusters with different names don't share data or storage
return "inkless-" + clusterId + (rack != null ? "-" + rack : "" );
@@ -119,7 +170,7 @@ public FileExtent get(CacheKey key) {

@Override
public void put(CacheKey key, FileExtent value) {
cache.put(key, value, 1L, TimeUnit.MINUTES);
cache.put(key, value);
}

@Override
@@ -140,10 +191,27 @@ public long maxCacheSize() {
public void close() throws IOException {
cache.clear();
cacheManager.close();
metrics.close();
if (metrics != null) metrics.close();
}

public Stats metrics() {
return cache.getAdvancedCache().getStats();
}

static Path cachePersistenceDir(Path baseDir) {
final Path p = baseDir.resolve(DIR_NAME);
if (!Files.exists(p)) {
try {
return Files.createDirectories(p);
} catch (IOException e) {
throw new ConfigException("Failed to create cache directories", e);
}
} else if (!Files.isDirectory(p)) {
throw new ConfigException("Cache persistence directory is not a directory: " + p);
} else if (!Files.isWritable(p)) {
throw new ConfigException("Cache persistence directory is not writable: " + p);
}
return p;
}

}
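For reference, here is a minimal sketch (not part of the diff) of how the extended build(...) signature could be invoked with persistence enabled. The standalone wrapper, the concrete values, and the log-directory path are illustrative assumptions only; in the broker the call is wired up by SharedState.initialize(...) from InklessConfig and the first log directory, as shown in the next file, and the clustered transport still expects jgroups-udp.xml on the classpath.

import java.nio.file.Path;

import org.apache.kafka.common.utils.Time;

import io.aiven.inkless.cache.InfinispanCache;

public class InfinispanCacheSketch {
    public static void main(String[] args) throws Exception {
        // Assumed log directory; the broker passes Path.of(config.logDirs().get(0)).
        final Path logDir = Path.of("/var/lib/kafka/data-0");

        final InfinispanCache cache = InfinispanCache.build(
            Time.SYSTEM,
            "my-cluster-id",  // cluster id; the cache cluster name becomes "inkless-my-cluster-id-rack-1"
            "rack-1",         // rack, may be null
            1000L,            // max in-memory entry count; with persistence, evicted entries are passivated to disk
            logDir,           // base path; an "inkless-cache" directory with data/ and index/ is created under it
            true,             // enable the SoftIndexFileStore persistence added by this PR
            300L,             // lifespan in seconds: hard upper bound on entry age
            -1                // maxIdle disabled, so entries expire on lifespan only
        );

        // ... cache.put(key, fileExtent) / cache.get(key) during fetch handling ...

        cache.close(); // clears the cache and shuts down the embedded cache manager
    }
}

With the numbers from the constructor comment (6MB buffers, 10 upload threads), a 300-second lifespan bounds on-disk usage at roughly 6MB * 10 * 300s ≈ 18GB.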
SharedState.java
@@ -23,6 +23,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.function.Supplier;

import io.aiven.inkless.cache.FixedBlockAlignment;
@@ -57,6 +58,7 @@ public static SharedState initialize(
MetadataView metadata,
ControlPlane controlPlane,
BrokerTopicStats brokerTopicStats,
Path logDir,
Supplier<LogConfig> defaultTopicConfigs
) {
return new SharedState(
@@ -68,7 +70,16 @@
config.storage(),
ObjectKey.creator(config.objectKeyPrefix(), config.objectKeyLogPrefixMasked()),
new FixedBlockAlignment(config.fetchCacheBlockBytes()),
InfinispanCache.build(time, clusterId, rack, config.cacheMaxCount()),
InfinispanCache.build(
time,
clusterId,
rack,
config.cacheMaxCount(),
logDir,
config.isCachePersistenceEnabled(),
config.cacheExpirationLifespanSec(),
config.cacheExpirationMaxIdleSec()
),
brokerTopicStats,
defaultTopicConfigs
);
InklessConfig.java
@@ -78,9 +78,25 @@ public class InklessConfig extends AbstractConfig {
private static final int CONSUME_CACHE_BLOCK_BYTES_DEFAULT = 16 * 1024 * 1024; // 16 MiB

public static final String CONSUME_CACHE_MAX_COUNT_CONFIG = CONSUME_PREFIX + "cache.max.count";
private static final String CONSUME_CACHE_MAX_COUNT_DOC = "The maximum number of objects to cache in memory.";
private static final String CONSUME_CACHE_MAX_COUNT_DOC = "The maximum number of objects to cache in memory. " +
"If the cache exceeds this limit and cache persistence is enabled, " +
"the least recently used objects will be persisted to disk and removed from memory.";
private static final int CONSUME_CACHE_MAX_COUNT_DEFAULT = 1000;

Contributor commented:
This is a bad config: counting the number of objects is worse than counting memory.
Here or in a follow-up we should switch to max-size rather than max-count, but that might need us to change the encoding/marshalling?

Contributor (author) replied:
Agree. When using a size-based config, cache creation fails with:

ISPN000504: Size (bytes) based eviction needs either off-heap or a binary compatible storage configured in the cache encoding

So I think you're right; this will require further changes to be supported. Let's look into this in a follow-up.
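As a sketch of the size-based follow-up discussed in the comments above (not part of this PR): size-based eviction requires off-heap storage or a binary-compatible encoding, so the memory block would change roughly as follows. This assumes a recent Infinispan version where memory().maxSize(String) is available; the 256MB figure is arbitrary.

import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.StorageType;
import org.infinispan.eviction.EvictionStrategy;

// Sketch only: bound the cache by bytes instead of entry count.
// OFF_HEAP stores entries in serialized (binary) form, which is what
// ISPN000504 asks for when size-based eviction is configured.
ConfigurationBuilder config = new ConfigurationBuilder();
config.memory()
    .storage(StorageType.OFF_HEAP)
    .maxSize("256MB")
    .whenFull(EvictionStrategy.REMOVE);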

public static final String CONSUME_CACHE_PERSISTENCE_ENABLE_CONFIG = CONSUME_PREFIX + "cache.persistence.enable";
private static final String CONSUME_CACHE_PERSISTENCE_ENABLE_DOC = "Enable cache persistence to disk. " +
"If this is not set, the cache will not be persisted to disk. " +
"If this is set, the cache will be persisted to disk when it exceeds the maximum count limit.";

public static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG = CONSUME_PREFIX + "cache.expiration.lifespan.sec";
private static final String CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DOC = "The lifespan in seconds of a cache entry before it will be removed from all storages.";
private static final int CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DEFAULT = 60; // Defaults to 1 minute

public static final String CONSUME_CACHE_EXPIRATION_MAX_IDLE_SEC_CONFIG = CONSUME_PREFIX + "cache.expiration.max.idle.sec";
private static final String CONSUME_CACHE_EXPIRATION_MAX_IDLE_SEC_DOC = "The maximum idle time in seconds before a cache entry will be removed from all storages. " +
"-1 means disabled, and entries will not be removed based on idle time.";
private static final int CONSUME_CACHE_EXPIRATION_MAX_IDLE_SEC_DEFAULT = -1; // Disabled by default

public static final String RETENTION_ENFORCEMENT_INTERVAL_MS_CONFIG = "retention.enforcement.interval.ms";
private static final String RETENTION_ENFORCEMENT_INTERVAL_MS_DOC = "The interval with which to enforce retention policies on a partition. " +
"This interval is approximate, because each scheduling event is randomized. " +
@@ -242,6 +258,29 @@ public static ConfigDef configDef() {
ConfigDef.Importance.LOW,
CONSUME_CACHE_MAX_COUNT_DOC
);
configDef.define(
CONSUME_CACHE_PERSISTENCE_ENABLE_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
CONSUME_CACHE_PERSISTENCE_ENABLE_DOC
);
configDef.define(
CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG,
ConfigDef.Type.INT,
CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DEFAULT,
ConfigDef.Range.atLeast(10), // At least 10 seconds: the cache checks for expired entries every 5 seconds and the write lock timeout is 10 seconds.
ConfigDef.Importance.LOW,
CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_DOC
);
configDef.define(
CONSUME_CACHE_EXPIRATION_MAX_IDLE_SEC_CONFIG,
ConfigDef.Type.INT,
CONSUME_CACHE_EXPIRATION_MAX_IDLE_SEC_DEFAULT,
ConfigDef.Range.atLeast(-1),
ConfigDef.Importance.LOW,
CONSUME_CACHE_EXPIRATION_MAX_IDLE_SEC_DOC
);
configDef.define(
PRODUCE_UPLOAD_THREAD_POOL_SIZE_CONFIG,
ConfigDef.Type.INT,
@@ -330,6 +369,18 @@ public Long cacheMaxCount() {
return getLong(CONSUME_CACHE_MAX_COUNT_CONFIG);
}

public boolean isCachePersistenceEnabled() {
return getBoolean(CONSUME_CACHE_PERSISTENCE_ENABLE_CONFIG);
}

public int cacheExpirationLifespanSec() {
return getInt(CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG);
}

public int cacheExpirationMaxIdleSec() {
return getInt(CONSUME_CACHE_EXPIRATION_MAX_IDLE_SEC_CONFIG);
}

public int produceUploadThreadPoolSize() {
return getInt(PRODUCE_UPLOAD_THREAD_POOL_SIZE_CONFIG);
}
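For completeness, a hypothetical example of supplying the new settings. The full property keys depend on CONSUME_PREFIX, which is not visible in this diff, so the "consume." prefix and the map-based constructor are assumptions for illustration only.

import java.util.HashMap;
import java.util.Map;

// Assumed keys: CONSUME_PREFIX is not shown in this diff; "consume." is a guess.
Map<String, Object> props = new HashMap<>();
props.put("consume.cache.max.count", 1000L);
props.put("consume.cache.persistence.enable", true);
props.put("consume.cache.expiration.lifespan.sec", 300);  // must be at least 10
props.put("consume.cache.expiration.max.idle.sec", -1);   // disabled: expire on lifespan only

// Assumes a constructor taking the originals map, as is typical for AbstractConfig subclasses.
InklessConfig inklessConfig = new InklessConfig(props);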