问题描述
我有一个简单的 TIdTCPServer,它在控制台程序中运行并接收数据。我的问题是:当客户端以非常快的速度连续发送流数据时,服务器在输出 70 行之后就会冻结,并且服务器的 CPU 占用率达到 70%;我不知道如何在不于每次发送之间添加 Sleep 的情况下解决这个问题。下面是客户端和服务器端的示例代码。您能帮我解决这个问题(服务器端)吗?谢谢。
program Srv;
{$I Synopse.inc}
uses
{$IFDEF UNIX}{$IFDEF UseCThreads}
cthreads,{$ENDIF}{$ENDIF}
Classes,SysUtils,CustApp,Generics.Collections,IdTcpserver,IdCustomTcpserver,IdContext,IdGlobal,Db,mORMot,mORMotsqlite3,IdSync,functions,SynCommons,Synsqlite3Static;
type
{ TMyApplication }
// Console application class from the question; it owns the Indy TCP server
// and declares the server event handlers.
// NOTE(review): the ReceiveStream declaration below (extra Size parameter,
// no result type) does not match the implementation further down, which
// takes (AContext, var AStream) and returns Boolean.
// NOTE(review): DoRun assigns a handler named ServerOnDiconnect, which is
// not declared here.
TMyApplication = class(TCustomApplication)
var IdTcpserver: TIdTcpserver;
protected
procedure DoRun; override;
procedure ServerOnConnect(AContext: TIdContext);
procedure ServerOnExecute(AContext: TIdContext);
function ReceiveStream(AContext: TIdContext;Size:integer; var AStream: TStream);
public
constructor Create(TheOwner: TComponent); override;
destructor Destroy; override;
end;
type
// Notification object carrying one log line; LogMsg creates an instance,
// stores the text in FMsg and calls Notify, and DoNotify prints it.
// Presumably TIdNotify delivers DoNotify on the main thread - confirm against
// the Indy documentation.
TLog = class(TIdNotify)
protected
FMsg: string;
procedure DoNotify; override;
public
class procedure LogMsg(const AMsg: string);
end;
{ TMyApplication }
// Writes the stored message to standard output.
// NOTE(review): the local variable i is declared but never used.
procedure TLog.DoNotify;
var i:integer;
begin
writeln(FMsg);
end;
// Creates a TLog instance, stores AMsg in it and calls Notify.
// The instance is freed here only when an exception occurs; on the normal
// path it is not freed by this routine (presumably Notify takes ownership -
// confirm against the Indy TIdNotify documentation).
class procedure TLog.LogMsg(const AMsg: string);
begin
with TLog.Create do
try
FMsg := AMsg;
Notify;
except
Free;
raise;
end;
end;
// Reads one length-prefixed packet from the connection into AStream.
// First reads a LongInt size, then reads that many bytes into AStream and
// rewinds the stream. Returns True on success; any exception is swallowed
// and reported as False.
// NOTE(review): this signature differs from the declaration in the class
// (which has an extra Size parameter and no result type).
function TMyApplication.ReceiveStream(AContext: TIdContext; var AStream: TStream)
: Boolean; overload;
var
LSize: LongInt;
begin
Result := True;
try
// Size prefix written by the client before the payload.
LSize := AContext.Connection.IOHandler.ReadLongInt();
// NOTE(review): the next statement has no terminating semicolon.
AContext.Connection.IOHandler.ReadStream(AStream,LSize,False)
// Rewind so the caller can read the payload from the start.
AStream.Seek(0,soFromBeginning);
except
Result := False;
end;
end;
// OnExecute handler: logs a random number and reads one packet into a
// temporary memory stream, which is always freed.
// NOTE(review): the work is done only when InputBufferIsEmpty is True. The
// accompanying answer identifies this check as inverted: once data is
// buffered the handler returns immediately without reading, which produces
// a tight loop.
// NOTE(review): the begin/end pairs in this block do not balance; the final
// "end;" closing the procedure appears to be missing.
procedure TMyApplication.ServerOnExecute(AContext: TIdContext);
var AStream:TMemoryStream;
begin
if (Acontext.Connection.IOHandler.InputBufferIsEmpty) then
begin
TLog.LogMsg('--: '+random(100000).ToString); // After running the client this line is displayed 70 times and CPU load goes from 40 % to 70 %
AStream:=TMemoryStream.Create;
try
// The Boolean result of ReceiveStream is ignored here.
ReceiveStream(AContext,TStream(AStream));
// .. here we use AStream to execute some stuff
finally
Astream.free;
end;
end;
// OnConnect handler: logs the text 'connect' for each new client.
procedure TMyApplication.ServerOnConnect(AContext: TIdContext);
begin
TLog.LogMsg('connect');
end;
// Creates and configures the TCP server, activates it, then loops forever
// calling CheckSynchronize every 10 ms.
procedure TMyApplication.DoRun;
begin
IdTcpserver := tIdTcpserver.Create;
IdTcpserver.ListenQueue := 15;
// NOTE(review): the accompanying answer questions this MaxConnections value.
IdTcpserver.MaxConnections := 0;
IdTcpserver.TerminateWaitTime := 5000;
// Listen on all IPv4 interfaces, port 80.
with IdTcpserver.Bindings.Add
do begin
IP := '0.0.0.0';
Port := 80;
IPVersion:=Id_IPv4;
end;
IdTcpserver.OnConnect := ServerOnConnect;
// NOTE(review): ServerOnDiconnect is not declared anywhere in this program.
IdTcpserver.Ondisconnect := ServerOnDiconnect;
IdTcpserver.OnExecute := ServerOnExecute;
IdTcpserver.Active := True;
// Presumably this is what lets queued TLog notifications run on the main
// thread - confirm against the Indy TIdNotify documentation.
while true do
begin
Classes.CheckSynchronize() ;
sleep(10);
end;
// The loop above never exits, so the two statements below are unreachable.
readln;
Terminate;
end;
// Constructs the application and enables StopOnException.
constructor TMyApplication.Create(TheOwner: TComponent);
begin
inherited Create(TheOwner);
StopOnException := True;
end;
// Frees the TCP server created in DoRun, then runs the inherited destructor.
destructor TMyApplication.Destroy;
begin
IdTcpserver.Free;
inherited Destroy;
end;
// Program entry point: create the application object, set its title, run it
// and free it afterwards.
var
Application: TMyApplication;
begin
Application := TMyApplication.Create(nil);
Application.Title := 'My Application';
Application.Run;
Application.Free;
end.
客户端
// Sends AStream over AClient as a LongInt size prefix followed by the stream
// contents. Returns True on success; any exception is swallowed and reported
// as False.
// NOTE(review): the accompanying answer states that write buffering is not
// used correctly here (the size prefix is written before WriteBufferOpen).
function TForm1.SendStream(AClient: TIdTCPClient; AStream: TStream): Boolean; overload;
var
StreamSize: LongInt;
begin
try
Result := True;
try
// Rewind so the whole stream is sent.
AStream.Seek(0,soFromBeginning);
StreamSize := (AStream.Size);
// Size prefix, matching the ReadLongInt call on the server side.
AClient.IOHandler.Write(LongInt(StreamSize));
AClient.IOHandler.WriteBufferOpen;
AClient.IOHandler.Write(AStream,False);
AClient.IOHandler.WriteBufferFlush;
finally
AClient.IOHandler.WriteBufferClose;
end;
except
Result := False;
end;
end;
// Sends 1001 packets (i = 0..1000) with random MX/MY values, creating and
// freeing a fresh memory stream for each one.
// NOTE(review): the loop variable i is not declared in this procedure.
// NOTE(review): the result of SendStream is ignored.
procedure TForm1.Button1Click(Sender: TObject);
var
Packet:TPacket;
AStream:TMemoryStream;
begin
for i:=0 to 1000 do
begin
// Keep the UI responsive while sending.
Application.ProcessMessages;
With Packet do
begin
MX := random(10000);
MY := random(10000);
end;
AStream:=TMemoryStream.Create;
try
AStream.Write(Packet,SizeOf(TPacket));
SendStream(IdTCPClientCmd,TStream(AStream));
finally
AStream.Free;
end;
end;
end;
解决方法
在服务器端,您对 InputBufferIsEmpty() 的检查写反了。如果客户端发送大量数据,InputBufferIsEmpty() 最终可能会返回 False,这会导致您的服务器代码进入一个实际上不读取任何数据、也不让出 CPU 的紧密循环。只需完全去掉这个检查,让 ReceiveStream() 一直阻塞,直到有数据包可供读取。
另外,为什么您将服务器的 ListenQueue 设置为 15,而将 MaxConnections 设置为 0?MaxConnections=0 将强制服务器立即关闭所接受的每个客户端连接,因此 OnExecute 事件永远没有机会被调用。
在客户端,不需要在每次循环迭代中销毁并重新创建 TMemoryStream,您应该重用该对象。
但更重要的是,您没有正确使用写缓冲,因此要么修复它,要么去掉它。我会选择后者:因为您发送的是大量小数据包,所以让 TCP 默认的合并机制为您处理缓冲即可。
并且 TIdIOHandler.Write(TStream) / TIdIOHandler.ReadStream() 可以为您交换流的大小,您无需手动执行此操作。
试试这个:
服务器端
program Srv;
{$I Synopse.inc}
uses
{$IFDEF UNIX}{$IFDEF UseCThreads}
cthreads,{$ENDIF}{$ENDIF}
Classes,SysUtils,CustApp,Generics.Collections,IdTCPServer,IdCustomTCPServer,IdContext,IdGlobal,Db,mORMot,mORMotSQLite3,IdSync,functions,SynCommons,SynSQLite3Static;
type
{ TMyApplication }
// Console application that owns an Indy TCP server and provides its event
// handlers. DoRun creates and activates the server; the destructor frees it.
TMyApplication = class(TCustomApplication)
var
  IdTCPServer: TIdTCPServer;
