File : adagio-upload-queue-manager.adb


------------------------------------------------------------------------------

--                         ADAGIO - ADALID - AENEA.                         --

--                                                                          --

--                            Copyright (C) 2003                            --

--                                 A. Mosteo.                               --

--                                                                          --

--  Authors: A. Mosteo. (adagio@mosteo.com)                                 --

--                                                                          --

--  If you have any questions in regard to this software, please address    --

--  them to the above email.                                                --

--                                                                          --

--  This program is free software; you can redistribute it and/or modify    --

--  it under the terms of the GNU General Public License as published by    --

--  the Free Software Foundation; either version 2 of the License, or (at   --

--  your option) any later version.                                         --

--                                                                          --

--  This program is distributed in the hope that it will be useful, but     --

--  WITHOUT ANY WARRANTY; without even the implied warranty of              --

--  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU       --

--  General Public License for more details.                                --

--                                                                          --

--  You should have received a copy of the GNU General Public License       --

--  along with this library; if not, write to the Free Software Foundation, --

--  Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.          --

--                                                                          --

--  You are not allowed to use any part of this code to develop a program   --

--  whose output would be used to harass or prosecute other users of the    --

--  networks Adagio connects with. All data collected with Adagio or a tool --

--  containing Adagio code about other network users must remain            --

--  confidential and cannot be made public by any mean, nor be used to      --

--  harass or legally prosecute these users.                                --

------------------------------------------------------------------------------

--  $Id: adagio-upload-queue-manager.adb,v 1.15 2004/02/29 20:36:46 Jano Exp $


with Adagio.Convert;
with Adagio.Debug;
with Adagio.File;
with Adagio.Socket;
with Adagio.Statistics;
with Adagio.Statistics.Booleans;
with Adagio.Statistics.Integers;
with Adagio.Statistics.Strings;
with Adagio.Trace;
with Adagio.Upload.Client_data;
with Adagio.Upload.Log;
with Adagio.Upload.Queue.Create_from_xml;
with Adagio.Xml;
with Adagio.Xml.Utils;
with Generic_event_queue;
with Sequence;

with Agpl.Geoip;

with Ada.Calendar; use Ada;
with Ada.Real_time;
with Ada.Unchecked_deallocation;
with Interfaces;

