diff options
author | Sergey Poznyakoff <gray@gnu.org.ua> | 2017-05-22 13:43:26 +0300 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org.ua> | 2017-05-22 13:43:26 +0300 |
commit | 0e6a48636e92226a43dc8bc58cea4484e9ecf84b (patch) | |
tree | eb697375fb9ecb34f153bf5109b35e4fc3d82010 | |
parent | 0ef4b6de1ddac63549ba289fe1cd0270386b5426 (diff) | |
download | glacier-0e6a48636e92226a43dc8bc58cea4484e9ecf84b.tar.gz glacier-0e6a48636e92226a43dc8bc58cea4484e9ecf84b.tar.bz2 |
Further fixes in get and sync
* lib/App/Glacier/Command.pm (glacier_eval): Check for return context
and return appropriate type.
(set_time_style_option, format_date_time): New methods.
* lib/App/Glacier/Command/Get.pm (run): Minor simplification.
(run): Delete failed jobs.
(download): Revamp multi-part download algorithm.
* lib/App/Glacier/Command/ListVault.pm (getopt): Use
Command::set_time_style_option to set the --time-style option.
* lib/App/Glacier/Command/Sync.pm (sync): Restart expired jobs.
(_sync): Actually store the changed DB records.
* lib/App/Glacier/DateTime.pm (format_can): New format: standard.
* lib/App/Glacier/Job.pm (new): Set _job => undef, to avoid
spurious warnings.
(_get_job): Handle invalidate for cached values as well.
(is_finished, status): New methods.
(forget): Rename to delete.
-rw-r--r-- | lib/App/Glacier/Command.pm | 26 | ||||
-rw-r--r-- | lib/App/Glacier/Command/Get.pm | 95 | ||||
-rw-r--r-- | lib/App/Glacier/Command/ListVault.pm | 12 | ||||
-rw-r--r-- | lib/App/Glacier/Command/Sync.pm | 26 | ||||
-rw-r--r-- | lib/App/Glacier/DateTime.pm | 1 | ||||
-rw-r--r-- | lib/App/Glacier/Job.pm | 27 |
6 files changed, 129 insertions, 58 deletions
diff --git a/lib/App/Glacier/Command.pm b/lib/App/Glacier/Command.pm index d72bd03..e46ed6e 100644 --- a/lib/App/Glacier/Command.pm +++ b/lib/App/Glacier/Command.pm @@ -288,7 +288,10 @@ sub run { sub glacier_eval { my $self = shift; my $method = shift; - my $ret = http_catch(sub { $self->{_glacier}->${\$method}(@_) }, + my $ret = http_catch(sub { + wantarray ? [ $self->{_glacier}->${\$method}(@_) ] + : $self->{_glacier}->${\$method}(@_) + }, err => \my %err, args => \@_); if (keys(%err)) { @@ -296,7 +299,7 @@ sub glacier_eval { } else { $self->{_last_http_err} = undef; } - return $ret; + return (wantarray && ref($ret) eq 'ARRAY') ? @$ret : $ret; } sub lasterr { @@ -345,6 +348,25 @@ sub getopt { }, %opts) or exit(EX_USAGE); } + +sub set_time_style_option { + my ($self, $style) = @_; + + eval { + use App::Glacier::DateTime; + my $x = new App::Glacier::DateTime(year=>1970); + $x->canned_format($style); + }; + if ($@) { + $self->abend(EX_USAGE, "unrecognized time style: $style"); + } + $self->{_options}{time_style} = $style; +} + +sub format_date_time { + my ($self, $obj, $field) = @_; + return $obj->{$field}->canned_format($self->{_options}{time_style}); +} sub usage_error { new App::Glacier::Command(usage_error => \@_); diff --git a/lib/App/Glacier/Command/Get.pm b/lib/App/Glacier/Command/Get.pm index dd3ca37..b00b658 100644 --- a/lib/App/Glacier/Command/Get.pm +++ b/lib/App/Glacier/Command/Get.pm @@ -57,6 +57,9 @@ sub run { or die "unexpected failure"; my ($filename, $ver) = ($+{file}, $+{ver}); + # Reset $ver and $filespec for error reporting + $ver = 1 unless defined $ver; + $filespec = "$filespec;$ver"; $localname = $filename unless defined($localname); @@ -78,7 +81,12 @@ sub run { $self->error("downloading file $filename initialized on", $job->get('CreationDate')->canned_format('full-iso')); $self->error("job id:", $job->id); - $self->error("current status:", $job->get('StatusCode')); + my ($status, $message) = $job->status; + $self->error("current status:", 
$status); + if ($message) { + $self->error("status message: $message\n"); + } + if ($job->is_completed) { $self->error("completed on", $job->get('CompletionDate')->canned_format('full-iso')); @@ -89,10 +97,18 @@ sub run { if ($job->is_completed) { $self->download($job, $localname); } else { - $self->abend(EX_TEMPFAIL, - "archive retrieval job for $vaultname:$filespec initiated at " . - $job->get('CreationDate')->canned_format - . "; please retry later to download the file"); + my ($status, $message) = $job->status; + if ($status eq 'InProgress') { + $self->abend(EX_TEMPFAIL, + "archive retrieval job for $vaultname:$filespec initiated at " . + $job->get('CreationDate')->canned_format + . "; please retry later to download the file"); + } else { + $self->error("archive retrieval job for $vaultname:$filespec: $status: $message"); + $self->error("deleting job", $job->id); + $job->delete; + exit (EX_FAILURE); + } } } @@ -109,8 +125,16 @@ sub download { truncate($fd, 0); my $archive_size = $job->get('ArchiveSizeInBytes'); - my $njobs = $self->{_options}{jobs} || 1; - my $part_size = $self->{_options}{part_size} || 10*1024*1024*1024; # FIXME + + my $njobs; + my $part_size; + if ($self->{_options}{jobs}) { + $njobs = $self->{_options}{jobs}; + $part_size = int(($archive_size + $njobs - 1) / $njobs); + } else { + $part_size = $self->{_options}{part_size} || 10*1024*1024*1024; # FIXME + $njobs = int(($archive_size + $part_size - 1) / $part_size); + } my $glacier = $self->{_glacier}; @@ -119,51 +143,42 @@ sub download { if ($njobs <= 1 || $archive_size < $part_size) { # simple download my $res; + $self->debug(1, "downloading", $job->file_name(1), "in single part"); ($res, $tree_hash) = $glacier->get_job_output($job->vault, $job->id); syswrite($fd, $res); } else { use Fcntl qw(SEEK_SET); - + $self->debug(1, - "downloading ".$job->file_name(1)." 
to $localname in chunks of $part_size bytes, in $njobs jobs"); + "downloading", $job->file_name(1), "to $localname in chunks of $part_size bytes, in $njobs jobs"); my @part_hashes = (); my $read_bytes; my $rest_size = $archive_size; my $off = 0; - my $part_idx = 0; - while ($rest_size) { - for (my $i = 0; $i < $njobs && $rest_size; $i++) { - if ($rest_size < $part_size) { - $part_size = $rest_size; - } - my ($thr) = threads->create( - sub { - my ($part_idx, $off) = @_; - my $range = $off . '-' . ($off + $part_size); - my ($res, $hash) = - $glacier->get_job_output($job->vault, - $job->id, $range); - lock $fd; - seek($fd, $off, SEEK_SET); - syswrite($fd, $res); - return ($part_idx, $hash); - }, - $part_idx, $off); - $part_idx++; - $off += $part_size; - $rest_size -= $part_size; + for (my $i = 0; $i < $njobs; $i++, $off += $part_size, $rest_size -= $part_size) { + if ($rest_size < $part_size) { + $part_size = $rest_size; } + my ($thr) = threads->create( + sub { + my ($part_idx, $off) = @_; + my $range = $off . '-' . 
($off + $part_size); + my ($res, $hash) = + $glacier->get_job_output($job->vault, $job->id, $range); + lock $fd; + seek($fd, $off, SEEK_SET); + syswrite($fd, $res); + return ($part_idx, $hash); + }, + $i, $off); + } - $self->debug(2, "waiting for the bunch to finish"); - while (threads->list()) { - foreach my $thr (threads->list(threads::joinable)) { - # FIXME: error handling - my ($idx, $hash) = $thr->join() - or croak "thread $thr failed"; - $part_hashes[$idx] = $hash; - } - } + $self->debug(2, "waiting for download to finish"); + foreach my $thr (threads->list()) { + # FIXME: error handling + my ($idx, $hash) = $thr->join() or croak "thread $thr failed"; + $part_hashes[$idx] = $hash; } $tree_hash = $glacier->_tree_hash_from_array_ref(\@part_hashes); } diff --git a/lib/App/Glacier/Command/ListVault.pm b/lib/App/Glacier/Command/ListVault.pm index 8277d4b..96ffbd1 100644 --- a/lib/App/Glacier/Command/ListVault.pm +++ b/lib/App/Glacier/Command/ListVault.pm @@ -68,7 +68,7 @@ sub getopt { 'U' => sub { $self->{_options}{sort} = 'none' }, 'human-readable|h' => \$self->{_options}{h}, 'reverse|r' => \$self->{_options}{r}, - 'time-style=s' => \$self->{_options}{time_style}, + 'time-style=s' => sub { $self->set_time_style_option($_[1]) }, %opts); return $rc unless $rc; @@ -81,16 +81,6 @@ sub getopt { unless exists($sortfun->{$self->{_options}{sort}}); $self->{_options}{sort} = $sortfun->{$self->{_options}{sort}}; } - if (defined($self->{_options}{time_style})) { - eval { - my $x = new App::Glacier::DateTime(year=>1970); - $x->canned_format($self->{_options}{time_style}); - }; - if ($@) { - # FIXME: if ($@ =~ /unknown canned format/ - $self->abend(EX_USAGE, "unrecognized time style: $self->{_options}{time_style}"); - } - } } sub run { diff --git a/lib/App/Glacier/Command/Sync.pm b/lib/App/Glacier/Command/Sync.pm index ccca322..df797a9 100644 --- a/lib/App/Glacier/Command/Sync.pm +++ b/lib/App/Glacier/Command/Sync.pm @@ -40,7 +40,7 @@ sub run { sub sync { my ($self, 
$vault_name, %opts) = @_; - + my $dir = $self->directory($vault_name); $dir->invalidate if $opts{force}; my $job = new App::Glacier::Job::InventoryRetrieval( @@ -49,8 +49,21 @@ sub sync { if ($job->is_completed) { my $res = $self->glacier_eval('get_job_output', $vault_name, $job->id); if ($self->lasterr) { - $self->abend(EX_FAILURE, "can't list vault $vault_name: ", - $self->last_error_message); + if ($self->lasterr('code') == 404 && !$opts{force}) { + if ($opts{restart}) { + $self->abend(EX_FAILURE, + "unexpected error after restart:", + $self->last_error_message); + } + # Job expired, delete it + # ('mesg' => 'The job ID was not found...) + $opts{force} = 1; + return $self->sync($vault_name, %opts); + } else { + # FIXME + $self->abend(EX_FAILURE, "can't list vault $vault_name: ", + $self->last_error_message); + } } $res = decode_json($res); $self->_sync($dir, [map { timestamp_unserialize($_) } @@ -80,6 +93,7 @@ sub _sync { $dir->foreach(sub { my ($key, $val) = @_; + my $mod = 0; for (my $i = 0; $i <= $#{$val}; ) { if (exists($arch{$val->[$i]{ArchiveId}})) { $self->debug(1, "found $key;".($i+1)); @@ -87,6 +101,7 @@ sub _sync { unless (exists($val->[$i]{$k})) { $self->debug(1, "$key;".($i+1).": updating $k"); $val->[$i]{$k} = $v; + $mod = 1; } } delete $arch{$val->[$i]{ArchiveId}}; @@ -94,7 +109,12 @@ sub _sync { } elsif ($delete) { $self->debug(1, "deleting $key;".($i+1)); splice(@{$val}, $i, 1); + $mod = 1; + } else { + $self->debug(1, "$key;".($i+1),"not found"); + $i++; } + $dir->store($key, $val) if $mod; } if ($delete && @{$val} == 0) { $self->debug(1, "deleting $key"); diff --git a/lib/App/Glacier/DateTime.pm b/lib/App/Glacier/DateTime.pm index ca7d61b..da6b60d 100644 --- a/lib/App/Glacier/DateTime.pm +++ b/lib/App/Glacier/DateTime.pm @@ -73,6 +73,7 @@ my %format_can = ( iso => \&_fmt_iso, 'long-iso' => '%Y-%m-%d %H:%M', 'full-iso' => '%Y-%m-%d %H:%M:%S.%N %z', + 'standard' => '%Y-%m-%dT%H:%M:%SZ', locale => '%c' ); diff --git a/lib/App/Glacier/Job.pm 
b/lib/App/Glacier/Job.pm index 85d76ec..4cd3de5 100644 --- a/lib/App/Glacier/Job.pm +++ b/lib/App/Glacier/Job.pm @@ -22,6 +22,7 @@ sub new { _init => $init, _vault => $vault, _key => $key, + _job => undef, _invalidate => $invalidate }, $class; } @@ -32,8 +33,16 @@ sub _get_db { sub _get_job { my ($self) = @_; + my $db = $self->_get_db; + + if ($self->{_job}) { + if ($self->{_invalidate}) { + $db->delete($self->{_key}); + $self->{_job} = undef; + } + } + unless ($self->{_job}) { - my $db = $self->_get_db; my $job = $db->retrieve($self->{_key}) unless $self->{_invalidate}; if (!$job) { $self->debug(2, "initiating job $self->{_key}"); @@ -99,18 +108,32 @@ sub get { return $job->{$key}; } +sub is_finished { + my $self = shift; + my $db = $self->_get_db; + return defined($self->get('StatusCode')); +} + sub is_completed { my $self = shift; my $db = $self->_get_db; return ($self->get('StatusCode') || '') eq 'Succeeded'; } +sub status { + my $self = shift; + my $db = $self->_get_db; + my $status = $self->get('StatusCode'); + return undef unless defined $status; + return wantarray ? ($status, $self->get('StatusMessage')) : $status; +} + sub vault { my $self = shift; return $self->{_vault}; } -sub forget { +sub delete { my $self = shift; my $db = $self->_get_db; $db->delete($self->{_key}); |