protected
  // Creates, configures and activates the server, then pumps the main loop.
  procedure DoRun; override;
  // Server event handlers.
  procedure ServerOnConnect(AContext: TIdContext);
  procedure ServerOnExecute(AContext: TIdContext);
  // Reads one packet from AContext into AStream; True on success.
  // The declaration now matches the implementation and its call site
  // (no Size parameter, AStream passed by value, Boolean result).
  function ReceiveStream(AContext: TIdContext; AStream: TStream): Boolean; overload;
public
  constructor Create(TheOwner: TComponent); override;
  destructor Destroy; override;
end;
type
// Notification object carrying one log line; LogMsg creates an instance,
// stores the text in FMsg and calls Notify, and DoNotify prints it.
// Presumably TIdNotify delivers DoNotify on the main thread - confirm against
// the Indy documentation.
TLog = class(TIdNotify)
protected
FMsg: string;
procedure DoNotify; override;
public
class procedure LogMsg(const AMsg: string);
end;
{ TMyApplication }
// Writes the stored message to standard output.
procedure TLog.DoNotify;
begin
WriteLn(FMsg);
end;
// Queues AMsg for output: builds a TLog entry holding the text and hands it
// to Notify. The entry is released here only if storing the text or calling
// Notify raises; the exception is then re-raised to the caller.
class procedure TLog.LogMsg(const AMsg: string);
var
  Entry: TLog;
