1
0
mirror of https://github.com/kdave/btrfs-progs synced 2025-04-01 22:48:06 +00:00

btrfs-progs: image: reduce memory requirements for decompression

With recent change to enlarge max_pending_size to 256M for data dump,
the decompress code requires quite a lot of memory, up to 256M * 4.

The reason is we're using wrapped uncompress() function call, which
needs the buffer to be large enough to contain the decompressed data.

This patch will re-work the decompress work to use inflate(), which can
resume its decompression, so that we can use a much smaller buffer size.

Use 512K as buffer size.

Now the memory consumption for restore is reduced to

 cluster data size + 512K * nr_running_threads

Instead of the original one:

 cluster data size + 1G * nr_running_threads

Signed-off-by: Qu Wenruo <wqu@suse.com>
Signed-off-by: David Sterba <dsterba@suse.com>
This commit is contained in:
Qu Wenruo 2021-08-24 15:41:07 +08:00 committed by David Sterba
parent 0cc8a7bd4a
commit c4ff83cb5d

View File

@ -1363,19 +1363,173 @@ static void write_backup_supers(int fd, u8 *buf)
}
}
/*
 * Restore one item.
 *
 * For uncompressed data, it's just reading from async->buffer then writing
 * to the output.
 * For compressed data, since we can have very large decompressed data
 * (up to 256M), we need to consider memory usage.  So here we will fill
 * @buffer (at most @bufsize bytes) then write the decompressed chunk to
 * the output, and repeat until the whole item is restored.
 *
 * Called with mdres->mutex held; the mutex is dropped around inflate()
 * as decompression can be slow.
 *
 * Return 0 on success, a zlib error code or negative errno on failure.
 */
static int restore_one_work(struct mdrestore_struct *mdres,
			    struct async_work *async, u8 *buffer, int bufsize)
{
	z_stream strm;
	/* Offset inside async->buffer (compressed input consumed so far) */
	int buf_offset = 0;
	/* Offset of the decompressed data inside the restored item */
	int out_offset = 0;
	int out_len;
	int outfd = fileno(mdres->out);
	int compress_method = mdres->compress_method;
	/*
	 * Initialize to 0 so an empty item (async->bufsize == 0) in the
	 * uncompressed path doesn't return an uninitialized value.
	 */
	int ret = 0;

	ASSERT(is_power_of_2(bufsize));

