Store first slice in the header cache key.

John Preston 2019-03-08 15:23:34 +04:00
parent 95954c4b1f
commit 84b09795f3
2 changed files with 194 additions and 76 deletions

Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp

@@ -21,6 +21,7 @@ constexpr auto kPartsInSlice = 64;
constexpr auto kInSlice = kPartsInSlice * kPartSize;
constexpr auto kMaxPartsInHeader = 16;
constexpr auto kMaxOnlyInHeader = 80 * kPartSize;
constexpr auto kPartsOutsideFirstSliceGood = 8;
constexpr auto kSlicesInMemory = 2;
// 1 MB of header parts can be outside the first slice for us to still
@@ -30,6 +31,10 @@ constexpr auto kSlicesInMemory = 2;
// 1 MB of parts are requested from cloud ahead of reading demand.
constexpr auto kPreloadPartsAhead = 8;
bool IsContiguousSerialization(int serializedSize, int maxSliceSize) {
return !(serializedSize % kPartSize) || (serializedSize == maxSliceSize);
}
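
A note on what this helper separates: a slice with every part present is cached as raw part bytes laid end to end, so its serialized size is a whole number of parts (or exactly the full slice size), while a partially loaded slice uses the complex, length-prefixed format parsed below. A minimal sketch, assuming kPartSize = 128 * 1024 as the "1 MB" comments in this file imply:

// Editor's sketch, not part of the commit (assumes kPartSize == 128 * 1024).
//
//   IsContiguousSerialization(4 * kPartSize, 64 * kPartSize)      -> true
//   IsContiguousSerialization(2 * kPartSize + 12, 64 * kPartSize) -> false
//
// Complex data carries int32 bookkeeping, so it normally fails the check;
// serializeAndUnloadSlice() below pads complex data with zero bytes
// whenever it would accidentally satisfy this predicate.
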
template <typename Range> // Range::value_type is Pair<int, QByteArray>
int FindNotLoadedStart(Range &&parts, int offset) {
auto result = offset;
@@ -108,7 +113,9 @@ Storage::Cache::Key Reader::CacheHelper::key(int sliceNumber) const {
return Storage::Cache::Key{ baseKey.high, baseKey.low + sliceNumber };
}
bool Reader::Slice::processCacheData(QByteArray &&data, int maxSliceSize) {
bytes::const_span Reader::Slice::processCacheData(
bytes::const_span data,
int maxSize) {
Expects((flags & Flag::LoadingFromCache) != 0);
Expects(!(flags & Flag::LoadedFromCache));
@@ -117,24 +124,28 @@ bool Reader::Slice::processCacheData(QByteArray &&data, int maxSliceSize) {
flags &= ~Flag::LoadingFromCache;
});
const auto size = data.size();
if (!(size % kPartSize) || (size == maxSliceSize)) {
if (size > maxSliceSize) {
return false;
const auto size = int(data.size());
if (IsContiguousSerialization(size, maxSize)) {
if (size > maxSize) {
return {};
}
for (auto offset = 0; offset < size; offset += kPartSize) {
parts.emplace(offset, data.mid(offset, kPartSize));
const auto part = data.subspan(
offset,
std::min(kPartSize, size - offset));
parts.try_emplace(
offset,
reinterpret_cast<const char*>(part.data()),
part.size());
}
return true;
return {};
}
return processComplexCacheData(
bytes::make_span(data),
maxSliceSize);
return processComplexCacheData(bytes::make_span(data), maxSize);
}
bool Reader::Slice::processComplexCacheData(
bytes::const_span Reader::Slice::processComplexCacheData(
bytes::const_span data,
int maxSliceSize) {
int maxSize) {
const auto takeInt = [&]() -> std::optional<int> {
if (data.size() < sizeof(int32)) {
return std::nullopt;
@@ -144,35 +155,42 @@ bool Reader::Slice::processComplexCacheData(
data = data.subspan(sizeof(int32));
return result;
};
const auto takeBytes = [&](int count) -> std::optional<QByteArray> {
const auto takeBytes = [&](int count) {
if (count <= 0 || data.size() < count) {
return std::nullopt;
return bytes::const_span();
}
auto result = QByteArray(
reinterpret_cast<const char*>(data.data()),
count);
const auto result = data.subspan(0, count);
data = data.subspan(count);
return result;
};
const auto count = takeInt().value_or(0);
if (count <= 0) {
return false;
const auto maybeCount = takeInt();
if (!maybeCount) {
return {};
}
const auto count = *maybeCount;
if (count < 0) {
return {};
} else if (!count) {
return data;
}
for (auto i = 0; i != count; ++i) {
const auto offset = takeInt().value_or(0);
const auto size = takeInt().value_or(0);
auto bytes = takeBytes(size).value_or(QByteArray());
const auto bytes = takeBytes(size);
if (offset < 0
|| offset >= maxSliceSize
|| offset >= maxSize
|| size <= 0
|| size > maxSliceSize
|| offset + size > maxSliceSize
|| size > maxSize
|| offset + size > maxSize
|| bytes.size() != size) {
return false;
return {};
}
parts.emplace(offset, std::move(bytes));
parts.try_emplace(
offset,
reinterpret_cast<const char*>(bytes.data()),
bytes.size());
}
return true;
return data;
}
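
The complex format consumed here is a length-prefixed part list; the change from a bool result to returning the unconsumed tail is what makes the shared header entry possible. The layout, as the parser above reads it:

// int32 count;           // number of stored parts (count < 0 fails)
// repeated count times:
//   int32 offset;        // 0 <= offset < maxSize
//   int32 size;          // 0 < size <= maxSize, offset + size <= maxSize
//   char  bytes[size];   // raw part data, inserted via try_emplace
// ...                    // whatever follows is returned as the remainder
//
// A zero count no longer fails the parse: the whole remainder is handed
// back, so processCacheResult() below can feed it in as the first slice.
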
void Reader::Slice::addPart(int offset, QByteArray bytes) {
@@ -258,26 +276,50 @@ Reader::Slices::Slices(int size, bool useCache)
: _size(size) {
Expects(size > 0);
_header.flags |= Slice::Flag::Header;
if (useCache) {
_header.flags |= Slice::Flag::LoadingFromCache;
} else {
_headerMode = HeaderMode::NoCache;
}
if (!fullInHeader()) {
if (!isFullInHeader()) {
_data.resize((_size + kInSlice - 1) / kInSlice);
}
}
bool Reader::Slices::fullInHeader() const {
bool Reader::Slices::headerModeUnknown() const {
return (_headerMode == HeaderMode::Unknown);
}
bool Reader::Slices::isFullInHeader() const {
return (_size <= kMaxOnlyInHeader);
}
bool Reader::Slices::isGoodHeader() const {
return (_headerMode == HeaderMode::Good);
}
bool Reader::Slices::computeIsGoodHeader() const {
if (isFullInHeader()) {
return false;
}
const auto outsideFirstSliceIt = ranges::lower_bound(
_header.parts,
kInSlice,
ranges::less(),
&base::flat_map<int, QByteArray>::value_type::first);
const auto outsideFirstSlice = end(_header.parts) - outsideFirstSliceIt;
return (outsideFirstSlice <= kPartsOutsideFirstSliceGood);
}
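
The Good-header heuristic in prose: a header qualifies when streaming left at most kPartsOutsideFirstSliceGood (8 parts, about 1 MB) of its parts beyond the first slice, so bundling the first slice into the header entry duplicates little data. A worked example under that reading:

// Editor's illustration. Header parts at offsets:
//   { 0, kPartSize, 2 * kPartSize, kInSlice, kInSlice + kPartSize }
// lower_bound(..., kInSlice) finds the first part with offset >= kInSlice,
// so outsideFirstSlice == 2, and 2 <= kPartsOutsideFirstSliceGood == 8
// => the header qualifies as Good. With nine or more parts past kInSlice
// it stays Small and every slice keeps its own cache key.
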
void Reader::Slices::headerDone(bool fromCache) {
if (_headerMode != HeaderMode::Unknown) {
return;
}
_headerMode = HeaderMode::Small;
_headerMode = isFullInHeader()
? HeaderMode::Full
: computeIsGoodHeader()
? HeaderMode::Good
: HeaderMode::Small;
if (!fromCache) {
for (auto &slice : _data) {
using Flag = Slice::Flag;
@@ -289,14 +331,14 @@ void Reader::Slices::headerDone(bool fromCache) {
}
bool Reader::Slices::headerWontBeFilled() const {
return (_headerMode == HeaderMode::Unknown)
return headerModeUnknown()
&& (_header.parts.size() >= kMaxPartsInHeader);
}
void Reader::Slices::applyHeaderCacheData() {
if (_header.parts.empty()) {
if (_header.parts.empty() || _headerMode != HeaderMode::Unknown) {
return;
} else if (fullInHeader()) {
} else if (isFullInHeader()) {
headerDone(true);
return;
}
@@ -311,28 +353,43 @@ void Reader::Slices::applyHeaderCacheData() {
void Reader::Slices::processCacheResult(
int sliceNumber,
QByteArray &&result) {
bytes::const_span result) {
Expects(sliceNumber >= 0 && sliceNumber <= _data.size());
auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header);
if (!(slice.flags & Slice::Flag::LoadingFromCache)) {
if (!sliceNumber && isGoodHeader()) {
// We've loaded header slice because really we wanted first slice.
if (!(_data[0].flags & Slice::Flag::LoadingFromCache)) {
// We could've already unloaded this slice using LRU _usedSlices.
return;
}
// So just process whole result even if we didn't want header really.
slice.flags |= Slice::Flag::LoadingFromCache;
}
if (!(slice.flags & Slice::Flag::LoadingFromCache)) {
// We could've already unloaded this slice using LRU _usedSlices.
return;
}
const auto success = slice.processCacheData(
std::move(result),
const auto remaining = slice.processCacheData(
result,
maxSliceSize(sliceNumber));
if (!sliceNumber) {
applyHeaderCacheData();
if (isGoodHeader()) {
// When we first read header we don't request the first slice.
// But we get it, so let's apply it anyway.
_data[0].flags |= Slice::Flag::LoadingFromCache;
processCacheResult(1, remaining);
}
}
}
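
This is the heart of the commit: under HeaderMode::Good the header and the first slice live under the same sliceNumber == 0 cache key, so one cache read satisfies both. The header block of the blob is parsed first and the unread remainder is recursively processed as slice 1. Conceptually (layout inferred from the serialization code further down):

// Cache entry under key(0) when the header is Good:
//
//   [ complex-serialized header parts ]      <- parsed for the header itself
//   [ complex-serialized first slice,        <- "remaining", re-fed via
//     minus parts the header already holds ]    processCacheResult(1, ...)
//
// Setting Flag::LoadingFromCache on _data[0] first lets the recursive call
// pass its own flag check even though slice 1 never issued a cache query.
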
void Reader::Slices::processPart(
int offset,
QByteArray &&bytes) {
Expects(fullInHeader() || (offset / kInSlice < _data.size()));
Expects(isFullInHeader() || (offset / kInSlice < _data.size()));
if (fullInHeader()) {
if (isFullInHeader()) {
_header.addPart(offset, bytes);
return;
} else if (_headerMode == HeaderMode::Unknown) {
@@ -357,8 +414,9 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult {
if (_headerMode != HeaderMode::NoCache
&& !(_header.flags & Flag::LoadedFromCache)) {
// Waiting for initial cache query.
Assert(_header.flags & Flag::LoadingFromCache);
return {};
} else if (fullInHeader()) {
} else if (isFullInHeader()) {
return fillFromHeader(offset, buffer);
}
@@ -496,42 +554,84 @@ Reader::SerializedSlice Reader::Slices::serializeAndUnloadUnused() {
}
Reader::SerializedSlice Reader::Slices::serializeAndUnloadSlice(
int sliceNumber) {
Expects(sliceNumber >= 0 && sliceNumber <= _data.size());
if (isGoodHeader() && (sliceNumber == 1)) {
return serializeAndUnloadSlice(0);
}
const auto writeHeaderAndSlice = isGoodHeader() && !sliceNumber;
auto &slice = sliceNumber ? _data[sliceNumber - 1] : _header;
const auto count = slice.parts.size();
Assert(count > 0);
auto result = SerializedSlice();
result.number = sliceNumber;
const auto continuous = FindNotLoadedStart(slice.parts, 0);
if (continuous > slice.parts.back().first) {
// We always use complex serialization for header + first slice.
const auto continuousTill = writeHeaderAndSlice
? 0
: FindNotLoadedStart(slice.parts, 0);
const auto continuous = (continuousTill > slice.parts.back().first);
if (continuous) {
// All data is continuous.
result.data.reserve(count * kPartSize);
for (const auto &[offset, part] : slice.parts) {
result.data.append(part);
}
} else {
const auto intSize = sizeof(int32);
result.data.reserve(count * kPartSize + 2 * intSize * (count + 1));
const auto appendInt = [&](int value) {
auto serialized = int32(value);
result.data.append(
reinterpret_cast<const char*>(&serialized),
intSize);
};
appendInt(count);
for (const auto &[offset, part] : slice.parts) {
appendInt(offset);
appendInt(part.size());
result.data.append(part);
result.data = serializeComplexSlice(slice);
if (writeHeaderAndSlice) {
result.data.append(serializeAndUnloadFirstSliceNoHeader());
}
if (result.data.size() == maxSliceSize(sliceNumber)) {
// Make sure this data won't be taken for full continuous data.
appendInt(0);
// Make sure this data won't be taken for full continuous data.
const auto maxSize = maxSliceSize(sliceNumber);
while (IsContiguousSerialization(result.data.size(), maxSize)) {
result.data.push_back(char(0));
}
}
// We may serialize header in the middle of streaming, if we use
// HeaderMode::Good and we unload first slice. We still require
// header data to continue working, so don't really unload the header.
if (sliceNumber) {
slice = Slice();
} else {
slice.flags &= ~Slice::Flag::ChangedSinceCache;
}
return result;
}
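
The padding loop generalizes the old single appendInt(0): once a header and a first slice are glued into one blob, the total can land on any multiple of kPartSize, not only on maxSliceSize, and would then be misread as raw continuous parts. A small worked example with the editor's numbers:

// Say the complex data ends up exactly 3 * kPartSize bytes long. On load
// it would pass IsContiguousSerialization() and be split into three fake
// raw parts, so one zero byte is appended (size = 3 * kPartSize + 1). If
// that size happened to equal maxSize, the loop appends once more. The
// parser is unaffected: it reads `count` records and returns the padding
// as an ignored tail.
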
QByteArray Reader::Slices::serializeComplexSlice(const Slice &slice) const {
auto result = QByteArray();
const auto count = slice.parts.size();
const auto intSize = sizeof(int32);
result.reserve(count * kPartSize + 2 * intSize * (count + 1));
const auto appendInt = [&](int value) {
auto serialized = int32(value);
result.append(
reinterpret_cast<const char*>(&serialized),
intSize);
};
appendInt(count);
for (const auto &[offset, part] : slice.parts) {
appendInt(offset);
appendInt(part.size());
result.append(part);
}
return result;
}
QByteArray Reader::Slices::serializeAndUnloadFirstSliceNoHeader() {
Expects(_data[0].flags & Slice::Flag::LoadedFromCache);
auto &slice = _data[0];
for (const auto &[offset, part] : _header.parts) {
slice.parts.erase(offset);
}
auto result = serializeComplexSlice(slice);
slice = Slice();
return result;
}
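
Deduplication for the shared entry: parts the header already stores are erased from the first slice before it is serialized, so nothing is written twice. This loses no data, since applyHeaderCacheData() (its copying loop is outside the shown hunks) re-applies header parts to their slices after a load:

// Example: _header.parts holds offsets { 0, kPartSize } and slice 1 holds
// { 0, kPartSize, 2 * kPartSize }. Only the 2 * kPartSize part is written
// here; on the way back, applyHeaderCacheData() re-seeds _data[0] with the
// parts kept in the header, restoring the full slice.
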
@@ -595,7 +695,11 @@ std::shared_ptr<Reader::CacheHelper> Reader::InitCacheHelper(
// 0 is for headerData, slice index = sliceNumber - 1.
void Reader::readFromCache(int sliceNumber) {
Expects(_cacheHelper != nullptr);
Expects(!sliceNumber || !_slices.headerModeUnknown());
if (sliceNumber == 1 && _slices.isGoodHeader()) {
return readFromCache(0);
}
LOG(("READING FROM CACHE: %1").arg(sliceNumber));
const auto key = _cacheHelper->key(sliceNumber);
const auto weak = std::weak_ptr<CacheHelper>(_cacheHelper);
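
With the shared key, a read of slice 1 under a Good header is redirected to the header query above: key(0) is the only entry holding that data, and processCacheResult() splits the blob apart again. The key scheme, per CacheHelper::key() at the top of the file:

// key(n) = { baseKey.high, baseKey.low + n }
//
//   key(0) -> header, plus the first slice once the header proves Good
//   key(1) -> first slice on its own, used when the header is not Good
//   key(n) -> slice n for n >= 2, unchanged by this commit
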
@@ -703,8 +807,12 @@ bool Reader::fillFromSlices(int offset, bytes::span buffer) {
++several;
if (_cacheHelper && result.toCache.number > 0) {
const auto index = result.toCache.number - 1;
if (_cacheHelper && result.toCache.number >= 0) {
// If we put to cache the header (number == 0) that means we're in
// HeaderMode::Good and really are putting the first slice to cache.
Assert(result.toCache.number > 0 || _slices.isGoodHeader());
const auto index = std::max(result.toCache.number, 1) - 1;
cancelLoadInRange(index * kInSlice, (index + 1) * kInSlice);
putToCache(std::move(result.toCache));
}
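
The widened guard (number >= 0 instead of > 0) accepts the combined entry, and the index arithmetic maps every entry to the slice range whose network loading should be cancelled; entry 0 stands in for the first slice:

// toCache.number == 0 (header + first slice)  -> index 0
// toCache.number == n >= 1                    -> index n - 1
// cancelLoadInRange(index * kInSlice, (index + 1) * kInSlice) therefore
// covers the first slice's byte range for both number 0 and number 1.
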
@@ -747,10 +855,8 @@ bool Reader::processCacheResults() {
auto loaded = base::take(_cacheHelper->results);
lock.unlock();
for (auto &&part : loaded) {
const auto sliceNumber = part.first;
auto &serialized = part.second;
_slices.processCacheResult(sliceNumber, std::move(serialized));
for (const auto &[sliceNumber, result] : loaded) {
_slices.processCacheResult(sliceNumber, bytes::make_span(result));
}
return !loaded.empty();
}

Telegram/SourceFiles/media/streaming/media_streaming_reader.h

@@ -77,7 +77,6 @@ private:
struct Slice {
enum class Flag : uchar {
Header = 0x01,
LoadingFromCache = 0x02,
LoadedFromCache = 0x04,
ChangedSinceCache = 0x08,
@@ -92,10 +91,12 @@ private:
bool ready = true;
};
bool processCacheData(QByteArray &&data, int maxSliceSize);
bool processComplexCacheData(
bytes::const_span processCacheData(
bytes::const_span data,
int maxSliceSize);
int maxSize);
bytes::const_span processComplexCacheData(
bytes::const_span data,
int maxSize);
void addPart(int offset, QByteArray bytes);
PrepareFillResult prepareFill(int from, int till);
@@ -115,8 +116,11 @@ private:
void headerDone(bool fromCache);
[[nodiscard]] bool headerWontBeFilled() const;
[[nodiscard]] bool headerModeUnknown() const;
[[nodiscard]] bool isFullInHeader() const;
[[nodiscard]] bool isGoodHeader() const;
void processCacheResult(int sliceNumber, QByteArray &&result);
void processCacheResult(int sliceNumber, bytes::const_span result);
void processPart(int offset, QByteArray &&bytes);
[[nodiscard]] FillResult fill(int offset, bytes::span buffer);
@@ -126,16 +130,24 @@ private:
enum class HeaderMode {
Unknown,
Small,
Good,
Full,
NoCache,
};
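
The two added modes complete the set. A summary in the editor's words, inferred from the .cpp logic above:

// HeaderMode as used by Reader::Slices:
//   Unknown - initial header cache query still pending.
//   Small   - header parts are scattered; the header caches under key(0),
//             every slice under its own key.
//   Good    - nearly all header parts sit inside the first slice, which is
//             therefore stored together with the header under key(0).
//   Full    - the whole file fits in the header (size <= kMaxOnlyInHeader);
//             there are no slice entries at all.
//   NoCache - streaming without the cache; nothing is read or written.
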
void applyHeaderCacheData();
int maxSliceSize(int sliceNumber) const;
SerializedSlice serializeAndUnloadSlice(int sliceNumber);
SerializedSlice serializeAndUnloadUnused();
[[nodiscard]] int maxSliceSize(int sliceNumber) const;
[[nodiscard]] SerializedSlice serializeAndUnloadSlice(
int sliceNumber);
[[nodiscard]] SerializedSlice serializeAndUnloadUnused();
[[nodiscard]] QByteArray serializeComplexSlice(
const Slice &slice) const;
[[nodiscard]] QByteArray serializeAndUnloadFirstSliceNoHeader();
void markSliceUsed(int sliceIndex);
bool fullInHeader() const;
FillResult fillFromHeader(int offset, bytes::span buffer);
[[nodiscard]] bool computeIsGoodHeader() const;
[[nodiscard]] FillResult fillFromHeader(
int offset,
bytes::span buffer);
std::vector<Slice> _data;
Slice _header;