begin
  Entry := TLog.Create;
  try
    Entry.FMsg := AMsg;
    Entry.Notify;
  except
    // Notify did not complete, so the entry is still ours to free.
    Entry.Free;
    raise;
  end;
end;
// Reads one packet from the client connection into AStream and rewinds the
// stream so the caller can consume it from the start.
//   AContext - connection context of the sending client.
//   AStream  - destination stream; receives the packet bytes.
// Returns True when the packet was read, False when any exception occurred
// (the exception is swallowed; the caller decides what to do next).
function TMyApplication.ReceiveStream(AContext: TIdContext; AStream: TStream): Boolean; overload;
begin
  // Pessimistic default: only a fully completed read flips this to True.
  Result := False;
  try
    // A byte count of -1 leaves it to ReadStream to obtain the stream size
    // from the connection, mirroring the sender's Write(AStream, True).
    AContext.Connection.IOHandler.ReadStream(AStream, -1, False);
    AStream.Position := 0;
    Result := True;
  except
    // Deliberately swallowed: failure is reported through the result.
  end;
end;
// OnExecute handler: receives exactly one packet per invocation.
// Blocks inside ReceiveStream until a packet arrives; on success a log line
// is emitted and the payload is available for processing, on failure the
// client connection is closed. The temporary stream is always released.
procedure TMyApplication.ServerOnExecute(AContext: TIdContext);
var
  Payload: TMemoryStream;
begin
  Payload := TMemoryStream.Create;
  try
    if ReceiveStream(AContext, Payload) then
    begin
      // One log line per packet received.
      TLog.LogMsg('--: '+random(100000).ToString);
      // .. here we use Payload to execute some stuff
    end
    else
      // Read failed: drop the client instead of retrying.
      AContext.Connection.Disconnect;
  finally
    Payload.Free;
  end;