package body Adagio.Upload.Queue.Manager is

   Share_bandwidth : Boolean renames Globals.Options.Uploads_ShareBandwidth;
   Throttle        : Float renames Globals.Options.Uploads_throttle;
   Minimum_send_delay : Duration 
      renames Globals.Options.Uploads_MinimumSendDelay;

   Last_client_data_save : Calendar.Time := Calendar.Clock;

   package Client_pool renames Client.Pool.Map;

   -- Sequence to track completed uploads.

   package Positive_seq is new Sequence (Interfaces.Unsigned_32);
   Completed_seq : Positive_seq.Object;
   use type Interfaces.Unsigned_32;

   procedure Free is new Unchecked_deallocation (
      Client_slot, Client_slot_access);

   ------------------------------------------------------------------------

   -- Event system                                                       --

   ------------------------------------------------------------------------

   procedure Process_event (Queue_id : in String) is
   begin
      -- Only to test accessibility of Queue_id

      Trace.Log (" ~~~ Upload event for " & Queue_id, Trace.Never);
      Object.Process_event (Queue_id);
   end Process_event;

   package Event_queue is new Generic_event_queue (
      String, Process_event);

   Events : Event_queue.Object;

   procedure Create_event (Queue_id : in String; Deadline : Real_time.Time) is
      Event : Event_queue.Event_type;
   begin
      Event_queue.Create (Events, Event, Deadline, Queue_id);
   end Create_event;

   ------------------------------------------------------------------------

   -- PROTECTED OBJECT (MANAGER)                                         --

   ------------------------------------------------------------------------

   protected body Object is

   ------------------

   -- Upload_stats --

   ------------------

   procedure Upload_stats is
   begin
      if Chronos.Elapsed (Sent_cron) > 0.8 then
         Chronos.Reset (Sent_cron);
         -- Pending requests

         Statistics.Object.Set (
            "Network - Pending requests",
            Statistics.Integers.Create (Client_pool.Length (Pending)));
         -- Queued clients

         Statistics.Object.Set (
            "Uploads - Queued clients",
            Statistics.Integers.Create (Client_list.Length (Clients)));
         -- Session data sent

         begin
            Statistics.Object.Set (
               Stat_session_upload,
               Statistics.Strings.Create (
                  Long_long_integer'Image (Session_sent) & " B (" &
                  Convert.To_size (Float (Session_sent)) & ")"));
         exception
            when others =>
            Statistics.Object.Set (
               Stat_session_upload,
               Statistics.Strings.Create (
                  Long_long_integer'Image (Session_sent) & " B (>" &
                  Convert.To_size (Float'Last) & ")"));
         end;
         -- Session mean speed

         declare
            use Calendar;
         begin
            Mean_speed := Speed (
               Long_long_float (Session_sent) /
               Long_long_float (Clock - Globals.Adagio_start));
            Statistics.Object.Set (
               Stat_session_speed,
               Statistics.Strings.Create (
                  Convert.To_size (Float (Mean_speed)) & "/s"));
         exception
            when others =>
               Mean_speed := Speed'Last;
               Statistics.Object.Set (
                  Stat_session_speed,
                  Statistics.Strings.Create (">" &
                     Convert.To_size (Float'Last) & "/s"));
         end;
      end if;
   end Upload_stats;

   ------------------------------------------------------------------------

   -- Remove                                                             --

   ------------------------------------------------------------------------

   -- Frees all memory used by a client

   procedure Remove (Queue_id : in String) is
      Slot : Client_slot_access;
      use Client_list;
      use type Ada.Calendar.Time;
   begin
      Slot := Element (Find (Clients, Queue_id));

      if Slot.Session_sent > 0 then
         Client_data.List.Add_sent (
            Client.Id (Slot.Client.all), Slot.Session_sent);
         if Calendar.Clock - Last_client_data_save > 60.0 then
            Last_client_data_save := Calendar.Clock;
            Client_data.List.Save;
         end if;
      end if;

      Client.Cancel (Slot.Client.all);
      Client.Free (Slot.Client);
      Free (Slot);
      Delete (Clients, Queue_id);
   end Remove;

   ------------------------------------------------------------------------

   -- Enqueue                                                            --

   ------------------------------------------------------------------------

   procedure Enqueue (this : access Client.Object'Class) is
      Aux : Client.Object_access := Client.Object_access (this);
   begin
      -- Add to pending of identification clients

      if Client_pool.Length (Pending) >= Max_unknown then
         Trace.Log ("Upload.Queue.Manager: Cannot enqueue client " &
            Client.Id (This.all) & ", max" &
            " unknown reached.", Trace.Informative);
         Client.Cancel (this.all);
         Client.Free (Aux);
      elsif Client_pool.Is_in (Client.Id (This.all), Pending) then
         Trace.Log ("Upload.Queue.Manager: Cannot enqueue client " &
            Client.Id (This.all) & ", " &
            " unresolved request already pending.", Trace.Informative);
         Client.Cancel (this.all);
         Client.Free (Aux);
      else
         Client_pool.Insert (
            Pending, Client.Id (This.all),
               (Arrival_time => Calendar.Clock,
                Client       => Client.Object_access (This)));
      end if;
   end Enqueue;

   ------------------------------------------------------------------------

   -- Init                                                               --

   ------------------------------------------------------------------------

   -- Restore itself from a file

   -- And upload manager simply stores all its clients. When restoring,

   --    each queue searches them there by Id.

   procedure Init (File : in String) is
      Xml_queues : Xml.Node_array :=
         Xml.Get_all ("uploads/queue", Globals.Config);
      New_queue  : Queue.Object_access;
      Priorities : Natural := 0;
   begin
      Path := U (File);
      -- Sum all priorities:

      for N in Xml_queues'Range loop
         if Xml.Get_attribute (Xml_queues (N), "active", "yes") = "yes" then
            Priorities := Priorities + Xml.Utils.Get_num (
               Xml_queues (N), "priority", 1);
         end if;
      end loop;
      -- Create all queues:

      for N in Xml_queues'Range loop
         if Xml.Get_attribute (Xml_queues (N), "active", "yes") = "yes" then
            New_queue := new Queue.Object;
            Queue.Create_from_xml (
               New_queue.all,
               Xml_queues (N),
               Float (Xml.Utils.Get_num (Xml_queues (N), "priority", 1)) /
                 Float (Priorities));
            Queue_list.Insert (
               Queues,
               Xml.Get_attribute (Xml_queues (N), "name", ""),
               New_queue);
         end if;
      end loop;
   end Init;

   ------------------------------------------------------------------------

   -- Process                                                            --

   ------------------------------------------------------------------------

   -- Processes pending unknowns.

   procedure Process is
      use Client_pool;
      use type Adagio.File.Object;
      use type Ada.Calendar.Time;

      Pos           : Iterator_type := First (Pending);
      Context_in    : Client.Queue_context;
      Context_out   : Client.Client_results;
      Current       : Client.Object_access;
      Arrival       : Calendar.Time;
      Slot          : Client_slot_access;
      Client_queues : Queue_vector.Object (1);
      Success       : Boolean;

      -- Try to enqueue a client in all queues. Return in how many it is.

      procedure Try_queues (
         Current : in Client.Object_access;
         Queued  : out Queue_vector.Object) is

         use type Queue_list.Iterator_type;
         Pos     : Queue_list.Iterator_type := Queue_list.First (Queues);
         Success : Boolean;
      begin
         Queue_vector.Reset (Queued);
         while Pos /= Queue_list.Back (Queues) loop
            Queue_list.Element (Pos).Enqueue (Current, Success);
            if Success then
               Queue_vector.Append (Queued, Queue_list.Element (Pos));
            end if;
            Pos := Queue_list.Succ (Pos);
         end loop;
      end Try_queues;

      use type Upload.Resource.Handle;
   begin
      while Pos /= Back (Pending) loop
         Current := Element (Pos).Client;
         Arrival := Element (Pos).Arrival_time;

         -- Check starvation of request:

         if Calendar.Clock - Arrival >= 60.0 then

            Trace.Log ("Upload.Queue.Manager: Dropping client " &
               Client.Id (Current.all) & " because no request has been made" &
               " in a minute.", Trace.Informative);

            Client.Cancel (Current.all);
            Delete (Pending, Pos);
            Client.Free (Current);

         else

            Context_in.Must_start   := false;
            Context_in.Allowed_up   := File_size'Last;
            Context_in.Allowed_down := File_size'Last;
            begin
               Client.Process (Current.all, Context_in, Context_out);

               -- Enqueue resolved

               if Client.Requested_resource (Current.all) /=
                  Upload.Resource.Null_handle and then 
                  not Context_out.Is_done 
               then

                  if Client_list.Is_in(Client.Queue_id (Current.all), Clients)
                  then
                     Trace.Log ("Upload.Queue.Manager: Dropping request " &
                        Client.Queue_id (Current.all) & ": Already queued");
                     Client.Cancel (Current.all);
                     Delete (Pending, Pos);
                     Client.Free (Current);
                  else
                     Try_queues (Current, Client_queues);
                     if Queue_vector.Length (Client_queues) = 0 then
                        -- Forget it:

                        Client.Reject (Current.all, Client.Busy, Success);
                        Trace.Log ("Upload.Queue.Manager: No queue slots " &
                           "available", Trace.Informative);

                        if Success then
                           Client.Cancel (Current.all);
                           Delete (Pending, Pos);
                           Client.Free (Current);
                           Trace.Log (
                              "Upload.Queue.Manager: Dropping request:" &
                              " No suitable queues or queues are full.",
                              Trace.Informative);
                        end if;
                     else
                        Delete (Pending, Pos);
                        -- Insert it as queued

                        Slot := new Client_slot'(
                           Client        => Current,
                           Queue_id      => U (Client.Queue_id (Current.all)),
                           Position      => Natural'Last,
                           Is_uploading  => false,
                           Start_ack     => false,
                           BW_boost      => 1.0,
                           Last_run      => Calendar.Clock,
                           Active_queue  => null,
                           Queues        => Client_queues,
                           Speed         => (Calendar.Clock, 0, 0),
                           Session_sent  => 0,
                           Session_start => Calendar.Clock);
                        Client_list.Insert (
                           Clients,
                           S (Slot.Queue_id),
                           Slot);
                        Create_event (S (Slot.Queue_id), Real_time.Clock);
                     end if;
                  end if;
               elsif Context_out.Is_done then
               -- Drop failed

                  Trace.Log ("Upload.Queue.Manager: Dropping request for " &
                     Client.Id (Current.all));
                  Client.Cancel (Current.all);
                  Delete (Pending, Pos);
                  Client.Free (Current);
               else
                  Pos := Succ (Pos);
               end if;

            exception
               when Upload.Client.Connection_lost | Socket.Socket_error =>
                  Trace.Log (
                     "Upload.Queue.Manager: Connection lost [resolving] from "
                     & Client.Id (Current.all));
                  Client.Cancel (Current.all);
                  Delete (Pending, Pos);
                  Client.Free (Current);
               when Upload.Client.User_agent_is_banned =>
                  Trace.Log (
                     "Upload.Queue.Manager: Dropping client " &
                     Client.Id (Current.all) & " [security ban]");
                  Client.Cancel (Current.all);
                  Delete (Pending, Pos);
                  Client.Free (Current);
               when Upload.Client.Unknown_request =>
                  Trace.Log (
                     "Upload.Queue.Manager: Dropping client " &
                     Client.Id (Current.all) & " [unknown request]");
                  Client.Cancel (Current.all);
                  Delete (Pending, Pos);
                  Client.Free (Current);
               when E : others =>
                  Trace.Log (
                     "Upload.Queue.Manager: Dropping client " &
                     Client.Id (Current.all) & " because: " &
                     Trace.Report (E), Trace.Error);
                  Client.Cancel (Current.all);
                  Delete (Pending, Pos);
                  Client.Free (Current);
            end;
         end if;
      end loop;
      Upload_stats;
   end Process;

   ------------------------

   -- Lost_from_queues --

   ------------------------

   -- Mark a client lost in all queues except, maybe, a given one:

   procedure Lost_from_queues (Slot : access Client_slot) is
      use Queue_vector;
   begin
      for N in reverse 1 .. Last (Slot.Queues) loop
         Slot.Queues.Vector (N).Lost (S (Slot.Queue_id));
         Delete (Slot.Queues, N);
      end loop;
      pragma Assert (Length (Slot.Queues) = 0);
   end Lost_from_queues;

   ------------------------------------------------------------------------

   -- Process_event                                                      --

   ------------------------------------------------------------------------

   -- Process events for already identified and queued clients

   procedure Process_event (Queue_id : in String) is
      Slot : Client_slot_access;

      ----------------------------

      -- Register_upload_finish --

      ----------------------------

      -- Register an upload that has finished sucessfully

      procedure Register_upload_finish (This : in Client_slot_access) is
         Addr : constant String := Client.Id (This.Client.all);
         Code : Ustring;
         Name : Ustring := U (
            Upload.Resource.Name (
               Upload.Resource.V (
                  Client.Requested_resource (This.Client.all)).all)); 
         Sname : constant String := S (Name);
      begin
         if This.Active_queue = null then
            return;
         end if;
         if Sname'Length > 6 and then
            Sname (Sname'First .. Sname'First + 6) = "TTH of " 
         then 
            Name := U (Sname (Sname'First + 7 .. Sname'Last));
         end if;
         Code := U (Agpl.Geoip.Country_code_from_addr (Addr));
         if S (Code) = "??" then
            Code := U ("unknown");
         end if;
         Upload.Log.Records.Add ((
            Filename  => Name,
            Client    => U (Client.Name (This.Client.all)),
            Last_seen => Calendar.Clock,
            Address   => U (Addr),
            Code      => Code,
            Country   => U (Agpl.Geoip.Country_name_from_addr (Addr)),
            Queue     => U (This.Active_queue.Get_name)));
      end Register_upload_finish;
      ------------------------

      -- Reset_upload_speed --

      ------------------------

      procedure Reset_upload_speed (Slot : access Client_slot) is
         use type Calendar.Time;
         Avg_period : Duration := Slot.Active_queue.Get_avg_period;
      begin
         Slot.Speed.Current_amount := 0;
         Slot.Speed.Next_deadline  :=
            Calendar.Clock + Avg_period;
         Slot.Speed.Target_amount  := File_size (Avg_period) *
            Slot.Active_queue.Get_min_speed;
      end Reset_upload_speed;

      ------------------------

      -- Lost_from_queues --

      ------------------------

      -- Mark a client lost in all queues except, maybe, a given one:

      procedure Lost_from_queues (Except : Queue.Object_access := null) is
         use Queue_vector;
      begin
         for N in reverse 1 .. Last (Slot.Queues) loop
            if Slot.Queues.Vector (N) /= Except then
               Slot.Queues.Vector (N).Lost (Queue_id);
               Delete (Slot.Queues, N);
            end if;
         end loop;
      end Lost_from_queues;

      ------------------------

      -- Remove_from_queues --

      ------------------------

      -- Get a client out of all queues except, maybe, a given one:

      procedure Remove_from_queues (Except : Queue.Object_access := null) is
         use Queue_vector;
      begin
         for N in reverse 1 .. Last (Slot.Queues) loop
            if Slot.Queues.Vector (N) /= Except then
               Slot.Queues.Vector (N).Remove (Queue_id);
               Delete (Slot.Queues, N);
            end if;
         end loop;
         pragma Assert (
            (Length (Slot.Queues) = 1 and then Except /= null) or else
            (Length (Slot.Queues) = 0 and then Except = null));
      end Remove_from_queues;

      ---------------------

      -- Process_waiting --

      ---------------------

      procedure Process_waiting is
         Status     : Queue_slot;
         Data_in    : Client.Queue_context;
         Data_out   : Client.Client_results;
         Max_slots  : Natural := 0;
         Used_slots : Natural := 0;
         Aux        : Interfaces.Unsigned_32;
         use Ada.Real_time;
      begin
         Slot.Last_run := Calendar.Clock;

         -- Extract best queue position and can start:

         Slot.Position := Natural'Last;
         for N in 1 .. Queue_vector.Last (Slot.Queues) loop
            Slot.Queues.Vector (N).Check_client (Queue_id, Status);

            -- Start!

            if Status.Can_start then
               Slot.Is_uploading := true;
               Slot.Active_queue := Slot.Queues.Vector (N);
               Remove_from_queues (Except => Slot.Queues.Vector (N));
               exit;
            else
               Slot.Position := Natural'Min (Slot.Position, Status.Position);
               Max_slots     := Natural'Max (Max_slots, Status.Max_slots);
               Used_slots    := Natural'Max (Used_slots, Status.Used_slots);
            end if;
         end loop;

         -- Defer if upload starting, process cc.

         if Slot.Is_uploading then
            Trace.Log(
               "Upload.Queue.Manager: Starting upload of " & 
                  Upload.Resource.Name (
                     Upload.Resource.V (
                        Client.Requested_resource (Slot.Client.all)).all) &
                  " to " & Client.Id (Slot.Client.all) & " (" &
                  Client.Name (Slot.Client.all) & ")",
               Trace.Informative);
            Create_event (Queue_id, Real_time.Clock);
         else
            Data_in := (
               Position      => Slot.Position,
               Max_slots     => Max_slots,
               Current_slots => Used_slots,
               Must_start    => false,
               Allowed_up    => File_size'Last,
               Allowed_down  => File_size'Last);
            begin
               Upload.Client.Process (Slot.Client.all, Data_in, Data_out);
            exception
               when Upload.Client.Connection_lost | Socket.Socket_error =>
                  Trace.Log ("Queue [waiting]: Connection lost to " & 
                  Queue_id);
                  Lost_from_queues;
                  Remove (Queue_id);
                  return;
               when Upload.Client.Unknown_request =>
                  Trace.Log ("Queue [waiting]: Unknown request from " & 
                  Queue_id);
                  Lost_from_queues;
                  Remove (Queue_id);
                  return;
               when Upload.Client.Client_polled_too_soon =>
                  Trace.Log ("Queue [waiting]: Poll too soon for " & 
                  Queue_id);
                  Lost_from_queues;
                  Remove (Queue_id);
                  return;
               when Upload.Client.Client_missed_poll_deadline =>
                  Trace.Log ("Queue [waiting]: Poll too late for " & 
                  Queue_id);
                  Lost_from_queues;
                  Remove (Queue_id);
                  return;
               when Upload.Client.User_agent_is_banned =>
                  Remove_from_queues;
                  Remove (Queue_id);
                  return;
               when E : others =>
                  Trace.Log ("Upload.Queue.Process_event (queued): Slot " &
                     Queue_id & ": " & Trace.Report (E), Trace.Error);
                  -- Lost!

                  Lost_from_queues;
                  Remove (Queue_id);
                  return;
            end;

            -- Neat ending.

            if Data_out.Is_done then
               Trace.Log ("Upload.Queue.Process_event: Upload " &
                  Upload.Resource.Name (
                     Upload.Resource.V (
                        Client.Requested_resource (Slot.Client.all)).all) &
                  " finished successfully [waiting].", Trace.Informative);
               Register_upload_finish (Slot);
               Remove_from_queues;
               Remove (Queue_id);
               Completed_seq.Get_next (Aux);
               Statistics.Object.Set (Stat_session_completed, 
                  Statistics.Integers.Create (Integer (Aux + 1)));
               return;
            end if;
            -- Check for request change:

            if Client.Queue_id (Slot.Client.all) /= Queue_id then
               Trace.Log ("Upload.Queue.Manager: Client " & Queue_id &
                  " changed " & "request to " &
                  Client.Queue_id (Slot.Client.all) &
                  " while waiting, dropping.", Trace.Informative);
               Lost_from_queues;
               Remove (Queue_id);
               return;
            end if;
            Create_event (Queue_id, Data_out.Awakening);
         end if;
      end Process_waiting;

      -----------------------

      -- Process_uploading --

      -----------------------

      procedure Process_uploading is
         use type Calendar.Time;
         use type Real_time.Time;

         Safe_BW  : File_size;
         Extra_BW : File_size;
         Awarded, Awarded_extra : File_size := 0;

         Elapsed  : Duration := Calendar.Clock - Slot.Last_run;

         Data_in  : Client.Queue_context;
         Data_out : Client.Client_results;

         Aux : Interfaces.Unsigned_32;

         procedure Drop_client is
         begin
            Remove_from_queues;
            Remove (Queue_id);
         end Drop_client;
      begin
         -- BANDWIDTH MANAGEMENT

         begin
            Safe_BW  :=
               File_size (Float (Slot.Active_queue.Get_slot_bandwidth) *
               Float (Elapsed));
         exception
            when Constraint_error =>
               Safe_BW := File_size'Last;
         end;
         begin
            if Share_bandwidth then
               Extra_BW :=
                  File_size (Float (Slot.Active_queue.Get_slot_bandwidth) *
                  Float (Elapsed) * Slot.BW_boost) - Safe_BW;
            else
               Extra_BW := 0;
            end if;
         exception
            when Constraint_error =>
               Extra_BW := File_size'Last;
         end;

         if Extra_BW > 0 then
            Bandwidth.Commit (Safe_BW, Awarded);
            Bandwidth.Commit (Extra_BW, Awarded_extra, Extra => true);
         else
            Bandwidth.Commit (Safe_BW, Awarded);
            Extra_BW := 0;
         end if;

         Slot.Last_run := Calendar.Clock;

         Data_in := (
               Position      => 1,
               Max_slots     => 1,
               Current_slots => 1,
               Must_start    => true,
               Allowed_up    => Awarded + Awarded_extra,
               Allowed_down  => 0);

--         Globals.Main_throttle.Start_work;

         begin
            Upload.Client.Process (Slot.Client.all, Data_in, Data_out);
         exception
            when Upload.Client.Connection_lost | Socket.Socket_error =>
               Trace.Log ("Queue [uploading]: Connection lost to " & 
               Queue_id);
               Register_upload_finish (Slot);
               Completed_seq.Get_next (Aux);
               Statistics.Object.Set (Stat_session_completed, 
                  Statistics.Integers.Create (Integer (Aux + 1)));
               Remove_from_queues;
               Remove (Queue_id);
               return;
            when Upload.Client.Unknown_request =>
               Trace.Log ("Queue [uploading]: Unknown request from " & 
               Queue_id);
               Lost_from_queues;
               Remove (Queue_id);
               return;
            when E : others =>
               Trace.Log ("Upload.Queue.Process_event (Uploading): Slot " &
                  Queue_id & ": " & Trace.Report (E), Trace.Warning);
               -- Drop

               Remove_from_queues;
               Remove (Queue_id);
               return;
         end;
--         Globals.Main_throttle.End_work;


         -- Neat ending.

         if Data_out.Is_done then
            Trace.Log ("Upload.Queue.Process_event: Upload " &
               Upload.Resource.Name (
                  Upload.Resource.V (
                     Client.Requested_resource (Slot.Client.all)).all) &
               " finished successfully [uploading].", Trace.Informative);
            Register_upload_finish (Slot);
            Remove_from_queues;
            Remove (Queue_id);
            Completed_seq.Get_next (Aux);
            Statistics.Object.Set (Stat_session_completed, 
               Statistics.Integers.Create (Integer (Aux + 1)));
            return;
         end if;

         -- Check for request change (not allowed while uploading):

         if Client.Queue_id (Slot.Client.all) /= Queue_id then
            Trace.Log ("Upload.Queue.Manager: Client " & Queue_id &
               " changed " & "request to " &
               Client.Queue_id (Slot.Client.all) &
               " while uploading, dropping.", Trace.Informative);
            Remove_from_queues;
            Remove (Queue_id);
            return;
         end if;

          -- Check speeds:

         if not Slot.Start_ack and Data_out.Is_uploading then
            Slot.Start_ack := true;
            Reset_upload_speed (Slot);
            Slot.Session_start := Calendar.Clock;
         end if;
         if Slot.Start_ack then
            Slot.Speed.Current_amount :=
               Slot.Speed.Current_amount + Data_out.Sent;
            Slot.Session_sent := Slot.Session_sent + Data_out.Sent;
            if Slot.Speed.Current_amount >= Slot.Speed.Target_amount then
               Reset_upload_speed (Slot);
            elsif Slot.Speed.Next_deadline < Calendar.Clock then
               -- Failed, too slow:

               Trace.Log ("Upload.Queue.Manager.Process_event: Upload " &
                  Queue_id & " is too slow, dropping.", Trace.Informative);
               Remove_from_queues;
               Remove (Queue_id);
               return;
            end if;
            -- Stats

            Object.Session_sent := 
               Object.Session_sent + Long_long_integer (Data_out.Sent);
            Upload_stats;
         end if;

         -- Check preemptions

         if Slot.Start_ack then
            declare
               use type Queue.Preemptions;
               Preempt : Queue.Preemptions :=
                  Slot.Active_queue.Get_preemptions;
            begin
               if Preempt.Kind = Time or Preempt.Kind = Both then
                  if Calendar.Clock - Slot.Session_start >= Preempt.Time then
                     Drop_client;
                     Trace.Log ("Upload.Queue.Manager.Process_event: " &
                        "Time preemption for " & Queue_id, Trace.Informative);
                     return;
                  end if;
               end if;
               if Preempt.Kind = Size or Preempt.Kind = Both then
                  if Slot.Session_sent >= Preempt.Size then
                     Drop_client;
                     Trace.Log ("Upload.Queue.Manager.Process_event: " &
                        "Size preemption for " & Queue_id, Trace.Informative);
                     return;
                  end if;
               end if;
            end;
         end if;

         -- Throttling

         if Data_out.Sent = Awarded + Awarded_extra then
            -- Throttle faster if consumed all awarded

            if Awarded + Awarded_extra > 0 then
               if Awarded + Awarded_extra = Extra_BW + Safe_BW then
                  Slot.BW_boost := Float'Max (
                     Slot.BW_boost * (1.0 + (1.0 - Throttle)), 0.1);
               end if;
               -- Maximum throttle:

               Slot.BW_boost := Float'Min (Float'Last / 2.0, Slot.BW_boost);
            end if;
         else
            -- Throttle slower

            Slot.BW_boost := Slot.BW_boost * Throttle;
         end if;
--         Trace.Log ("Upload throttle: " &

--            Misc.To_string (Slot.BW_boost, 2));


         -- Schedule a new event for this slot:

         if Data_out.Awakening <
            Real_time.Clock + Real_time.To_time_span (Minimum_send_delay)
         then
            Data_out.Awakening :=
               Real_time.Clock + Real_time.To_time_span (Minimum_send_delay);
         end if;

         Create_event (Queue_id, Data_out.Awakening);
      end Process_uploading;

   begin
      -- Check applicable:

      if Globals.Requested_exit then
         return;
      elsif not Client_list.Is_in (Queue_id, Clients) then
         Trace.Log ("Upload.Queue.Manager.Process_event: " &
            "Event for missing upload: " & Queue_id, Trace.Warning);
         return;
      else
         Slot := Client_list.Element (Client_list.Find (Clients, Queue_id));
      end if;

      declare
         Cron : Chronos.Object;
      begin
         if Slot.Is_uploading then
            Process_uploading;
            if Chronos.Elapsed (Cron) > 5.0 then
               Trace.Log ("Upload process client too long [uploading]", 
                  Trace.Warning);
            end if;
         else
            Process_waiting;
            if Chronos.Elapsed (Cron) > 5.0 then
               Trace.Log ("Upload process client too long [uploading]", 
                  Trace.Warning);
            end if;
         end if;
      end;

   exception
      when E : others =>
         if Upload.Client."/=" (Slot.Client, null) then
            Remove_from_queues;
            Remove (S (Slot.Queue_id));
         end if;
         Trace.Log ("Upload.Queue.Manager.Process_event: " & Trace.Report (E),
            Trace.Error);
   end Process_event;

   ------------------------------------------------------------------------

   -- Save_queues                                                        --

   ------------------------------------------------------------------------

   -- Dump queues to disk

   procedure Save_queues is
      use Queue_list;
      Pos : Iterator_type := First (Queues);
   begin
      while Pos /= Back (Queues) loop
         Element (Pos).Serialize (S (Globals.Data_folder));
         Pos := Succ (Pos);
      end loop;
   end Save_queues;

   ------------------------------------------------------------------------

   -- Total_length                                                       --

   ------------------------------------------------------------------------

   -- Sumed length of all queues

   function Total_length return Natural is
      use Queue_list;
      Total : Natural := 0;
      Pos   : Iterator_type := First (Queues);
   begin
      while Pos /= Back (Queues) loop
         Total := Total + Element (Pos).Get_alive_length;
         Pos   := Succ (Pos);
      end loop;

      return Total;
   end Total_length;

   ------------------------------------------------------------------------

   -- Max_length                                                         --

   ------------------------------------------------------------------------

   -- Max length of any queue

   function Max_length return Natural is
      use Queue_list;
      Total : Natural := 0;
      Pos   : Iterator_type := First (Queues);
   begin
      while Pos /= Back (Queues) loop
         Total := Natural'Max (Total, Element (Pos).Get_alive_length);
         Pos   := Succ (Pos);
      end loop;

      return Total;
   end Max_length;

   ------------------------------------------------------------------------

   -- Max_active_length                                                  --

   ------------------------------------------------------------------------

   -- Sumed length of allowed active uploads for every queue

   function Max_active_length return Natural is
      use Queue_list;
      Total : Natural := 0;
      Pos   : Iterator_type := First (Queues);
   begin
      while Pos /= Back (Queues) loop
         Total := Total + Element (Pos).Get_uploads;
         Pos := Succ (Pos);
      end loop;

      return Total;
   end Max_active_length;

   ------------------------------------------------------------------------

   -- Num_active_uploads                                                 --

   ------------------------------------------------------------------------

   -- Total numbr of clients uploading

   function Num_active_uploads return Natural is
      use Queue_list;
      Total : Natural := 0;
      Pos   : Iterator_type := First (Queues);
   begin
      while Pos /= Back (Queues) loop
         Total := Total + Element (Pos).Get_current_uploads;
         Pos := Succ (Pos);
      end loop;

      return Total;
   end Num_active_uploads;

   ------------------------------------------------------------------------

   -- Num_waiting                                                        --

   ------------------------------------------------------------------------

   -- Total number of clients waiting

   function Num_waiting return Natural is
      use Queue_list;
      Total : Natural := 0;
      Pos   : Iterator_type := First (Queues);
   begin
      while Pos /= Back (Queues) loop
         Total := Total + Element (Pos).Get_current_waiting;
         Pos := Succ (Pos);
      end loop;

      return Total;
   end Num_waiting;

   ------------------------------------------------------------------------

   -- Shutdown                                                           --

   ------------------------------------------------------------------------

   procedure Shutdown is
   begin
      declare
         use Client_list;
         Pos : Iterator_type := First (Clients);
      begin
         while Pos /= Back (Clients) loop
            Remove (S (Element (Pos).Queue_id));
            Pos := First (Clients);
         end loop;
      end;
      Event_queue.Shutdown (Events);
      Save_queues;
      Client_data.List.Save;
   end Shutdown;

   ------------------------------------------------------------------------

   -- Report                                                             --

   ------------------------------------------------------------------------

   -- Return queues names:

   function Report return Ustring_array is
      Result : Ustring_array (1 .. Queue_list.Length (Queues));
      use Queue_list;
      I : Iterator_type := First (Queues);
   begin
      for N in Result'Range loop
         Result (N) := U (Element (I).Get_name);
         I := Succ (I);
      end loop;
      return Result;
   end Report;

   ------------------------------------------------------------------------

   -- Report_queue                                                       --

   ------------------------------------------------------------------------

   function Report_queue (
      Name : in String;
      From : in Natural;
      Qty  : in Natural;
      Lost : in Boolean) return Queue.Report_array 
   is
      use Queue_list;
      I         : Iterator_type := Find (Queues, Name);
      No_report : Queue.Report_array (1 .. 0);
   begin
      if I /= Back (Queues) then
         return Element (I).Report (From, Qty, Lost);
      else
         return No_report;
      end if;
   end Report_queue;

   ------------------------------------------------------------------------

   -- Http_report                                                        --

   ------------------------------------------------------------------------

   procedure Http_report (
      Name : in  String;  -- Queue name

      Lost : in  Boolean; -- Show lost ones

      Data : out Agpl.Http.Server.Sort_handler.Data_set) 
   is
      use Queue_list;
      I : Iterator_type := Find (Queues, Name);
   begin
      if I /= Back (Queues) then
         Element (I).Http_report (Lost, Data);
      else
         I := First (Queues);
         while I /= Back (Queues) loop
            Element (I).Http_report (Lost, Data);
            I := Succ (I);
         end loop;
      end if;
   end Http_report;

   ------------------------------------------------------------------------

   -- Members access                                                     --

   ------------------------------------------------------------------------

   -- Get_session_sent

   function Get_session_sent return Long_long_integer is
   begin
      return Session_sent;
   end Get_session_sent;

   -- Get_mean_speed

   function Get_mean_speed return Speed is
   begin
      return Mean_speed;
   end Get_Mean_speed;

   end Object;

   task Manager_Update;

   task body Manager_Update is
   begin
      loop
         exit when Globals.Requested_exit;
         delay 0.25;
         if Debug.Debug_statistics_enabled then
            Statistics.Object.Set ("Tasking - Uploads manager",
               Statistics.Booleans.Create (true));
         end if;
         begin
            Manager.Object.Process;
         exception
            when E : others =>
               Trace.Log ("Upload.Queue_manager.Manager_update: " &
                  Trace.Report (E), Trace.Error);
         end;
      end loop;
      Trace.Log ("Adagio.Upload.Queue.Manager.Manager_update exited.");
   end Manager_Update;

begin
   Client_data.List.Init;
end Adagio.Upload.Queue.Manager;