diff --git a/image/main.c b/image/main.c index c4a9a423..f0d1abf7 100644 --- a/image/main.c +++ b/image/main.c @@ -1363,19 +1363,173 @@ static void write_backup_supers(int fd, u8 *buf) } } +/* + * Restore one item. + * + * For uncompressed data, it's just reading from work->buf then write to output. + * For compressed data, since we can have very large decompressed data + * (up to 256M), we need to consider memory usage. So here we will fill buffer + * then write the decompressed buffer to output. + */ +static int restore_one_work(struct mdrestore_struct *mdres, + struct async_work *async, u8 *buffer, int bufsize) +{ + z_stream strm; + /* Offset inside work->buffer */ + int buf_offset = 0; + /* Offset for output */ + int out_offset = 0; + int out_len; + int outfd = fileno(mdres->out); + int compress_method = mdres->compress_method; + int ret; + + ASSERT(is_power_of_2(bufsize)); + + if (compress_method == COMPRESS_ZLIB) { + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + strm.avail_in = async->bufsize; + strm.next_in = async->buffer; + strm.avail_out = 0; + strm.next_out = Z_NULL; + ret = inflateInit(&strm); + if (ret != Z_OK) { + error("failed to initialize decompress parameters: %d", ret); + return ret; + } + } + while (buf_offset < async->bufsize) { + bool compress_end = false; + int read_size = min_t(u64, async->bufsize - buf_offset, bufsize); + + /* Read part */ + if (compress_method == COMPRESS_ZLIB) { + if (strm.avail_out == 0) { + strm.avail_out = bufsize; + strm.next_out = buffer; + } + pthread_mutex_unlock(&mdres->mutex); + ret = inflate(&strm, Z_NO_FLUSH); + pthread_mutex_lock(&mdres->mutex); + switch (ret) { + case Z_NEED_DICT: + ret = Z_DATA_ERROR; + __attribute__ ((fallthrough)); + case Z_DATA_ERROR: + case Z_MEM_ERROR: + goto out; + } + if (ret == Z_STREAM_END) { + ret = 0; + compress_end = true; + } + out_len = bufsize - strm.avail_out; + } else { + /* No compress, read as much data as possible */ + memcpy(buffer, async->buffer + buf_offset, read_size); + + buf_offset += read_size; + out_len = read_size; + } + + /* Fixup part */ + if (!mdres->multi_devices) { + if (async->start == BTRFS_SUPER_INFO_OFFSET) { + memcpy(mdres->original_super, buffer, + BTRFS_SUPER_INFO_SIZE); + if (mdres->old_restore) { + update_super_old(buffer); + } else { + ret = update_super(mdres, buffer); + if (ret < 0) + goto out; + } + } else if (!mdres->old_restore) { + ret = fixup_chunk_tree_block(mdres, async, + buffer, out_len); + if (ret) + goto out; + } + } + + /* Write part */ + if (!mdres->fixup_offset) { + int size = out_len; + off_t offset = 0; + + while (size) { + u64 logical = async->start + out_offset + offset; + u64 chunk_size = size; + u64 physical_dup = 0; + u64 bytenr; + + if (!mdres->multi_devices && !mdres->old_restore) + bytenr = logical_to_physical(mdres, + logical, &chunk_size, + &physical_dup); + else + bytenr = logical; + + ret = pwrite64(outfd, buffer + offset, chunk_size, bytenr); + if (ret != chunk_size) + goto write_error; + + if (physical_dup) + ret = pwrite64(outfd, buffer + offset, + chunk_size, physical_dup); + if (ret != chunk_size) + goto write_error; + + size -= chunk_size; + offset += chunk_size; + continue; + } + } else if (async->start != BTRFS_SUPER_INFO_OFFSET) { + ret = write_data_to_disk(mdres->info, buffer, + async->start, out_len, 0); + if (ret) { + error("failed to write data"); + exit(1); + } + } + + /* backup super blocks are already there at fixup_offset stage */ + if (async->start == BTRFS_SUPER_INFO_OFFSET && + !mdres->multi_devices) + write_backup_supers(outfd, buffer); + out_offset += out_len; + if (compress_end) { + inflateEnd(&strm); + break; + } + } + return ret; + +write_error: + if (ret < 0) { + error("unable to write to device: %m"); + ret = -errno; + } else { + error("short write"); + ret = -EIO; + } +out: + if (compress_method == COMPRESS_ZLIB) + inflateEnd(&strm); + return ret; +} + static void *restore_worker(void *data) { struct mdrestore_struct *mdres = (struct mdrestore_struct *)data; struct async_work *async; - size_t size; u8 *buffer; - u8 *outbuf; - int outfd; int ret; - int compress_size = current_version->max_pending_size * 4; + int buffer_size = SZ_512K; - outfd = fileno(mdres->out); - buffer = malloc(compress_size); + buffer = malloc(buffer_size); if (!buffer) { error("not enough memory for restore worker buffer"); pthread_mutex_lock(&mdres->mutex); @@ -1386,10 +1540,6 @@ static void *restore_worker(void *data) } while (1) { - u64 bytenr, physical_dup; - off_t offset = 0; - int err = 0; - pthread_mutex_lock(&mdres->mutex); while (!mdres->nodesize || list_empty(&mdres->list)) { if (mdres->done) { @@ -1401,92 +1551,12 @@ static void *restore_worker(void *data) async = list_entry(mdres->list.next, struct async_work, list); list_del_init(&async->list); - if (mdres->compress_method == COMPRESS_ZLIB) { - size = compress_size; + ret = restore_one_work(mdres, async, buffer, buffer_size); + if (ret < 0) { + mdres->error = ret; pthread_mutex_unlock(&mdres->mutex); - ret = uncompress(buffer, (unsigned long *)&size, - async->buffer, async->bufsize); - pthread_mutex_lock(&mdres->mutex); - if (ret != Z_OK) { - error("decompression failed with %d", ret); - err = -EIO; - } - outbuf = buffer; - } else { - outbuf = async->buffer; - size = async->bufsize; + goto out; } - - if (!mdres->multi_devices) { - if (async->start == BTRFS_SUPER_INFO_OFFSET) { - memcpy(mdres->original_super, outbuf, - BTRFS_SUPER_INFO_SIZE); - if (mdres->old_restore) { - update_super_old(outbuf); - } else { - ret = update_super(mdres, outbuf); - if (ret) - err = ret; - } - } else if (!mdres->old_restore) { - ret = fixup_chunk_tree_block(mdres, async, outbuf, size); - if (ret) - err = ret; - } - } - - if (!mdres->fixup_offset) { - while (size) { - u64 chunk_size = size; - physical_dup = 0; - if (!mdres->multi_devices && !mdres->old_restore) - bytenr = logical_to_physical(mdres, - async->start + offset, - &chunk_size, - &physical_dup); - else - bytenr = async->start + offset; - - ret = pwrite64(outfd, outbuf+offset, chunk_size, - bytenr); - if (ret != chunk_size) - goto error; - - if (physical_dup) - ret = pwrite64(outfd, outbuf+offset, - chunk_size, - physical_dup); - if (ret != chunk_size) - goto error; - - size -= chunk_size; - offset += chunk_size; - continue; - -error: - if (ret < 0) { - error("unable to write to device: %m"); - err = errno; - } else { - error("short write"); - err = -EIO; - } - } - } else if (async->start != BTRFS_SUPER_INFO_OFFSET) { - ret = write_data_to_disk(mdres->info, outbuf, async->start, size, 0); - if (ret) { - error("failed to write data"); - exit(1); - } - } - - - /* backup super blocks are already there at fixup_offset stage */ - if (!mdres->multi_devices && async->start == BTRFS_SUPER_INFO_OFFSET) - write_backup_supers(outfd, outbuf); - - if (err && !mdres->error) - mdres->error = err; mdres->num_items--; pthread_mutex_unlock(&mdres->mutex);