这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Interpreters/Cache/FileCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
, keep_current_size_to_max_ratio(1 - settings[FileCacheSetting::keep_free_space_size_ratio])
, keep_current_elements_to_max_ratio(1 - settings[FileCacheSetting::keep_free_space_elements_ratio])
, keep_up_free_space_remove_batch(settings[FileCacheSetting::keep_free_space_remove_batch])
, name(cache_name)
, log(getLogger("FileCache(" + cache_name + ")"))
, metadata(settings[FileCacheSetting::path],
settings[FileCacheSetting::background_download_queue_size_limit],
Expand Down
3 changes: 3 additions & 0 deletions src/Interpreters/Cache/FileCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ class FileCache : private boost::noncopyable

void freeSpaceRatioKeepingThreadFunc();

const String & getName() const { return name; }

private:
using KeyAndOffset = FileCacheKeyAndOffset;

Expand All @@ -228,6 +230,7 @@ class FileCache : private boost::noncopyable
const double keep_current_elements_to_max_ratio;
const size_t keep_up_free_space_remove_batch;

String name;
LoggerPtr log;

std::exception_ptr init_exception;
Expand Down
9 changes: 9 additions & 0 deletions src/Interpreters/Cache/FileCacheFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ FileCacheFactory::CacheByName FileCacheFactory::getAll()
return caches_by_name;
}

FileCacheFactory::Caches FileCacheFactory::getUniqueInstances()
{
std::lock_guard lock(mutex);
Caches caches;
for (const auto & [_, cache_data] : caches_by_name)
caches.insert(cache_data);
return caches;
}

FileCachePtr FileCacheFactory::get(const std::string & cache_name)
{
std::lock_guard lock(mutex);
Expand Down
3 changes: 3 additions & 0 deletions src/Interpreters/Cache/FileCacheFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <boost/noncopyable.hpp>
#include <unordered_map>
#include <unordered_set>
#include <mutex>

namespace DB
Expand Down Expand Up @@ -36,6 +37,7 @@ class FileCacheFactory final : private boost::noncopyable

using FileCacheDataPtr = std::shared_ptr<FileCacheData>;
using CacheByName = std::unordered_map<std::string, FileCacheDataPtr>;
using Caches = std::unordered_set<FileCacheDataPtr>;

static FileCacheFactory & instance();

Expand All @@ -52,6 +54,7 @@ class FileCacheFactory final : private boost::noncopyable
const std::string & config_path);

CacheByName getAll();
Caches getUniqueInstances();

FileCacheDataPtr getByName(const std::string & cache_name);

Expand Down
5 changes: 2 additions & 3 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,9 +882,8 @@ struct ContextSharedPart : boost::noncopyable
/// Background operations in cache use background schedule pool.
/// Deactivate them before destructing it.
LOG_TRACE(log, "Shutting down caches");
const auto & caches = FileCacheFactory::instance().getAll();
for (const auto & [_, cache] : caches)
cache->cache->deactivateBackgroundOperations();
for (const auto & cache_data : FileCacheFactory::instance().getUniqueInstances())
cache_data->cache->deactivateBackgroundOperations();
FileCacheFactory::instance().clear();

