diff options
author | Sergey Poznyakoff <gray@gnu.org.ua> | 2017-05-19 16:29:59 +0300 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org.ua> | 2017-05-19 16:29:59 +0300 |
commit | cbf0afdd4234d541bb3b33c9120bdb3c5ecb7962 (patch) | |
tree | 5fb3f30e9796d0d7da0ef80b5c17fd4c44880d5f /lib/App | |
parent | f0f8633db743bb3edf490ec0ae6035acce38be97 (diff) | |
download | glacier-cbf0afdd4234d541bb3b33c9120bdb3c5ecb7962.tar.gz glacier-cbf0afdd4234d541bb3b33c9120bdb3c5ecb7962.tar.bz2 |
Implement the put command
* glacier: Register the put command
* lib/App/Glacier/Command.pm (multipart-upload-size): New configuration
parameter.
* lib/App/Glacier/Command/ListVault.pm (_check_dir): Remove useless sub
and the CACHE_OK, CACHE_INIT and CACHE_UPDATE constants it returned.
* lib/App/Glacier/Command/Put.pm: New file.
* lib/App/Glacier/DateTime.pm (new): New constructor.
* lib/App/Glacier/Directory.pm (info): Rewrite as a read-only accessor.
(set_info, is_dirty, clear_dirty, invalidate): New methods.
(update_sync_time): Clear the Dirty flag.
(status): Return DIR_OUTDATED if Dirty is set.
* lib/App/Glacier/Job.pm (new): Accept the new keyword argument "invalidate".
(_get_job): Initiate new job if _invalidate is set.
* lib/App/Glacier/Job/InventoryRetrieval.pm (new): Pass all arguments
to the super-class.
Diffstat (limited to 'lib/App')
-rw-r--r-- | lib/App/Glacier/Command.pm | 4 | ||||
-rw-r--r-- | lib/App/Glacier/Command/ListVault.pm | 20 | ||||
-rw-r--r-- | lib/App/Glacier/Command/Put.pm | 167 | ||||
-rw-r--r-- | lib/App/Glacier/Command/Sync.pm | 2 | ||||
-rw-r--r-- | lib/App/Glacier/DateTime.pm | 14 | ||||
-rw-r--r-- | lib/App/Glacier/Directory.pm | 38 | ||||
-rw-r--r-- | lib/App/Glacier/Job.pm | 15 | ||||
-rw-r--r-- | lib/App/Glacier/Job/InventoryRetrieval.pm | 10 |
8 files changed, 230 insertions, 40 deletions
diff --git a/lib/App/Glacier/Command.pm b/lib/App/Glacier/Command.pm index e049d94..e1524e7 100644 --- a/lib/App/Glacier/Command.pm +++ b/lib/App/Glacier/Command.pm @@ -73,6 +73,8 @@ my %parameters = ( access => 1, secret => 1, region => 1, + 'multipart-upload-size' => { default => 100*1024*1024, + check => \&ck_number }, } }, database => { @@ -91,7 +93,7 @@ my %parameters = ( } } } - } + } ); sub new { diff --git a/lib/App/Glacier/Command/ListVault.pm b/lib/App/Glacier/Command/ListVault.pm index 1df44b6..fb26698 100644 --- a/lib/App/Glacier/Command/ListVault.pm +++ b/lib/App/Glacier/Command/ListVault.pm @@ -190,26 +190,6 @@ sub show_archive { } } -use constant { - CACHE_OK => 0, - CACHE_INIT => 1, - CACHE_UPDATE => 2 -}; - -sub _check_dir { - my ($self, $dir) = @_; - - if (defined($dir->last_sync_time)) { - if (time - $dir->last_sync_time > - $self->cfget(qw(database inv ttl))) { - return CACHE_UPDATE; - } - } else { - return CACHE_INIT; - } - return CACHE_OK; -} - sub get_vault_inventory { my ($self, $vault_name, @file_list) = @_; my $dir = $self->directory($vault_name); diff --git a/lib/App/Glacier/Command/Put.pm b/lib/App/Glacier/Command/Put.pm new file mode 100644 index 0000000..2bb8a45 --- /dev/null +++ b/lib/App/Glacier/Command/Put.pm @@ -0,0 +1,167 @@ +package App::Glacier::Command::Put; +use strict; +use warnings; +use App::Glacier::Command; +use App::Glacier::DateTime; +use parent qw(App::Glacier::Command); +use File::Basename; +use Carp; + +=head1 NAME + +glacier put - upload file to a vault + +=head1 SYNOPSIS + +B<glacier put> +[B<-j> I<NJOBS>] +[B<--jobs=>I<NJOBS>] +I<VAULT> +I<FILE> +[I<REMOTENAME>] + +=cut + +sub getopt { + my ($self, %opts) = @_; + return $self->SUPER::getopt('jobs|j=n' => \$self->{_options}{jobs}, + %opts); +} + +sub run { + my ($self, $vaultname, $localname, $remotename) = @_; + $self->abend(EX_USAGE, "two or three arguments expected") + unless @_ == 2 || @_ == 3; + + $remotename = basename($localname); + 
$self->_update($vaultname, $localname, $remotename); + # Initiate inventory retrieval + my $job = new App::Glacier::Job::InventoryRetrieval($self, $vault_name, + invalidate => 1); +} + +sub _update { + my ($self, $vaultname, $localname, $remotename) = @_; + my $size = -s $localname + or $self->abend(EX_USAGE, "can't stat \"$localname\": $!"); + + my $dir = $self->directory($vaultname); + my $id = ($size < $self->cfget(qw(glacier multipart-upload-size))) + ? $self->_upload_simple($vaultname, $localname, $remotename) + : $self->_upload_multipart($vaultname, $localname, $remotename); + $dir->add_version($remotename, { ArchiveId => $id, + Size => $size, + CreationDate => new App::Glacier::DateTime, + ArchiveDescription => $remotename }); + $dir->invalidate; +} + +sub _upload_simple { + my ($self, $vaultname, $localname, $remotename) = @_; + + $self->debug(1, "uploading $localname in single part"); + + my $archive_id = $self->glacier_eval('upload_archive', + $vaultname, + $localname, + $remotename); + if ($self->lasterr) { + $self->abend(EX_FAILURE, "upload failed: ", + $self->last_error_message); + } + return $archive_id; +} + +sub _upload_multipart { + my ($self, $vaultname, $localname, $remotename) = @_; + my $glacier = $self->{_glacier}; + + my $archive_size = -s $localname; + my $part_size = + $glacier->calculate_multipart_upload_partsize($archive_size); + + $self->abend(EX_FAILURE, "$localname is too big for upload") + if $part_size == 0; + + my $njobs = $self->{_options}{jobs}; + unless (defined($njobs)) { + $njobs = $archive_size / $part_size; + if ($njobs > 128) { + $njobs = 128; + } + } + + $self->debug(1, + "uploading $localname in chunks of $part_size bytes, in $njobs jobs"); + + open(my $fd, '<', $localname) + or $self->abort(EX_FAILURE, "can't open $localname: $!"); + my $upload_id = $glacier->multipart_upload_init($vaultname, $part_size, + $remotename); + $self->debug(1, "Upload ID: $upload_id"); + + use threads; + use Fcntl qw(SEEK_SET); + my 
@part_hashes = (); + my $part_idx = 0; + my $read_bytes; + my $rest_size = $archive_size; + my $off = 0; + while ($rest_size) { + for (my $i = 0; $i < $njobs && $rest_size; $i++) { + my ($thr) = threads->create( + sub { + my ($part_idx, $off) = @_; + + my $part; + seek($fd, $off, SEEK_SET); + $read_bytes = read($fd, $part, $part_size); + # FIXME: use http_catch + my $res = $glacier->multipart_upload_upload_part($vaultname, + $upload_id, + $part_size, + $part_idx, + \$part); + return ($part_idx, $res); + }, + $part_idx, $off); + $part_idx++; + $off += $part_size; + if ($rest_size < $part_size) { + $part_size = $rest_size; + } + $rest_size -= $part_size; + } + + $self->debug(2, "waiting for the bunch to finish"); + while (threads->list()) { + foreach my $thr (threads->list(threads::joinable)) { + # FIXME: error handling, see the note about http_catch above + my ($idx, $hash) = $thr->join() or croak "thread $thr failed"; + $part_hashes[$idx] = $hash; + } + } + } + + # Capture archive id or error code + my $archive_id = $self->glacier_eval('multipart_upload_complete', + $vaultname, $upload_id, + \@part_hashes, + $archive_size); + + if ($self->lasterr) { + $glacier->multipart_upload_abort($vaultname, $upload_id); + $self->abend(EX_FAILURE, "upload failed: ", + $self->last_error_message); + } + + # Check if we have a valid $archive_id + unless ($archive_id =~ /^[a-zA-Z0-9_\-]{10,}$/) { + $glacier->multipart_upload_abort($vaultname, $upload_id); + $self->abend(EX_FAILURE, "upload completion failed"); + } + + return $archive_id; +} + +1; diff --git a/lib/App/Glacier/Command/Sync.pm b/lib/App/Glacier/Command/Sync.pm index 4603c34..285bfa1 100644 --- a/lib/App/Glacier/Command/Sync.pm +++ b/lib/App/Glacier/Command/Sync.pm @@ -29,7 +29,7 @@ sub run { sub sync { my ($self, $vault_name) = @_; - + my $dir = $self->directory($vault_name); my $job = new App::Glacier::Job::InventoryRetrieval($self, $vault_name); if ($job->is_completed) { diff --git a/lib/App/Glacier/DateTime.pm 
b/lib/App/Glacier/DateTime.pm index f5033b1..ca7d61b 100644 --- a/lib/App/Glacier/DateTime.pm +++ b/lib/App/Glacier/DateTime.pm @@ -6,6 +6,20 @@ use parent 'DateTime'; use Carp; use DateTime; +sub new { + my ($class, @opts) = shift; + unless (@opts) { + my ($second, $minute, $hour, $day, $month, $year) = gmtime; + return $class->SUPER::new(year => 1900 + $year, + month => $month + 1, + day => $day, + hour => $hour, + minute => $minute, + second => $second); + } + return $class->SUPER::new(@_); +} + sub strftime { my $self = shift; if (@_ > 1) { diff --git a/lib/App/Glacier/Directory.pm b/lib/App/Glacier/Directory.pm index bf588ed..31772d7 100644 --- a/lib/App/Glacier/Directory.pm +++ b/lib/App/Glacier/Directory.pm @@ -28,18 +28,19 @@ sub locate { } sub info { - my ($self, $key, $val) = @_; + my ($self, $key) = @_; my $rec = $self->retrieve(DB_INFO_KEY); - if ($val) { - $rec = {} unless defined($rec); - $rec->{$key} = $val; - $self->SUPER::store(DB_INFO_KEY, $rec); - } elsif (!defined($rec)) { - return undef; - } + return undef unless defined($rec); return $rec->{$key}; } +sub set_info { + my ($self, $key, $val) = @_; + my $rec = $self->retrieve(DB_INFO_KEY) || {}; + $rec->{$key} = $val; + $self->SUPER::store(DB_INFO_KEY, $rec); +} + sub last_sync_time { my ($self) = @_; return $self->info('SyncTimeStamp'); @@ -47,9 +48,25 @@ sub last_sync_time { sub update_sync_time { my ($self) = @_; - return $self->info('SyncTimeStamp', time); + $self->set_info('SyncTimeStamp', time); + $self->clear_dirty; } +sub is_dirty { + my ($self) = @_; + return $self->info('Dirty'); +} + +sub clear_dirty { + my ($self) = @_; + return $self->set_info('Dirty', 0); +} + +sub invalidate { + my ($self) = @_; + return $self->set_info('Dirty', 1); +} + sub foreach { my ($self, $code) = @_; $self->SUPER::foreach(sub { @@ -107,7 +124,8 @@ sub status { my ($self) = @_; if (defined($self->last_sync_time)) { - if (time - $self->last_sync_time > $self->{_ttl}) { + if (time - $self->last_sync_time > 
$self->{_ttl} + || $self->info('Dirty')) { return DIR_OUTDATED; } } else { diff --git a/lib/App/Glacier/Job.pm b/lib/App/Glacier/Job.pm index 37d724b..409b91b 100644 --- a/lib/App/Glacier/Job.pm +++ b/lib/App/Glacier/Job.pm @@ -10,12 +10,19 @@ use App::Glacier::Timestamp; # new(CMD, VAULT, KEY, INIT) sub new { - croak "bad number of arguments" unless $#_ == 4; - my ($class, $cmd, $vault, $key, $init) = @_; + croak "bad number of arguments" unless $#_ >= 4; + my ($class, $cmd, $vault, $key, $init, %opts) = @_; + my $invalidate = delete $opts{invalidate}; + + if (keys(%opts)) { + croak "unrecognized parameters: ".join(', ', keys(%opts)); + } + return bless { _cmd => $cmd, _init => $init, _vault => $vault, - _key => $key }, $class; + _key => $key, + _invalidate => $invalidate }, $class; } sub _get_db { @@ -27,7 +34,7 @@ sub _get_job { my ($self) = @_; unless ($self->{_job}) { my $db = $self->_get_db; - my $job = $db->retrieve($self->{_key}); + my $job = $db->retrieve($self->{_key}) unless $self->{_invalidate}; if (!$job) { my $jid = $self->{_cmd}->glacier_eval(@{$self->{_init}}); if ($self->{_cmd}->lasterr) { diff --git a/lib/App/Glacier/Job/InventoryRetrieval.pm b/lib/App/Glacier/Job/InventoryRetrieval.pm index a42f806..9980a09 100644 --- a/lib/App/Glacier/Job/InventoryRetrieval.pm +++ b/lib/App/Glacier/Job/InventoryRetrieval.pm @@ -8,9 +8,11 @@ use Carp; # new(CMD, VAULT) sub new { - croak "bad number of arguments" unless $#_ == 2; - my ($class, $cmd, $vault) = @_; - return $class->SUPER::new($cmd, $vault, $vault, - [ 'initiate_inventory_retrieval', $vault, 'JSON' ]); + croak "bad number of arguments" unless $#_ >= 2; + my ($class, $cmd, $vault, %opts) = @_; + return $class->SUPER::new( + $cmd, $vault, $vault, + [ 'initiate_inventory_retrieval', $vault, 'JSON' ], + %opts); } |