aboutsummaryrefslogtreecommitdiff
path: root/lib/App
diff options
context:
space:
mode:
authorSergey Poznyakoff <gray@gnu.org.ua>2017-05-22 19:44:12 +0300
committerSergey Poznyakoff <gray@gnu.org.ua>2017-05-22 22:17:57 +0300
commitba211551ca112afe9d19221c60afba0ce493526c (patch)
tree2fec900f5503c81957f4bac6cfd9cd5da681aa51 /lib/App
parent0e6a48636e92226a43dc8bc58cea4484e9ecf84b (diff)
downloadglacier-ba211551ca112afe9d19221c60afba0ce493526c.tar.gz
glacier-ba211551ca112afe9d19221c60afba0ce493526c.tar.bz2
Fix multipart upload & download
* lib/App/Glacier/Command/Get.pm: Fix chunk alignment and the use of shared variables. * lib/App/Glacier/Command/ListVault.pm: Fix typo. * lib/App/Glacier/Command/Put.pm (_upload_multipart): Rewrite. * lib/App/Glacier/DB/GDBM.pm: Don't keep the DB mapped throughout the lifetime of the object, as this interacts badly with the threaded code: it causes coredumps when the copies of the object are destroyed in their corresponding threads, because the underlying gdbm_close then gets called multiple times on the same GDBM structure. See the comment to the _tied method for details.
Diffstat (limited to 'lib/App')
-rw-r--r--lib/App/Glacier/Command/Get.pm43
-rw-r--r--lib/App/Glacier/Command/ListVault.pm2
-rw-r--r--lib/App/Glacier/Command/Put.pm104
-rw-r--r--lib/App/Glacier/DB/GDBM.pm55
4 files changed, 123 insertions, 81 deletions
diff --git a/lib/App/Glacier/Command/Get.pm b/lib/App/Glacier/Command/Get.pm
index b00b658..09e739f 100644
--- a/lib/App/Glacier/Command/Get.pm
+++ b/lib/App/Glacier/Command/Get.pm
@@ -78,18 +78,18 @@ sub run {
$filename, $ver);
if ($self->{_options}{test}) {
- $self->error("downloading file $filename initialized on",
- $job->get('CreationDate')->canned_format('full-iso'));
- $self->error("job id:", $job->id);
+ print "downloading file $filename initialized on",
+ $job->get('CreationDate')->canned_format('full-iso'),"\n";
+ print "job id: ", $job->id, "\n";
my ($status, $message) = $job->status;
- $self->error("current status:", $status);
+ print "current status: $status\n";
if ($message) {
- $self->error("status message: $message\n");
+ print "status message: $message\n";
}
if ($job->is_completed) {
- $self->error("completed on",
- $job->get('CompletionDate')->canned_format('full-iso'));
+ print "completed on ",
+ $job->get('CompletionDate')->canned_format('full-iso'),"\n";
}
exit(0);
}
@@ -112,13 +112,16 @@ sub run {
}
}
+use constant MB => 1024*1024;
+use constant TWOMB => 2*MB;
+
sub download {
my ($self, $job, $localname) = @_;
use threads;
use threads::shared;
- my $fd :shared;
+ my $fd;
open($fd, '>', $localname)
or $self->abort(EX_FAILURE, "can't open $localname: $!");
binmode($fd);
@@ -130,12 +133,18 @@ sub download {
my $part_size;
if ($self->{_options}{jobs}) {
$njobs = $self->{_options}{jobs};
+ # Compute approximate part size
$part_size = int(($archive_size + $njobs - 1) / $njobs);
} else {
- $part_size = $self->{_options}{part_size} || 10*1024*1024*1024; # FIXME
- $njobs = int(($archive_size + $part_size - 1) / $part_size);
+ $part_size = ($self->{_options}{part_size} || 10 * MB);
}
+ # Make sure the chunk is Tree-Hash aligned
+ # http://docs.aws.amazon.com/amazonglacier/latest/dev/checksum-calculations-range.html?shortFooter=true#checksum-calculations-upload-archive-with-ranges
+ $part_size = TWOMB * 2 ** int(log($part_size / TWOMB) / log(2));
+ # Adjust the number of jobs
+ $njobs = int(($archive_size + $part_size - 1) / $part_size);
+
my $glacier = $self->{_glacier};
my $tree_hash;
@@ -152,7 +161,7 @@ sub download {
$self->debug(1,
"downloading", $job->file_name(1), "to $localname in chunks of $part_size bytes, in $njobs jobs");
- my @part_hashes = ();
+ my @part_hashes :shared = ();
my $read_bytes;
my $rest_size = $archive_size;
my $off = 0;
@@ -163,13 +172,14 @@ sub download {
my ($thr) = threads->create(
sub {
my ($part_idx, $off) = @_;
- my $range = $off . '-' . ($off + $part_size);
+ my $range = 'bytes=' . $off . '-' . ($off + $part_size - 1);
my ($res, $hash) =
$glacier->get_job_output($job->vault, $job->id, $range);
- lock $fd;
+ lock @part_hashes;
seek($fd, $off, SEEK_SET);
syswrite($fd, $res);
- return ($part_idx, $hash);
+ $part_hashes[$part_idx] = $hash;
+ return 1;
},
$i, $off);
}
@@ -177,17 +187,18 @@ sub download {
$self->debug(2, "waiting for download to finish");
foreach my $thr (threads->list()) {
# FIXME: error handling
- my ($idx, $hash) = $thr->join() or croak "thread $thr failed";
- $part_hashes[$idx] = $hash;
+ $thr->join() or croak "thread $thr failed";
}
$tree_hash = $glacier->_tree_hash_from_array_ref(\@part_hashes);
}
close($fd);
+# print $tree_hash. ' ', $job->get('ArchiveSHA256TreeHash') , "\n";
if ($tree_hash ne $job->get('ArchiveSHA256TreeHash')) {
unlink $localname;
$self->abend(EX_SOFTWARE, "downloaded file is corrupt");
}
+ print "finished\n";
}
1;
diff --git a/lib/App/Glacier/Command/ListVault.pm b/lib/App/Glacier/Command/ListVault.pm
index 96ffbd1..10fa755 100644
--- a/lib/App/Glacier/Command/ListVault.pm
+++ b/lib/App/Glacier/Command/ListVault.pm
@@ -77,7 +77,7 @@ sub getopt {
if (defined($self->{_options}{sort})) {
my $sortfun = $self->{_options}{d}
? \%sort_vaults : \%sort_archives;
- $self->abend(EX_USAGE, "unknowns sort field")
+ $self->abend(EX_USAGE, "unknown sort field")
unless exists($sortfun->{$self->{_options}{sort}});
$self->{_options}{sort} = $sortfun->{$self->{_options}{sort}};
}
diff --git a/lib/App/Glacier/Command/Put.pm b/lib/App/Glacier/Command/Put.pm
index 200ccda..d0e6cc0 100644
--- a/lib/App/Glacier/Command/Put.pm
+++ b/lib/App/Glacier/Command/Put.pm
@@ -75,6 +75,9 @@ sub _upload_multipart {
my ($self, $vaultname, $localname, $remotename) = @_;
my $glacier = $self->{_glacier};
+ use threads;
+ use threads::shared;
+
my $archive_size = -s $localname;
my $part_size =
$glacier->calculate_multipart_upload_partsize($archive_size);
@@ -82,67 +85,82 @@ sub _upload_multipart {
$self->abend(EX_FAILURE, "$localname is too big for upload")
if $part_size == 0;
+ # Number of parts to upload:
+ my $total_parts = int(($archive_size + $part_size - 1) / $part_size);
+
+ # Compute number of threads
my $njobs = $self->{_options}{jobs};
- unless (defined($njobs)) {
- $njobs = $archive_size / $part_size;
- if ($njobs > 128) {
- $njobs = 128;
- }
+ if (defined($njobs)) {
+ # Allow at most so many jobs as there are parts
+ $njobs = $total_parts if $njobs > $total_parts;
+ } else {
+ # Select default.
+ $njobs = $total_parts < 16 ? $total_parts : 16;
}
+ # Number of parts to upload by each job:
+ my $job_parts = int(($total_parts + $njobs - 1) / $njobs);
+
$self->debug(1,
"uploading $localname in chunks of $part_size bytes, in $njobs jobs");
open(my $fd, '<', $localname)
or $self->abort(EX_FAILURE, "can't open $localname: $!");
+ binmode($fd);
my $upload_id = $glacier->multipart_upload_init($vaultname, $part_size,
$remotename);
$self->debug(1, "Upload ID: $upload_id");
- use threads;
use Fcntl qw(SEEK_SET);
- my @part_hashes = ();
- my $part_idx = 0;
- my $read_bytes;
- my $rest_size = $archive_size;
- my $off = 0;
- while ($rest_size) {
- for (my $i = 0; $i < $njobs && $rest_size; $i++) {
- my ($thr) = threads->create(
- sub {
- my ($part_idx, $off) = @_;
-
+
+ my @part_hashes :shared = ();
+
+ for (my $i = 0; $i < $njobs; $i++) {
+ my $thr = threads->create(
+ sub {
+ my ($job_idx) = @_;
+ # Number of part to start from
+ my $part_idx = $job_idx * $job_parts;
+ # Offset in file
+ my $off = $part_idx * $part_size;
+ # Return value
+ my @ret;
+
+ for (my $j = 0; $j < $job_parts;
+ $j++, $part_idx++, $off += $part_size) {
+ last if $off >= $archive_size;
my $part;
- seek($fd, $off, SEEK_SET);
- $read_bytes = read($fd, $part, $part_size);
+ {
+ lock @part_hashes;
+ seek($fd, $off, SEEK_SET);
+ my $rb = sysread($fd, $part, $part_size);
+ if ($rb == 0) {
+ $self->abend(EX_OSERR,
+ "failed to read part $part_idx: $!");
+ }
+ }
+
# FIXME: use http_catch
- my $res = $glacier->multipart_upload_upload_part($vaultname,
- $upload_id,
- $part_size,
- $part_idx,
- \$part);
- return ($part_idx, $res);
- },
- $part_idx, $off);
- $part_idx++;
- $off += $part_size;
- if ($rest_size < $part_size) {
- $part_size = $rest_size;
- }
- $rest_size -= $part_size;
- }
-
- $self->debug(2, "waiting for the bunch to finish");
- while (threads->list()) {
- foreach my $thr (threads->list(threads::joinable)) {
- # FIXME: error handling, see the note about http_catch above
- my ($idx, $hash) = $thr->join() or croak "thread $thr failed";
- $part_hashes[$idx] = $hash;
- }
- }
+ my $res = $glacier->multipart_upload_upload_part(
+ $vaultname,
+ $upload_id,
+ $part_size,
+ $part_idx,
+ \$part);
+ $part_hashes[$part_idx] = $res;
+ }
+ return 1;
+ }, $i);
+ }
+
+ $self->debug(2, "waiting for upload to finish");
+ foreach my $thr (threads->list) {
+ # FIXME: error handling, see the note about http_catch above
+ $thr->join() or croak "thread $thr failed";
}
# Capture archive id or error code
+ $self->debug(2, "finalizing the upload");
my $archive_id = $self->glacier_eval('multipart_upload_complete',
$vaultname, $upload_id,
\@part_hashes,
diff --git a/lib/App/Glacier/DB/GDBM.pm b/lib/App/Glacier/DB/GDBM.pm
index 2e75b59..5976515 100644
--- a/lib/App/Glacier/DB/GDBM.pm
+++ b/lib/App/Glacier/DB/GDBM.pm
@@ -6,62 +6,75 @@ use parent qw(App::Glacier::DB);
use GDBM_File;
use Carp;
-my %dbtab;
-
sub new {
my $class = shift;
my $filename = shift;
local %_ = @_;
my $mode = delete $_{mode} || 0644;
- unless (exists($dbtab{$filename})) {
- my %map;
- tie %map, 'GDBM_File', $filename, GDBM_WRCREAT, $mode;
- $dbtab{$filename} = \%map;
- }
my $self = $class->SUPER::new(%_);
$self->{_filename} = $filename;
- $self->{_map} = $dbtab{$filename};
+ $self->{_mode} = $mode;
+ $self->{_nref} = 0;
return $self;
}
+# We can't tie the DB to $self->{_map} at once, in the new method, because
+# this will cause coredumps in threaded code (see
+# https://rt.perl.org/Public/Bug/Display.html?id=61912). So, the following
+ auxiliary method is used, which calls &$code with $self->{_map} tied
+# to the DB.
+sub _tied {
+ my ($self, $code) = @_;
+ croak "argument must be a CODE ref" unless ref($code) eq 'CODE';
+ if ($self->{_nref}++ == 0) {
+ tie %{$self->{_map}}, 'GDBM_File', $self->{_filename}, GDBM_WRCREAT, $self->{_mode};
+ }
+ my $ret = wantarray ? [ &{$code}() ] : &{$code}();
+ if (--$self->{_nref} == 0) {
+ untie %{$self->{_map}};
+ }
+ return wantarray ? @$ret : $ret;
+}
+
sub drop {
my ($self) = @_;
my $filename = $self->{_filename};
unlink $filename or carp "can't unlink $filename: $!";
- untie %{$dbtab{$filename}};
- delete $dbtab{$filename};
- delete $self->{_map};
}
sub has {
my ($self, $key) = @_;
- return exists($self->{_map}{$key});
+ return $self->_tied(sub { exists($self->{_map}{$key}) });
}
sub retrieve {
my ($self, $key) = @_;
- return undef unless $self->has($key);
- return $self->decode($self->{_map}{$key});
+ return $self->_tied(sub {
+ return undef unless exists $self->{_map}{$key};
+ return $self->decode($self->{_map}{$key});
+ });
}
sub store {
my ($self, $key, $val) = @_;
- $self->{_map}{$key} = $self->encode($val);
-# use Data::Dumper;
-# print "stored $key", Dumper([$self->{_map}{$key}]);
+ return $self->_tied(sub {
+ $self->{_map}{$key} = $self->encode($val);
+ });
}
sub delete {
my ($self, $key) = @_;
- delete $self->{_map}{$key};
+ $self->_tied(sub { delete $self->{_map}{$key} });
}
sub foreach {
my ($self, $code) = @_;
croak "argument must be a CODE" unless ref($code) eq 'CODE';
- while (my ($key, $val) = each %{$self->{_map}}) {
- &{$code}($key, $self->decode($val));
- }
+ $self->_tied(sub {
+ while (my ($key, $val) = each %{$self->{_map}}) {
+ &{$code}($key, $self->decode($val));
+ }
+ });
}
1;

Return to:

Send suggestions and report system problems to the System administrator.