{
Expand Down
8 changes: 3 additions & 5 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,7 @@ BlockIO InterpreterSystemQuery::execute()

if (query.filesystem_cache_name.empty())
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [_, cache_data] : caches)
for (const auto & cache_data : FileCacheFactory::instance().getUniqueInstances())
{
if (!cache_data->cache->isInitialized())
continue;
Expand Down Expand Up @@ -523,11 +522,10 @@ BlockIO InterpreterSystemQuery::execute()

if (query.filesystem_cache_name.empty())
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [cache_name, cache_data] : caches)
for (const auto & cache_data : FileCacheFactory::instance().getUniqueInstances())
{
auto file_segments = cache_data->cache->sync();
fill_data(cache_name, cache_data->cache, file_segments);
fill_data(cache_data->cache->getName(), cache_data->cache, file_segments);
}
}
else
Expand Down
3 changes: 1 addition & 2 deletions src/Interpreters/ServerAsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,11 @@ ServerAsynchronousMetrics::~ServerAsynchronousMetrics()
void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint current_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values)
{
{
auto caches = FileCacheFactory::instance().getAll();
size_t total_bytes = 0;
size_t max_bytes = 0;
size_t total_files = 0;

for (const auto & [_, cache_data] : caches)
for (const auto & cache_data : FileCacheFactory::instance().getUniqueInstances())
{
total_bytes += cache_data->cache->getUsedCacheSize();
max_bytes += cache_data->cache->getMaxCacheSize();
Expand Down
67 changes: 38 additions & 29 deletions src/Storages/System/StorageSystemFilesystemCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,46 +45,55 @@ StorageSystemFilesystemCache::StorageSystemFilesystemCache(const StorageID & tab

void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
{
auto caches = FileCacheFactory::instance().getAll();
auto caches_by_name = FileCacheFactory::instance().getAll();
std::unordered_set<FileCacheFactory::FileCacheDataPtr> caches;
for (const auto & [_, cache_data] : caches_by_name)
caches.insert(cache_data);
std::unordered_map<FileCacheFactory::FileCacheDataPtr, std::vector<std::string>> caches_by_instance;
for (const auto & [cache_name, cache_data] : caches_by_name)
caches_by_instance[cache_data].push_back(cache_name);

for (const auto & [cache_name, cache_data] : caches)
for (const auto & cache_data : caches)
{
const auto & cache = cache_data->cache;
if (!cache->isInitialized())
continue;

cache->iterate([&](const FileSegment::Info & file_segment)
{
size_t i = 0;
res_columns[i++]->insert(cache_name);
res_columns[i++]->insert(cache->getBasePath());
for (const auto & cache_name : caches_by_instance.at(cache_data))
{
size_t i = 0;
res_columns[i++]->insert(cache_name);
res_columns[i++]->insert(cache->getBasePath());

/// Do not use `file_segment->getPath` here because it will lead to nullptr dereference
/// (because file_segments in getSnapshot doesn't have `cache` field set)
/// Do not use `file_segment->getPath` here because it will lead to nullptr dereference
/// (because file_segments in getSnapshot doesn't have `cache` field set)

const auto path = cache->getFileSegmentPath(
file_segment.key, file_segment.offset, file_segment.kind,
FileCache::UserInfo(file_segment.user_id, file_segment.user_weight));
res_columns[i++]->insert(path);
res_columns[i++]->insert(file_segment.key.toString());
res_columns[i++]->insert(file_segment.range_left);
res_columns[i++]->insert(file_segment.range_right);
res_columns[i++]->insert(file_segment.size);
res_columns[i++]->insert(FileSegment::stateToString(file_segment.state));
res_columns[i++]->insert(file_segment.download_finished_time);
res_columns[i++]->insert(file_segment.cache_hits);
res_columns[i++]->insert(file_segment.references);
res_columns[i++]->insert(file_segment.downloaded_size);
res_columns[i++]->insert(toString(file_segment.kind));
res_columns[i++]->insert(file_segment.is_unbound);
res_columns[i++]->insert(file_segment.user_id);
const auto path = cache->getFileSegmentPath(
file_segment.key, file_segment.offset, file_segment.kind,
FileCache::UserInfo(file_segment.user_id, file_segment.user_weight));
res_columns[i++]->insert(path);
res_columns[i++]->insert(file_segment.key.toString());
res_columns[i++]->insert(file_segment.range_left);
res_columns[i++]->insert(file_segment.range_right);
res_columns[i++]->insert(file_segment.size);
res_columns[i++]->insert(FileSegment::stateToString(file_segment.state));
res_columns[i++]->insert(file_segment.download_finished_time);
res_columns[i++]->insert(file_segment.cache_hits);
res_columns[i++]->insert(file_segment.references);
res_columns[i++]->insert(file_segment.downloaded_size);
res_columns[i++]->insert(toString(file_segment.kind));
res_columns[i++]->insert(file_segment.is_unbound);
res_columns[i++]->insert(file_segment.user_id);

std::error_code ec;
auto size = fs::file_size(path, ec);
if (!ec)
res_columns[i++]->insert(size);
else
res_columns[i++]->insertDefault();
std::error_code ec;
auto size = fs::file_size(path, ec);
if (!ec)
res_columns[i++]->insert(size);
else
res_columns[i++]->insertDefault();
}
}, FileCache::getCommonUser().user_id);
}
}
Expand Down
Loading