end;
// OnConnect handler: logs the text 'connect' and turns LargeStream off on the
// new connection's IOHandler. The client sets the same flag before writing,
// so both sides agree on it.
// Presumably LargeStream selects the width of the size prefix exchanged by
// Write(TStream)/ReadStream - confirm against the Indy documentation.
procedure TMyApplication.ServerOnConnect(AContext: TIdContext);
begin
TLog.LogMsg('connect');
AContext.Connection.IOHandler.LargeStream := False;
end;
// Creates and configures the TCP server, activates it, and then services
// synchronization requests until the application is asked to terminate.
procedure TMyApplication.DoRun;
begin
  IdTCPServer := TIdTCPServer.Create;
  IdTCPServer.ListenQueue := 15;
  IdTCPServer.MaxConnections := 1;
  IdTCPServer.TerminateWaitTime := 5000;
  // Listen on all IPv4 interfaces, port 80.
  with IdTCPServer.Bindings.Add do
  begin
    IP := '0.0.0.0';
    Port := 80;
    IPVersion := Id_IPv4;
  end;
  IdTCPServer.OnConnect := ServerOnConnect;
  // No OnDisconnect handler is assigned: the program declares none, and the
  // previously referenced ServerOnDiconnect does not exist.
  IdTCPServer.OnExecute := ServerOnExecute;
  IdTCPServer.Active := True;
  // Service synchronization requests on the main thread (presumably this is
  // what delivers the queued TLog notifications - confirm against the Indy
  // TIdNotify documentation). Looping on Terminated instead of True lets a
  // call to Terminate end the loop; the ReadLn/Terminate that followed the
  // old infinite loop were unreachable and have been dropped.
  while not Terminated do
  begin
    Classes.CheckSynchronize();
    Sleep(10);
  end;
end;
// Constructs the application and enables StopOnException.
constructor TMyApplication.Create(TheOwner: TComponent);
begin
inherited Create(TheOwner);
StopOnException := True;
end;
// Frees the TCP server created in DoRun, then runs the inherited destructor.
destructor TMyApplication.Destroy;
begin
IdTCPServer.Free;
inherited Destroy;
end;
// Program entry point: create the application object, run it, and always
// free it afterwards.
var
  Application: TMyApplication;
begin
  Application := TMyApplication.Create(nil);
  try
    Application.Title := 'My Application';
    Application.Run;
  finally
    // Runs even if Run raises, so the destructor still frees the TCP server.
    Application.Free;
  end;
end.
客户端
// Sends the contents of AStream over AClient, letting Write transmit the
// stream size ahead of the data (second argument True).
//   AClient - connected TCP client to write to.
//   AStream - data to send.
// Returns True when the write completed, False when any exception occurred
// (the exception is swallowed).
function TForm1.SendStream(AClient: TIdTCPClient; AStream: TStream): Boolean; overload;
begin
  // Pessimistic default: only a completed write flips this to True.
  Result := False;
  try
    // Could instead be set once, right after TIdTCPClient.Connect() returns.
    AClient.IOHandler.LargeStream := False;
    AClient.IOHandler.Write(AStream, True);
    Result := True;
  except
    // Deliberately swallowed: failure is reported through the result.
  end;
end;
// Sends up to 1001 packets (i = 0..1000) with random MX/MY values, reusing a
// single memory stream for all of them. Sending stops at the first packet
// that SendStream fails to deliver.
procedure TForm1.Button1Click(Sender: TObject);
var
  Packet: TPacket;
  AStream: TMemoryStream;
  i: Integer;
begin
  AStream := TMemoryStream.Create;
  try
    // Size the stream once; each iteration overwrites the same bytes.
    AStream.Size := SizeOf(TPacket);
    for i := 0 to 1000 do
    begin
      // Keep the UI responsive while sending.
      Application.ProcessMessages;
      with Packet do
      begin
        MX := random(10000);
        MY := random(10000);
      end;
      AStream.Position := 0;
      AStream.Write(Packet, SizeOf(TPacket));
      // A failed send means the connection is unusable or only part of a
      // packet went out; writing further packets would achieve nothing, so
      // stop instead of ignoring the result.
      if not SendStream(IdTCPClientCmd, AStream) then
        Break;
    end;
  finally
    AStream.Free;
  end;
end;