← Index
NYTProf Performance Profile   « line view »
For /usr/local/libexec/sympa/task_manager-debug.pl
  Run on Tue Jun 1 22:32:51 2021
Reported on Tue Jun 1 22:35:15 2021

Filename/usr/local/libexec/sympa/Sympa/Spool/Outgoing.pm
StatementsExecuted 0 statements in 0s
Subroutines
Calls P F Exclusive
Time
Inclusive
Time
Subroutine
0000s0sSympa::Spool::Outgoing::::BEGIN@26Sympa::Spool::Outgoing::BEGIN@26
0000s0sSympa::Spool::Outgoing::::BEGIN@27Sympa::Spool::Outgoing::BEGIN@27
0000s0sSympa::Spool::Outgoing::::BEGIN@28Sympa::Spool::Outgoing::BEGIN@28
0000s0sSympa::Spool::Outgoing::::BEGIN@29Sympa::Spool::Outgoing::BEGIN@29
0000s0sSympa::Spool::Outgoing::::BEGIN@30Sympa::Spool::Outgoing::BEGIN@30
0000s0sSympa::Spool::Outgoing::::BEGIN@31Sympa::Spool::Outgoing::BEGIN@31
0000s0sSympa::Spool::Outgoing::::BEGIN@33Sympa::Spool::Outgoing::BEGIN@33
0000s0sSympa::Spool::Outgoing::::BEGIN@34Sympa::Spool::Outgoing::BEGIN@34
0000s0sSympa::Spool::Outgoing::::BEGIN@35Sympa::Spool::Outgoing::BEGIN@35
0000s0sSympa::Spool::Outgoing::::BEGIN@36Sympa::Spool::Outgoing::BEGIN@36
0000s0sSympa::Spool::Outgoing::::BEGIN@37Sympa::Spool::Outgoing::BEGIN@37
0000s0sSympa::Spool::Outgoing::::BEGIN@38Sympa::Spool::Outgoing::BEGIN@38
0000s0sSympa::Spool::Outgoing::::BEGIN@39Sympa::Spool::Outgoing::BEGIN@39
0000s0sSympa::Spool::Outgoing::::__ANON__Sympa::Spool::Outgoing::__ANON__ (xsub)
0000s0sSympa::Spool::Outgoing::::_create_spoolSympa::Spool::Outgoing::_create_spool
0000s0sSympa::Spool::Outgoing::::_get_recipient_tabs_by_domainSympa::Spool::Outgoing::_get_recipient_tabs_by_domain
0000s0sSympa::Spool::Outgoing::::newSympa::Spool::Outgoing::new
0000s0sSympa::Spool::Outgoing::::nextSympa::Spool::Outgoing::next
0000s0sSympa::Spool::Outgoing::::quarantineSympa::Spool::Outgoing::quarantine
0000s0sSympa::Spool::Outgoing::::removeSympa::Spool::Outgoing::remove
0000s0sSympa::Spool::Outgoing::::storeSympa::Spool::Outgoing::store
0000s0sSympa::Spool::Outgoing::::too_much_remaining_packetsSympa::Spool::Outgoing::too_much_remaining_packets
Call graph for these subroutines as a Graphviz dot language file.
Line State
ments
Time
on line
Calls Time
in subs
Code
1# -*- indent-tabs-mode: nil; -*-
2# vim:ft=perl:et:sw=4
3# $Id$
4
5# Sympa - SYsteme de Multi-Postage Automatique
6#
7# Copyright 2019, 2020 The Sympa Community. See the AUTHORS.md
8# file at the top-level directory of this distribution and at
9# <https://github.com/sympa-community/sympa.git>.
10#
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License
22# along with this program. If not, see <http://www.gnu.org/licenses/>.
23
24package Sympa::Spool::Outgoing;
25
26use strict;
27use warnings;
28use Cwd qw();
29use English qw(-no_match_vars);
30use File::Copy qw();
31use Time::HiRes qw();
32
33use Conf;
34use Sympa::Constants;
35use Sympa::LockedFile;
36use Sympa::Log;
37use Sympa::Message;
38use Sympa::Spool;
39use Sympa::Tools::File;
40
41my $log = Sympa::Log->instance;
42
43sub new {
44 my $class = shift;
45 my %options = @_;
46
47 my $self = bless {
48 msg_directory => $Conf::Conf{'queuebulk'} . '/msg',
49 pct_directory => $Conf::Conf{'queuebulk'} . '/pct',
50 bad_directory => $Conf::Conf{'queuebulk'} . '/bad',
51 bad_msg_directory => $Conf::Conf{'queuebulk'} . '/bad/msg',
52 bad_pct_directory => $Conf::Conf{'queuebulk'} . '/bad/pct',
53 _metadatas => undef,
54 } => $class;
55
56 $self->_create_spool;
57
58 # Build glob pattern (for pct entries).
59 $self->{_glob_pattern} = Sympa::Spool::build_glob_pattern(
60 '%s.%s.%d.%f.%s@%s_%s,%ld,%d/%s',
61 [ qw(priority packet_priority date time localpart domainpart tag pid rand serial)
62 ],
63 %options
64 ) || '*/*';
65
66 return $self;
67}
68
69sub _create_spool {
70 my $self = shift;
71
72 my $umask = umask oct $Conf::Conf{'umask'};
73 foreach my $directory (
74 $Conf::Conf{queuebulk}, $self->{msg_directory},
75 $self->{pct_directory}, $self->{bad_directory},
76 $self->{bad_msg_directory}, $self->{bad_pct_directory}
77 ) {
78 unless (-d $directory) {
79 $log->syslog('info', 'Creating spool %s', $directory);
80 unless (
81 mkdir($directory, 0755)
82 and Sympa::Tools::File::set_file_rights(
83 file => $directory,
84 user => Sympa::Constants::USER(),
85 group => Sympa::Constants::GROUP()
86 )
87 ) {
88 die sprintf 'Cannot create %s: %s', $directory, $ERRNO;
89 }
90 }
91 }
92 umask $umask;
93}
94
95sub next {
96 my $self = shift;
97 my %options = @_;
98
99 unless ($self->{_metadatas}) {
100 my $cwd = Cwd::getcwd();
101 unless (chdir $self->{pct_directory}) {
102 die sprintf 'Cannot chdir to %s: %s', $self->{pct_directory},
103 $ERRNO;
104 }
105 $self->{_metadatas} = [
106 sort grep {
107 !/,lock/
108 and !m{(?:\A|/)(?:\.|T\.|BAD-)}
109 and -f ($self->{pct_directory} . '/' . $_)
110 } glob $self->{_glob_pattern}
111 ];
112 chdir $cwd;
113 }
114 unless (@{$self->{_metadatas}}) {
115 undef $self->{_metadatas};
116 return;
117 }
118
119 while (my $marshalled = shift @{$self->{_metadatas}}) {
120 my ($lock_fh, $metadata, $message);
121
122 # Try locking packet. Those locked or removed by other process will
123 # be skipped.
124 $lock_fh =
125 Sympa::LockedFile->new($self->{pct_directory} . '/' . $marshalled,
126 -1, '+<');
127 next unless $lock_fh;
128
129 # FIXME: The list or the robot that injected packet can no longer be
130 # available.
131 $metadata = Sympa::Spool::unmarshal_metadata(
132 $self->{pct_directory},
133 $marshalled,
134 qr{\A(\w+)\.(\w+)\.(\d+)\.(\d+\.\d+)\.(\@?[^\s\@]*)\@([\w\.\-*]*)_(\w+),(\d+),(\d+)/(\w+)\z},
135 [ qw(priority packet_priority date time localpart domainpart tag pid rand serial)
136 ]
137 );
138
139 if ($metadata) {
140 unless ($options{no_filter}) {
141 # Skip messages not yet to be delivered.
142 next unless $metadata->{date} <= time;
143 }
144
145 my $msg_file = Sympa::Spool::marshal_metadata(
146 $metadata,
147 '%s.%s.%d.%f.%s@%s_%s,%ld,%d',
148 [ qw(priority packet_priority date time localpart domainpart tag pid rand)
149 ]
150 );
151 $message = Sympa::Message->new_from_file(
152 $self->{msg_directory} . '/' . $msg_file, %$metadata);
153
154 if ($message) {
155 my $rcpt_string = do { local $RS; <$lock_fh> };
156 $message->{rcpt} = [split /\n+/, $rcpt_string];
157 }
158 }
159
160 # Though message might not be deserialized, anyway return the result.
161 return ($message, $lock_fh);
162 }
163 return;
164}
165
166sub quarantine {
167 my $self = shift;
168 my $lock_fh = shift;
169
170 my $marshalled = $lock_fh->basename(1);
171 my $bad_pct_directory = $self->{bad_pct_directory} . '/' . $marshalled;
172 my $bad_msg_file = $self->{bad_msg_directory} . '/' . $marshalled;
173 my $bad_pct_file;
174
175 File::Copy::cp($self->{msg_directory} . '/' . $marshalled, $bad_msg_file)
176 unless -e $bad_msg_file;
177
178 $bad_pct_file = $bad_pct_directory . '/' . $lock_fh->basename;
179 mkdir $bad_pct_directory unless -d $bad_pct_directory;
180 unless (-d $bad_pct_directory and $lock_fh->rename($bad_pct_file)) {
181 $bad_pct_file =
182 $self->{pct_directory} . '/BAD-'
183 . $lock_fh->basename(1) . '-'
184 . $lock_fh->basename;
185 return undef unless $lock_fh->rename($bad_pct_file);
186 }
187
188 if (rmdir($self->{pct_directory} . '/' . $marshalled)) {
189 # No more packet.
190 unlink($self->{msg_directory} . '/' . $marshalled);
191 }
192 return 1;
193}
194
195sub remove {
196 my $self = shift;
197 my $lock_fh = shift;
198
199 my $marshalled = $lock_fh->basename(1);
200
201 if ($lock_fh->unlink) {
202 if (rmdir($self->{pct_directory} . '/' . $marshalled)) {
203 # No more packet.
204 unlink($self->{msg_directory} . '/' . $marshalled);
205 }
206 return 1;
207 }
208 return undef;
209}
210
211# DEPRECATED: No longer used.
212#sub messageasstring($messagekey);
213
214# fetch message from bulkspool_table by key
215# Old name: Sympa::Bulk::message_from_spool()
216# DEPRECATED: Not used.
217#sub fetch_content($messagekey);
218
219# DEPRECATED: Use Sympa::Message::personalize().
220# sub merge_msg;
221
222# DEPRECATED: Use Sympa::Message::personalize_text().
223# sub merge_data ($rcpt, $listname, $robot_id, $data, $body, \$message_output)
224
225sub store {
226 my $self = shift;
227 my $message = shift->dup;
228 my $rcpt = shift;
229 my %options = @_;
230
231 delete $message->{rcpt}; #FIXME
232
233 my ($list, $robot_id);
234 if (ref($message->{context}) eq 'Sympa::List') {
235 $list = $message->{context};
236 $robot_id = $message->{context}->{'domain'};
237 } elsif ($message->{context} and $message->{context} ne '*') {
238 $robot_id = $message->{context};
239 } else {
240 $robot_id = '*';
241 }
242
243 my $tag = $options{tag};
244 $tag = 's' unless defined $tag;
245 $message->{tag} = $tag;
246
247 $message->{priority} =
248 $list
249 ? $list->{admin}{priority}
250 : Conf::get_robot_conf($robot_id, 'sympa_priority')
251 unless defined $message->{priority} and length $message->{priority};
252 $message->{packet_priority} =
253 Conf::get_robot_conf($robot_id, 'sympa_packet_priority');
254 $message->{date} = time unless defined $message->{date};
255 $message->{time} = Time::HiRes::time();
256
257 # First, store the message in bulk/msg spool, because as soon as packets
258 # are created bulk.pl may distribute them.
259
260 my $marshalled = Sympa::Spool::store_spool(
261 $self->{msg_directory},
262 $message,
263 '%s.%s.%d.%f.%s@%s_%s,%ld,%d',
264 [ qw(priority packet_priority date time localpart domainpart tag PID RAND)
265 ],
266 %options
267 );
268 return unless $marshalled;
269
270 unless (mkdir($self->{pct_directory} . '/' . $marshalled)) {
271 $log->syslog(
272 'err',
273 'Cannot mkdir %s/%s: %m',
274 $self->{pct_directory}, $marshalled
275 );
276 unlink($self->{msg_directory} . '/' . $marshalled);
277 return;
278 }
279
280 # Second, create each recipient packet in bulk/pct spool.
281
282 my @rcpts;
283 unless (ref $rcpt) {
284 @rcpts = ([$rcpt]);
285 } else {
286 @rcpts = _get_recipient_tabs_by_domain($robot_id, @{$rcpt || []});
287 }
288 my $total_sent = $#rcpts + 1;
289
290 # Create a temporary lock file in the packet directory to prevent bulk.pl
291 # from removing packet directory and the message during addition of
292 # packets.
293 my $lock_fh_tmp = Sympa::LockedFile->new(
294 $self->{pct_directory} . '/' . $marshalled . '/dont_rmdir',
295 -1, '+');
296
297 my $serial = $message->{tag};
298 while (my $rcpt = shift @rcpts) {
299 my $lock_fh = Sympa::LockedFile->new(
300 $self->{pct_directory} . '/' . $marshalled . '/' . $serial,
301 5, '>>');
302 return unless $lock_fh;
303
304 $lock_fh_tmp->close unless @rcpts; # Now the last packet is written.
305
306 print $lock_fh join("\n", @{$rcpt}) . "\n";
307 $lock_fh->close;
308
309 if (length $serial == 1) { # '0', 's' or 'z'.
310 $serial = '0001';
311 } else {
312 $serial++;
313 }
314 }
315
316 $log->syslog('notice', 'Message %s is stored into bulk spool as <%s>',
317 $message, $marshalled);
318 return unless $marshalled;
319 return {marshalled => $marshalled, total_packets => $total_sent};
320}
321
322# Old name: (part of) Sympa::Mail::mail_message().
323sub _get_recipient_tabs_by_domain {
324 my $robot_id = shift;
325 my @rcpt = @_;
326
327 return unless @rcpt;
328
329 # Sort by domain.
330 @rcpt = map {
331 join '@', grep { defined $_ } @$_;
332 } sort {
333 (($a->[1] // '') cmp($b->[1] // '')) || ($a->[0] cmp $b->[0])
334 } map {
335 [split /\@/, $_, 2]
336 } @rcpt;
337
338 my ($i, $j, $nrcpt);
339 my $size = 0;
340
341 my %rcpt_by_dom;
342
343 my @sendto;
344 my @sendtobypacket;
345
346 while (defined($i = shift @rcpt)) {
347 my @k = reverse split /[\.@]/, $i;
348 my @l = reverse split /[\.@]/, (defined $j ? $j : '@');
349
350 my $dom;
351 if ($i =~ /\@(.*)$/) {
352 $dom = $1;
353 chomp $dom;
354 }
355 $rcpt_by_dom{$dom} += 1;
356 $log->syslog(
357 'debug2',
358 'Domain: %s; rcpt by dom: %s; limit for this domain: %s',
359 $dom,
360 $rcpt_by_dom{$dom},
361 $Conf::Conf{'nrcpt_by_domain'}{$dom}
362 );
363
364 if (
365 # number of recipients by each domain
366 ( defined $Conf::Conf{'nrcpt_by_domain'}{$dom}
367 and $rcpt_by_dom{$dom} >= $Conf::Conf{'nrcpt_by_domain'}{$dom}
368 )
369 or
370 ( $j
371
372 and scalar(@sendto) > Conf::get_robot_conf($robot_id, 'avg')
373 and lc "$k[0] $k[1]" ne lc "$l[0] $l[1]"
374 )
375 or
376 (@sendto and $nrcpt >= Conf::get_robot_conf($robot_id, 'nrcpt'))
377
378 ) {
379 undef %rcpt_by_dom;
380 # do not replace this line by "push @sendtobypacket, \@sendto" !!!
381 my @tab = @sendto;
382 push @sendtobypacket, \@tab;
383 $nrcpt = $size = 0;
384 @sendto = ();
385 }
386
387 $nrcpt++;
388 $size += length($i) + 5;
389 push(@sendto, $i);
390 $j = $i;
391 }
392
393 if (@sendto) {
394 my @tab = @sendto;
395 # do not replace this line by push @sendtobypacket, \@sendto !!!
396 push @sendtobypacket, \@tab;
397 }
398
399 return @sendtobypacket;
400}
401
402## remove file that are not referenced by any packet
403# DEPRECATED: No longer used.
404#sub purge_bulkspool();
405
406# Old name: Bulk::there_is_too_much_remaining_packets().
407sub too_much_remaining_packets {
408 my $self = shift;
409
410 my $remaining_packets = scalar @{$self->{_metadatas} || []};
411 if ($remaining_packets > Conf::get_robot_conf('*', 'bulk_fork_threshold'))
412 {
413 return $remaining_packets;
414 } else {
415 return 0;
416 }
417}
418
4191;
420__END__