aboutsummaryrefslogtreecommitdiff
path: root/lib/App
diff options
context:
space:
mode:
authorSergey Poznyakoff <gray@gnu.org.ua>2017-05-19 16:29:59 +0300
committerSergey Poznyakoff <gray@gnu.org.ua>2017-05-19 16:29:59 +0300
commitcbf0afdd4234d541bb3b33c9120bdb3c5ecb7962 (patch)
tree5fb3f30e9796d0d7da0ef80b5c17fd4c44880d5f /lib/App
parentf0f8633db743bb3edf490ec0ae6035acce38be97 (diff)
downloadglacier-cbf0afdd4234d541bb3b33c9120bdb3c5ecb7962.tar.gz
glacier-cbf0afdd4234d541bb3b33c9120bdb3c5ecb7962.tar.bz2
Implement the put command
* glacier: Register the put command * lib/App/Glacier/Command.pm (multipart-upload-size): New configuration parameter. * lib/App/Glacier/Command/ListVault.pm: Remove useless sub * lib/App/Glacier/Command/Put.pm: New file. * lib/App/Glacier/DateTime.pm (new): New constructor. * lib/App/Glacier/Directory.pm (info): Rewrite as accessor (set_info, is_dirty, clear_dirty, invalidate): New methods. (update_sync_time): Clear the Dirty flag. (status): Return DIR_OUTDATED if Dirty is set. * lib/App/Glacier/Job.pm: New keyword argument "invalidate". (_get_job): Initiate new job if _invalidate is set. * lib/App/Glacier/Job/InventoryRetrieval.pm (new): Pass all arguments to the super-class.
Diffstat (limited to 'lib/App')
-rw-r--r--lib/App/Glacier/Command.pm4
-rw-r--r--lib/App/Glacier/Command/ListVault.pm20
-rw-r--r--lib/App/Glacier/Command/Put.pm167
-rw-r--r--lib/App/Glacier/Command/Sync.pm2
-rw-r--r--lib/App/Glacier/DateTime.pm14
-rw-r--r--lib/App/Glacier/Directory.pm38
-rw-r--r--lib/App/Glacier/Job.pm15
-rw-r--r--lib/App/Glacier/Job/InventoryRetrieval.pm10
8 files changed, 230 insertions, 40 deletions
diff --git a/lib/App/Glacier/Command.pm b/lib/App/Glacier/Command.pm
index e049d94..e1524e7 100644
--- a/lib/App/Glacier/Command.pm
+++ b/lib/App/Glacier/Command.pm
@@ -73,6 +73,8 @@ my %parameters = (
access => 1,
secret => 1,
region => 1,
+ 'multipart-upload-size' => { default => 100*1024*1024,
+ check => \&ck_number },
}
},
database => {
@@ -91,7 +93,7 @@ my %parameters = (
}
}
}
- }
+ }
);
sub new {
diff --git a/lib/App/Glacier/Command/ListVault.pm b/lib/App/Glacier/Command/ListVault.pm
index 1df44b6..fb26698 100644
--- a/lib/App/Glacier/Command/ListVault.pm
+++ b/lib/App/Glacier/Command/ListVault.pm
@@ -190,26 +190,6 @@ sub show_archive {
}
}
-use constant {
- CACHE_OK => 0,
- CACHE_INIT => 1,
- CACHE_UPDATE => 2
-};
-
-sub _check_dir {
- my ($self, $dir) = @_;
-
- if (defined($dir->last_sync_time)) {
- if (time - $dir->last_sync_time >
- $self->cfget(qw(database inv ttl))) {
- return CACHE_UPDATE;
- }
- } else {
- return CACHE_INIT;
- }
- return CACHE_OK;
-}
-
sub get_vault_inventory {
my ($self, $vault_name, @file_list) = @_;
my $dir = $self->directory($vault_name);
diff --git a/lib/App/Glacier/Command/Put.pm b/lib/App/Glacier/Command/Put.pm
new file mode 100644
index 0000000..2bb8a45
--- /dev/null
+++ b/lib/App/Glacier/Command/Put.pm
@@ -0,0 +1,167 @@
+package App::Glacier::Command::Put;
+use strict;
+use warnings;
+use App::Glacier::Command;
+use App::Glacier::DateTime;
+use parent qw(App::Glacier::Command);
+use File::Basename;
+use Carp;
+
+=head1 NAME
+
+glacier put - upload file to a vault
+
+=head1 SYNOPSIS
+
+B<glacier put>
+[B<-j> I<NJOBS>]
+[B<--jobs=>I<NJOBS>]
+I<VAULT>
+I<FILE>
+[I<REMOTENAME>]
+
+=cut
+
+sub getopt {
+ my ($self, %opts) = @_;
+ return $self->SUPER::getopt('jobs|j=n' => \$self->{_options}{jobs},
+ %opts);
+}
+
+sub run {
+ my ($self, $vaultname, $localname, $remotename) = @_;
+ $self->abend(EX_USAGE, "two or three arguments expected")
+ unless @_ == 2 || @_ == 3;
+
+ $remotename = basename($localname);
+ $self->_update($vaultname, $localname, $remotename);
+ # Initiate inventory retrieval
+ my $job = new App::Glacier::Job::InventoryRetrieval($self, $vault_name,
+ invalidate => 1);
+}
+
+sub _update {
+ my ($self, $vaultname, $localname, $remotename) = @_;
+ my $size = -s $localname
+ or $self->abend(EX_USAGE, "can't stat \"$localname\": $!");
+
+ my $dir = $self->directory($vaultname);
+ my $id = ($size < $self->cfget(qw(glacier multipart-upload-size)))
+ ? $self->_upload_simple($vaultname, $localname, $remotename)
+ : $self->_upload_multipart($vaultname, $localname, $remotename);
+ $dir->add_version($remotename, { ArchiveId => $id,
+ Size => $size,
+ CreationDate => new App::Glacier::DateTime,
+ ArchiveDescription => $remotename });
+ $dir->invalidate;
+}
+
+sub _upload_simple {
+ my ($self, $vaultname, $localname, $remotename) = @_;
+
+ $self->debug(1, "uploading $localname in single part");
+
+ my $archive_id = $self->glacier_eval('upload_archive',
+ $vaultname,
+ $localname,
+ $remotename);
+ if ($self->lasterr) {
+ $self->abend(EX_FAILURE, "upload failed: ",
+ $self->last_error_message);
+ }
+ return $archive_id;
+}
+
+sub _upload_multipart {
+ my ($self, $vaultname, $localname, $remotename) = @_;
+ my $glacier = $self->{_glacier};
+
+ my $archive_size = -s $localname;
+ my $part_size =
+ $glacier->calculate_multipart_upload_partsize($archive_size);
+
+ $self->abend(EX_FAILURE, "$localname is too big for upload")
+ if $part_size == 0;
+
+ my $njobs = $self->{_options}{jobs};
+ unless (defined($njobs)) {
+ $njobs = $archive_size / $part_size;
+ if ($njobs > 128) {
+ $njobs = 128;
+ }
+ }
+
+ $self->debug(1,
+ "uploading $localname in chunks of $part_size bytes, in $njobs jobs");
+
+ open(my $fd, '<', $localname)
+ or $self->abort(EX_FAILURE, "can't open $localname: $!");
+ my $upload_id = $glacier->multipart_upload_init($vaultname, $part_size,
+ $remotename);
+ $self->debug(1, "Upload ID: $upload_id");
+
+ use threads;
+ use Fcntl qw(SEEK_SET);
+ my @part_hashes = ();
+ my $part_idx = 0;
+ my $read_bytes;
+ my $rest_size = $archive_size;
+ my $off = 0;
+ while ($rest_size) {
+ for (my $i = 0; $i < $njobs && $rest_size; $i++) {
+ my ($thr) = threads->create(
+ sub {
+ my ($part_idx, $off) = @_;
+
+ my $part;
+ seek($fd, $off, SEEK_SET);
+ $read_bytes = read($fd, $part, $part_size);
+ # FIXME: use http_catch
+ my $res = $glacier->multipart_upload_upload_part($vaultname,
+ $upload_id,
+ $part_size,
+ $part_idx,
+ \$part);
+ return ($part_idx, $res);
+ },
+ $part_idx, $off);
+ $part_idx++;
+ $off += $part_size;
+ if ($rest_size < $part_size) {
+ $part_size = $rest_size;
+ }
+ $rest_size -= $part_size;
+ }
+
+ $self->debug(2, "waiting for the bunch to finish");
+ while (threads->list()) {
+ foreach my $thr (threads->list(threads::joinable)) {
+ # FIXME: error handling, see the note about http_catch above
+ my ($idx, $hash) = $thr->join() or croak "thread $thr failed";
+ $part_hashes[$idx] = $hash;
+ }
+ }
+ }
+
+ # Capture archive id or error code
+ my $archive_id = $self->glacier_eval('multipart_upload_complete',
+ $vaultname, $upload_id,
+ \@part_hashes,
+ $archive_size);
+
+ if ($self->lasterr) {
+ $glacier->multipart_upload_abort($vaultname, $upload_id);
+ $self->abend(EX_FAILURE, "upload failed: ",
+ $self->last_error_message);
+ }
+
+ # Check if we have a valid $archive_id
+ unless ($archive_id =~ /^[a-zA-Z0-9_\-]{10,}$/) {
+ $glacier->multipart_upload_abort($vaultname, $upload_id);
+ $self->abend(EX_FAILURE, "upload completion failed");
+ }
+
+ return $archive_id;
+}
+
+1;
diff --git a/lib/App/Glacier/Command/Sync.pm b/lib/App/Glacier/Command/Sync.pm
index 4603c34..285bfa1 100644
--- a/lib/App/Glacier/Command/Sync.pm
+++ b/lib/App/Glacier/Command/Sync.pm
@@ -29,7 +29,7 @@ sub run {
sub sync {
my ($self, $vault_name) = @_;
-
+
my $dir = $self->directory($vault_name);
my $job = new App::Glacier::Job::InventoryRetrieval($self, $vault_name);
if ($job->is_completed) {
diff --git a/lib/App/Glacier/DateTime.pm b/lib/App/Glacier/DateTime.pm
index f5033b1..ca7d61b 100644
--- a/lib/App/Glacier/DateTime.pm
+++ b/lib/App/Glacier/DateTime.pm
@@ -6,6 +6,20 @@ use parent 'DateTime';
use Carp;
use DateTime;
+sub new {
+ my ($class, @opts) = shift;
+ unless (@opts) {
+ my ($second, $minute, $hour, $day, $month, $year) = gmtime;
+ return $class->SUPER::new(year => 1900 + $year,
+ month => $month + 1,
+ day => $day,
+ hour => $hour,
+ minute => $minute,
+ second => $second);
+ }
+ return $class->SUPER::new(@_);
+}
+
sub strftime {
my $self = shift;
if (@_ > 1) {
diff --git a/lib/App/Glacier/Directory.pm b/lib/App/Glacier/Directory.pm
index bf588ed..31772d7 100644
--- a/lib/App/Glacier/Directory.pm
+++ b/lib/App/Glacier/Directory.pm
@@ -28,18 +28,19 @@ sub locate {
}
sub info {
- my ($self, $key, $val) = @_;
+ my ($self, $key) = @_;
my $rec = $self->retrieve(DB_INFO_KEY);
- if ($val) {
- $rec = {} unless defined($rec);
- $rec->{$key} = $val;
- $self->SUPER::store(DB_INFO_KEY, $rec);
- } elsif (!defined($rec)) {
- return undef;
- }
+ return undef unless defined($rec);
return $rec->{$key};
}
+sub set_info {
+ my ($self, $key, $val) = @_;
+ my $rec = $self->retrieve(DB_INFO_KEY) || {};
+ $rec->{$key} = $val;
+ $self->SUPER::store(DB_INFO_KEY, $rec);
+}
+
sub last_sync_time {
my ($self) = @_;
return $self->info('SyncTimeStamp');
@@ -47,9 +48,25 @@ sub last_sync_time {
sub update_sync_time {
my ($self) = @_;
- return $self->info('SyncTimeStamp', time);
+ $self->set_info('SyncTimeStamp', time);
+ $self->clear_dirty;
}
+sub is_dirty {
+ my ($self) = @_;
+ return $self->info('Dirty');
+}
+
+sub clear_dirty {
+ my ($self) = @_;
+ return $self->set_info('Dirty', 0);
+}
+
+sub invalidate {
+ my ($self) = @_;
+ return $self->set_info('Dirty', 1);
+}
+
sub foreach {
my ($self, $code) = @_;
$self->SUPER::foreach(sub {
@@ -107,7 +124,8 @@ sub status {
my ($self) = @_;
if (defined($self->last_sync_time)) {
- if (time - $self->last_sync_time > $self->{_ttl}) {
+ if (time - $self->last_sync_time > $self->{_ttl}
+ || $self->info('Dirty')) {
return DIR_OUTDATED;
}
} else {
diff --git a/lib/App/Glacier/Job.pm b/lib/App/Glacier/Job.pm
index 37d724b..409b91b 100644
--- a/lib/App/Glacier/Job.pm
+++ b/lib/App/Glacier/Job.pm
@@ -10,12 +10,19 @@ use App::Glacier::Timestamp;
# new(CMD, VAULT, KEY, INIT)
sub new {
- croak "bad number of arguments" unless $#_ == 4;
- my ($class, $cmd, $vault, $key, $init) = @_;
+ croak "bad number of arguments" unless $#_ >= 4;
+ my ($class, $cmd, $vault, $key, $init, %opts) = @_;
+ my $invalidate = delete $opts{invalidate};
+
+ if (keys(%opts)) {
+ croak "unrecognized parameters: ".join(', ', keys(%opts));
+ }
+
return bless { _cmd => $cmd,
_init => $init,
_vault => $vault,
- _key => $key }, $class;
+ _key => $key,
+ _invalidate => $invalidate }, $class;
}
sub _get_db {
@@ -27,7 +34,7 @@ sub _get_job {
my ($self) = @_;
unless ($self->{_job}) {
my $db = $self->_get_db;
- my $job = $db->retrieve($self->{_key});
+ my $job = $db->retrieve($self->{_key}) unless $self->{_invalidate};
if (!$job) {
my $jid = $self->{_cmd}->glacier_eval(@{$self->{_init}});
if ($self->{_cmd}->lasterr) {
diff --git a/lib/App/Glacier/Job/InventoryRetrieval.pm b/lib/App/Glacier/Job/InventoryRetrieval.pm
index a42f806..9980a09 100644
--- a/lib/App/Glacier/Job/InventoryRetrieval.pm
+++ b/lib/App/Glacier/Job/InventoryRetrieval.pm
@@ -8,9 +8,11 @@ use Carp;
# new(CMD, VAULT)
sub new {
- croak "bad number of arguments" unless $#_ == 2;
- my ($class, $cmd, $vault) = @_;
- return $class->SUPER::new($cmd, $vault, $vault,
- [ 'initiate_inventory_retrieval', $vault, 'JSON' ]);
+ croak "bad number of arguments" unless $#_ >= 2;
+ my ($class, $cmd, $vault, %opts) = @_;
+ return $class->SUPER::new(
+ $cmd, $vault, $vault,
+ [ 'initiate_inventory_retrieval', $vault, 'JSON' ],
+ %opts);
}

Return to:

Send suggestions and report system problems to the System administrator.