	if (compress_method == COMPRESS_ZLIB) {
		strm.zalloc = Z_NULL;
		strm.zfree = Z_NULL;
		strm.opaque = Z_NULL;
		strm.avail_in = async->bufsize;
		strm.next_in = async->buffer;
		strm.avail_out = 0;
		strm.next_out = Z_NULL;
		ret = inflateInit(&strm);
		if (ret != Z_OK) {
			error("failed to initialize decompress parameters: %d", ret);
			return ret;
		}
	}
	while (buf_offset < async->bufsize) {
		bool compress_end = false;
		int read_size = min_t(u64, async->bufsize - buf_offset, bufsize);

		/* Read part */
		if (compress_method == COMPRESS_ZLIB) {
			if (strm.avail_out == 0) {
				strm.avail_out = bufsize;
				strm.next_out = buffer;
			}
			/* Don't hold the mutex across the slow inflate() call */
			pthread_mutex_unlock(&mdres->mutex);
			ret = inflate(&strm, Z_NO_FLUSH);
			pthread_mutex_lock(&mdres->mutex);
			switch (ret) {
			case Z_NEED_DICT:
				ret = Z_DATA_ERROR;
				__attribute__ ((fallthrough));
			case Z_DATA_ERROR:
			case Z_MEM_ERROR:
				goto out;
			case Z_BUF_ERROR:
				/*
				 * avail_out is guaranteed non-zero above, so
				 * Z_BUF_ERROR means the input ran out before
				 * the stream ended (truncated or corrupted
				 * data).  Bail out instead of looping forever,
				 * as the loop only terminates on Z_STREAM_END
				 * for compressed items.
				 */
				error("decompression failed: truncated input");
				ret = Z_DATA_ERROR;
				goto out;
			}
			if (ret == Z_STREAM_END) {
				ret = 0;
				compress_end = true;
			}
			out_len = bufsize - strm.avail_out;
		} else {
			/* No compression, read as much data as possible */
			memcpy(buffer, async->buffer + buf_offset, read_size);
			buf_offset += read_size;
			out_len = read_size;
		}

		/* Fixup part */
		if (!mdres->multi_devices) {
			if (async->start == BTRFS_SUPER_INFO_OFFSET) {
				memcpy(mdres->original_super, buffer,
				       BTRFS_SUPER_INFO_SIZE);
				if (mdres->old_restore) {
					update_super_old(buffer);
				} else {
					ret = update_super(mdres, buffer);
					if (ret < 0)
						goto out;
				}
			} else if (!mdres->old_restore) {
				ret = fixup_chunk_tree_block(mdres, async,
							     buffer, out_len);
				if (ret)
					goto out;
			}
		}

		/* Write part */
		if (!mdres->fixup_offset) {
			int size = out_len;
			off_t offset = 0;

			while (size) {
				u64 logical = async->start + out_offset + offset;
				u64 chunk_size = size;
				u64 physical_dup = 0;
				u64 bytenr;

				if (!mdres->multi_devices && !mdres->old_restore)
					bytenr = logical_to_physical(mdres,
							logical, &chunk_size,
							&physical_dup);
				else
					bytenr = logical;

				ret = pwrite64(outfd, buffer + offset, chunk_size, bytenr);
				if (ret != chunk_size)
					goto write_error;
				if (physical_dup)
					ret = pwrite64(outfd, buffer + offset,
						       chunk_size, physical_dup);
				if (ret != chunk_size)
					goto write_error;
				size -= chunk_size;
				offset += chunk_size;
			}
		} else if (async->start != BTRFS_SUPER_INFO_OFFSET) {
			ret = write_data_to_disk(mdres->info, buffer,
						 async->start, out_len, 0);
			if (ret) {
				error("failed to write data");
				exit(1);
			}
		}

		/* Backup super blocks are already there at fixup_offset stage */
		if (async->start == BTRFS_SUPER_INFO_OFFSET &&
		    !mdres->multi_devices)
			write_backup_supers(outfd, buffer);
		out_offset += out_len;
		if (compress_end) {
			inflateEnd(&strm);
			break;
		}
	}
	return ret;

write_error:
	if (ret < 0) {
		/*
		 * Capture errno before error() — stdio inside error() may
		 * clobber it, corrupting the returned code.
		 */
		ret = -errno;
		error("unable to write to device: %m");
	} else {
		error("short write");
		ret = -EIO;
	}
out:
	if (compress_method == COMPRESS_ZLIB)
		inflateEnd(&strm);
	return ret;
}
static void *restore_worker(void *data)
{
struct mdrestore_struct *mdres = (struct mdrestore_struct *)data;
struct async_work *async;
size_t size;
u8 *buffer;
u8 *outbuf;
int outfd;
int ret;
int compress_size = current_version->max_pending_size * 4;
int buffer_size = SZ_512K;
outfd = fileno(mdres->out);
buffer = malloc(compress_size);
buffer = malloc(buffer_size);
if (!buffer) {
error("not enough memory for restore worker buffer");
pthread_mutex_lock(&mdres->mutex);
@ -1386,10 +1540,6 @@ static void *restore_worker(void *data)
}
while (1) {
u64 bytenr, physical_dup;
off_t offset = 0;
int err = 0;
pthread_mutex_lock(&mdres->mutex);
while (!mdres->nodesize || list_empty(&mdres->list)) {
if (mdres->done) {
@ -1401,92 +1551,12 @@ static void *restore_worker(void *data)
async = list_entry(mdres->list.next, struct async_work, list);
list_del_init(&async->list);
if (mdres->compress_method == COMPRESS_ZLIB) {
size = compress_size;
ret = restore_one_work(mdres, async, buffer, buffer_size);
if (ret < 0) {
mdres->error = ret;
pthread_mutex_unlock(&mdres->mutex);
ret = uncompress(buffer, (unsigned long *)&size,
async->buffer, async->bufsize);
pthread_mutex_lock(&mdres->mutex);
if (ret != Z_OK) {
error("decompression failed with %d", ret);
err = -EIO;
}
outbuf = buffer;
} else {
outbuf = async->buffer;
size = async->bufsize;
goto out;
}
if (!mdres->multi_devices) {
if (async->start == BTRFS_SUPER_INFO_OFFSET) {
memcpy(mdres->original_super, outbuf,
BTRFS_SUPER_INFO_SIZE);
if (mdres->old_restore) {
update_super_old(outbuf);
} else {
ret = update_super(mdres, outbuf);
if (ret)
err = ret;
}
} else if (!mdres->old_restore) {
ret = fixup_chunk_tree_block(mdres, async, outbuf, size);
if (ret)
err = ret;
}
}
if (!mdres->fixup_offset) {
while (size) {
u64 chunk_size = size;
physical_dup = 0;
if (!mdres->multi_devices && !mdres->old_restore)
bytenr = logical_to_physical(mdres,
async->start + offset,
&chunk_size,
&physical_dup);
else
bytenr = async->start + offset;
ret = pwrite64(outfd, outbuf+offset, chunk_size,
bytenr);
if (ret != chunk_size)
goto error;
if (physical_dup)
ret = pwrite64(outfd, outbuf+offset,
chunk_size,
physical_dup);
if (ret != chunk_size)
goto error;
size -= chunk_size;
offset += chunk_size;
continue;
error:
if (ret < 0) {
error("unable to write to device: %m");
err = errno;
} else {
error("short write");
err = -EIO;
}
}
} else if (async->start != BTRFS_SUPER_INFO_OFFSET) {
ret = write_data_to_disk(mdres->info, outbuf, async->start, size, 0);
if (ret) {
error("failed to write data");
exit(1);
}
}
/* backup super blocks are already there at fixup_offset stage */
if (!mdres->multi_devices && async->start == BTRFS_SUPER_INFO_OFFSET)
write_backup_supers(outfd, outbuf);
if (err && !mdres->error)
mdres->error = err;
mdres->num_items--;
pthread_mutex_unlock(&mdres->mutex);