diff options
author | Sergey Poznyakoff <gray@gnu.org.ua> | 2017-05-22 19:44:12 +0300 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org.ua> | 2017-05-22 22:17:57 +0300 |
commit | ba211551ca112afe9d19221c60afba0ce493526c (patch) | |
tree | 2fec900f5503c81957f4bac6cfd9cd5da681aa51 /lib/App | |
parent | 0e6a48636e92226a43dc8bc58cea4484e9ecf84b (diff) | |
download | glacier-ba211551ca112afe9d19221c60afba0ce493526c.tar.gz glacier-ba211551ca112afe9d19221c60afba0ce493526c.tar.bz2 |
Fix multipart upload & download
* lib/App/Glacier/Command/Get.pm: Fix chunk alignment and the use of shared
variables.
* lib/App/Glacier/Command/ListVault.pm: Fix typo.
* lib/App/Glacier/Command/Put.pm (_upload_multipart): Rewrite.
* lib/App/Glacier/DB/GDBM.pm: Don't keep the DB mapped through the
lifetime of the object, as this interacts badly with the threaded
code (it causes coredumps when the copies of the object are destroyed
in their corresponding threads, which means that the underlying
gdbm_close gets called multiple times on the same GDBM structure. See
the comment to the _tied method for details).
Diffstat (limited to 'lib/App')
-rw-r--r-- | lib/App/Glacier/Command/Get.pm | 43 | ||||
-rw-r--r-- | lib/App/Glacier/Command/ListVault.pm | 2 | ||||
-rw-r--r-- | lib/App/Glacier/Command/Put.pm | 104 | ||||
-rw-r--r-- | lib/App/Glacier/DB/GDBM.pm | 55 |
4 files changed, 123 insertions, 81 deletions
diff --git a/lib/App/Glacier/Command/Get.pm b/lib/App/Glacier/Command/Get.pm index b00b658..09e739f 100644 --- a/lib/App/Glacier/Command/Get.pm +++ b/lib/App/Glacier/Command/Get.pm @@ -78,18 +78,18 @@ sub run { $filename, $ver); if ($self->{_options}{test}) { - $self->error("downloading file $filename initialized on", - $job->get('CreationDate')->canned_format('full-iso')); - $self->error("job id:", $job->id); + print "downloading file $filename initialized on", + $job->get('CreationDate')->canned_format('full-iso'),"\n"; + print "job id: ", $job->id, "\n"; my ($status, $message) = $job->status; - $self->error("current status:", $status); + print "current status: $status\n"; if ($message) { - $self->error("status message: $message\n"); + print "status message: $message\n"; } if ($job->is_completed) { - $self->error("completed on", - $job->get('CompletionDate')->canned_format('full-iso')); + print "completed on ", + $job->get('CompletionDate')->canned_format('full-iso'),"\n"; } exit(0); } @@ -112,13 +112,16 @@ sub run { } } +use constant MB => 1024*1024; +use constant TWOMB => 2*MB; + sub download { my ($self, $job, $localname) = @_; use threads; use threads::shared; - my $fd :shared; + my $fd; open($fd, '>', $localname) or $self->abort(EX_FAILURE, "can't open $localname: $!"); binmode($fd); @@ -130,12 +133,18 @@ sub download { my $part_size; if ($self->{_options}{jobs}) { $njobs = $self->{_options}{jobs}; + # Compute approximate part size $part_size = int(($archive_size + $njobs - 1) / $njobs); } else { - $part_size = $self->{_options}{part_size} || 10*1024*1024*1024; # FIXME - $njobs = int(($archive_size + $part_size - 1) / $part_size); + $part_size = ($self->{_options}{part_size} || 10 * MB); } + # Make sure the chunk is Tree-Hash aligned + # http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations-range.html?shortFooter=true#checksum-calculations-upload-archive-with-ranges + $part_size = TWOMB * 2 ** int(log($part_size / TWOMB) / log(2)); + # 
Adjust the number of jobs + $njobs = int(($archive_size + $part_size - 1) / $part_size); + my $glacier = $self->{_glacier}; my $tree_hash; @@ -152,7 +161,7 @@ sub download { $self->debug(1, "downloading", $job->file_name(1), "to $localname in chunks of $part_size bytes, in $njobs jobs"); - my @part_hashes = (); + my @part_hashes :shared = (); my $read_bytes; my $rest_size = $archive_size; my $off = 0; @@ -163,13 +172,14 @@ sub download { my ($thr) = threads->create( sub { my ($part_idx, $off) = @_; - my $range = $off . '-' . ($off + $part_size); + my $range = 'bytes=' . $off . '-' . ($off + $part_size - 1); my ($res, $hash) = $glacier->get_job_output($job->vault, $job->id, $range); - lock $fd; + lock @part_hashes; seek($fd, $off, SEEK_SET); syswrite($fd, $res); - return ($part_idx, $hash); + $part_hashes[$part_idx] = $hash; + return 1; }, $i, $off); } @@ -177,17 +187,18 @@ sub download { $self->debug(2, "waiting for download to finish"); foreach my $thr (threads->list()) { # FIXME: error handling - my ($idx, $hash) = $thr->join() or croak "thread $thr failed"; - $part_hashes[$idx] = $hash; + $thr->join() or croak "thread $thr failed"; } $tree_hash = $glacier->_tree_hash_from_array_ref(\@part_hashes); } close($fd); +# print $tree_hash. ' ', $job->get('ArchiveSHA256TreeHash') , "\n"; if ($tree_hash ne $job->get('ArchiveSHA256TreeHash')) { unlink $localname; $self->abend(EX_SOFTWARE, "downloaded file is corrupt"); } + print "finished\n"; } 1; diff --git a/lib/App/Glacier/Command/ListVault.pm b/lib/App/Glacier/Command/ListVault.pm index 96ffbd1..10fa755 100644 --- a/lib/App/Glacier/Command/ListVault.pm +++ b/lib/App/Glacier/Command/ListVault.pm @@ -77,7 +77,7 @@ sub getopt { if (defined($self->{_options}{sort})) { my $sortfun = $self->{_options}{d} ? 
\%sort_vaults : \%sort_archives; - $self->abend(EX_USAGE, "unknowns sort field") + $self->abend(EX_USAGE, "unknown sort field") unless exists($sortfun->{$self->{_options}{sort}}); $self->{_options}{sort} = $sortfun->{$self->{_options}{sort}}; } diff --git a/lib/App/Glacier/Command/Put.pm b/lib/App/Glacier/Command/Put.pm index 200ccda..d0e6cc0 100644 --- a/lib/App/Glacier/Command/Put.pm +++ b/lib/App/Glacier/Command/Put.pm @@ -75,6 +75,9 @@ sub _upload_multipart { my ($self, $vaultname, $localname, $remotename) = @_; my $glacier = $self->{_glacier}; + use threads; + use threads::shared; + my $archive_size = -s $localname; my $part_size = $glacier->calculate_multipart_upload_partsize($archive_size); @@ -82,67 +85,82 @@ sub _upload_multipart { $self->abend(EX_FAILURE, "$localname is too big for upload") if $part_size == 0; + # Number of parts to upload: + my $total_parts = int(($archive_size + $part_size - 1) / $part_size); + + # Compute number of threads my $njobs = $self->{_options}{jobs}; - unless (defined($njobs)) { - $njobs = $archive_size / $part_size; - if ($njobs > 128) { - $njobs = 128; - } + if (defined($njobs)) { + # Allow at most so many jobs as there are parts + $njobs = $total_parts if $njobs > $total_parts; + } else { + # Select default. + $njobs = $total_parts < 16 ? 
$total_parts : 16; } + # Number of parts to upload by each job; + my $job_parts = int(($total_parts + $njobs - 1) / $njobs); + $self->debug(1, "uploading $localname in chunks of $part_size bytes, in $njobs jobs"); open(my $fd, '<', $localname) or $self->abort(EX_FAILURE, "can't open $localname: $!"); + binmode($fd); my $upload_id = $glacier->multipart_upload_init($vaultname, $part_size, $remotename); $self->debug(1, "Upload ID: $upload_id"); - use threads; use Fcntl qw(SEEK_SET); - my @part_hashes = (); - my $part_idx = 0; - my $read_bytes; - my $rest_size = $archive_size; - my $off = 0; - while ($rest_size) { - for (my $i = 0; $i < $njobs && $rest_size; $i++) { - my ($thr) = threads->create( - sub { - my ($part_idx, $off) = @_; - + + my @part_hashes :shared = (); + + for (my $i = 0; $i < $njobs; $i++) { + my $thr = threads->create( + sub { + my ($job_idx) = @_; + # Number of part to start from + my $part_idx = $job_idx * $job_parts; + # Offset in file + my $off = $part_idx * $part_size; + # Return value + my @ret; + + for (my $j = 0; $j < $job_parts; + $j++, $part_idx++, $off += $part_size) { + last if $off >= $archive_size; my $part; - seek($fd, $off, SEEK_SET); - $read_bytes = read($fd, $part, $part_size); + { + lock @part_hashes; + seek($fd, $off, SEEK_SET); + my $rb = sysread($fd, $part, $part_size); + if ($rb == 0) { + $self->abend(EX_OSERR, + "failed to read part $part_idx: $!"); + } + } + # FIXME: use http_catch - my $res = $glacier->multipart_upload_upload_part($vaultname, - $upload_id, - $part_size, - $part_idx, - \$part); - return ($part_idx, $res); - }, - $part_idx, $off); - $part_idx++; - $off += $part_size; - if ($rest_size < $part_size) { - $part_size = $rest_size; - } - $rest_size -= $part_size; - } - - $self->debug(2, "waiting for the bunch to finish"); - while (threads->list()) { - foreach my $thr (threads->list(threads::joinable)) { - # FIXME: error handling, see the note about http_catch above - my ($idx, $hash) = $thr->join() or croak "thread 
$thr failed"; - $part_hashes[$idx] = $hash; - } - } + my $res = $glacier->multipart_upload_upload_part( + $vaultname, + $upload_id, + $part_size, + $part_idx, + \$part); + $part_hashes[$part_idx] = $res; + } + return 1; + }, $i); + } + + $self->debug(2, "waiting for dowload to finish"); + foreach my $thr (threads->list) { + # FIXME: error handling, see the note about http_catch above + $thr->join() or croak "thread $thr failed"; } # Capture archive id or error code + $self->debug(2, "finalizing the upload"); my $archive_id = $self->glacier_eval('multipart_upload_complete', $vaultname, $upload_id, \@part_hashes, diff --git a/lib/App/Glacier/DB/GDBM.pm b/lib/App/Glacier/DB/GDBM.pm index 2e75b59..5976515 100644 --- a/lib/App/Glacier/DB/GDBM.pm +++ b/lib/App/Glacier/DB/GDBM.pm @@ -6,62 +6,75 @@ use parent qw(App::Glacier::DB); use GDBM_File; use Carp; -my %dbtab; - sub new { my $class = shift; my $filename = shift; local %_ = @_; my $mode = delete $_{mode} || 0644; - unless (exists($dbtab{$filename})) { - my %map; - tie %map, 'GDBM_File', $filename, GDBM_WRCREAT, $mode; - $dbtab{$filename} = \%map; - } my $self = $class->SUPER::new(%_); $self->{_filename} = $filename; - $self->{_map} = $dbtab{$filename}; + $self->{_mode} = $mode; + $self->{_nref} = 0; return $self; } +# We can't tie the DB to $self->{_map} at once, in the new method, because +# this will cause coredumps in threaded code (see +# https://rt.perl.org/Public/Bug/Display.html?id=61912). So, the following +# auxiliary method is used, which calls &$code with $self->{_mode} tied +# to the DB. +sub _tied { + my ($self, $code) = @_; + croak "argument must be a CODE ref" unless ref($code) eq 'CODE'; + if ($self->{_nref}++ == 0) { + tie %{$self->{_map}}, 'GDBM_File', $self->{_filename}, GDBM_WRCREAT, $self->{_mode}; + } + my $ret = wantarray ? [ &{$code}() ] : &{$code}(); + if (--$self->{_nref} == 0) { + untie %{$self->{_map}}; + } + return wantarray ? 
@$ret : $ret; +} + sub drop { my ($self) = @_; my $filename = $self->{_filename}; unlink $filename or carp "can't unlink $filename: $!"; - untie %{$dbtab{$filename}}; - delete $dbtab{$filename}; - delete $self->{_map}; } sub has { my ($self, $key) = @_; - return exists($self->{_map}{$key}); + return $self->_tied(sub { exists($self->{_map}{$key}) }); } sub retrieve { my ($self, $key) = @_; - return undef unless $self->has($key); - return $self->decode($self->{_map}{$key}); + return $self->_tied(sub { + return undef unless exists $self->{_map}{$key}; + return $self->decode($self->{_map}{$key}); + }); } sub store { my ($self, $key, $val) = @_; - $self->{_map}{$key} = $self->encode($val); -# use Data::Dumper; -# print "stored $key", Dumper([$self->{_map}{$key}]); + return $self->_tied(sub { + $self->{_map}{$key} = $self->encode($val); + }); } sub delete { my ($self, $key) = @_; - delete $self->{_map}{$key}; + $self->_tied(sub { delete $self->{_map}{$key} }); } sub foreach { my ($self, $code) = @_; croak "argument must be a CODE" unless ref($code) eq 'CODE'; - while (my ($key, $val) = each %{$self->{_map}}) { - &{$code}($key, $self->decode($val)); - } + $self->_tied(sub { + while (my ($key, $val) = each %{$self->{_map}}) { + &{$code}($key, $self->decode($val)); + } + }); } 1; |