File : adagio-g2-transceiver_types-prot.adb
------------------------------------------------------------------------------
-- ADAGIO - ADALID - AENEA. --
-- --
-- Copyright (C) 2003 --
-- A. Mosteo. --
-- --
-- Authors: A. Mosteo. (adagio@mosteo.com) --
-- --
-- If you have any questions in regard to this software, please address --
-- them to the above email. --
-- --
-- This program is free software; you can redistribute it and/or modify --
-- it under the terms of the GNU General Public License as published by --
-- the Free Software Foundation; either version 2 of the License, or (at --
-- your option) any later version. --
-- --
-- This program is distributed in the hope that it will be useful, but --
-- WITHOUT ANY WARRANTY; without even the implied warranty of --
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU --
-- General Public License for more details. --
-- --
-- You should have received a copy of the GNU General Public License --
-- along with this library; if not, write to the Free Software Foundation, --
-- Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. --
-- --
-- You are not allowed to use any part of this code to develop a program --
-- whose output would be used to harass or prosecute other users of the --
-- networks Adagio connects with. All data collected with Adagio or a tool --
-- containing Adagio code about other network users must remain --
-- confidential and cannot be made public by any mean, nor be used to --
-- harass or legally prosecute these users. --
------------------------------------------------------------------------------
-- $Id: adagio-g2-transceiver_types-prot.adb,v 1.18 2004/03/29 19:13:30 Jano Exp $
with Adagio.Chronos;
with Adagio.G2.Packet;
with Adagio.G2.Packet.Parsing;
with Adagio.Memory_stream_constrained;
with Adagio.Misc;
with Adagio.Network_Settings;
with Adagio.Security;
with Adagio.Socket.Ip;
with Adagio.Statistics;
with Adagio.Statistics.Integers;
with Adagio.Trace;
with Adagio.Traffic;
with Adagio.Zutil;
with Zlib;
with Ada.Real_time;
with Ada.Streams; use Ada.Streams;
with Ada.Unchecked_conversion;
use Ada;
package body Adagio.G2.Transceiver_types.Prot is
use type Fragment_list.Iterator_type;
------------------------------------------------------------------------
-- Get_Outbound_Udp_Delay --
------------------------------------------------------------------------
-- Returns the estimated time to deplete the outbound udp data queue
function Get_Outbound_Udp_Delay (This : in Object) return Duration is
begin
return This.Core.Get_Outbound_Udp_Delay;
end Get_Outbound_Udp_Delay;
------------------------------------------------------------------------
-- Start --
------------------------------------------------------------------------
-- Activate it!
-- Receives a binded udp socket which will be used for
-- incoming and outcoming data.
-- Receives a packet queue wherein arrived packets will be added.
procedure Start (
this : in out Object;
S : in Socket.Object_access;
Queue : in Packet.Queue.Object_access) is
begin
This.Core.Start (S, Queue);
This.Dispatcher.Start;
This.Sender.Start;
end Start;
------------------------------------------------------------------------
-- Send --
------------------------------------------------------------------------
-- Send a packet, securely or not:
procedure Send (
This : in out Object;
Item : in G2.Packet.Queue.Item_type) is
use type Network_Settings.Routings;
begin
if Network_Settings.Internet_Route = Network_Settings.None and then
Socket.IP.Is_Public (Socket.Image (Item.Udp_Destination))
then
Trace.Log ("G2.Transceiver.Send: Dropped unroutable public packet while firewalled.", Trace.Debug);
return; -- <-- EARLY EXIT, DROPPED UNROUTABLE PACKET
end if;
This.Core.Send (Item);
G2.Packets_sent := Natural'Min (
G2.Packets_sent + 1, Natural'Last - 1);
-- Mark traffic
Traffic.Add ((
Arrival => Calendar.Clock,
Protocol => Protocol_descr,
Way => Traffic.Outgoing,
From => U (Socket.Image (Item.Udp_destination)),
Name => U (G2.Packet.Name (Item.Packet)),
Data => Safe_descr (Item.Udp_safe) &
U (G2.Packet.To_hex (Item.Packet))));
end Send;
------------------------------------------------------------------------
-- Set_BW_limits --
------------------------------------------------------------------------
procedure Set_BW_limits (
This : in out Object;
BW_in : in Speed;
BW_out : in Speed) is
begin
This.BW_in := BW_in;
This.BW_out := BW_out;
end Set_BW_limits;
------------------------------------------------------------------------
-- Shutdown --
------------------------------------------------------------------------
-- The end.
procedure Shutdown (this : in out Object) is
begin
Event_queue.Shutdown (This.Timeouts);
end Shutdown;
-----------
-- Stats --
-----------
function Get_stat (This : in Object; Stat : in Stats) return Natural is
begin
case Stat is
when Pending_in =>
return This.Core.Length_by_arrival;
when Pending_out =>
return This.Core.Length_outbound;
when Throttled_out =>
return This.Core.Length_throttled_out;
end case;
end Get_stat;
------------------------------------------------------------------------
-- Core_type --
------------------------------------------------------------------------
protected body Core_type is
procedure Debug_out is
I : Fragment_list.Iterator_type := Fragment_list.First (Packets_out);
J : Fragment_list.Iterator_type;
begin
Trace.log ("PACKETS_OUT CONTENTS");
while I /= Fragment_list.Back (Packets_out) loop
Trace.log ("KEY:" & Interfaces.Unsigned_16'Image (
Fragment_list.Key (I)));
J := Fragment_list.Find (Packets_out,
Fragment_list.Key (I));
Trace.log ("KEYBIS:" & Interfaces.Unsigned_16'Image (
Fragment_list.Key (J)));
I := Fragment_list.Succ (I);
end loop;
end Debug_out;
---------------------
-- Add_safe_packet --
---------------------
-- Success says if the packet is stored in the list
procedure Add_safe_packet (Packet : access Packet_type) is
Retry : Retry_context;
use Ada.Real_time;
begin
Retry.Transceiver := Object_access (Parent);
-- Insert in list of tracked packets:
-- Trace.Log ("INSERTING SAFE PACKET ------------>" &
-- Packet.Header.nSequence'Img);
Fragment_list.Insert (
Packets_out, Packet.Header.nSequence, Packet_access (Packet));
-- Debug_out;
-- Create timeouts:
Retry.Retry.nSequence := Packet.Header.nSequence;
for N in 1 .. Integer (Packet.Header.nCount) loop
Retry.Retry.nPart := Interfaces.Unsigned_8 (N);
Event_queue.Create (
Parent.Timeouts,
Packet.Fragments (N).Timeout,
Real_time.Clock + To_time_span (Retry_timeout),
Queue_retry'Access,
Retry);
end loop;
-- Done
end Add_safe_packet;
----------------------
-- Allocate_inbound --
----------------------
-- Allocates a new packet and drops the oldest one if already at limit
procedure Allocate_inbound (
From : in Packet_access;
P : out Packet_access;
Frags : in Natural)
is
It : PLbA.Iterator_type;
use type Calendar.Time;
begin
if Available_inbound = 0 then
-- Delete oldest:
Trace.Log (
"G2.Transceiver: Dropping oldest inbound packet.",
Trace.Warning);
Remove_older_inbound;
end if;
P := new Packet_type (Count => Frags);
P.Header := From.Header;
P.Source := From.Source;
-- Ensure no duplicate arrival times in structure:
-- (Could use a multiset here, shouldn't be a problem but...)
loop
if P.Arrived /= Last_inbound then
Last_inbound := P.Arrived;
exit;
end if;
delay 0.000001;
P.Arrived := Calendar.Clock;
end loop;
-- Add to the two indexes:
PLbA.Insert (Packets_in_by_arrival, P, It);
PLbS.Insert (Packets_in_by_source, Id_inbound (P.all), It);
end Allocate_inbound;
-----------------------
-- Allocate_outbound --
-----------------------
-- Allocates a new packet and drops the oldest one if already at limit
procedure Allocate_outbound (
P : out Packet_access; Frags : in Natural) is
begin
if Available_outbound = 0 then
P := Fragment_list.Element (Fragment_list.First (Packets_out));
if P = null then
Trace.Log ("G2.Transceiver: Empty outbound queue while " &
" Available_outbound = 0!", Trace.Warning);
else
Drop_out_packet (P.Header.nSequence, Success => false);
Trace.Log ("G2.Transceiver: Dropping oldest outbound packet",
Trace.Warning);
end if;
end if;
P := new Packet_type (Count => Frags);
end Allocate_outbound;
---------------
-- Available --
---------------
function Available return Boolean is
begin
return Socket.Available (Udp.all) > 0;
end Available;
--------------------------------
-- Available_inbound/outbound --
--------------------------------
function Available_inbound return Natural is
begin
return Natural (Integer'Max (Max_packets - Length_by_source, 0));
end Available_inbound;
function Available_outbound return Natural is
begin
return Natural (Integer'Max (Max_packets - Length_outbound, 0));
end Available_outbound;
--------------
-- Complete --
--------------
-- Do completion of a packet:
procedure Complete (Packet : in out Packet_access) is
function Is_compressed return Boolean is
begin
return (Packet.Header.nFlags and Flag_deflate) /= 0;
end Is_compressed;
Length : Stream_element_offset := 0;
Item : G2.Packet.Queue.Item_type;
use Ada.Streams;
begin
-- Defaults:
Item.Source := G2.Packet.Queue.Listener_udp;
Item.Udp_source := Packet.Source;
-- Reconstruct data
for N in Packet.Fragments'Range loop
Length := Length + Packet.Fragments (N).Last;
end loop;
declare
Data : Stream_element_array (1 .. Length);
Pos : Stream_element_offset := Data'First;
begin
for N in Packet.Fragments'Range loop
Data (Pos .. Pos + Packet.Fragments (N).Last - 1) :=
Packet.Fragments (N).Data (1 .. Packet.Fragments (N).Last);
Pos := Pos + Packet.Fragments (N).Last;
end loop;
if Is_compressed then
declare
ZData : Stream_element_array := Zutil.Inflate (Data);
begin
Item.Packet := G2.Packet.Parsing.From_element_array (ZData);
exception
when Zlib.Zlib_Error =>
Trace.Log ("Transceiver.Complete: Zlib error inflating packet with" &
Integer'Image (Packet.Fragments'Length) & " fragments",
Trace.Warning);
Remove_inbound (Id_inbound (Packet.all));
return;
end;
else
Item.Packet := G2.Packet.Parsing.From_element_array (Data);
end if;
G2_queue.Put (Item);
-- Mark traffic
Traffic.Add ((
Arrival => Calendar.Clock,
Protocol => Protocol_descr,
Way => Traffic.Incoming,
From => U (Socket.Image (Item.Udp_source)),
Name => U (G2.Packet.Name (Item.Packet)),
Data => Safe_descr (
(Packet.Header.nFlags and Flag_ack) /= 0)
& U (G2.Packet.To_hex (Item.Packet))));
end;
-- Trace.Log ("G2.Transceiver: Deleting completed received (" &
-- Id_inbound (Packet.all) & ")");
-- Count a complete packet received
G2.Packets_received := Natural'Min (
G2.Packets_received + 1, Natural'Last - 1);
Remove_inbound (Id_inbound (Packet.all));
-- Supposedly it's the same being deleted from the collection
-- Free (Packet);
end Complete;
---------------------
-- Drop_out_packet --
---------------------
-- We'll search a packet and free/delete it.
-- It could not be there if has been dropped by excess timeout.
procedure Drop_out_packet (
nSequence : in Interfaces.Unsigned_16;
Success : in Boolean)
is
R : Fragment_list.Iterator_type := Fragment_list.Find (
Packets_out, nSequence);
P : Packet_access;
begin
if R /= Fragment_list.Back (Packets_out) then
P := Fragment_list.Element (R);
-- Cancel timeouts:
for N in 1 .. Integer (P.Header.nCount) loop
if not P.Fragments (N).Valid then
Event_queue.Cancel (
Parent.Timeouts, P.Fragments (N).Timeout);
end if;
end loop;
-- Delete
if Success then
-- Trace.Log ("G2.Transceiver: Packet" &
-- nSequence'Img & " acknowledged.",
-- File => S (Logfile));
null;
else
--Trace.Log ("G2.Transceiver: Packet" &
--nSequence'Img & " out dropped.",
--File => S (Logfile));
null;
end if;
Fragment_list.Delete (Packets_out, R);
Free (P);
else
Trace.Log (
"G2.Transceiver.Drop_out_packet: Target packet missing ("
& Misc.To_string (Natural (nSequence)) & ")",
Trace.Warning);
end if;
end Drop_out_packet;
-----------------------
-- Get_older_inbound --
-----------------------
procedure Get_older_inbound (P : out Packet_access) is
use PLbA;
F : Iterator_type := First (Packets_in_by_arrival);
begin
if F = Back (Packets_in_by_arrival) then
P := null;
else
P := Element (F);
end if;
end Get_older_inbound;
------------------
-- Get_rcv_time --
------------------
function Get_rcv_time return Calendar.Time is
begin
return Next_receive;
end Get_rcv_time;
-------------------
-- Get_send_time --
-------------------
function Get_send_time return Calendar.Time is
begin
return Next_sending;
end Get_send_time;
----------------------------
-- Get_Outbound_Udp_Delay --
----------------------------
-- Says the remaining time to empty the outbound queue
function Get_Outbound_Udp_Delay return Duration is
begin
return Duration (Udp_Msgs_Size) / Duration (Parent.BW_Out);
end Get_Outbound_Udp_Delay;
-------------
-- Lengths --
-------------
function Length_by_arrival return Natural is
begin
return PLbA.Length (Packets_in_by_arrival);
end Length_by_arrival;
function Length_by_source return Natural is
begin
return PLbS.Length (Packets_in_by_source);
end Length_by_source;
function Length_outbound return Natural is
begin
return Fragment_list.Length (Packets_out);
end Length_outbound;
function Length_throttled_out return Natural is
begin
return Udp_message_list.Length (Udp_msgs);
end Length_throttled_out;
--------------------
-- Receive_packet --
--------------------
procedure Receive_packet is
use Ada.Streams;
use type Ada.Calendar.Time;
Buffer : aliased Stream_element_array
(1 .. Max_fragment_size + Header_size);
Last : Stream_element_offset;
Addr : Socket.Sock_addr_type;
P : Packet_access;
Packet : aliased Packet_type (0);
Head : Packet_header;
for Head'Address use Buffer'Address;
pragma Import (Ada, Head);
begin
begin
Socket.Receive (Udp.all, Buffer, Last, Addr);
-- Set next reception time:
Next_receive := Calendar.Clock +
Duration (Last) / Duration (Parent.BW_in);
--Trace.Log ("Scheduled delay for udp rcv : " &
-- Duration'Image (
-- Duration (Last) / Duration (Parent.BW_in)),
-- Trace.Debug);
-- Stat received packets
--G2.Packets_received := Natural'Min (
-- G2.Packets_received + 1, Natural'Last - 1);
if not Security.Is_allowed (Addr.Addr) then
Trace.Log ("G2.Transceiver.Receive_packet: Packet dropped " &
"from " & Socket.Image (Addr.Addr) & " [Security ban]",
Trace.Debug);
return;
end if;
exception
when Socket.Socket_error =>
-- Trace.Log ("G2.Transceiver.Receive_packet: " & Trace.Report (E));
return;
end;
-- Packet too large
if Last = Buffer'Last then
raise Constraint_error;
end if;
-- Don't go further if it's unknown
if Head.szTag /= GND_tag then
Trace.Log ("UDP dropped: Unknown tag: " & Head.szTag,
Trace.Warning, S (Logfile));
return;
end if;
-- Understandable?
if (Head.nFlags and 2#00001100#) /= 0 then
Trace.Log ("UDP dropped: Unknown flags:" & Head.nFlags'Img,
Trace.Warning, S (Logfile));
return;
end if;
-- Search an existing equal:
Packet.Header := Head;
Packet.Source := Addr;
-- ACK?
if Head.nCount = 0 then
--Trace.Log (" <-- ACK " & To_string (Packet.Header) & "/" &
-- Socket.Image (Packet.Source));
-- Mark traffic
--Traffic.Add ((
-- Arrival => Calendar.Clock,
-- Protocol => Protocol_descr,
-- Way => Traffic.Incoming,
-- From => U (Socket.Image (Packet.Source)),
-- Name => U ("ACK"),
-- Data => U (To_string (Packet.Header))));
declare
Result : Fragment_list.Iterator_type := Fragment_list.Find (
Packets_out, Head.nSequence);
nPart : Integer := Integer (Head.nPart);
R : Packet_access renames Fragment_list.Element (Result);
begin
if Result /= Fragment_list.Back (Packets_out) then
R.Fragments (nPart).Valid := true;
-- Cancel timeout:
Event_queue.Cancel (
Parent.Timeouts, R.Fragments (nPart).Timeout);
-- Completely acked?
if Is_complete (R.all) then
Drop_out_packet (Head.nSequence, Success => true);
end if;
else
-- How can it be? we received an ACK for a packet we have
-- not sent.
-- Maybe it has been already dropped.
--Trace.Log (
-- "G2.Transceiver.Receive_packet: ACK for missing " &
-- "packet " & To_string (Packet.Header), Trace.Debug);
null;
end if;
end;
else -- Not ACK
declare
Result : PLbS.Iterator_type := PLbS.Find (
Packets_in_by_source, Id_inbound (Packet));
use type PLbS.Iterator_Type;
begin
--Trace.Log (" <-- FRG " & To_string (Packet.Header) & "/" &
-- Socket.Image (Packet.Source));
-- Create/update an incoming packet.
if Result /= PLbS.Back (Packets_in_by_source) then
--Trace.Log (" <-- FND " & Id_inbound (Packet));
P := PLbA.Element (
PLbS.Element (Result));
else
--Trace.Log (" <-- NEW " & Id_inbound (Packet));
Allocate_inbound (
Packet'Unchecked_Access, P, Natural (Head.nCount));
end if;
P.Header := Packet.Header;
P.Source := Packet.Source;
-- Now actualize data:
if not P.Fragments (Integer (P.Header.nPart)).Valid then
P.Fragments (Integer (P.Header.nPart)).Data (
1 .. Last - Header_size) :=
Buffer (Header_size + 1 .. Last);
P.Fragments (Integer (P.Header.nPart)).Last :=
Last - Header_size;
P.Fragments (Integer (P.Header.nPart)).Valid := true;
end if;
-- Ack
if (Head.nFlags and Flag_ack) /= 0 then
Send_ack (P);
end if;
-- Complete
if Is_complete (P.all) then
Complete (P);
end if;
end;
end if;
exception
when others =>
raise;
end Receive_packet;
--------------------
-- Remove_inbound --
--------------------
procedure Remove_inbound (Id : in String)
is
use PLbS;
P : Packet_access;
I : Iterator_type := Find (Packets_in_by_source, Id);
Alias : PLbA.Iterator_type;
begin
if I = Back (Packets_in_by_source) then
Trace.Log ("G2.Transceiver.Remove_inbound: Packet missing: " &
Id, Trace.Warning);
return;
end if;
Alias := Element (I);
PLbS.Delete (Packets_in_by_source, I);
P := PLbA.Element (Alias);
PLbA.Delete (Packets_in_by_arrival, Alias);
Free (P);
end Remove_inbound;
--------------------------
-- Remove_older_inbound --
--------------------------
procedure Remove_older_inbound is
use PLbA;
F : Iterator_type := First (Packets_in_by_arrival);
P : Packet_access;
begin
if F = Back (Packets_in_by_arrival) then
return;
end if;
P := Element (F);
PLbS.Delete (Packets_in_by_source, Id_inbound (P.all));
PLbA.Delete (Packets_in_by_arrival, F);
Free (P);
end Remove_older_inbound;
--------------------
-- Retry_fragment --
--------------------
procedure Retry_fragment (Part : in out Pending_retransmission) is
Retry : Retry_context;
begin
Part.Failures := Part.Failures + 1;
if Part.Failures * Duration'(Retry_timeout) >= Transmit_timeout then
-- Drop packet!
Drop_out_packet (Part.nSequence, Success => false);
else
declare
Result : Fragment_list.Iterator_type :=
Fragment_list.Find (Packets_out, Part.nSequence);
R : Packet_access;
use Ada.Real_time;
begin
if Result /= Fragment_list.Back (Packets_out) then
R := Fragment_list.Element (Result);
--Trace.Log (
-- "G2.Transceiver: Re-sending fragment because of" &
-- Part.Failures'Img & " timeout(s): " &
-- To_string (R.Header), File => S (Logfile));
-- Retransmit part:
Send_fragment (R, Part.nPart);
-- Re-create timeout
Retry.Retry := Part;
Retry.Transceiver := Object_access (Parent);
Event_queue.Create (
Parent.Timeouts,
R.Fragments (Integer (Part.nPart)).Timeout,
Clock + To_time_span (Retry_timeout),
Queue_retry'Access,
Retry);
else
Trace.Log (
"G2.Transceiver: Missing packet attempting retry:" &
Part.nSequence'Img);
end if;
end;
end if;
end Retry_fragment;
--------------
-- Send_ack --
--------------
procedure Send_ack (P : in Packet_access) is
H : Packet_header := P.Header;
type Header_array is new Stream_element_array (1 .. Header_size);
function To_array is new Unchecked_conversion (
Packet_header, Header_array);
Last : Stream_element_offset;
use type Network_Settings.Routings;
begin
-- Not send if firewalled:
if Network_Settings.Internet_Route = Network_Settings.None and then
Socket.Ip.Is_Public (Socket.Image (P.Source))
then
Trace.Log ("G2.Transceiver.Send_Ack: Dropping ACK because of firewalling.", Trace.Debug);
return; -- <-- EARLY EXIT, DROPPING ACK WHILE FIREWALLED
end if;
-- Stat packets sent
--G2.Packets_sent := Natural'Min (
-- G2.Packets_sent + 1, Natural'Last - 1);
--Trace.Log (" --> ACK " & To_string (P.Header) &
-- "/" & Socket.Image (P.Source),
-- File => S (Logfile));
H.nFlags := 0;
H.nCount := 0;
Socket.Send (
Udp.all, Stream_element_array (to_array (H)), Last, P.Source);
-- Mark traffic
--Traffic.Add ((
-- Arrival => Calendar.Clock,
-- Protocol => Protocol_descr,
-- Way => Traffic.Outgoing,
-- From => U (Socket.Image (P.Source)),
-- Name => U ("ACK"),
-- Data => U (To_string (P.Header))));
exception
when Socket.Security_ban =>
Trace.Log ("G2.Transceiver.Send_ack: ACK dropped for " &
Socket.Image (P.Source.Addr) & " [Security ban]",
Trace.Debug);
end Send_ack;
----------
-- Send --
----------
procedure Send (Item : in G2.Packet.Queue.Item_type) is
Buffer : Stream_element_array (1 .. Max_packet_size);
Offset : Stream_element_offset := Buffer'First;
Stream : aliased Memory_stream_constrained.Stream_type;
Parts : Natural;
Packet : Packet_access;
Remain : Stream_element_offset;
Zdone : Boolean := false;
begin
Memory_stream_constrained.Create
(Stream, Buffer'Address, Buffer'Length);
-- Serialize:
Serialize: begin
G2.Packet.Write (Stream'Access, Item.Packet);
exception
when E : Constraint_error =>
Trace.Log ("G2.Transceiver.Send.Serialize: " &
Trace.Report (E), Trace.Debug);
Trace.Log (
"G2.Transceiver.Send.Serialize: UDP packet too large.",
Trace.Warning);
return;
end Serialize;
-- Try to compress payload:
begin
declare
ZBuffer : Stream_element_array := Zutil.Deflate (
Buffer (1 .. Memory_stream_constrained.Index (Stream)));
begin
Buffer (1 .. ZBuffer'Length) := ZBuffer;
ZDone := true;
end;
exception
when Zutil.Cannot_deflate =>
null;
end;
-- Create fragments:
Parts := (Integer (Memory_stream_constrained.Index (Stream)) - 1) /
Fragment_size + 1;
if Parts > Natural (Interfaces.Unsigned_8'Last) then
Trace.Log ("G2.Transceiver.Send: Packet too big:" &
Natural'Image (Parts) & " fragments needed", Trace.Warning);
return;
end if;
-- Packet exists allocated only within this frame:
begin
Allocate_outbound (Packet, Parts);
Packet.Header.nSequence := Next_packet;
Next_packet := Next_packet + 1;
Packet.Header.szTag := GND_tag;
Packet.Header.nFlags := 0;
Packet.Header.nCount := Unsigned_8 (Parts);
Packet.Destination := Item.Udp_destination;
Packet.Safe := Item.Udp_safe;
if Packet.Safe then
Packet.Header.nFlags := Packet.Header.nFlags or Flag_ack;
end if;
if ZDone then
Packet.Header.nFlags := Packet.Header.nFlags or Flag_deflate;
end if;
for N in 1 .. Parts loop
if N /= Parts then
Packet.Fragments (N).Data (1 .. Fragment_size) :=
Buffer (Offset .. Offset + Fragment_size - 1);
Packet.Fragments (N).Last := Fragment_size;
Offset := Offset + Fragment_size;
else
Remain :=
Memory_stream_constrained.Index (Stream) rem
Fragment_size;
if Remain = 0 then -- A full exactly ended fragment
Remain := Fragment_size;
end if;
Packet.Fragments (N).Data (1 .. Remain) :=
Buffer (Offset .. Offset + Remain - 1);
Packet.Fragments (N).Last := Remain;
end if;
end loop;
-- Send it!
Send_fragments (Packet);
exception
when Socket.Security_ban =>
Trace.Log ("G2.Transceiver [Send]: Security ban for " &
Socket.Image (Packet.Destination), Trace.Debug);
if Packet /= null then
Free (Packet);
end if;
return;
when E : others =>
Trace.Log ("G2.Transceiver [Send]: " & Trace.Report (E),
Trace.Error);
if Packet /= null then
Free (Packet);
end if;
return;
end;
if Packet.Safe then
Add_safe_packet (Packet);
else
Free (Packet);
end if;
exception
when E : others =>
Trace.Log ("G2.Transceiver.Send: " & Trace.Report (E),
Trace.Error);
if Packet /= null then
Free (Packet);
end if;
end Send;
-------------------
-- Send_fragment --
-------------------
procedure Send_fragment (
Packet : in out Packet_access;
Part : in Interfaces.Unsigned_8)
is
Buffer : Stream_element_array (1 .. Fragment_size + Header_size);
Last : Stream_element_offset;
M : aliased Memory_stream_constrained.Stream_type;
N : Integer := Integer (Part);
begin
Memory_stream_constrained.Create (M, Buffer'Address, Buffer'Length);
Packet.Header.nPart := Part;
Packet_header'Write (M'Access, Packet.Header);
Memory_stream_constrained.Write (M,
Packet.Fragments (N).Data (1 .. Packet.Fragments (N).Last));
Last := Packet.Fragments (N).Last + Header_size;
--Trace.Log (" --> FRG " & To_string (Packet.Header) & "/" &
-- Socket.Image (Packet.Destination), File => S (Logfile));
-- Enqueue fragment in LIFO fashion:
Udp_msgs_size := Udp_msgs_size + Natural (Last);
Udp_message_list.Prepend (
Udp_msgs,
Create (Buffer (1 .. Last), Packet.Destination));
end Send_fragment;
--------------------
-- Send_fragments --
--------------------
procedure Send_fragments (Packet : in out Packet_access) is
begin
for N in 1 .. Packet.Header.nCount loop
Send_fragment (Packet, N);
end loop;
end Send_fragments;
----------------------------------
-- Send_next_fragment_to_socket --
----------------------------------
-- Sends it for real and sets next sending time
procedure Send_next_fragment_to_socket is
use Udp_message_list;
use Ada.Calendar;
Msg : Udp_Message;
Last : Stream_element_offset;
Dropped : Natural := 0;
Now : Time := Clock;
I : Udp_message_list.Iterator_type;
begin
if Is_empty (Udp_msgs) then
Next_sending := Clock + 1.0;
else
Msg := Element (First (Udp_msgs));
Udp_msgs_size := Udp_msgs_size - Msg.Data'Length;
Delete_first (Udp_msgs);
begin
Socket.Send (
Udp.all,
Msg.Data.all,
Last,
Msg.Dest);
exception
when Socket.Security_ban =>
Trace.Log ("G2.Transceiver.Send_next_fragment_to_socket: " &
"Security ban", Trace.Debug);
end;
-- Drop timeouted packets:
loop
I := Udp_message_list.Last (Udp_msgs);
exit when Is_empty (Udp_msgs) or else
Now - Element (I).Date <=
Globals.Options.G2_UdpOutboundTimeout;
Udp_msgs_size := Udp_msgs_size - Element (I).Data'Length;
Delete_last (Udp_msgs); -- Drop it!
Dropped := Dropped + 1;
end loop;
if Dropped > 0 then
Trace.Log ("G2.Transceiver.Sender_task: Dropped" &
Dropped'Img & " too old UDP outbound packets",
Trace.Debug);
end if;
-- Compute next sending:
Next_sending := Clock +
Duration (Msg.Data'Length) / Duration (Parent.BW_out);
--Trace.Log ("Scheduled delay for udp send: " &
-- Duration'Image (
-- Duration (Msg.Data'Length) / Duration (Parent.BW_out)),
-- Trace.Debug);
end if;
end Send_next_fragment_to_socket;
-----------
-- Start --
-----------
procedure Start (
S : in Socket.Object_access;
Queue : in G2.Packet.Queue.Object_access) is
begin
Udp := S;
G2_queue := Queue;
end Start;
end Core_type;
------------------------------------------------------------------------
-- Sender_task --
------------------------------------------------------------------------
task body Sender_task is
begin
select
accept Start;
or
terminate;
end Select;
Main : loop
exit when Globals.Requested_exit;
delay until Parent.Core.Get_send_time;
begin
Parent.Core.Send_next_fragment_to_socket;
exception
when E : others =>
Trace.Log ("G2.Transceiver.Sender: " & Trace.Report (E),
Trace.Error);
end;
end loop Main;
Trace.Log ("G2.Transceiver.Sender exited");
end Sender_task;
------------------------------------------------------------------------
-- Dispatcher_task --
------------------------------------------------------------------------
task body Dispatcher_task is
Cron, Cron2, Cron3, Cron4 : Chronos.Object;
use type Calendar.Time;
P : Packet_access;
Num_alive_packets : Natural;
begin
select
accept Start;
or
terminate;
end select;
Main : loop
exit when Globals.Requested_exit;
begin
-- Check pipes:
Globals.Main_throttle.Start_work;
Chronos.Reset (Cron4);
if Calendar.Clock >= Parent.Core.Get_rcv_time then
while Parent.Core.Available and then
Chronos.Elapsed (Cron4) < 0.9
loop
Parent.Core.Receive_packet;
Globals.Main_throttle.Cycle_work;
end loop;
end if;
-- Drop too old incompletes:
if Chronos.Elapsed (Cron3) > 15.0 then
Chronos.Reset (Cron3);
-- Trace.Log ("G2.Transceiver: starting old packets purge");
loop
Parent.Core.Get_older_inbound (P);
if P /= null then
Trace.Log ("G2.Transceiver: " &
"Purging incoming packet with age " &
Misc.Image (Calendar.Clock - P.Arrived));
end if;
exit when P = null or else
Calendar.Clock - P.Arrived < Receive_timeout;
Parent.Core.Remove_older_inbound;
Globals.Main_throttle.Cycle_work;
end loop;
end if;
-- Report
if Chronos.Elapsed (Cron) > 60.0 then
Chronos.Reset (Cron);
Num_alive_packets := Statistics.Integers.Value (
Statistics.Integers.Integer_value (
Statistics.Object.Get (Stat_alive_packets)));
Trace.Log ("******** G2 Transceiver Report ********");
Trace.Log (" Alive packets: " &
Misc.To_string (Num_alive_packets));
Trace.Log (" Available inbound: " &
Misc.To_string (Parent.Core.Available_inbound));
Trace.Log (" Available outbound: " &
Misc.To_string (Parent.Core.Available_outbound));
Trace.Log (" Pending inbound : " &
Misc.To_string (Parent.Core.Length_by_source));
Trace.Log (" Pending outbound: " &
Misc.To_string (Parent.Core.Length_outbound));
Trace.Log ("***************************************");
end if;
-- Stats
if Chronos.Elapsed (Cron2) > 0.7 then
Chronos.Reset (Cron2);
Statistics.Object.Set (
"Network - G2 - UDPT - Pending inbound",
Statistics.Integers.Create (
Parent.Core.Length_by_source));
Statistics.Object.Set (
"Network - G2 - UDPT - Pending outbound",
Statistics.Integers.Create (Parent.Core.Length_outbound));
Statistics.Object.Set (
"Network - G2 - UDPT - Available inbound",
Statistics.Integers.Create (Parent.Core.Available_inbound));
Statistics.Object.Set (
"Network - G2 - UDPT - Available outbound",
Statistics.Integers.Create (
Parent.Core.Available_outbound));
end if;
exception
when E : others =>
Trace.Log ("G2.Transceiver.Dispatcher: " & Trace.Report (E),
Trace.Error);
end;
delay 0.1;
end loop Main;
Trace.Log ("G2.Transceiver.Dispatcher exited.");
end Dispatcher_task;
------------------------------------------------------------------------
-- Queue_retry --
------------------------------------------------------------------------
procedure Queue_retry (Context : in Event_queue.Context_access) is
R : Retry_access := Retry_access (Context);
begin
R.Transceiver.Core.Retry_fragment (R.Retry);
end Queue_retry;
end Adagio.G2.Transceiver_types.Prot;