在Perl中处理15亿行的文件

问题描述

我需要处理一个包含15亿个条目的文件,其中包含11列,大小为300GB。我需要从每行中提取一些信息。 我已将问题分为两部分。首先,我阅读了文件,并尝试将文件缩小为我需要的列和一些过滤条件。我写出了这个所谓的精简文件〜700GB约100GB。项。

然后,我阅读精简文件并处理每个条目。这是我的代码段:

while (my $line = <$fh_file_vss_rlrp_inst>) {
  chomp $line;
  my @columns_line = split('\s+',$line);
  if( scalar @columns_line == 10 && $columns_line[-1] !~ /X|Y|Z|K/ ) {
    print $fh_reduced "$columns_line[1] $columns_line[7] $columns_line[-1]\n";
  } 
  $inst_count++;
   if($inst_count % 10000000 == 0) { 
     $date = `date`;
     print "Processed $inst_count ... time: $date";
   }
}

现在正在读取缩小的文件以进行进一步处理,我的%h_kI_vB内存中有100,000个条目:

while (my $line = <$fh_file_vss_rlrp_inst>) {
chomp $line;
my @columns_line = split('\s+',$line);
  my $wip = $columns_line[0];
  my $this_inst = $columns_line[0];
  my @loc_xyz;
  my @loc_ijk;
  while (grep(/\//,$wip)){
      if (exists($h_kI_vB{$wip})){ 
          push(@loc_xyz,$h_kI_vB{$wip});                         
      }
      $wip =~ s/\/[^\/]+$//;
  }
  if (exists($h_kI_vB{$wip})){
      push(@loc_xyz,$h_kI_vB{$wip});
  }
  
  if (@loc_xyz){
      my $loc_block_string;
      foreach my $loc ( @loc_xyz){
          if (not defined $loc_block_string){
              $loc_block_string = "$loc";
          } else {
              $loc_block_string = $loc_block_string.":$loc";
          }
      }
      $hierarchical_blocks{$this_inst}=$loc_block_string;
      
  }
  if(exists $hierarchical_blocks{$this_inst}) {
    my @x_y_layer = split(',',$columns_line[1]);
    $x_y_layer[0] =~ s/\(//;
    if(! defined $H_instBlock_coordinates{ $hierarchical_blocks{$this_inst} } ) { 
      my @initial_coordinate = ($x_y_layer[0],$x_y_layer[1],$x_y_layer[0],$x_y_layer[1]);
      print $fh_log "For inst:$this_inst $hierarchical_blocks{$this_inst}: @initial_coordinate\n" ;
      @{ $H_instBlock_coordinates{$hierarchical_blocks{$this_inst}} } = @initial_coordinate; 
    } else {
      my @old_x1_y1_x2_y2 =  @{ $H_instBlock_coordinates{$hierarchical_blocks{$this_inst}} };
      my $x1 = $old_x1_y1_x2_y2[0];
      my $y1 = $old_x1_y1_x2_y2[1];
      my $x2 = $old_x1_y1_x2_y2[2];
      my $y2 = $old_x1_y1_x2_y2[3];
      if($x_y_layer[0] < $x1) {
        $x1 = $x_y_layer[0];
      } elsif ($x_y_layer[0] > $x2) {
        $x2 = $x_y_layer[0];
      }
      
      if($x_y_layer[1] < $y1) {
        $y1 = $x_y_layer[1];
      } elsif ($x_y_layer[1] > $y2) {
        $y2 = $x_y_layer[1];
      }
      
      my @new_x1_y1_x2_y2 = ($x1,$y1,$x2,$y2);
      
      print $fh_log "For inst:$this_inst Changing coordinate to block $hierarchical_blocks{$this_inst}: @new_x1_y1_x2_y2\n" ;
      @{ $H_instBlock_coordinates{$hierarchical_blocks{$this_inst}} } = @new_x1_y1_x2_y2; 
      
    }
 }
  
 $inst_count++;
 if($inst_count % 1000000 == 0) {
   $date = `date`;
   print "Processed $inst_count ...time: $date\n";
 }  
 }

这太慢了。我将作业分发到具有450GB内存的远程服务器,该服务器运行24小时。 我需要优化代码,以使其在1小时内完成(最坏的情况)。

谢谢。

解决方法

通过从根本上改变自己的方法,可以获得真正的性能提升。由于您拥有简单的O(N)算法,因此该部门中没有什么特别突出的。

下面,我清理了您的代码并提供了一些微优化。但是我怀疑他们会造成很大的损失。原因是您的代码已经非常快了。您说处理700,000,000条线需要24小时,这意味着每条线仅需要12μs的时间即可处理。这是合理的。

  24 / 700,000 hour/line
* 60 minute/hour
* 60 s/minute
= 1.2 * 10^(-4) s/line
= 12 μs/line

您可能会从并行化中获益。例如,没有什么可以阻止第二个程序与第一个程序同时运行,而是使用两个内核而不是1个内核。它看起来像这样:

./prog_a | ./prog_b

进一步并行化此过程的复杂性在于,处理一行依赖于早期行的处理输出。

尽管如此,将$block的处理从prog_a移到prog_b可能会更有利,甚至可能在管道中创建一个中间阶段。

./prog_a | ./prog_i | ./prog_b

由您自己决定要在阶段数和每个阶段所执行的操作之间取得平衡,以达到最佳效果。例如,我猜想将原始文件的第10个字段的分析移到prog_a会更有利,因此在我发布的版本中将其从prog_b移到prog_a在下面。

但是工作仍然大部分是连续的。下一步是将使用$block的工作分配到多个内核中。只要具有相同值$block的行最终被同一实例处理,就可以这样做。我把这个留给你。


#!/usr/bin/perl

use strict;
use warnings;

while (<>) {
   my @fields = split;
   # Using a lookup hash would be faster if you know
   # the specific values C<< $fields[9] >> can take.
   # Something like the following before the loop:
   # C<< my %skip = map { $_ => 1 } qw( X Y Z K ); >>.
   # Then,you'd use C<< !$skip{$fields[9]} >>
   # instead of a regex match.
   if (@fields == 10 && $fields[9] !~ /[XYZK]/) {
      my ($x,$y) = split(/,/,substr($fields[7],1))
      print "$fields[1] $x $y\n";
   } 
   
   if ($. % 10_000_000 == 0) { 
      my $ts = localtime();
      print STDERR "[$ts] prog_a processed $. lines.\n";
   }
}
#!/usr/bin/perl

use strict;
use warnings;

# Should be safe to use since it's deemed safe enough to enabled by default in Perl 7.
# Allows us to cleanly avoid repeatedly doing the same hash lookup.
# We could use a reference instead of an alias by replacing
#    \my $coords = \$H_instBlock_coordinates{$block};
# with
#    my $coords_ref = \$H_instBlock_coordinates{$block};
# and changing all other instances of
#    $coords
# with
#    ${$coords_ref}
# But this would be a lot more noisy.
use experimental qw( refaliasing );

my %h_kI_vB = ...;

my %hierarchical_blocks;
my %H_instBlock_coordinates;
while (<>) {
   my ($this_inst,$x,$y) = split;

   # C<< $this_inst >> contains something like C<< a/b/c >>

   my @loc_xyz;
   {
      my $wip = $this_inst;
      while (1) {
         # If an existing C<< $h_kI_vB{$wip}) >> won't ever be a false value
         # (zero or an empty string),replacing C<< exists($h_kI_vB{$wip}) >>
         # with C<< $h_kI_vB{$wip} >> would be a tiny tiny bit faster.
         if (exists($h_kI_vB{$wip})) {
            push(@loc_xyz,$h_kI_vB{$wip});
         }

         # The regex engine is pretty heavy,so while the
         # remainder of the loop could be replaced with
         # C<< $wip =~ s{/[^/]*\z}{} or last; >>,it
         # probably wouldn't be as fast.
         ( my $i = rindex($wip,"/") ) >= 0
            or last;

         substr($wip,$i,length($wip),"");
      }
   }

   if (@loc_xyz) {
      my $block = join(":",@loc_xyz);
      $hierarchical_blocks{$this_inst} = $block;

      # C<< $block >> contains something like C<< d:e:f >>.
      # It may have fewer parts than C<< $this_inst >> did.

      # C<< $coords >> is an alias for C<< $H_instBlock_coordinates{$block} >>.
      \my $coords = \$H_instBlock_coordinates{$block};
      if ($coords) {
         if    ($x < $coords->[0]) { $coords->[0] = $x; }
         elsif ($x > $coords->[2]) { $coords->[2] = $x; }

         if    ($y < $coords->[1]) { $coords->[1] = $y; }
         elsif ($y > $coords->[3]) { $coords->[3] = $y; }

         print $fh_log "For inst:$this_inst Changing coordinate to block $block: @$coords\n";
      } else {
         $coords = [ $x,$y,$y ];
         print $fh_log "For inst:$this_inst " .                         "$block: @$coords\n";
      }
   }

   if ($. % 1_000_000 == 0) {
      my $ts = localtime();
      print STDERR "[$ts] prog_b processed $. lines.\n";
   }
}