Parse cache entries async.

John Preston 2019-04-11 15:56:26 +04:00
parent fe15ee742d
commit 1ee4dac4f3
2 changed files with 186 additions and 122 deletions
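In short: instead of handing the raw cache blob back to the reader and parsing it there, the cache callback now parses the entry on a background thread and only shares the resulting parts map (plus, for a good header, the included first slice). Below is a minimal, self-contained sketch of that pattern using plain std:: primitives rather than the project's crl/Qt types; the names and the trivial parser are illustrative only, not the real implementation.

#include <algorithm>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

using PartsMap = std::map<int, std::vector<char>>; // stand-in for base::flat_map<int, QByteArray>

constexpr auto kPartSize = 128 * 1024; // assumed part size, for illustration only

struct CacheHelper {
	std::mutex mutex;
	std::map<int, PartsMap> results; // sliceNumber -> parsed parts
	std::condition_variable ready;
};

// Stand-in for the real parser: treat the blob as contiguous kPartSize parts.
PartsMap ParseCacheEntry(std::vector<char> raw) {
	auto result = PartsMap();
	for (auto offset = 0; offset < int(raw.size()); offset += kPartSize) {
		const auto till = std::min(offset + kPartSize, int(raw.size()));
		result.emplace(offset, std::vector<char>(raw.begin() + offset, raw.begin() + till));
	}
	return result;
}

// Parse off the calling thread, then publish the result under the mutex and
// wake a possible waiter -- the same shape as the crl::async() block below.
void readFromCache(std::shared_ptr<CacheHelper> cache, int sliceNumber, std::vector<char> raw) {
	auto weak = std::weak_ptr<CacheHelper>(cache);
	std::thread([weak, sliceNumber, raw = std::move(raw)]() mutable {
		auto parts = ParseCacheEntry(std::move(raw));
		if (const auto strong = weak.lock()) { // the helper may already be gone
			std::lock_guard<std::mutex> lock(strong->mutex);
			strong->results.emplace(sliceNumber, std::move(parts));
			strong->ready.notify_one();
		}
	}).detach();
}

A consumer would lock the same mutex, take ownership of the accumulated results and apply them, which is what processCacheResults() does in the diff below.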


@@ -24,17 +24,139 @@ constexpr auto kMaxOnlyInHeader = 80 * kPartSize;
constexpr auto kPartsOutsideFirstSliceGood = 8;
constexpr auto kSlicesInMemory = 2;
// 1 MB of header parts can be outside the first slice for us to still
// put the whole first slice of the file in the header cache entry.
//constexpr auto kMaxOutsideHeaderPartsForOptimizedMode = 8;
// 1 MB of parts are requested from cloud ahead of reading demand.
constexpr auto kPreloadPartsAhead = 8;
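// (Taken together, the two comments above imply kPartSize is 128 kB, so
// kMaxOnlyInHeader = 80 * kPartSize puts the "fully in header" threshold
// at 10 MB.)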
using PartsMap = base::flat_map<int, QByteArray>;
struct ParsedCacheEntry {
PartsMap parts;
std::optional<PartsMap> included;
};
bool IsContiguousSerialization(int serializedSize, int maxSliceSize) {
return !(serializedSize % kPartSize) || (serializedSize == maxSliceSize);
}
bool IsFullInHeader(int size) {
return (size <= kMaxOnlyInHeader);
}
bool ComputeIsGoodHeader(int size, const PartsMap &header) {
if (IsFullInHeader(size)) {
return false;
}
const auto outsideFirstSliceIt = ranges::lower_bound(
header,
kInSlice,
ranges::less(),
&PartsMap::value_type::first);
const auto outsideFirstSlice = end(header) - outsideFirstSliceIt;
return (outsideFirstSlice <= kPartsOutsideFirstSliceGood);
}
int SlicesCount(int size) {
return (size + kInSlice - 1) / kInSlice;
}
int MaxSliceSize(int sliceNumber, int size) {
return !sliceNumber
? size
: (sliceNumber == SlicesCount(size))
? (size - (sliceNumber - 1) * kInSlice)
: kInSlice;
}
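// Worked example: for size == 2 * kInSlice + kInSlice / 2 there are
// SlicesCount(size) == 3 data slices; MaxSliceSize(0, size) == size
// (slice number 0 is the header entry, which may cover the whole file),
// MaxSliceSize(1, size) == MaxSliceSize(2, size) == kInSlice, and
// MaxSliceSize(3, size) == size - 2 * kInSlice == kInSlice / 2.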
bytes::const_span ParseComplexCachedMap(
PartsMap &result,
bytes::const_span data,
int maxSize) {
const auto takeInt = [&]() -> std::optional<int> {
if (data.size() < sizeof(int32)) {
return std::nullopt;
}
const auto bytes = data.data();
const auto result = *reinterpret_cast<const int32*>(bytes);
data = data.subspan(sizeof(int32));
return result;
};
const auto takeBytes = [&](int count) {
if (count <= 0 || data.size() < count) {
return bytes::const_span();
}
const auto result = data.subspan(0, count);
data = data.subspan(count);
return result;
};
const auto maybeCount = takeInt();
if (!maybeCount) {
return {};
}
const auto count = *maybeCount;
if (count < 0) {
return {};
} else if (!count) {
return data;
}
for (auto i = 0; i != count; ++i) {
const auto offset = takeInt().value_or(0);
const auto size = takeInt().value_or(0);
const auto bytes = takeBytes(size);
if (offset < 0
|| offset >= maxSize
|| size <= 0
|| size > maxSize
|| offset + size > maxSize
|| bytes.size() != size) {
return {};
}
result.try_emplace(
offset,
reinterpret_cast<const char*>(bytes.data()),
bytes.size());
}
return data;
}
bytes::const_span ParseCachedMap(
PartsMap &result,
bytes::const_span data,
int maxSize) {
const auto size = int(data.size());
if (IsContiguousSerialization(size, maxSize)) {
if (size > maxSize) {
return {};
}
for (auto offset = 0; offset < size; offset += kPartSize) {
const auto part = data.subspan(
offset,
std::min(kPartSize, size - offset));
result.try_emplace(
offset,
reinterpret_cast<const char*>(part.data()),
part.size());
}
return {};
}
return ParseComplexCachedMap(result, data, maxSize);
}
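// The two entry layouts accepted above:
// - contiguous: the parts' bytes glued together in order, detected by
//   IsContiguousSerialization() (a multiple of kPartSize, or exactly the
//   slice's maximum size), so each offset is implied by position;
// - complex: [int32 count], then count records of
//   [int32 offset][int32 size][size bytes of part data].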
ParsedCacheEntry ParseCacheEntry(
bytes::const_span data,
int sliceNumber,
int size) {
auto result = ParsedCacheEntry();
const auto remaining = ParseCachedMap(
result.parts,
data,
MaxSliceSize(sliceNumber, size));
if (!sliceNumber && ComputeIsGoodHeader(size, result.parts)) {
result.included = PartsMap();
ParseCachedMap(*result.included, remaining, MaxSliceSize(1, size));
}
return result;
}
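// For a "good" header (few enough parts beyond the first slice), the header
// cache entry also carries the first slice's map right after its own, which
// is why the leftover bytes are parsed once more with MaxSliceSize(1, size).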
template <typename Range> // Range::value_type is Pair<int, QByteArray>
int FindNotLoadedStart(Range &&parts, int offset) {
auto result = offset;
@@ -101,7 +223,7 @@ struct Reader::CacheHelper {
const Storage::Cache::Key baseKey;
QMutex mutex;
base::flat_map<int, QByteArray> results;
base::flat_map<int, PartsMap> results;
std::atomic<crl::semaphore*> waiting = nullptr;
};
@@ -113,9 +235,7 @@ Storage::Cache::Key Reader::CacheHelper::key(int sliceNumber) const {
return Storage::Cache::Key{ baseKey.high, baseKey.low + sliceNumber };
}
bytes::const_span Reader::Slice::processCacheData(
bytes::const_span data,
int maxSize) {
void Reader::Slice::processCacheData(PartsMap &&data) {
Expects((flags & Flag::LoadingFromCache) != 0);
Expects(!(flags & Flag::LoadedFromCache));
@@ -123,74 +243,13 @@ bytes::const_span Reader::Slice::processCacheData(
flags |= Flag::LoadedFromCache;
flags &= ~Flag::LoadingFromCache;
});
const auto size = int(data.size());
if (IsContiguousSerialization(size, maxSize)) {
if (size > maxSize) {
return {};
if (parts.empty()) {
parts = std::move(data);
} else {
for (auto &[offset, bytes] : data) {
parts.emplace(offset, std::move(bytes));
}
for (auto offset = 0; offset < size; offset += kPartSize) {
const auto part = data.subspan(
offset,
std::min(kPartSize, size - offset));
parts.try_emplace(
offset,
reinterpret_cast<const char*>(part.data()),
part.size());
}
return {};
}
return processComplexCacheData(bytes::make_span(data), maxSize);
}
bytes::const_span Reader::Slice::processComplexCacheData(
bytes::const_span data,
int maxSize) {
const auto takeInt = [&]() -> std::optional<int> {
if (data.size() < sizeof(int32)) {
return std::nullopt;
}
const auto bytes = data.data();
const auto result = *reinterpret_cast<const int32*>(bytes);
data = data.subspan(sizeof(int32));
return result;
};
const auto takeBytes = [&](int count) {
if (count <= 0 || data.size() < count) {
return bytes::const_span();
}
const auto result = data.subspan(0, count);
data = data.subspan(count);
return result;
};
const auto maybeCount = takeInt();
if (!maybeCount) {
return {};
}
const auto count = *maybeCount;
if (count < 0) {
return {};
} else if (!count) {
return data;
}
for (auto i = 0; i != count; ++i) {
const auto offset = takeInt().value_or(0);
const auto size = takeInt().value_or(0);
const auto bytes = takeBytes(size);
if (offset < 0
|| offset >= maxSize
|| size <= 0
|| size > maxSize
|| offset + size > maxSize
|| bytes.size() != size) {
return {};
}
parts.try_emplace(
offset,
reinterpret_cast<const char*>(bytes.data()),
bytes.size());
}
return data;
}
void Reader::Slice::addPart(int offset, QByteArray bytes) {
@@ -215,7 +274,7 @@ auto Reader::Slice::prepareFill(int from, int till) -> PrepareFillResult {
parts,
from,
ranges::less(),
&base::flat_map<int, QByteArray>::value_type::first);
&PartsMap::value_type::first);
if (after == begin(parts)) {
result.offsetsFromLoader = offsetsFromLoader(
fromOffset,
@@ -229,7 +288,7 @@ auto Reader::Slice::prepareFill(int from, int till) -> PrepareFillResult {
end(parts),
till,
ranges::less(),
&base::flat_map<int, QByteArray>::value_type::first);
&PartsMap::value_type::first);
const auto haveTill = FindNotLoadedStart(
ranges::make_iterator_range(start, finish),
fromOffset);
@@ -256,7 +315,7 @@ auto Reader::Slice::offsetsFromLoader(int from, int till) const
parts,
from,
ranges::less(),
&base::flat_map<int, QByteArray>::value_type::first);
&PartsMap::value_type::first);
auto check = (after == begin(parts)) ? after : (after - 1);
const auto end = parts.end();
for (auto offset = from; offset != till; offset += kPartSize) {
@@ -282,7 +341,7 @@ Reader::Slices::Slices(int size, bool useCache)
_headerMode = HeaderMode::NoCache;
}
if (!isFullInHeader()) {
_data.resize((_size + kInSlice - 1) / kInSlice);
_data.resize(SlicesCount(_size));
}
}
@@ -291,7 +350,7 @@ bool Reader::Slices::headerModeUnknown() const {
}
bool Reader::Slices::isFullInHeader() const {
return (_size <= kMaxOnlyInHeader);
return IsFullInHeader(_size);
}
bool Reader::Slices::isGoodHeader() const {
@@ -299,16 +358,7 @@ bool Reader::Slices::isGoodHeader() const {
}
bool Reader::Slices::computeIsGoodHeader() const {
if (isFullInHeader()) {
return false;
}
const auto outsideFirstSliceIt = ranges::lower_bound(
_header.parts,
kInSlice,
ranges::less(),
&base::flat_map<int, QByteArray>::value_type::first);
const auto outsideFirstSlice = end(_header.parts) - outsideFirstSliceIt;
return (outsideFirstSlice <= kPartsOutsideFirstSliceGood);
return ComputeIsGoodHeader(_size, _header.parts);
}
void Reader::Slices::headerDone(bool fromCache) {
@@ -364,9 +414,7 @@ void Reader::Slices::applyHeaderCacheData() {
}
}
void Reader::Slices::processCacheResult(
int sliceNumber,
bytes::const_span result) {
void Reader::Slices::processCacheResult(int sliceNumber, PartsMap &&result) {
Expects(sliceNumber >= 0 && sliceNumber <= _data.size());
auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header);
@@ -384,16 +432,13 @@ void Reader::Slices::processCacheResult(
// We could've already unloaded this slice using LRU _usedSlices.
return;
}
const auto remaining = slice.processCacheData(
result,
maxSliceSize(sliceNumber));
slice.processCacheData(std::move(result));
if (!sliceNumber) {
applyHeaderCacheData();
if (isGoodHeader()) {
// When we first read the header we don't request the first slice.
// But we get it, so let's apply it anyway.
_data[0].flags |= Slice::Flag::LoadingFromCache;
processCacheResult(1, remaining);
}
}
}
@@ -542,11 +587,7 @@ void Reader::Slices::markSliceUsed(int sliceIndex) {
}
int Reader::Slices::maxSliceSize(int sliceNumber) const {
return !sliceNumber
? _size
: (sliceNumber == _data.size())
? (_size - (sliceNumber - 1) * kInSlice)
: kInSlice;
return MaxSliceSize(sliceNumber, _size);
}
Reader::SerializedSlice Reader::Slices::serializeAndUnloadUnused() {
@@ -779,6 +820,10 @@ void Reader::processDownloaderRequests() {
}
void Reader::checkCacheResultsForDownloader() {
}
bool Reader::isRemoteLoader() const {
return _loader->baseCacheKey().has_value();
}
@@ -799,17 +844,32 @@ void Reader::readFromCache(int sliceNumber) {
if (sliceNumber == 1 && _slices.isGoodHeader()) {
return readFromCache(0);
}
const auto size = _loader->size();
const auto key = _cacheHelper->key(sliceNumber);
const auto weak = std::weak_ptr<CacheHelper>(_cacheHelper);
const auto cache = std::weak_ptr<CacheHelper>(_cacheHelper);
const auto weak = base::make_weak(this);
_owner->cacheBigFile().get(key, [=](QByteArray &&result) {
if (const auto strong = weak.lock()) {
QMutexLocker lock(&strong->mutex);
strong->results.emplace(sliceNumber, std::move(result));
if (const auto waiting = strong->waiting.load()) {
strong->waiting.store(nullptr, std::memory_order_release);
waiting->release();
crl::async([=, result = std::move(result)]{
auto entry = ParseCacheEntry(
bytes::make_span(result),
sliceNumber,
size);
if (const auto strong = cache.lock()) {
QMutexLocker lock(&strong->mutex);
strong->results.emplace(sliceNumber, std::move(entry.parts));
if (!sliceNumber && entry.included) {
strong->results.emplace(1, std::move(*entry.included));
}
if (const auto waiting = strong->waiting.load()) {
strong->waiting.store(nullptr, std::memory_order_release);
waiting->release();
} else {
crl::on_main(weak, [=] {
checkCacheResultsForDownloader();
});
}
}
}
});
});
}
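// Note the two hand-off paths above: if something is blocked on the helper's
// semaphore, the freshly parsed results release it right away; otherwise the
// reader is poked on the main thread via crl::on_main() so that
// checkCacheResultsForDownloader() can pick the results up.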
@@ -935,11 +995,17 @@ bool Reader::processCacheResults() {
}
QMutexLocker lock(&_cacheHelper->mutex);
const auto loaded = base::take(_cacheHelper->results);
auto loaded = base::take(_cacheHelper->results);
lock.unlock();
for (const auto &[sliceNumber, result] : loaded) {
_slices.processCacheResult(sliceNumber, bytes::make_span(result));
for (auto &[sliceNumber, result] : loaded) {
_slices.processCacheResult(sliceNumber, std::move(result));
}
if (!loaded.empty()
&& (loaded.front().first == 0)
&& _slices.isGoodHeader()) {
Assert(loaded.size() > 1);
Assert((loaded.begin() + 1)->first == 1);
}
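// (A good header's cache entry is parsed into both the slice 0 and slice 1
// results in one go, which is what the paired entries asserted above rely on.)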
if (_downloaderAttached.load(std::memory_order_acquire)) {
for (const auto &[sliceNumber, result] : loaded) {


@@ -65,6 +65,8 @@ private:
struct CacheHelper;
using PartsMap = base::flat_map<int, QByteArray>;
template <int Size>
class StackIntVector {
public:
@@ -100,17 +102,12 @@ private:
struct PrepareFillResult {
StackIntVector<kLoadFromRemoteMax> offsetsFromLoader;
base::flat_map<int, QByteArray>::const_iterator start;
base::flat_map<int, QByteArray>::const_iterator finish;
PartsMap::const_iterator start;
PartsMap::const_iterator finish;
bool ready = true;
};
bytes::const_span processCacheData(
bytes::const_span data,
int maxSize);
bytes::const_span processComplexCacheData(
bytes::const_span data,
int maxSize);
void processCacheData(PartsMap &&data);
void addPart(int offset, QByteArray bytes);
PrepareFillResult prepareFill(int from, int till);
@@ -119,7 +116,7 @@ private:
int from,
int till) const;
base::flat_map<int, QByteArray> parts;
PartsMap parts;
Flags flags;
};
@@ -134,7 +131,7 @@ private:
[[nodiscard]] bool isFullInHeader() const;
[[nodiscard]] bool isGoodHeader() const;
void processCacheResult(int sliceNumber, bytes::const_span result);
void processCacheResult(int sliceNumber, PartsMap &&result);
void processPart(int offset, QByteArray &&bytes);
[[nodiscard]] FillResult fill(int offset, bytes::span buffer);
@@ -191,6 +188,7 @@ private:
void finalizeCache();
void processDownloaderRequests();
void checkCacheResultsForDownloader();
static std::shared_ptr<CacheHelper> InitCacheHelper(
std::optional<Storage::Cache::Key> baseKey);