+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2451,8 +2451,7 @@ project(':storage:inkless') {
exclude group: 'com.google.guava', module: 'listenablefuture'
}
implementation libs.metrics
implementation libs.infinispan
compileOnly libs.infinispanAnnotations // Workaround for ISPN-12461
implementation libs.caffeine

testImplementation project(':clients').sourceSets.test.output.classesDirs
testImplementation project(':test-common')
Expand Down
4 changes: 0 additions & 4 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package kafka.log

import io.aiven.inkless.cache.InfinispanCache

import java.lang.{Long => JLong}
import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
Expand Down Expand Up @@ -463,8 +461,6 @@ class LogManager(logDirs: Seq[File],
// Ignore remote-log-index-cache directory as that is index cache maintained by tiered storage subsystem
// but not any topic-partition dir.
!logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
// Ignore inkless-cache directory as that is a cache maintained by the inkless subsystem
!logDir.getName.equals(InfinispanCache.DIR_NAME) &&
UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
numTotalLogs += logsToLoad.length
numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
Expand Down
4 changes: 0 additions & 4 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

import java.nio.file.Path
import java.time.Duration
import java.util
import java.util.Optional
Expand Down Expand Up @@ -348,14 +347,11 @@ class BrokerServer(
val inklessSharedState = sharedServer.inklessControlPlane.map { controlPlane =>
SharedState.initialize(
time,
clusterId,
config.rack.orNull,
config.brokerId,
config.inklessConfig,
inklessMetadataView,
controlPlane,
brokerTopicStats,
Path.of(config.logDirs().get(0)),
() => logManager.currentDefaultConfig
)
}
Expand Down
3 changes: 0 additions & 3 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ versions += [
gradle: "8.14.1",
grgit: "4.1.1",
httpclient: "4.5.14",
infinispan: "15.1.1.Final",
jackson: "2.19.0",
jacoco: "0.8.13",
javassist: "3.30.2-GA",
Expand Down Expand Up @@ -168,8 +167,6 @@ libs += [
commonsBeanutils: "commons-beanutils:commons-beanutils:$versions.commonsBeanutils",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
gcsSdk: "com.google.cloud:google-cloud-storage:$versions.gcsSdk",
infinispan: "org.infinispan:infinispan-core:$versions.infinispan",
infinispanAnnotations: "org.infinispan:infinispan-component-annotations:$versions.infinispan",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.aiven.inkless.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.stats.CacheStats;

import java.io.IOException;
import java.time.Duration;
import java.util.function.Function;

import io.aiven.inkless.generated.CacheKey;
import io.aiven.inkless.generated.FileExtent;

public final class CaffeineCache implements ObjectCache {

private final Cache<CacheKey, FileExtent> cache;

private final CaffeineCacheMetrics metrics;

public CaffeineCache(
final long maxCacheSize,
final long lifespanSeconds,
final int maxIdleSeconds) {
cache = Caffeine.newBuilder()
.maximumSize(maxCacheSize)
.expireAfterWrite(Duration.ofSeconds(lifespanSeconds))
.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds: 180))
.recordStats()
.build();
metrics = new CaffeineCacheMetrics(cache);
}

@Override
public void close() throws IOException {
metrics.close();
}

@Override
public FileExtent computeIfAbsent(final CacheKey key, final Function<CacheKey, FileExtent> mappingFunction) {
return cache.asMap().computeIfAbsent(key, mappingFunction);
}

@Override
public FileExtent get(final CacheKey key) {
return cache.getIfPresent(key);
}

@Override
public void put(final CacheKey key, final FileExtent value) {
cache.asMap().putIfAbsent(key, value);
}

@Override
public boolean remove(final CacheKey key) {
if (cache.getIfPresent(key) != null) {
cache.invalidate(key);
return true;
}
return false;
}

@Override
public long size() {
return cache.estimatedSize();
}

public CacheStats stats() {
return cache.stats();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.aiven.inkless.cache;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

import com.github.benmanes.caffeine.cache.Cache;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

import io.aiven.inkless.common.metrics.MeasurableValue;
import io.aiven.inkless.common.metrics.SensorProvider;

public final class CaffeineCacheMetrics implements Closeable {

private final Metrics metrics;

private final Sensor cacheSizeSensor;
private final Sensor cacheHitCountSensor;
private final Sensor cacheHitRateSensor;
private final Sensor cacheMissCountSensor;
private final Sensor cacheMissRateSensor;
private final Sensor cacheAvgLoadPenaltySensor;
private final Sensor cacheEvictionsSensor;

public CaffeineCacheMetrics(final Cache<?, ?> cache) {
final JmxReporter reporter = new JmxReporter();
this.metrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(CaffeineCacheMetricsRegistry.METRIC_CONTEXT)
);

final CaffeineCacheMetricsRegistry metricsRegistry = new CaffeineCacheMetricsRegistry();
cacheSizeSensor = registerLongSensor(metrics, metricsRegistry.cacheSizeMetricName, CaffeineCacheMetricsRegistry.CACHE_SIZE, cache::estimatedSize);
cacheHitCountSensor = registerLongSensor(metrics, metricsRegistry.cacheHitCountMetricName, CaffeineCacheMetricsRegistry.CACHE_HIT_COUNT, () -> cache.stats().hitCount());
cacheHitRateSensor = registerDoubleSensor(metrics, metricsRegistry.cacheHitRateMetricName, CaffeineCacheMetricsRegistry.CACHE_HIT_RATE, () -> cache.stats().hitRate());
cacheMissCountSensor = registerLongSensor(metrics, metricsRegistry.cacheMissCountMetricName, CaffeineCacheMetricsRegistry.CACHE_MISS_COUNT, () -> cache.stats().missCount());
cacheMissRateSensor = registerDoubleSensor(metrics, metricsRegistry.cacheMissRateMetricName, CaffeineCacheMetricsRegistry.CACHE_MISS_RATE, () -> cache.stats().missRate());
cacheAvgLoadPenaltySensor = registerDoubleSensor(metrics, metricsRegistry.avgReadTimeMetricName, CaffeineCacheMetricsRegistry.CACHE_AVG_LOAD_PENALTY_NANOSECONDS, () -> cache.stats().averageLoadPenalty());
cacheEvictionsSensor = registerLongSensor(metrics, metricsRegistry.cacheEvictionsMetricName, CaffeineCacheMetricsRegistry.CACHE_EVICTION_COUNT, () -> cache.stats().evictionCount());
}

static Sensor registerDoubleSensor(final Metrics metrics, final MetricNameTemplate metricName, final String sensorName, final Supplier<Double> supplier) {
return new SensorProvider(metrics, sensorName)
.with(metricName, new MeasurableValue<>(supplier))
.get();
}

static Sensor registerLongSensor(final Metrics metrics, final MetricNameTemplate metricName, final String sensorName, final Supplier<Long> supplier) {
return new SensorProvider(metrics, sensorName)
.with(metricName, new MeasurableValue<>(supplier))
.get();
}

@Override
public String toString() {
return "CaffeineCacheMetrics{" +
"metrics=" + metrics +
", cacheSizeSensor=" + cacheSizeSensor +
", cacheHitCountSensor=" + cacheHitCountSensor +
", cacheHitRateSensor=" + cacheHitRateSensor +
", cacheMissCountSensor=" + cacheMissCountSensor +
", cacheMissRateSensor=" + cacheMissRateSensor +
", cacheAvgLoadPenaltySensor=" + cacheAvgLoadPenaltySensor +
", cacheEvictionsSensor=" + cacheEvictionsSensor +
'}';
}

@Override
public void close() throws IOException {
metrics.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import org.apache.kafka.common.MetricNameTemplate;

public class CaffeineCacheMetricsRegistry {
public static final String METRIC_CONTEXT = "io.aiven.inkless.cache.caffeine";
public static final String METRIC_GROUP = "wal-segment-cache";

public static final String CACHE_SIZE = "cache-size";
public static final String CACHE_HIT_RATE = "cache-hits-rate";
public static final String CACHE_HIT_COUNT = "cache-hit-count";
public static final String CACHE_MISS_RATE = "cache-miss-rate";
public static final String CACHE_MISS_COUNT = "cache-miss-count";
public static final String CACHE_AVG_LOAD_PENALTY_NANOSECONDS = "avg-load-penalty-ns";
public static final String CACHE_EVICTION_COUNT = "cache-evictions-count";

public final MetricNameTemplate cacheSizeMetricName;
public final MetricNameTemplate cacheHitRateMetricName;
public final MetricNameTemplate cacheHitCountMetricName;
public final MetricNameTemplate cacheMissRateMetricName;
public final MetricNameTemplate cacheMissCountMetricName;
public final MetricNameTemplate avgReadTimeMetricName;
public final MetricNameTemplate cacheEvictionsMetricName;

public CaffeineCacheMetricsRegistry() {
cacheSizeMetricName = new MetricNameTemplate(
CACHE_SIZE,
METRIC_GROUP,
"Current size of the cache"
);
cacheHitRateMetricName = new MetricNameTemplate(
CACHE_HIT_RATE,
METRIC_GROUP,
"Cache hit rate"
);

cacheHitCountMetricName = new MetricNameTemplate(
CACHE_HIT_COUNT,
METRIC_GROUP,
"Number of cache hits"
);
cacheMissRateMetricName = new MetricNameTemplate(
CACHE_MISS_RATE,
METRIC_GROUP,
"Cache miss rate"
);

cacheMissCountMetricName = new MetricNameTemplate(
CACHE_MISS_COUNT,
METRIC_GROUP,
"Number of cache misses"
);
avgReadTimeMetricName = new MetricNameTemplate(
CACHE_AVG_LOAD_PENALTY_NANOSECONDS,
METRIC_GROUP,
"Average cache load penalty in nanoseconds"
);
cacheEvictionsMetricName = new MetricNameTemplate(
CACHE_EVICTION_COUNT,
METRIC_GROUP,
"Number of evictions from the cache"
);
}
}
Loading
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载